Skip to content

Commit

Permalink
Get all clients in a room across all nodes (#109)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fernando Godino authored and darrachequesne committed Sep 24, 2016
1 parent 6a78282 commit 679e73c
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ The following options are allowed:
- `subEvent`: optional, the redis client event name to subscribe to (`message`)
- `pubClient`: optional, the redis client to publish events on
- `subClient`: optional, the redis client to subscribe to events on
- `clientsTimeout`: optional, after this timeout the adapter will stop waiting from responses to `clients` request (`1000ms`)

If you decide to supply `pubClient` and `subClient`, make sure you use
[node_redis](https://github.com/mranney/node_redis) as a client or one
Expand All @@ -55,6 +56,11 @@ that a regular `Adapter` does not
- `prefix`
- `pubClient`
- `subClient`
- `clientsTimeout`

### RedisAdapter#clients(rooms:Array, fn:Function)

Returns the list of client IDs connected to `rooms` across all nodes. See [Namespace#clients(fn:Function)](https://github.com/socketio/socket.io#namespaceclientsfnfunction)

## Client error handling

Expand Down
133 changes: 133 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ function adapter(uri, opts){
// opts
var pub = opts.pubClient;
var sub = opts.subClient;

var prefix = opts.key || 'socket.io';
var subEvent = opts.subEvent || 'message';
var clientsTimeout = opts.clientsTimeout || 1000;

// init clients if needed
function createClient(redis_opts) {
Expand All @@ -51,6 +53,8 @@ function adapter(uri, opts){

if (!pub) pub = createClient();
if (!sub) sub = createClient({ return_buffers: true });

var subJson = sub.duplicate({ return_buffers: false });

// this server's key
var uid = uid2(6);
Expand All @@ -67,7 +71,11 @@ function adapter(uri, opts){

this.uid = uid;
this.prefix = prefix;
this.clientsTimeout = clientsTimeout;

this.channel = prefix + '#' + nsp.name + '#';
this.syncChannel = prefix + '-sync#request#' + this.nsp.name + '#';

if (String.prototype.startsWith) {
this.channelMatches = function (messageChannel, subscribedChannel) {
return messageChannel.startsWith(subscribedChannel);
Expand All @@ -81,10 +89,17 @@ function adapter(uri, opts){
this.subClient = sub;

var self = this;

sub.subscribe(this.channel, function(err){
if (err) self.emit('error', err);
});

subJson.subscribe(this.syncChannel, function(err){
if (err) self.emit('error', err);
});

sub.on(subEvent, this.onmessage.bind(this));
subJson.on(subEvent, this.onclients.bind(this));
}

/**
Expand Down Expand Up @@ -123,6 +138,43 @@ function adapter(uri, opts){
this.broadcast.apply(this, args);
};

/**
* Called with a subscription message on sync
*
* @api private
*/

Redis.prototype.onclients = function(channel, msg){

var self = this;

if (!self.channelMatches(channel.toString(), self.syncChannel)) {
return debug('ignore different channel');
}

try {
var decoded = JSON.parse(msg);
} catch(err){
self.emit('error', err);
return;
}

Adapter.prototype.clients.call(self, decoded.rooms, function(err, clients){
if(err){
self.emit('error', err);
return;
}

var responseChn = prefix + '-sync#response#' + decoded.transaction;
var response = JSON.stringify({
clients : clients
});

pub.publish(responseChn, response);
});

};

/**
* Broadcasts a packet.
*
Expand Down Expand Up @@ -236,10 +288,91 @@ function adapter(uri, opts){
});
};

/**
* Gets a list of clients by sid.
*
* @param {Array} explicit set of rooms to check.
* @api public
*/

Redis.prototype.clients = function(rooms, fn){
if ('function' == typeof rooms){
fn = rooms;
rooms = null;
}

rooms = rooms || [];

var self = this;

var transaction = uid2(6);
var responseChn = prefix + '-sync#response#' + transaction;

pub.send_command('pubsub', ['numsub', self.syncChannel], function(err, numsub){
if (err) {
self.emit('error', err);
if (fn) fn(err);
return;
}

numsub = numsub[1];

var msg_count = 0;
var clients = {};

subJson.on('subscribe', function subscribed(channel, count) {

var request = JSON.stringify({
transaction : transaction,
rooms : rooms
});

/*If there is no response for 1 second, return result;*/
var timeout = setTimeout(function() {
if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}, self.clientsTimeout);

subJson.on(subEvent, function onEvent(channel, msg) {

if (!self.channelMatches(channel.toString(), responseChn)) {
return debug('ignore different channel');
}

var response = JSON.parse(msg);

//Ignore if response does not contain 'clients' key
if(!response.clients || !Array.isArray(response.clients)) return;

for(var i = 0; i < response.clients.length; i++){
clients[response.clients[i]] = true;
}

msg_count++;
if(msg_count == numsub){
clearTimeout(timeout);
subJson.unsubscribe(responseChn);
subJson.removeListener('subscribe', subscribed);
subJson.removeListener(subEvent, onEvent);

if (fn) process.nextTick(fn.bind(null, null, Object.keys(clients)));
}
});

pub.publish(self.syncChannel, request);

});

subJson.subscribe(responseChn);

});

};

Redis.uid = uid;
Redis.pubClient = pub;
Redis.subClient = sub;
Redis.prefix = prefix;
Redis.clientsTimeout = clientsTimeout;

return Redis;

Expand Down
44 changes: 44 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,50 @@ describe('socket.io-redis', function(){
});
});

it('returns clients in the same room', function(done){
create(function(server1, client1){
create(function(server2, client2){
create(function(server3, client3){

var ready = 0;

server1.on('connection', function(c1){
c1.join('woot');
ready++;;
if(ready === 3){
test();
}
});

server2.on('connection', function(c1){
c1.join('woot');
ready++;;
if(ready === 3){
test();
}
});

server3.on('connection', function(c3){
ready++;;
if(ready === 3){
test();
}
});

function test(){
setTimeout(function(){
server1.adapter.clients(['woot'], function(err, clients){
expect(clients.length).to.eql(2);
done();
})
}, 100);
}

});
});
});
});

// create a pair of socket.io server+client
function create(nsp, fn){
var srv = http();
Expand Down

2 comments on commit 679e73c

@wolczek
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

var subJson = sub.duplicate({ return_buffers: false });

This doesn't work with existing redis-sentinel adapters for nodejs:

  • redis-sentinel
  • redis-sentinel-client

TypeError: sub.duplicate is not a function

@darrachequesne
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redis-sentinel does not seem to be maintained anymore: https://github.com/ortoo/node-redis-sentinel

I recommend using ioredis rather than this library. It has inbuilt sentinel support and is likely much more robust

That being said, we could maybe add a fallback, in case duplicate method does not exist. PR is welcome! 👼

Please sign in to comment.