Implementing a WebSocket server with Node.js Express clusters

If you have an existing Node.js server that uses the cluster module to spawn workers, you can run one WebSocket server instance on each worker. In this article, we will see how to implement this using the ws package.

The code modifications are not that many, as you will be able to see in the example below.

'use strict';

var express = require('express');
var cluster = require('cluster');
const {WebSocketServer} = require('ws');

// Express application; incoming JSON request bodies are parsed by express.json().
const app = express();
app.use(express.json());

// Registry of forked workers, keyed by the worker's process id.
const workers = {};
// Number of CPU cores reported by the OS; one worker is forked per core.
const count = require('os').cpus().length;

/**
 * Forks a new cluster worker and records it in the `workers` registry
 * under its process id.
 *
 * @returns {object} The worker object returned by `cluster.fork()`.
 */
function spawn() {
  const worker = cluster.fork();
  // Log through `console`: no `logger` is defined or required in this file,
  // so calling `logger.debug` here would throw a ReferenceError.
  console.log(`Worker ${worker.process.pid} started!`);
  workers[worker.process.pid] = worker;
  return worker;
}

// Primary process: fork one worker per core and replace any worker that exits.
// Worker process: serve HTTP via Express and attach a WebSocket server to it.
// `isPrimary` is checked first because `isMaster` is a deprecated alias on
// newer Node.js versions; the fallback keeps older runtimes working.
if (cluster.isPrimary || cluster.isMaster) {
  console.log("Number of cores is " + count.toString());
  for (let i = 0; i < count; i++) {
    spawn();
  }
  // The cluster module emits 'exit' when a worker dies. The old 'death' name
  // is not emitted by current Node.js, so listening for it never respawned anything.
  cluster.on('exit', function (worker) {
    console.log('worker ' + worker.process.pid + ' died. spawning a new process...');
    delete workers[worker.process.pid];
    spawn();
  });
} else {
  const server = app.listen(process.env.PORT || 8081, function () {
    const { address: host, port } = server.address();
    console.log("Example app listening at http://%s:%s", host, port);
  });

  // Interval between heartbeat sweeps, in milliseconds.
  const HEARTBEAT_INTERVAL_MS = 10000;

  // The WebSocket server shares the HTTP server created by app.listen().
  const wss = new WebSocketServer({ server });
  wss.on('connection', function connection(ws, req) {
    ws.isAlive = true;

    // Without an 'error' listener, a socket error is thrown as an uncaught
    // exception and takes the whole worker down.
    ws.on('error', console.error);

    // If the connection responds to ping with pong, set its isAlive state to true
    ws.on('pong', () => {
      ws.isAlive = true;
    });

    ws.on('message', async function message(data) {
      console.log('received: %s', data);
      try {
        const payload = JSON.parse(data);
        // Do something with `payload`
      } catch (e) {
        // Anything thrown while parsing/handling the message is reported to the client.
        ws.send(JSON.stringify({"status":"Error","message":"Invalid input"}));
      }
    });

    ws.send(JSON.stringify({"status":"Success","message":"Received connection"}));
  });

  // Ping each connection every 10 seconds. A connection that has not answered
  // the previous ping (isAlive still false) is terminated on the next sweep.
  const heartbeat = setInterval(() => {
    wss.clients.forEach((ws) => {
      if (!ws.isAlive) return ws.terminate();

      ws.isAlive = false;
      ws.ping();
    });
  }, HEARTBEAT_INTERVAL_MS);

  // Stop the heartbeat timer once the WebSocket server itself is closed.
  wss.on('close', () => {
    clearInterval(heartbeat);
  });
}

As you can see, each worker has a separate WebSocketServer instance. All the WebSocket event handling is set up in the worker branch, on the HTTP server returned by app.listen().

Note: Each WebSocketServer instance runs a 10-second timer; every time it fires, a ping is sent to every connection. Each connection is expected to respond with a pong. If it doesn’t, the connection is terminated on the next cycle. Thus, it takes at most 20 seconds to close a dead connection. This can be increased or reduced by changing the timer interval.


I hope you liked this article. For more articles on IoT in general, check out https://iotespresso.com/

Leave a comment

Your email address will not be published. Required fields are marked *