@push-rpc/core
Advanced tools
Comparing version 1.1.9 to 1.2.0
@@ -30,4 +30,4 @@ "use strict"; | ||
createContext: function () { return ({ remoteId: null }); }, | ||
localMiddleware: function (ctx, next, params) { return next(params); }, | ||
remoteMiddleware: function (ctx, next, params) { return next(params); }, | ||
localMiddleware: function (ctx, next, params, messageType) { return next(params); }, | ||
remoteMiddleware: function (ctx, next, params, messageType) { return next(params); }, | ||
messageParser: function (data) { return JSON.parse(data, utils_1.dateReviver); }, | ||
@@ -34,0 +34,0 @@ pingSendTimeout: null, |
@@ -21,3 +21,3 @@ import { DataConsumer, DataSupplier, Topic, TopicImpl } from "./rpc"; | ||
private throttled; | ||
subscribeSession(session: RpcSession, filter: F): Promise<void>; | ||
subscribeSession(session: RpcSession, filter: F, messageId: any, ctx: any): Promise<D>; | ||
unsubscribeSession(session: RpcSession, filter: F): void; | ||
@@ -27,3 +27,3 @@ private subscriptions; | ||
get(params?: F): Promise<D>; | ||
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): void; | ||
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): Promise<void>; | ||
unsubscribe(params?: F, subscriptionKey?: any): void; | ||
@@ -30,0 +30,0 @@ } |
@@ -125,5 +125,5 @@ "use strict"; | ||
}; | ||
LocalTopicImpl.prototype.subscribeSession = function (session, filter) { | ||
LocalTopicImpl.prototype.subscribeSession = function (session, filter, messageId, ctx) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var key, localTopic, subscription, data; | ||
var key, localTopic, data, subscription; | ||
return __generator(this, function (_a) { | ||
@@ -134,2 +134,5 @@ switch (_a.label) { | ||
localTopic = this; | ||
return [4 /*yield*/, this.supplier(filter, ctx)]; | ||
case 1: | ||
data = _a.sent(); | ||
subscription = this.subscriptions[key] || { | ||
@@ -164,11 +167,5 @@ filter: filter, | ||
}; | ||
// TODO if already subscribed, just send current data, and do not add a new subscription - it will save | ||
// some network ops | ||
subscription.sessions.push(session); | ||
this.subscriptions[key] = subscription; | ||
return [4 /*yield*/, this.supplier(filter, session.createContext())]; | ||
case 1: | ||
data = _a.sent(); | ||
session.send(rpc_1.MessageType.Data, utils_1.createMessageId(), this.getTopicName(), filter, data); | ||
return [2 /*return*/]; | ||
return [2 /*return*/, data]; | ||
} | ||
@@ -196,3 +193,7 @@ }); | ||
}; | ||
LocalTopicImpl.prototype.subscribe = function (consumer, params, subscriptionKey) { }; | ||
LocalTopicImpl.prototype.subscribe = function (consumer, params, subscriptionKey) { | ||
return __awaiter(this, void 0, void 0, function () { return __generator(this, function (_a) { | ||
return [2 /*return*/]; | ||
}); }); | ||
}; | ||
LocalTopicImpl.prototype.unsubscribe = function (params, subscriptionKey) { }; | ||
@@ -199,0 +200,0 @@ return LocalTopicImpl; |
@@ -7,3 +7,3 @@ import { DataConsumer, LocalTopic, RemoteTopic, TopicImpl } from "./rpc"; | ||
constructor(topicName: string, session: RpcSession); | ||
subscribe<SubscriptionKey = DataConsumer<D>>(consumer: DataConsumer<D>, filter?: F, subscriptionKey?: SubscriptionKey): SubscriptionKey; | ||
subscribe<SubscriptionKey = DataConsumer<D>>(consumer: DataConsumer<D>, filter?: F, subscriptionKey?: SubscriptionKey): Promise<SubscriptionKey>; | ||
unsubscribe(params?: F, subscriptionKey?: any): void; | ||
@@ -10,0 +10,0 @@ private deleteAllSubscriptions; |
@@ -15,2 +15,38 @@ "use strict"; | ||
})(); | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
var __generator = (this && this.__generator) || function (thisArg, body) { | ||
var _ = { label: 0, sent: function() { if (t[0] & 1) throw t[1]; return t[1]; }, trys: [], ops: [] }, f, y, t, g; | ||
return g = { next: verb(0), "throw": verb(1), "return": verb(2) }, typeof Symbol === "function" && (g[Symbol.iterator] = function() { return this; }), g; | ||
function verb(n) { return function (v) { return step([n, v]); }; } | ||
function step(op) { | ||
if (f) throw new TypeError("Generator is already executing."); | ||
while (_) try { | ||
if (f = 1, y && (t = op[0] & 2 ? y["return"] : op[0] ? y["throw"] || ((t = y["return"]) && t.call(y), 0) : y.next) && !(t = t.call(y, op[1])).done) return t; | ||
if (y = 0, t) op = [op[0] & 2, t.value]; | ||
switch (op[0]) { | ||
case 0: case 1: t = op; break; | ||
case 4: _.label++; return { value: op[1], done: false }; | ||
case 5: _.label++; y = op[1]; op = [0]; continue; | ||
case 7: op = _.ops.pop(); _.trys.pop(); continue; | ||
default: | ||
if (!(t = _.trys, t = t.length > 0 && t[t.length - 1]) && (op[0] === 6 || op[0] === 2)) { _ = 0; continue; } | ||
if (op[0] === 3 && (!t || (op[1] > t[0] && op[1] < t[3]))) { _.label = op[1]; break; } | ||
if (op[0] === 6 && _.label < t[1]) { _.label = t[1]; t = op; break; } | ||
if (t && _.label < t[2]) { _.label = t[2]; _.ops.push(op); break; } | ||
if (t[2]) _.ops.pop(); | ||
_.trys.pop(); continue; | ||
} | ||
op = body.call(thisArg, _); | ||
} catch (e) { op = [6, e]; y = 0; } finally { f = t = 0; } | ||
if (op[0] & 5) throw op[1]; return { value: op[0] ? op[1] : void 0, done: true }; | ||
} | ||
}; | ||
var __spreadArrays = (this && this.__spreadArrays) || function () { | ||
@@ -40,13 +76,31 @@ for (var s = 0, i = 0, il = arguments.length; i < il; i++) s += arguments[i].length; | ||
if (subscriptionKey === void 0) { subscriptionKey = consumer; } | ||
if (filter === null) { | ||
throw new Error("Subscribe with null filter is not supported, use empty object to get all data"); | ||
} | ||
var paramsKey = JSON.stringify(filter); | ||
this.consumers[paramsKey] = __spreadArrays((this.consumers[paramsKey] || []), [{ consumer: consumer, subscriptionKey: subscriptionKey }]); | ||
// TODO it is not necessary to send subscribe if we already have cached value | ||
this.session.send(rpc_1.MessageType.Subscribe, utils_1.createMessageId(), this.topicName, filter); | ||
if (this.cached[paramsKey] !== undefined) { | ||
consumer(this.cached[paramsKey]); | ||
} | ||
return subscriptionKey; | ||
return __awaiter(this, void 0, void 0, function () { | ||
var paramsKey, e_1; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: | ||
if (filter === null) { | ||
throw new Error("Subscribe with null filter is not supported, use empty object to get all data"); | ||
} | ||
paramsKey = JSON.stringify(filter); | ||
// already have cached value with this params? | ||
if (this.cached[paramsKey] !== undefined) { | ||
consumer(this.cached[paramsKey]); | ||
} | ||
this.consumers[paramsKey] = __spreadArrays((this.consumers[paramsKey] || []), [{ consumer: consumer, subscriptionKey: subscriptionKey }]); | ||
_a.label = 1; | ||
case 1: | ||
_a.trys.push([1, 3, , 4]); | ||
return [4 /*yield*/, this.session.callRemote(this.topicName, filter, rpc_1.MessageType.Subscribe)]; | ||
case 2: | ||
_a.sent(); | ||
return [3 /*break*/, 4]; | ||
case 3: | ||
e_1 = _a.sent(); | ||
this.unsubscribe(filter, subscriptionKey); | ||
throw e_1; | ||
case 4: return [2 /*return*/, subscriptionKey]; | ||
} | ||
}); | ||
}); | ||
}; | ||
@@ -53,0 +107,0 @@ RemoteTopicImpl.prototype.unsubscribe = function (params, subscriptionKey) { |
@@ -20,3 +20,3 @@ /** | ||
export interface RemoteTopic<D, P> { | ||
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): void; | ||
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): Promise<any>; | ||
unsubscribe(params?: P, subscriptionKey?: any): any; | ||
@@ -54,2 +54,2 @@ get(params?: P): Promise<D>; | ||
} | ||
export declare type Middleware = (ctx: RpcContext, next: (params: any) => Promise<any>, params: any) => Promise<any>; | ||
export declare type Middleware = (ctx: any, next: (params: any) => Promise<any>, params: any, messageType: MessageType.Call | MessageType.Get | MessageType.Subscribe) => Promise<any>; |
@@ -59,5 +59,5 @@ "use strict"; | ||
var logger_1 = require("./logger"); | ||
var remote_1 = require("./remote"); | ||
var rpc_1 = require("./rpc"); | ||
var utils_1 = require("./utils"); | ||
var rpc_1 = require("./rpc"); | ||
var remote_1 = require("./remote"); | ||
var RpcSession = /** @class */ (function () { | ||
@@ -192,3 +192,3 @@ function RpcSession(local, remoteLevel, listeners, connectionContext, localMiddleware, remoteMiddleware, messageParser, pingSendTimeout, keepAliveTimeout, callTimeout, syncRemoteCalls) { | ||
} | ||
this.subscribe(localTopic, other[0]); | ||
this.subscribe(localTopic, other[0], id); | ||
break; | ||
@@ -222,2 +222,7 @@ case rpc_1.MessageType.Unsubscribe: | ||
} | ||
// release await on subscribe | ||
if (this.runningCalls[id]) { | ||
this.callRemoteResponse(message_1); | ||
} | ||
// and deliver to callback | ||
remoteTopic.receiveData(other[0], other[1]); | ||
@@ -275,3 +280,3 @@ break; | ||
}; | ||
return this.remoteMiddleware(null, sendMessage, params); | ||
return this.remoteMiddleware(null, sendMessage, params, type); | ||
}; | ||
@@ -306,10 +311,16 @@ RpcSession.prototype.sendCall = function () { | ||
RpcSession.prototype.callRemoteResponse = function (data) { | ||
var _ = data[0], id = data[1], res = data[2], description = data[3], details = data[4]; | ||
var messageType = data[0], id = data[1], other = data.slice(2); | ||
if (this.runningCalls[id]) { | ||
var _a = this.runningCalls[id], resolve = _a.resolve, reject = _a.reject; | ||
delete this.runningCalls[id]; | ||
if (data[0] == rpc_1.MessageType.Result) { | ||
resolve(res); | ||
if (messageType == rpc_1.MessageType.Result || messageType == rpc_1.MessageType.Data) { | ||
if (messageType == rpc_1.MessageType.Result) { | ||
resolve(other[0]); | ||
} | ||
else { | ||
resolve(other[2]); | ||
} | ||
} | ||
else { | ||
var res = other[0], description = other[1], details = other[2]; | ||
var error = new Error(description || res || "Remote call failed"); | ||
@@ -336,3 +347,3 @@ Object.assign(error, details || {}); | ||
}; | ||
return [4 /*yield*/, this.localMiddleware(callContext_1, invokeLocalMethod, params)]; | ||
return [4 /*yield*/, this.localMiddleware(callContext_1, invokeLocalMethod, params, rpc_1.MessageType.Call)]; | ||
case 1: | ||
@@ -354,3 +365,3 @@ r = _a.sent(); | ||
return __awaiter(this, void 0, void 0, function () { | ||
var d, e_3; | ||
var callContext_2, getFromTopic, r, e_3; | ||
return __generator(this, function (_a) { | ||
@@ -360,6 +371,11 @@ switch (_a.label) { | ||
_a.trys.push([0, 2, , 3]); | ||
return [4 /*yield*/, topic.getData(params, this.createContext(id, topic.getTopicName()))]; | ||
callContext_2 = this.createContext(id, topic.getTopicName()); | ||
getFromTopic = function (p) { | ||
if (p === void 0) { p = params; } | ||
return topic.getData(p, callContext_2); | ||
}; | ||
return [4 /*yield*/, this.localMiddleware(callContext_2, getFromTopic, params, rpc_1.MessageType.Get)]; | ||
case 1: | ||
d = _a.sent(); | ||
this.send(rpc_1.MessageType.Result, id, d); | ||
r = _a.sent(); | ||
this.send(rpc_1.MessageType.Result, id, r); | ||
return [3 /*break*/, 3]; | ||
@@ -376,12 +392,27 @@ case 2: | ||
}; | ||
RpcSession.prototype.subscribe = function (topic, params) { | ||
RpcSession.prototype.subscribe = function (topic, params, messageId) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var ctx_1, subscribeTopic, r, e_4; | ||
var _this = this; | ||
return __generator(this, function (_a) { | ||
switch (_a.label) { | ||
case 0: return [4 /*yield*/, topic.subscribeSession(this, params)]; | ||
case 0: | ||
_a.trys.push([0, 2, , 3]); | ||
ctx_1 = this.createContext(messageId, topic.getTopicName()); | ||
subscribeTopic = function (p) { | ||
if (p === void 0) { p = params; } | ||
return topic.subscribeSession(_this, p, messageId, ctx_1); | ||
}; | ||
return [4 /*yield*/, this.localMiddleware(ctx_1, subscribeTopic, params, rpc_1.MessageType.Subscribe)]; | ||
case 1: | ||
_a.sent(); | ||
r = _a.sent(); | ||
this.send(rpc_1.MessageType.Data, messageId, topic.getTopicName(), params, r); | ||
this.subscriptions.push({ topic: topic, params: params }); | ||
this.listeners.subscribed(this.subscriptions.length); | ||
return [2 /*return*/]; | ||
return [3 /*break*/, 3]; | ||
case 2: | ||
e_4 = _a.sent(); | ||
this.sendError(messageId, e_4); | ||
return [3 /*break*/, 3]; | ||
case 3: return [2 /*return*/]; | ||
} | ||
@@ -388,0 +419,0 @@ }); |
@@ -78,4 +78,4 @@ "use strict"; | ||
}, | ||
localMiddleware: function (ctx, next, params) { return next(params); }, | ||
remoteMiddleware: function (ctx, next, params) { return next(params); }, | ||
localMiddleware: function (ctx, next, params, messageType) { return next(params); }, | ||
remoteMiddleware: function (ctx, next, params, messageType) { return next(params); }, | ||
clientLevel: 0, | ||
@@ -82,0 +82,0 @@ pingSendTimeout: 40 * 1000, |
@@ -117,3 +117,3 @@ "use strict"; | ||
} | ||
return function (ctx, next, params) { | ||
return function (ctx, next, params, messageType) { | ||
var index = -1; | ||
@@ -130,3 +130,3 @@ return dispatch(0, params); | ||
else { | ||
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p)); | ||
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p, messageType)); | ||
} | ||
@@ -133,0 +133,0 @@ } |
{ | ||
"name": "@push-rpc/core", | ||
"version": "1.1.9", | ||
"version": "1.2.0", | ||
"main": "dist/index.js", | ||
@@ -20,3 +20,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "570406851d106630e4d006b1fc5a23aae97e7c74" | ||
"gitHead": "ea55b58bf0f022332eea31d2e1987b76cd351ea9" | ||
} |
@@ -49,4 +49,4 @@ import {RpcSession} from "./RpcSession" | ||
createContext: () => ({remoteId: null}), | ||
localMiddleware: (ctx, next, params) => next(params), | ||
remoteMiddleware: (ctx, next, params) => next(params), | ||
localMiddleware: (ctx, next, params, messageType) => next(params), | ||
remoteMiddleware: (ctx, next, params, messageType) => next(params), | ||
messageParser: data => JSON.parse(data, dateReviver), | ||
@@ -53,0 +53,0 @@ pingSendTimeout: null, |
@@ -63,7 +63,8 @@ import {DataConsumer, DataSupplier, MessageType, Topic, TopicImpl} from "./rpc" | ||
async subscribeSession(session: RpcSession, filter: F) { | ||
async subscribeSession(session: RpcSession, filter: F, messageId, ctx) { | ||
const key = JSON.stringify(filter) | ||
const localTopic = this | ||
const data = await this.supplier(filter, ctx) | ||
const subscription: Subscription<F, D, TD> = this.subscriptions[key] || { | ||
@@ -85,10 +86,6 @@ filter, | ||
// TODO if already subscribed, just send current data, and do not add a new subscription - it will save | ||
// some network ops | ||
subscription.sessions.push(session) | ||
this.subscriptions[key] = subscription | ||
const data = await this.supplier(filter, session.createContext()) | ||
session.send(MessageType.Data, createMessageId(), this.getTopicName(), filter, data) | ||
return data | ||
} | ||
@@ -121,3 +118,3 @@ | ||
} | ||
subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any): void {} | ||
async subscribe(consumer: DataConsumer<D>, params: F, subscriptionKey: any) {} | ||
unsubscribe(params?: F, subscriptionKey?: any) {} | ||
@@ -124,0 +121,0 @@ } |
@@ -1,5 +0,4 @@ | ||
import {LocalTopicImpl} from "./local" | ||
import {DataConsumer, LocalTopic, MessageType, Method, RemoteTopic, TopicImpl} from "./rpc" | ||
import {RpcSession} from "./RpcSession" | ||
import {createMessageId, getClassMethodNames} from "./utils" | ||
import {RpcSession} from "./RpcSession" | ||
@@ -17,7 +16,7 @@ interface Subscription<D> { | ||
subscribe<SubscriptionKey = DataConsumer<D>>( | ||
async subscribe<SubscriptionKey = DataConsumer<D>>( | ||
consumer: DataConsumer<D>, | ||
filter: F = {} as any, | ||
subscriptionKey: SubscriptionKey = consumer as any | ||
): SubscriptionKey { | ||
): Promise<SubscriptionKey> { | ||
if (filter === null) { | ||
@@ -31,7 +30,3 @@ throw new Error( | ||
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}] | ||
// TODO it is not necessary to send subscribe if we already have cached value | ||
this.session.send(MessageType.Subscribe, createMessageId(), this.topicName, filter) | ||
// already have cached value with this params? | ||
if (this.cached[paramsKey] !== undefined) { | ||
@@ -41,2 +36,11 @@ consumer(this.cached[paramsKey]) | ||
this.consumers[paramsKey] = [...(this.consumers[paramsKey] || []), {consumer, subscriptionKey}] | ||
try { | ||
await this.session.callRemote(this.topicName, filter, MessageType.Subscribe) | ||
} catch (e) { | ||
this.unsubscribe(filter, subscriptionKey) | ||
throw e | ||
} | ||
return subscriptionKey | ||
@@ -43,0 +47,0 @@ } |
@@ -43,3 +43,3 @@ /** | ||
export interface RemoteTopic<D, P> { | ||
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): void | ||
subscribe(consumer: DataConsumer<D>, params?: P, subscriptionKey?: any): Promise<any> | ||
unsubscribe(params?: P, subscriptionKey?: any) | ||
@@ -93,7 +93,8 @@ get(params?: P): Promise<D> | ||
export type Middleware = ( | ||
ctx: RpcContext, | ||
ctx: any, | ||
next: (params: any) => Promise<any>, | ||
params: any | ||
params: any, | ||
messageType: MessageType.Call | MessageType.Get | MessageType.Subscribe | ||
) => Promise<any> | ||
// id generator |
@@ -0,14 +1,7 @@ | ||
import {LocalTopicImpl} from "./local" | ||
import {log} from "./logger" | ||
import {createMessageId, message} from "./utils" | ||
import { | ||
getServiceItem, | ||
MessageType, | ||
Method, | ||
Middleware, | ||
RpcConnectionContext, | ||
RpcContext, | ||
} from "./rpc" | ||
import {LocalTopicImpl} from "./local" | ||
import {createRemote, RemoteTopicImpl} from "./remote" | ||
import {getServiceItem, MessageType, Method, Middleware, RpcConnectionContext, RpcContext,} from "./rpc" | ||
import {Socket} from "./transport" | ||
import {createMessageId, message} from "./utils" | ||
@@ -167,3 +160,3 @@ export interface RpcSessionListeners { | ||
this.subscribe(localTopic, other[0]) | ||
this.subscribe(localTopic, other[0], id) | ||
break | ||
@@ -209,2 +202,8 @@ | ||
// release await on subscribe | ||
if (this.runningCalls[id]) { | ||
this.callRemoteResponse(message) | ||
} | ||
// and deliver to callback | ||
remoteTopic.receiveData(other[0], other[1]) | ||
@@ -264,3 +263,3 @@ break | ||
return this.remoteMiddleware(null, sendMessage, params) | ||
return this.remoteMiddleware(null, sendMessage, params, type) | ||
} | ||
@@ -303,3 +302,3 @@ | ||
private callRemoteResponse(data) { | ||
const [_, id, res, description, details] = data | ||
const [messageType, id, ...other] = data | ||
@@ -310,5 +309,11 @@ if (this.runningCalls[id]) { | ||
if (data[0] == MessageType.Result) { | ||
resolve(res) | ||
if (messageType == MessageType.Result || messageType == MessageType.Data) { | ||
if (messageType == MessageType.Result) { | ||
resolve(other[0]) | ||
} else { | ||
resolve(other[2]) | ||
} | ||
} else { | ||
const [res, description, details] = other | ||
const error = new Error(description || res || "Remote call failed") | ||
@@ -329,3 +334,3 @@ Object.assign(error, details || {}) | ||
const invokeLocalMethod = (p = params) => localMethod.call(localMethodObject, p, callContext) | ||
const r = await this.localMiddleware(callContext, invokeLocalMethod, params) | ||
const r = await this.localMiddleware(callContext, invokeLocalMethod, params, MessageType.Call) | ||
@@ -341,4 +346,8 @@ this.send(MessageType.Result, id, r) | ||
try { | ||
const d = await topic.getData(params, this.createContext(id, topic.getTopicName())) | ||
this.send(MessageType.Result, id, d) | ||
const callContext = this.createContext(id, topic.getTopicName()) | ||
const getFromTopic = (p = params) => topic.getData(p, callContext) | ||
const r = await this.localMiddleware(callContext, getFromTopic, params, MessageType.Get) | ||
this.send(MessageType.Result, id, r) | ||
} catch (e) { | ||
@@ -350,6 +359,16 @@ log.error(`Unable to get data from topic ${name}`, e) | ||
private async subscribe(topic: LocalTopicImpl<any, any>, params) { | ||
await topic.subscribeSession(this, params) | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
private async subscribe(topic: LocalTopicImpl<any, any>, params, messageId) { | ||
try { | ||
const ctx = this.createContext(messageId, topic.getTopicName()) | ||
const subscribeTopic = (p = params) => topic.subscribeSession(this, p, messageId, ctx) | ||
const r = await this.localMiddleware(ctx, subscribeTopic, params, MessageType.Subscribe) | ||
this.send(MessageType.Data, messageId, topic.getTopicName(), params, r) | ||
this.subscriptions.push({topic, params}) | ||
this.listeners.subscribed(this.subscriptions.length) | ||
} catch (e) { | ||
this.sendError(messageId, e) | ||
} | ||
} | ||
@@ -356,0 +375,0 @@ |
@@ -35,4 +35,4 @@ import * as UUID from "uuid-js" | ||
}), | ||
localMiddleware: (ctx, next, params) => next(params), | ||
remoteMiddleware: (ctx, next, params) => next(params), | ||
localMiddleware: (ctx, next, params, messageType) => next(params), | ||
remoteMiddleware: (ctx, next, params, messageType) => next(params), | ||
clientLevel: 0, | ||
@@ -39,0 +39,0 @@ pingSendTimeout: 40 * 1000, |
@@ -77,3 +77,3 @@ import * as UUID from "uuid-js" | ||
export function composeMiddleware(...middleware: Middleware[]): Middleware { | ||
return function(ctx, next, params) { | ||
return function(ctx, next, params, messageType) { | ||
let index = -1 | ||
@@ -91,3 +91,3 @@ return dispatch(0, params) | ||
} else { | ||
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p)) | ||
return Promise.resolve(middleware[i](ctx, dispatch.bind(null, i + 1), p, messageType)) | ||
} | ||
@@ -94,0 +94,0 @@ } catch (err) { |
@@ -1,4 +0,5 @@ | ||
import {composeMiddleware} from "../src" | ||
import {assert} from "chai" | ||
import {composeMiddleware, LocalTopicImpl, MessageType} from "../src" | ||
import {Middleware} from "../src/rpc" | ||
import {assert} from "chai" | ||
import {message} from "../src/utils" | ||
import {createTestClient, startTestServer} from "./testUtils" | ||
@@ -22,3 +23,3 @@ | ||
const r = await composed(null, async p => p, 0) | ||
const r = await composed(null, async p => p, 0, MessageType.Call) | ||
@@ -28,5 +29,98 @@ assert.equal(r, 3) | ||
it("local", async () => { | ||
it("local get topic", async () => { | ||
let mwMessageType = null | ||
await startTestServer( | ||
{ | ||
item: new LocalTopicImpl(async () => "1"), | ||
}, | ||
{ | ||
localMiddleware: (ctx, next, params, messageType) => { | ||
mwMessageType = messageType | ||
return next(params) | ||
}, | ||
} | ||
) | ||
const client = await createTestClient(0) | ||
const r = await client.item.get() | ||
assert.equal(r, "1") | ||
assert.equal(mwMessageType, MessageType.Get) | ||
}) | ||
it("remote get topic", async () => { | ||
let mwMessageType = null | ||
await startTestServer( | ||
{ | ||
item: new LocalTopicImpl(async () => "1"), | ||
} | ||
) | ||
const client = await createTestClient(0, { | ||
remoteMiddleware: (ctx, next, params, messageType) => { | ||
mwMessageType = messageType | ||
return next(params) | ||
}, | ||
}) | ||
const r = await client.item.get() | ||
assert.equal(r, "1") | ||
assert.equal(mwMessageType, MessageType.Get) | ||
}) | ||
it("local topic subscribe", async () => { | ||
let mwMessageType = null | ||
await startTestServer( | ||
{ | ||
item: new LocalTopicImpl(async () => "1"), | ||
}, | ||
{ | ||
localMiddleware: (ctx, next, params, messageType) => { | ||
mwMessageType = messageType | ||
return next(params) | ||
}, | ||
} | ||
) | ||
let r = null | ||
const client = await createTestClient(0) | ||
await client.item.subscribe(data => r = data) | ||
await new Promise(r => setTimeout(r, 50)) | ||
assert.equal(r, "1") | ||
assert.equal(mwMessageType, MessageType.Subscribe) | ||
}) | ||
it("remote topic subscribe", async () => { | ||
let mwMessageType = null | ||
await startTestServer( | ||
{ | ||
item: new LocalTopicImpl(async () => "1"), | ||
} | ||
) | ||
let r = null | ||
const client = await createTestClient(0, { | ||
remoteMiddleware: (ctx, next, params, messageType) => { | ||
mwMessageType = messageType | ||
return next(params) | ||
}, | ||
}) | ||
await client.item.subscribe(data => r = data) | ||
await new Promise(r => setTimeout(r, 50)) | ||
assert.equal(r, "1") | ||
assert.equal(mwMessageType, MessageType.Subscribe) | ||
}) | ||
it("local param update", async () => { | ||
await startTestServer( | ||
{ | ||
async getSomething(req) { | ||
@@ -33,0 +127,0 @@ return req |
import {assert} from "chai" | ||
import {createRpcClient, LocalTopicImpl} from "../src" | ||
import {createRpcClient, LocalTopicImpl, MessageType} from "../src" | ||
import {groupReducer} from "../src/local" | ||
@@ -8,2 +8,21 @@ import {createTestClient, startTestServer, TEST_PORT} from "./testUtils" | ||
describe("Topics", () => { | ||
it("error in supplier breaks subscribe", async () => { | ||
await startTestServer( | ||
{ | ||
item: new LocalTopicImpl(async () => { | ||
throw new Error("AA") | ||
}), | ||
} | ||
) | ||
const client = await createTestClient(0) | ||
try { | ||
await client.item.subscribe(() => {}) | ||
assert.fail("Error expected") | ||
} catch (e) { | ||
assert.equal(e.message, "AA") | ||
} | ||
}) | ||
it("get", async () => { | ||
@@ -201,3 +220,3 @@ const item = {r: "asf"} | ||
let item2 | ||
await client.test.item.subscribe(item => { | ||
client.test.item.subscribe(item => { | ||
item2 = item | ||
@@ -208,2 +227,7 @@ }) | ||
assert.deepEqual(item2, {r: "1"}) | ||
await new Promise(resolve => setTimeout(resolve, 50)) | ||
// and a new version after some time | ||
assert.deepEqual(item2, item) | ||
}) | ||
@@ -210,0 +234,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
144517
3825