exchange-protocol
Advanced tools
Comparing version 0.0.1 to 4.0.0
152
index.js
@@ -0,6 +1,12 @@ | ||
// SPDX-License-Identifier: GPL-3.0-or-later | ||
const { Exchange } = require('./messages') | ||
const NAME = 'exchange' | ||
class CoreExchangeExtension { | ||
const VALUE_ENCODING_BUFFER = 0 | ||
const VALUE_ENCODING_JSON = 1 | ||
const VALUE_ENCODING_UTF8 = 2 | ||
class ExchangeExtension { | ||
constructor (handlers, opts = {}) { | ||
this.name = 'exchange' | ||
this.name = NAME | ||
this.encoding = Exchange | ||
@@ -15,2 +21,3 @@ this.requestTimeout = opts.requestTimeout || 15000 | ||
}, | ||
onerror (e) { throw e /* Warning: error handler was not supplied! */ }, | ||
...handlers | ||
@@ -27,4 +34,3 @@ } | ||
this.pendingRequests = { | ||
} | ||
this.pendingRequests = {} | ||
} | ||
@@ -35,25 +41,33 @@ | ||
if (msg.manifest) { | ||
const m = { | ||
id: msg.manifest.id, | ||
namespace: msg.manifest.namespace, | ||
keys: msg.manifest.feeds.map(f => f.key.toString('hex')), | ||
meta: msg.manifest.feeds.map(f => { | ||
const meta = {} | ||
f.headers.forEach(kv => { | ||
meta[kv.key] = JSON.parse(kv.value) | ||
}) | ||
return meta | ||
}) | ||
} | ||
const { manifest } = msg | ||
// Register what remote offered. | ||
this.remoteOfferedKeys[m.namespace] = this.remoteOfferedKeys[m.namespace] || [] | ||
m.keys.forEach(key => { | ||
if (this.remoteOfferedKeys[m.namespace].indexOf(key) === -1) { | ||
this.remoteOfferedKeys[m.namespace].push(key) | ||
} | ||
// Lazy init remote offer namespace | ||
this.remoteOfferedKeys[manifest.namespace] = this.remoteOfferedKeys[manifest.namespace] || {} | ||
manifest.feeds.forEach(feed => { | ||
// Register what remote offered. | ||
this.remoteOfferedKeys[manifest.namespace][feed.key.toString('hex')] = true | ||
feed.headers = feed.headers.reduce((lut, header) => { | ||
switch (header.valueEncoding) { | ||
case VALUE_ENCODING_UTF8: | ||
lut[header.key] = header.value.toString('utf8') | ||
break | ||
case VALUE_ENCODING_JSON: | ||
lut[header.key] = JSON.parse(header.value.toString('utf8')) | ||
break | ||
default: | ||
case VALUE_ENCODING_BUFFER: | ||
lut[header.key] = header.value | ||
break | ||
} | ||
return lut | ||
}, {}) | ||
}) | ||
process.nextTick(() => this.handlers.onmanifest(m, peer)) | ||
const accept = keys => { | ||
return this.sendRequest (manifest.namespace, keys, manifest.id, peer) | ||
} | ||
process.nextTick(() => this.handlers.onmanifest(manifest, accept, peer)) | ||
} else if (msg.req) { | ||
peer.stats.requestsRecv++ | ||
const req = msg.req | ||
@@ -70,21 +84,12 @@ | ||
} catch (err) { | ||
// Todo: peer#kill() refers to decentstack/peer-connection | ||
// on error we proably want to disconnect or close the channel | ||
// but we need to have hypercore support extensions before we know which | ||
// action to take. | ||
peer.kill(err) | ||
this.handlers.onerror(err) | ||
} | ||
} | ||
sendManifest (namespace, manifest, cb) { | ||
sendManifest (namespace, manifest, peer, cb) { | ||
if (typeof peer === 'function') return this.sendManifest(namespace, manifest, undefined, peer) | ||
const mid = ++this.__mctr | ||
// Save which keys were offered on this connection | ||
this.offeredKeys[namespace] = this.offeredKeys[namespace] || [] | ||
this.offeredKeys[namespace] = this.offeredKeys[namespace] || {} | ||
manifest.keys.forEach(k => { | ||
if (this.offeredKeys[namespace].indexOf(k) === -1) { | ||
this.offeredKeys[namespace].push(k) | ||
} | ||
}) | ||
const message = { | ||
@@ -94,13 +99,25 @@ manifest: { | ||
id: mid, | ||
feeds: manifest.keys.map((key, n) => { | ||
const meta = manifest.meta[n] | ||
return { | ||
key: Buffer.from(key, 'hex'), | ||
headers: Object.keys(meta).map(k => { | ||
return { | ||
key: k, | ||
value: JSON.stringify(meta[k]) | ||
} | ||
}) | ||
} | ||
feeds: manifest.map(feed => { | ||
const strKey = feed.key.toString('hex') | ||
this.offeredKeys[namespace][strKey] = 1 | ||
const arrayHeaders = [] | ||
Object.keys(feed.headers).forEach(key => { | ||
const h = { key } | ||
const v = feed.headers[key] | ||
if (Buffer.isBuffer(v)) { | ||
h.valueEncoding = VALUE_ENCODING_BUFFER | ||
h.value = v | ||
} else if (typeof v === 'string') { | ||
h.valueEncoding = VALUE_ENCODING_UTF8 | ||
h.value = Buffer.from(v, 'utf8') | ||
} else { | ||
h.valueEncoding = VALUE_ENCODING_JSON | ||
h.value = Buffer.from(JSON.stringify(v), 'utf8') | ||
} | ||
arrayHeaders.push(h) | ||
}) | ||
feed.headers = arrayHeaders | ||
return feed | ||
}) | ||
@@ -112,2 +129,3 @@ } | ||
let triggered = false | ||
let timerId = null | ||
const race = (err, f) => { | ||
@@ -117,2 +135,3 @@ if (!triggered) { | ||
delete this.pendingRequests[mid] | ||
clearTimeout(timerId) // cleanup pending timer | ||
cb(err, f) | ||
@@ -124,3 +143,3 @@ } | ||
setTimeout(() => { | ||
timerId = setTimeout(() => { | ||
race(new ManifestResponseTimedOutError()) | ||
@@ -130,7 +149,8 @@ }, this.requestTimeout) | ||
this.send(message) | ||
if (peer) this.send(message, peer) | ||
else this.broadcast(message) | ||
return mid | ||
} | ||
sendRequest (namespace, keys, manifestId) { | ||
sendRequest (namespace, keys, manifestId, peer) { | ||
this.requestedKeys[namespace] = this.requestedKeys[namespace] || [] | ||
@@ -150,3 +170,5 @@ keys.forEach(k => { | ||
} | ||
this.send(message) | ||
if (peer) this.send(message, peer) | ||
else this.broadcast(message) | ||
} | ||
@@ -162,3 +184,3 @@ | ||
/* | ||
* Each peer allows offered-keys, requested-keys and the ~~exchange-key~~ | ||
* Each peer allows offered-keys and requested-keys | ||
* to be replicated on the stream | ||
@@ -170,6 +192,2 @@ * negotiated = offered - requested for each namespace | ||
const m = {} | ||
// Presumably not needed anymore after proto:v7 upgrade, | ||
// the exchangeKey is implicitly allowed otherwise we wouldn't | ||
// be exchanging any messages to begin with. | ||
// m[this.exchangeChannel.key.hexSlice()] = null | ||
@@ -186,4 +204,2 @@ Object.keys(this.offeredKeys).forEach(ns => { | ||
module.exports = CoreExchangeExtension | ||
class ManifestResponseTimedOutError extends Error { | ||
@@ -198,1 +214,19 @@ constructor (msg = 'timeout while waiting for request after manifest', ...params) { | ||
} | ||
module.exports = function HostAdapter (extensionHost, ...a) { | ||
const inst = new ExchangeExtension(...a) | ||
const ext = extensionHost.registerExtension(NAME, inst) | ||
// This might be exclusive for hypercore/hypercore-protocol hosts | ||
// we're already injecting send/broadcast in decentstack | ||
inst.send = (...a) => ext.send(...a) | ||
inst.broadcast = (...a) => { | ||
if (typeof ext.broadcast === 'function') { | ||
ext.broadcast(...a) | ||
} else { | ||
ext.send(...a) | ||
} | ||
} | ||
return inst | ||
} | ||
module.exports.ExchangeExtension = ExchangeExtension |
@@ -331,3 +331,5 @@ // This file is auto generated by the protocol-buffers compiler | ||
var enc = [ | ||
encodings.string | ||
encodings.string, | ||
encodings.bytes, | ||
encodings.varint | ||
] | ||
@@ -345,5 +347,9 @@ | ||
if (defined(obj.value)) { | ||
var len = enc[0].encodingLength(obj.value) | ||
var len = enc[1].encodingLength(obj.value) | ||
length += 1 + len | ||
} | ||
if (defined(obj.valueEncoding)) { | ||
var len = enc[2].encodingLength(obj.valueEncoding) | ||
length += 1 + len | ||
} | ||
return length | ||
@@ -362,5 +368,10 @@ } | ||
buf[offset++] = 18 | ||
enc[0].encode(obj.value, buf, offset) | ||
offset += enc[0].encode.bytes | ||
enc[1].encode(obj.value, buf, offset) | ||
offset += enc[1].encode.bytes | ||
} | ||
if (defined(obj.valueEncoding)) { | ||
buf[offset++] = 24 | ||
enc[2].encode(obj.valueEncoding, buf, offset) | ||
offset += enc[2].encode.bytes | ||
} | ||
encode.bytes = offset - oldOffset | ||
@@ -377,3 +388,4 @@ return buf | ||
key: "", | ||
value: "" | ||
value: null, | ||
valueEncoding: 0 | ||
} | ||
@@ -397,5 +409,9 @@ var found0 = false | ||
case 2: | ||
obj.value = enc[0].decode(buf, offset) | ||
offset += enc[0].decode.bytes | ||
obj.value = enc[1].decode(buf, offset) | ||
offset += enc[1].decode.bytes | ||
break | ||
case 3: | ||
obj.valueEncoding = enc[2].decode(buf, offset) | ||
offset += enc[2].decode.bytes | ||
break | ||
default: | ||
@@ -402,0 +418,0 @@ offset = skip(prefix & 7, buf, offset) |
{ | ||
"name": "exchange-protocol", | ||
"version": "0.0.1", | ||
"version": "4.0.0", | ||
"description": "Hypercore-extension that allows peers to exchange feed-descriptors", | ||
@@ -8,6 +8,6 @@ "main": "index.js", | ||
"author": "Tony Ivanov", | ||
"license": "ISC", | ||
"license": "GPL-3.0-or-later", | ||
"private": false, | ||
"scripts": { | ||
"test": "tape test/*", | ||
"test": "tape test.js", | ||
"debug": "node inspect $(npm bin)/tape test.js", | ||
@@ -18,3 +18,7 @@ "protobuf": "protocol-buffers -o messages.js schema.proto", | ||
"devDependencies": { | ||
"deferinfer": "^1.0.1", | ||
"hypercore": "^8.3.0", | ||
"hypercore-protocol": "^7.7.0", | ||
"protocol-buffers": "^4.1.0", | ||
"random-access-memory": "^3.1.1", | ||
"standard": "^14.3.1", | ||
@@ -21,0 +25,0 @@ "tape": "^4.11.0" |
133
README.md
@@ -1,4 +0,131 @@ | ||
# extension-protocol | ||
Hypercore-extension that allows exchange of feed-descriptors | ||
# exchange-protocol | ||
[exchange-protocol docs page](https://decentstack.org/#/exchange_proto) | ||
Hypercore-extension that enables two peers to exchange feed keys and | ||
descriptors. | ||
Following extension hosts are supported: | ||
- hypercore | ||
- hypercore-protocol | ||
- decentstack (built-in by default) | ||
Take a look at [exchange-protocol docs page](https://decentstack.org/#/exchange_proto) for a higher level description and sequence diagrams. | ||
## Usage | ||
Given the following manifest: | ||
```js | ||
// Header-values are allowed to contain primitive types or Buffers | ||
// if you want to use a custom encoder/decoder | ||
const myManifest = [ | ||
{ | ||
key: 'deadbeefdeadbeefdeadbeefdeadbeef', | ||
headers: { seq: 1, hello: 'world' } | ||
}, | ||
{ | ||
key: otherKey, | ||
headers: { seq: 55, hello: 'planet', title: 'My awesome feed' } | ||
} | ||
] | ||
``` | ||
And given the following handlers Object: | ||
```js | ||
const exchangeHandlers = { | ||
onmanifest (shared, accept) { | ||
shared.namespace // => String - 'default' | ||
shared.feeds // => Array<Object> - offered keys | ||
for (const feed of feeds) { // Print the manifest contents. | ||
console.log('Remote Shared feed:', feed.key.toString('hex')) | ||
console.log('Headers:', feed.headers, '\n') | ||
} | ||
// `accept` is a Function that directly responds to a given | ||
// manifest with `FeedRequest' | ||
const acceptedKeys = shared.feeds.filter(...).map(f => f.key) | ||
accept(acceptedKeys) // accept all keys | ||
// This is a stub, you should let your replication manager | ||
// take care of joining the other feeds into the feed stream. | ||
acceptedKeys.forEach(key => { | ||
coreStorage.get(key).replicate(true, { stream: myPeerStream }) | ||
}) | ||
}, | ||
onrequest (req) { | ||
req.manifest_id // => Number - increment | ||
req.namespace // => String - 'default' | ||
req.keys // => Array - requested keys | ||
// Another replication manager stub | ||
req.keys.forEach(key => { | ||
coreStorage.get(key).replicate(false, { stream: myPeerStream }) | ||
}) | ||
}, | ||
onerror (err) { | ||
throw err | ||
} | ||
} | ||
``` | ||
Using **hypercore-protocol** | ||
```js | ||
const Protocol = require('protocol') | ||
const stream = new Protocol(true) | ||
const ext = exchange(stream1, exchangeHandlers) | ||
const key = Buffer.from('deadbeefdeadbeefdeadbeefdeadbeef', 'hex') | ||
const channel = stream.open(key, { | ||
onopen () { | ||
ext.sendManifest('default', myManifest, ch1, (err, acceptedKeys) => { | ||
console.log('Remote peer selected', acceptedKeys) | ||
}) | ||
} | ||
}) | ||
``` | ||
Or using vanilla **hypercore** | ||
```js | ||
const hypercore = require('hypercore') | ||
const ram = require('random-access-memory') | ||
const exchange = require('exchange-protocol') | ||
const feed = hypercore(ram) | ||
const ext = exchange(feed, exchangeHandlers) | ||
feed.on('peer-open', peer => { | ||
// Arguments: namespace, myManifest, target, callback | ||
ext.sendManifest('default', manifest, peer, (err, acceptedKeys) => { | ||
// Callback invoked on response from remote peer. | ||
// The `requestedKeys` are a copy of `req` param in `onrequest`, | ||
// they should be handeled in onrequest. | ||
}) | ||
}) | ||
``` | ||
**Result** | ||
In a real world scenario your peer would not be communicating with it self. | ||
But in this example, if we were to send `myManifest` to the `exchangeHandlers` the log lines would produce the following output: | ||
``` | ||
> Remote shared feed: deadbeefdeadbeefdeadbeefdeadbeef | ||
> Headers: { seq: 1, hello: 'world' } | ||
> | ||
> Remote shared feed: 1234feed43afdeafa41efeed4124beeb | ||
> Headers: { seq: 55, hello: 'planet', title: 'My awesome feed' } | ||
> | ||
``` | ||
## License | ||
GNU GPLv3 |
Sorry, the diff of this file is not supported yet
Copyleft License
License(Experimental) Copyleft license information was found.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
Non-permissive License
License(Experimental) A license not known to be considered permissive was found.
Found 1 instance in 1 package
Copyleft License
License(Experimental) Copyleft license information was found.
Found 1 instance in 1 package
Mixed license
License(Experimental) Package contains multiple licenses.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
Non-permissive License
License(Experimental) A license not known to be considered permissive was found.
Found 1 instance in 1 package
37074
7
779
1
132
7