Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@push-rpc/core

Package Overview
Dependencies
Maintainers
1
Versions
62
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@push-rpc/core - npm Package Compare versions

Comparing version 1.1.9 to 1.2.0

4

dist/client.js

@@ -30,4 +30,4 @@ "use strict";

createContext: function () { return ({ remoteId: null }); },
localMiddleware: function (ctx, next, params) { return next(params); },
remoteMiddleware: function (ctx, next, params) { return next(params); },
localMiddleware: function (ctx, next, params, messageType) { return next(params); },
remoteMiddleware: function (ctx, next, params, messageType) { return next(params); },
messageParser: function (data) { return JSON.parse(data, utils_1.dateReviver); },

@@ -34,0 +34,0 @@ pingSendTimeout: null,

@@ -21,3 +21,3 @@ import { DataConsumer, DataSupplier, Topic, TopicImpl } from "./rpc";

private throttled;
subscribeSession(session: RpcSession, filter: F): Promise<void>;
subscribeSession(session: RpcSession, filter: F, messageId: any, ctx: any): Promise<D>;
unsubscribeSession(session: RpcSession, filter: F): void;

@@ -27,3 +27,3 @@ private subscriptions;

get(params?: F): Promise<D>;
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): void;
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): Promise<void>;
unsubscribe(params?: F, subscriptionKey?: any): void;

@@ -30,0 +30,0 @@ }

@@ -125,5 +125,5 @@ "use strict";

};
LocalTopicImpl.prototype.subscribeSession = function (session, filter) {
LocalTopicImpl.prototype.subscribeSession = function (session, filter, messageId, ctx) {
return __awaiter(this, void 0, void 0, function () {
var key, localTopic, subscription, data;
var key, localTopic, data, subscription;
return __generator(this, function (_a) {

@@ -134,2 +134,5 @@ switch (_a.label) {

localTopic = this;
return [4 /*yield*/, this.supplier(filter, ctx)];
case 1:
data = _a.sent();
subscription = this.subscriptions[key] || {

@@ -164,11 +167,5 @@ filter: filter,

};
// TODO if already subscribed, just send current data, and do not add a new subscription - it will save
// some network ops
subscription.sessions.push(session);
this.subscriptions[key] = subscription;
return [4 /*yield*/, this.supplier(filter, session.createContext())];
case 1:
data = _a.sent();
session.send(rpc_1.MessageType.Data, utils_1.createMessageId(), this.getTopicName(), filter, data);
return [2 /*return*/];
return [2 /*return*/, data];
}

@@ -196,3 +193,7 @@ });

};
LocalTopicImpl.prototype.subscribe = function (consumer, params, subscriptionKey) { };
LocalTopicImpl.prototype.subscribe = function (consumer, params, subscriptionKey) {
return __awaiter(this, void 0, void 0, function () { return __generator(this, function (_a) {
return [2 /*return*/];
}); });
};
LocalTopicImpl.prototype.unsubscribe = function (params, subscriptionKey) { };

@@ -199,0 +200,0 @@ return LocalTopicImpl;

@@ -7,3 +7,3 @@ import { DataConsumer, LocalTopic, RemoteTopic, TopicImpl } from "./rpc";

constructor(topicName: string, session: RpcSession);
subscribe<SubscriptionKey = DataConsumer<D>>(consumer: DataConsumer<D>, filter?: F, subscriptionKey?: SubscriptionKey): SubscriptionKey;
subscribe<SubscriptionKey = DataConsumer<D>>(consumer: DataConsumer<D>, filter?: F, subscriptionKey?: SubscriptionKey): Promise<SubscriptionKey>;
unsubscribe(params?: F, subscriptionKey?: any): void;

@@ -10,0 +10,0 @@ private deleteAllSubscriptions;

@@ -15,2 +15,38 @@ "use strict";

})();
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); }
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
var __generator = (this && this.__generator) || function (thisArg, body) {
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g;
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g;
function verb(n) { return function (v) { return step([n, v]); }; }
function step(op) {
if (f) throw new TypeError("Generator is already executing.");
while (_) try {
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t;
if (y = 0, t) op = [op[0] & 2, t.value];
switch (op[0]) {
case 0: case 1: t = op; break;
case 4: _.label++; return { value: op[1], done: false };
case 5: _.label++; y = op[1]; op = [0]; continue;
case 7: op = _.ops.pop(); _.trys.pop(); continue;
default:
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; }
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; }
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; }
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; }
if (t[2]) _.ops.pop();
_.trys.pop(); continue;
}
op = body.call(thisArg, _);
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; }
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true };
}
};
var __spreadArrays = (this && this.__spreadArrays) || function () {

@@ -40,13 +76,31 @@ for (var s = 0, i = 0, il = arguments.length; i < il; i++) s += arguments[i].length;

if (subscriptionKey === void 0) { subscriptionKey = consumer; }
if (filter === null) {
throw new Error("Subscribe with null filter is not supported, use empty object to get all data");
}
var paramsKey = JSON.stringify(filter);
this.consumers[paramsKey] = __spreadArrays((this.consumers[paramsKey] || []), [{ consumer: consumer, subscriptionKey: subscriptionKey }]);
// TODO it is not necessary to send subscribe if we already have cached value
this.session.send(rpc_1.MessageType.Subscribe, utils_1.createMessageId(), this.topicName, filter);
if (this.cached[paramsKey] !== undefined) {
consumer(this.cached[paramsKey]);
}
return subscriptionKey;
return __awaiter(this, void 0, void 0, function () {
var paramsKey, e_1;
return __generator(this, function (_a) {
switch (_a.label) {
case 0:
if (filter === null) {
throw new Error("Subscribe with null filter is not supported, use empty object to get all data");
}
paramsKey = JSON.stringify(filter);
// already have cached value with this params?
if (this.cached[paramsKey] !== undefined) {
consumer(this.cached[paramsKey]);
}
this.consumers[paramsKey] = __spreadArrays((this.consumers[paramsKey] || []), [{ consumer: consumer, subscriptionKey: subscriptionKey }]);
_a.label = 1;
case 1:
_a.trys.push([1, 3, , 4]);
return [4 /*yield*/, this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe)];
case 2:
_a.sent();
return [3 /*break*/, 4];
case 3:
e_1 = _a.sent();
this.unsubscribe(filter, subscriptionKey);
throw e_1;
case 4: return [2 /*return*/, subscriptionKey];
}
});
});
};

@@ -53,0 +107,0 @@ RemoteTopicImpl.prototype.unsubscribe = function (params, subscriptionKey) {

@@ -20,3 +20,3 @@ /**

export interface RemoteTopic<D, P> {
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): void;
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): Promise<any>;
unsubscribe(params?: P, subscriptionKey?: any): any;

@@ -54,2 +54,2 @@ get(params?: P): Promise<D>;

}
export declare type Middleware = (ctx: RpcContext, next: (params: any) => Promise<any>, params: any) => Promise<any>;
export declare type Middleware = (ctx: any, next: (params: any) => Promise<any>, params: any, messageType: MessageType.Call | MessageType.Get | MessageType.Subscribe) => Promise<any>;

@@ -59,5 +59,5 @@ "use strict";

var logger_1 = require("./logger");
var remote_1 = require("./remote");
var rpc_1 = require("./rpc");
var utils_1 = require("./utils");
var rpc_1 = require("./rpc");
var remote_1 = require("./remote");
var RpcSession = /** @class */ (function () {

@@ -192,3 +192,3 @@ function RpcSession(local, remoteLevel, listeners, connectionContext, localMiddleware, remoteMiddleware, messageParser, pingSendTimeout, keepAliveTimeout, callTimeout, syncRemoteCalls) {

}
this.subscribe(localTopic, other[0]);
this.subscribe(localTopic, other[0], id);
break;

@@ -222,2 +222,7 @@ case rpc_1.MessageType.Unsubscribe:

}
// release await on subscribe
if (this.runningCalls[id]) {
this.callRemoteResponse(message_1);
}
// and deliver to callback
remoteTopic.receiveData(other[0], other[1]);

@@ -275,3 +280,3 @@ break;

};
return this.remoteMiddleware(null, sendMessage, params);
return this.remoteMiddleware(null, sendMessage, params, type);
};

@@ -306,10 +311,16 @@ RpcSession.prototype.sendCall = function () {

RpcSession.prototype.callRemoteResponse = function (data) {
var _ = data[0], id = data[1], res = data[2], description = data[3], details = data[4];
var messageType = data[0], id = data[1], other = data.slice(2);
if (this.runningCalls[id]) {
var _a = this.runningCalls[id], resolve = _a.resolve, reject = _a.reject;
delete this.runningCalls[id];
if (data[0] == rpc_1.MessageType.Result) {
resolve(res);
if (messageType == rpc_1.MessageType.Result || messageType == rpc_1.MessageType.Data) {
if (messageType == rpc_1.MessageType.Result) {
resolve(other[0]);
}
else {
resolve(other[2]);
}
}
else {
var res = other[0], description = other[1], details = other[2];
var error = new Error(description || res || "Remote call failed");

@@ -336,3 +347,3 @@ Object.assign(error, details || {});

};
return [4 /*yield*/, this.localMiddleware(callContext_1, invokeLocalMethod, params)];
return [4 /*yield*/, this.localMiddleware(callContext_1, invokeLocalMethod, params, rpc_1.MessageType.Call)];
case 1:

@@ -354,3 +365,3 @@ r = _a.sent();

return __awaiter(this, void 0, void 0, function () {
var d, e_3;
var callContext_2, getFromTopic, r, e_3;
return __generator(this, function (_a) {

@@ -360,6 +371,11 @@ switch (_a.label) {

_a.trys.push([0, 2, , 3]);
return [4 /*yield*/, topic.getData(params, this.createContext(id, topic.getTopicName()))];
callContext_2 = this.createContext(id, topic.getTopicName());
getFromTopic = function (p) {
if (p === void 0) { p = params; }
return topic.getData(p, callContext_2);
};
return [4 /*yield*/, this.localMiddleware(callContext_2, getFromTopic, params, rpc_1.MessageType.Get)];
case 1:
d = _a.sent();
this.send(rpc_1.MessageType.Result, id, d);
r = _a.sent();
this.send(rpc_1.MessageType.Result, id, r);
return [3 /*break*/, 3];

@@ -376,12 +392,27 @@ case 2:

};
RpcSession.prototype.subscribe = function (topic, params) {
RpcSession.prototype.subscribe = function (topic, params, messageId) {
return __awaiter(this, void 0, void 0, function () {
var ctx_1, subscribeTopic, r, e_4;
var _this = this;
return __generator(this, function (_a) {
switch (_a.label) {
case 0: return [4 /*yield*/, topic.subscribeSession(this, params)];
case 0:
_a.trys.push([0, 2, , 3]);
ctx_1 = this.createContext(messageId, topic.getTopicName());
subscribeTopic = function (p) {
if (p === void 0) { p = params; }
return topic.subscribeSession(_this, p, messageId, ctx_1);
};
return [4 /*yield*/, this.localMiddleware(ctx_1, subscribeTopic, params, rpc_1.MessageType.Subscribe)];
case 1:
_a.sent();
r = _a.sent();
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), params, r);
this.subscriptions.push({ topic: topic, params: params });
this.listeners.subscribed(this.subscriptions.length);
return [2 /*return*/];
return [3 /*break*/, 3];
case 2:
e_4 = _a.sent();
this.sendError(messageId, e_4);
return [3 /*break*/, 3];
case 3: return [2 /*return*/];
}

@@ -388,0 +419,0 @@ });

@@ -78,4 +78,4 @@ "use strict";

},
localMiddleware: function (ctx, next, params) { return next(params); },
remoteMiddleware: function (ctx, next, params) { return next(params); },
localMiddleware: function (ctx, next, params, messageType) { return next(params); },
remoteMiddleware: function (ctx, next, params, messageType) { return next(params); },
clientLevel: 0,

@@ -82,0 +82,0 @@ pingSendTimeout: 40 * 1000,

@@ -117,3 +117,3 @@ "use strict";

}
return function (ctx, next, params) {
return function (ctx, next, params, messageType) {
var index = -1;

@@ -130,3 +130,3 @@ return dispatch(0, params);

else {
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p));
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p, messageType));
}

@@ -133,0 +133,0 @@ }

{
"name": "@push-rpc/core",
"version": "1.1.9",
"version": "1.2.0",
"main": "dist/index.js",

@@ -20,3 +20,3 @@ "types": "dist/index.d.ts",

},
"gitHead": "570406851d106630e4d006b1fc5a23aae97e7c74"
"gitHead": "ea55b58bf0f022332eea31d2e1987b76cd351ea9"
}

@@ -49,4 +49,4 @@ import {RpcSession} from "./RpcSession"

createContext: () => ({remoteId: null}),
localMiddleware: (ctx, next, params) => next(params),
remoteMiddleware: (ctx, next, params) => next(params),
localMiddleware: (ctx, next, params, messageType) => next(params),
remoteMiddleware: (ctx, next, params, messageType) => next(params),
messageParser: data => JSON.parse(data, dateReviver),

@@ -53,0 +53,0 @@ pingSendTimeout: null,

@@ -63,7 +63,8 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc"

async subscribeSession(session: RpcSession, filter: F) {
async subscribeSession(session: RpcSession, filter: F, messageId, ctx) {
const key = JSON.stringify(filter)
const localTopic = this
const data = await this.supplier(filter, ctx)
const subscription: Subscription<F, D, TD> = this.subscriptions[key] || {

@@ -85,10 +86,6 @@ filter,

// TODO if already subscribed, just send current data, and do not add a new subscription - it will save
// some network ops
subscription.sessions.push(session)
this.subscriptions[key] = subscription
const data = await this.supplier(filter, session.createContext())
session.send(MessageType.Data, createMessageId(), this.getTopicName(), filter, data)
return data
}

@@ -121,3 +118,3 @@

}
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): void {}
async subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any) {}
unsubscribe(params?: F, subscriptionKey?: any) {}

@@ -124,0 +121,0 @@ }

@@ -1,5 +0,4 @@

import {LocalTopicImpl} from "./local"
import {DataConsumer, LocalTopic, MessageType, Method, RemoteTopic, TopicImpl} from "./rpc"
import {RpcSession} from "./RpcSession"
import {createMessageId, getClassMethodNames} from "./utils"
import {RpcSession} from "./RpcSession"

@@ -17,7 +16,7 @@ interface Subscription<D> {

subscribe<SubscriptionKey = DataConsumer<D>>(
async subscribe<SubscriptionKey = DataConsumer<D>>(
consumer: DataConsumer<D>,
filter: F = {} as any,
subscriptionKey: SubscriptionKey = consumer as any
): SubscriptionKey {
): Promise<SubscriptionKey> {
if (filter === null) {

@@ -31,7 +30,3 @@ throw new Error(

this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}]
// TODO it is not necessary to send subscribe if we already have cached value
this.session.send(MessageType.Subscribe, createMessageId(), this.topicName, filter)
// already have cached value with this params?
if (this.cached[paramsKey] !== undefined) {

@@ -41,2 +36,11 @@ consumer(this.cached[paramsKey])

this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}]
try {
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe)
} catch (e) {
this.unsubscribe(filter, subscriptionKey)
throw e
}
return subscriptionKey

@@ -43,0 +47,0 @@ }

@@ -43,3 +43,3 @@ /**

export interface RemoteTopic<D, P> {
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): void
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): Promise<any>
unsubscribe(params?: P, subscriptionKey?: any)

@@ -93,7 +93,8 @@ get(params?: P): Promise<D>

export type Middleware = (
ctx: RpcContext,
ctx: any,
next: (params: any) => Promise<any>,
params: any
params: any,
messageType: MessageType.Call | MessageType.Get | MessageType.Subscribe
) => Promise<any>
// id generator

@@ -0,14 +1,7 @@

import {LocalTopicImpl} from "./local"
import {log} from "./logger"
import {createMessageId, message} from "./utils"
import {
getServiceItem,
MessageType,
Method,
Middleware,
RpcConnectionContext,
RpcContext,
} from "./rpc"
import {LocalTopicImpl} from "./local"
import {createRemote, RemoteTopicImpl} from "./remote"
import {getServiceItem, MessageType, Method, Middleware, RpcConnectionContext, RpcContext,} from "./rpc"
import {Socket} from "./transport"
import {createMessageId, message} from "./utils"

@@ -167,3 +160,3 @@ export interface RpcSessionListeners {

this.subscribe(localTopic, other[0])
this.subscribe(localTopic, other[0], id)
break

@@ -209,2 +202,8 @@

// release await on subscribe
if (this.runningCalls[id]) {
this.callRemoteResponse(message)
}
// and deliver to callback
remoteTopic.receiveData(other[0], other[1])

@@ -264,3 +263,3 @@ break

return this.remoteMiddleware(null, sendMessage, params)
return this.remoteMiddleware(null, sendMessage, params, type)
}

@@ -303,3 +302,3 @@

private callRemoteResponse(data) {
const [_, id, res, description, details] = data
const [messageType, id, ...other] = data

@@ -310,5 +309,11 @@ if (this.runningCalls[id]) {

if (data[0] == MessageType.Result) {
resolve(res)
if (messageType == MessageType.Result || messageType == MessageType.Data) {
if (messageType == MessageType.Result) {
resolve(other[0])
} else {
resolve(other[2])
}
} else {
const [res, description, details] = other
const error = new Error(description || res || "Remote call failed")

@@ -329,3 +334,3 @@ Object.assign(error, details || {})

const invokeLocalMethod = (p = params) => localMethod.call(localMethodObject, p, callContext)
const r = await this.localMiddleware(callContext, invokeLocalMethod, params)
const r = await this.localMiddleware(callContext, invokeLocalMethod, params, MessageType.Call)

@@ -341,4 +346,8 @@ this.send(MessageType.Result, id, r)

try {
const d = await topic.getData(params, this.createContext(id, topic.getTopicName()))
this.send(MessageType.Result, id, d)
const callContext = this.createContext(id, topic.getTopicName())
const getFromTopic = (p = params) => topic.getData(p, callContext)
const r = await this.localMiddleware(callContext, getFromTopic, params, MessageType.Get)
this.send(MessageType.Result, id, r)
} catch (e) {

@@ -350,6 +359,16 @@ log.error(`Unable to get data from topic ${name}`, e)

private async subscribe(topic: LocalTopicImpl<any, any>, params) {
await topic.subscribeSession(this, params)
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
private async subscribe(topic: LocalTopicImpl<any, any>, params, messageId) {
try {
const ctx = this.createContext(messageId, topic.getTopicName())
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx)
const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe)
this.send(MessageType.Data, messageId, topic.getTopicName(), params, r)
this.subscriptions.push({topic, params})
this.listeners.subscribed(this.subscriptions.length)
} catch (e) {
this.sendError(messageId, e)
}
}

@@ -356,0 +375,0 @@

@@ -35,4 +35,4 @@ import * as UUID from "uuid-js"

}),
localMiddleware: (ctx, next, params) => next(params),
remoteMiddleware: (ctx, next, params) => next(params),
localMiddleware: (ctx, next, params, messageType) => next(params),
remoteMiddleware: (ctx, next, params, messageType) => next(params),
clientLevel: 0,

@@ -39,0 +39,0 @@ pingSendTimeout: 40 * 1000,

@@ -77,3 +77,3 @@ import * as UUID from "uuid-js"

export function composeMiddleware(...middleware: Middleware[]): Middleware {
return function(ctx, next, params) {
return function(ctx, next, params, messageType) {
let index = -1

@@ -91,3 +91,3 @@ return dispatch(0, params)

} else {
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p))
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p, messageType))
}

@@ -94,0 +94,0 @@ } catch (err) {

@@ -1,4 +0,5 @@

import {composeMiddleware} from "../src"
import {assert} from "chai"
import {composeMiddleware, LocalTopicImpl, MessageType} from "../src"
import {Middleware} from "../src/rpc"
import {assert} from "chai"
import {message} from "../src/utils"
import {createTestClient, startTestServer} from "./testUtils"

@@ -22,3 +23,3 @@

const r = await composed(null, async p => p, 0)
const r = await composed(null, async p => p, 0, MessageType.Call)

@@ -28,5 +29,98 @@ assert.equal(r, 3)

it("local", async () => {
it("local get topic", async () => {
let mwMessageType = null
await startTestServer(
{
item: new LocalTopicImpl(async () => "1"),
},
{
localMiddleware: (ctx, next, params, messageType) => {
mwMessageType = messageType
return next(params)
},
}
)
const client = await createTestClient(0)
const r = await client.item.get()
assert.equal(r, "1")
assert.equal(mwMessageType, MessageType.Get)
})
it("remote get topic", async () => {
let mwMessageType = null
await startTestServer(
{
item: new LocalTopicImpl(async () => "1"),
}
)
const client = await createTestClient(0, {
remoteMiddleware: (ctx, next, params, messageType) => {
mwMessageType = messageType
return next(params)
},
})
const r = await client.item.get()
assert.equal(r, "1")
assert.equal(mwMessageType, MessageType.Get)
})
it("local topic subscribe", async () => {
let mwMessageType = null
await startTestServer(
{
item: new LocalTopicImpl(async () => "1"),
},
{
localMiddleware: (ctx, next, params, messageType) => {
mwMessageType = messageType
return next(params)
},
}
)
let r = null
const client = await createTestClient(0)
await client.item.subscribe(data => r = data)
await new Promise(r => setTimeout(r, 50))
assert.equal(r, "1")
assert.equal(mwMessageType, MessageType.Subscribe)
})
it("remote topic subscribe", async () => {
let mwMessageType = null
await startTestServer(
{
item: new LocalTopicImpl(async () => "1"),
}
)
let r = null
const client = await createTestClient(0, {
remoteMiddleware: (ctx, next, params, messageType) => {
mwMessageType = messageType
return next(params)
},
})
await client.item.subscribe(data => r = data)
await new Promise(r => setTimeout(r, 50))
assert.equal(r, "1")
assert.equal(mwMessageType, MessageType.Subscribe)
})
it("local param update", async () => {
await startTestServer(
{
async getSomething(req) {

@@ -33,0 +127,0 @@ return req

import {assert} from "chai"
import {createRpcClient, LocalTopicImpl} from "../src"
import {createRpcClient, LocalTopicImpl, MessageType} from "../src"
import {groupReducer} from "../src/local"

@@ -8,2 +8,21 @@ import {createTestClient, startTestServer, TEST_PORT} from "./testUtils"

describe("Topics", () => {
it("error in supplier breaks subscribe", async () => {
await startTestServer(
{
item: new LocalTopicImpl(async () => {
throw new Error("AA")
}),
}
)
const client = await createTestClient(0)
try {
await client.item.subscribe(() => {})
assert.fail("Error expected")
} catch (e) {
assert.equal(e.message, "AA")
}
})
it("get", async () => {

@@ -201,3 +220,3 @@ const item = {r: "asf"}

let item2
await client.test.item.subscribe(item => {
client.test.item.subscribe(item => {
item2 = item

@@ -208,2 +227,7 @@ })

assert.deepEqual(item2, {r: "1"})
await new Promise(resolve => setTimeout(resolve, 50))
// and a new version after some time
assert.deepEqual(item2, item)
})

@@ -210,0 +234,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc