import * as Immutable from "immutable"; | ||
import { Base } from "./base"; | ||
import { Collection } from "./collection"; | ||
import { Event } from "./event"; | ||
@@ -9,68 +8,132 @@ import { Index } from "./index"; | ||
import { TimeRange } from "./timerange"; | ||
import { AggregationSpec, AlignmentOptions, CoalesceOptions, CollapseOptions, FillOptions, RateOptions, ReduceOptions, SelectOptions, WindowingOptions } from "./types"; | ||
import { Node } from "./node"; | ||
import { AggregationSpec, AlignmentOptions, CoalesceOptions, CollapseOptions, EventCallback, FillOptions, KeyedCollection, KeyedCollectionCallback, RateOptions, ReduceOptions, SelectOptions, WindowingOptions } from "./types"; | ||
/** | ||
* A Node is a transformation between type S and type T. Both S | ||
* and T much extend Base. | ||
* | ||
* The transformation happens when a `Node` has its `set()` method called | ||
* by another `Node`. The `input` to set() is of type `S`. When this happens | ||
* a subclass specific implementation of `process` is called to actually | ||
* transform the input (of type `S` to an output of type `T`). Of course | ||
* `S` and `T` maybe the same if the input and output types are expected | ||
* to be the same. The result of `process`, of type `T`, is returned and | ||
* the passed onto other downstream Nodes, by calling their `set()` methods. | ||
*/ | ||
export declare type EventCallback = (event: Event<Key>) => void; | ||
export declare type KeyedCollectionCallback<T extends Key> = (collection: Collection<T>, key: string) => void; | ||
export declare type KeyedCollection<T extends Key> = [string, Collection<T>]; | ||
/** | ||
* @private | ||
* | ||
* A `StreamInterface` is the base class for the family of facards placed in front of | ||
* the underlying `Stream` to provide the appropiate API layer depending on what type | ||
* of data is being passed through the pipeline at any given stage. | ||
* | ||
* At this base class level, it holds onto a reference to the underlying `Stream` | ||
* object (which contains the root of the `Node` tree into which `Event`s are | ||
* inserted). It also contains the ability to `addEvent()` method to achieve this (from | ||
* the user's point of view) and `addNode()` which gives allows additions to the | ||
* tree. | ||
* | ||
* Note that while the tree is held onto by its root node within the `Stream` object, | ||
* the current addition point, the `tail` is held by each `StreamInterface`. When a | ||
* `Node` is appended to the `tail` an entirely new interface is returned (its type | ||
* dependent on the output type of the `Node` appended), and that interface will contain | ||
* the new tail point on the tree, while the old one is unchanged. This allows for | ||
* branching of the tree. | ||
*/ | ||
export declare abstract class Node<S extends Base, T extends Base> { | ||
protected observers: Immutable.List<Node<T, Base>>; | ||
addObserver(node: Node<T, Base>): void; | ||
set(input: S): void; | ||
protected notify(output: T): void; | ||
protected abstract process(input: S): Immutable.List<T>; | ||
export declare class StreamInterface<IN extends Key, S extends Key> { | ||
protected stream: Stream<S>; | ||
protected tail: Node<Base, Base>; | ||
constructor(stream: Stream<S>, tail: Node<Base, Base>); | ||
/** | ||
* Returns the underlying `Stream` object, which primarily contains the | ||
* `root` of the processing graph. | ||
*/ | ||
getStream(): Stream<S>; | ||
/** | ||
* Add events into the stream | ||
*/ | ||
addEvent(e: Event<S>): void; | ||
/** | ||
* @protected | ||
*/ | ||
addNode(node: any): void; | ||
} | ||
/** | ||
* An `EventStream` is the interface to the stream provided for manipulation of | ||
* parts of the streaming pipeline that map a stream of Events of type <T>. | ||
* parts of the streaming pipeline that map a stream of `Event`s of type <IN>. | ||
* | ||
* For example a stream of Events<Time> can be mapped to an output stream of | ||
* new Events<Time> that are aligned to a fixed period boundary. Less or more Events | ||
* For example a stream of `Event<Time>`s can be mapped to an output stream of | ||
* new `Event<Time>`s that are aligned to a fixed period boundary. Less or more `Event`s | ||
* may result. | ||
* | ||
* The type parameter `<U>` is the input `Event` type at the top of the stream, since each | ||
* interface exposes the `addEvent(Event<U>)` method for inserting events at the top of | ||
* the stream. | ||
* The type parameter `<S>` is the input `Event` type at the top of the stream, since each | ||
* interface exposes the `addEvent(e: Event<S>)` method for inserting events at the top of | ||
* the stream this type is maintained across all stream interfaces. | ||
* | ||
* The type parameter `<T>` is the type of `Event`s in this part of the stream. That is | ||
* nodes created by the API at this point of the stream will expect Events of type T, | ||
* and will output new Events, potentially of a different type. | ||
* The type parameter `<IN>` is the type of `Event`s in this part of the stream. That is | ||
* nodes created by the API at this point of the tree will expect `Event<IN>`s, | ||
* and will output new Events, potentially of a different type (identified as `<OUT>`). | ||
* Typically `<IN>` and `<OUT>` would be `Time`, `TimeRange` or `Index`. | ||
*/ | ||
export declare class EventStream<T extends Key, U extends Key> { | ||
private stream; | ||
constructor(stream: Stream<U>); | ||
export declare class EventStream<IN extends Key, S extends Key> extends StreamInterface<IN, S> { | ||
constructor(stream: Stream<S>, tail: Node<Base, Base>); | ||
/** | ||
* @private | ||
* | ||
* Add events into the stream | ||
* Adds a new `Node` which converts a stream of `Event<IN>` to `Event<OUT>` | ||
* | ||
* <IN> is the source type for the processing node | ||
* <OUT> is the output Event type for the processing node | ||
* | ||
* Both IN and OUT extend Key, which is `Time`, `TimeRange` or `Index`, typically. | ||
*/ | ||
addEvent(e: Event<U>): void; | ||
addEventToEventNode<OUT extends Key>(node: EventMap<IN, OUT>): EventStream<OUT, S>; | ||
/** | ||
* @private | ||
* | ||
* Adds a new `Node` which converts a stream of `Event<IN>`s to a `KeyedCollection<OUT>`. | ||
* | ||
* <IN> is the source type for the processing node | ||
* <OUT> is the output Event type for the processing node | ||
* | ||
* Both IN and OUT extend Key, which is Time, TimeRange or Index, typically. | ||
*/ | ||
addEventToCollectorNode<OUT extends Key>(node: EventToKeyedCollection<IN, OUT>): KeyedCollectionStream<OUT, S>; | ||
/** | ||
* Remaps each Event<T> in the stream to a new Event<M>. | ||
*/ | ||
map<M extends Key>(mapper: (event: Event<T>) => Event<M>): EventStream<M, U>; | ||
map<OUT extends Key>(mapper: (event: Event<IN>) => Event<OUT>): EventStream<OUT, S>; | ||
/** | ||
* Remaps each Event<T> in the stream to 0, 1 or many Event<M>s. | ||
* Remaps each Event<IN> in the stream to 0, 1 or many Event<OUT>s. | ||
*/ | ||
flatMap<M extends Key>(mapper: (event: Event<T>) => Immutable.List<Event<M>>): EventStream<M, U>; | ||
flatMap<OUT extends Key>(mapper: (event: Event<IN>) => Immutable.List<Event<OUT>>): EventStream<OUT, S>; | ||
/** | ||
* Reduces a sequence of past Event<T>s in the stream to a single output Event<M>. | ||
* Reduces a sequence of past Event<IN>s in the stream to a single output Event<M>. | ||
*/ | ||
reduce<M extends Key>(options: ReduceOptions<T>): EventStream<T, U>; | ||
coalesce(options: CoalesceOptions): EventStream<T, U>; | ||
reduce(options: ReduceOptions<IN>): EventStream<IN, S>; | ||
/** | ||
* Filter out `Event<IN>`s in the stream. Provide a predicate function that | ||
* given an Event returns true or false. | ||
* | ||
* Example: | ||
* ``` | ||
* const source = stream<Time>() | ||
* .filter(e => e.get("a") % 2 !== 0) | ||
* .output(evt => { | ||
* // -> 1, 3, 5 | ||
* }); | ||
* | ||
* source.addEvent(...); // <- 1, 2, 3, 4, 5 | ||
* ``` | ||
*/ | ||
filter<M extends Key>(predicate: (event: Event<IN>) => boolean): EventStream<IN, S>; | ||
/** | ||
* If you have multiple sources you can feed them into the same stream and combine them | ||
* with the `coalese()` processor. In this example two event sources are fed into the | ||
* `Stream`. One contains `Event`s with just a field "in", and the other just a field | ||
* "out". The resulting output is `Event`s with the latest (in arrival time) value for | ||
* "in" and "out" together: | ||
* | ||
* ```typescript | ||
* const source = stream() | ||
* .coalesce({ fields: ["in", "out"] }) | ||
* .output((e: Event) => results.push(e)); | ||
* | ||
* // Stream events | ||
* for (let i = 0; i < 5; i++) { | ||
* source.addEvent(streamIn[i]); // from stream 1 | ||
* source.addEvent(streamOut[i]); // from stream 2 | ||
* } | ||
* ``` | ||
*/ | ||
coalesce(options: CoalesceOptions): EventStream<IN, S>; | ||
/** | ||
* Fill missing values in stream events. | ||
@@ -89,3 +152,3 @@ * | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -100,11 +163,11 @@ * const source = stream() | ||
*/ | ||
fill(options: FillOptions): EventStream<T, U>; | ||
fill(options: FillOptions): EventStream<IN, S>; | ||
/** | ||
* Align Events in the stream to a specific boundary at a fixed period. | ||
* Align `Event`s in the stream to a specific boundary at a fixed `period`. | ||
* Options are a `AlignmentOptions` object where you specify which field to | ||
* align with `fieldSpec`, what boundary period to use with `window` and | ||
* align with `fieldSpec`, what boundary `period` to use with `window` and | ||
* the method of alignment with `method` (which can be either `Linear` | ||
* interpolation, or `Hold`). | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -119,6 +182,6 @@ * const s = stream() | ||
*/ | ||
align(options: AlignmentOptions): EventStream<T, U>; | ||
align(options: AlignmentOptions): EventStream<IN, S>; | ||
/** | ||
* Convert incoming Events in the stream to rates (essentially taking | ||
* the derivative over time). The resulting output Events will be | ||
* Convert incoming `Event`s in the stream to rates (essentially taking | ||
* the derivative over time). The resulting output `Event`s will be | ||
* of type `Event<TimeRange>`, where the `TimeRange` key will be | ||
@@ -135,3 +198,3 @@ * the time span over which the rate was calculated. If you want you | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -144,8 +207,8 @@ * ``` | ||
*/ | ||
rate(options: RateOptions): EventStream<TimeRange, U>; | ||
rate(options: RateOptions): EventStream<TimeRange, S>; | ||
/** | ||
* Convert incoming events to new events with on the specified | ||
* Convert incoming `Event`s to new `Event`s with on the specified | ||
* fields selected out of the source. | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -160,8 +223,8 @@ * Events with fields a, b, c can be mapped to events with only | ||
*/ | ||
select(options: SelectOptions): EventStream<T, U>; | ||
select(options: SelectOptions): EventStream<IN, S>; | ||
/** | ||
* Convert incoming events to new events with specified | ||
* Convert incoming `Event`s to new `Event`s with specified | ||
* fields collapsed into a new field using an aggregation function. | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -181,5 +244,6 @@ * Events with fields a, b, c can be mapped to events with only a field | ||
*/ | ||
collapse(options: CollapseOptions): EventStream<T, U>; | ||
collapse(options: CollapseOptions): EventStream<IN, S>; | ||
/** | ||
* An output, specified as an `EventCallback`, essentially `(event: Event<Key>) => void`. | ||
* | ||
* Using this method you are able to access the stream result. Your callback | ||
@@ -190,3 +254,3 @@ * function will be called whenever a new Event is available. Not that currently the | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -202,11 +266,11 @@ * const source = stream<Time>() | ||
*/ | ||
output(callback: EventCallback): EventStream<T, U>; | ||
output(callback: EventCallback<IN>): EventStream<IN, S>; | ||
/** | ||
* The heart of the streaming code is that in addition to remapping operations of | ||
* a stream of events, you can also group by a window. This is what allows you to do | ||
* rollups with the streaming code. | ||
* rollups within the streaming code. | ||
* | ||
* A window is defined with the `WindowingOptions`, which allows you to specify | ||
* the window period as a `Period` (e.g. `period("30m")` for each 30 minutes window) | ||
* as the `window` and a `Trigger` enum value (emit a completed window on each | ||
* as the `window`, and a `Trigger` enum value (emit a completed window on each | ||
* incoming `Event` or on each completed window). | ||
@@ -222,3 +286,3 @@ * | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -232,27 +296,43 @@ * ``` | ||
* .aggregate({...}) | ||
* .output(event => { | ||
* .output(e => { | ||
* ... | ||
* }); | ||
*/ | ||
groupByWindow(options: WindowingOptions): KeyedCollectionStream<T, U>; | ||
groupByWindow(options: WindowingOptions): KeyedCollectionStream<IN, S>; | ||
} | ||
/** | ||
* A `KeyedCollectionStream` is a stream containing tuples mapping a string key | ||
* to a `Collection`. When you window a stream you will get one of these that | ||
* maps a string representing the window to the `Collection` of all `Event`s in | ||
* that window. | ||
* | ||
* Using this class you can `output()` that or `aggregate()` the `Collection`s | ||
* back to `Event`s. | ||
*/ | ||
export declare class KeyedCollectionStream<T extends Key, U extends Key> { | ||
private stream; | ||
constructor(stream: Stream<U>); | ||
export declare class KeyedCollectionStream<IN extends Key, S extends Key> extends StreamInterface<IN, S> { | ||
constructor(stream: Stream<S>, tail: Node<Base, Base>); | ||
/** | ||
* @private | ||
* Add events into the stream | ||
* | ||
* A helper function to create a new `Node` in the graph. The new node will be a | ||
* processor that remaps a stream of `KeyedCollection`s to another stream of | ||
* `KeyedCollection`s. | ||
*/ | ||
addEvent(e: Event<U>): void; | ||
addKeyedCollectionToKeyedCollectionNode<OUT extends Key>(node: KeyedCollectionMap<IN, OUT>): KeyedCollectionStream<OUT, S>; | ||
/** | ||
* An output, specified as an `KeyedCollectionCallback`, essentially | ||
* `(collection: Collection<T>,vkey: string) => void`. | ||
* @private | ||
* | ||
* Helper function to create a new `Node` in the graph. The new node will be a | ||
* processor that we remap a stream of `KeyedCollection`s back to `Event`s. An | ||
* example would be an aggregation. | ||
*/ | ||
addKeyedCollectionToEventNode<OUT extends Key>(node: KeyedCollectionToEvent<IN, OUT>): EventStream<OUT, S>; | ||
/** | ||
* An output, specified as an `KeyedCollectionCallback`: | ||
* `(collection: Collection<T>, key: string) => void`. | ||
* | ||
* Using this method you are able to access the stream result. Your callback | ||
* function will be called whenever a new `Collection` is available. | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -266,5 +346,5 @@ * const source = stream<Time>() | ||
*/ | ||
output(callback: KeyedCollectionCallback<T>): KeyedCollectionStream<T, U>; | ||
output(callback: KeyedCollectionCallback<IN>): KeyedCollectionStream<IN, S>; | ||
/** | ||
* Takes an incoming tuple mapping a key (the window name) to a `Collection` | ||
* Takes an incoming tuple mapping a key to a `Collection` | ||
* (containing all `Event`s in the window) and reduces that down | ||
@@ -300,3 +380,3 @@ * to an output `Event<Index>` using an aggregation specification. As | ||
*/ | ||
aggregate(spec: AggregationSpec<T>): EventStream<Index, U>; | ||
aggregate(spec: AggregationSpec<IN>): EventStream<Index, S>; | ||
} | ||
@@ -317,9 +397,18 @@ export declare type EventToKeyedCollection<S extends Key, T extends Key> = Node<Event<S>, KeyedCollection<T>>; | ||
* | ||
* A `Stream` object manages a chain of processing nodes, each type of which | ||
* provides an appropiate interface. When a `Stream` is initially created with | ||
* the `stream()` factory function the interface exposed is an `EventStream`. | ||
* If you perform a windowing operation you will be exposed to a | ||
* `KeyedCollectionStream`. If you aggregate a `KeyedCollectionStream` you | ||
* will be back to an `EventStream` and so on. | ||
* A `Stream` object manages a tree of processing `Node`s, each type of which | ||
* maps either `Event`s to other `Event`s or to and from a `KeyedCollection`. | ||
* When an `Event` is added to the stream it will enter the top processing | ||
* node where it will be processed to produce 0, 1 or many output `Event`s. | ||
* Then then are passed down the tree until an output is reached. | ||
* | ||
* When a `Stream` is initially created with the `stream()` factory function | ||
* the interface exposed is an `EventStream`. If you perform a windowing operation | ||
* you will be exposed to a `KeyedCollectionStream`. If you aggregate a | ||
* `KeyedCollectionStream` you will be back to an `EventStream` and so on. | ||
* | ||
* You can think of the `Stream` as the thing that holds the root of the processing | ||
* node chain, while either an `EventStream` or `KeyedCollectionStream` holds the | ||
* current leaf of the tree (the `tail`) onto which additional operating nodes | ||
* can be added using the `EventStream` or `KeyedCollectionStream` API. | ||
* | ||
* --- | ||
@@ -334,3 +423,3 @@ * Note: | ||
* simplify passing of events to a browser and enabling convenient processing for visualization | ||
* purposes, or for light weight handling of events in Node. | ||
* purposes, or for light weight handling of events in Node.js such as simple event alerting. | ||
* | ||
@@ -372,2 +461,26 @@ * --- | ||
* | ||
* If you need to branch a stream, pass the parent stream into the `stream()` factory | ||
* function as its only arg: | ||
* | ||
* ``` | ||
* const source = stream().map( | ||
* e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 })) | ||
* ); | ||
* | ||
* stream(source) | ||
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 }))) | ||
* .output(e => { | ||
* // | ||
* }); | ||
* | ||
* stream(source) | ||
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 }))) | ||
* .output(e => { | ||
* // | ||
* }); | ||
* | ||
* source.addEvent(...); | ||
* | ||
* ``` | ||
* | ||
* If you have multiple sources you can feed them into the same stream and combine them | ||
@@ -382,3 +495,3 @@ * with the `coalese()` processor. In this example two event sources are fed into the | ||
* .coalesce({ fields: ["in", "out"] }) | ||
* .output((e: Event) => results.push(e)); | ||
* .output(e => results.push(e)); | ||
* | ||
@@ -409,3 +522,3 @@ * // Stream events | ||
* }) | ||
* .output((e: Event) => console.log("Running total:", e.toString()) ); | ||
* .output(e => console.log("Running total:", e.toString()) ); | ||
* | ||
@@ -429,3 +542,3 @@ * // Add Events into the source... | ||
* }) | ||
* .output((e: Event) => console.log("Rolling average:", e.toString()) ); | ||
* .output(e => console.log("Rolling average:", e.toString()) ); | ||
* | ||
@@ -437,30 +550,24 @@ * // Add Events into the source... | ||
export declare class Stream<U extends Key = Time> { | ||
private head; | ||
private tail; | ||
/** | ||
* @private | ||
* The root of the entire event processing tree. All incoming `Event`s are | ||
* provided to this `Node`. | ||
*/ | ||
addEventMappingNode<S extends Key, T extends Key>(node: EventMap<S, T>): EventStream<T, U>; | ||
private root; | ||
constructor(upstream?: Stream<U>); | ||
/** | ||
* @private | ||
* Set a new new root onto the `Stream`. This is used internally. | ||
*/ | ||
addEventToCollectorNode<S extends Key, T extends Key>(node: EventToKeyedCollection<S, T>): KeyedCollectionStream<T, U>; | ||
setRoot(node: Node<Base, Base>): void; | ||
/** | ||
* @private | ||
* Returns the `root` node of the entire processing tree. This is used internally. | ||
*/ | ||
addCollectorMappingNode<S extends Key, T extends Key>(node: KeyedCollectionMap<S, T>): KeyedCollectionStream<T, U>; | ||
getRoot(): Node<Base, Base>; | ||
/** | ||
* @private | ||
* Add an `Event` into the root node of the stream | ||
*/ | ||
addCollectionToEventNode<S extends Key, T extends Key>(node: KeyedCollectionToEvent<S, T>): EventStream<T, U>; | ||
/** | ||
* Add an `Event` into the stream | ||
*/ | ||
addEvent<T extends Key>(e: Event<U>): void; | ||
/** | ||
* @private | ||
*/ | ||
protected addNode(node: any): void; | ||
} | ||
declare function streamFactory<T extends Key>(): EventStream<T, T>; | ||
export { streamFactory as stream }; | ||
declare function eventStreamFactory<T extends Key>(): EventStream<T, T>; | ||
export { eventStreamFactory as stream }; |
@@ -15,260 +15,76 @@ "use strict"; | ||
const event_1 = require("./event"); | ||
const index_1 = require("./index"); | ||
const align_1 = require("./align"); | ||
const collapse_1 = require("./collapse"); | ||
const fill_1 = require("./fill"); | ||
const rate_1 = require("./rate"); | ||
const reduce_1 = require("./reduce"); | ||
const select_1 = require("./select"); | ||
const windowedcollection_1 = require("./windowedcollection"); | ||
const node_1 = require("./node"); | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class Node { | ||
constructor() { | ||
// Members | ||
this.observers = Immutable.List(); | ||
} | ||
addObserver(node) { | ||
this.observers = this.observers.push(node); | ||
} | ||
set(input) { | ||
const outputs = this.process(input); | ||
if (outputs) { | ||
outputs.forEach(output => this.notify(output)); | ||
} | ||
} | ||
notify(output) { | ||
if (this.observers.size > 0) { | ||
this.observers.forEach(node => { | ||
node.set(output); | ||
}); | ||
} | ||
} | ||
} | ||
exports.Node = Node; | ||
// | ||
// Nodes | ||
// | ||
/** | ||
* @private | ||
* A `StreamInterface` is the base class for the family of facards placed in front of | ||
* the underlying `Stream` to provide the appropiate API layer depending on what type | ||
* of data is being passed through the pipeline at any given stage. | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class EventInputNode extends Node { | ||
constructor() { | ||
super(); | ||
// pass | ||
} | ||
process(e) { | ||
return Immutable.List([e]); | ||
} | ||
} | ||
/** | ||
* @private | ||
* At this base class level, it holds onto a reference to the underlying `Stream` | ||
* object (which contains the root of the `Node` tree into which `Event`s are | ||
* inserted). It also contains the ability to `addEvent()` method to achieve this (from | ||
* the user's point of view) and `addNode()` which gives allows additions to the | ||
* tree. | ||
* | ||
* Note that while the tree is held onto by its root node within the `Stream` object, | ||
* the current addition point, the `tail` is held by each `StreamInterface`. When a | ||
* `Node` is appended to the `tail` an entirely new interface is returned (its type | ||
* dependent on the output type of the `Node` appended), and that interface will contain | ||
* the new tail point on the tree, while the old one is unchanged. This allows for | ||
* branching of the tree. | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class EventOutputNode extends Node { | ||
constructor(callback) { | ||
super(); | ||
this.callback = callback; | ||
// pass | ||
class StreamInterface { | ||
// tslint:disable-line:max-classes-per-file | ||
constructor(stream, tail) { | ||
this.stream = stream; | ||
this.tail = tail; | ||
} | ||
process(e) { | ||
this.callback(e); | ||
return Immutable.List(); | ||
/** | ||
* Returns the underlying `Stream` object, which primarily contains the | ||
* `root` of the processing graph. | ||
*/ | ||
getStream() { | ||
return this.stream; | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class KeyedCollectionOutputNode extends Node { | ||
constructor(callback) { | ||
super(); | ||
this.callback = callback; | ||
// pass | ||
/** | ||
* Add events into the stream | ||
*/ | ||
addEvent(e) { | ||
this.stream.addEvent(e); | ||
} | ||
process(keyedCollection) { | ||
const [key, collection] = keyedCollection; | ||
this.callback(collection, key); | ||
return Immutable.List(); | ||
/** | ||
* @protected | ||
*/ | ||
addNode(node) { | ||
if (!this.stream.getRoot()) { | ||
this.stream.setRoot(node); | ||
} | ||
if (this.tail) { | ||
this.tail.addObserver(node); | ||
} | ||
} | ||
} | ||
exports.StreamInterface = StreamInterface; | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class MapNode extends Node { | ||
constructor(mapper) { | ||
super(); | ||
this.mapper = mapper; | ||
} | ||
process(e) { | ||
return Immutable.List([this.mapper(e)]); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class FlatMapNode extends Node { | ||
constructor(mapper) { | ||
super(); | ||
this.mapper = mapper; | ||
} | ||
process(e) { | ||
return this.mapper(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class FillNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new fill_1.Fill(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class AlignNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new align_1.Align(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class SelectNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new select_1.Select(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class CollapseNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new collapse_1.Collapse(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class RateNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new rate_1.Rate(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class ReduceNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new reduce_1.Reducer(options); | ||
} | ||
process(e) { | ||
return this.processor.addEvent(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class WindowOutputNode extends Node { | ||
constructor(options) { | ||
super(); | ||
this.processor = new windowedcollection_1.WindowedCollection(options); | ||
} | ||
process(e) { | ||
const keyedCollections = this.processor.addEvent(e); | ||
return keyedCollections; | ||
} | ||
} | ||
/** | ||
* @private | ||
* | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class AggregationNode extends Node { | ||
constructor(aggregationSpec) { | ||
super(); | ||
this.aggregationSpec = aggregationSpec; | ||
} | ||
process(keyedCollection) { | ||
const [group, collection] = keyedCollection; | ||
const d = {}; | ||
const [groupKey, windowKey] = group.split("::").length === 2 ? group.split("::") : [null, group]; | ||
_.forEach(this.aggregationSpec, (src, dest) => { | ||
const [srcField, reducer] = src; | ||
d[dest] = collection.aggregate(reducer, srcField); | ||
}); | ||
const indexedEvent = new event_1.Event(index_1.index(windowKey), Immutable.fromJS(d)); | ||
return Immutable.List([indexedEvent]); | ||
} | ||
} | ||
// | ||
// Stream interfaces | ||
// | ||
/** | ||
* An `EventStream` is the interface to the stream provided for manipulation of | ||
* parts of the streaming pipeline that map a stream of Events of type <T>. | ||
* parts of the streaming pipeline that map a stream of `Event`s of type <IN>. | ||
* | ||
* For example a stream of Events<Time> can be mapped to an output stream of | ||
* new Events<Time> that are aligned to a fixed period boundary. Less or more Events | ||
* For example a stream of `Event<Time>`s can be mapped to an output stream of | ||
* new `Event<Time>`s that are aligned to a fixed period boundary. Less or more `Event`s | ||
* may result. | ||
* | ||
* The type parameter `<U>` is the input `Event` type at the top of the stream, since each | ||
* interface exposes the `addEvent(Event<U>)` method for inserting events at the top of | ||
* the stream. | ||
* The type parameter `<S>` is the input `Event` type at the top of the stream, since each | ||
* interface exposes the `addEvent(e: Event<S>)` method for inserting events at the top of | ||
* the stream this type is maintained across all stream interfaces. | ||
* | ||
* The type parameter `<T>` is the type of `Event`s in this part of the stream. That is | ||
* nodes created by the API at this point of the stream will expect Events of type T, | ||
* and will output new Events, potentially of a different type. | ||
* The type parameter `<IN>` is the type of `Event`s in this part of the stream. That is | ||
* nodes created by the API at this point of the tree will expect `Event<IN>`s, | ||
* and will output new Events, potentially of a different type (identified as `<OUT>`). | ||
* Typically `<IN>` and `<OUT>` would be `Time`, `TimeRange` or `Index`. | ||
*/ | ||
class EventStream { | ||
class EventStream extends StreamInterface { | ||
// tslint:disable-line:max-classes-per-file | ||
constructor(stream) { | ||
this.stream = stream; | ||
constructor(stream, tail) { | ||
super(stream, tail); | ||
} | ||
@@ -278,25 +94,85 @@ /** | ||
* | ||
* Add events into the stream | ||
* Adds a new `Node` which converts a stream of `Event<IN>` to `Event<OUT>` | ||
* | ||
* <IN> is the source type for the processing node | ||
* <OUT> is the output Event type for the processing node | ||
* | ||
* Both IN and OUT extend Key, which is `Time`, `TimeRange` or `Index`, typically. | ||
*/ | ||
addEvent(e) { | ||
this.stream.addEvent(e); | ||
addEventToEventNode(node) { | ||
this.addNode(node); | ||
return new EventStream(this.getStream(), node); | ||
} | ||
/** | ||
* @private | ||
* | ||
* Adds a new `Node` which converts a stream of `Event<IN>`s to a `KeyedCollection<OUT>`. | ||
* | ||
* <IN> is the source type for the processing node | ||
* <OUT> is the output Event type for the processing node | ||
* | ||
* Both IN and OUT extend Key, which is Time, TimeRange or Index, typically. | ||
*/ | ||
addEventToCollectorNode(node) { | ||
this.addNode(node); | ||
return new KeyedCollectionStream(this.getStream(), node); | ||
} | ||
// | ||
// Public API to a stream carrying `Event`s | ||
// | ||
/** | ||
* Remaps each Event<T> in the stream to a new Event<M>. | ||
*/ | ||
map(mapper) { | ||
return this.stream.addEventMappingNode(new MapNode(mapper)); | ||
return this.addEventToEventNode(new node_1.MapNode(mapper)); | ||
} | ||
/** | ||
* Remaps each Event<T> in the stream to 0, 1 or many Event<M>s. | ||
* Remaps each Event<IN> in the stream to 0, 1 or many Event<OUT>s. | ||
*/ | ||
flatMap(mapper) { | ||
return this.stream.addEventMappingNode(new FlatMapNode(mapper)); | ||
return this.addEventToEventNode(new node_1.FlatMapNode(mapper)); | ||
} | ||
/** | ||
* Reduces a sequence of past Event<T>s in the stream to a single output Event<M>. | ||
* Reduces a sequence of past Event<IN>s in the stream to a single output Event<M>. | ||
*/ | ||
reduce(options) { | ||
return this.stream.addEventMappingNode(new ReduceNode(options)); | ||
return this.addEventToEventNode(new node_1.ReduceNode(options)); | ||
} | ||
/** | ||
* Filter out `Event<IN>`s in the stream. Provide a predicate function that | ||
* given an Event returns true or false. | ||
* | ||
* Example: | ||
* ``` | ||
* const source = stream<Time>() | ||
* .filter(e => e.get("a") % 2 !== 0) | ||
* .output(evt => { | ||
* // -> 1, 3, 5 | ||
* }); | ||
* | ||
* source.addEvent(...); // <- 1, 2, 3, 4, 5 | ||
* ``` | ||
*/ | ||
filter(predicate) { | ||
return this.addEventToEventNode(new node_1.FilterNode(predicate)); | ||
} | ||
/** | ||
* If you have multiple sources you can feed them into the same stream and combine them | ||
* with the `coalese()` processor. In this example two event sources are fed into the | ||
* `Stream`. One contains `Event`s with just a field "in", and the other just a field | ||
* "out". The resulting output is `Event`s with the latest (in arrival time) value for | ||
* "in" and "out" together: | ||
* | ||
* ```typescript | ||
* const source = stream() | ||
* .coalesce({ fields: ["in", "out"] }) | ||
* .output((e: Event) => results.push(e)); | ||
* | ||
* // Stream events | ||
* for (let i = 0; i < 5; i++) { | ||
* source.addEvent(streamIn[i]); // from stream 1 | ||
* source.addEvent(streamOut[i]); // from stream 2 | ||
* } | ||
* ``` | ||
*/ | ||
coalesce(options) { | ||
@@ -310,3 +186,3 @@ const { fields } = options; | ||
} | ||
return this.stream.addEventMappingNode(new ReduceNode({ | ||
return this.addEventToEventNode(new node_1.ReduceNode({ | ||
count: 1, | ||
@@ -338,3 +214,3 @@ iteratee(accum, eventList) { | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -350,12 +226,12 @@ * const source = stream() | ||
fill(options) { | ||
return this.stream.addEventMappingNode(new FillNode(options)); | ||
return this.addEventToEventNode(new node_1.FillNode(options)); | ||
} | ||
/** | ||
* Align Events in the stream to a specific boundary at a fixed period. | ||
* Align `Event`s in the stream to a specific boundary at a fixed `period`. | ||
* Options are a `AlignmentOptions` object where you specify which field to | ||
* align with `fieldSpec`, what boundary period to use with `window` and | ||
* align with `fieldSpec`, what boundary `period` to use with `window` and | ||
* the method of alignment with `method` (which can be either `Linear` | ||
* interpolation, or `Hold`). | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -371,7 +247,7 @@ * const s = stream() | ||
align(options) { | ||
return this.stream.addEventMappingNode(new AlignNode(options)); | ||
return this.addEventToEventNode(new node_1.AlignNode(options)); | ||
} | ||
/** | ||
* Convert incoming Events in the stream to rates (essentially taking | ||
* the derivative over time). The resulting output Events will be | ||
* Convert incoming `Event`s in the stream to rates (essentially taking | ||
* the derivative over time). The resulting output `Event`s will be | ||
* of type `Event<TimeRange>`, where the `TimeRange` key will be | ||
@@ -388,3 +264,3 @@ * the time span over which the rate was calculated. If you want you | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -398,9 +274,9 @@ * ``` | ||
rate(options) { | ||
return this.stream.addEventMappingNode(new RateNode(options)); | ||
return this.addEventToEventNode(new node_1.RateNode(options)); | ||
} | ||
/** | ||
* Convert incoming events to new events with on the specified | ||
* Convert incoming `Event`s to new `Event`s with on the specified | ||
* fields selected out of the source. | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -416,9 +292,9 @@ * Events with fields a, b, c can be mapped to events with only | ||
select(options) { | ||
return this.stream.addEventMappingNode(new SelectNode(options)); | ||
return this.addEventToEventNode(new node_1.SelectNode(options)); | ||
} | ||
/** | ||
* Convert incoming events to new events with specified | ||
* Convert incoming `Event`s to new `Event`s with specified | ||
* fields collapsed into a new field using an aggregation function. | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -439,6 +315,7 @@ * Events with fields a, b, c can be mapped to events with only a field | ||
collapse(options) { | ||
return this.stream.addEventMappingNode(new CollapseNode(options)); | ||
return this.addEventToEventNode(new node_1.CollapseNode(options)); | ||
} | ||
/** | ||
* An output, specified as an `EventCallback`, essentially `(event: Event<Key>) => void`. | ||
* | ||
* Using this method you are able to access the stream result. Your callback | ||
@@ -449,3 +326,3 @@ * function will be called whenever a new Event is available. Not that currently the | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -462,3 +339,3 @@ * const source = stream<Time>() | ||
output(callback) { | ||
return this.stream.addEventMappingNode(new EventOutputNode(callback)); | ||
return this.addEventToEventNode(new node_1.EventOutputNode(callback)); | ||
} | ||
@@ -468,7 +345,7 @@ /** | ||
* a stream of events, you can also group by a window. This is what allows you to do | ||
* rollups with the streaming code. | ||
* rollups within the streaming code. | ||
* | ||
* A window is defined with the `WindowingOptions`, which allows you to specify | ||
* the window period as a `Period` (e.g. `period("30m")` for each 30 minutes window) | ||
* as the `window` and a `Trigger` enum value (emit a completed window on each | ||
* as the `window`, and a `Trigger` enum value (emit a completed window on each | ||
* incoming `Event` or on each completed window). | ||
@@ -484,3 +361,3 @@ * | ||
* | ||
* @example | ||
* Example: | ||
* | ||
@@ -494,3 +371,3 @@ * ``` | ||
* .aggregate({...}) | ||
* .output(event => { | ||
* .output(e => { | ||
* ... | ||
@@ -500,3 +377,3 @@ * }); | ||
groupByWindow(options) { | ||
return this.stream.addEventToCollectorNode(new WindowOutputNode(options)); | ||
return this.addEventToCollectorNode(new node_1.WindowOutputNode(options)); | ||
} | ||
@@ -506,24 +383,46 @@ } | ||
/** | ||
* A `KeyedCollectionStream` is a stream containing tuples mapping a string key | ||
* to a `Collection`. When you window a stream you will get one of these that | ||
* maps a string representing the window to the `Collection` of all `Event`s in | ||
* that window. | ||
* | ||
* Using this class you can `output()` that or `aggregate()` the `Collection`s | ||
* back to `Event`s. | ||
*/ | ||
// tslint:disable-next-line:max-classes-per-file | ||
class KeyedCollectionStream { | ||
constructor(stream) { | ||
this.stream = stream; | ||
class KeyedCollectionStream extends StreamInterface { | ||
// tslint:disable-line:max-classes-per-file | ||
constructor(stream, tail) { | ||
super(stream, tail); | ||
} | ||
/** | ||
* @private | ||
* Add events into the stream | ||
* | ||
* A helper function to create a new `Node` in the graph. The new node will be a | ||
* processor that remaps a stream of `KeyedCollection`s to another stream of | ||
* `KeyedCollection`s. | ||
*/ | ||
addEvent(e) { | ||
this.stream.addEvent(e); | ||
addKeyedCollectionToKeyedCollectionNode(node) { | ||
this.addNode(node); | ||
return new KeyedCollectionStream(this.getStream(), node); | ||
} | ||
/** | ||
* An output, specified as an `KeyedCollectionCallback`, essentially | ||
* `(collection: Collection<T>,vkey: string) => void`. | ||
* @private | ||
* | ||
* Helper function to create a new `Node` in the graph. The new node will be a | ||
* processor that we remap a stream of `KeyedCollection`s back to `Event`s. An | ||
* example would be an aggregation. | ||
*/ | ||
addKeyedCollectionToEventNode(node) { | ||
this.addNode(node); | ||
return new EventStream(this.getStream(), node); | ||
} | ||
/** | ||
* An output, specified as an `KeyedCollectionCallback`: | ||
* `(collection: Collection<T>, key: string) => void`. | ||
* | ||
* Using this method you are able to access the stream result. Your callback | ||
* function will be called whenever a new `Collection` is available. | ||
* | ||
* @example | ||
* Example: | ||
* ``` | ||
@@ -538,6 +437,6 @@ * const source = stream<Time>() | ||
output(callback) { | ||
return this.stream.addCollectorMappingNode(new KeyedCollectionOutputNode(callback)); | ||
return this.addKeyedCollectionToKeyedCollectionNode(new node_1.KeyedCollectionOutputNode(callback)); | ||
} | ||
/** | ||
* Takes an incoming tuple mapping a key (the window name) to a `Collection` | ||
* Takes an incoming tuple mapping a key to a `Collection` | ||
* (containing all `Event`s in the window) and reduces that down | ||
@@ -574,3 +473,3 @@ * to an output `Event<Index>` using an aggregation specification. As | ||
aggregate(spec) { | ||
return this.stream.addCollectionToEventNode(new AggregationNode(spec)); | ||
return this.addKeyedCollectionToEventNode(new node_1.AggregationNode(spec)); | ||
} | ||
@@ -592,9 +491,18 @@ } | ||
* | ||
* A `Stream` object manages a chain of processing nodes, each type of which | ||
* provides an appropiate interface. When a `Stream` is initially created with | ||
* the `stream()` factory function the interface exposed is an `EventStream`. | ||
* If you perform a windowing operation you will be exposed to a | ||
* `KeyedCollectionStream`. If you aggregate a `KeyedCollectionStream` you | ||
* will be back to an `EventStream` and so on. | ||
* A `Stream` object manages a tree of processing `Node`s, each type of which | ||
* maps either `Event`s to other `Event`s or to and from a `KeyedCollection`. | ||
* When an `Event` is added to the stream it will enter the top processing | ||
* node where it will be processed to produce 0, 1 or many output `Event`s. | ||
* Then then are passed down the tree until an output is reached. | ||
* | ||
* When a `Stream` is initially created with the `stream()` factory function | ||
* the interface exposed is an `EventStream`. If you perform a windowing operation | ||
* you will be exposed to a `KeyedCollectionStream`. If you aggregate a | ||
* `KeyedCollectionStream` you will be back to an `EventStream` and so on. | ||
* | ||
* You can think of the `Stream` as the thing that holds the root of the processing | ||
* node chain, while either an `EventStream` or `KeyedCollectionStream` holds the | ||
* current leaf of the tree (the `tail`) onto which additional operating nodes | ||
* can be added using the `EventStream` or `KeyedCollectionStream` API. | ||
* | ||
* --- | ||
@@ -609,3 +517,3 @@ * Note: | ||
* simplify passing of events to a browser and enabling convenient processing for visualization | ||
* purposes, or for light weight handling of events in Node. | ||
* purposes, or for light weight handling of events in Node.js such as simple event alerting. | ||
* | ||
@@ -647,2 +555,26 @@ * --- | ||
* | ||
* If you need to branch a stream, pass the parent stream into the `stream()` factory | ||
* function as its only arg: | ||
* | ||
* ``` | ||
* const source = stream().map( | ||
* e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 })) | ||
* ); | ||
* | ||
* stream(source) | ||
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 }))) | ||
* .output(e => { | ||
* // | ||
* }); | ||
* | ||
* stream(source) | ||
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 }))) | ||
* .output(e => { | ||
* // | ||
* }); | ||
* | ||
* source.addEvent(...); | ||
* | ||
* ``` | ||
* | ||
* If you have multiple sources you can feed them into the same stream and combine them | ||
@@ -657,3 +589,3 @@ * with the `coalese()` processor. In this example two event sources are fed into the | ||
* .coalesce({ fields: ["in", "out"] }) | ||
* .output((e: Event) => results.push(e)); | ||
* .output(e => results.push(e)); | ||
* | ||
@@ -684,3 +616,3 @@ * // Stream events | ||
* }) | ||
* .output((e: Event) => console.log("Running total:", e.toString()) ); | ||
* .output(e => console.log("Running total:", e.toString()) ); | ||
* | ||
@@ -704,3 +636,3 @@ * // Add Events into the source... | ||
* }) | ||
* .output((e: Event) => console.log("Rolling average:", e.toString()) ); | ||
* .output(e => console.log("Rolling average:", e.toString()) ); | ||
* | ||
@@ -713,57 +645,40 @@ * // Add Events into the source... | ||
class Stream { | ||
/** | ||
* @private | ||
*/ | ||
addEventMappingNode(node) { | ||
this.addNode(node); | ||
return new EventStream(this); | ||
constructor(upstream) { | ||
this.root = null; | ||
} | ||
/** | ||
* @private | ||
* Set a new new root onto the `Stream`. This is used internally. | ||
*/ | ||
addEventToCollectorNode(node) { | ||
this.addNode(node); | ||
return new KeyedCollectionStream(this); | ||
setRoot(node) { | ||
this.root = node; | ||
} | ||
/** | ||
* @private | ||
* Returns the `root` node of the entire processing tree. This is used internally. | ||
*/ | ||
addCollectorMappingNode(node) { | ||
this.addNode(node); | ||
return new KeyedCollectionStream(this); | ||
getRoot() { | ||
return this.root; | ||
} | ||
/** | ||
* @private | ||
* Add an `Event` into the root node of the stream | ||
*/ | ||
addCollectionToEventNode(node) { | ||
this.addNode(node); | ||
return new EventStream(this); | ||
} | ||
/** | ||
* Add an `Event` into the stream | ||
*/ | ||
addEvent(e) { | ||
if (this.head) { | ||
this.head.set(e); | ||
if (this.root) { | ||
this.root.set(e); | ||
} | ||
} | ||
/** | ||
* @private | ||
*/ | ||
addNode(node) { | ||
if (!this.head) { | ||
this.head = node; | ||
} | ||
if (this.tail) { | ||
this.tail.addObserver(node); | ||
} | ||
this.tail = node; | ||
} | ||
} | ||
exports.Stream = Stream; | ||
function streamFactory() { | ||
/* | ||
* `Stream` and its associated objects are designed for processing of incoming | ||
* `Event` streams at real time. This is useful for live dashboard situations or | ||
* possibly real time monitoring and alerting from event streams. | ||
*/ | ||
function eventStreamFactory() { | ||
const s = new Stream(); | ||
return s.addEventMappingNode(new EventInputNode()); | ||
const n = new node_1.EventInputNode(); | ||
return new EventStream(s, n); | ||
} | ||
exports.stream = streamFactory; | ||
//# sourceMappingURL=data:application/json;base64, | ||
exports.stream = eventStreamFactory; | ||
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic3RyZWFtLmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vc3JjL3N0cmVhbS50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiO0FBQUE7Ozs7Ozs7O0dBUUc7O0FBRUgsdUNBQXVDO0FBQ3ZDLDRCQUE0QjtBQUk1QixtQ0FBdUM7QUFhdkMsaUNBZ0JnQjtBQW1CaEI7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7R0FtQkc7QUFDSDtJQUNJLDJDQUEyQztJQUUzQyxZQUFzQixNQUFpQixFQUFZLElBQXNCO1FBQW5ELFdBQU0sR0FBTixNQUFNLENBQVc7UUFBWSxTQUFJLEdBQUosSUFBSSxDQUFrQjtJQUFHLENBQUM7SUFFN0U7OztPQUdHO0lBQ0gsU0FBUztRQUNMLE1BQU0sQ0FBQyxJQUFJLENBQUMsTUFBTSxDQUFDO0lBQ3ZCLENBQUM7SUFFRDs7T0FFRztJQUNILFFBQVEsQ0FBQyxDQUFXO1FBQ2hCLElBQUksQ0FBQyxNQUFNLENBQUMsUUFBUSxDQUFDLENBQUMsQ0FBQyxDQUFDO0lBQzVCLENBQUM7SUFFRDs7T0FFRztJQUNILE9BQU8sQ0FBQyxJQUFJO1FBQ1IsRUFBRSxDQUFDLENBQUMsQ0FBQyxJQUFJLENBQUMsTUFBTSxDQUFDLE9BQU8sRUFBRSxDQUFDLENBQUMsQ0FBQztZQUN6QixJQUFJLENBQUMsTUFBTSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUM5QixDQUFDO1FBQ0QsRUFBRSxDQUFDLENBQUMsSUFBSSxDQUFDLElBQUksQ0FBQyxDQUFDLENBQUM7WUFDWixJQUFJLENBQUMsSUFBSSxDQUFDLFdBQVcsQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNoQyxDQUFDO0lBQ0wsQ0FBQztDQUNKO0FBL0JELDBDQStCQztBQUVEOzs7Ozs7Ozs7Ozs7Ozs7O0dBZ0JHO0FBQ0gsaUJBQXdELFNBQVEsZUFBc0I7SUFDbEYsMkNBQTJDO0lBQzNDLFlBQVksTUFBaUIsRUFBRSxJQUFzQjtRQUNqRCxLQUFLLENBQUMsTUFBTSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQ3hCLENBQUM7SUFFRDs7Ozs7Ozs7O09BU0c7SUFDSCxtQkFBbUIsQ0FBa0IsSUFBdUI7UUFDeEQsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxXQUFXLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQzNELENBQUM7SUFFRDs7Ozs7Ozs7O09BU0c7SUFDSCx1QkFBdUIsQ0FBa0IsSUFBcUM7UUFDMUUsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxxQkFBcUIsQ0FBUyxJQUFJLENBQUMsU0FBUyxFQUFFLEVBQUUsSUFBSSxDQUFDLENBQUM7SUFDckUsQ0FBQztJQUVELEVBQUU7SUFDRiwyQ0FBMkM7SUFDM0MsRUFBRTtJQUVGOztPQUVHO0lBQ0gsR0FBRyxDQUFrQixNQUF3QztRQUN6RCxNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksY0FBTyxDQUFVLE1BQU0sQ0FBQyxDQUFDLENBQUM7SUFDbEUsQ0FBQztJQUVEOztPQUVHO0lBQ0gsT0FBTyxDQUFrQixNQUF3RDtRQUM3RSxNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksa0JBQVcsQ0FBVSxNQUFNLENBQUMsQ0FBQyxDQUFDO0lBQ3RFLENBQUM7SUFFRDs7T0FFRztJQUNILE1BQU0sQ0FBQyxPQUEwQjtRQUM3QixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksaUJBQVUsQ0FBSyxPQUFPLENBQUMsQ0FBQyxDQUFDO0lBQ2pFLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7T0FjRztJQUNILE1BQU0sQ0FBZ0IsU0FBd0M7UUFDMUQsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGlCQUFVLENBQUssU0FBUyxDQUFDLENBQUMsQ0FBQztJQUNuRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQWtCRztJQUNILFFBQVEsQ0FBQyxPQUF3QjtRQUM3QixNQUFNLEVBQUUsTUFBTSxFQUFFLEdBQUcsT0FBTyxDQUFDO1FBQzNCLGVBQWUsR0FBRyxJQUFJO1lBQ2xCLE1BQU0sTUFBTSxHQUFHLFNBQVMsQ0FBQyxHQUFHLENBQUMsR0FBRyxJQUFJLENBQUMsQ0FBQztZQUN0QyxNQUFNLENBQUMsQ0FBQyxDQUFDLEVBQUUsQ0FBQyxFQUFFLEVBQUU7Z0JBQ1osTUFBTSxDQUFDLE1BQU0sQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDLENBQUM7WUFDekIsQ0FBQyxDQUFDO1FBQ04sQ0FBQztRQUNELE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQzNCLElBQUksaUJBQVUsQ0FBSztZQUNmLEtBQUssRUFBRSxDQUFDO1lBQ1IsUUFBUSxDQUFDLEtBQUssRUFBRSxTQUFTO2dCQUNyQixNQUFNLFlBQVksR0FBRyxTQUFTLENBQUMsR0FBRyxDQUFDLENBQUMsQ0FBQyxDQUFDO2dCQUN0QyxNQUFNLFVBQVUsR0FBRyxZQUFZLENBQUMsTUFBTSxFQUFFLENBQUM7Z0JBQ3pDLE1BQU0sZ0JBQWdCLEdBQUcsQ0FBQyxDQUFDLENBQUMsTUFBTSxDQUFDLEtBQUssQ0FBQztvQkFDckMsQ0FBQyxDQUFDLEtBQUs7b0JBQ1AsQ0FBQyxDQUFDLGFBQUssQ0FBQyxVQUFVLEVBQUUsU0FBUyxDQUFDLEdBQUcsQ0FBQyxFQUFFLENBQUMsQ0FBQyxDQUFDO2dCQUMzQyxNQUFNLFlBQVksR0FBRyxZQUFZLENBQUMsT0FBTyxFQUFFLENBQUMsTUFBTSxDQUFDLEtBQUssQ0FBQyxNQUFNLENBQUMsQ0FBQyxDQUFDO2dCQUNsRSxNQUFNLENBQUMsYUFBSyxDQUFDLFVBQVUsRUFBRSxnQkFBZ0IsQ0FBQyxPQUFPLEVBQUUsQ0FBQyxLQUFLLENBQUMsWUFBWSxDQUFDLENBQUMsQ0FBQztZQUM3RSxDQUFDO1NBQ0osQ0FBQyxDQUNMLENBQUM7SUFDTixDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7O09BdUJHO0lBQ0gsSUFBSSxDQUFDLE9BQW9CO1FBQ3JCLE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQUMsSUFBSSxlQUFRLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUMvRCxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7T0FnQkc7SUFDSCxLQUFLLENBQUMsT0FBeUI7UUFDM0IsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGdCQUFTLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUNoRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQXFCRztJQUNILElBQUksQ0FBQyxPQUFvQjtRQUNyQixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksZUFBUSxDQUFLLE9BQU8sQ0FBQyxDQUFDLENBQUM7SUFDL0QsQ0FBQztJQUVEOzs7Ozs7Ozs7Ozs7O09BYUc7SUFDSCxNQUFNLENBQUMsT0FBc0I7UUFDekIsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGlCQUFVLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUNqRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQWtCRztJQUNILFFBQVEsQ0FBQyxPQUF3QjtRQUM3QixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksbUJBQVksQ0FBSyxPQUFPLENBQUMsQ0FBQyxDQUFDO0lBQ25FLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7Ozs7O09Ba0JHO0lBQ0gsTUFBTSxDQUFDLFFBQTJCO1FBQzlCLE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQUssSUFBSSxzQkFBZSxDQUFLLFFBQVEsQ0FBQyxDQUFDLENBQUM7SUFDM0UsQ0FBQztJQUVEOzs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7T0E4Qkc7SUFDSCxhQUFhLENBQUMsT0FBeUI7UUFDbkMsTUFBTSxDQUFDLElBQUksQ0FBQyx1QkFBdUIsQ0FBQyxJQUFJLHVCQUFnQixDQUFLLE9BQU8sQ0FBQyxDQUFDLENBQUM7SUFDM0UsQ0FBQztDQUNKO0FBeFNELGtDQXdTQztBQUVEOzs7Ozs7OztHQVFHO0FBQ0gsZ0RBQWdEO0FBQ2hELDJCQUFrRSxTQUFRLGVBQXNCO0lBQzVGLDJDQUEyQztJQUMzQyxZQUFZLE1BQWlCLEVBQUUsSUFBc0I7UUFDakQsS0FBSyxDQUFDLE1BQU0sRUFBRSxJQUFJLENBQUMsQ0FBQztJQUN4QixDQUFDO0lBRUQ7Ozs7OztPQU1HO0lBQ0gsdUNBQXVDLENBQWtCLElBQWlDO1FBQ3RGLElBQUksQ0FBQyxPQUFPLENBQUMsSUFBSSxDQUFDLENBQUM7UUFDbkIsTUFBTSxDQUFDLElBQUkscUJBQXFCLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQ3JFLENBQUM7SUFFRDs7Ozs7O09BTUc7SUFDSCw2QkFBNkIsQ0FBa0IsSUFBcUM7UUFDaEYsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxXQUFXLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQzNELENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7O09BZUc7SUFDSCxNQUFNLENBQUMsUUFBcUM7UUFDeEMsTUFBTSxDQUFDLElBQUksQ0FBQyx1Q0FBdUMsQ0FDL0MsSUFBSSxnQ0FBeUIsQ0FBSyxRQUFRLENBQUMsQ0FDOUMsQ0FBQztJQUNOLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQStCRztJQUNILFNBQVMsQ0FBQyxJQUF5QjtRQUMvQixNQUFNLENBQUMsSUFBSSxDQUFDLDZCQUE2QixDQUFRLElBQUksc0JBQWUsQ0FBSyxJQUFJLENBQUMsQ0FBQyxDQUFDO0lBQ3BGLENBQUM7Q0FDSjtBQXZGRCxzREF1RkM7QUFvQkQsRUFBRTtBQUNGLGdCQUFnQjtBQUNoQixFQUFFO0FBRUY7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7O0dBMkpHO0FBQ0gsZ0RBQWdEO0FBQ2hEO0lBT0ksWUFBWSxRQUFvQjtRQUM1QixJQUFJLENBQUMsSUFBSSxHQUFHLElBQUksQ0FBQztJQUNyQixDQUFDO0lBRUQ7OztPQUdHO0lBQ0gsT0FBTyxDQUFDLElBQXNCO1FBQzFCLElBQUksQ0FBQyxJQUFJLEdBQUcsSUFBSSxDQUFDO0lBQ3JCLENBQUM7SUFFRDs7O09BR0c7SUFDSCxPQUFPO1FBQ0gsTUFBTSxDQUFDLElBQUksQ0FBQyxJQUFJLENBQUM7SUFDckIsQ0FBQztJQUVEOztPQUVHO0lBQ0gsUUFBUSxDQUFnQixDQUFXO1FBQy9CLEVBQUUsQ0FBQyxDQUFDLElBQUksQ0FBQyxJQUFJLENBQUMsQ0FBQyxDQUFDO1lBQ1osSUFBSSxDQUFDLElBQUksQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDLENBQUM7UUFDckIsQ0FBQztJQUNMLENBQUM7Q0FDSjtBQW5DRCx3QkFtQ0M7QUFFRDs7OztHQUlHO0FBQ0g7SUFDSSxNQUFNLENBQUMsR0FBRyxJQUFJLE1BQU0sRUFBSyxDQUFDO0lBQzFCLE1BQU0sQ0FBQyxHQUFHLElBQUkscUJBQWMsRUFBSyxDQUFDO0lBQ2xDLE1BQU0sQ0FBQyxJQUFJLFdBQVcsQ0FBTyxDQUFDLEVBQUUsQ0FBQyxDQUFDLENBQUM7QUFDdkMsQ0FBQztBQUU4QixvQ0FBTSJ9 |
@@ -228,2 +228,14 @@ import * as Immutable from "immutable"; | ||
/** | ||
* A callback function which is passed an `Event` | ||
*/ | ||
export declare type EventCallback<T extends Key> = (event: Event<T>) => void; | ||
/** | ||
* A callback function which is passed a `Collection` and associated `key` | ||
*/ | ||
export declare type KeyedCollectionCallback<T extends Key> = (collection: Collection<T>, key: string) => void; | ||
/** | ||
* A tuple of string key and associated `Collection` | ||
*/ | ||
export declare type KeyedCollection<T extends Key> = [string, Collection<T>]; | ||
/** | ||
* Tuple mapping a string -> `ReducerFunction` | ||
@@ -230,0 +242,0 @@ * e.g. `["value", avg()]` |
@@ -8,4 +8,6 @@ import * as Immutable from "immutable"; | ||
import { SortedCollection } from "./sortedcollection"; | ||
import { KeyedCollection } from "./stream"; | ||
import { AggregationSpec, WindowingOptions } from "./types"; | ||
import { AggregationSpec, KeyedCollection, WindowingOptions } from "./types"; | ||
/** | ||
* A map of `SortedCollection`s indexed by a string key representing a window. | ||
*/ | ||
export declare class WindowedCollection<T extends Key> extends Base { | ||
@@ -41,3 +43,3 @@ protected collections: Immutable.Map<string, SortedCollection<T>>; | ||
/** | ||
* Fetch the SortedCollection of events contained in the windowed grouping | ||
* Fetch the `SortedCollection` of `Event`s contained in the windowed grouping | ||
*/ | ||
@@ -44,0 +46,0 @@ get(key: string): SortedCollection<T>; |
@@ -22,2 +22,5 @@ "use strict"; | ||
const types_1 = require("./types"); | ||
/** | ||
* A map of `SortedCollection`s indexed by a string key representing a window. | ||
*/ | ||
class WindowedCollection extends base_1.Base { | ||
@@ -64,3 +67,2 @@ constructor(arg1, arg2, arg3) { | ||
if (collection) { | ||
// TODO: do we use this code path? | ||
throw new Error("Unimplemented"); | ||
@@ -75,3 +77,3 @@ } | ||
/** | ||
* Fetch the SortedCollection of events contained in the windowed grouping | ||
* Fetch the `SortedCollection` of `Event`s contained in the windowed grouping | ||
*/ | ||
@@ -205,2 +207,2 @@ get(key) { | ||
exports.windowed = windowFactory; | ||
//# sourceMappingURL=data:application/json;base64, | ||
//# sourceMappingURL=data:application/json;base64, |
{ | ||
"name": "pondjs", | ||
"version": "1.0.0-alpha.2", | ||
"version": "1.0.0-alpha.3", | ||
"description": "A TimeSeries library built on Immutable.js with Typescript", | ||
@@ -5,0 +5,0 @@ "main": "lib/exports.js", |
@@ -64,5 +64,3 @@ declare const describe: any; | ||
.rate({ fieldSpec: "value", allowNegative: false }) | ||
.output(e => { | ||
result.push(e); | ||
}); | ||
.output(e => result.push(e)); | ||
@@ -94,3 +92,3 @@ list.forEach(e => { | ||
let calls = 0; | ||
const source = stream() | ||
const source = stream<Time>() | ||
.groupByWindow({ | ||
@@ -101,3 +99,3 @@ window: everyThirtyMinutes, | ||
.output((c, key) => { | ||
result[key] = c as Collection<Time>; | ||
result[key] = c; | ||
calls += 1; | ||
@@ -134,4 +132,3 @@ }); | ||
}) | ||
.output(evt => { | ||
const e = evt as Event<Index>; | ||
.output(e => { | ||
result[e.getKey().toString()] = e; | ||
@@ -198,6 +195,3 @@ calls += 1; | ||
.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 }))) | ||
.output(evt => { | ||
const e = evt as Event<Time>; | ||
result.push(e); | ||
}); | ||
.output(e => result.push(e)); | ||
@@ -210,2 +204,3 @@ eventsIn.forEach(e => source.addEvent(e)); | ||
}); | ||
it("can do streaming event flatmap", () => { | ||
@@ -231,6 +226,3 @@ const eventsIn = [ | ||
}) | ||
.output(evt => { | ||
const e = evt as Event<Time>; | ||
result.push(e); | ||
}); | ||
.output(e => result.push(e)); | ||
@@ -249,2 +241,24 @@ eventsIn.forEach(e => source.addEvent(e)); | ||
it("can do filtering on a stream of events", () => { | ||
const eventsIn = [ | ||
event(time(Date.UTC(2015, 2, 14, 7, 31, 0)), Immutable.Map({ a: 1 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 32, 0)), Immutable.Map({ a: 2 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 33, 0)), Immutable.Map({ a: 3 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 34, 0)), Immutable.Map({ a: 4 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 35, 0)), Immutable.Map({ a: 5 })) | ||
]; | ||
const result: Event[] = []; | ||
const source = stream<Time>() | ||
.filter(e => e.get("a") % 2 !== 0) | ||
.output(e => result.push(e)); | ||
eventsIn.forEach(e => source.addEvent(e)); | ||
expect(result[0].get("a")).toEqual(1); | ||
expect(result[1].get("a")).toEqual(3); | ||
expect(result[2].get("a")).toEqual(5); | ||
}); | ||
it("can selection of specific event fields", () => { | ||
@@ -263,5 +277,3 @@ const DATA = [[1471824030000, 1, 2, 3], [1471824105000, 4, 5, 6], [1471824210000, 7, 8, 9]]; | ||
}) | ||
.output(e => { | ||
result.push(e); | ||
}); | ||
.output(e => result.push(e)); | ||
@@ -381,3 +393,3 @@ list.forEach(e => { | ||
}) | ||
.output((e: Event) => results.push(e)); | ||
.output(e => results.push(e)); | ||
@@ -406,3 +418,3 @@ // Stream events | ||
}) | ||
.output((e: Event) => results.push(e)); | ||
.output(e => results.push(e)); | ||
@@ -417,2 +429,35 @@ // Stream events | ||
it("can do a split of two streams", () => { | ||
const eventsIn = [ | ||
event(time(Date.UTC(2015, 2, 14, 7, 57, 0)), Immutable.Map({ a: 1 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 58, 0)), Immutable.Map({ a: 2 })), | ||
event(time(Date.UTC(2015, 2, 14, 7, 59, 0)), Immutable.Map({ a: 3 })) | ||
]; | ||
const result1: Event[] = []; | ||
const result2: Event[] = []; | ||
const source = stream<Time>().map(e => | ||
event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 })) | ||
); // 2, 4, 6 | ||
const branch1 = source | ||
.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 }))) // 6, 12, 18 | ||
.output(e => result1.push(e)); | ||
const branch2 = source | ||
.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 }))) // 8, 16, 24 | ||
.output(e => result2.push(e)); | ||
eventsIn.forEach(e => source.addEvent(e)); | ||
expect(result1[0].get("a")).toBe(6); | ||
expect(result1[1].get("a")).toBe(12); | ||
expect(result1[2].get("a")).toBe(18); | ||
expect(result2[0].get("a")).toBe(8); | ||
expect(result2[1].get("a")).toBe(16); | ||
expect(result2[2].get("a")).toBe(24); | ||
}); | ||
it("can coalese two streams", () => { | ||
@@ -419,0 +464,0 @@ const results = []; |
1295271
1.77%86
2.38%21552
2.52%