Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

vubsub

Package Overview
Dependencies
Maintainers
1
Versions
21
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

vubsub - npm Package Compare versions

Comparing version 1.1.6 to 1.1.7

91

lib/channel.js

@@ -11,36 +11,6 @@ var util = require('util')

self.db = client.db;
self.collection = Channel.collection(self.db, client.namespace).then(function(collection) {
var cursor = collection.find().sort({ $natural: -1 }).limit(1);
return q.ninvoke(cursor, 'nextObject').then(function(doc) {
if (doc) {
return doc;
} else {
var deferred = q.defer();
collection.insert({ init: true }, { w: 'majority' }, function(err, docs) {
if (err || !docs || !docs.length) {
deferred.reject('failed to insert the first doc');
} else {
deferred.resolve(docs[0]);
}
});
return deferred.promise;
}
}).then(function(doc) {
// if we've come here, then:
// - the collection exists
// - it had at least a document
// - and we know the _id to the most recent one
self.lastMessage = lastMessage ? mongo.ObjectID(lastMessage) : doc._id;
self._query(self.lastMessage, function(err, data) {
self._handleNext(err, data);
});
self.emit('ready', collection);
return collection;
});
}).fail(function(err) {
console.error('failed to initialize channel', name, client.namespace, err);
self.leave();
self.emit('error', err);
});
if (lastMessage) {
self.lastMessage = mongo.ObjectID(lastMessage.toString());
}
self._init();
self.db.collection('channels').insert({ client: self.client.id, channel: self.name, ns: self.client.namespace, ping: new Date() }, { w: 0 });

@@ -77,6 +47,43 @@ }

}
Channel.prototype._query = function(lastMessage, cb) {
Channel.prototype._init = function() {
var self = this;
self.collection = Channel.collection(self.db, self.client.namespace).then(function(collection) {
var cursor = collection.find().sort({ $natural: -1 }).limit(1);
return q.ninvoke(cursor, 'nextObject').then(function(doc) {
if (doc) {
return doc;
} else {
var deferred = q.defer();
collection.insert({ init: true }, { w: 1 }, function(err, docs) {
if (err || !docs || !docs.length) {
console.error('failed to insert the first doc', err, docs, collection);
deferred.reject(err || 'first_doc');
} else {
deferred.resolve(docs[0]);
}
});
return deferred.promise;
}
}).then(function(doc) {
// if we've come here, then:
// - the collection exists
// - it had at least a document
// - and we know the _id to the most recent one
self.lastMessage = self.lastMessage || doc._id;
self._query(function(err, data) {
self._handleNext(err, data);
});
self.emit('ready', collection);
return collection;
});
}).fail(function(err) {
console.error('failed to initialize channel', self.name, self.client.namespace, err);
self.leave();
self.emit('error', err);
});
}
Channel.prototype._query = function(cb) {
var self = this;
var query = {
_id: { $gt: lastMessage },
_id: { $gt: self.lastMessage },
channel: self.name,

@@ -99,13 +106,13 @@ from: { $ne: self.client.id }

console.error('Channel._handleNext', self.name, err);
self.leave();
self.emit('close', { reason: "error" });
console.warn('Channel._handleNext', self.name, 'reinitializing');
self._init();
} else if (!data) {
if (self.cursor) {
console.warn('Channel._handleNext', self.name, "no data");
self._query(self.lastMessage, function(err, data) {
console.warn('Channel._handleNext', self.name, 'no data');
self._query(function(err, data) {
self._handleNext(err, data);
});
} else {
console.warn('Channel._handleNext', self.name, "closed");
self.emit('close', { reason: "closed" });
console.warn('Channel._handleNext', self.name, 'closed');
self.emit('close', { reason: 'closed' });
}

@@ -112,0 +119,0 @@ } else {

{
"name": "vubsub",
"version": "1.1.6",
"version": "1.1.7",
"description": "Pub/Sub for Node.js and MongoDB",

@@ -5,0 +5,0 @@ "homepage": "https://github.com/vivocha/vubsub",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc