epidemic-broadcast-trees
This is an implementation of the plumtree Epidemic Broadcast Trees paper.
It's an algorithm that combines the robustness of a flooding epidemic gossip broadcast
with the efficiency of a tree model. It's intended for implementing realtime protocols
(such as chat, scuttlebutt, or radio/video) over networks with random topology,
or networks where peers may otherwise be unable to all connect to each other or to a central hub.
Although the primary motivation for this module is to use it in secure scuttlebutt,
it's intended to be decoupled enough to be used for other applications.
example
A simple example is a chatroom: here we just store user messages in arrays.

To create an instance of this protocol for your application, you need to pass in
a `vectorClock` object, and `get` and `append` functions.
(Note that the functions need to be async, so that the module can also be used
for bigger things like replicating databases.)
The `vectorClock` is a map from the ids of all the nodes in the system to the sequence
numbers they are currently up to.

`get` takes an id, a sequence, and a callback.

`append` takes a message object and a callback.
The message object should have `{author: id, sequence: integer, content: ...}`,
where `content` can be any string or serializable JS value.
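Since each author's messages are sent in order, a receiver can check that an incoming message has the expected shape and is the next one for its author. Here is a minimal sketch of such a check. `isNextMessage` is a hypothetical helper, not part of this module, and it assumes ids are strings and that each log is an array with sequence 1 at index 0, as in the chat example below.

```javascript
// Hypothetical helper, not part of epidemic-broadcast-trees.
// Assumes logs[author] is an array with sequence 1 stored at index 0.
function isNextMessage (logs, msg) {
  if (!msg || typeof msg.author !== 'string') return false
  if (!Number.isInteger(msg.sequence) || msg.sequence < 1) return false
  if (!('content' in msg)) return false
  var log = logs[msg.author] || []
  // In-order delivery: the next sequence is one past the current log length.
  return msg.sequence === log.length + 1
}

var logs = { alice: [{ author: 'alice', sequence: 1, content: 'hi' }] }
console.log(isNextMessage(logs, { author: 'alice', sequence: 2, content: 'again' })) // true
console.log(isNextMessage(logs, { author: 'alice', sequence: 3, content: 'gap' }))   // false
console.log(isNextMessage(logs, { author: 'bob', sequence: 1, content: 'first' }))   // true
```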
The returned stream will also have an `onAppend` method. This should be called whenever
messages are appended to the structure, whether they were created locally or added with `append`.
`onAppend` should be called immediately before calling `append`'s callback.
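Here is a sketch of a store that follows that ordering rule. `createStore` and its `onAppend` argument are hypothetical names; for simplicity the callbacks fire synchronously, whereas a real database would normally call back later.

```javascript
// Hypothetical in-memory store with the callback-style get/append described
// above. Sequence numbers start at 1, so message N is stored at index N - 1.
function createStore (onAppend) {
  var logs = {}
  return {
    get: function (id, seq, cb) {
      var log = logs[id]
      if (!log || !log[seq - 1]) return cb(new Error('not found'))
      cb(null, log[seq - 1])
    },
    append: function (msg, cb) {
      (logs[msg.author] = logs[msg.author] || []).push(msg)
      // Tell the replication stream first, then report completion.
      onAppend(msg)
      cb(null, msg)
    }
  }
}

var events = []
var store = createStore(function (msg) { events.push('onAppend ' + msg.sequence) })
store.append({ author: 'alice', sequence: 1, content: 'hello' }, function (err) {
  events.push(err ? 'append error' : 'append callback')
})
store.get('alice', 1, function (err, msg) {
  events.push(err ? 'get error' : 'got ' + msg.content)
})
console.log(events.join(' -> ')) // onAppend 1 -> append callback -> got hello
```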
In this example, we use observables to call `onAppend`.
Because an observable can have many listeners, we can connect each peer to multiple others and it will work great!
```javascript
var pull = require('pull-stream')
var createEbtStream = require('epidemic-broadcast-trees')
var Obv = require('obv')

// A chat model: one array of messages per author, plus an observable
// that fires every time a message is appended.
function createChatModel (id, log) {
  var logs = {}
  if(id) logs[id] = log || []
  var onAppend = Obv()
  return {
    logs: logs,
    append: function append (msg) {
      (logs[msg.author] = logs[msg.author] || []).push(msg)
      onAppend.set(msg)
    },
    onAppend: onAppend
  }
}

function createStream(chat) {
  // The vector clock: how far we are up to in each feed we hold.
  var vectorClock = {}
  for(var k in chat.logs)
    vectorClock[k] = chat.logs[k].length

  var stream = createEbtStream(
    // get: sequence numbers start at 1, so message N is at index N - 1.
    function (id, seq, cb) {
      if(!chat.logs[id] || !chat.logs[id][seq-1])
        return cb(new Error('not found'))
      cb(null, chat.logs[id][seq-1])
    },
    // append: store a message received from a peer.
    function (msg, cb) {
      chat.append(msg)
      cb()
    }
  ) ({
    seqs: vectorClock,
  })

  // Tell the stream about every append, local or remote. chat.append fires
  // the observable synchronously, so this runs before append's callback.
  chat.onAppend(stream.onAppend)
  return stream
}

var alice = createChatModel('alice', [])
var bob = createChatModel('bob')

var as = createStream(alice)
var bs = createStream(bob)

// Connect the two duplex streams to each other.
pull(as, bs, as)

bob.onAppend(function (msg) {
  console.log('msg at bob:', msg)
})

alice.append({author: 'alice', sequence: 1, content: 'hello bob!'})
```
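The example uses `obv` so that several listeners (each connected stream, plus the logging callback on bob) are all told about a new message. To show the idea without the dependency, here is a hypothetical minimal observable with a similar call shape: call it with a function to register a listener, and call `.set(value)` to notify every listener. `createObservable` is a simplified stand-in written for illustration, not `obv` itself.

```javascript
// Hypothetical minimal observable (a simplified stand-in for obv):
// obs(fn) registers a listener and returns a function that removes it;
// obs.set(value) stores the value and calls every registered listener.
function createObservable () {
  var listeners = []
  function obs (fn) {
    listeners.push(fn)
    return function remove () {
      var i = listeners.indexOf(fn)
      if (i !== -1) listeners.splice(i, 1)
    }
  }
  obs.value = undefined
  obs.set = function (value) {
    obs.value = value
    // Iterate over a copy so listeners can unsubscribe during notification.
    listeners.slice().forEach(function (fn) { fn(value) })
  }
  return obs
}

// One local append fans out to every connected peer stream.
var onAppend = createObservable()
var sentToPeer1 = []
var sentToPeer2 = []
onAppend(function (msg) { sentToPeer1.push(msg.sequence) })
var stopPeer2 = onAppend(function (msg) { sentToPeer2.push(msg.sequence) })

onAppend.set({ author: 'alice', sequence: 1, content: 'hello' })
stopPeer2() // e.g. the connection to peer 2 closed
onAppend.set({ author: 'alice', sequence: 2, content: 'again' })
console.log(sentToPeer1.join(','), '|', sentToPeer2.join(',')) // 1,2 | 1
```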
api
```javascript
var createStream = require('epidemic-broadcast-trees')
var stream = createStream(seqs, get, append, onChange, callback)
```

`createStream(seqs, get, append, onChange, callback) => stream`
`seqs`
is an object that maps `id` to `sequence`, in the form `{<id>:<sequence>,..}`.
This represents who you want to follow initially.
`get(id, seq, cb)`
is an async function that gets the message by a feed at a particular sequence number.

`append(msg, callback)`
is an async function that appends a single message to the log.

`onChange`
is a function that is called each time the state changes. It is a good place to call `stream.progress()`.

`callback(err)`
is called when the replication connection ends.
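Because each feed is a gap-free sequence, two `seqs` objects are enough to work out what a peer is missing: for every feed both sides follow, the peer needs everything after its own sequence number, up to ours. `missingRanges` is a hypothetical helper sketching that comparison; it is not exported by this module, and it simply skips feeds that only one side follows.

```javascript
// Hypothetical helper: given our clock and a peer's clock (both of the form
// {<id>: <sequence>}), list the ranges of messages we could send the peer.
// Ranges are inclusive and 1-based; feeds the peer does not follow are skipped.
function missingRanges (local, remote) {
  var ranges = []
  for (var id in local) {
    if (!Object.prototype.hasOwnProperty.call(remote, id)) continue
    if (local[id] > remote[id])
      ranges.push({ id: id, from: remote[id] + 1, to: local[id] })
  }
  return ranges
}

var ours = { alice: 5, bob: 2, carol: 7 }
var theirs = { alice: 3, bob: 2, dave: 1 }
// A single range for alice, 4 to 5: bob is up to date and the peer
// does not follow carol.
console.log(missingRanges(ours, theirs))
```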
`stream`
A duplex pull-stream returned by `createStream`. It also has a few extra methods.
`stream.onAppend(msg)`
A synchronous function that must be called whenever a message is added to the local database.
`stream.progress()`
Returns an object representing the current replication progress.
An example output looks like this; all values are integers >= 0.

```
{
  start: S,
  current: C,
  total: T
}
```
This follows a common pattern I've used across ssbc modules for representing progress;
see, for example, https://github.com/ssbc/scuttlebot/blob/master/lib/progress.js
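A progress object like this can be turned into a percentage for display. The sketch below rests on assumptions this README does not spell out: that `start` is where the count stood when replication began, `current` is how far it has got, and `total` is the target, and that progress from several sources can be combined by summing each field. `progressRatio` and `sumProgress` are hypothetical names.

```javascript
// Hypothetical helpers for {start, current, total} progress objects.
// Assumption: work done so far is current - start, out of total - start.
function progressRatio (p) {
  var todo = p.total - p.start
  if (todo <= 0) return 1 // nothing to do counts as complete
  return Math.min(1, Math.max(0, (p.current - p.start) / todo))
}

// Assumption: progress from several sources combines field by field.
function sumProgress (list) {
  return list.reduce(function (acc, p) {
    return {
      start: acc.start + p.start,
      current: acc.current + p.current,
      total: acc.total + p.total
    }
  }, { start: 0, current: 0, total: 0 })
}

var combined = sumProgress([
  { start: 0, current: 30, total: 100 },
  { start: 10, current: 20, total: 20 }
])
console.log(Math.round(progressRatio(combined) * 100) + '%') // 36%
```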
comparison to plumtree
I had an idea for a gossip protocol that avoided retransmitting messages by putting
unneeded connections into standby mode (from which they can be brought back into service when necessary),
and then was pleasantly surprised to discover it was not a new idea but had already been described
in a paper. There is an implementation of that paper in Erlang here: https://github.com/helium/plumtree

There are some small differences, mainly because I want to send messages in order, which makes
it easy to represent which messages have not been seen using just an incrementing sequence number per feed.
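The standby idea can be illustrated for a single feed: a peer that delivers a message we already have is redundant for that feed, so we put it on standby (plumtree calls this lazy push); if a standby peer later reports a sequence number newer than ours, we bring it back into service. This is a simplified, hypothetical sketch with made-up names (`createFeedRouter`, `onMessage`, `onNote`, `activePeers`), not this module's actual state machine.

```javascript
// Hypothetical sketch of plumtree-style "standby" peers for a single feed.
// Messages arrive in order, so "already seen?" is a comparison against the
// highest sequence number we hold.
function createFeedRouter (peerIds) {
  var seq = 0   // highest sequence we hold for this feed
  var mode = {} // peer id -> 'active' or 'standby'
  peerIds.forEach(function (id) { mode[id] = 'active' })

  return {
    mode: mode,
    current: function () { return seq },
    // A full message arrived from peer `from`. Returns true if it is new,
    // meaning the caller should store it and forward it to activePeers(from).
    onMessage: function (from, msg) {
      if (msg.sequence <= seq) {
        mode[from] = 'standby' // duplicate: this connection is redundant
        return false
      }
      if (msg.sequence !== seq + 1) return false // gap: wait for the missing one
      seq = msg.sequence
      return true
    },
    // A standby peer sent only its latest sequence number for this feed.
    onNote: function (from, peerSeq) {
      // It has messages we lack, so bring the connection back into service.
      if (mode[from] === 'standby' && peerSeq > seq) mode[from] = 'active'
    },
    // Peers we push full messages to, excluding the one we got it from.
    activePeers: function (except) {
      return Object.keys(mode).filter(function (id) {
        return id !== except && mode[id] === 'active'
      })
    }
  }
}

var router = createFeedRouter(['bob', 'carol'])
router.onMessage('bob', { sequence: 1 })   // new: accepted from bob
router.onMessage('carol', { sequence: 1 }) // duplicate: carol goes on standby
console.log(router.activePeers())          // only bob is still active
router.onNote('carol', 3)                  // carol is ahead of us
console.log(router.mode.carol)             // active again
```

A complete version would also have to tell the remote peer to stop or resume sending (plumtree's PRUNE and GRAFT messages); that exchange is left out here.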
todo
- call a user function to decide whether we want to replicate a given feed (say, for blocking bad peers)
- handle models where it's okay to have gaps in a log (as with classic insecure scuttlebutt)
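For the first todo item, one possible shape is a user-supplied predicate applied to the initial `seqs`. This is only a sketch of the idea: `filterSeqs` and the `blocked` set are hypothetical, this module does not currently provide such a hook, and filtering the initial `seqs` alone would not cover feeds that turn up later.

```javascript
// Hypothetical: drop feeds that a user-supplied predicate rejects before
// handing the clock to the replication stream.
function filterSeqs (seqs, shouldReplicate) {
  var out = {}
  Object.keys(seqs).forEach(function (id) {
    if (shouldReplicate(id)) out[id] = seqs[id]
  })
  return out
}

var blocked = new Set(['mallory'])
var seqs = filterSeqs({ alice: 3, bob: 0, mallory: 12 }, function (id) {
  return !blocked.has(id)
})
console.log(Object.keys(seqs).join(',')) // alice,bob
```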
License
MIT