Comparing version 1.1.6 to 1.1.7
@@ -11,36 +11,6 @@ var util = require('util') | ||
self.db = client.db; | ||
self.collection = Channel.collection(self.db, client.namespace).then(function(collection) { | ||
var cursor = collection.find().sort({ $natural: -1 }).limit(1); | ||
return q.ninvoke(cursor, 'nextObject').then(function(doc) { | ||
if (doc) { | ||
return doc; | ||
} else { | ||
var deferred = q.defer(); | ||
collection.insert({ init: true }, { w: 'majority' }, function(err, docs) { | ||
if (err || !docs || !docs.length) { | ||
deferred.reject('failed to insert the first doc'); | ||
} else { | ||
deferred.resolve(docs[0]); | ||
} | ||
}); | ||
return deferred.promise; | ||
} | ||
}).then(function(doc) { | ||
// if we've come here, then: | ||
// - the collection exists | ||
// - it had at least a document | ||
// - and we know the _id to the most recent one | ||
self.lastMessage = lastMessage ? mongo.ObjectID(lastMessage) : doc._id; | ||
self._query(self.lastMessage, function(err, data) { | ||
self._handleNext(err, data); | ||
}); | ||
self.emit('ready', collection); | ||
return collection; | ||
}); | ||
}).fail(function(err) { | ||
console.error('failed to initialize channel', name, client.namespace, err); | ||
self.leave(); | ||
self.emit('error', err); | ||
}); | ||
if (lastMessage) { | ||
self.lastMessage = mongo.ObjectID(lastMessage.toString()); | ||
} | ||
self._init(); | ||
self.db.collection('channels').insert({ client: self.client.id, channel: self.name, ns: self.client.namespace, ping: new Date() }, { w: 0 }); | ||
@@ -77,6 +47,43 @@ } | ||
} | ||
Channel.prototype._query = function(lastMessage, cb) { | ||
Channel.prototype._init = function() { | ||
var self = this; | ||
self.collection = Channel.collection(self.db, self.client.namespace).then(function(collection) { | ||
var cursor = collection.find().sort({ $natural: -1 }).limit(1); | ||
return q.ninvoke(cursor, 'nextObject').then(function(doc) { | ||
if (doc) { | ||
return doc; | ||
} else { | ||
var deferred = q.defer(); | ||
collection.insert({ init: true }, { w: 1 }, function(err, docs) { | ||
if (err || !docs || !docs.length) { | ||
console.error('failed to insert the first doc', err, docs, collection); | ||
deferred.reject(err || 'first_doc'); | ||
} else { | ||
deferred.resolve(docs[0]); | ||
} | ||
}); | ||
return deferred.promise; | ||
} | ||
}).then(function(doc) { | ||
// if we've come here, then: | ||
// - the collection exists | ||
// - it had at least a document | ||
// - and we know the _id to the most recent one | ||
self.lastMessage = self.lastMessage || doc._id; | ||
self._query(function(err, data) { | ||
self._handleNext(err, data); | ||
}); | ||
self.emit('ready', collection); | ||
return collection; | ||
}); | ||
}).fail(function(err) { | ||
console.error('failed to initialize channel', self.name, self.client.namespace, err); | ||
self.leave(); | ||
self.emit('error', err); | ||
}); | ||
} | ||
Channel.prototype._query = function(cb) { | ||
var self = this; | ||
var query = { | ||
_id: { $gt: lastMessage }, | ||
_id: { $gt: self.lastMessage }, | ||
channel: self.name, | ||
@@ -99,13 +106,13 @@ from: { $ne: self.client.id } | ||
console.error('Channel._handleNext', self.name, err); | ||
self.leave(); | ||
self.emit('close', { reason: "error" }); | ||
console.warn('Channel._handleNext', self.name, 'reinitializing'); | ||
self._init(); | ||
} else if (!data) { | ||
if (self.cursor) { | ||
console.warn('Channel._handleNext', self.name, "no data"); | ||
self._query(self.lastMessage, function(err, data) { | ||
console.warn('Channel._handleNext', self.name, 'no data'); | ||
self._query(function(err, data) { | ||
self._handleNext(err, data); | ||
}); | ||
} else { | ||
console.warn('Channel._handleNext', self.name, "closed"); | ||
self.emit('close', { reason: "closed" }); | ||
console.warn('Channel._handleNext', self.name, 'closed'); | ||
self.emit('close', { reason: 'closed' }); | ||
} | ||
@@ -112,0 +119,0 @@ } else { |
{ | ||
"name": "vubsub", | ||
"version": "1.1.6", | ||
"version": "1.1.7", | ||
"description": "Pub/Sub for Node.js and MongoDB", | ||
@@ -5,0 +5,0 @@ "homepage": "https://github.com/vivocha/vubsub", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
12052
250