epidemic-broadcast-trees
Advanced tools
Comparing version 9.0.0 to 9.0.1
204
events.js
@@ -15,3 +15,3 @@ 'use strict' | ||
function isEmpty (o) { | ||
for(var k in o) return false | ||
for (var k in o) return false | ||
return true | ||
@@ -28,17 +28,17 @@ } | ||
function isShared (state, id, peer_id) { | ||
return state.follows[id] && !isBlocked(state, id, peer_id) | ||
function isShared (state, id, peerId) { | ||
return state.follows[id] && !isBlocked(state, id, peerId) | ||
} | ||
//check if a feed is already being replicated on another peer from ignore_id | ||
function isAlreadyReplicating(state, feed_id, ignore_id) { | ||
for(var id in state.peers) { | ||
if(id !== ignore_id) { | ||
//check if a feed is already being replicated on another peer from ignoreId | ||
function isAlreadyReplicating(state, feedId, ignoreId) { | ||
for (var id in state.peers) { | ||
if (id !== ignoreId) { | ||
var peer = state.peers[id] | ||
if(peer.notes && getReceive(peer.notes[id])) return id | ||
if (peer.notes && getReceive(peer.notes[id])) return id | ||
// for replicating the node must have replicated something not just rx | ||
// this fixed a partial replication bug where a node is unable to send the full log | ||
var id_has_sent = peer.replicating && peer.replicating[id] && peer.replicating[id].sent != -1 | ||
if(peer.replicating && peer.replicating[feed_id] && peer.replicating[feed_id].rx && id_has_sent) return id | ||
var idHasSent = peer.replicating && peer.replicating[id] && peer.replicating[id].sent != -1 | ||
if (peer.replicating && peer.replicating[feedId] && peer.replicating[feedId].rx && idHasSent) return id | ||
} | ||
@@ -52,17 +52,17 @@ } | ||
function fixSeq (local, remote) { | ||
if(local == null) return remote | ||
if(local == -1 || remote == -1) return remote | ||
if(remote == 0) return 0 | ||
if(remote > 0 && local > 0 && remote < local) return remote | ||
if (local == null) return remote | ||
if (local == -1 || remote == -1) return remote | ||
if (remote == 0) return 0 | ||
if (remote > 0 && local > 0 && remote < local) return remote | ||
return Math.max(local, remote) | ||
} | ||
//check if a feed is available from a peer apart from ignore_id | ||
//check if a feed is available from a peer apart from ignoreId | ||
function isAvailable(state, feed_id, ignore_id) { | ||
for(var peer_id in state.peers) { | ||
if(peer_id != ignore_id) { | ||
var peer = state.peers[peer_id] | ||
function isAvailable(state, feedId, ignoreId) { | ||
for (var peerId in state.peers) { | ||
if (peerId != ignoreId) { | ||
var peer = state.peers[peerId] | ||
//BLOCK: check wether id has blocked this peer | ||
if((peer.clock && peer.clock[feed_id] || 0) > (state.clock[feed_id] || 0) && isShared(state, feed_id, peer_id)) { | ||
if ((peer.clock && peer.clock[feedId] || 0) > (state.clock[feedId] || 0) && isShared(state, feedId, peerId)) { | ||
return true | ||
@@ -81,6 +81,6 @@ } | ||
var i = keys.indexOf(key) | ||
if(!~i) return | ||
if (!~i) return | ||
//start at 1 because we want to visit all keys but key. | ||
for(var j = 1; j < keys.length; j++) | ||
if(iter(keys[(j+i)%keys.length], j)) | ||
for (var j = 1; j < keys.length; j++) | ||
if (iter(keys[(j+i) % keys.length], j)) | ||
return | ||
@@ -94,3 +94,3 @@ } | ||
var rep = peer.replicating[feed] | ||
if(rep) { | ||
if (rep) { | ||
rep.rx = rx | ||
@@ -131,6 +131,6 @@ rep.requested = seq | ||
exports.connect = function (state, ev) { | ||
if(state.peers[ev.id]) throw new Error('already connected to peer:'+ev.id) | ||
if(typeof ev.client != 'boolean') throw new Error('connect.client must be boolean') | ||
if (state.peers[ev.id]) throw new Error('already connected to peer:'+ev.id) | ||
if (typeof ev.client != 'boolean') throw new Error('connect.client must be boolean') | ||
// if(isBlocked(state, state.id, ev.id)) return state | ||
// if (isBlocked(state, state.id, ev.id)) return state | ||
@@ -161,3 +161,3 @@ state.peers[ev.id] = { | ||
exports.peerClock = function (state, ev) { | ||
if(!state.peers[ev.id]) | ||
if (!state.peers[ev.id]) | ||
throw new Error('peerClock called for:'+ev.id + ' but only connected to:'+ Object.keys(state.peers)) | ||
@@ -169,3 +169,3 @@ var peer = state.peers[ev.id] | ||
//can error before a peer sends a massive handshake. | ||
if(peer.replicating == null) return state | ||
if (peer.replicating == null) return state | ||
@@ -181,6 +181,6 @@ //always set an empty clock here, so that if we don't have anything | ||
for(var id in state.follows) { | ||
for (var id in state.follows) { | ||
var seq = clock[id] || 0, lseq = state.clock[id] || 0 | ||
//BLOCK: check wether id has blocked this peer | ||
if(isShared(state, id, ev.id) && seq !== -1 && seq !== state.clock[id]) { | ||
if (isShared(state, id, ev.id) && seq !== -1 && seq !== state.clock[id]) { | ||
@@ -209,7 +209,7 @@ //if we are already replicating, and this feed is at zero, ask for it anyway, | ||
var replicating = false | ||
if(!!state.follows[ev.id] !== ev.value) { | ||
if (!!state.follows[ev.id] !== ev.value) { | ||
state.follows[ev.id] = ev.value | ||
for(var id in state.peers) { | ||
for (var id in state.peers) { | ||
var peer = state.peers[id] | ||
if(!peer.clock || !peer.replicating || !isShared(state, ev.id, id)) continue | ||
if (!peer.clock || !peer.replicating || !isShared(state, ev.id, id)) continue | ||
//BLOCK: check wether this feed has has blocked this peer. | ||
@@ -224,9 +224,9 @@ //..... don't replicate feeds with peers that have blocked them at all? | ||
var seq = peer.clock[ev.id] || 0, lseq = state.clock[ev.id] || 0 | ||
if(seq === -1) { | ||
if (seq === -1) { | ||
//peer explicitly does not replicate this feed, don't ask for it. | ||
} | ||
else if(ev.value === false) { //unfollow | ||
else if (ev.value === false) { //unfollow | ||
setNotes(peer, ev.id, -1, false) | ||
} | ||
else if(ev.value === true && seq !== state.clock[ev.id]) { | ||
else if (ev.value === true && seq !== state.clock[ev.id]) { | ||
peer.replicating[ev.id] = { | ||
@@ -246,5 +246,5 @@ rx: true, tx: false, | ||
//check if any peer requires this msg | ||
for(var id in state.peers) { | ||
for (var id in state.peers) { | ||
var peer = state.peers[id] | ||
if(!peer.replicating) continue; | ||
if (!peer.replicating) continue | ||
//BLOCK: check wether id has blocked this peer | ||
@@ -255,8 +255,8 @@ const author = exports.getMsgAuthor(msg) | ||
if(rep && rep.tx && rep.sent === sequence - 1) { | ||
if (rep && rep.tx && rep.sent === sequence - 1) { | ||
rep.sent++ | ||
peer.msgs.push(msg) | ||
if(rep.sent < state.clock[author]) { | ||
if (rep.sent < state.clock[author]) { | ||
//use continue, not return because we still need to loop through other peers. | ||
if(~peer.retrive.indexOf(author)) continue | ||
if (~peer.retrive.indexOf(author)) continue | ||
peer.retrive.push(author) | ||
@@ -270,5 +270,5 @@ } | ||
function isAhead(seq1, seq2) { | ||
if(seq2 === -1) return false | ||
if(seq2 == null) return true | ||
if(seq1 > seq2) return true | ||
if (seq2 === -1) return false | ||
if (seq2 == null) return true | ||
if (seq1 > seq2) return true | ||
} | ||
@@ -280,9 +280,9 @@ | ||
//check if any peer requires this msg | ||
if(state.clock[author] != null && state.clock[author] !== sequence - 1) | ||
if (state.clock[author] != null && state.clock[author] !== sequence - 1) | ||
return state //ignore | ||
var lseq = state.clock[author] = sequence | ||
for(var id in state.peers) { | ||
for (var id in state.peers) { | ||
var peer = state.peers[id] | ||
if(!peer.clock || !peer.replicating || !isShared(state, author, id)) continue | ||
if (!peer.clock || !peer.replicating || !isShared(state, author, id)) continue | ||
//BLOCK: check wether msg.author has blocked this peer | ||
@@ -294,3 +294,3 @@ | ||
if(rep && rep.tx && rep.sent == lseq - 1 && lseq > seq) { | ||
if (rep && rep.tx && rep.sent == lseq - 1 && lseq > seq) { | ||
peer.msgs.push(msg) | ||
@@ -300,3 +300,3 @@ rep.sent++ | ||
//if we are ahead of this peer, and not in tx mode, let them know that. | ||
else if( | ||
else if ( | ||
isAhead(lseq, seq) && | ||
@@ -320,3 +320,3 @@ (rep ? !rep.tx && rep.sent != null : state.follows[author]) | ||
if(!state.peers[ev.id]) throw new Error('lost peer state:'+ev.id) | ||
if (!state.peers[ev.id]) throw new Error('lost peer state:'+ev.id) | ||
@@ -331,3 +331,3 @@ //we _know_ that this peer is upto at least this message now. | ||
//if we havn't asked for this, ignore it. (this is remote speaking protocol wrong!) | ||
if(!rep) return state | ||
if (!rep) return state | ||
@@ -338,3 +338,3 @@ peer.clock[author] = Math.max(peer.clock[author], sequence) | ||
//if this message has already been seen, ignore. | ||
if(state.clock[author] >= sequence) { | ||
if (state.clock[author] >= sequence) { | ||
if (rep.rx) { | ||
@@ -351,3 +351,3 @@ setNotes(peer, author, state.clock[author], false) | ||
//FORKS ignore additional messages if we have already found an invalid one. | ||
if(isShared(state, exports.getMsgAuthor(ev.value), ev.id)) | ||
if (isShared(state, exports.getMsgAuthor(ev.value), ev.id)) | ||
state.receive.push(ev) | ||
@@ -359,3 +359,2 @@ //Q: possibly update the receiving mode? | ||
//XXX check if we are already receiving a feed | ||
@@ -369,12 +368,12 @@ //and if so put this into lazy mode. | ||
//this is to allow room for backwards compatible upgrades. | ||
if(isObject(ev.value.clock)) | ||
if (isObject(ev.value.clock)) | ||
clock = ev.value.clock | ||
var peer = state.peers[ev.id] | ||
if(!peer) throw new Error('lost state of peer:'+ev.id) | ||
if(!peer.clock) throw new Error("received notes, but has not set the peer's clock yet") | ||
if (!peer) throw new Error('lost state of peer:'+ev.id) | ||
if (!peer.clock) throw new Error("received notes, but has not set the peer's clock yet") | ||
var count = 0 | ||
//if we are client, and this is the first notes we receive | ||
if(!peer.replicating) { | ||
if (!peer.replicating) { | ||
peer.replicating = {} | ||
@@ -384,3 +383,3 @@ state = exports.peerClock(state, {id: ev.id, value: state.peers[ev.id].clock}) | ||
for(var id in clock) { | ||
for (var id in clock) { | ||
count++ | ||
@@ -396,6 +395,6 @@ | ||
//BLOCK: or wether id has blocked this peer | ||
if(!isShared(state, id, ev.id)) { | ||
if(!peer.replicating[id]) | ||
setNotes(peer, id, -1) | ||
peer.replicating[id] = {tx:false, rx:false, sent: -1, requested: -1} | ||
if (!isShared(state, id, ev.id)) { | ||
if (!peer.replicating[id]) | ||
setNotes(peer, id, -1, false) | ||
peer.replicating[id] = {tx: false, rx: false, sent: -1, requested: -1} | ||
} | ||
@@ -405,3 +404,3 @@ else { | ||
var replicating = isAlreadyReplicating(state, id, ev.id) | ||
if(!rep) { | ||
if (!rep) { | ||
rep = peer.replicating[id] = { | ||
@@ -415,4 +414,4 @@ tx: true, | ||
} | ||
else if(!rep.rx && seq > lseq) { | ||
if(!replicating) { | ||
else if (!rep.rx && seq > lseq) { | ||
if (!replicating) { | ||
peer.ts = ev.ts //remember ts, so we can switch this feed if necessary | ||
@@ -428,3 +427,3 @@ setNotes(peer, id, lseq, true) | ||
// received theirs. | ||
if(seq > (_peer.clock[id] || 0)) { | ||
if (seq > (_peer.clock[id] || 0)) { | ||
peer.ts = ev.ts | ||
@@ -441,5 +440,5 @@ setNotes(peer, id, lseq, true) | ||
rep.sent = seq | ||
if(lseq > seq) { | ||
if(tx) peer.retrive.push(id) | ||
else if(isReplicate) setNotes(peer, id, lseq, rep.rx) | ||
if (lseq > seq) { | ||
if (tx) peer.retrive.push(id) | ||
else if (isReplicate) setNotes(peer, id, lseq, rep.rx) | ||
} | ||
@@ -455,15 +454,16 @@ } | ||
var want = {} | ||
for(var peer_id in state.peers) { | ||
var peer = state.peers[peer_id] | ||
for (var peerId in state.peers) { | ||
var peer = state.peers[peerId] | ||
//check if the peer hasn't received a message recently. | ||
//if we havn't received a message from this peer recently | ||
if((peer.ts || 0) + state.timeout < ev.ts) { | ||
if ((peer.ts || 0) + state.timeout < ev.ts) { | ||
//check if they have claimed a higher sequence, but not sent us | ||
for(var id in peer.replicating) { | ||
for (var id in peer.replicating) { | ||
var rep = peer.replicating[id] | ||
//if yes, prepare to switch this feed to that peer | ||
if(rep.rx && isAvailable(state, id, peer_id)) { | ||
want[id] = peer_id | ||
if (rep.rx && isAvailable(state, id, peerId)) { | ||
want[id] = peerId | ||
setNotes(peer, id, state.clock[id], false) | ||
@@ -474,13 +474,14 @@ } | ||
} | ||
var peer_ids = Object.keys(state.peers) | ||
for(var feed_id in want) { | ||
var ignore_id = want[feed_id] | ||
eachFrom(peer_ids, ignore_id, function (peer_id) { | ||
var peer = state.peers[peer_id] | ||
if(peer.clock && peer.clock[feed_id] || 0 > state.clock[feed_id] || 0) { | ||
var peerIds = Object.keys(state.peers) | ||
for (var feedId in want) { | ||
var ignoreId = want[feedId] | ||
eachFrom(peerIds, ignoreId, function (peerId) { | ||
var peer = state.peers[peerId] | ||
if (peer.clock && peer.clock[feedId] || 0 > state.clock[feedId] || 0) { | ||
peer.replicating = peer.replicating || {} | ||
peer.replicating[feed_id] = peer.replicating[feed_id] || { | ||
tx: false, rx: true, sent: -1, requested: state.clock[feed_id] | ||
peer.replicating[feedId] = peer.replicating[feedId] || { | ||
tx: false, rx: true, sent: -1, requested: state.clock[feedId] | ||
} | ||
setNotes(peer, feed_id, state.clock[feed_id], true) | ||
setNotes(peer, feedId, state.clock[feedId], true) | ||
peer.ts = ev.ts | ||
@@ -492,2 +493,3 @@ //returning true triggers the end of eachFrom | ||
} | ||
return state | ||
@@ -497,5 +499,5 @@ } | ||
exports.block = function (state, ev) { | ||
if(!ev.value) { | ||
if(state.blocks[ev.id]) delete state.blocks[ev.id][ev.target] | ||
if(isEmpty(state.blocks[ev.id])) | ||
if (!ev.value) { | ||
if (state.blocks[ev.id]) delete state.blocks[ev.id][ev.target] | ||
if (isEmpty(state.blocks[ev.id])) | ||
delete state.blocks[ev.id] | ||
@@ -506,16 +508,16 @@ } | ||
state.blocks[ev.id][ev.target] = true | ||
} | ||
//if we blocked this peer, and we are also connected to them. | ||
//then stop replicating immediately. | ||
if(state.id === ev.id && state.peers[ev.target]) { | ||
//end replication immediately. | ||
state.peers[ev.target].blocked = ev.value | ||
} | ||
//if we blocked this peer, and we are also connected to them. | ||
//then stop replicating immediately. | ||
if (state.id === ev.id && state.peers[ev.target]) { | ||
//end replication immediately. | ||
state.peers[ev.target].blocked = ev.value | ||
} | ||
for(var id in state.peers) { | ||
var peer = state.peers[id] | ||
if(!peer.replicating) continue | ||
if(id === ev.target && peer.replicating[ev.id]) | ||
setNotes(peer, ev.id, -1, false) | ||
for (var id in state.peers) { | ||
var peer = state.peers[id] | ||
if (!peer.replicating) continue | ||
if (id === ev.target && peer.replicating[ev.id]) | ||
setNotes(peer, ev.id, -1, false) | ||
} | ||
} | ||
@@ -522,0 +524,0 @@ |
56
index.js
@@ -34,3 +34,3 @@ const Events = require('./events') | ||
request: function (id, follows) { | ||
if(opts.isFeed && !opts.isFeed(id)) return | ||
if (opts.isFeed && !opts.isFeed(id)) return | ||
self.state = events.follow(self.state, {id: id, value: follows !== false, ts: timestamp()}) | ||
@@ -40,31 +40,33 @@ self.update() | ||
pause: function (id, paused) { | ||
self.state = events.pause(self.state, {id: id, paused: paused !== false}) | ||
self.state = events.pause(self.state, {id, paused: paused !== false}) | ||
self.update() | ||
}, | ||
block: function (id, target, value) { | ||
self.state = events.block(self.state, {id: id, target: target, value: value !== false, ts: timestamp()}) | ||
self.state = events.block(self.state, {id, target, value: value !== false, ts: timestamp()}) | ||
self.update() | ||
}, | ||
createStream: function (remote_id, version, client) { | ||
if(self.streams[remote_id]) | ||
self.streams[remote_id].end(new Error('reconnected to peer')) | ||
if(self.logging) console.log('EBT:conn', remote_id) | ||
var stream = self.streams[remote_id] = new Stream(this, remote_id, version, client, opts.isMsg, function (peerState) { | ||
opts.setClock(remote_id, peerState.clock) | ||
}) | ||
createStream: function (remoteId, version, client) { | ||
if (self.streams[remoteId]) | ||
self.streams[remoteId].end(new Error('reconnected to peer')) | ||
if (self.logging) console.log('EBT:conn', remoteId) | ||
function onClose(peerState) { | ||
opts.setClock(remoteId, peerState.clock) | ||
} | ||
var stream = new Stream(this, remoteId, version, client, opts.isMsg, onClose) | ||
self.streams[remoteId] = stream | ||
opts.getClock(remote_id, function (err, clock) { | ||
opts.getClock(remoteId, (err, clock) => { | ||
//check if peer exists in state, because we may | ||
//have disconect in the meantime | ||
if(self.state.peers[remote_id]) | ||
if (self.state.peers[remoteId]) | ||
stream.clock(err ? {} : clock) | ||
}) | ||
return stream | ||
}, | ||
_retrive: function (err, msg) { | ||
if(msg) { | ||
if (msg) { | ||
self.state = events.retrive(self.state, msg) | ||
self.update() | ||
} | ||
else { | ||
} else { | ||
//this should never happen. | ||
@@ -84,18 +86,19 @@ //replication for this feed is in bad state now. | ||
//for that peer/stream. | ||
for(var peer in self.state.peers) { | ||
for (var peer in self.state.peers) { | ||
var state = self.state.peers[peer] | ||
while(state.retrive.length) { | ||
while (state.retrive.length) { | ||
var id = state.retrive.shift() | ||
if(state.replicating[id]) | ||
if (state.replicating[id]) | ||
opts.getAt({ | ||
id: id, | ||
sequence:state.replicating[id].sent+1 | ||
id, | ||
sequence: state.replicating[id].sent+1 | ||
}, self._retrive) | ||
} | ||
} | ||
if(self.state.receive.length) { | ||
if (self.state.receive.length) { | ||
var ev = self.state.receive.shift() | ||
opts.append(ev.value, function (err) { | ||
if(err) { | ||
if(self.logging) console.error('EBT:err', err) | ||
if (err) { | ||
if (self.logging) console.error('EBT:err', err) | ||
self.block(ev.value.author, ev.id, true) | ||
@@ -105,3 +108,4 @@ } | ||
} | ||
for(var k in self.streams) | ||
for (var k in self.streams) | ||
self.streams[k].resume() | ||
@@ -111,9 +115,9 @@ }, | ||
var int = setInterval(function () { | ||
var int = setInterval(() => { | ||
self.state = events.timeout(self.state, {ts: timestamp()}) | ||
self.update() | ||
}, state.timeout) | ||
if(int.unref) int.unref() | ||
if (int.unref) int.unref() | ||
return self | ||
} |
{ | ||
"name": "epidemic-broadcast-trees", | ||
"description": "bandwidth efficient broadcast gossip", | ||
"version": "9.0.0", | ||
"version": "9.0.1", | ||
"homepage": "https://github.com/dominictarr/epidemic-broadcast-trees", | ||
@@ -6,0 +6,0 @@ "repository": { |
@@ -11,14 +11,14 @@ /* | ||
module.exports = function (state) { | ||
var prog = {start:0, current: 0, target: 0} | ||
for(var peer_id in state.peers) { | ||
var prog = { start: 0, current: 0, target: 0 } | ||
for (var peer_id in state.peers) { | ||
var peer = state.peers[peer_id] | ||
for(var feed_id in peer.replicating) { | ||
for (var feed_id in peer.replicating) { | ||
var rep = peer.replicating[feed_id] | ||
//progress for sending initial note | ||
prog.target++ | ||
if(rep.sent != null) prog.current++ | ||
if (rep.sent != null) prog.current++ | ||
prog.target++ | ||
if(rep.requested != null) prog.current++ | ||
if (rep.requested != null) prog.current++ | ||
@@ -28,3 +28,3 @@ var seq = peer.clock[feed_id] | ||
if(rep.rx && rep.requested != null && rep.requested > -1 && lseq < seq) { | ||
if (rep.rx && rep.requested != null && rep.requested > -1 && lseq < seq) { | ||
prog.current += lseq - rep.requested | ||
@@ -34,3 +34,3 @@ prog.target += seq - rep.requested | ||
if(rep.tx && seq > -1 && seq < lseq) { | ||
if (rep.tx && seq > -1 && seq < lseq) { | ||
prog.current += rep.sent - seq | ||
@@ -37,0 +37,0 @@ prog.target += lseq - seq |
@@ -32,7 +32,7 @@ const v3 = require('./v3') | ||
this.peer.update() | ||
if(this.source) this.source.resume() | ||
if (this.source) this.source.resume() | ||
} | ||
EBTStream.prototype.write = function (data) { | ||
if(this.peer.logging) { | ||
if (this.peer.logging) { | ||
if (Buffer.isBuffer(data)) | ||
@@ -43,4 +43,6 @@ console.log("EBT:recv binary (" + this.peer.id + ")", "0x" + data.toString('hex')) | ||
} | ||
if(this.ended) throw new Error('write after ebt stream ended:'+this.remote) | ||
if(this.isMsg(data)) { | ||
if (this.ended) throw new Error('write after ebt stream ended:'+this.remote) | ||
if (this.isMsg(data)) { | ||
this.peer.state = events.receive(this.peer.state, { | ||
@@ -65,5 +67,5 @@ id: this.remote, | ||
//check if we have already ended | ||
if(!this.peer.state.peers[this.remote]) return | ||
if (!this.peer.state.peers[this.remote]) return | ||
if(this.peer.logging) console.log('EBT:dcon', this.remote) | ||
if (this.peer.logging) console.log('EBT:dcon', this.remote) | ||
@@ -75,7 +77,7 @@ var peerState = this.peer.state.peers[this.remote] | ||
}) | ||
if(this._onClose) this._onClose(peerState) | ||
if (this._onClose) this._onClose(peerState) | ||
//remove from the peer... | ||
delete this.peer.streams[this.remote] | ||
if(this.source && !this.source.ended) this.source.abort(err) | ||
if(this.sink && !this.sink.ended) this.sink.end(err) | ||
if (this.source && !this.source.ended) this.source.abort(err) | ||
if (this.sink && !this.sink.ended) this.sink.end(err) | ||
} | ||
@@ -97,9 +99,10 @@ | ||
EBTStream.prototype.resume = function () { | ||
if (!this.sink || this.sink.paused) return | ||
var state = this.peer.state.peers[this.remote] | ||
if(!this.sink || this.sink.paused) return | ||
while(this.canSend()) { | ||
if(state.blocked) | ||
while (this.canSend()) { | ||
if (state.blocked) { | ||
this.end() | ||
else if(state.msgs.length) { | ||
if(this.peer.logging) { | ||
} else if (state.msgs.length) { | ||
if (this.peer.logging) { | ||
if (Buffer.isBuffer(state.msgs[0])) | ||
@@ -111,7 +114,6 @@ console.log("EBT:send binary (" + this.peer.id + ")", "0x" + state.msgs[0].toString('hex')) | ||
this.sink.write(state.msgs.shift()) | ||
} | ||
else { | ||
} else { | ||
var notes = state.notes | ||
state.notes = null | ||
if(this.peer.logging) { | ||
if (this.peer.logging) { | ||
const formattedNotes = {} | ||
@@ -118,0 +120,0 @@ for (let feed in notes) { |
@@ -85,3 +85,22 @@ var test = require('tape') | ||
test("blocking false on a connected peer does nothing", function (t) { | ||
var state = events.initialize('alice') | ||
state.clock = {alice: 3, charles: 2} | ||
state = events.connect(state, {id: 'charles', ts: 1, client: false}) | ||
//state = events.peerClock(state, {id: 'charles', value: { charles: 2 }}) | ||
state = events.peerClock(state, {id: 'charles', value: {'alice': 0, 'charles': 0}}) | ||
state = events.follow(state, {id: 'alice', value: true}) | ||
state = events.follow(state, {id: 'charles', value: true}) | ||
state = events.notes(state, {id: 'charles', value: {charles: 2}}) | ||
const existingNote = state.peers['charles'].notes['alice'] | ||
// this should not change anything | ||
state = events.block(state, {id: 'alice', target: 'charles', value: false}) | ||
t.equal(state.peers['charles'].notes['alice'], existingNote) | ||
t.end() | ||
}) | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
106522
2433