Saturday, February 11, 2017

Node.js clustered Express app with a Socket.IO chat, using redis-server to relay events between workers

var express = require('express'),
        cluster = require('cluster'),
        net = require('net'),
        socketio = require('socket.io'),
        socket_redis = require('socket.io-redis');

// Public port the master process listens on.
var port = 80;

// Worker count: one per CPU reported by the OS.
var num_processes = require('os').cpus().length;

if (cluster.isMaster) {
    // This stores our workers. We need to keep them to be able to reference
    // them based on source IP address. It's also useful for auto-restart,
    // for example.
    var workers = [];

    // Helper function for spawning worker at index 'i'.
    var spawn = function (i) {
        workers[i] = cluster.fork();

        // Optional: Restart worker on exit
        workers[i].on('exit', function (code, signal) {
            console.log('respawning worker', i);
            spawn(i);
        });
    };

    // Spawn workers.
    for (var i = 0; i < num_processes; i++) {
        spawn(i);
    }

    // Helper function for getting a worker index based on IP address.
    // This is a hot path so it should be really fast. The way it works
    // is by converting the IP address to a number by removing non numeric
    // characters, then compressing it to the number of slots we have.
    //
    // Compared against "real" hashing (from the sticky-session code) and
    // "real" IP number conversion, this function is on par in terms of
    // worker index distribution only much faster.
    var worker_index = function (ip, len) {
        var s = '';
        for (var i = 0, _len = ip.length; i < _len; i++) {
            if (!isNaN(ip[i])) {
                s += ip[i];
            }
        }
        return Number(s) % len;
    };

    // Create the outside facing server listening on our port.
    var server = net.createServer({pauseOnConnect: true}, function (connection) {
        // We received a connection and need to pass it to the appropriate
        // worker. Get the worker for this connection's source IP and pass
        // it the connection.
        var worker = workers[worker_index(connection.remoteAddress, num_processes)];
        worker.send('sticky-session:connection', connection);
    }).listen(port);

} else {
    /* don't use yet
     * var app = require('express')();
     var server = require('http').Server(app);
     var io = require('socket.io')(server);
     var redis = require('socket.io-redis');
     var http = app.listen(80, function () {
     console.log('ProcId : ' + process.pid + ' is listening to all incoming requests...');
     });*/
    // Note we don't use a port here because the master listens on it for us.
    var app = new express();

    // Here you might use middleware, attach routes, etc.
    app.use('/assets', express.static(__dirname + '/public'));
    app.get('/', function (req, res) {
        res.sendFile(__dirname + '/index.html');
    });

    // Don't expose our internal server to the outside.
    var server = app.listen(),
            io = socketio(server);

    // Tell Socket.IO to use the redis adapter. By default, the redis
    // server is assumed to be on localhost:6379. You don't have to
    // specify them explicitly unless you want to change them.
    io.adapter(socket_redis({host: 'localhost', port: 6379}));

    var online_members = 0;
    var chat_socket = io.of('/chat')
            .on('connection', function (client_socket) {
                var log = process.pid + ', ' + client_socket.id + ' : Connected';
                console.log(log);

                client_socket.on('login', function (data) {
                    online_members = online_members + 1;

                    client_socket.emit('ilogged', {id: client_socket.id, pid: process.pid, online_members: online_members});
                    client_socket.broadcast.emit('ulogged', {id: client_socket.id, pid: process.pid, online_members: online_members});

                    log = process.pid + ', ' + client_socket.id + ' : Logged';
                    console.log(log);
                });

                client_socket.on('message', function (data) {
                    var msg = process.pid + " : " + data;
                    client_socket.broadcast.emit('message', msg);

                    log = process.pid + ' : ' + data;
                    console.log(log);
                });

                client_socket.on('disconnect', function () {
                    online_members = online_members - 1;
                    client_socket.broadcast.emit('uclosed', {id: client_socket.id, pid: process.pid, online_members: online_members});

                    log = process.pid + ', ' + client_socket.id + ' : Disconnected';
                    console.log(log);
                });
            });

    console.log('ProcId : ' + process.pid + ' is listening to all incoming requests...');

    // Listen to messages sent from the master. Ignore everything else.
    process.on('message', function (message, connection) {
        if (message !== 'sticky-session:connection') {
            return;
        }

        // Emulate a connection event on the server by emitting the
        // event with the connection the master sent us.
        server.emit('connection', connection);

        connection.resume();
    });
}



Client-side script (runs in the browser page served at '/')



// Connects to the '/chat' namespace, logs in, and wires the page's message
// list and form to the socket. Called once on page load and again five
// seconds after every disconnect.
var chat_client_start = function () {
    // Login acknowledgement payload from the server, or null while not
    // logged in.
    var logged = null;
    var chat_client = io.connect('http://BOR-PC/chat');

    // Appends one entry to the on-page message list. Uses .text() so that
    // message content is never interpreted as HTML.
    var append_line = function (text) {
        $('#messages').append($('<li>').text(text));
    };

    chat_client.on('connect', function () {
        append_line('Logging...');
        if (logged === null) {
            chat_client.emit("login", "I am logging...");
        }
    });

    chat_client.on('ilogged', function (data) {
        logged = data;
        var txt = "I am Logged: " + data.pid + ', ' + data.id + ', c:' + data.online_members;
        append_line(txt);
    });

    chat_client.on('ulogged', function (data) {
        var txt = "User Logged: " + data.pid + ', ' + data.id + ', c:' + data.online_members;
        append_line(txt);
    });

    chat_client.on('uclosed', function (data) {
        var txt = "User Closed: " + data.pid + ', ' + data.id + ', c:' + data.online_members;
        append_line(txt);
    });

    chat_client.on('message', function (msg) {
        append_line(msg);
    });

    chat_client.on('disconnect', function () {
        logged = null;
        append_line('Server disconnected.');
        append_line('Reconnecting...');
        // NOTE(review): the previous socket object keeps its listeners after
        // the restart below; confirm against the socket.io-client version in
        // use whether it should be explicitly closed first.
        setTimeout(function () {
            chat_client_start();
        }, 5000);
    });

    // This function runs again after every disconnect, so drop the handler
    // bound by the previous run before binding a new one; otherwise each
    // restart would stack another handler and one submit would send the
    // message several times. The '.chat' event namespace limits the removal
    // to handlers bound here.
    $('form').off('submit.chat').on('submit.chat', function () {
        var msg_txt = $('#msg_txt').val();
        append_line(msg_txt);
        chat_client.emit('message', msg_txt);
        $('#msg_txt').val('');
        // Prevent the browser's default form submission (page reload).
        return false;
    });
};

chat_client_start();
  •  

    No comments: