Mutual
Mutual wraps a messaging layer with a simple EventEmitter-style interface. Mutual uses Evie instead of EventEmitter, which allows for event-bubbling and a few other niceties.
To use Mutual, you simply create a Channel and subscribe to the events you're interested in.
{Channel} = require "mutual"
channel = Channel.create "hello"
channel.on message: (message) ->
assert message == "Hello, World"
channel.emit message: "Hello, World"
We can communicate remotely the same way just by adding a Transport.
Client
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Broadcast.create()
channel = Channel.create "hello", transport
channel.on message: (message) ->
assert message == "Hello, World"
Server
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Broadcast.create()
channel = Channel.create "hello", transport
channel.emit message: "Hello, World"
The only code we had to change here was the creation of the channel. The code for using the channel remains the same.
Let's switch from a Broadcast channel to a Queue channel.
Client
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Queue.create()
channel = Channel.create "hello", transport
channel.on message: (message) ->
assert message == "Hello, World"
Server
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Queue.create()
channel = Channel.create "hello", transport
channel.emit message: "Hello, World"
Again, the only code we needed to change is to create a different type of transport.
Using a Queue channel, you can implement Workers.
Worker
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Queue.create()
tasks = Channel.create "hello-world-tasks", transport
results = Channel.create "hello-world-results", transport
tasks.on task: ({name}) ->
results.emit result: "Hello, #{name}"
Dispatcher
{Channel, Transport} = require "mutual"
transport = Transport.Redis.Queue.create()
tasks = Channel.create "hello-world-tasks", transport
results = Channel.create "hello-world-results", transport
tasks.emit task: name: "World"
results.on result: (greeting) ->
assert greeting == "Hello, World"
Let's implement a simple long-polling chat API using Queue channels.
Server
{Builder} = require "pbx-builder"
{processor} = require "pbx-processor"
{Channel, Transport} = require "mutual"
{async, call, partial, _} = require "fairmont"
builder = Builder.create "chat-api"
builder.define "message",
template: "/{channel}"
.get()
.post()
.base_url "localhost:8080"
transport = Transport.local
make_channel = partial Channel.create _, transport
channels = new Proxy {},
get: (ch, name) -> ch[name] ?= make_channel name
handlers =
get: async ({respond, match: {path: {channel}}}) ->
channels[channel].once message: (body) -> respond 200, body
post: async ({data, respond, match: {path: {channel}}}) ->
channels[channel].emit message: yield data
respond 200
call ->
(require "http")
.createServer yield (processor api, handlers)
.listen 8080
Client
{client} = require "pbx-client"
{call, each, stream, lines, join} = require "fairmont"
api = yield client.discover "localhost:8080"
call ->
while true
try
message = yield join stream yield api.get()
console.log message
catch error
# probably a timeout, just poll again
call ->
each api.post, stream lines process.stdin
If you run the server and the client, you can type your messages via standard input and they'll be echoed back to you as each message comes back from the server.
If we change one line — swapping in the Transport.Redis.Broadcast transport — we can add servers to increase our message throughput: we can run a hundred of these servers behind a load-balancer and scale to hundreds of thousands of messages per second, without changing any other code.