Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

epidemic-broadcast-trees

Package Overview
Dependencies
Maintainers
9
Versions
47
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

epidemic-broadcast-trees - npm Package Compare versions

Comparing version 9.0.0 to 9.0.1

204

events.js

@@ -15,3 +15,3 @@ 'use strict'

function isEmpty (o) {
for(var k in o) return false
for (var k in o) return false
return true

@@ -28,17 +28,17 @@ }

function isShared (state, id, peer_id) {
return state.follows[id] && !isBlocked(state, id, peer_id)
function isShared (state, id, peerId) {
return state.follows[id] && !isBlocked(state, id, peerId)
}
//check if a feed is already being replicated on another peer from ignore_id
function isAlreadyReplicating(state, feed_id, ignore_id) {
for(var id in state.peers) {
if(id !== ignore_id) {
//check if a feed is already being replicated on another peer from ignoreId
function isAlreadyReplicating(state, feedId, ignoreId) {
for (var id in state.peers) {
if (id !== ignoreId) {
var peer = state.peers[id]
if(peer.notes && getReceive(peer.notes[id])) return id
if (peer.notes && getReceive(peer.notes[id])) return id
// for replicating the node must have replicated something not just rx
// this fixed a partial replication bug where a node is unable to send the full log
var id_has_sent = peer.replicating && peer.replicating[id] && peer.replicating[id].sent != -1
if(peer.replicating && peer.replicating[feed_id] && peer.replicating[feed_id].rx && id_has_sent) return id
var idHasSent = peer.replicating && peer.replicating[id] && peer.replicating[id].sent != -1
if (peer.replicating && peer.replicating[feedId] && peer.replicating[feedId].rx && idHasSent) return id
}

@@ -52,17 +52,17 @@ }

function fixSeq (local, remote) {
if(local == null) return remote
if(local == -1 || remote == -1) return remote
if(remote == 0) return 0
if(remote > 0 && local > 0 && remote < local) return remote
if (local == null) return remote
if (local == -1 || remote == -1) return remote
if (remote == 0) return 0
if (remote > 0 && local > 0 && remote < local) return remote
return Math.max(local, remote)
}
//check if a feed is available from a peer apart from ignore_id
//check if a feed is available from a peer apart from ignoreId
function isAvailable(state, feed_id, ignore_id) {
for(var peer_id in state.peers) {
if(peer_id != ignore_id) {
var peer = state.peers[peer_id]
function isAvailable(state, feedId, ignoreId) {
for (var peerId in state.peers) {
if (peerId != ignoreId) {
var peer = state.peers[peerId]
//BLOCK: check wether id has blocked this peer
if((peer.clock && peer.clock[feed_id] || 0) > (state.clock[feed_id] || 0) && isShared(state, feed_id, peer_id)) {
if ((peer.clock && peer.clock[feedId] || 0) > (state.clock[feedId] || 0) && isShared(state, feedId, peerId)) {
return true

@@ -81,6 +81,6 @@ }

var i = keys.indexOf(key)
if(!~i) return
if (!~i) return
//start at 1 because we want to visit all keys but key.
for(var j = 1; j < keys.length; j++)
if(iter(keys[(j+i)%keys.length], j))
for (var j = 1; j < keys.length; j++)
if (iter(keys[(j+i) % keys.length], j))
return

@@ -94,3 +94,3 @@ }

var rep = peer.replicating[feed]
if(rep) {
if (rep) {
rep.rx = rx

@@ -131,6 +131,6 @@ rep.requested = seq

exports.connect = function (state, ev) {
if(state.peers[ev.id]) throw new Error('already connected to peer:'+ev.id)
if(typeof ev.client != 'boolean') throw new Error('connect.client must be boolean')
if (state.peers[ev.id]) throw new Error('already connected to peer:'+ev.id)
if (typeof ev.client != 'boolean') throw new Error('connect.client must be boolean')
// if(isBlocked(state, state.id, ev.id)) return state
// if (isBlocked(state, state.id, ev.id)) return state

@@ -161,3 +161,3 @@ state.peers[ev.id] = {

exports.peerClock = function (state, ev) {
if(!state.peers[ev.id])
if (!state.peers[ev.id])
throw new Error('peerClock called for:'+ev.id + ' but only connected to:'+ Object.keys(state.peers))

@@ -169,3 +169,3 @@ var peer = state.peers[ev.id]

//can error before a peer sends a massive handshake.
if(peer.replicating == null) return state
if (peer.replicating == null) return state

@@ -181,6 +181,6 @@ //always set an empty clock here, so that if we don't have anything

for(var id in state.follows) {
for (var id in state.follows) {
var seq = clock[id] || 0, lseq = state.clock[id] || 0
//BLOCK: check wether id has blocked this peer
if(isShared(state, id, ev.id) && seq !== -1 && seq !== state.clock[id]) {
if (isShared(state, id, ev.id) && seq !== -1 && seq !== state.clock[id]) {

@@ -209,7 +209,7 @@ //if we are already replicating, and this feed is at zero, ask for it anyway,

var replicating = false
if(!!state.follows[ev.id] !== ev.value) {
if (!!state.follows[ev.id] !== ev.value) {
state.follows[ev.id] = ev.value
for(var id in state.peers) {
for (var id in state.peers) {
var peer = state.peers[id]
if(!peer.clock || !peer.replicating || !isShared(state, ev.id, id)) continue
if (!peer.clock || !peer.replicating || !isShared(state, ev.id, id)) continue
//BLOCK: check wether this feed has has blocked this peer.

@@ -224,9 +224,9 @@ //..... don't replicate feeds with peers that have blocked them at all?

var seq = peer.clock[ev.id] || 0, lseq = state.clock[ev.id] || 0
if(seq === -1) {
if (seq === -1) {
//peer explicitly does not replicate this feed, don't ask for it.
}
else if(ev.value === false) { //unfollow
else if (ev.value === false) { //unfollow
setNotes(peer, ev.id, -1, false)
}
else if(ev.value === true && seq !== state.clock[ev.id]) {
else if (ev.value === true && seq !== state.clock[ev.id]) {
peer.replicating[ev.id] = {

@@ -246,5 +246,5 @@ rx: true, tx: false,

//check if any peer requires this msg
for(var id in state.peers) {
for (var id in state.peers) {
var peer = state.peers[id]
if(!peer.replicating) continue;
if (!peer.replicating) continue
//BLOCK: check wether id has blocked this peer

@@ -255,8 +255,8 @@ const author = exports.getMsgAuthor(msg)

if(rep && rep.tx && rep.sent === sequence - 1) {
if (rep && rep.tx && rep.sent === sequence - 1) {
rep.sent++
peer.msgs.push(msg)
if(rep.sent < state.clock[author]) {
if (rep.sent < state.clock[author]) {
//use continue, not return because we still need to loop through other peers.
if(~peer.retrive.indexOf(author)) continue
if (~peer.retrive.indexOf(author)) continue
peer.retrive.push(author)

@@ -270,5 +270,5 @@ }

function isAhead(seq1, seq2) {
if(seq2 === -1) return false
if(seq2 == null) return true
if(seq1 > seq2) return true
if (seq2 === -1) return false
if (seq2 == null) return true
if (seq1 > seq2) return true
}

@@ -280,9 +280,9 @@

//check if any peer requires this msg
if(state.clock[author] != null && state.clock[author] !== sequence - 1)
if (state.clock[author] != null && state.clock[author] !== sequence - 1)
return state //ignore
var lseq = state.clock[author] = sequence
for(var id in state.peers) {
for (var id in state.peers) {
var peer = state.peers[id]
if(!peer.clock || !peer.replicating || !isShared(state, author, id)) continue
if (!peer.clock || !peer.replicating || !isShared(state, author, id)) continue
//BLOCK: check wether msg.author has blocked this peer

@@ -294,3 +294,3 @@

if(rep && rep.tx && rep.sent == lseq - 1 && lseq > seq) {
if (rep && rep.tx && rep.sent == lseq - 1 && lseq > seq) {
peer.msgs.push(msg)

@@ -300,3 +300,3 @@ rep.sent++

//if we are ahead of this peer, and not in tx mode, let them know that.
else if(
else if (
isAhead(lseq, seq) &&

@@ -320,3 +320,3 @@ (rep ? !rep.tx && rep.sent != null : state.follows[author])

if(!state.peers[ev.id]) throw new Error('lost peer state:'+ev.id)
if (!state.peers[ev.id]) throw new Error('lost peer state:'+ev.id)

@@ -331,3 +331,3 @@ //we _know_ that this peer is upto at least this message now.

//if we havn't asked for this, ignore it. (this is remote speaking protocol wrong!)
if(!rep) return state
if (!rep) return state

@@ -338,3 +338,3 @@ peer.clock[author] = Math.max(peer.clock[author], sequence)

//if this message has already been seen, ignore.
if(state.clock[author] >= sequence) {
if (state.clock[author] >= sequence) {
if (rep.rx) {

@@ -351,3 +351,3 @@ setNotes(peer, author, state.clock[author], false)

//FORKS ignore additional messages if we have already found an invalid one.
if(isShared(state, exports.getMsgAuthor(ev.value), ev.id))
if (isShared(state, exports.getMsgAuthor(ev.value), ev.id))
state.receive.push(ev)

@@ -359,3 +359,2 @@ //Q: possibly update the receiving mode?

//XXX check if we are already receiving a feed

@@ -369,12 +368,12 @@ //and if so put this into lazy mode.

//this is to allow room for backwards compatible upgrades.
if(isObject(ev.value.clock))
if (isObject(ev.value.clock))
clock = ev.value.clock
var peer = state.peers[ev.id]
if(!peer) throw new Error('lost state of peer:'+ev.id)
if(!peer.clock) throw new Error("received notes, but has not set the peer's clock yet")
if (!peer) throw new Error('lost state of peer:'+ev.id)
if (!peer.clock) throw new Error("received notes, but has not set the peer's clock yet")
var count = 0
//if we are client, and this is the first notes we receive
if(!peer.replicating) {
if (!peer.replicating) {
peer.replicating = {}

@@ -384,3 +383,3 @@ state = exports.peerClock(state, {id: ev.id, value: state.peers[ev.id].clock})

for(var id in clock) {
for (var id in clock) {
count++

@@ -396,6 +395,6 @@

//BLOCK: or wether id has blocked this peer
if(!isShared(state, id, ev.id)) {
if(!peer.replicating[id])
setNotes(peer, id, -1)
peer.replicating[id] = {tx:false, rx:false, sent: -1, requested: -1}
if (!isShared(state, id, ev.id)) {
if (!peer.replicating[id])
setNotes(peer, id, -1, false)
peer.replicating[id] = {tx: false, rx: false, sent: -1, requested: -1}
}

@@ -405,3 +404,3 @@ else {

var replicating = isAlreadyReplicating(state, id, ev.id)
if(!rep) {
if (!rep) {
rep = peer.replicating[id] = {

@@ -415,4 +414,4 @@ tx: true,

}
else if(!rep.rx && seq > lseq) {
if(!replicating) {
else if (!rep.rx && seq > lseq) {
if (!replicating) {
peer.ts = ev.ts //remember ts, so we can switch this feed if necessary

@@ -428,3 +427,3 @@ setNotes(peer, id, lseq, true)

// received theirs.
if(seq > (_peer.clock[id] || 0)) {
if (seq > (_peer.clock[id] || 0)) {
peer.ts = ev.ts

@@ -441,5 +440,5 @@ setNotes(peer, id, lseq, true)

rep.sent = seq
if(lseq > seq) {
if(tx) peer.retrive.push(id)
else if(isReplicate) setNotes(peer, id, lseq, rep.rx)
if (lseq > seq) {
if (tx) peer.retrive.push(id)
else if (isReplicate) setNotes(peer, id, lseq, rep.rx)
}

@@ -455,15 +454,16 @@ }

var want = {}
for(var peer_id in state.peers) {
var peer = state.peers[peer_id]
for (var peerId in state.peers) {
var peer = state.peers[peerId]
//check if the peer hasn't received a message recently.
//if we havn't received a message from this peer recently
if((peer.ts || 0) + state.timeout < ev.ts) {
if ((peer.ts || 0) + state.timeout < ev.ts) {
//check if they have claimed a higher sequence, but not sent us
for(var id in peer.replicating) {
for (var id in peer.replicating) {
var rep = peer.replicating[id]
//if yes, prepare to switch this feed to that peer
if(rep.rx && isAvailable(state, id, peer_id)) {
want[id] = peer_id
if (rep.rx && isAvailable(state, id, peerId)) {
want[id] = peerId
setNotes(peer, id, state.clock[id], false)

@@ -474,13 +474,14 @@ }

}
var peer_ids = Object.keys(state.peers)
for(var feed_id in want) {
var ignore_id = want[feed_id]
eachFrom(peer_ids, ignore_id, function (peer_id) {
var peer = state.peers[peer_id]
if(peer.clock && peer.clock[feed_id] || 0 > state.clock[feed_id] || 0) {
var peerIds = Object.keys(state.peers)
for (var feedId in want) {
var ignoreId = want[feedId]
eachFrom(peerIds, ignoreId, function (peerId) {
var peer = state.peers[peerId]
if (peer.clock && peer.clock[feedId] || 0 > state.clock[feedId] || 0) {
peer.replicating = peer.replicating || {}
peer.replicating[feed_id] = peer.replicating[feed_id] || {
tx: false, rx: true, sent: -1, requested: state.clock[feed_id]
peer.replicating[feedId] = peer.replicating[feedId] || {
tx: false, rx: true, sent: -1, requested: state.clock[feedId]
}
setNotes(peer, feed_id, state.clock[feed_id], true)
setNotes(peer, feedId, state.clock[feedId], true)
peer.ts = ev.ts

@@ -492,2 +493,3 @@ //returning true triggers the end of eachFrom

}
return state

@@ -497,5 +499,5 @@ }

exports.block = function (state, ev) {
if(!ev.value) {
if(state.blocks[ev.id]) delete state.blocks[ev.id][ev.target]
if(isEmpty(state.blocks[ev.id]))
if (!ev.value) {
if (state.blocks[ev.id]) delete state.blocks[ev.id][ev.target]
if (isEmpty(state.blocks[ev.id]))
delete state.blocks[ev.id]

@@ -506,16 +508,16 @@ }

state.blocks[ev.id][ev.target] = true
}
//if we blocked this peer, and we are also connected to them.
//then stop replicating immediately.
if(state.id === ev.id && state.peers[ev.target]) {
//end replication immediately.
state.peers[ev.target].blocked = ev.value
}
//if we blocked this peer, and we are also connected to them.
//then stop replicating immediately.
if (state.id === ev.id && state.peers[ev.target]) {
//end replication immediately.
state.peers[ev.target].blocked = ev.value
}
for(var id in state.peers) {
var peer = state.peers[id]
if(!peer.replicating) continue
if(id === ev.target && peer.replicating[ev.id])
setNotes(peer, ev.id, -1, false)
for (var id in state.peers) {
var peer = state.peers[id]
if (!peer.replicating) continue
if (id === ev.target && peer.replicating[ev.id])
setNotes(peer, ev.id, -1, false)
}
}

@@ -522,0 +524,0 @@

@@ -34,3 +34,3 @@ const Events = require('./events')

request: function (id, follows) {
if(opts.isFeed && !opts.isFeed(id)) return
if (opts.isFeed && !opts.isFeed(id)) return
self.state = events.follow(self.state, {id: id, value: follows !== false, ts: timestamp()})

@@ -40,31 +40,33 @@ self.update()

pause: function (id, paused) {
self.state = events.pause(self.state, {id: id, paused: paused !== false})
self.state = events.pause(self.state, {id, paused: paused !== false})
self.update()
},
block: function (id, target, value) {
self.state = events.block(self.state, {id: id, target: target, value: value !== false, ts: timestamp()})
self.state = events.block(self.state, {id, target, value: value !== false, ts: timestamp()})
self.update()
},
createStream: function (remote_id, version, client) {
if(self.streams[remote_id])
self.streams[remote_id].end(new Error('reconnected to peer'))
if(self.logging) console.log('EBT:conn', remote_id)
var stream = self.streams[remote_id] = new Stream(this, remote_id, version, client, opts.isMsg, function (peerState) {
opts.setClock(remote_id, peerState.clock)
})
createStream: function (remoteId, version, client) {
if (self.streams[remoteId])
self.streams[remoteId].end(new Error('reconnected to peer'))
if (self.logging) console.log('EBT:conn', remoteId)
function onClose(peerState) {
opts.setClock(remoteId, peerState.clock)
}
var stream = new Stream(this, remoteId, version, client, opts.isMsg, onClose)
self.streams[remoteId] = stream
opts.getClock(remote_id, function (err, clock) {
opts.getClock(remoteId, (err, clock) => {
//check if peer exists in state, because we may
//have disconect in the meantime
if(self.state.peers[remote_id])
if (self.state.peers[remoteId])
stream.clock(err ? {} : clock)
})
return stream
},
_retrive: function (err, msg) {
if(msg) {
if (msg) {
self.state = events.retrive(self.state, msg)
self.update()
}
else {
} else {
//this should never happen.

@@ -84,18 +86,19 @@ //replication for this feed is in bad state now.

//for that peer/stream.
for(var peer in self.state.peers) {
for (var peer in self.state.peers) {
var state = self.state.peers[peer]
while(state.retrive.length) {
while (state.retrive.length) {
var id = state.retrive.shift()
if(state.replicating[id])
if (state.replicating[id])
opts.getAt({
id: id,
sequence:state.replicating[id].sent+1
id,
sequence: state.replicating[id].sent+1
}, self._retrive)
}
}
if(self.state.receive.length) {
if (self.state.receive.length) {
var ev = self.state.receive.shift()
opts.append(ev.value, function (err) {
if(err) {
if(self.logging) console.error('EBT:err', err)
if (err) {
if (self.logging) console.error('EBT:err', err)
self.block(ev.value.author, ev.id, true)

@@ -105,3 +108,4 @@ }

}
for(var k in self.streams)
for (var k in self.streams)
self.streams[k].resume()

@@ -111,9 +115,9 @@ },

var int = setInterval(function () {
var int = setInterval(() => {
self.state = events.timeout(self.state, {ts: timestamp()})
self.update()
}, state.timeout)
if(int.unref) int.unref()
if (int.unref) int.unref()
return self
}
{
"name": "epidemic-broadcast-trees",
"description": "bandwidth efficient broadcast gossip",
"version": "9.0.0",
"version": "9.0.1",
"homepage": "https://github.com/dominictarr/epidemic-broadcast-trees",

@@ -6,0 +6,0 @@ "repository": {

@@ -11,14 +11,14 @@ /*

module.exports = function (state) {
var prog = {start:0, current: 0, target: 0}
for(var peer_id in state.peers) {
var prog = { start: 0, current: 0, target: 0 }
for (var peer_id in state.peers) {
var peer = state.peers[peer_id]
for(var feed_id in peer.replicating) {
for (var feed_id in peer.replicating) {
var rep = peer.replicating[feed_id]
//progress for sending initial note
prog.target++
if(rep.sent != null) prog.current++
if (rep.sent != null) prog.current++
prog.target++
if(rep.requested != null) prog.current++
if (rep.requested != null) prog.current++

@@ -28,3 +28,3 @@ var seq = peer.clock[feed_id]

if(rep.rx && rep.requested != null && rep.requested > -1 && lseq < seq) {
if (rep.rx && rep.requested != null && rep.requested > -1 && lseq < seq) {
prog.current += lseq - rep.requested

@@ -34,3 +34,3 @@ prog.target += seq - rep.requested

if(rep.tx && seq > -1 && seq < lseq) {
if (rep.tx && seq > -1 && seq < lseq) {
prog.current += rep.sent - seq

@@ -37,0 +37,0 @@ prog.target += lseq - seq

@@ -32,7 +32,7 @@ const v3 = require('./v3')

this.peer.update()
if(this.source) this.source.resume()
if (this.source) this.source.resume()
}
EBTStream.prototype.write = function (data) {
if(this.peer.logging) {
if (this.peer.logging) {
if (Buffer.isBuffer(data))

@@ -43,4 +43,6 @@ console.log("EBT:recv binary (" + this.peer.id + ")", "0x" + data.toString('hex'))

}
if(this.ended) throw new Error('write after ebt stream ended:'+this.remote)
if(this.isMsg(data)) {
if (this.ended) throw new Error('write after ebt stream ended:'+this.remote)
if (this.isMsg(data)) {
this.peer.state = events.receive(this.peer.state, {

@@ -65,5 +67,5 @@ id: this.remote,

//check if we have already ended
if(!this.peer.state.peers[this.remote]) return
if (!this.peer.state.peers[this.remote]) return
if(this.peer.logging) console.log('EBT:dcon', this.remote)
if (this.peer.logging) console.log('EBT:dcon', this.remote)

@@ -75,7 +77,7 @@ var peerState = this.peer.state.peers[this.remote]

})
if(this._onClose) this._onClose(peerState)
if (this._onClose) this._onClose(peerState)
//remove from the peer...
delete this.peer.streams[this.remote]
if(this.source && !this.source.ended) this.source.abort(err)
if(this.sink && !this.sink.ended) this.sink.end(err)
if (this.source && !this.source.ended) this.source.abort(err)
if (this.sink && !this.sink.ended) this.sink.end(err)
}

@@ -97,9 +99,10 @@

EBTStream.prototype.resume = function () {
if (!this.sink || this.sink.paused) return
var state = this.peer.state.peers[this.remote]
if(!this.sink || this.sink.paused) return
while(this.canSend()) {
if(state.blocked)
while (this.canSend()) {
if (state.blocked) {
this.end()
else if(state.msgs.length) {
if(this.peer.logging) {
} else if (state.msgs.length) {
if (this.peer.logging) {
if (Buffer.isBuffer(state.msgs[0]))

@@ -111,7 +114,6 @@ console.log("EBT:send binary (" + this.peer.id + ")", "0x" + state.msgs[0].toString('hex'))

this.sink.write(state.msgs.shift())
}
else {
} else {
var notes = state.notes
state.notes = null
if(this.peer.logging) {
if (this.peer.logging) {
const formattedNotes = {}

@@ -118,0 +120,0 @@ for (let feed in notes) {

@@ -85,3 +85,22 @@ var test = require('tape')

test("blocking false on a connected peer does nothing", function (t) {
var state = events.initialize('alice')
state.clock = {alice: 3, charles: 2}
state = events.connect(state, {id: 'charles', ts: 1, client: false})
//state = events.peerClock(state, {id: 'charles', value: { charles: 2 }})
state = events.peerClock(state, {id: 'charles', value: {'alice': 0, 'charles': 0}})
state = events.follow(state, {id: 'alice', value: true})
state = events.follow(state, {id: 'charles', value: true})
state = events.notes(state, {id: 'charles', value: {charles: 2}})
const existingNote = state.peers['charles'].notes['alice']
// this should not change anything
state = events.block(state, {id: 'alice', target: 'charles', value: false})
t.equal(state.peers['charles'].notes['alice'], existingNote)
t.end()
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc