Socket
Socket
Sign inDemoInstall

futoin-eventstream

Package Overview
Dependencies
185
Maintainers
1
Versions
34
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

    futoin-eventstream

Neutral event stream interface


Version published
Weekly downloads
1
Maintainers
1
Install size
4.07 MB
Created
Weekly downloads
 

Readme

Source

NPM Version NPM Downloads Build Status stable

NPM

About

FutoIn EventStream is fundamental part for efficient global state update in distributed systems. It is used for reliable event delivery, decentralized cache invalidation, efficient online segregation of active and warehouse data.

Unlike various message/event brokers, the focus of FutoIn Event Stream is integration with database transactions for reliable efficient event recording and delivery.

The design is not focused on high throughput as the primary reason is reliable association of events with database changes. Please consider using pure message brokers for throughput-intensive cases.

Documentation --> FutoIn Guide

Reference implementation of:

FTN18: FutoIn Interface - Event Stream
Version: 1.1

Features

  • Database transaction-bound event generation.
  • Standalone event generation.
  • Event polling for simple, but less efficient solutions.
  • Event pushing for advanced efficient cases.

Supported database types

  • MySQL
  • PostgreSQL
  • SQLite
  • Potentially, any other SQL-compliant supported by futoin-database

Installation for Node.js

Command line:

$ yarn add futoin-eventstream 

or

$ npm install futoin-eventstream --save

Concept

Each event has auto-generated ID, type, data and timestamp. Type is all upper case identifier. Data is arbitrary JSON-friendly data.

Two configurable delivery strategies are supported: polling and streaming, but consumer acts as client in both cases.

There are two delivery modes: reliable and live. The later allow messages to be skipped. To ensure that events are reliably delivered, each consumer must register first.

Two message storage types are assumed: active small high performance area and slower data warehouse for all time history. DBEventArchiver tool is provided for efficient reliable data transfer.

More detailed concept is in the FTN18 spec.

Examples

1. Adding standalone events

GenFace.register(as, ccm, 'evtgen', endpoint );
// ...
const evtgen = ccm.iface('evtgen');
evtgen.addEvent(as, 'NULL_EVENT');
evtgen.addEvent(as, 'INT_EVENT', 123);
evtgen.addEvent(as, 'STR_EVENT', 'Some Str');
evtgen.addEvent(as, 'ARRAY_EVENT', [1, 2, 3]);
evtgen.addEvent(as, 'OBJECT_EVENT', { a: 1, b: 2, c: 3 });

2. Adding events in database transaction

For more advanced cases, you can check source code of DBGenFace#addXferEvent() to build more tailored statements.

DBGenFace.register(as, ccm, 'evtgen', endpoint );
// ...
const evtgen = ccm.iface('evtgen');
const db = ccm.db();

const xfer = db.newXfer();
xfer.insert('SomeTable').set('name', 'Name');
evtgen.addXferEvent(xfer, 'NEW_ENTRY', {name: 'Name'});
xfer.execute(as);

3. Poll for events with different components & filters

Each consumer is identifier by credentials + arbitrary component name.

As special "LIVE" component can be used for unreliable delivery.

PollFace.register(as, ccm, 'evtpoll', endpoint );
// ...
const evtpoll = ccm.iface('evtpoll');

// User info polling
evtpoll.registerConsumer(as, 'Security');
evtpoll.pollEvents(as, 'Security', last_known_id_here, ['USR_ADD', 'USR_MOD', 'USR_LOGIN']);
as.add((as, events) => {
    // ....
});

// Anti-Fraud processing
evtpoll.registerConsumer(as, 'AntiFraud');
evtpoll.pollEvents(as, 'AntiFraud', last_known_id_here, ['DEPOSIT', 'WITHDRAW', 'XFER']);
as.add((as, events) => {
    // ....
});

4. Event push with different components & filters

A child class of ReliableReceiverService should be created to properly handle incoming events.

A bi-directional channel like WebSockets or Internal must be used.

A separate Executor instance should be created for use in endpoint callbacks.

class UserReceiver extends ReliableReceiverService
{
    _onEvents( as, reqinfo, events ) {
        // ...
    }
}

const executor = Executor(ccm);
PollFace.register(as, ccm, 'evtpushsec', endpoint, credentials, { executor } );
UserReceiver.register(as, executor);

const evtpushsec = ccm.iface('evtpushsec');
evtpushsec.registerConsumer(as, 'Security');
evtpushsec.readyToReceive(as, 'Security', ['USR_ADD', 'USR_MOD', 'USR_LOGIN']);

5. Event history transfer

There should be a single system-wide instance of DBEventArchiver tool. The tool will automatically reconnect on errors. Processing state can be monitored through events.

DBAutoConfig(as, ccm, {
    evtdwh: {}
});
const archiver = new DBEventArchiver(ccm);

archiver.on('workerError', () => { ... });
archiver.on('receiverError', () => { ... });
archiver.on('newEvents', () => { ... });

// keep going until stopped
archiver.start(push_endpoint, credentials);

// to stop - automatically called on ccm.close()
archiver.stop();

6. Discarding event from active database

For performance and disaster recovery time reasons, operation critical database should be kept as small as possible. Events delivered to all consumers including DBEventArchiver can be removed the following way.

DBAutoConfig(as, ccm, {
    evt: {}
});
const discarder = new DBEventDiscarder(ccm);

archiver.on('workerError', () => { ... });
archiver.on('eventDiscard', () => { ... });

// keep going until stopped
discarder.start(ccm);

// to stop - automatically called on ccm.close()
discarder.stop();

7. Complete example

DBPushService inherits DBPollService, so there is no need to instance both.

This case show internal communicaton channel.

const ccm = new AdvancedCCM();
DBAutoConfig(as, ccm, {
    evt: {}
});
const executor = new Executor(ccm); // or NodeExecutor() for WebSockets
DBGenService.register(as, executor);
DBPushService.register(as, executor);

GenFace.register(as, ccm, 'evtgen', executor);
PollFace.register(as, ccm, 'evtpoll', executor);

const p = as.parallel();

p.loop( (as) => {
    ccm.iface('evtgen').addEvent(as, 'EVT', 'data');
});

p.add( (as) => {
    let last_id = null;
    
    as.loop( (as) => {
        ccm.iface('evtpoll').pollEvents(as, 'LIVE', last_id);
        
        as.add((as, events) => {
            if (events.length) {
                last_id = events[events.length - 1].id;
            } else {
                const timer = setTimeout( () => as.success(), 1e3 );
                as.setCancel((as) => clearTimeout(timer));
            }
        });
    });
});

API documentation

The concept is described in FutoIn specification: FTN18: FutoIn Interface - Event Stream v1.x

Classes

DBEventArchiver

Database Event Archiver service.

DBEventDiscarder

DB-specific event discarding.

It's assumed to be run against "active" database part as defined in the concept to reduce its size after all reliably delivered events are delivered to consumers.

Event are deleted based on limit_at_once to avoid too large transactions which may affect performance of realtime processes and break some DB clusters like Galera.

DBGenFace

GenFace for DB backend.

The only difference to original GenFace is native DB-specific API.

DBGenService

Database-specific event generation service

DBPollService

Database-based Poll Service

DBPushService

Database-specific Push Service

DBServiceApp

All-in-one DB EventStream initialization

GenFace

Event Stream - Generator Face

GenService

Event Stream - Generator Service Base

LiveReceiver

Reliable Event Receiver helper to minimize boilerplate code in projects.

PollFace

Event Stream - Poll Face

PollService

Event Stream - Poll Service Base

PushFace

Event Stream - Push Face

PushService

Event Stream - Push Service Base

ReceiverFace

Event Stream - Receiver Face

ReceiverService

Base implementation for receiver side

ReliableReceiver

Reliable Event Receiver helper to minimize boilerplate code in projects.

ReliableReceiverService

Base implementation for reliable receiver side.

DBEventArchiver

Database Event Archiver service.

Kind: global class
Note: No more than one instance should run at once.

new DBEventArchiver(db_ccm)

C-tor

ParamTypeDescription
db_ccmAdvancedCCMCCM instance with registered '#db.evtdwh' interface

DBEventDiscarder

DB-specific event discarding.

It's assumed to be run against "active" database part as defined in the concept to reduce its size after all reliably delivered events are delivered to consumers.

Event are deleted based on limit_at_once to avoid too large transactions which may affect performance of realtime processes and break some DB clusters like Galera.

Kind: global class

dbEventDiscarder.start(ccm, [options])

Start event discarding

Kind: instance method of DBEventDiscarder

ParamTypeDefaultDescription
ccmAdvancedCCMCCM with registered #db.evt interface
[options]object{}options
[options.poll_period_ms]integer600e3poll interval
[options.limit_at_once]integer1000events to delete at once
[options.event_table]string"default"events table
[options.consumer_table]string"default"consumers table

dbEventDiscarder.stop()

Stop event discarding

Kind: instance method of DBEventDiscarder

"workerError"

Emitted on worker errors

Kind: event emitted by DBEventDiscarder

"eventDiscard"

Emitted on discarded events

Kind: event emitted by DBEventDiscarder

DBGenFace

GenFace for DB backend.

The only difference to original GenFace is native DB-specific API.

Kind: global class

dbGenFace.DB_EVENT_TABLE ⇒ string

Easy access to DB event table name

Kind: instance property of DBGenFace
Returns: string - raw table name

dbGenFace.addXferEvent(xb, type, data, [table])

Helper to add event generation into DB transaction

Kind: instance method of DBGenFace

ParamTypeDefaultDescription
xbXferBuilderinstance of transaction builder
typestringevent type
data*any data
[table]string"evt_queue"event queue

DBGenService

Database-specific event generation service

Kind: global class

new DBGenService(_as, executor, [options])

Please use DBGenService.regster()

ParamTypeDefaultDescription
_asAsyncStepsasync step interface
executorExecutorrelated Executor
[options]object{}options
[options.event_table]string"default"events table

DBPollService

Database-based Poll Service

Kind: global class

new DBPollService(as, executor, [options])

Please use DBPollService,register()

ParamTypeDefaultDescription
asAsyncStepsasync step interface
executorExecutorrelated Executor
[options]object{}options
[options.event_table]string"default"events table
[options.consumer_table]string"default"consumers table

DBPushService

Database-specific Push Service

Kind: global class

new DBPushService(as, executor, [options])

Please use DBPushService,register()

ParamTypeDefaultDescription
asAsyncStepsasync step interface
executorExecutorrelated Executor
[options]object{}options
[options.event_table]string"default"events table
[options.consumer_table]string"default"consumers table
[options.sleep_min]integer100minimal sleep on lack of events
[options.sleep_max]integer3000maximal sleep on lack of events
[options.sleep_step]integer100sleep time increase on lack of events

DBServiceApp

All-in-one DB EventStream initialization

Kind: global class

new DBServiceApp(as, options)

C-tor

ParamTypeDefaultDescription
asAsyncStepsAsyncSteps interface
optionsobject{}options
[options.ccm]AdvancedCCMexternal CCM instance
[options.executor]Executorexternal private executor instance
[options.ccmOptions]objectauto-CCM options
[options.notExpectedHandler]callable'notExpected' error handler
[options.executorOptions]objectprivate auto-Executor options
[options.evtOptions]objecteventstream options
[options.discarderOptions]objectdiscarder options
[options.enableDiscarder]booleanenable discarder, if true
[options.archiverOptions]objectdiscarder options
[options.enableArchiver]booleanenable archiver, if true

dbServiceApp.ccm() ⇒ AdvancedCCM

CCM instance accessor

Kind: instance method of DBServiceApp
Returns: AdvancedCCM - instance

dbServiceApp.executor() ⇒ Executor

Executor instance accessor

Kind: instance method of DBServiceApp
Returns: Executor - instance

dbServiceApp.close([done])

Shutdown of app and related instances

Kind: instance method of DBServiceApp

ParamTypeDefaultDescription
[done]callabledone callback

GenFace

Event Stream - Generator Face

Kind: global class

GenFace.LATEST_VERSION

Latest supported FTN17 version

Kind: static property of GenFace

GenFace.PING_VERSION

Latest supported FTN4 version

Kind: static property of GenFace

GenFace.register(as, ccm, name, endpoint, [credentials], [options])

CCM registration helper

Kind: static method of GenFace

ParamTypeDefaultDescription
asAsyncStepssteps interface
ccmAdvancedCCMCCM instance
namestringCCM registration name
endpoint*see AdvancedCCM#register
[credentials]*see AdvancedCCM#register
[options]object{}interface options
[options.version]string"1.0"interface version to use

GenService

Event Stream - Generator Service Base

Kind: global class

GenService.register(as, executor, options) ⇒ GenService

Register futoin.evt.gen interface with Executor

Kind: static method of GenService
Returns: GenService - instance

ParamTypeDescription
asAsyncStepssteps interface
executorExecutorexecutor instance
optionsobjectimplementation defined options

LiveReceiver

Reliable Event Receiver helper to minimize boilerplate code in projects.

Kind: global class

new LiveReceiver(executor_ccm)

Initialize event archiver.

ParamTypeDescription
executor_ccmAdvancedCCMCCM for executor

liveReceiver.start(endpoint, [credentials], [options])

Start receiving events for archiving

Kind: instance method of LiveReceiver
Note: options.executor is overridden

ParamTypeDefaultDescription
endpoint*see PushFace
[credentials]*see PushFace
[options]*{}see PushFace
[options.component]stringcomponent name
[options.want]array"want" parameter for event filtering

liveReceiver.stop()

Stop receiving events

Kind: instance method of LiveReceiver

liveReceiver._registerReceiver(as, executor, options) ⇒ ReceiverService

Override to register custom instance of ReceiverService.

Kind: instance method of LiveReceiver
Returns: ReceiverService - instance of service

ParamTypeDescription
asAsyncStepsasync steps interface
executorExecutorInternal Executor instance
optionsobjectpassed options

liveReceiver._onEvents(as, events)

Override to catch new events here instead of using newEvents event handler.

Kind: instance method of LiveReceiver

ParamTypeDescription
asAsyncStepsasync steps interface
eventsarrayarray of events

"receiverError"

Emitted on not expected receiver errors

Kind: event emitted by LiveReceiver

"workerError"

Emitted on worker errors

Kind: event emitted by LiveReceiver

"newEvents"

Emitted on new events

Kind: event emitted by LiveReceiver

"ready"

Emitted after event receiver is ready

Kind: event emitted by LiveReceiver

PollFace

Event Stream - Poll Face

Kind: global class

PollFace.LATEST_VERSION

Latest supported FTN17 version

Kind: static property of PollFace

PollFace.PING_VERSION

Latest supported FTN4 version

Kind: static property of PollFace

PollFace.register(as, ccm, name, endpoint, [credentials], [options])

CCM registration helper

Kind: static method of PollFace

ParamTypeDefaultDescription
asAsyncStepssteps interface
ccmAdvancedCCMCCM instance
namestringCCM registration name
endpoint*see AdvancedCCM#register
[credentials]*see AdvancedCCM#register
[options]object{}interface options
[options.version]string"1.0"interface version to use

PollService

Event Stream - Poll Service Base

Kind: global class

PollService.register(as, executor, options) ⇒ PollService

Register futoin.evt.poll interface with Executor

Kind: static method of PollService
Returns: PollService - instance
Note: Chunk event count is lower then protocol permits by default as there is a typical amount 64K futoin message limit.

ParamTypeDefaultDescription
asAsyncStepssteps interface
executorExecutorexecutor instance
optionsobjectimplementation defined options
[options.allow_reliable]booleantrueallow reliable consumers
[options.allow_polling]booleantrueallow polling calls
[options.max_chunk_events]integer100maxium events per request

PushFace

Event Stream - Push Face

Kind: global class

PushFace.register(as, ccm, name, endpoint, [credentials], [options])

CCM registration helper

Kind: static method of PushFace

ParamTypeDefaultDescription
asAsyncStepssteps interface
ccmAdvancedCCMCCM instance
namestringCCM registration name
endpoint*see AdvancedCCM#register
[credentials]*see AdvancedCCM#register
[options]object{}interface options
[options.version]string"1.0"interface version to use

PushService

Event Stream - Push Service Base

Kind: global class

"pushError"

Emitted in push error handlers

Kind: event emitted by PushService

"queueOverflow"

Emitted in push error handlers

Kind: event emitted by PushService

PushService.register(as, executor, options) ⇒ PushService

Register futoin.evt.push interface with Executor

Kind: static method of PushService
Returns: PushService - instance

ParamTypeDefaultDescription
asAsyncStepssteps interface
executorExecutorexecutor instance
optionsobjectimplementation defined options
[options.allow_reliable]booleantrueallow reliable consumers
[options.allow_polling]booleantrueallow polling calls

ReceiverFace

Event Stream - Receiver Face

Kind: global class

ReceiverFace.LATEST_VERSION

Latest supported FTN17 version

Kind: static property of ReceiverFace

ReceiverFace.register(as, channel, [options]) ⇒ string

CCM registration helper

Kind: static method of ReceiverFace
Returns: string - - iface:ver of registered interface

ParamTypeDefaultDescription
asAsyncStepssteps interface
channelChannelContextBi-Direction channel instance
[options]object{}interface options
[options.version]string"1.0"interface version to use

ReceiverService

Base implementation for receiver side

Kind: global class

receiverService._onEvents(as, reqinfo, events)

Member to override to handle vents.

Kind: instance method of ReceiverService

ParamTypeDescription
asAsyncStepsAsyncSteps interface
reqinfoRequestInforequest info object
eventsarraylist of events

ReceiverService.register(as, executor, options) ⇒ PushService

Register futoin.evt.receiver interface with Executor

Kind: static method of ReceiverService
Returns: PushService - instance

ParamTypeDescription
asAsyncStepssteps interface
executorExecutorexecutor instance
optionsobjectimplementation defined options

ReliableReceiver

Reliable Event Receiver helper to minimize boilerplate code in projects.

Kind: global class

reliableReceiver._registerReceiver(as, executor, options) ⇒ ReliableReceiverService

Override to register custom instance of ReliableReceiverService.

Kind: instance method of ReliableReceiver
Returns: ReliableReceiverService - instance of service

ParamTypeDescription
asAsyncStepsasync steps interface
executorExecutorInternal Executor instance
optionsobjectpassed options

"processedEvents"

Emitted for count of archived events in each iteration.

Kind: event emitted by ReliableReceiver

ReliableReceiverService

Base implementation for reliable receiver side.

Kind: global class
Note: Unlike ReceiverService, it restores proper order of events.

documented by jsdoc-to-markdown.

Keywords

FAQs

Last updated on 27 Mar 2023

Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc