Comparing version 0.4.16 to 0.4.17
@@ -22,3 +22,4 @@ // Generated by CoffeeScript 1.7.1 | ||
DurableChannel.__super__.constructor.apply(this, arguments); | ||
this.name = options.name, this.timeoutMonitorFrequency = options.timeoutMonitorFrequency; | ||
this.ending = false; | ||
this.name = options.name, this.timeoutMonitorFrequency = options.timeoutMonitorFrequency, this.transport = options.transport; | ||
if (this.name == null) { | ||
@@ -32,8 +33,17 @@ throw new Error("Durable channels cannot be anonymous"); | ||
this.events = new EventChannel; | ||
this.adapter = new Redis.Adapter({ | ||
events: new EventChannel, | ||
host: options.redis.host, | ||
port: options.redis.port | ||
this.isTransportShared = true; | ||
if (this.transport == null) { | ||
this.isTransportShared = false; | ||
this.transport = new Transport({ | ||
host: options.redis.host, | ||
port: options.redis.port | ||
}); | ||
this.transport.events.forward(this.events); | ||
} | ||
this.queue = new RemoteQueue({ | ||
name: "" + this.name + ".queue", | ||
transport: this.transport | ||
}); | ||
this.adapter.events.on("ready", (function(_this) { | ||
this.monitorTimeouts(); | ||
setImmediate((function(_this) { | ||
return function() { | ||
@@ -45,15 +55,2 @@ return _this.fire({ | ||
})(this)); | ||
this.transport = new Transport({ | ||
host: options.redis.host, | ||
port: options.redis.port | ||
}); | ||
this.transport.events.forward(this.events); | ||
this.store = null; | ||
this.queue = new RemoteQueue({ | ||
name: "" + this.name + ".queue", | ||
transport: this.transport | ||
}); | ||
this.destinationStores = {}; | ||
this.destinationQueues = {}; | ||
this.monitorTimeouts(); | ||
} | ||
@@ -74,38 +71,44 @@ | ||
DurableChannel.prototype.getStore = function() { | ||
if (this.store != null) { | ||
return this.store; | ||
} | ||
return this.events.serially((function(_this) { | ||
return function(go) { | ||
go(function() { | ||
return _this.adapter.collection("" + _this.name + ".messages"); | ||
DurableChannel.prototype.getMessage = function(channel, id) { | ||
return this.events.source((function(_this) { | ||
return function(events) { | ||
return _this.transport._acquire(function(client) { | ||
return client.hget("" + channel + ".messages", id, function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data != null ? JSON.parse(data) : null); | ||
}); | ||
}); | ||
return go(function(store) { | ||
_this.store = store; | ||
return _this.store; | ||
}; | ||
})(this)); | ||
}; | ||
DurableChannel.prototype.putMessage = function(channel, id, message) { | ||
return this.events.source((function(_this) { | ||
return function(events) { | ||
return _this.transport._acquire(function(client) { | ||
return client.hset("" + channel + ".messages", id, JSON.stringify(message), function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
}; | ||
})(this))(); | ||
})(this)); | ||
}; | ||
DurableChannel.prototype.getDestinationStore = function(name) { | ||
if (this.destinationStores[name] != null) { | ||
return this.destinationStores[name]; | ||
} | ||
return this.events.serially((function(_this) { | ||
return function(go) { | ||
go(function() { | ||
return _this.adapter.collection("" + name + ".messages"); | ||
DurableChannel.prototype.deleteMessage = function(channel, id) { | ||
return this.events.source((function(_this) { | ||
return function(events) { | ||
return _this.transport._acquire(function(client) { | ||
return client.hdel("" + channel + ".messages", id, function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
return go(function(store) { | ||
return _this.destinationStores[name] = store; | ||
}); | ||
}; | ||
})(this))(); | ||
})(this)); | ||
}; | ||
DurableChannel.prototype.getDestinationQueue = function(name) { | ||
var _base; | ||
return (_base = this.destinationQueues)[name] != null ? _base[name] : _base[name] = new RemoteQueue({ | ||
var queue; | ||
return queue = new RemoteQueue({ | ||
name: "" + name + ".queue", | ||
@@ -120,3 +123,8 @@ transport: this.transport | ||
return function(events) { | ||
return _this.adapter.client.zadd(["" + name + ".pending", Date.now() + timeout, "" + channel + "::" + id], events.callback); | ||
return _this.transport._acquire(function(client) { | ||
return client.zadd(["" + name + ".pending", Date.now() + timeout, "" + channel + "::" + id], function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
}; | ||
@@ -131,3 +139,8 @@ })(this)); | ||
return function(events) { | ||
return _this.adapter.client.zrem(["" + name + ".pending", "" + channel + "::" + id], events.callback); | ||
return _this.transport._acquire(function(client) { | ||
return client.zrem(["" + name + ".pending", "" + channel + "::" + id], function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
}; | ||
@@ -142,3 +155,8 @@ })(this)); | ||
return function(events) { | ||
return _this.adapter.client.zscore(["" + name + ".pending", "" + channel + "::" + id], events.callback); | ||
return _this.transport._acquire(function(client) { | ||
return client.zscore(["" + name + ".pending", "" + channel + "::" + id], function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
}; | ||
@@ -156,3 +174,8 @@ })(this)); | ||
return _this.events.source(function(events) { | ||
return _this.adapter.client.zrangebyscore(["" + _this.name + ".pending", 0, Date.now()], events.callback); | ||
return _this.transport._acquire(function(client) { | ||
return client.zrangebyscore(["" + _this.name + ".pending", 0, Date.now()], function(err, data) { | ||
_this.transport._release(client); | ||
return events.callback(err, data); | ||
}); | ||
}); | ||
}); | ||
@@ -186,3 +209,5 @@ }); | ||
return go(function() { | ||
return _this.timeoutMonitor = setTimeout(loopToMonitor, _this.timeoutMonitorFrequency); | ||
if (!_this.ending) { | ||
return _this.timeoutMonitor = setTimeout(loopToMonitor, _this.timeoutMonitorFrequency); | ||
} | ||
}); | ||
@@ -196,4 +221,3 @@ })(); | ||
DurableChannel.prototype.expireMessage = function(channel, id) { | ||
var message, store; | ||
store = null; | ||
var message; | ||
message = null; | ||
@@ -203,12 +227,6 @@ return this.events.serially((function(_this) { | ||
go(function() { | ||
return _this.getDestinationStore(channel); | ||
return _this.getMessage(channel, id); | ||
}); | ||
go(function(_store) { | ||
store = _store; | ||
return store.get(id); | ||
}); | ||
go(function(_message) { | ||
return message = _message; | ||
}); | ||
go(function() { | ||
message = _message; | ||
return _this.getMessageTimeout(_this.name, channel, id); | ||
@@ -223,3 +241,3 @@ }); | ||
if (message != null) { | ||
return store["delete"](id); | ||
return _this.deleteMessage(channel, id); | ||
} | ||
@@ -258,7 +276,4 @@ }); | ||
go(function() { | ||
return _this.getDestinationStore(to); | ||
return _this.putMessage(to, message.id, message); | ||
}); | ||
go(function(destStore) { | ||
return destStore.put(message.id, message); | ||
}); | ||
go(function() { | ||
@@ -280,7 +295,4 @@ return _this.setMessageTimeout(_this.name, to, message.id, message.timeout); | ||
go(function() { | ||
return _this.getStore(); | ||
return _this.getMessage(_this.name, message.requestId); | ||
}); | ||
go(function(store) { | ||
return store.get(message.requestId); | ||
}); | ||
return go(function(request) { | ||
@@ -301,7 +313,4 @@ if (request == null) { | ||
go(function() { | ||
return _this.getDestinationStore(request.from); | ||
return _this.putMessage(request.from, message.id, message); | ||
}); | ||
go(function(destStore) { | ||
return destStore.put(message.id, message); | ||
}); | ||
go(function() { | ||
@@ -323,7 +332,4 @@ return _this.setMessageTimeout(_this.name, request.from, message.id, message.timeout); | ||
go(function() { | ||
return _this.getStore(); | ||
return _this.deleteMessage(_this.name, message.responseId); | ||
}); | ||
go(function(store) { | ||
return store["delete"](message.responseId); | ||
}); | ||
return go(function() { | ||
@@ -342,14 +348,7 @@ return _this.clearMessageTimeout(message.to, message.from, message.responseId); | ||
messageHandler = function(messageId) { | ||
var store; | ||
store = null; | ||
return _this.events.serially(function(go) { | ||
go(function() { | ||
return _this.getStore(); | ||
return _this.getMessage(_this.name, messageId); | ||
}); | ||
go(function(_store) { | ||
store = _store; | ||
return store.get(messageId); | ||
}); | ||
go(function(message) { | ||
var destStore; | ||
if (message == null) { | ||
@@ -361,12 +360,7 @@ return null; | ||
} | ||
destStore = null; | ||
return _this.events.source(function(events) { | ||
return _this.events.serially(function(go) { | ||
go(function() { | ||
return _this.getDestinationStore(message.from); | ||
return _this.getMessage(message.from, message.requestId); | ||
}); | ||
go(function(_destStore) { | ||
destStore = _destStore; | ||
return destStore.get(message.requestId); | ||
}); | ||
return go(function(request) { | ||
@@ -376,5 +370,5 @@ return _this.events.serially(function(go) { | ||
if (request != null) { | ||
return destStore["delete"](message.requestId); | ||
return _this.deleteMessage(message.from, message.requestId); | ||
} else { | ||
return store["delete"](messageId); | ||
return _this.deleteMessage(_this.name, messageId); | ||
} | ||
@@ -432,13 +426,5 @@ }); | ||
DurableChannel.prototype.end = function() { | ||
var key, queue, _ref, _results; | ||
this.ending = true; | ||
clearTimeout(this.timeoutMonitor); | ||
this.adapter.close(); | ||
this.queue.end(); | ||
_ref = this.destinationQueues; | ||
_results = []; | ||
for (key in _ref) { | ||
queue = _ref[key]; | ||
_results.push(queue.end()); | ||
} | ||
return _results; | ||
return this.transport.end("" + this.name + ".queue", !this.isTransportShared); | ||
}; | ||
@@ -445,0 +431,0 @@ |
@@ -16,12 +16,16 @@ // Generated by CoffeeScript 1.7.1 | ||
function RedisTransport(options) { | ||
var poolEvents; | ||
var poolEvents, poolSize; | ||
this.ending = {}; | ||
this.events = new EventChannel; | ||
poolEvents = this.events.source("pool"); | ||
this.blockTimeout = options.blockTimeout; | ||
this.blockTimeout = options.blockTimeout, poolSize = options.poolSize; | ||
if (this.blockTimeout == null) { | ||
this.blockTimeout = 1; | ||
} | ||
if (poolSize == null) { | ||
poolSize = 10; | ||
} | ||
this.clients = Pool({ | ||
name: "redis-transport", | ||
max: 10, | ||
max: poolSize, | ||
create: (function(_this) { | ||
@@ -114,2 +118,3 @@ return function(callback) { | ||
RedisTransport.prototype.dequeue = function(name) { | ||
this.ending[name] = false; | ||
return this.events.source((function(_this) { | ||
@@ -128,2 +133,6 @@ return function(events) { | ||
return _events.on("success", function(results) { | ||
if ((_this.ending[name] != null) && _this.ending[name] === true) { | ||
delete _this.ending[name]; | ||
return; | ||
} | ||
if (results == null) { | ||
@@ -164,8 +173,17 @@ return _dequeue(); | ||
RedisTransport.prototype.end = function() { | ||
return this.clients.drain((function(_this) { | ||
return function() { | ||
return _this.clients.destroyAllNow(); | ||
}; | ||
})(this)); | ||
RedisTransport.prototype.end = function(name, destroyConnections) { | ||
if (name == null) { | ||
name = ""; | ||
} | ||
if (destroyConnections == null) { | ||
destroyConnections = true; | ||
} | ||
this.ending[name] = true; | ||
if (destroyConnections) { | ||
return this.clients.drain((function(_this) { | ||
return function() { | ||
return _this.clients.destroyAllNow(); | ||
}; | ||
})(this)); | ||
} | ||
}; | ||
@@ -172,0 +190,0 @@ |
@@ -45,4 +45,4 @@ // Generated by CoffeeScript 1.7.1 | ||
_this.on = function(event, handler) { | ||
_this.superOn(event, handler); | ||
if (event !== "success" && event !== "error" && event !== "ready") { | ||
_this.superOn(event, handler); | ||
return _dequeue(); | ||
@@ -49,0 +49,0 @@ } |
{ | ||
"name": "mutual", | ||
"version": "0.4.16", | ||
"version": "0.4.17", | ||
"description": "Scala-inspired Actors that use Redis as a message transport", | ||
@@ -31,3 +31,3 @@ "main": "lib/index.js", | ||
"fairmont": "0.7.x", | ||
"generic-pool": "2.0.2", | ||
"generic-pool": "~2.1.1", | ||
"key-forge": "~0.1.3", | ||
@@ -34,0 +34,0 @@ "pirate": "~0.9.7", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
50668
1224
+ Addedgeneric-pool@2.1.1(transitive)
- Removedgeneric-pool@2.0.2(transitive)
Updatedgeneric-pool@~2.1.1