Comparing version 0.8.2 to 0.8.3
@@ -296,35 +296,37 @@ "use strict"; | ||
return; | ||
const MAX_STAGED_PUBS = 3; | ||
const pausable = Pausable(); | ||
pull(this.ssb.messagesByType({ type: 'pub', live: true, keys: false }), pull.filter((msg) => !msg.sync), pull.filter((msg) => msg.content && | ||
msg.content.address && | ||
Ref.isAddress(msg.content.address)), pausable, pull.drain((msg) => { | ||
try { | ||
const address = Ref.toMultiServerAddress(msg.content.address); | ||
const key = Ref.getKeyFromAddress(address); | ||
if (this.weBlockThem([address, { key }])) { | ||
this.ssb.conn.forget(address); | ||
setTimeout(() => { | ||
const MAX_STAGED_PUBS = 3; | ||
const pausable = Pausable(); | ||
pull(this.ssb.messagesByType({ type: 'pub', live: true, keys: false }), pull.filter((msg) => !msg.sync), pull.asyncMap((x, cb) => setTimeout(() => cb(null, x), 250)), pull.filter((msg) => msg.content && | ||
msg.content.address && | ||
Ref.isAddress(msg.content.address)), pausable, pull.drain((msg) => { | ||
try { | ||
const address = Ref.toMultiServerAddress(msg.content.address); | ||
const key = Ref.getKeyFromAddress(address); | ||
if (this.weBlockThem([address, { key }])) { | ||
this.ssb.conn.forget(address); | ||
} | ||
else if (!this.ssb.conn.internalConnDB().has(address)) { | ||
this.ssb.conn.stage(address, { key, type: 'pub' }); | ||
this.ssb.conn.remember(address, { | ||
key, | ||
type: 'pub', | ||
autoconnect: false, | ||
}); | ||
} | ||
} | ||
else if (!this.ssb.conn.internalConnDB().has(address)) { | ||
this.ssb.conn.stage(address, { key, type: 'pub' }); | ||
this.ssb.conn.remember(address, { | ||
key, | ||
type: 'pub', | ||
autoconnect: false, | ||
}); | ||
catch (err) { | ||
debug('cannot process discovered pub because: %s', err); | ||
} | ||
} | ||
catch (err) { | ||
debug('cannot process discovered pub because: %s', err); | ||
} | ||
})); | ||
pull(this.ssb.conn.internalConnStaging().liveEntries(), pull.drain((staged) => { | ||
const stagedPubs = staged.filter(([, data]) => data.type === 'pub'); | ||
if (stagedPubs.length >= MAX_STAGED_PUBS) { | ||
pausable.pause(); | ||
} | ||
else { | ||
pausable.resume(); | ||
} | ||
})); | ||
})); | ||
pull(this.ssb.conn.internalConnStaging().liveEntries(), pull.drain((staged) => { | ||
const stagedPubs = staged.filter(([, data]) => data.type === 'pub'); | ||
if (stagedPubs.length >= MAX_STAGED_PUBS) { | ||
pausable.pause(); | ||
} | ||
else { | ||
pausable.resume(); | ||
} | ||
})); | ||
}, 1000); | ||
} | ||
@@ -331,0 +333,0 @@ setupBluetoothDiscovery() { |
{ | ||
"name": "ssb-conn", | ||
"description": "SSB plugin for establishing and managing peer connections", | ||
"version": "0.8.2", | ||
"version": "0.8.3", | ||
"homepage": "https://github.com/staltz/ssb-conn", | ||
@@ -6,0 +6,0 @@ "main": "lib/index.js", |
@@ -342,48 +342,52 @@ import ConnHub = require('ssb-conn-hub'); | ||
type PubContent = {address?: string}; | ||
const MAX_STAGED_PUBS = 3; | ||
const pausable = Pausable(); | ||
setTimeout(() => { | ||
type PubContent = {address?: string}; | ||
const MAX_STAGED_PUBS = 3; | ||
const pausable = Pausable(); | ||
pull( | ||
this.ssb.messagesByType({type: 'pub', live: true, keys: false}), | ||
pull.filter((msg: any) => !msg.sync), | ||
pull.filter( | ||
(msg: Msg<PubContent>['value']) => | ||
msg.content && | ||
msg.content.address && | ||
Ref.isAddress(msg.content.address), | ||
), | ||
pausable, | ||
pull.drain((msg: Msg<PubContent>['value']) => { | ||
try { | ||
const address = Ref.toMultiServerAddress(msg.content.address!); | ||
const key = Ref.getKeyFromAddress(address); | ||
if (this.weBlockThem([address, {key}])) { | ||
this.ssb.conn.forget(address); | ||
} else if (!this.ssb.conn.internalConnDB().has(address)) { | ||
this.ssb.conn.stage(address, {key, type: 'pub'}); | ||
this.ssb.conn.remember(address, { | ||
key, | ||
type: 'pub', | ||
autoconnect: false, | ||
}); | ||
pull( | ||
this.ssb.messagesByType({type: 'pub', live: true, keys: false}), | ||
pull.filter((msg: any) => !msg.sync), | ||
// Don't drain that fast, so to give other DB draining tasks priority | ||
pull.asyncMap((x: any, cb: any) => setTimeout(() => cb(null, x), 250)), | ||
pull.filter( | ||
(msg: Msg<PubContent>['value']) => | ||
msg.content && | ||
msg.content.address && | ||
Ref.isAddress(msg.content.address), | ||
), | ||
pausable, | ||
pull.drain((msg: Msg<PubContent>['value']) => { | ||
try { | ||
const address = Ref.toMultiServerAddress(msg.content.address!); | ||
const key = Ref.getKeyFromAddress(address); | ||
if (this.weBlockThem([address, {key}])) { | ||
this.ssb.conn.forget(address); | ||
} else if (!this.ssb.conn.internalConnDB().has(address)) { | ||
this.ssb.conn.stage(address, {key, type: 'pub'}); | ||
this.ssb.conn.remember(address, { | ||
key, | ||
type: 'pub', | ||
autoconnect: false, | ||
}); | ||
} | ||
} catch (err) { | ||
debug('cannot process discovered pub because: %s', err); | ||
} | ||
} catch (err) { | ||
debug('cannot process discovered pub because: %s', err); | ||
} | ||
}), | ||
); | ||
}), | ||
); | ||
// Pause or resume the draining depending on the number of staged pubs | ||
pull( | ||
this.ssb.conn.internalConnStaging().liveEntries(), | ||
pull.drain((staged: Array<any>) => { | ||
const stagedPubs = staged.filter(([, data]) => data.type === 'pub'); | ||
if (stagedPubs.length >= MAX_STAGED_PUBS) { | ||
pausable.pause(); | ||
} else { | ||
pausable.resume(); | ||
} | ||
}), | ||
); | ||
// Pause or resume the draining depending on the number of staged pubs | ||
pull( | ||
this.ssb.conn.internalConnStaging().liveEntries(), | ||
pull.drain((staged: Array<any>) => { | ||
const stagedPubs = staged.filter(([, data]) => data.type === 'pub'); | ||
if (stagedPubs.length >= MAX_STAGED_PUBS) { | ||
pausable.pause(); | ||
} else { | ||
pausable.resume(); | ||
} | ||
}), | ||
); | ||
}, 1000); | ||
} | ||
@@ -390,0 +394,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
128518
2301