websocket-rpc-protocol
Advanced tools
Comparing version
@@ -15,6 +15,10 @@ import { Signal } from 'easy-signal'; | ||
close(): void; | ||
send(action: string, ...rest: any[]): Promise<any>; | ||
sendAfterAuthed(action: string, ...rest: any[]): Promise<any>; | ||
sendAndListen<T extends GenericFunction>(action: string, ...args: [...any, T]): Promise<Unsubscribe>; | ||
listen<T extends GenericFunction>(listener: T): Unsubscribe; | ||
api: T; | ||
send<T = any>(action: string, ...args: [...any[], AbortSignal, GenericFunction]): Promise<T>; | ||
send<T = any>(action: string, ...args: [...any[], GenericFunction]): Promise<T>; | ||
send<T = any>(action: string, ...args: any[]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: [...any[], AbortSignal, GenericFunction]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: [...any[], GenericFunction]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: any[]): Promise<T>; | ||
onMessage<T extends GenericFunction>(listener: T): Unsubscribe; | ||
auth(idToken?: string): Promise<string>; | ||
@@ -33,4 +37,4 @@ pause(pause?: boolean): void; | ||
} | ||
export default function createClient<T = {}>(url: string): ClientAPI<T> & T; | ||
export default function createClient<T = {}>(url: string): ClientAPI<T>; | ||
declare type GenericFunction = (...args: any[]) => any; | ||
export {}; |
@@ -5,3 +5,2 @@ import { signal } from 'easy-signal'; | ||
const MAX_RETRY_BACKOFF = 4; | ||
const NOOP = () => { }; | ||
export default function createClient(url) { | ||
@@ -152,2 +151,3 @@ const requests = {}; | ||
shouldConnect = false; | ||
clearTimeout(reconnectTimeout); | ||
closeSocket(); | ||
@@ -168,3 +168,3 @@ } | ||
} | ||
function listen(listener) { | ||
function onMessage(listener) { | ||
return listeners[1](listener); | ||
@@ -181,6 +181,2 @@ } | ||
} | ||
let onMessage; | ||
if (typeof args[args.length - 1] === 'function') { | ||
onMessage = args.pop(); | ||
} | ||
while (args.length && args[args.length - 1] === undefined) | ||
@@ -190,2 +186,19 @@ args.pop(); | ||
return new Promise((resolve, reject) => { | ||
let onMessage, abortSignal; | ||
if (typeof args[args.length - 1] === 'function') { | ||
onMessage = args.pop(); | ||
if (args[args.length - 1] instanceof AbortSignal) { | ||
abortSignal = args.pop(); | ||
abortSignal.onabort = () => { | ||
try { | ||
if (abortSignal.reason) | ||
reject(abortSignal.reason); | ||
else | ||
resolve(); | ||
send('_abort', r); | ||
} | ||
catch (err) { } | ||
}; | ||
} | ||
} | ||
requests[r] = { action, args, resolve, reject, onMessage }; | ||
@@ -212,13 +225,2 @@ try { | ||
} | ||
async function sendAndListen(action, ...args) { | ||
const listener = args.pop(); | ||
const ref = await send(action, ...args); | ||
listeners[ref] = signal(); | ||
const unsubscribe = listeners[ref](listener); | ||
return async () => { | ||
unsubscribe(); | ||
delete listeners[ref]; | ||
await send('unlisten', ref); | ||
}; | ||
} | ||
async function auth(idToken) { | ||
@@ -255,6 +257,7 @@ const uid = await send('auth', idToken); | ||
apply: (_, __, args) => send(name, ...args), | ||
get: (obj, prop) => prop in obj ? obj[prop] : proxy(NOOP, name ? `${name}.${prop}` : prop), | ||
get: (obj, prop) => prop in obj ? obj[prop] : (obj[prop] = proxy(() => { }, name ? `${name}.${prop}` : prop)), | ||
}); | ||
} | ||
return proxy({ | ||
return { | ||
api: proxy({}), | ||
connect, | ||
@@ -266,4 +269,3 @@ disconnect, | ||
sendAfterAuthed, | ||
sendAndListen, | ||
listen, | ||
onMessage, | ||
auth, | ||
@@ -278,3 +280,3 @@ getNow, | ||
onError, | ||
}); | ||
}; | ||
} |
@@ -6,3 +6,2 @@ import { signal, Signal } from 'easy-signal'; | ||
const MAX_RETRY_BACKOFF = 4; | ||
const NOOP = () => {}; | ||
@@ -24,6 +23,10 @@ export interface Client { | ||
close(): void; | ||
send(action: string, ...rest: any[]): Promise<any>; | ||
sendAfterAuthed(action: string, ...rest: any[]): Promise<any>; | ||
sendAndListen<T extends GenericFunction>(action: string, ...args: [...any, T]): Promise<Unsubscribe>; | ||
listen<T extends GenericFunction>(listener: T): Unsubscribe; | ||
api: T; | ||
send<T = any>(action: string, ...args: [...any[], AbortSignal, GenericFunction]): Promise<T>; | ||
send<T = any>(action: string, ...args: [...any[], GenericFunction]): Promise<T>; | ||
send<T = any>(action: string, ...args: any[]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: [...any[], AbortSignal, GenericFunction]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: [...any[], GenericFunction]): Promise<T>; | ||
sendAfterAuthed<T = any>(action: string, ...args: any[]): Promise<T>; | ||
onMessage<T extends GenericFunction>(listener: T): Unsubscribe; | ||
auth(idToken?: string): Promise<string>; | ||
@@ -41,3 +44,3 @@ pause(pause?: boolean): void; | ||
export default function createClient<T = {}>(url: string): ClientAPI<T> & T { | ||
export default function createClient<T = {}>(url: string): ClientAPI<T> { | ||
const requests: {[r: string]: Request} = {}; | ||
@@ -198,2 +201,3 @@ const afterConnectedQueue: Array<Request> = []; | ||
shouldConnect = false; | ||
clearTimeout(reconnectTimeout); | ||
closeSocket(); | ||
@@ -215,6 +219,7 @@ } | ||
function listen<T extends GenericFunction>(listener: T) { | ||
function onMessage<T extends GenericFunction>(listener: T) { | ||
return listeners[1](listener); | ||
} | ||
function send<T = any>(action: string, ...args: any[]): Promise<T>; | ||
async function send(action: string, ...args: any[]): Promise<any> { | ||
@@ -229,11 +234,20 @@ if (!socket || socket.readyState > 1 || closing) { | ||
let onMessage: GenericFunction; | ||
if (typeof args[args.length - 1] === 'function') { | ||
onMessage = args.pop(); | ||
} | ||
while (args.length && args[args.length - 1] === undefined) args.pop(); | ||
const r = requestNumber++; | ||
return new Promise((resolve, reject) => { | ||
return new Promise<void>((resolve, reject) => { | ||
let onMessage: GenericFunction, abortSignal: AbortSignal; | ||
if (typeof args[args.length - 1] === 'function') { | ||
onMessage = args.pop(); | ||
if (args[args.length - 1] instanceof AbortSignal) { | ||
abortSignal = args.pop(); | ||
abortSignal.onabort = () => { | ||
try { | ||
if (abortSignal.reason) reject(abortSignal.reason); | ||
else resolve(); | ||
send('_abort', r); | ||
} catch (err) {} | ||
}; | ||
} | ||
} | ||
requests[r] = { action, args, resolve, reject, onMessage }; | ||
@@ -262,15 +276,2 @@ try { | ||
async function sendAndListen<T extends GenericFunction>(action: string, ...args: [...any, T]): Promise<Unsubscribe> { | ||
const listener = args.pop() as T; | ||
const ref = await send(action, ...args); | ||
listeners[ref] = signal(); | ||
const unsubscribe = listeners[ref](listener); | ||
return async () => { | ||
unsubscribe(); | ||
delete listeners[ref]; | ||
await send('unlisten', ref); | ||
}; | ||
} | ||
async function auth(idToken?: string) { | ||
@@ -312,7 +313,8 @@ const uid = await send('auth', idToken); | ||
apply: (_, __, args) => send(name, ...args), | ||
get: (obj, prop: string) => prop in obj ? obj[prop] : proxy(NOOP, name ? `${name}.${prop}` : prop), | ||
get: (obj, prop: string) => prop in obj ? obj[prop] : (obj[prop] = proxy(() => {}, name ? `${name}.${prop}` : prop)), | ||
}); | ||
} | ||
return proxy({ | ||
return { | ||
api: proxy({}), | ||
connect, | ||
@@ -324,4 +326,3 @@ disconnect, | ||
sendAfterAuthed, | ||
sendAndListen, | ||
listen, | ||
onMessage, | ||
auth, | ||
@@ -336,3 +337,3 @@ getNow, | ||
onError, | ||
}) as any; | ||
}; | ||
} | ||
@@ -339,0 +340,0 @@ |
{ | ||
"name": "websocket-rpc-protocol", | ||
"version": "0.1.9", | ||
"version": "0.1.10", | ||
"description": "A JSON RPC protocol for working over websockets. Sheds the weight of JSON RPC to simplify argument names and adds features.", | ||
@@ -5,0 +5,0 @@ "scripts": { |
@@ -27,4 +27,4 @@ # Agreeable Websocket Protocol | ||
* `d`: The optional **d**ata result returned from the method. | ||
* `s`: An optional **s**tream flag indicating partial data for a request that is streaming data. These requests must | ||
end with a terminating message that does not contain `s`. | ||
* `s`: An optional **s**tream flag (the number `1`) indicating partial data for a request that is streaming data. These | ||
requests must end with a terminating message that does not contain `s`. | ||
* `err`: The **err**or returned from the method if one was thrown. | ||
@@ -44,14 +44,25 @@ | ||
properties: | ||
* `p`: A **p**ush number indicating this is a push message sent to the client. A `1` (a smaller boolean true) indicates | ||
message was sent without being requested. A higher number may be used to associate this data with a specific | ||
request for it. For example, an action `"listen"` might be used to start listening to data and the server could | ||
return id `5` in response. Then any data that matches would be returned with **push** being 5. | ||
* `p`: A **p**ush flag indicating this is a push message sent to the client. | ||
* `d`: The data for this **push** notification. | ||
-> `{"a":"listen","r":3,"d":["user_projects"]}` | ||
<- `{"r":3,"d":5` | ||
<- `{"p":5,"d":{"projectId":"abc123", ...}}` | ||
<- `{"p":5,"d":{"projectId":"abc123", ...}}` | ||
<- `{"p":-1,"d":{"_connection":"closing"}}` | ||
-> `{"a":"listen","r":3,"d":["these","pubsub","topics"]}` | ||
<- `{"r":3}` | ||
<- `{"p":1,"d":{"subject":"pubsub","payload":{...}}` | ||
<- `{"p":1,"d":{"subject":"topcis","payload":{...}}` | ||
## Targeted Message Handlers | ||
Rather than a single stream of messages being pushed, you may want to handle individual streams. You can do this using | ||
the stream part of this protocol. You may do so by making an initial request and then leaving it open, slowly streaming | ||
the response back over a long period. An open stream request may be aborted with a special action defined by your server | ||
such as an `_abort` action. | ||
-> `{"a":"getPresences","r":3,"d":["roomID123ABC"]}` | ||
<- `{"r":3}` | ||
<- `{"r":3,"s":1,"d":{"uid":"3jf9","status":"online"}}` | ||
<- `{"r":3,"s":1,"d":{"uid":"0toe","status":"offline"}}` | ||
-> `{"a":"_abort","r":4,"d":[3]}` | ||
<- `{"r":3}` | ||
<- `{"r":4,"d":true}` | ||
## That's it | ||
@@ -58,0 +69,0 @@ |
@@ -6,2 +6,3 @@ // Exposes an API to a websocket endpoint using the protocol described in PROTOCOL.md | ||
let preMessages = []; | ||
const streamingRequests = new Map(); | ||
socket.addEventListener('message', onMessage); | ||
@@ -62,2 +63,9 @@ socket.addEventListener('close', close); | ||
} | ||
if (a === '_abort') { | ||
const otherR = d[0]; | ||
const success = streamingRequests.get(otherR)?.(true) || false; | ||
if (success) | ||
send({ r: otherR }); | ||
return send({ r, d: success }); | ||
} | ||
if (a[0] === '_' || typeof apiFunction !== 'function') { | ||
@@ -73,3 +81,4 @@ return sendError('Unknown action'); | ||
const signal = result; | ||
const unsubscribe = signal((d) => { | ||
const unsubscribers = []; | ||
unsubscribers.push(signal((d) => { | ||
if (d === undefined) { | ||
@@ -82,11 +91,19 @@ unsubscribe(); | ||
} | ||
}); | ||
const { error } = signal; | ||
})); | ||
const { error, abort } = signal; | ||
if (typeof error === 'function' && typeof error.dispatch === 'function') { | ||
const errorUnsub = error((err) => { | ||
unsubscribers.push(error((err) => { | ||
sendError(err); | ||
unsubscribe(); | ||
errorUnsub(); | ||
}); | ||
})); | ||
} | ||
const unsubscribe = (aborted) => { | ||
if (!streamingRequests.delete(r)) | ||
return false; | ||
if (aborted && abort) | ||
abort.dispatch(); | ||
unsubscribers.forEach(u => u()); | ||
return true; | ||
}; | ||
streamingRequests.set(r, unsubscribe); | ||
} | ||
@@ -93,0 +110,0 @@ else { |
@@ -1,2 +0,2 @@ | ||
import { Signal } from 'easy-signal'; | ||
import { Signal, Unsubscriber } from 'easy-signal'; | ||
@@ -18,2 +18,3 @@ export type APIMethod = (...args: any[]) => any; | ||
let preMessages: string[] = []; | ||
const streamingRequests = new Map<number, (aborted?: boolean) => boolean>(); | ||
@@ -85,2 +86,9 @@ socket.addEventListener('message', onMessage); | ||
if (a === '_abort') { | ||
const otherR = d[0]; | ||
const success = streamingRequests.get(otherR)?.(true) || false; | ||
if (success) send({ r: otherR }); | ||
return send({ r, d: success }); | ||
} | ||
if (a[0] === '_' || typeof apiFunction !== 'function') { | ||
@@ -97,3 +105,4 @@ return sendError('Unknown action'); | ||
const signal = result as Signal; | ||
const unsubscribe = signal((d: any) => { | ||
const unsubscribers: Unsubscriber[] = []; | ||
unsubscribers.push(signal((d: any) => { | ||
if (d === undefined) { | ||
@@ -105,11 +114,17 @@ unsubscribe(); | ||
} | ||
}); | ||
const { error } = signal as any as {error: Signal}; | ||
})); | ||
const { error, abort } = signal as any as {error: Signal, abort: Signal}; | ||
if (typeof error === 'function' && typeof error.dispatch === 'function') { | ||
const errorUnsub = error((err: Error) => { | ||
unsubscribers.push(error((err: Error) => { | ||
sendError(err); | ||
unsubscribe(); | ||
errorUnsub(); | ||
}); | ||
})); | ||
} | ||
const unsubscribe = (aborted?: boolean) => { | ||
if (!streamingRequests.delete(r)) return false; | ||
if (aborted && abort) abort.dispatch(); | ||
unsubscribers.forEach(u => u()); | ||
return true; | ||
}; | ||
streamingRequests.set(r, unsubscribe); | ||
} else { | ||
@@ -116,0 +131,0 @@ send({ r, d: result }); |
36116
7.44%868
4.96%