New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

mutual

Package Overview
Dependencies
Maintainers
3
Versions
31
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mutual - npm Package Compare versions

Comparing version 0.4.18 to 1.0.0-alpha-02

dump.rdb

185

lib/channel.js

@@ -1,72 +0,151 @@

// Generated by CoffeeScript 1.8.0
// Generated by CoffeeScript 1.9.1
(function() {
var Channel;
var Channel, Local, PatternSet, assert, async, empty, first, is_function, is_object, is_string, map, promise, ref, ref1, resolve,
slice = [].slice;
ref = require("when"), promise = ref.promise, resolve = ref.resolve;
ref1 = require("fairmont"), async = ref1.async, is_string = ref1.is_string, is_object = ref1.is_object, is_function = ref1.is_function, empty = ref1.empty, first = ref1.first;
Local = require("./transport/local");
PatternSet = require("evie-wildcards");
assert = function(x) {
if (!x) {
throw new TypeError;
}
};
map = function(fn) {
return function() {
var args, event, x;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
if (args.length === 1 && is_object(first(args))) {
map = args[0];
for (event in map) {
x = map[event];
fn.call(this, event, x);
}
} else {
fn.call.apply(fn, [this].concat(slice.call(args)));
}
return this;
};
};
Channel = (function() {
function Channel() {
this.handlers = [];
function Channel(name, transport) {
this.name = name;
this.transport = transport;
assert(is_string(this.name));
if (this.transport == null) {
this.transport = Local.create();
}
this.handlers = {};
this.patterns = new PatternSet;
this.closed = this.listening = false;
}
Channel.prototype.send = function(message) {
return setImmediate((function(_this) {
Channel.prototype.emit = map(function() {
var args, event;
event = arguments[0], args = 2 <= arguments.length ? slice.call(arguments, 1) : [];
assert(is_string(event));
return this.transport.send(this.name, JSON.stringify([event].concat(slice.call(args))));
});
Channel.prototype.on = map(function(event, handler) {
var base, handlers;
assert(is_string(event));
assert(is_function(handler));
this.patterns.add(event);
handlers = ((base = this.handlers)[event] != null ? base[event] : base[event] = []);
handlers.push(handler);
return this.listen();
});
Channel.prototype.once = map(function(event, handler) {
assert(is_string(event));
assert(is_function(handler));
return this.on(event, (function(_this) {
return function() {
return _this.fire(message);
var args;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
handler.apply(null, args);
return _this.remove(event, handler);
};
})(this));
};
});
Channel.prototype.fire = function(message) {
var handler, _i, _len, _ref, _results;
this["package"](message);
_ref = this.handlers;
_results = [];
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
handler = _ref[_i];
_results.push(handler(message));
}
return _results;
};
Channel.prototype.remove = map(function(event, handler) {
var _h, base, handlers;
assert(is_string(event));
assert(is_function(handler));
handlers = ((base = this.handlers)[event] != null ? base[event] : base[event] = []);
return this.handlers[event] = (function() {
var i, len, results;
results = [];
for (i = 0, len = handlers.length; i < len; i++) {
_h = handlers[i];
if (_h !== handler) {
results.push(_h);
}
}
return results;
})();
});
Channel.prototype.receive = function(handler) {
return this.handlers.push(handler);
};
Channel.prototype.forward = map(function(event, emitter) {
var emit;
assert(is_string(event));
assert(emitter.emit != null);
emit = function() {
var args;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
return emitter.emit.apply(emitter, [event].concat(slice.call(args)));
};
return this.on(event, emit);
});
Channel.prototype.remove = function(handler) {
var _handler;
return this.handlers = (function() {
var _i, _len, _ref, _results;
_ref = this.handlers;
_results = [];
for (_i = 0, _len = _ref.length; _i < _len; _i++) {
_handler = _ref[_i];
if (_handler !== handler) {
_results.push(_handler);
Channel.prototype.listen = async(function*() {
var args, event, ref2, result;
if (!this.listening) {
this.listening = true;
while (!this.closed) {
result = (yield (this.transport.receive(this.name)));
if (result != null) {
ref2 = JSON.parse(result), event = ref2[0], args = 2 <= ref2.length ? slice.call(ref2, 1) : [];
this.patterns.match(event, (function(_this) {
return function(event) {
var base, handler, handlers, i, len, results;
handlers = ((base = _this.handlers)[event] != null ? base[event] : base[event] = []);
results = [];
for (i = 0, len = handlers.length; i < len; i++) {
handler = handlers[i];
results.push(handler.apply(null, args));
}
return results;
};
})(this));
}
}
return _results;
}).call(this);
};
return this.listening = false;
}
});
Channel.prototype.forward = function(channel) {
return this.receive((function(_this) {
return function(message) {
return channel.fire(message);
};
})(this));
Channel.prototype.close = function() {
this.closed = true;
return this.transport.close();
};
Channel.prototype.source = function(block) {
var channel;
channel = new this.constructor;
channel.forward(this);
if (block != null) {
block(channel);
}
return channel;
Channel.create = function() {
var args;
args = 1 <= arguments.length ? slice.call(arguments, 0) : [];
return (function(func, args, ctor) {
ctor.prototype = func.prototype;
var child = new ctor, result = func.apply(child, args);
return Object(result) === result ? result : child;
})(Channel, args, function(){});
};
Channel.prototype["package"] = function(message) {
return message;
};
return Channel;

@@ -73,0 +152,0 @@

@@ -1,38 +0,14 @@

// Generated by CoffeeScript 1.8.0
// Generated by CoffeeScript 1.9.1
(function() {
var getter;
getter = function(fn) {
return {
get: function() {
return fn();
},
enumerable: true
};
module.exports = {
Channel: require("./channel"),
Transport: {
Local: require("./transport/local"),
Redis: {
Queue: require("./transport/redis/queue"),
Broadcast: require("./transport/redis/broadcast")
}
}
};
Object.defineProperties(module.exports, {
Channel: getter(function() {
return require("./channel");
}),
EventChannel: getter(function() {
return require("./event-channel");
}),
RemoteChannel: getter(function() {
return require("./remote-channel");
}),
RedisTransport: getter(function() {
return require("./redis-transport");
}),
RemoteQueue: getter(function() {
return require("./remote-queue");
}),
DurableChannel: getter(function() {
return require("./durable-channel");
}),
Promisifier: getter(function() {
return require("./promisifier");
})
});
}).call(this);
{
"name": "mutual",
"version": "0.4.18",
"description": "Scala-inspired Actors that use Redis as a message transport",
"version": "1.0.0-alpha-02",
"description": "TBD",
"main": "lib/index.js",
"scripts": {
"test": "rake test"
"prepublish": "coffee --nodejs --harmony --compile -o ./lib ./src",
"watch": "coffee --nodejs --harmony --compile --watch -o lib/ ./src",
"test": "coffee --nodejs --harmony test/index.coffee",
"examples": "coffee --nodejs --harmony examples/index.coffee"
},

@@ -13,31 +16,15 @@ "repository": {

},
"keywords": [
"scala",
"actor",
"channel",
"redis",
"message",
"distributed",
"queue",
"worker"
],
"keywords": [],
"author": "Dan Yoder",
"license": "MIT",
"readmeFilename": "README.md",
"scripts": {
"prepublish": "coffee -o lib/ -c src/*.coffee"
},
"dependencies": {
"fairmont": "0.7.x",
"generic-pool": "~2.1.1",
"key-forge": "~0.1.3",
"pirate": "~0.9.7",
"redis": "0.8.x",
"typely": "0.0.x",
"when": "~3.6.2"
"evie": "^1.0.0-alpha-02",
"evie-wildcards": "^1.0.0-alpha-01",
"redis": "^0.12.1",
"when": "^3.7.2"
},
"devDependencies": {
"testify": "0.2.9",
"express": "~3.3.5"
"fairmont": "^1.0.0-alpha-14"
}
}
# Mutual
Mutual is inspired by Scala's Actor model. Concurrency is managed by setting up Channels between participants. Remote channels are implemented by using Redis as a transport. Event channels provide an `EventEmitter` like interface. Builder methods, in combination with event-bubbling, can be used to build complex chains of asynchronous processing.
Mutual wraps a messaging layer with a simple EventEmitter-style interface. Mutual uses Evie instead of EventEmitter, which allows for event-bubbling and a few other niceties.
fs = require "fs"
{EventChannel} = require "mutual"
events = new EventChannel
# all error events will bubble-up here
events.on "error", (error) -> console.log error
# wrap a Node-style callback function
read = events.wrap(fs.readFile)
# use builder function to create an asynchronous control flow
do events.serially (go) ->
go -> read("foo.txt", encoding: "utf8")
go (text) -> console.log text
Remote channels are just event channels, which means you can swap them out without changing any code. Here's a simple express app that implements a chat interface:
To use Mutual, you simply create a `Channel` and subscribe to the events you're interested in.
http = require "http"
{EventChannel} = require "mutual"
events = new EventChannel
```coffee
{Channel} = require "mutual"
channel = Channel.create "hello"
# all error events will bubble-up here
events.on "error", (error) -> console.log error
channel.on message: (message) ->
assert message == "Hello, World"
{getChannel,makeChannel} = do (channels = {}) ->
makeChannel: (name) -> events.source name
getChannel: (name) -> channels[name] ?= makeChannel(name)
channel.emit message: "Hello, World"
```
express = require "express"
app = express()
We can communicate remotely the same way just by adding a `Transport`.
app.use (request, response, next) ->
body = ""
request.on "data", (data) -> body += data
request.on "end", ->
request.body = body
next()
**Client**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Broadcast.Redis.create()
channel = Channel.create transport, "hello"
app.get '/:channel', (request, response) ->
{channel} = request.params
getChannel(channel).once "message", (message) ->
response.send message
channel.on message: (message) ->
assert message == "Hello, World"
```
app.post '/:channel', (request, response) ->
response.send 202, ""
{channel} = request.params
message = request.body
getChannel(channel).emit "message", message
**Server**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Broadcast.Redis.create()
channel = Channel.create transport, "hello"
http.createServer(app).listen(1337)
If you run this, you can do a GET to a channel URL (ex: `/foo`) and then POST a message to it.
channel.emit message: "Hello, World"
```
curl http://localhost:1337/foo &
curl http://localhost:1337/foo -d "Hello"
The only code we had to change here was the creation of the channel. The code for using the channel remains the same.
The original GET will return the message.
Let's switch from a `Broadcast` channel to a `Queue` channel.
Of course, this isn't much different from what we could do using `EventEmitter`, outside of utilizing the event bubbling for `error` events. However, this version also has a big limitation: it only works for one process. If we start to get lots of messages, we'll want to be able to run multiple processes, perhaps even across multiple machines.
**Client**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Queue.Redis.create()
channel = Channel.create transport, "hello"
With Mutual, all we need to do, basically, is change `makeChannel` so that it returns a `RemoteChannel`.
channel.on message: (message) ->
assert message == "Hello, World"
```
First, let's `require` the `RedisTransport` and `RemoteChannel`:
**Server**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Queue.Redis.create()
channel = Channel.create transport, "hello"
{RemoteChannel,EventChannel,RedisTransport} = require "../src/index"
channel.emit message: "Hello, World"
```
Next, well instantiate the transport:
Again, the only code we needed to change is to create a different type of transport.
transport = new RedisTransport host: "localhost", port: 6379
Finally, we just change our `makeChannel` function:
Using a `Queue` channel, you can implement Workers.
makeChannel: (name) ->
channel = new RemoteChannel {name,transport}
channel.forward(events, name)
channel.listen()
channel
**Worker**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Queue.Redis.create()
tasks = Channel.create transport, "hello-world-tasks"
results = Channel.create transport, "hello-world-results"
The rest of our code remains the same. We've just moved to an implementation that will work across multiple process or machine boundaries by adding and modifying a few lines of code. The bulk of our application is unchanged.
tasks.on task: ({name}) ->
results.emit result: "Hello, #{name}"
```
Our final version of our little chat API can be found [in the examples][ex].
**Dispatcher**
```coffee
{Channel, Transport} = require "mutual"
transport = Transport.Queue.Redis.create()
tasks = Channel.create transport, "hello-world-tasks"
results = Channel.create transport, "hello-world-results"
[ex]:https://github.com/dyoder/mutual/tree/master/examples
tasks.emit task: name: "World"
results.on result: (greeting) ->
assert greeting == "Hello, World"
```
## Installation
Let's implement a simple long-polling chat API using Queue channels.
npm install mutual
## Status
**Server**
```coffee
{Builder} = require "pbx-builder"
{processor} = require "pbx-processor"
{async, partial} = require "fairmont"
In development - the interface is relatively stable, but we haven't done a lot of load and performance testing.
builder = Builder.create "chat-api"
builder.define "message",
template: "/{channel}"
.get()
.post()
.base_url "localhost:8080"
transport = Transport.local
make_channel = partial Channel.create transport
channels = new Proxy {},
get: (ch, name) -> ch[name] ?= make_channel name
handlers =
get: async ({respond, match: {path: {channel}}}) ->
channels[channel].once message: (body) -> respond 200, body
post: async ({data, respond, match: {path: {channel}}}) ->
channels[channel].emit message: yield data
respond 200
call ->
(require "http")
.createServer yield (processor api, handlers)
.listen 8080
```
**Client**
```coffee
{client} = require "pbx-client"
{call, stream, join} = require "fairmont"
api = yield client.discover "localhost:8080"
call ->
while true
try
message = yield join stream yield api.get()
console.log message
catch error
# probably a timeout, just poll again
call ->
each api.post, stream lines process.stdin
```
If you run the server and the client, you can type your messages via standard input and they'll be echoed back to you as the message comes back to the server.
If we change one line, we can add servers to increase our message throughput. If we change the `Broadcast.Redis` transport, we can run a hundred of these servers behind a load-balancer and scale to hundreds of thousands of messages per second, without changing any other code.
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc