
Security News
Attackers Are Hunting High-Impact Node.js Maintainers in a Coordinated Social Engineering Campaign
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.
Lightweight pub/sub module with event backlog, timeout support that maps events to promises and back as well as chains them to other topics and flows.

Lightweight pub/sub module with event backlog, timeout support that maps events to promises and back as well as chains them to other topics. Use it to structure application business logic.
The main reason for creation of this module is to allow decoupling business logic into smaller isolated standalone components in the application via pub/sub API and/or common context that leads to a simpler/isolated unit testing with easy mocking of input data and easy composition of components into higher async structures.
The context based approach allows a developer to slice the business logic into small, isolated busness actions that communicate to each other via context and that encourages developers to use the same action interface across application code, which makes the code more predictable, easy to define and test with mock data and yes, boring.
This approach is based on pub/sub interface. It accumulates events in the backlog for new subscribers. This has pros and cons:
The module provides a way to slice the code into isolated, small and independent actions that can later be used to compose much more complex applications with dependency injection via context object.

npm install oja -S
The flow is generic definition of the control flow.
Flow([baseFlow]) creates a flow object
consume(topics[, callback]) adds a consumer to the flow for the given topic or topics
consumeStream(topic[, callback]) returns a readable stream of events for the given topic
getReader(topic) returns a reader for a series of events published under given topic.
undefineddefine(topics[, data|promise|function]) defines a producer for the given topic or an array of topics
on(topic, callback) adds a listener for the given topic.
once(topic, callback) adds a listener for the given topic that will be called for the fist event.
setMaxListeners(number) allows to increase a limit on how many listeners can be added to the given action, similar to event emitters.
catch(callback) adds an error handler to the flow. If no handlers are registered, it will re-throw the error.
timeout(topics, ms) sets a timeout for the topic or and array of topics. The timeout error will be generated if the topic(s) are not resolved within the specified period of time (in milliseconds)
state() returns a list of topics and their state (topics queue, pending topics)
Publisher API is a convenience object to publish events for the assigned topic
Action type is more specific to business unit that extends Flow further to allow defining executable independent business logic blocks called actions.
While flow starts executing immediately when one defines it, action is activate by calling execute method.
The actions can be composed into more complex actions. Once the action is activated, it will implicitly activate all children actions.
Actions cannot be added after they have been started.
Action() is an action constructor
add(action) adds actions to the main action
execute() is a method called by framework during activation; the action logic should be put into this method
activate() starts the action and all its children. This method better not be overridden or one needs to make sure a base function called.
While pub/sub is based on knowledge of topics consumed and published, context based approach provides a more explicit way of what kind actions are available in application.
First we would like you to focus more on how you can apply this module to simplify your business logic with the use of Action type, then you will see Context based examples and then more generic examples on how generic Flow type can be used and applied to the action type as it extends Flow.
const Action = require('oja').Action;
new Action().activate();
const Action = require('oja').Action;
class MyAction extends Action {
execute() {
this.define('foo', 'bar');
}
}
new MyAction()
.activate()
.consume('foo', data => console.log(data)); // will print bar
const Action = require('oja').Action;
class Greet extends Action {
execute() {
this.define('greet', 'Hello');
}
}
// demonstrate generic function instead of action object
function who(flow) {
// demonstrate re-mapping
flow.consume('name', name => {
flow.define('who', name);
});
}
class Greeting extends Action {
execute() {
this.consume(['greet', 'who'], data => {
this.define('greeting', `${data.greet} ${data.who}`)
});
}
}
const helloAction = new Greeting();
helloAction
.add(new Hello())
.add(who)
.activate()
.define('name', 'World')
.consume('greeting', console.log); // prints Hello World
modules.exports = async context => {
// action logic here
};
?/ or with parameters separation
modules.exports = context => (arg1, arg2, ...) => {
// action logic here
};
Calling action within other action
modules.exports = context => (keyword, ...) => {
// we can call other actions via context
// assuming we define actions domain (see bellow for more examples)
const results = await context.actions.find(context.keyword);
return {
keyword: context.keyword,
searchResults; results
}
};
Calling the above action, assuming it is called search under domain 'actions'
modules.exports = async context => {
const searchResults = await context.actions.search('foo'); // passing some parameters
};
There are two main components:
context.<property name> access patterncontext.<domain>.<action name>const createContext = require('oja/context');
// inject/configure context
const context = createContext({
// injecting properteis
properties: {
parameters: {
foo: 'foov',
bar: 'barv
}
},
// injection actions
functions: {
domainName1: {
actionName1: context => {}
actionName2: context => {}
},
domainName2: {
actionName3: context => {}
actionName4: context => {}
}
}
});
// use it
console.log(context.foo); // >> foov
console.log(context.bar); // >> barv
// call action
const actionResult1 = await context.domainName1.actionName1();
const actionResult3 = await context.domainName2.actionName3();
Context extends Flow API, hence it allows to mix pub/sub with context based approach
modules.exports = async context => {
const searchResults = await context.actions.search('foo'); // passing some parameters
// publish it
context.define('searchResults', searchResults);
};
// consume it in some other action
modules.exports = async context => {
const searchResults = await context.consume('searchResults');
return {
searchResults
};
};
The usage examples are generic and look more like generic event pub/sub mode.
It can be used to create more specific controls like Action mentioned above.
const Flow = require('oja').Flow;
const flow = new Flow();
// create consumer component
const consumer = flow.consume('foo');
consumer.then(foo => {
console.log(foo); // prints 'bar'
});
// define producer component
const producer = flow.define('foo');
// publish
producer.pub('bar');
// create consumer component
flow
.consume('foo', foo => {
console.log(foo); // prints 'bar'
})
.define('foo', 'bar');
// create consumer component
flow
.consume('foo', foo => {
console.log(foo); // prints 'bar1' and 'bar2'
})
// generate events
.define('foo', 'bar1')
.define('foo', 'bar2');
const buffer = [];
// create consumer stream
const stream = flow.consumeStream('foo');
stream.on('data', data => buffer.push(data));
stream.on('end', () => {
console.log(buffer); // prints one, two, three
})
// generate some data
flow.define('foo', 'one');
flow.define('foo', 'two');
flow.define('foo', 'three');
flow.define('foo', null);
const Flow = require('.').Flow;
const buffer = [];
const flow = new Flow();
// create consumer stream
const reader = flow.getReader('foo');
// generate some data
flow.define('foo', 'one');
flow.define('foo', 'two');
flow.define('foo', 'three');
flow.define('foo', null);
async function read() {
while(true) {
const data = await reader.next()
if (data === undefined) {
break;
}
console.log(data);
}
}
read();
// consume multiple topics
flow.consume(['foo', 'qoo'], input => {
console.log(input.foo); // prints faa
console.log(input.qoo); // prints qaa
});
flow.define('foo', 'faa');
flow.define('qoo', 'qaa');
// create consumer component
flow
.consume('foo', foo => {
console.log(foo); // prints 'bar'
})
.define('foo', new Promise(resolve => {
resolve('bar');
}));
// create consumer component
flow
.consume('foo', foo => {
console.log(foo); // prints 'bar'
})
.consume('foo', foo => {
console.log(foo); // prints 'bar'
})
.define('foo', 'bar');
// NOTE: the order of consume/define does not matter
// create consumer component
flow
.consume('foo', (foo, runtime) => {
console.log(foo); // prints 'faa'
runtime.define('qoo', 'qaa'); // can consume and produce new data
})
.consume('qoo', (qoo, runtime) => {
console.log(qoo); // prints 'qaa'
runtime.define('woo', Promise.resolve('waa')); // can use async promise
})
// start chain reaction here
.define('foo', 'faa')
// lets produce multiple events via event emitter
.consume('woo', (woo, runtime) => {
console.log(woo); // prints waa
// define as event emitter
const roo = runtime.define('roo');
// simulate async flow with two event emitted
setImmediate(() => {
// generate multiple events
roo.pub('raa1');
roo.pub('raa2');
});
})
// validate
.consume('roo', roo => {
console.log(roo); // prints raa1 and raa2
})
// consume multiple topics
.consume(['foo', 'qoo'], input => {
console.log(input.foo); // prints faa
console.log(input.qoo); // prints qaa
})
// can consume inside consume
.consume('foo', (foo, runtime) => {
console.log(foo); // prints faa
runtime.consume('qoo', qoo => {
console.log(input.qoo); // prints qaa
});
// or
flow.consume('qoo', qoo => {
console.log(input.qoo); // prints qaa
});
})
// can generate multiple events using pub
.define('doo', publisher => {
publisher.pub('daa1');
publisher.pub('daa2');
publisher.pub('daa3');
publisher.pub(null);
})
.consume('doo', doo => {
console.log(doo); // prints daa1, daa2, daa3, null
});
.consumeStream('doo', stream => {
stream.on('data', console.log) // prints daa1, daa2, daa3
stream.on('end', () => console.log('end of "doo"'));
})
// NOTE: we can consume first event via promise if we are not interested in the rest
.consume('doo').then(doo => {
console.log(doo); // prints daa1
});
// for debug you can listen to all events
flow.consume('*', evt => {
console.log(`Event type: ${evt.name}, data: ${evt.data}`);
})
const base = new Flow();
base.define('foo', 'bar')
const flow = new Flow(base);
flow.consume('foo', foo => {
console.log(foo); // prints bar
});
You can also make them depend on each other
const base = new Flow();
base.consume('shared', (_, rt) => {
rt.define('foo', 'bar');
});
const flow = new Flow(base);
flow.consume('foo', foo => {
console.log(foo); // prints bar
});
flow.define('shared', ''); // trigger the chain
The promise chain may be hard to figure out where it is blocked. Oja allows to set a timeout for the given topics and upon the timeout would provide an error message listing topics that have not been resolved yet.
flow
.consume('foo')
.consume('bar')
.consume('too')
.timeout(['foo', 'bar'], 300) // 300 ms
.define('bar', 'boo')
.catch(err => {
console.log(err.message); // prints "Topic/s (foo) timed out, pending topics (too)"
});
Oja provides a current status of topcis that have not been resolved
flow
.consume('foo')
.consume('bar')
.consume('too')
.define('bar', 'boo');
console.log(flow.state()); // prints [foo, too]
flow.define('error', new Error('Boom'));
// or
flow.define('error', Promise.reject(new Error('Boom')));
// or
flow.define('data', () => {
return new Error('Boom');
});
// or
flow.define('data', runtime => {
runtime.pub(new Error('Boom'));
});
flow.catch(err => {
console.log(err); // prints Boom if linked to the above flow
});
// Or
flow.consume('error', err => {
console.log(err); // prints Boom if linked to the above flow
});
The error will prevent further events including error events from publishing.
flow
.define('foo', 'faa')
.define('boo', 'baa')
.define('error', new Error('Boom'))
.define('too', 'taa')
.consume('foo', foo => {
console.log(foo); // prints faa
})
.consume('boo', foo => {
console.log(foo); // prints baa
})
.consume('too', too => {
// will never happen
throw new Error('Should never happen');
})
.catch(err => { // catch error
console.log(err); // print Boom
});
const base = new Flow();
new Foo(base)
FAQs
Lightweight pub/sub module with event backlog, timeout support that maps events to promises and back as well as chains them to other topics and flows.
The npm package oja receives a total of 17 weekly downloads. As such, oja popularity was classified as not popular.
We found that oja demonstrated a not healthy version release cadence and project activity because the last version was released a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
Multiple high-impact npm maintainers confirm they have been targeted in the same social engineering campaign that compromised Axios.

Security News
Axios compromise traced to social engineering, showing how attacks on maintainers can bypass controls and expose the broader software supply chain.

Security News
Node.js has paused its bug bounty program after funding ended, removing payouts for vulnerability reports but keeping its security process unchanged.