@peerbit/rpc
Advanced tools
Comparing version 2.0.1 to 2.1.0
import { AbstractType } from "@dao-xyz/borsh"; | ||
import { PublicSignKey } from "@peerbit/crypto"; | ||
import { RPCOptions, RPCResponse, PublishOptions } from "./io.js"; | ||
import { Address } from "@peerbit/program"; | ||
import { DataEvent } from "@peerbit/pubsub-interface"; | ||
import { Program } from "@peerbit/program"; | ||
export type SearchContext = (() => Address) | Program; | ||
export type CanRead = (key?: PublicSignKey) => Promise<boolean> | boolean; | ||
export type RPCSetupOptions<Q, R> = { | ||
@@ -13,13 +10,10 @@ topic: string; | ||
responseType: AbstractType<R>; | ||
canRead?: CanRead; | ||
responseHandler?: ResponseHandler<Q, R>; | ||
subscriptionData?: Uint8Array; | ||
}; | ||
export type QueryContext = { | ||
export type RequestContext = { | ||
from?: PublicSignKey; | ||
address: string; | ||
}; | ||
export type ResponseHandler<Q, R> = (query: Q, context: QueryContext) => Promise<R | undefined> | R | undefined; | ||
export type ResponseHandler<Q, R> = (query: Q, context: RequestContext) => Promise<R | undefined> | R | undefined; | ||
export declare class RPC<Q, R> extends Program<RPCSetupOptions<Q, R>> { | ||
canRead: CanRead; | ||
private _subscribed; | ||
@@ -29,2 +23,3 @@ private _responseHandler?; | ||
private _requestType; | ||
private _requestTypeIsUint8Array; | ||
private _responseType; | ||
@@ -42,3 +37,3 @@ private _rpcTopic; | ||
private _subscribing; | ||
private _subscribe; | ||
subscribe(data?: Uint8Array | undefined): Promise<void>; | ||
_onMessage(evt: CustomEvent<DataEvent>): Promise<void>; | ||
@@ -45,0 +40,0 @@ private seal; |
@@ -14,2 +14,3 @@ var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) { | ||
import { waitFor } from "@peerbit/time"; | ||
import { equals } from "uint8arrays"; | ||
const createValueResolver = (type) => { | ||
@@ -24,3 +25,2 @@ if (type === Uint8Array) { | ||
export let RPC = class RPC extends Program { | ||
canRead; | ||
_subscribed = false; | ||
@@ -30,2 +30,3 @@ _responseHandler; | ||
_requestType; | ||
_requestTypeIsUint8Array; | ||
_responseType; | ||
@@ -42,10 +43,9 @@ _rpcTopic; | ||
this._requestType = args.queryType; | ||
this._requestTypeIsUint8Array = this._requestType === Uint8Array; | ||
this._responseType = args.responseType; | ||
this._responseResolver = new Map(); | ||
this._subscriptionMetaData = args.subscriptionData; | ||
this.canRead = args.canRead || (() => Promise.resolve(true)); | ||
this._getResponseValueFn = createValueResolver(this._responseType); | ||
this._getRequestValueFn = createValueResolver(this._requestType); | ||
this._keypair = await X25519Keypair.create(); | ||
await this._subscribe(); | ||
await this.subscribe(args.subscriptionData); | ||
} | ||
@@ -76,16 +76,32 @@ async _close(from) { | ||
_subscribing; | ||
async _subscribe() { | ||
async subscribe(data = this._subscriptionMetaData) { | ||
await this._subscribing; | ||
if (this._subscribed) { | ||
if (this._subscribed && | ||
(this._subscriptionMetaData === data || | ||
(this._subscriptionMetaData && | ||
data && | ||
equals(this._subscriptionMetaData, data)))) { | ||
return; | ||
} | ||
const prevSubscriptionData = this._subscriptionMetaData; | ||
this._subscriptionMetaData = data; | ||
const wasSubscribed = this._subscribed; | ||
this._subscribed = true; | ||
this._onMessageBinded = this._onMessageBinded || this._onMessage.bind(this); | ||
if (wasSubscribed) { | ||
await this.node.services.pubsub.unsubscribe(this.rpcTopic, { | ||
data: prevSubscriptionData, | ||
}); | ||
} | ||
this._subscribing = this.node.services.pubsub | ||
.subscribe(this.rpcTopic, { data: this._subscriptionMetaData }) | ||
.subscribe(this.rpcTopic, { data }) | ||
.then(() => { | ||
return this.node.services.pubsub.addEventListener("data", this._onMessageBinded); | ||
if (!wasSubscribed) { | ||
this.node.services.pubsub.addEventListener("data", this._onMessageBinded); | ||
} | ||
}); | ||
await this._subscribing; | ||
await this.node.services.pubsub.requestSubscribers(this.rpcTopic); | ||
if (!wasSubscribed) { | ||
await this.node.services.pubsub.requestSubscribers(this.rpcTopic); | ||
} | ||
logger.debug("subscribing to query topic (responses): " + this.rpcTopic); | ||
@@ -102,7 +118,3 @@ } | ||
const decrypted = await maybeEncrypted.decrypt(this.node.keychain); | ||
if (!(await this.canRead(message.sender))) { | ||
throw new AccessError(); | ||
} | ||
const response = await this._responseHandler(this._getRequestValueFn(decrypted), { | ||
address: this.rpcTopic, | ||
from: message.sender, | ||
@@ -114,3 +126,3 @@ }); | ||
// we use the peerId/libp2p identity for signatures, since we want to be able to send a message | ||
// with pubsub with a certain reciever. If we use (this.identity) we are going to use an identity | ||
// with pubsub with a certain receiver. If we use (this.identity) we are going to use an identity | ||
// that is now known in the .pubsub network, hence the message might not be delivired if we | ||
@@ -161,3 +173,3 @@ // send with { to: [RECIEVER] } param | ||
async seal(request, respondTo, options) { | ||
const requestData = this._requestType === Uint8Array | ||
const requestData = this._requestTypeIsUint8Array | ||
? request | ||
@@ -243,3 +255,3 @@ : serialize(request); | ||
// We are generatinga new encryption keypair for each send, so we now that when we get the responses, they are encrypted specifcally for me, and for this request | ||
// this allows us to easily disregard a bunch of message just beacuse they are for a different reciever! | ||
// this allows us to easily disregard a bunch of message just beacuse they are for a different receiver! | ||
const keypair = await X25519Keypair.create(); | ||
@@ -246,0 +258,0 @@ // send query and wait for replies in a generator like behaviour |
@@ -52,3 +52,3 @@ export class MissingResponsesError extends Error { | ||
if (missingReponses) { | ||
throw new MissingResponsesError("Did not recieve responses from all shards"); | ||
throw new MissingResponsesError("Did not receive responses from all shards"); | ||
} | ||
@@ -55,0 +55,0 @@ }; |
{ | ||
"name": "@peerbit/rpc", | ||
"version": "2.0.1", | ||
"version": "2.1.0", | ||
"description": "RPC calls for peers", | ||
@@ -35,11 +35,11 @@ "type": "module", | ||
"@dao-xyz/borsh": "^5.1.5", | ||
"@peerbit/crypto": "1.0.4", | ||
"@peerbit/crypto": "1.0.5", | ||
"@peerbit/logger": "1.0.1", | ||
"@peerbit/program": "2.1.0", | ||
"@peerbit/program": "2.2.0", | ||
"@peerbit/time": "1.0.2" | ||
}, | ||
"devDependencies": { | ||
"@peerbit/test-utils": "^1.0.15" | ||
"@peerbit/test-utils": "^1.0.16" | ||
}, | ||
"gitHead": "4752c301fb49b2f937b4039533ff5bc3537da3e0" | ||
"gitHead": "464e807d679e24b897b7811ac99d6f85fbd756f9" | ||
} |
@@ -29,6 +29,4 @@ import { | ||
import { waitFor } from "@peerbit/time"; | ||
import { equals } from "uint8arrays"; | ||
export type SearchContext = (() => Address) | Program; | ||
export type CanRead = (key?: PublicSignKey) => Promise<boolean> | boolean; | ||
export type RPCSetupOptions<Q, R> = { | ||
@@ -38,13 +36,11 @@ topic: string; | ||
responseType: AbstractType<R>; | ||
canRead?: CanRead; | ||
responseHandler?: ResponseHandler<Q, R>; | ||
subscriptionData?: Uint8Array; | ||
}; | ||
export type QueryContext = { | ||
export type RequestContext = { | ||
from?: PublicSignKey; | ||
address: string; | ||
}; | ||
export type ResponseHandler<Q, R> = ( | ||
query: Q, | ||
context: QueryContext | ||
context: RequestContext | ||
) => Promise<R | undefined> | R | undefined; | ||
@@ -64,4 +60,2 @@ | ||
export class RPC<Q, R> extends Program<RPCSetupOptions<Q, R>> { | ||
canRead: CanRead; | ||
private _subscribed = false; | ||
@@ -74,2 +68,3 @@ private _responseHandler?: ResponseHandler<Q, (R | undefined) | R>; | ||
private _requestType: AbstractType<Q> | Uint8ArrayConstructor; | ||
private _requestTypeIsUint8Array: boolean; | ||
private _responseType: AbstractType<R>; | ||
@@ -84,3 +79,2 @@ private _rpcTopic: string | undefined; | ||
private _getRequestValueFn: (decrypted: DecryptedThing<Q>) => Q; | ||
async open(args: RPCSetupOptions<Q, R>): Promise<void> { | ||
@@ -90,7 +84,5 @@ this._rpcTopic = args.topic ?? this._rpcTopic; | ||
this._requestType = args.queryType; | ||
this._requestTypeIsUint8Array = (this._requestType as any) === Uint8Array; | ||
this._responseType = args.responseType; | ||
this._responseResolver = new Map(); | ||
this._subscriptionMetaData = args.subscriptionData; | ||
this.canRead = args.canRead || (() => Promise.resolve(true)); | ||
this._getResponseValueFn = createValueResolver(this._responseType); | ||
@@ -100,3 +92,3 @@ this._getRequestValueFn = createValueResolver(this._requestType); | ||
this._keypair = await X25519Keypair.create(); | ||
await this._subscribe(); | ||
await this.subscribe(args.subscriptionData); | ||
} | ||
@@ -133,21 +125,42 @@ | ||
private _subscribing: Promise<void>; | ||
private async _subscribe(): Promise<void> { | ||
async subscribe(data = this._subscriptionMetaData): Promise<void> { | ||
await this._subscribing; | ||
if (this._subscribed) { | ||
if ( | ||
this._subscribed && | ||
(this._subscriptionMetaData === data || | ||
(this._subscriptionMetaData && | ||
data && | ||
equals(this._subscriptionMetaData, data))) | ||
) { | ||
return; | ||
} | ||
const prevSubscriptionData = this._subscriptionMetaData; | ||
this._subscriptionMetaData = data; | ||
const wasSubscribed = this._subscribed; | ||
this._subscribed = true; | ||
this._onMessageBinded = this._onMessageBinded || this._onMessage.bind(this); | ||
if (wasSubscribed) { | ||
await this.node.services.pubsub.unsubscribe(this.rpcTopic, { | ||
data: prevSubscriptionData, | ||
}); | ||
} | ||
this._subscribing = this.node.services.pubsub | ||
.subscribe(this.rpcTopic, { data: this._subscriptionMetaData }) | ||
.subscribe(this.rpcTopic, { data }) | ||
.then(() => { | ||
return this.node.services.pubsub.addEventListener( | ||
"data", | ||
this._onMessageBinded! | ||
); | ||
if (!wasSubscribed) { | ||
this.node.services.pubsub.addEventListener( | ||
"data", | ||
this._onMessageBinded! | ||
); | ||
} | ||
}); | ||
await this._subscribing; | ||
await this.node.services.pubsub.requestSubscribers(this.rpcTopic); | ||
if (!wasSubscribed) { | ||
await this.node.services.pubsub.requestSubscribers(this.rpcTopic); | ||
} | ||
logger.debug("subscribing to query topic (responses): " + this.rpcTopic); | ||
@@ -166,11 +179,5 @@ } | ||
const decrypted = await maybeEncrypted.decrypt(this.node.keychain); | ||
if (!(await this.canRead(message.sender))) { | ||
throw new AccessError(); | ||
} | ||
const response = await this._responseHandler( | ||
this._getRequestValueFn(decrypted), | ||
{ | ||
address: this.rpcTopic, | ||
from: message.sender, | ||
@@ -185,3 +192,3 @@ } | ||
// we use the peerId/libp2p identity for signatures, since we want to be able to send a message | ||
// with pubsub with a certain reciever. If we use (this.identity) we are going to use an identity | ||
// with pubsub with a certain receiver. If we use (this.identity) we are going to use an identity | ||
// that is now known in the .pubsub network, hence the message might not be delivired if we | ||
@@ -250,6 +257,5 @@ // send with { to: [RECIEVER] } param | ||
) { | ||
const requestData = | ||
(this._requestType as any) === Uint8Array | ||
? (request as Uint8Array) | ||
: serialize(request); | ||
const requestData = this._requestTypeIsUint8Array | ||
? (request as Uint8Array) | ||
: serialize(request); | ||
@@ -259,2 +265,3 @@ const decryptedMessage = new DecryptedThing<Uint8Array>({ | ||
}); | ||
let maybeEncryptedMessage: MaybeEncrypted<Uint8Array> = decryptedMessage; | ||
@@ -368,3 +375,3 @@ | ||
// We are generatinga new encryption keypair for each send, so we now that when we get the responses, they are encrypted specifcally for me, and for this request | ||
// this allows us to easily disregard a bunch of message just beacuse they are for a different reciever! | ||
// this allows us to easily disregard a bunch of message just beacuse they are for a different receiver! | ||
const keypair = await X25519Keypair.create(); | ||
@@ -371,0 +378,0 @@ |
@@ -68,3 +68,3 @@ import { RPC } from "./controller"; | ||
throw new MissingResponsesError( | ||
"Did not recieve responses from all shards" | ||
"Did not receive responses from all shards" | ||
); | ||
@@ -71,0 +71,0 @@ } |
Sorry, the diff of this file is not supported yet
64917
1035
+ Added@peerbit/crypto@1.0.5(transitive)
+ Added@peerbit/program@2.2.0(transitive)
- Removed@peerbit/crypto@1.0.4(transitive)
- Removed@peerbit/program@2.1.0(transitive)
Updated@peerbit/crypto@1.0.5
Updated@peerbit/program@2.2.0