@divine/synchronization
Advanced tools
Comparing version 1.1.0 to 1.2.0
@@ -1395,2 +1395,26 @@ (function (global, factory) { | ||
var FilteredQueue = /** @class */ (function () { | ||
function FilteredQueue(queue, filter) { | ||
this.queue = queue; | ||
this.filter = filter; | ||
} | ||
FilteredQueue.prototype.push = function (data) { | ||
return this.filter(data) ? this.queue.push(data) : null; | ||
}; | ||
FilteredQueue.prototype.pushOrWait = function (data, timeout) { | ||
return tslib_es6.__awaiter(this, void 0, void 0, function () { | ||
return tslib_es6.__generator(this, function (_a) { | ||
return [2 /*return*/, this.filter(data) ? this.queue.pushOrWait(data, timeout) : null]; | ||
}); | ||
}); | ||
}; | ||
FilteredQueue.prototype.shiftOrWait = function (timeout) { | ||
return tslib_es6.__awaiter(this, void 0, void 0, function () { | ||
return tslib_es6.__generator(this, function (_a) { | ||
return [2 /*return*/, this.queue.shiftOrWait(timeout)]; | ||
}); | ||
}); | ||
}; | ||
return FilteredQueue; | ||
}()); | ||
var PubSubBase = /** @class */ (function () { | ||
@@ -1407,4 +1431,5 @@ function PubSubBase() { | ||
try { | ||
queue.push(data); | ||
++result; | ||
if (queue.push(data) !== null) { | ||
++result; | ||
} | ||
} | ||
@@ -1435,3 +1460,3 @@ catch (ex) { | ||
result = _a.sent(); | ||
return [2 /*return*/, result.filter(function (res) { return res !== undefined; }).length]; | ||
return [2 /*return*/, result.filter(function (res) { return res !== undefined && res !== null; }).length]; | ||
} | ||
@@ -1441,3 +1466,3 @@ }); | ||
}; | ||
PubSubBase.prototype.subscribe = function (timeout) { | ||
PubSubBase.prototype.subscribe = function (filter, timeout) { | ||
return tslib_es6.__asyncGenerator(this, arguments, function subscribe_1() { | ||
@@ -1448,3 +1473,10 @@ var queue; | ||
case 0: | ||
queue = this._createQueue(); | ||
if (typeof filter === 'number') { | ||
timeout = filter; | ||
filter = undefined; | ||
} | ||
if (filter === undefined) { | ||
filter = function () { return true; }; | ||
} | ||
queue = this._createQueue(filter); | ||
_a.label = 1; | ||
@@ -1487,4 +1519,4 @@ case 1: | ||
} | ||
PubSub.prototype._createQueue = function () { | ||
return new queue.Queue(this._capacity); | ||
PubSub.prototype._createQueue = function (filter) { | ||
return new FilteredQueue(new queue.Queue(this._capacity), filter); | ||
}; | ||
@@ -1501,4 +1533,4 @@ return PubSub; | ||
} | ||
FairPubSub.prototype._createQueue = function () { | ||
return new queue.FairQueue(this._capacity); | ||
FairPubSub.prototype._createQueue = function (filter) { | ||
return new FilteredQueue(new queue.FairQueue(this._capacity), function (data) { return filter(data.data); }); | ||
}; | ||
@@ -1505,0 +1537,0 @@ return FairPubSub; |
@@ -1,2 +0,10 @@ | ||
import { BlockingQueue, DRRData, FairQueue, Queue } from './queue'; | ||
import { BlockingQueue, DRRData } from './queue'; | ||
declare class FilteredQueue<T, W> { | ||
private queue; | ||
private filter; | ||
constructor(queue: BlockingQueue<T, W>, filter: (data: W) => boolean); | ||
push(data: W): number | null; | ||
pushOrWait(data: W, timeout?: number): Promise<number | null | undefined>; | ||
shiftOrWait(timeout?: number): Promise<T | undefined>; | ||
} | ||
declare abstract class PubSubBase<T, W> { | ||
@@ -6,6 +14,7 @@ private _topic; | ||
publishOrWait(data: W, timeout?: number): Promise<number>; | ||
subscribe(): AsyncGenerator<T, void>; | ||
subscribe(filter?: (data: T) => boolean): AsyncGenerator<T, void>; | ||
subscribe(filter?: (data: T) => boolean, timeout?: number): AsyncGenerator<T | undefined, void>; | ||
subscribe(timeout?: number): AsyncGenerator<T | undefined, void>; | ||
get subscribers(): number; | ||
protected abstract _createQueue(): BlockingQueue<T, W>; | ||
protected abstract _createQueue(filter: (value: T) => boolean): FilteredQueue<T, W>; | ||
} | ||
@@ -15,3 +24,3 @@ export declare class PubSub<T> extends PubSubBase<T, T> { | ||
constructor(_capacity?: number | undefined); | ||
protected _createQueue(): Queue<T>; | ||
protected _createQueue(filter: (value: T) => boolean): FilteredQueue<T, T>; | ||
} | ||
@@ -21,4 +30,4 @@ export declare class FairPubSub<T> extends PubSubBase<T, DRRData<T>> { | ||
constructor(_capacity?: number | undefined); | ||
protected _createQueue(): FairQueue<T>; | ||
protected _createQueue(filter: (value: T) => boolean): FilteredQueue<T, DRRData<T>>; | ||
} | ||
export {}; |
@@ -6,2 +6,26 @@ "use strict"; | ||
var queue_1 = require("./queue"); | ||
var FilteredQueue = /** @class */ (function () { | ||
function FilteredQueue(queue, filter) { | ||
this.queue = queue; | ||
this.filter = filter; | ||
} | ||
FilteredQueue.prototype.push = function (data) { | ||
return this.filter(data) ? this.queue.push(data) : null; | ||
}; | ||
FilteredQueue.prototype.pushOrWait = function (data, timeout) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function () { | ||
return tslib_1.__generator(this, function (_a) { | ||
return [2 /*return*/, this.filter(data) ? this.queue.pushOrWait(data, timeout) : null]; | ||
}); | ||
}); | ||
}; | ||
FilteredQueue.prototype.shiftOrWait = function (timeout) { | ||
return tslib_1.__awaiter(this, void 0, void 0, function () { | ||
return tslib_1.__generator(this, function (_a) { | ||
return [2 /*return*/, this.queue.shiftOrWait(timeout)]; | ||
}); | ||
}); | ||
}; | ||
return FilteredQueue; | ||
}()); | ||
var PubSubBase = /** @class */ (function () { | ||
@@ -18,4 +42,5 @@ function PubSubBase() { | ||
try { | ||
queue.push(data); | ||
++result; | ||
if (queue.push(data) !== null) { | ||
++result; | ||
} | ||
} | ||
@@ -46,3 +71,3 @@ catch (ex) { | ||
result = _a.sent(); | ||
return [2 /*return*/, result.filter(function (res) { return res !== undefined; }).length]; | ||
return [2 /*return*/, result.filter(function (res) { return res !== undefined && res !== null; }).length]; | ||
} | ||
@@ -52,3 +77,3 @@ }); | ||
}; | ||
PubSubBase.prototype.subscribe = function (timeout) { | ||
PubSubBase.prototype.subscribe = function (filter, timeout) { | ||
return tslib_1.__asyncGenerator(this, arguments, function subscribe_1() { | ||
@@ -59,3 +84,10 @@ var queue; | ||
case 0: | ||
queue = this._createQueue(); | ||
if (typeof filter === 'number') { | ||
timeout = filter; | ||
filter = undefined; | ||
} | ||
if (filter === undefined) { | ||
filter = function () { return true; }; | ||
} | ||
queue = this._createQueue(filter); | ||
_a.label = 1; | ||
@@ -99,4 +131,4 @@ case 1: | ||
} | ||
PubSub.prototype._createQueue = function () { | ||
return new queue_1.Queue(this._capacity); | ||
PubSub.prototype._createQueue = function (filter) { | ||
return new FilteredQueue(new queue_1.Queue(this._capacity), filter); | ||
}; | ||
@@ -113,4 +145,4 @@ return PubSub; | ||
} | ||
FairPubSub.prototype._createQueue = function () { | ||
return new queue_1.FairQueue(this._capacity); | ||
FairPubSub.prototype._createQueue = function (filter) { | ||
return new FilteredQueue(new queue_1.FairQueue(this._capacity), function (data) { return filter(data.data); }); | ||
}; | ||
@@ -117,0 +149,0 @@ return FairPubSub; |
{ | ||
"name": "@divine/synchronization", | ||
"version": "1.1.0", | ||
"version": "1.2.0", | ||
"description": "The Divine Synchronization Library", | ||
@@ -28,2 +28,5 @@ "main": "lib/src/index.js", | ||
"cyclic barrier", | ||
"pubsub", | ||
"publish", | ||
"subscribe", | ||
"lightswitch", | ||
@@ -30,0 +33,0 @@ "read-write lock" |
import { BlockingQueue, DRRData, FairQueue, Queue } from './queue'; | ||
class FilteredQueue<T, W> { | ||
constructor(private queue: BlockingQueue<T, W>, private filter: (data: W) => boolean) { | ||
} | ||
push(data: W) { | ||
return this.filter(data) ? this.queue.push(data) : null; | ||
} | ||
async pushOrWait(data: W, timeout?: number) { | ||
return this.filter(data) ? this.queue.pushOrWait(data, timeout) : null; | ||
} | ||
async shiftOrWait(timeout?: number) { | ||
return this.queue.shiftOrWait(timeout); | ||
} | ||
} | ||
abstract class PubSubBase<T, W> { | ||
private _topic = new Set<BlockingQueue<T, W>>(); | ||
private _topic = new Set<FilteredQueue<T, W>>(); | ||
@@ -11,4 +28,5 @@ publish(data: W): number { | ||
try { | ||
queue.push(data); | ||
++result; | ||
if (queue.push(data) !== null) { | ||
++result; | ||
} | ||
} | ||
@@ -28,10 +46,20 @@ catch (ex) { | ||
return result.filter((res) => res !== undefined).length; | ||
return result.filter((res) => res !== undefined && res !== null).length; | ||
} | ||
subscribe(): AsyncGenerator<T, void>; | ||
subscribe(filter?: (data: T) => boolean): AsyncGenerator<T, void>; | ||
subscribe(filter?: (data: T) => boolean, timeout?: number): AsyncGenerator<T | undefined, void>; | ||
subscribe(timeout?: number): AsyncGenerator<T | undefined, void>; | ||
async *subscribe(timeout?: number): AsyncGenerator<T | undefined, void> { | ||
const queue = this._createQueue(); | ||
async *subscribe(filter?: ((data: T) => boolean) | number, timeout?: number): AsyncGenerator<T | undefined, void> { | ||
if (typeof filter === 'number') { | ||
timeout = filter; | ||
filter = undefined; | ||
} | ||
if (filter === undefined) { | ||
filter = () => true; | ||
} | ||
const queue = this._createQueue(filter); | ||
try { | ||
@@ -53,3 +81,3 @@ this._topic.add(queue); | ||
protected abstract _createQueue(): BlockingQueue<T, W>; | ||
protected abstract _createQueue(filter: (value: T) => boolean): FilteredQueue<T, W>; | ||
} | ||
@@ -62,4 +90,4 @@ | ||
protected _createQueue() { | ||
return new Queue<T>(this._capacity); | ||
protected _createQueue(filter: (value: T) => boolean) { | ||
return new FilteredQueue(new Queue<T>(this._capacity), filter); | ||
} | ||
@@ -73,5 +101,5 @@ } | ||
protected _createQueue() { | ||
return new FairQueue<T>(this._capacity); | ||
protected _createQueue(filter: (value: T) => boolean) { | ||
return new FilteredQueue(new FairQueue<T>(this._capacity), (data: DRRData<T>) => filter(data.data)); | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
222572
3027