You're Invited: Meet the Socket team at BSidesSF and RSAC - April 27 - May 1.RSVP

pondjs

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pondjs - npm Package Compare versions

Comparing version

to
1.0.0-alpha.3

import * as Immutable from "immutable";
import { Base } from "./base";
import { Collection } from "./collection";
import { Event } from "./event";

@@ -9,68 +8,132 @@ import { Index } from "./index";

import { TimeRange } from "./timerange";
import { AggregationSpec, AlignmentOptions, CoalesceOptions, CollapseOptions, FillOptions, RateOptions, ReduceOptions, SelectOptions, WindowingOptions } from "./types";
import { Node } from "./node";
import { AggregationSpec, AlignmentOptions, CoalesceOptions, CollapseOptions, EventCallback, FillOptions, KeyedCollection, KeyedCollectionCallback, RateOptions, ReduceOptions, SelectOptions, WindowingOptions } from "./types";
/**
* A Node is a transformation between type S and type T. Both S
* and T much extend Base.
*
* The transformation happens when a `Node` has its `set()` method called
* by another `Node`. The `input` to set() is of type `S`. When this happens
* a subclass specific implementation of `process` is called to actually
* transform the input (of type `S` to an output of type `T`). Of course
* `S` and `T` maybe the same if the input and output types are expected
* to be the same. The result of `process`, of type `T`, is returned and
* the passed onto other downstream Nodes, by calling their `set()` methods.
*/
export declare type EventCallback = (event: Event<Key>) => void;
export declare type KeyedCollectionCallback<T extends Key> = (collection: Collection<T>, key: string) => void;
export declare type KeyedCollection<T extends Key> = [string, Collection<T>];
/**
* @private
*
* A `StreamInterface` is the base class for the family of facards placed in front of
* the underlying `Stream` to provide the appropiate API layer depending on what type
* of data is being passed through the pipeline at any given stage.
*
* At this base class level, it holds onto a reference to the underlying `Stream`
* object (which contains the root of the `Node` tree into which `Event`s are
* inserted). It also contains the ability to `addEvent()` method to achieve this (from
* the user's point of view) and `addNode()` which gives allows additions to the
* tree.
*
* Note that while the tree is held onto by its root node within the `Stream` object,
* the current addition point, the `tail` is held by each `StreamInterface`. When a
* `Node` is appended to the `tail` an entirely new interface is returned (its type
* dependent on the output type of the `Node` appended), and that interface will contain
* the new tail point on the tree, while the old one is unchanged. This allows for
* branching of the tree.
*/
export declare abstract class Node<S extends Base, T extends Base> {
protected observers: Immutable.List<Node<T, Base>>;
addObserver(node: Node<T, Base>): void;
set(input: S): void;
protected notify(output: T): void;
protected abstract process(input: S): Immutable.List<T>;
export declare class StreamInterface<IN extends Key, S extends Key> {
protected stream: Stream<S>;
protected tail: Node<Base, Base>;
constructor(stream: Stream<S>, tail: Node<Base, Base>);
/**
* Returns the underlying `Stream` object, which primarily contains the
* `root` of the processing graph.
*/
getStream(): Stream<S>;
/**
* Add events into the stream
*/
addEvent(e: Event<S>): void;
/**
* @protected
*/
addNode(node: any): void;
}
/**
* An `EventStream` is the interface to the stream provided for manipulation of
* parts of the streaming pipeline that map a stream of Events of type <T>.
* parts of the streaming pipeline that map a stream of `Event`s of type <IN>.
*
* For example a stream of Events<Time> can be mapped to an output stream of
* new Events<Time> that are aligned to a fixed period boundary. Less or more Events
* For example a stream of `Event<Time>`s can be mapped to an output stream of
* new `Event<Time>`s that are aligned to a fixed period boundary. Less or more `Event`s
* may result.
*
* The type parameter `<U>` is the input `Event` type at the top of the stream, since each
* interface exposes the `addEvent(Event<U>)` method for inserting events at the top of
* the stream.
* The type parameter `<S>` is the input `Event` type at the top of the stream, since each
* interface exposes the `addEvent(e: Event<S>)` method for inserting events at the top of
* the stream this type is maintained across all stream interfaces.
*
* The type parameter `<T>` is the type of `Event`s in this part of the stream. That is
* nodes created by the API at this point of the stream will expect Events of type T,
* and will output new Events, potentially of a different type.
* The type parameter `<IN>` is the type of `Event`s in this part of the stream. That is
* nodes created by the API at this point of the tree will expect `Event<IN>`s,
* and will output new Events, potentially of a different type (identified as `<OUT>`).
* Typically `<IN>` and `<OUT>` would be `Time`, `TimeRange` or `Index`.
*/
export declare class EventStream<T extends Key, U extends Key> {
private stream;
constructor(stream: Stream<U>);
export declare class EventStream<IN extends Key, S extends Key> extends StreamInterface<IN, S> {
constructor(stream: Stream<S>, tail: Node<Base, Base>);
/**
* @private
*
* Add events into the stream
* Adds a new `Node` which converts a stream of `Event<IN>` to `Event<OUT>`
*
* <IN> is the source type for the processing node
* <OUT> is the output Event type for the processing node
*
* Both IN and OUT extend Key, which is `Time`, `TimeRange` or `Index`, typically.
*/
addEvent(e: Event<U>): void;
addEventToEventNode<OUT extends Key>(node: EventMap<IN, OUT>): EventStream<OUT, S>;
/**
* @private
*
* Adds a new `Node` which converts a stream of `Event<IN>`s to a `KeyedCollection<OUT>`.
*
* <IN> is the source type for the processing node
* <OUT> is the output Event type for the processing node
*
* Both IN and OUT extend Key, which is Time, TimeRange or Index, typically.
*/
addEventToCollectorNode<OUT extends Key>(node: EventToKeyedCollection<IN, OUT>): KeyedCollectionStream<OUT, S>;
/**
* Remaps each Event<T> in the stream to a new Event<M>.
*/
map<M extends Key>(mapper: (event: Event<T>) => Event<M>): EventStream<M, U>;
map<OUT extends Key>(mapper: (event: Event<IN>) => Event<OUT>): EventStream<OUT, S>;
/**
* Remaps each Event<T> in the stream to 0, 1 or many Event<M>s.
* Remaps each Event<IN> in the stream to 0, 1 or many Event<OUT>s.
*/
flatMap<M extends Key>(mapper: (event: Event<T>) => Immutable.List<Event<M>>): EventStream<M, U>;
flatMap<OUT extends Key>(mapper: (event: Event<IN>) => Immutable.List<Event<OUT>>): EventStream<OUT, S>;
/**
* Reduces a sequence of past Event<T>s in the stream to a single output Event<M>.
* Reduces a sequence of past Event<IN>s in the stream to a single output Event<M>.
*/
reduce<M extends Key>(options: ReduceOptions<T>): EventStream<T, U>;
coalesce(options: CoalesceOptions): EventStream<T, U>;
reduce(options: ReduceOptions<IN>): EventStream<IN, S>;
/**
* Filter out `Event<IN>`s in the stream. Provide a predicate function that
* given an Event returns true or false.
*
* Example:
* ```
* const source = stream<Time>()
* .filter(e => e.get("a") % 2 !== 0)
* .output(evt => {
* // -> 1, 3, 5
* });
*
* source.addEvent(...); // <- 1, 2, 3, 4, 5
* ```
*/
filter<M extends Key>(predicate: (event: Event<IN>) => boolean): EventStream<IN, S>;
/**
* If you have multiple sources you can feed them into the same stream and combine them
* with the `coalese()` processor. In this example two event sources are fed into the
* `Stream`. One contains `Event`s with just a field "in", and the other just a field
* "out". The resulting output is `Event`s with the latest (in arrival time) value for
* "in" and "out" together:
*
* ```typescript
* const source = stream()
* .coalesce({ fields: ["in", "out"] })
* .output((e: Event) => results.push(e));
*
* // Stream events
* for (let i = 0; i < 5; i++) {
* source.addEvent(streamIn[i]); // from stream 1
* source.addEvent(streamOut[i]); // from stream 2
* }
* ```
*/
coalesce(options: CoalesceOptions): EventStream<IN, S>;
/**
* Fill missing values in stream events.

@@ -89,3 +152,3 @@ *

*
* @example
* Example:
* ```

@@ -100,11 +163,11 @@ * const source = stream()

*/
fill(options: FillOptions): EventStream<T, U>;
fill(options: FillOptions): EventStream<IN, S>;
/**
* Align Events in the stream to a specific boundary at a fixed period.
* Align `Event`s in the stream to a specific boundary at a fixed `period`.
* Options are a `AlignmentOptions` object where you specify which field to
* align with `fieldSpec`, what boundary period to use with `window` and
* align with `fieldSpec`, what boundary `period` to use with `window` and
* the method of alignment with `method` (which can be either `Linear`
* interpolation, or `Hold`).
*
* @example
* Example:
* ```

@@ -119,6 +182,6 @@ * const s = stream()

*/
align(options: AlignmentOptions): EventStream<T, U>;
align(options: AlignmentOptions): EventStream<IN, S>;
/**
* Convert incoming Events in the stream to rates (essentially taking
* the derivative over time). The resulting output Events will be
* Convert incoming `Event`s in the stream to rates (essentially taking
* the derivative over time). The resulting output `Event`s will be
* of type `Event<TimeRange>`, where the `TimeRange` key will be

@@ -135,3 +198,3 @@ * the time span over which the rate was calculated. If you want you

*
* @example
* Example:
*

@@ -144,8 +207,8 @@ * ```

*/
rate(options: RateOptions): EventStream<TimeRange, U>;
rate(options: RateOptions): EventStream<TimeRange, S>;
/**
* Convert incoming events to new events with on the specified
* Convert incoming `Event`s to new `Event`s with on the specified
* fields selected out of the source.
*
* @example
* Example:
*

@@ -160,8 +223,8 @@ * Events with fields a, b, c can be mapped to events with only

*/
select(options: SelectOptions): EventStream<T, U>;
select(options: SelectOptions): EventStream<IN, S>;
/**
* Convert incoming events to new events with specified
* Convert incoming `Event`s to new `Event`s with specified
* fields collapsed into a new field using an aggregation function.
*
* @example
* Example:
*

@@ -181,5 +244,6 @@ * Events with fields a, b, c can be mapped to events with only a field

*/
collapse(options: CollapseOptions): EventStream<T, U>;
collapse(options: CollapseOptions): EventStream<IN, S>;
/**
* An output, specified as an `EventCallback`, essentially `(event: Event<Key>) => void`.
*
* Using this method you are able to access the stream result. Your callback

@@ -190,3 +254,3 @@ * function will be called whenever a new Event is available. Not that currently the

*
* @example
* Example:
* ```

@@ -202,11 +266,11 @@ * const source = stream<Time>()

*/
output(callback: EventCallback): EventStream<T, U>;
output(callback: EventCallback<IN>): EventStream<IN, S>;
/**
* The heart of the streaming code is that in addition to remapping operations of
* a stream of events, you can also group by a window. This is what allows you to do
* rollups with the streaming code.
* rollups within the streaming code.
*
* A window is defined with the `WindowingOptions`, which allows you to specify
* the window period as a `Period` (e.g. `period("30m")` for each 30 minutes window)
* as the `window` and a `Trigger` enum value (emit a completed window on each
* as the `window`, and a `Trigger` enum value (emit a completed window on each
* incoming `Event` or on each completed window).

@@ -222,3 +286,3 @@ *

*
* @example
* Example:
*

@@ -232,27 +296,43 @@ * ```

* .aggregate({...})
* .output(event => {
* .output(e => {
* ...
* });
*/
groupByWindow(options: WindowingOptions): KeyedCollectionStream<T, U>;
groupByWindow(options: WindowingOptions): KeyedCollectionStream<IN, S>;
}
/**
* A `KeyedCollectionStream` is a stream containing tuples mapping a string key
* to a `Collection`. When you window a stream you will get one of these that
* maps a string representing the window to the `Collection` of all `Event`s in
* that window.
*
* Using this class you can `output()` that or `aggregate()` the `Collection`s
* back to `Event`s.
*/
export declare class KeyedCollectionStream<T extends Key, U extends Key> {
private stream;
constructor(stream: Stream<U>);
export declare class KeyedCollectionStream<IN extends Key, S extends Key> extends StreamInterface<IN, S> {
constructor(stream: Stream<S>, tail: Node<Base, Base>);
/**
* @private
* Add events into the stream
*
* A helper function to create a new `Node` in the graph. The new node will be a
* processor that remaps a stream of `KeyedCollection`s to another stream of
* `KeyedCollection`s.
*/
addEvent(e: Event<U>): void;
addKeyedCollectionToKeyedCollectionNode<OUT extends Key>(node: KeyedCollectionMap<IN, OUT>): KeyedCollectionStream<OUT, S>;
/**
* An output, specified as an `KeyedCollectionCallback`, essentially
* `(collection: Collection<T>,vkey: string) => void`.
* @private
*
* Helper function to create a new `Node` in the graph. The new node will be a
* processor that we remap a stream of `KeyedCollection`s back to `Event`s. An
* example would be an aggregation.
*/
addKeyedCollectionToEventNode<OUT extends Key>(node: KeyedCollectionToEvent<IN, OUT>): EventStream<OUT, S>;
/**
* An output, specified as an `KeyedCollectionCallback`:
* `(collection: Collection<T>, key: string) => void`.
*
* Using this method you are able to access the stream result. Your callback
* function will be called whenever a new `Collection` is available.
*
* @example
* Example:
* ```

@@ -266,5 +346,5 @@ * const source = stream<Time>()

*/
output(callback: KeyedCollectionCallback<T>): KeyedCollectionStream<T, U>;
output(callback: KeyedCollectionCallback<IN>): KeyedCollectionStream<IN, S>;
/**
* Takes an incoming tuple mapping a key (the window name) to a `Collection`
* Takes an incoming tuple mapping a key to a `Collection`
* (containing all `Event`s in the window) and reduces that down

@@ -300,3 +380,3 @@ * to an output `Event<Index>` using an aggregation specification. As

*/
aggregate(spec: AggregationSpec<T>): EventStream<Index, U>;
aggregate(spec: AggregationSpec<IN>): EventStream<Index, S>;
}

@@ -317,9 +397,18 @@ export declare type EventToKeyedCollection<S extends Key, T extends Key> = Node<Event<S>, KeyedCollection<T>>;

*
* A `Stream` object manages a chain of processing nodes, each type of which
* provides an appropiate interface. When a `Stream` is initially created with
* the `stream()` factory function the interface exposed is an `EventStream`.
* If you perform a windowing operation you will be exposed to a
* `KeyedCollectionStream`. If you aggregate a `KeyedCollectionStream` you
* will be back to an `EventStream` and so on.
* A `Stream` object manages a tree of processing `Node`s, each type of which
* maps either `Event`s to other `Event`s or to and from a `KeyedCollection`.
* When an `Event` is added to the stream it will enter the top processing
* node where it will be processed to produce 0, 1 or many output `Event`s.
* Then then are passed down the tree until an output is reached.
*
* When a `Stream` is initially created with the `stream()` factory function
* the interface exposed is an `EventStream`. If you perform a windowing operation
* you will be exposed to a `KeyedCollectionStream`. If you aggregate a
* `KeyedCollectionStream` you will be back to an `EventStream` and so on.
*
* You can think of the `Stream` as the thing that holds the root of the processing
* node chain, while either an `EventStream` or `KeyedCollectionStream` holds the
* current leaf of the tree (the `tail`) onto which additional operating nodes
* can be added using the `EventStream` or `KeyedCollectionStream` API.
*
* ---

@@ -334,3 +423,3 @@ * Note:

* simplify passing of events to a browser and enabling convenient processing for visualization
* purposes, or for light weight handling of events in Node.
* purposes, or for light weight handling of events in Node.js such as simple event alerting.
*

@@ -372,2 +461,26 @@ * ---

*
* If you need to branch a stream, pass the parent stream into the `stream()` factory
* function as its only arg:
*
* ```
* const source = stream().map(
* e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 }))
* );
*
* stream(source)
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 })))
* .output(e => {
* //
* });
*
* stream(source)
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 })))
* .output(e => {
* //
* });
*
* source.addEvent(...);
*
* ```
*
* If you have multiple sources you can feed them into the same stream and combine them

@@ -382,3 +495,3 @@ * with the `coalese()` processor. In this example two event sources are fed into the

* .coalesce({ fields: ["in", "out"] })
* .output((e: Event) => results.push(e));
* .output(e => results.push(e));
*

@@ -409,3 +522,3 @@ * // Stream events

* })
* .output((e: Event) => console.log("Running total:", e.toString()) );
* .output(e => console.log("Running total:", e.toString()) );
*

@@ -429,3 +542,3 @@ * // Add Events into the source...

* })
* .output((e: Event) => console.log("Rolling average:", e.toString()) );
* .output(e => console.log("Rolling average:", e.toString()) );
*

@@ -437,30 +550,24 @@ * // Add Events into the source...

export declare class Stream<U extends Key = Time> {
private head;
private tail;
/**
* @private
* The root of the entire event processing tree. All incoming `Event`s are
* provided to this `Node`.
*/
addEventMappingNode<S extends Key, T extends Key>(node: EventMap<S, T>): EventStream<T, U>;
private root;
constructor(upstream?: Stream<U>);
/**
* @private
* Set a new new root onto the `Stream`. This is used internally.
*/
addEventToCollectorNode<S extends Key, T extends Key>(node: EventToKeyedCollection<S, T>): KeyedCollectionStream<T, U>;
setRoot(node: Node<Base, Base>): void;
/**
* @private
* Returns the `root` node of the entire processing tree. This is used internally.
*/
addCollectorMappingNode<S extends Key, T extends Key>(node: KeyedCollectionMap<S, T>): KeyedCollectionStream<T, U>;
getRoot(): Node<Base, Base>;
/**
* @private
* Add an `Event` into the root node of the stream
*/
addCollectionToEventNode<S extends Key, T extends Key>(node: KeyedCollectionToEvent<S, T>): EventStream<T, U>;
/**
* Add an `Event` into the stream
*/
addEvent<T extends Key>(e: Event<U>): void;
/**
* @private
*/
protected addNode(node: any): void;
}
declare function streamFactory<T extends Key>(): EventStream<T, T>;
export { streamFactory as stream };
declare function eventStreamFactory<T extends Key>(): EventStream<T, T>;
export { eventStreamFactory as stream };

@@ -15,260 +15,76 @@ "use strict";

const event_1 = require("./event");
const index_1 = require("./index");
const align_1 = require("./align");
const collapse_1 = require("./collapse");
const fill_1 = require("./fill");
const rate_1 = require("./rate");
const reduce_1 = require("./reduce");
const select_1 = require("./select");
const windowedcollection_1 = require("./windowedcollection");
const node_1 = require("./node");
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class Node {
constructor() {
// Members
this.observers = Immutable.List();
}
addObserver(node) {
this.observers = this.observers.push(node);
}
set(input) {
const outputs = this.process(input);
if (outputs) {
outputs.forEach(output => this.notify(output));
}
}
notify(output) {
if (this.observers.size > 0) {
this.observers.forEach(node => {
node.set(output);
});
}
}
}
exports.Node = Node;
//
// Nodes
//
/**
* @private
* A `StreamInterface` is the base class for the family of facards placed in front of
* the underlying `Stream` to provide the appropiate API layer depending on what type
* of data is being passed through the pipeline at any given stage.
*
*/
// tslint:disable-next-line:max-classes-per-file
class EventInputNode extends Node {
constructor() {
super();
// pass
}
process(e) {
return Immutable.List([e]);
}
}
/**
* @private
* At this base class level, it holds onto a reference to the underlying `Stream`
* object (which contains the root of the `Node` tree into which `Event`s are
* inserted). It also contains the ability to `addEvent()` method to achieve this (from
* the user's point of view) and `addNode()` which gives allows additions to the
* tree.
*
* Note that while the tree is held onto by its root node within the `Stream` object,
* the current addition point, the `tail` is held by each `StreamInterface`. When a
* `Node` is appended to the `tail` an entirely new interface is returned (its type
* dependent on the output type of the `Node` appended), and that interface will contain
* the new tail point on the tree, while the old one is unchanged. This allows for
* branching of the tree.
*/
// tslint:disable-next-line:max-classes-per-file
class EventOutputNode extends Node {
constructor(callback) {
super();
this.callback = callback;
// pass
class StreamInterface {
// tslint:disable-line:max-classes-per-file
constructor(stream, tail) {
this.stream = stream;
this.tail = tail;
}
process(e) {
this.callback(e);
return Immutable.List();
/**
* Returns the underlying `Stream` object, which primarily contains the
* `root` of the processing graph.
*/
getStream() {
return this.stream;
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class KeyedCollectionOutputNode extends Node {
constructor(callback) {
super();
this.callback = callback;
// pass
/**
* Add events into the stream
*/
addEvent(e) {
this.stream.addEvent(e);
}
process(keyedCollection) {
const [key, collection] = keyedCollection;
this.callback(collection, key);
return Immutable.List();
/**
* @protected
*/
addNode(node) {
if (!this.stream.getRoot()) {
this.stream.setRoot(node);
}
if (this.tail) {
this.tail.addObserver(node);
}
}
}
exports.StreamInterface = StreamInterface;
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class MapNode extends Node {
constructor(mapper) {
super();
this.mapper = mapper;
}
process(e) {
return Immutable.List([this.mapper(e)]);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class FlatMapNode extends Node {
constructor(mapper) {
super();
this.mapper = mapper;
}
process(e) {
return this.mapper(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class FillNode extends Node {
constructor(options) {
super();
this.processor = new fill_1.Fill(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class AlignNode extends Node {
constructor(options) {
super();
this.processor = new align_1.Align(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class SelectNode extends Node {
constructor(options) {
super();
this.processor = new select_1.Select(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class CollapseNode extends Node {
constructor(options) {
super();
this.processor = new collapse_1.Collapse(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class RateNode extends Node {
constructor(options) {
super();
this.processor = new rate_1.Rate(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class ReduceNode extends Node {
constructor(options) {
super();
this.processor = new reduce_1.Reducer(options);
}
process(e) {
return this.processor.addEvent(e);
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class WindowOutputNode extends Node {
constructor(options) {
super();
this.processor = new windowedcollection_1.WindowedCollection(options);
}
process(e) {
const keyedCollections = this.processor.addEvent(e);
return keyedCollections;
}
}
/**
* @private
*
*/
// tslint:disable-next-line:max-classes-per-file
class AggregationNode extends Node {
constructor(aggregationSpec) {
super();
this.aggregationSpec = aggregationSpec;
}
process(keyedCollection) {
const [group, collection] = keyedCollection;
const d = {};
const [groupKey, windowKey] = group.split("::").length === 2 ? group.split("::") : [null, group];
_.forEach(this.aggregationSpec, (src, dest) => {
const [srcField, reducer] = src;
d[dest] = collection.aggregate(reducer, srcField);
});
const indexedEvent = new event_1.Event(index_1.index(windowKey), Immutable.fromJS(d));
return Immutable.List([indexedEvent]);
}
}
//
// Stream interfaces
//
/**
* An `EventStream` is the interface to the stream provided for manipulation of
* parts of the streaming pipeline that map a stream of Events of type <T>.
* parts of the streaming pipeline that map a stream of `Event`s of type <IN>.
*
* For example a stream of Events<Time> can be mapped to an output stream of
* new Events<Time> that are aligned to a fixed period boundary. Less or more Events
* For example a stream of `Event<Time>`s can be mapped to an output stream of
* new `Event<Time>`s that are aligned to a fixed period boundary. Less or more `Event`s
* may result.
*
* The type parameter `<U>` is the input `Event` type at the top of the stream, since each
* interface exposes the `addEvent(Event<U>)` method for inserting events at the top of
* the stream.
* The type parameter `<S>` is the input `Event` type at the top of the stream, since each
* interface exposes the `addEvent(e: Event<S>)` method for inserting events at the top of
* the stream this type is maintained across all stream interfaces.
*
* The type parameter `<T>` is the type of `Event`s in this part of the stream. That is
* nodes created by the API at this point of the stream will expect Events of type T,
* and will output new Events, potentially of a different type.
* The type parameter `<IN>` is the type of `Event`s in this part of the stream. That is
* nodes created by the API at this point of the tree will expect `Event<IN>`s,
* and will output new Events, potentially of a different type (identified as `<OUT>`).
* Typically `<IN>` and `<OUT>` would be `Time`, `TimeRange` or `Index`.
*/
class EventStream {
class EventStream extends StreamInterface {
// tslint:disable-line:max-classes-per-file
constructor(stream) {
this.stream = stream;
constructor(stream, tail) {
super(stream, tail);
}

@@ -278,25 +94,85 @@ /**

*
* Add events into the stream
* Adds a new `Node` which converts a stream of `Event<IN>` to `Event<OUT>`
*
* <IN> is the source type for the processing node
* <OUT> is the output Event type for the processing node
*
* Both IN and OUT extend Key, which is `Time`, `TimeRange` or `Index`, typically.
*/
addEvent(e) {
this.stream.addEvent(e);
addEventToEventNode(node) {
this.addNode(node);
return new EventStream(this.getStream(), node);
}
/**
* @private
*
* Adds a new `Node` which converts a stream of `Event<IN>`s to a `KeyedCollection<OUT>`.
*
* <IN> is the source type for the processing node
* <OUT> is the output Event type for the processing node
*
* Both IN and OUT extend Key, which is Time, TimeRange or Index, typically.
*/
addEventToCollectorNode(node) {
this.addNode(node);
return new KeyedCollectionStream(this.getStream(), node);
}
//
// Public API to a stream carrying `Event`s
//
/**
* Remaps each Event<T> in the stream to a new Event<M>.
*/
map(mapper) {
return this.stream.addEventMappingNode(new MapNode(mapper));
return this.addEventToEventNode(new node_1.MapNode(mapper));
}
/**
* Remaps each Event<T> in the stream to 0, 1 or many Event<M>s.
* Remaps each Event<IN> in the stream to 0, 1 or many Event<OUT>s.
*/
flatMap(mapper) {
return this.stream.addEventMappingNode(new FlatMapNode(mapper));
return this.addEventToEventNode(new node_1.FlatMapNode(mapper));
}
/**
* Reduces a sequence of past Event<T>s in the stream to a single output Event<M>.
* Reduces a sequence of past Event<IN>s in the stream to a single output Event<M>.
*/
reduce(options) {
return this.stream.addEventMappingNode(new ReduceNode(options));
return this.addEventToEventNode(new node_1.ReduceNode(options));
}
/**
* Filter out `Event<IN>`s in the stream. Provide a predicate function that
* given an Event returns true or false.
*
* Example:
* ```
* const source = stream<Time>()
* .filter(e => e.get("a") % 2 !== 0)
* .output(evt => {
* // -> 1, 3, 5
* });
*
* source.addEvent(...); // <- 1, 2, 3, 4, 5
* ```
*/
filter(predicate) {
return this.addEventToEventNode(new node_1.FilterNode(predicate));
}
/**
* If you have multiple sources you can feed them into the same stream and combine them
* with the `coalese()` processor. In this example two event sources are fed into the
* `Stream`. One contains `Event`s with just a field "in", and the other just a field
* "out". The resulting output is `Event`s with the latest (in arrival time) value for
* "in" and "out" together:
*
* ```typescript
* const source = stream()
* .coalesce({ fields: ["in", "out"] })
* .output((e: Event) => results.push(e));
*
* // Stream events
* for (let i = 0; i < 5; i++) {
* source.addEvent(streamIn[i]); // from stream 1
* source.addEvent(streamOut[i]); // from stream 2
* }
* ```
*/
coalesce(options) {

@@ -310,3 +186,3 @@ const { fields } = options;

}
return this.stream.addEventMappingNode(new ReduceNode({
return this.addEventToEventNode(new node_1.ReduceNode({
count: 1,

@@ -338,3 +214,3 @@ iteratee(accum, eventList) {

*
* @example
* Example:
* ```

@@ -350,12 +226,12 @@ * const source = stream()

fill(options) {
return this.stream.addEventMappingNode(new FillNode(options));
return this.addEventToEventNode(new node_1.FillNode(options));
}
/**
* Align Events in the stream to a specific boundary at a fixed period.
* Align `Event`s in the stream to a specific boundary at a fixed `period`.
* Options are a `AlignmentOptions` object where you specify which field to
* align with `fieldSpec`, what boundary period to use with `window` and
* align with `fieldSpec`, what boundary `period` to use with `window` and
* the method of alignment with `method` (which can be either `Linear`
* interpolation, or `Hold`).
*
* @example
* Example:
* ```

@@ -371,7 +247,7 @@ * const s = stream()

align(options) {
return this.stream.addEventMappingNode(new AlignNode(options));
return this.addEventToEventNode(new node_1.AlignNode(options));
}
/**
* Convert incoming Events in the stream to rates (essentially taking
* the derivative over time). The resulting output Events will be
* Convert incoming `Event`s in the stream to rates (essentially taking
* the derivative over time). The resulting output `Event`s will be
* of type `Event<TimeRange>`, where the `TimeRange` key will be

@@ -388,3 +264,3 @@ * the time span over which the rate was calculated. If you want you

*
* @example
* Example:
*

@@ -398,9 +274,9 @@ * ```

rate(options) {
return this.stream.addEventMappingNode(new RateNode(options));
return this.addEventToEventNode(new node_1.RateNode(options));
}
/**
* Convert incoming events to new events with on the specified
* Convert incoming `Event`s to new `Event`s with on the specified
* fields selected out of the source.
*
* @example
* Example:
*

@@ -416,9 +292,9 @@ * Events with fields a, b, c can be mapped to events with only

select(options) {
return this.stream.addEventMappingNode(new SelectNode(options));
return this.addEventToEventNode(new node_1.SelectNode(options));
}
/**
* Convert incoming events to new events with specified
* Convert incoming `Event`s to new `Event`s with specified
* fields collapsed into a new field using an aggregation function.
*
* @example
* Example:
*

@@ -439,6 +315,7 @@ * Events with fields a, b, c can be mapped to events with only a field

collapse(options) {
return this.stream.addEventMappingNode(new CollapseNode(options));
return this.addEventToEventNode(new node_1.CollapseNode(options));
}
/**
* An output, specified as an `EventCallback`, essentially `(event: Event<Key>) => void`.
*
* Using this method you are able to access the stream result. Your callback

@@ -449,3 +326,3 @@ * function will be called whenever a new Event is available. Not that currently the

*
* @example
* Example:
* ```

@@ -462,3 +339,3 @@ * const source = stream<Time>()

output(callback) {
return this.stream.addEventMappingNode(new EventOutputNode(callback));
return this.addEventToEventNode(new node_1.EventOutputNode(callback));
}

@@ -468,7 +345,7 @@ /**

* a stream of events, you can also group by a window. This is what allows you to do
* rollups with the streaming code.
* rollups within the streaming code.
*
* A window is defined with the `WindowingOptions`, which allows you to specify
* the window period as a `Period` (e.g. `period("30m")` for each 30 minutes window)
* as the `window` and a `Trigger` enum value (emit a completed window on each
* as the `window`, and a `Trigger` enum value (emit a completed window on each
* incoming `Event` or on each completed window).

@@ -484,3 +361,3 @@ *

*
* @example
* Example:
*

@@ -494,3 +371,3 @@ * ```

* .aggregate({...})
* .output(event => {
* .output(e => {
* ...

@@ -500,3 +377,3 @@ * });

groupByWindow(options) {
return this.stream.addEventToCollectorNode(new WindowOutputNode(options));
return this.addEventToCollectorNode(new node_1.WindowOutputNode(options));
}

@@ -506,24 +383,46 @@ }

/**
* A `KeyedCollectionStream` is a stream containing tuples mapping a string key
* to a `Collection`. When you window a stream you will get one of these that
* maps a string representing the window to the `Collection` of all `Event`s in
* that window.
*
* Using this class you can `output()` that or `aggregate()` the `Collection`s
* back to `Event`s.
*/
// tslint:disable-next-line:max-classes-per-file
class KeyedCollectionStream {
constructor(stream) {
this.stream = stream;
class KeyedCollectionStream extends StreamInterface {
// tslint:disable-line:max-classes-per-file
constructor(stream, tail) {
super(stream, tail);
}
/**
* @private
* Add events into the stream
*
* A helper function to create a new `Node` in the graph. The new node will be a
* processor that remaps a stream of `KeyedCollection`s to another stream of
* `KeyedCollection`s.
*/
addEvent(e) {
this.stream.addEvent(e);
addKeyedCollectionToKeyedCollectionNode(node) {
this.addNode(node);
return new KeyedCollectionStream(this.getStream(), node);
}
/**
* An output, specified as an `KeyedCollectionCallback`, essentially
* `(collection: Collection<T>,vkey: string) => void`.
* @private
*
* Helper function to create a new `Node` in the graph. The new node will be a
* processor that we remap a stream of `KeyedCollection`s back to `Event`s. An
* example would be an aggregation.
*/
addKeyedCollectionToEventNode(node) {
this.addNode(node);
return new EventStream(this.getStream(), node);
}
/**
* An output, specified as an `KeyedCollectionCallback`:
* `(collection: Collection<T>, key: string) => void`.
*
* Using this method you are able to access the stream result. Your callback
* function will be called whenever a new `Collection` is available.
*
* @example
* Example:
* ```

@@ -538,6 +437,6 @@ * const source = stream<Time>()

output(callback) {
return this.stream.addCollectorMappingNode(new KeyedCollectionOutputNode(callback));
return this.addKeyedCollectionToKeyedCollectionNode(new node_1.KeyedCollectionOutputNode(callback));
}
/**
* Takes an incoming tuple mapping a key (the window name) to a `Collection`
* Takes an incoming tuple mapping a key to a `Collection`
* (containing all `Event`s in the window) and reduces that down

@@ -574,3 +473,3 @@ * to an output `Event<Index>` using an aggregation specification. As

aggregate(spec) {
return this.stream.addCollectionToEventNode(new AggregationNode(spec));
return this.addKeyedCollectionToEventNode(new node_1.AggregationNode(spec));
}

@@ -592,9 +491,18 @@ }

*
* A `Stream` object manages a chain of processing nodes, each type of which
* provides an appropiate interface. When a `Stream` is initially created with
* the `stream()` factory function the interface exposed is an `EventStream`.
* If you perform a windowing operation you will be exposed to a
* `KeyedCollectionStream`. If you aggregate a `KeyedCollectionStream` you
* will be back to an `EventStream` and so on.
* A `Stream` object manages a tree of processing `Node`s, each type of which
* maps either `Event`s to other `Event`s or to and from a `KeyedCollection`.
* When an `Event` is added to the stream it will enter the top processing
* node where it will be processed to produce 0, 1 or many output `Event`s.
* Then then are passed down the tree until an output is reached.
*
* When a `Stream` is initially created with the `stream()` factory function
* the interface exposed is an `EventStream`. If you perform a windowing operation
* you will be exposed to a `KeyedCollectionStream`. If you aggregate a
* `KeyedCollectionStream` you will be back to an `EventStream` and so on.
*
* You can think of the `Stream` as the thing that holds the root of the processing
* node chain, while either an `EventStream` or `KeyedCollectionStream` holds the
* current leaf of the tree (the `tail`) onto which additional operating nodes
* can be added using the `EventStream` or `KeyedCollectionStream` API.
*
* ---

@@ -609,3 +517,3 @@ * Note:

* simplify passing of events to a browser and enabling convenient processing for visualization
* purposes, or for light weight handling of events in Node.
* purposes, or for light weight handling of events in Node.js such as simple event alerting.
*

@@ -647,2 +555,26 @@ * ---

*
* If you need to branch a stream, pass the parent stream into the `stream()` factory
* function as its only arg:
*
* ```
* const source = stream().map(
* e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 }))
* );
*
* stream(source)
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 })))
* .output(e => {
* //
* });
*
* stream(source)
* .map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 })))
* .output(e => {
* //
* });
*
* source.addEvent(...);
*
* ```
*
* If you have multiple sources you can feed them into the same stream and combine them

@@ -657,3 +589,3 @@ * with the `coalese()` processor. In this example two event sources are fed into the

* .coalesce({ fields: ["in", "out"] })
* .output((e: Event) => results.push(e));
* .output(e => results.push(e));
*

@@ -684,3 +616,3 @@ * // Stream events

* })
* .output((e: Event) => console.log("Running total:", e.toString()) );
* .output(e => console.log("Running total:", e.toString()) );
*

@@ -704,3 +636,3 @@ * // Add Events into the source...

* })
* .output((e: Event) => console.log("Rolling average:", e.toString()) );
* .output(e => console.log("Rolling average:", e.toString()) );
*

@@ -713,57 +645,40 @@ * // Add Events into the source...

class Stream {
/**
* @private
*/
addEventMappingNode(node) {
this.addNode(node);
return new EventStream(this);
constructor(upstream) {
this.root = null;
}
/**
* @private
* Set a new new root onto the `Stream`. This is used internally.
*/
addEventToCollectorNode(node) {
this.addNode(node);
return new KeyedCollectionStream(this);
setRoot(node) {
this.root = node;
}
/**
* @private
* Returns the `root` node of the entire processing tree. This is used internally.
*/
addCollectorMappingNode(node) {
this.addNode(node);
return new KeyedCollectionStream(this);
getRoot() {
return this.root;
}
/**
* @private
* Add an `Event` into the root node of the stream
*/
addCollectionToEventNode(node) {
this.addNode(node);
return new EventStream(this);
}
/**
* Add an `Event` into the stream
*/
addEvent(e) {
if (this.head) {
this.head.set(e);
if (this.root) {
this.root.set(e);
}
}
/**
* @private
*/
addNode(node) {
if (!this.head) {
this.head = node;
}
if (this.tail) {
this.tail.addObserver(node);
}
this.tail = node;
}
}
exports.Stream = Stream;
function streamFactory() {
/*
* `Stream` and its associated objects are designed for processing of incoming
* `Event` streams at real time. This is useful for live dashboard situations or
* possibly real time monitoring and alerting from event streams.
*/
function eventStreamFactory() {
const s = new Stream();
return s.addEventMappingNode(new EventInputNode());
const n = new node_1.EventInputNode();
return new EventStream(s, n);
}
exports.stream = streamFactory;
//# sourceMappingURL=data:application/json;base64,
exports.stream = eventStreamFactory;
//# sourceMappingURL=data:application/json;base64,eyJ2ZXJzaW9uIjozLCJmaWxlIjoic3RyZWFtLmpzIiwic291cmNlUm9vdCI6IiIsInNvdXJjZXMiOlsiLi4vc3JjL3N0cmVhbS50cyJdLCJuYW1lcyI6W10sIm1hcHBpbmdzIjoiO0FBQUE7Ozs7Ozs7O0dBUUc7O0FBRUgsdUNBQXVDO0FBQ3ZDLDRCQUE0QjtBQUk1QixtQ0FBdUM7QUFhdkMsaUNBZ0JnQjtBQW1CaEI7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7R0FtQkc7QUFDSDtJQUNJLDJDQUEyQztJQUUzQyxZQUFzQixNQUFpQixFQUFZLElBQXNCO1FBQW5ELFdBQU0sR0FBTixNQUFNLENBQVc7UUFBWSxTQUFJLEdBQUosSUFBSSxDQUFrQjtJQUFHLENBQUM7SUFFN0U7OztPQUdHO0lBQ0gsU0FBUztRQUNMLE1BQU0sQ0FBQyxJQUFJLENBQUMsTUFBTSxDQUFDO0lBQ3ZCLENBQUM7SUFFRDs7T0FFRztJQUNILFFBQVEsQ0FBQyxDQUFXO1FBQ2hCLElBQUksQ0FBQyxNQUFNLENBQUMsUUFBUSxDQUFDLENBQUMsQ0FBQyxDQUFDO0lBQzVCLENBQUM7SUFFRDs7T0FFRztJQUNILE9BQU8sQ0FBQyxJQUFJO1FBQ1IsRUFBRSxDQUFDLENBQUMsQ0FBQyxJQUFJLENBQUMsTUFBTSxDQUFDLE9BQU8sRUFBRSxDQUFDLENBQUMsQ0FBQztZQUN6QixJQUFJLENBQUMsTUFBTSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUM5QixDQUFDO1FBQ0QsRUFBRSxDQUFDLENBQUMsSUFBSSxDQUFDLElBQUksQ0FBQyxDQUFDLENBQUM7WUFDWixJQUFJLENBQUMsSUFBSSxDQUFDLFdBQVcsQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNoQyxDQUFDO0lBQ0wsQ0FBQztDQUNKO0FBL0JELDBDQStCQztBQUVEOzs7Ozs7Ozs7Ozs7Ozs7O0dBZ0JHO0FBQ0gsaUJBQXdELFNBQVEsZUFBc0I7SUFDbEYsMkNBQTJDO0lBQzNDLFlBQVksTUFBaUIsRUFBRSxJQUFzQjtRQUNqRCxLQUFLLENBQUMsTUFBTSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQ3hCLENBQUM7SUFFRDs7Ozs7Ozs7O09BU0c7SUFDSCxtQkFBbUIsQ0FBa0IsSUFBdUI7UUFDeEQsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxXQUFXLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQzNELENBQUM7SUFFRDs7Ozs7Ozs7O09BU0c7SUFDSCx1QkFBdUIsQ0FBa0IsSUFBcUM7UUFDMUUsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxxQkFBcUIsQ0FBUyxJQUFJLENBQUMsU0FBUyxFQUFFLEVBQUUsSUFBSSxDQUFDLENBQUM7SUFDckUsQ0FBQztJQUVELEVBQUU7SUFDRiwyQ0FBMkM7SUFDM0MsRUFBRTtJQUVGOztPQUVHO0lBQ0gsR0FBRyxDQUFrQixNQUF3QztRQUN6RCxNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksY0FBTyxDQUFVLE1BQU0sQ0FBQyxDQUFDLENBQUM7SUFDbEUsQ0FBQztJQUVEOztPQUVHO0lBQ0gsT0FBTyxDQUFrQixNQUF3RDtRQUM3RSxNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksa0JBQVcsQ0FBVSxNQUFNLENBQUMsQ0FBQyxDQUFDO0lBQ3RFLENBQUM7SUFFRDs7T0FFRztJQUNILE1BQU0sQ0FBQyxPQUEwQjtRQUM3QixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksaUJBQVUsQ0FBSyxPQUFPLENBQUMsQ0FBQyxDQUFDO0lBQ2pFLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7T0FjRztJQUNILE1BQU0sQ0FBZ0IsU0FBd0M7UUFDMUQsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGlCQUFVLENBQUssU0FBUyxDQUFDLENBQUMsQ0FBQztJQUNuRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQWtCRztJQUNILFFBQVEsQ0FBQyxPQUF3QjtRQUM3QixNQUFNLEVBQUUsTUFBTSxFQUFFLEdBQUcsT0FBTyxDQUFDO1FBQzNCLGVBQWUsR0FBRyxJQUFJO1lBQ2xCLE1BQU0sTUFBTSxHQUFHLFNBQVMsQ0FBQyxHQUFHLENBQUMsR0FBRyxJQUFJLENBQUMsQ0FBQztZQUN0QyxNQUFNLENBQUMsQ0FBQyxDQUFDLEVBQUUsQ0FBQyxFQUFFLEVBQUU7Z0JBQ1osTUFBTSxDQUFDLE1BQU0sQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDLENBQUM7WUFDekIsQ0FBQyxDQUFDO1FBQ04sQ0FBQztRQUNELE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQzNCLElBQUksaUJBQVUsQ0FBSztZQUNmLEtBQUssRUFBRSxDQUFDO1lBQ1IsUUFBUSxDQUFDLEtBQUssRUFBRSxTQUFTO2dCQUNyQixNQUFNLFlBQVksR0FBRyxTQUFTLENBQUMsR0FBRyxDQUFDLENBQUMsQ0FBQyxDQUFDO2dCQUN0QyxNQUFNLFVBQVUsR0FBRyxZQUFZLENBQUMsTUFBTSxFQUFFLENBQUM7Z0JBQ3pDLE1BQU0sZ0JBQWdCLEdBQUcsQ0FBQyxDQUFDLENBQUMsTUFBTSxDQUFDLEtBQUssQ0FBQztvQkFDckMsQ0FBQyxDQUFDLEtBQUs7b0JBQ1AsQ0FBQyxDQUFDLGFBQUssQ0FBQyxVQUFVLEVBQUUsU0FBUyxDQUFDLEdBQUcsQ0FBQyxFQUFFLENBQUMsQ0FBQyxDQUFDO2dCQUMzQyxNQUFNLFlBQVksR0FBRyxZQUFZLENBQUMsT0FBTyxFQUFFLENBQUMsTUFBTSxDQUFDLEtBQUssQ0FBQyxNQUFNLENBQUMsQ0FBQyxDQUFDO2dCQUNsRSxNQUFNLENBQUMsYUFBSyxDQUFDLFVBQVUsRUFBRSxnQkFBZ0IsQ0FBQyxPQUFPLEVBQUUsQ0FBQyxLQUFLLENBQUMsWUFBWSxDQUFDLENBQUMsQ0FBQztZQUM3RSxDQUFDO1NBQ0osQ0FBQyxDQUNMLENBQUM7SUFDTixDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7O09BdUJHO0lBQ0gsSUFBSSxDQUFDLE9BQW9CO1FBQ3JCLE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQUMsSUFBSSxlQUFRLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUMvRCxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7T0FnQkc7SUFDSCxLQUFLLENBQUMsT0FBeUI7UUFDM0IsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGdCQUFTLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUNoRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQXFCRztJQUNILElBQUksQ0FBQyxPQUFvQjtRQUNyQixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksZUFBUSxDQUFLLE9BQU8sQ0FBQyxDQUFDLENBQUM7SUFDL0QsQ0FBQztJQUVEOzs7Ozs7Ozs7Ozs7O09BYUc7SUFDSCxNQUFNLENBQUMsT0FBc0I7UUFDekIsTUFBTSxDQUFDLElBQUksQ0FBQyxtQkFBbUIsQ0FBQyxJQUFJLGlCQUFVLENBQUssT0FBTyxDQUFDLENBQUMsQ0FBQztJQUNqRSxDQUFDO0lBRUQ7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQWtCRztJQUNILFFBQVEsQ0FBQyxPQUF3QjtRQUM3QixNQUFNLENBQUMsSUFBSSxDQUFDLG1CQUFtQixDQUFDLElBQUksbUJBQVksQ0FBSyxPQUFPLENBQUMsQ0FBQyxDQUFDO0lBQ25FLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7Ozs7O09Ba0JHO0lBQ0gsTUFBTSxDQUFDLFFBQTJCO1FBQzlCLE1BQU0sQ0FBQyxJQUFJLENBQUMsbUJBQW1CLENBQUssSUFBSSxzQkFBZSxDQUFLLFFBQVEsQ0FBQyxDQUFDLENBQUM7SUFDM0UsQ0FBQztJQUVEOzs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7T0E4Qkc7SUFDSCxhQUFhLENBQUMsT0FBeUI7UUFDbkMsTUFBTSxDQUFDLElBQUksQ0FBQyx1QkFBdUIsQ0FBQyxJQUFJLHVCQUFnQixDQUFLLE9BQU8sQ0FBQyxDQUFDLENBQUM7SUFDM0UsQ0FBQztDQUNKO0FBeFNELGtDQXdTQztBQUVEOzs7Ozs7OztHQVFHO0FBQ0gsZ0RBQWdEO0FBQ2hELDJCQUFrRSxTQUFRLGVBQXNCO0lBQzVGLDJDQUEyQztJQUMzQyxZQUFZLE1BQWlCLEVBQUUsSUFBc0I7UUFDakQsS0FBSyxDQUFDLE1BQU0sRUFBRSxJQUFJLENBQUMsQ0FBQztJQUN4QixDQUFDO0lBRUQ7Ozs7OztPQU1HO0lBQ0gsdUNBQXVDLENBQWtCLElBQWlDO1FBQ3RGLElBQUksQ0FBQyxPQUFPLENBQUMsSUFBSSxDQUFDLENBQUM7UUFDbkIsTUFBTSxDQUFDLElBQUkscUJBQXFCLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQ3JFLENBQUM7SUFFRDs7Ozs7O09BTUc7SUFDSCw2QkFBNkIsQ0FBa0IsSUFBcUM7UUFDaEYsSUFBSSxDQUFDLE9BQU8sQ0FBQyxJQUFJLENBQUMsQ0FBQztRQUNuQixNQUFNLENBQUMsSUFBSSxXQUFXLENBQVMsSUFBSSxDQUFDLFNBQVMsRUFBRSxFQUFFLElBQUksQ0FBQyxDQUFDO0lBQzNELENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7O09BZUc7SUFDSCxNQUFNLENBQUMsUUFBcUM7UUFDeEMsTUFBTSxDQUFDLElBQUksQ0FBQyx1Q0FBdUMsQ0FDL0MsSUFBSSxnQ0FBeUIsQ0FBSyxRQUFRLENBQUMsQ0FDOUMsQ0FBQztJQUNOLENBQUM7SUFFRDs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7OztPQStCRztJQUNILFNBQVMsQ0FBQyxJQUF5QjtRQUMvQixNQUFNLENBQUMsSUFBSSxDQUFDLDZCQUE2QixDQUFRLElBQUksc0JBQWUsQ0FBSyxJQUFJLENBQUMsQ0FBQyxDQUFDO0lBQ3BGLENBQUM7Q0FDSjtBQXZGRCxzREF1RkM7QUFvQkQsRUFBRTtBQUNGLGdCQUFnQjtBQUNoQixFQUFFO0FBRUY7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7Ozs7O0dBMkpHO0FBQ0gsZ0RBQWdEO0FBQ2hEO0lBT0ksWUFBWSxRQUFvQjtRQUM1QixJQUFJLENBQUMsSUFBSSxHQUFHLElBQUksQ0FBQztJQUNyQixDQUFDO0lBRUQ7OztPQUdHO0lBQ0gsT0FBTyxDQUFDLElBQXNCO1FBQzFCLElBQUksQ0FBQyxJQUFJLEdBQUcsSUFBSSxDQUFDO0lBQ3JCLENBQUM7SUFFRDs7O09BR0c7SUFDSCxPQUFPO1FBQ0gsTUFBTSxDQUFDLElBQUksQ0FBQyxJQUFJLENBQUM7SUFDckIsQ0FBQztJQUVEOztPQUVHO0lBQ0gsUUFBUSxDQUFnQixDQUFXO1FBQy9CLEVBQUUsQ0FBQyxDQUFDLElBQUksQ0FBQyxJQUFJLENBQUMsQ0FBQyxDQUFDO1lBQ1osSUFBSSxDQUFDLElBQUksQ0FBQyxHQUFHLENBQUMsQ0FBQyxDQUFDLENBQUM7UUFDckIsQ0FBQztJQUNMLENBQUM7Q0FDSjtBQW5DRCx3QkFtQ0M7QUFFRDs7OztHQUlHO0FBQ0g7SUFDSSxNQUFNLENBQUMsR0FBRyxJQUFJLE1BQU0sRUFBSyxDQUFDO0lBQzFCLE1BQU0sQ0FBQyxHQUFHLElBQUkscUJBQWMsRUFBSyxDQUFDO0lBQ2xDLE1BQU0sQ0FBQyxJQUFJLFdBQVcsQ0FBTyxDQUFDLEVBQUUsQ0FBQyxDQUFDLENBQUM7QUFDdkMsQ0FBQztBQUU4QixvQ0FBTSJ9

@@ -228,2 +228,14 @@ import * as Immutable from "immutable";

/**
* A callback function which is passed an `Event`
*/
export declare type EventCallback<T extends Key> = (event: Event<T>) => void;
/**
* A callback function which is passed a `Collection` and associated `key`
*/
export declare type KeyedCollectionCallback<T extends Key> = (collection: Collection<T>, key: string) => void;
/**
* A tuple of string key and associated `Collection`
*/
export declare type KeyedCollection<T extends Key> = [string, Collection<T>];
/**
* Tuple mapping a string -> `ReducerFunction`

@@ -230,0 +242,0 @@ * e.g. `["value", avg()]`

@@ -8,4 +8,6 @@ import * as Immutable from "immutable";

import { SortedCollection } from "./sortedcollection";
import { KeyedCollection } from "./stream";
import { AggregationSpec, WindowingOptions } from "./types";
import { AggregationSpec, KeyedCollection, WindowingOptions } from "./types";
/**
* A map of `SortedCollection`s indexed by a string key representing a window.
*/
export declare class WindowedCollection<T extends Key> extends Base {

@@ -41,3 +43,3 @@ protected collections: Immutable.Map<string, SortedCollection<T>>;

/**
* Fetch the SortedCollection of events contained in the windowed grouping
* Fetch the `SortedCollection` of `Event`s contained in the windowed grouping
*/

@@ -44,0 +46,0 @@ get(key: string): SortedCollection<T>;

@@ -22,2 +22,5 @@ "use strict";

const types_1 = require("./types");
/**
* A map of `SortedCollection`s indexed by a string key representing a window.
*/
class WindowedCollection extends base_1.Base {

@@ -64,3 +67,2 @@ constructor(arg1, arg2, arg3) {

if (collection) {
// TODO: do we use this code path?
throw new Error("Unimplemented");

@@ -75,3 +77,3 @@ }

/**
* Fetch the SortedCollection of events contained in the windowed grouping
* Fetch the `SortedCollection` of `Event`s contained in the windowed grouping
*/

@@ -205,2 +207,2 @@ get(key) {

exports.windowed = windowFactory;
//# sourceMappingURL=data:application/json;base64,
//# sourceMappingURL=data:application/json;base64,
{
"name": "pondjs",
"version": "1.0.0-alpha.2",
"version": "1.0.0-alpha.3",
"description": "A TimeSeries library built on Immutable.js with Typescript",

@@ -5,0 +5,0 @@ "main": "lib/exports.js",

@@ -64,5 +64,3 @@ declare const describe: any;

.rate({ fieldSpec: "value", allowNegative: false })
.output(e => {
result.push(e);
});
.output(e => result.push(e));

@@ -94,3 +92,3 @@ list.forEach(e => {

let calls = 0;
const source = stream()
const source = stream<Time>()
.groupByWindow({

@@ -101,3 +99,3 @@ window: everyThirtyMinutes,

.output((c, key) => {
result[key] = c as Collection<Time>;
result[key] = c;
calls += 1;

@@ -134,4 +132,3 @@ });

})
.output(evt => {
const e = evt as Event<Index>;
.output(e => {
result[e.getKey().toString()] = e;

@@ -198,6 +195,3 @@ calls += 1;

.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 })))
.output(evt => {
const e = evt as Event<Time>;
result.push(e);
});
.output(e => result.push(e));

@@ -210,2 +204,3 @@ eventsIn.forEach(e => source.addEvent(e));

});
it("can do streaming event flatmap", () => {

@@ -231,6 +226,3 @@ const eventsIn = [

})
.output(evt => {
const e = evt as Event<Time>;
result.push(e);
});
.output(e => result.push(e));

@@ -249,2 +241,24 @@ eventsIn.forEach(e => source.addEvent(e));

it("can do filtering on a stream of events", () => {
const eventsIn = [
event(time(Date.UTC(2015, 2, 14, 7, 31, 0)), Immutable.Map({ a: 1 })),
event(time(Date.UTC(2015, 2, 14, 7, 32, 0)), Immutable.Map({ a: 2 })),
event(time(Date.UTC(2015, 2, 14, 7, 33, 0)), Immutable.Map({ a: 3 })),
event(time(Date.UTC(2015, 2, 14, 7, 34, 0)), Immutable.Map({ a: 4 })),
event(time(Date.UTC(2015, 2, 14, 7, 35, 0)), Immutable.Map({ a: 5 }))
];
const result: Event[] = [];
const source = stream<Time>()
.filter(e => e.get("a") % 2 !== 0)
.output(e => result.push(e));
eventsIn.forEach(e => source.addEvent(e));
expect(result[0].get("a")).toEqual(1);
expect(result[1].get("a")).toEqual(3);
expect(result[2].get("a")).toEqual(5);
});
it("can selection of specific event fields", () => {

@@ -263,5 +277,3 @@ const DATA = [[1471824030000, 1, 2, 3], [1471824105000, 4, 5, 6], [1471824210000, 7, 8, 9]];

})
.output(e => {
result.push(e);
});
.output(e => result.push(e));

@@ -381,3 +393,3 @@ list.forEach(e => {

})
.output((e: Event) => results.push(e));
.output(e => results.push(e));

@@ -406,3 +418,3 @@ // Stream events

})
.output((e: Event) => results.push(e));
.output(e => results.push(e));

@@ -417,2 +429,35 @@ // Stream events

it("can do a split of two streams", () => {
const eventsIn = [
event(time(Date.UTC(2015, 2, 14, 7, 57, 0)), Immutable.Map({ a: 1 })),
event(time(Date.UTC(2015, 2, 14, 7, 58, 0)), Immutable.Map({ a: 2 })),
event(time(Date.UTC(2015, 2, 14, 7, 59, 0)), Immutable.Map({ a: 3 }))
];
const result1: Event[] = [];
const result2: Event[] = [];
const source = stream<Time>().map(e =>
event(e.getKey(), Immutable.Map({ a: e.get("a") * 2 }))
); // 2, 4, 6
const branch1 = source
.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 3 }))) // 6, 12, 18
.output(e => result1.push(e));
const branch2 = source
.map(e => event(e.getKey(), Immutable.Map({ a: e.get("a") * 4 }))) // 8, 16, 24
.output(e => result2.push(e));
eventsIn.forEach(e => source.addEvent(e));
expect(result1[0].get("a")).toBe(6);
expect(result1[1].get("a")).toBe(12);
expect(result1[2].get("a")).toBe(18);
expect(result2[0].get("a")).toBe(8);
expect(result2[1].get("a")).toBe(16);
expect(result2[2].get("a")).toBe(24);
});
it("can coalese two streams", () => {

@@ -419,0 +464,0 @@ const results = [];