hyperswarm
Advanced tools
Comparing version 3.0.0-beta1 to 3.0.0-beta2
85
index.js
@@ -14,2 +14,3 @@ const { EventEmitter } = require('events') | ||
const ERR_MISSING_TOPIC = 'Topic is required and must be a 32-byte buffer' | ||
const ERR_DESTROYED = 'Swarm has been destroyed' | ||
@@ -31,3 +32,7 @@ const ERR_DUPLICATE = 'Duplicate connection' | ||
this.keyPair = keyPair | ||
this.dht = new DHT() | ||
const networkOpts = {} | ||
if (opts.bootstrap) networkOpts.bootstrap = opts.bootstrap | ||
this.dht = new DHT(networkOpts) | ||
this.server = this.dht.createServer({ | ||
@@ -47,3 +52,6 @@ firewall: this._handleFirewall.bind(this), | ||
this._discovery = new Map() | ||
this._timer = new RetryTimer(this._requeue.bind(this)) | ||
this._timer = new RetryTimer(this._requeue.bind(this), { | ||
backoffs: opts.backoffs, | ||
jitter: opts.jitter | ||
}) | ||
this._queue = spq() | ||
@@ -93,6 +101,16 @@ | ||
_shouldConnect () { | ||
return this.connections.size < this.maxPeers && | ||
return !this.destroyed && | ||
this.connections.size < this.maxPeers && | ||
this._clientConnections < this.maxClientConnections | ||
} | ||
_shouldRequeue (peerInfo) { | ||
for (const topic of peerInfo.topics) { | ||
if (this._discovery.has(topic.toString('hex')) && !this.destroyed) { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
// Called when the PeerQueue indicates a connection should be attempted. | ||
@@ -111,3 +129,3 @@ _attemptClientConnections () { | ||
// TODO: Support async firewalling at some point. | ||
if (!this._firewall(peerInfo.publicKey, null)) { | ||
if (!this._handleFirewall(peerInfo.publicKey, null)) { | ||
peerInfo.ban() | ||
@@ -123,2 +141,3 @@ this._flushMaybe(peerInfo) | ||
this.connections.add(conn) | ||
this._clientConnections++ | ||
@@ -131,3 +150,3 @@ let opened = false | ||
peerInfo._disconnected() | ||
this._timer.add(peerInfo) | ||
if (this._shouldRequeue(peerInfo)) this._timer.add(peerInfo) | ||
if (!opened) this._flushMaybe(peerInfo) | ||
@@ -141,4 +160,4 @@ }) | ||
peerInfo.client = true | ||
this.emit('connection', conn, peerInfo) | ||
this._flushMaybe(peerInfo) | ||
this.emit('connection', conn, peerInfo) | ||
}) | ||
@@ -149,7 +168,12 @@ } | ||
_handleFirewall (remotePublicKey, payload) { | ||
if (remotePublicKey.equals(this.keyPair.publicKey)) return false | ||
const existing = this.connections.get(remotePublicKey) | ||
if (existing && isOpen(existing)) return false | ||
if (remotePublicKey.equals(this.keyPair.publicKey)) return false | ||
if (existing) { | ||
if (existing.isInitiator === true && isOpen(existing)) return false | ||
} | ||
const peerInfo = this.peers.get(remotePublicKey.toString('hex')) | ||
if (peerInfo && peerInfo.banned) return false | ||
return this._firewall(remotePublicKey, payload) | ||
@@ -160,5 +184,11 @@ } | ||
_handleServerConnection (conn) { | ||
if (this.destroyed) { | ||
// TODO: Investigate why a final server connection can be received after close | ||
conn.on('error', noop) | ||
return conn.destroy(ERR_DESTROYED) | ||
} | ||
const existing = this.connections.get(conn.remotePublicKey) | ||
if (existing) { | ||
if (isOpen(existing)) { | ||
if (existing.isInitiator && isOpen(existing)) { | ||
conn.on('error', noop) | ||
@@ -168,3 +198,3 @@ conn.destroy(new Error(ERR_DUPLICATE)) | ||
} | ||
existing.on('error', noop) | ||
existing.destroy(new Error(ERR_DUPLICATE)) | ||
@@ -212,5 +242,8 @@ } | ||
const peerInfo = this._upsertPeer(peer.publicKey, peer.nodes) | ||
if (peerInfo) peerInfo._topic(topic) | ||
if (!peerInfo || this.connections.has(peer.publicKey)) return | ||
if (!peerInfo.prioritized || peerInfo.server) peerInfo._reset() | ||
if (peerInfo._updatePriority()) this._enqueue(peerInfo) | ||
if (peerInfo._updatePriority()) { | ||
this._enqueue(peerInfo) | ||
} | ||
} | ||
@@ -231,2 +264,3 @@ | ||
join (topic, opts = {}) { | ||
if (!topic) throw new Error(ERR_MISSING_TOPIC) | ||
const topicString = topic.toString('hex') | ||
@@ -244,2 +278,3 @@ if (this._discovery.has(topicString)) return this._discovery.get(topicString) | ||
leave (topic) { | ||
if (!topic) throw new Error(ERR_MISSING_TOPIC) | ||
const topicString = topic.toString('hex') | ||
@@ -254,4 +289,5 @@ if (!this._discovery.has(topicString)) return Promise.resolve() | ||
async flush () { | ||
const allFlushedPromises = [...this._discovery.values()].map(v => v.flushed()) | ||
await Promise.all(allFlushedPromises) | ||
const allFlushed = [...this._discovery.values()].map(v => v.flushed()) | ||
await Promise.all(allFlushed) | ||
if (!this._queue.length && !this.connections.pending) return Promise.resolve() | ||
return new Promise((resolve, reject) => { | ||
@@ -267,3 +303,19 @@ this._pendingFlushes.push({ | ||
async clear () { | ||
const cleared = Promise.allSettled([...this._discovery.values()].map(d => d.destroy())) | ||
this._discovery.clear() | ||
return cleared | ||
} | ||
async destroy () { | ||
if (this.destroyed) return | ||
this.destroyed = true | ||
this._timer.destroy() | ||
await this.clear() | ||
await this.dht.destroy() | ||
await this.server.close() | ||
while (this._pendingFlushes.length) { | ||
@@ -273,7 +325,10 @@ const flush = this._pendingFlushes.pop() | ||
} | ||
// TODO: Other destroy stuff | ||
for (const conn of this.connections) { | ||
conn.destroy() | ||
} | ||
} | ||
} | ||
function noop () {} | ||
function noop () { } | ||
@@ -280,0 +335,0 @@ function allowAll () { |
@@ -8,5 +8,8 @@ module.exports = class BulkTimer { | ||
this._pending = [] | ||
this._destroyed = false | ||
} | ||
destroy () { | ||
if (this._destroyed) return | ||
this._destroyed = true | ||
clearInterval(this._interval) | ||
@@ -24,2 +27,3 @@ this._interval = null | ||
add (info) { | ||
if (this._destroyed) return | ||
if (!this._interval) { | ||
@@ -26,0 +30,0 @@ this._interval = setInterval(this._ontick.bind(this), Math.floor(this._time * 0.66)) |
@@ -10,2 +10,10 @@ module.exports = class ConnectionSet { | ||
get pending () { | ||
let pending = 0 | ||
for (const conn of this) { | ||
if (!conn.id) pending++ | ||
} | ||
return pending | ||
} | ||
get size () { | ||
@@ -12,0 +20,0 @@ return this._byPublicKey.size |
@@ -5,5 +5,2 @@ const REFRESH_INTERVAL = 1000 * 60 * 10 // 10 min | ||
// TODO: Improvements for later | ||
// 1. Cache closest nodes | ||
module.exports = class PeerDiscovery { | ||
@@ -138,2 +135,5 @@ constructor (swarm, topic, { server = true, client = true, onpeer = noop, onerror = noop }) { | ||
} | ||
if (!this.isServer) return | ||
return this.swarm.dht.unannounce(this.topic, this.swarm.keyPair) | ||
} | ||
@@ -140,0 +140,0 @@ } |
@@ -0,1 +1,3 @@ | ||
const { EventEmitter } = require('events') | ||
const VERY_LOW_PRIORITY = 0 | ||
@@ -7,4 +9,6 @@ const LOW_PRIORITY = 1 | ||
module.exports = class PeerInfo { | ||
module.exports = class PeerInfo extends EventEmitter { | ||
constructor ({ publicKey, nodes }) { | ||
super() | ||
this.publicKey = publicKey | ||
@@ -21,2 +25,3 @@ this.nodes = nodes | ||
this.client = false | ||
this.topics = [] | ||
@@ -31,2 +36,5 @@ this.attempts = 0 | ||
this._flushTick = 0 | ||
// Used for topic multiplexing | ||
this._seenTopics = new Set() | ||
} | ||
@@ -77,2 +85,10 @@ | ||
_topic (topic) { | ||
const hex = topic.toString('hex') | ||
if (this._seenTopics.has(hex)) return | ||
this._seenTopics.add(hex) | ||
this.topics.push(topic) | ||
this.emit('topic', topic) | ||
} | ||
reconnect (val) { | ||
@@ -79,0 +95,0 @@ this.reconnecting = !!val |
@@ -1,3 +0,1 @@ | ||
const { EventEmitter } = require('events') | ||
const spq = require('shuffled-priority-queue') | ||
@@ -10,12 +8,12 @@ const BulkTimer = require('./bulk-timer') | ||
module.exports = class PeerQueue extends EventEmitter { | ||
constructor ({ onreadable = noop } = {}) { | ||
super() | ||
module.exports = class PeerQueue { | ||
constructor ({ onreadable = noop, backoffs = [BACKOFF_S, BACKOFF_M, BACKOFF_L] } = {}) { | ||
this._queue = spq() | ||
this._onreadable = onreadable | ||
this.destroyed = false | ||
const push = this._push.bind(this) | ||
this._sTimer = new BulkTimer(BACKOFF_S, push) | ||
this._mTimer = new BulkTimer(BACKOFF_M, push) | ||
this._lTimer = new BulkTimer(BACKOFF_L, push) | ||
this._sTimer = new BulkTimer(backoffs[0], push) | ||
this._mTimer = new BulkTimer(backoffs[1], push) | ||
this._lTimer = new BulkTimer(backoffs[2], push) | ||
} | ||
@@ -45,3 +43,3 @@ | ||
for (const peerInfo of batch) { | ||
if (peerInfo.update() === false) continue | ||
if (peerInfo._updatePriority() === false) continue | ||
peerInfo.queued = true | ||
@@ -60,2 +58,3 @@ this._queue.add(peerInfo) | ||
queue (peer) { | ||
if (this.destroyed) return | ||
const empty = !this._queue.head() | ||
@@ -68,5 +67,6 @@ peer.queued = true | ||
queueLater (peer) { | ||
if (this.destroyed) return | ||
const timer = this._selectRetryDelay(peer) | ||
if (!timer) return | ||
timer.push(peer) | ||
timer.add(peer) | ||
} | ||
@@ -79,4 +79,11 @@ | ||
} | ||
destroy () { | ||
this.destroyed = true | ||
this._sTimer.destroy() | ||
this._mTimer.destroy() | ||
this._lTimer.destroy() | ||
} | ||
} | ||
function noop () {} |
@@ -9,6 +9,9 @@ const BulkTimer = require('./bulk-timer') | ||
module.exports = class RetryTimer { | ||
constructor (push) { | ||
this._sTimer = new BulkTimer(BACKOFF_S, push) | ||
this._mTimer = new BulkTimer(BACKOFF_M, push) | ||
this._lTimer = new BulkTimer(BACKOFF_L, push) | ||
constructor (push, { backoffs = [BACKOFF_S, BACKOFF_M, BACKOFF_L], jitter = BACKOFF_JITTER } = {}) { | ||
this.jitter = jitter | ||
this.backoffs = backoffs | ||
this._sTimer = new BulkTimer(backoffs[0] + Math.round(jitter * Math.random()), push) | ||
this._mTimer = new BulkTimer(backoffs[1] + Math.round(jitter * Math.random()), push) | ||
this._lTimer = new BulkTimer(backoffs[2] + Math.round(jitter * Math.random()), push) | ||
} | ||
@@ -18,2 +21,3 @@ | ||
if (peerInfo.banned || !peerInfo.reconnecting || peerInfo.attempts > 3) return null | ||
if (peerInfo.attempts === 0) return this._sTimer | ||
if (peerInfo.proven) { | ||
@@ -20,0 +24,0 @@ switch (peerInfo.attempts) { |
{ | ||
"name": "hyperswarm", | ||
"version": "3.0.0-beta1", | ||
"version": "3.0.0-beta2", | ||
"description": "A distributed networking stack for connecting peers", | ||
@@ -10,2 +10,5 @@ "dependencies": { | ||
"devDependencies": { | ||
"hypercore-crypto": "^2.3.0", | ||
"math-random-seed": "^2.0.0", | ||
"nonsynchronous": "^1.2.0", | ||
"standard": "^12.0.1", | ||
@@ -12,0 +15,0 @@ "tape": "^5.2.2" |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
44174
16
1271
5