ack
Stability: 1 - Experimental
Ack is a tracking mechanism inspired by Storm's XOR-based tracking for guaranteed message processing. It can track large numbers of messages/events in ack chains and report whether or not all of them have been processed.
Installation
npm install ack
Tests
npm test
How it works
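Ack uses the XOR operation for all the magic. Here is a quick overview of the relevant properties of XOR (^):
A ^ A = 0 (any value XORed with itself is zero)
A ^ 0 = A (XOR with zero leaves a value unchanged)
A ^ B ^ C = B ^ A ^ C (order does not matter)
Together, these mean that values appearing an even number of times cancel out, for example A ^ D ^ B ^ A ^ B ^ D = 0.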
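The properties above can be checked directly on Node.js Buffers, which is what real stamps are. This is a minimal sketch; xorBuffers and isZero are hypothetical helpers written for this example, not part of ack's API.

```javascript
// xorBuffers and isZero are hypothetical helpers for this example only.
function xorBuffers(a, b) {
  if (a.length !== b.length) {
    throw new RangeError('buffers must have equal length');
  }
  var out = Buffer.alloc(a.length);
  for (var i = 0; i < a.length; i++) {
    out[i] = a[i] ^ b[i]; // byte-wise XOR
  }
  return out;
}

// True when every byte of the buffer is 0.
function isZero(buf) {
  for (var i = 0; i < buf.length; i++) {
    if (buf[i] !== 0) {
      return false;
    }
  }
  return true;
}

var A = Buffer.from([0x29, 0x10]);
var B = Buffer.from([0x25, 0x20]);
var C = Buffer.from([0xa9, 0x30]);
var D = Buffer.from([0xe9, 0x40]);
var zero = Buffer.alloc(2);

console.log(isZero(xorBuffers(A, A))); // A ^ A = 0: true
console.log(xorBuffers(A, zero).equals(A)); // A ^ 0 = A: true
console.log(xorBuffers(xorBuffers(A, B), C).equals(xorBuffers(xorBuffers(B, A), C))); // true
console.log(isZero([A, D, B, A, B, D].reduce(xorBuffers))); // A ^ D ^ B ^ A ^ B ^ D = 0: true
```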
Ack chain
Let's say that you want to do a letter count, and you have a database containing text files that contain words.
database -> files -> words
Our approach will be event driven, so we will iterate through the database and emit a 'file' event for each file we encounter.
               +-> 'file'
              /
database ---> 'file'
              \
               +-> 'file'
Another part of our computation will accept those 'file' events and emit a 'word' event for each word encountered.
               +-> 'word'
              /
... file ---> 'word'
              \
               +-> 'word'
How do we track whether a particular file has been fully processed, assuming that the processing of any one of the words in that file could fail at any point?
For every file, you can create a unique tag and a random xorStamp. We then initialize an ack chain. In the example below, I will use a simple bit string in place of xorStamp for illustration purposes. In real use, you want a random Buffer, perhaps generated via:
var crypto = require('crypto');
var xorStamp = crypto.createHash('sha1').update('' + new Date().getTime() + process.hrtime()[1]).digest();
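If you prefer not to derive stamps from the clock, Node's built-in crypto.randomBytes() returns a random Buffer directly. The sketch below assumes an 8-byte (64-bit) stamp, in line with the Storm note at the end of this section; newXorStamp is a hypothetical helper name. Whatever size you pick, every stamp in a chain should have the same length, since XOR is applied byte by byte.

```javascript
var crypto = require('crypto');

// Hypothetical helper: returns a random stamp of `size` bytes (8 bytes = 64 bits by default).
function newXorStamp(size) {
  // crypto.randomBytes returns a Buffer of cryptographically strong random bytes.
  return crypto.randomBytes(size || 8);
}

var fileStamp = newXorStamp();
console.log(fileStamp.length); // 8
console.log(fileStamp.toString('hex')); // 16 random hex characters
```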
In our example, we generate tag and xorStamp as follows.
WARNING: Pseudocode below.
var Ack = require('ack');
var ack = new Ack();
var tag = "unique-file-tag";
var fileStamp = '00101001';
ack.add(tag, fileStamp);
Next, each file is broken up into words. Here comes the tricky part: we are going to do several things at once.
First, we already registered the start of file processing via ack.add(...); now we will acknowledge finishing the processing of that file. To acknowledge, we send the fileStamp again (remember A ^ A = 0).
Second, at the same time, we will acknowledge starting the processing of each word. Let's say we have word1, word2, and word3. We generate a stamp for each word: word1Stamp, word2Stamp, and word3Stamp. To acknowledge the start of their processing, we send those word stamps to the acker.
Now, remember that A ^ A ^ B ^ C = 0 ^ B ^ C = B ^ C. Because order does not matter, we can XOR fileStamp and all three word stamps into a single value and send it with one ack.stamp() call. More precisely:
var fileStamp = '00101001';
var word1Stamp = '00100101';
var word2Stamp = '10101001';
var word3Stamp = '11101001';
// fileStamp ^ word1Stamp ^ word2Stamp ^ word3Stamp
var xorOfAll = '01001100';
ack.stamp(tag, xorOfAll);
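To see where xorOfAll comes from, the sketch below XORs the four illustrative bit strings together. xorBits is a hypothetical helper that only works for short bit strings like these; real stamps are Buffers.

```javascript
// Hypothetical helper: XOR two equal-width bit strings (short enough for parseInt).
function xorBits(a, b) {
  var result = (parseInt(a, 2) ^ parseInt(b, 2)).toString(2);
  // Left-pad with zeros so the result keeps the same width as the inputs.
  while (result.length < a.length) {
    result = '0' + result;
  }
  return result;
}

var fileStamp = '00101001';
var word1Stamp = '00100101';
var word2Stamp = '10101001';
var word3Stamp = '11101001';

// Acknowledge the file (its stamp again) and start all three words in one value.
var xorOfAll = [fileStamp, word1Stamp, word2Stamp, word3Stamp].reduce(xorBits);
console.log(xorOfAll); // 01001100
```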
At this point, Ack has XORed its previous state with the newly stamped value:
var previousStamp = '00101001';
var inboundStamp = '01001100';
// previousStamp ^ inboundStamp, which equals word1Stamp ^ word2Stamp ^ word3Stamp
var currentState = '01100101';
So we've managed to acknowledge multiple operations at once while still storing only currentState.
Next, notice what happens as we successfully process each word.
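// currentState is Ack's internal state, shown here for illustration
var currentState = '01100101';
var word1Stamp = '00100101';
ack.stamp(tag, word1Stamp);
currentState = '01000000'; // 01100101 ^ 00100101
var word2Stamp = '10101001';
ack.stamp(tag, word2Stamp);
currentState = '11101001'; // 01000000 ^ 10101001
var word3Stamp = '11101001';
ack.stamp(tag, word3Stamp);
currentState = '00000000'; // 11101001 ^ 11101001, so the chain is acked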
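The whole chain can be replayed in a few lines. This sketch uses 8-bit integers instead of Buffers, and add, stamp, state and bits are hypothetical stand-ins that only mimic the idea of keeping one XOR state per tag; they are not ack's implementation.

```javascript
// Hypothetical stand-ins: one XOR state per tag, kept in a Map of 8-bit integers.
var state = new Map();

function add(tag, xorStamp) {
  state.set(tag, xorStamp);
}

function stamp(tag, xorStamp) {
  var next = state.get(tag) ^ xorStamp;
  if (next === 0) {
    // All 0s: every stamp has been matched, so the chain is complete.
    state.delete(tag);
  } else {
    state.set(tag, next);
  }
  return next;
}

// Format an integer as an 8-character bit string, like the examples above.
function bits(n) {
  return n.toString(2).padStart(8, '0');
}

var tag = 'unique-file-tag';
var fileStamp = 0b00101001;
var word1Stamp = 0b00100101;
var word2Stamp = 0b10101001;
var word3Stamp = 0b11101001;

add(tag, fileStamp);
// Finish the file and start all three words with one combined stamp.
console.log(bits(stamp(tag, fileStamp ^ word1Stamp ^ word2Stamp ^ word3Stamp))); // 01100101
console.log(bits(stamp(tag, word1Stamp))); // 01000000
console.log(bits(stamp(tag, word2Stamp))); // 11101001
console.log(bits(stamp(tag, word3Stamp))); // 00000000
console.log(state.has(tag)); // false
```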
That's it. The XOR math works out really well for tracking computations where one event generates multiple child events. This can continue further down the chain, as long as each step acknowledges completing its parent processing together with initiating any child processing. Despite all that activity, we always store exactly one state per ack chain.
The example above used binary-looking strings for illustrative purposes; the real implementation uses Buffers. Additionally, the stamps need to be sufficiently large and random to prevent erroneous acked events. Storm's implementation found a 64-bit random integer to be sufficient in practice.
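A rough way to see why 64 bits is usually enough, assuming every stamp is drawn independently and uniformly at random: while at least one stamp is still outstanding, the stored state is the XOR of the outstanding stamps, which is itself uniformly random, so the chance that it happens to be all 0s is

```latex
P(\text{state is all 0s before the chain is complete}) = 2^{-64} \approx 5.4 \times 10^{-20}
```

for each intermediate state. Across many chains, the risk grows at most linearly with the number of intermediate states (a union bound).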
Documentation
Ack
Public API
Ack.eqv(first, second)
CAUTION: reserved for internal use
- first: Buffer First buffer to compare.
- second: Buffer Second buffer to compare.
- Return: Boolean true if equal, false otherwise.
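A sketch of what the eqv contract describes, assuming plain byte-wise equality; this is a hypothetical re-implementation, not ack's source. Comparing against an all-zero Buffer of the same length is one way to test whether a chain has reached 0.

```javascript
// Hypothetical re-implementation of eqv(first, second), for illustration only.
function eqv(first, second) {
  // Buffer#equals returns true only when both buffers have the same length and bytes.
  return first.equals(second);
}

var state = Buffer.from('00ff', 'hex');
console.log(eqv(state, Buffer.from('00ff', 'hex'))); // true
console.log(eqv(state, Buffer.from('0000', 'hex'))); // false

// Checking for a completed chain: compare against an all-zero buffer of the same length.
var done = Buffer.from('0000', 'hex');
console.log(eqv(done, Buffer.alloc(done.length))); // true
```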
Ack.xor(first, second, [zeroCallback])
CAUTION: reserved for internal use
- first: Buffer First operand.
- second: Buffer Second operand.
- zeroCallback: Function (Default: undefined) Optional callback to call if the result of the XOR is all 0s.
- Return: Buffer The result of first XOR second.
The lengths of the buffers must be equal.
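The xor contract above can be sketched as follows. This is a hypothetical re-implementation for illustration; in particular, throwing a RangeError on mismatched lengths is an assumption of this sketch, since the documentation only says the lengths must be equal.

```javascript
// Hypothetical re-implementation of xor(first, second, [zeroCallback]), for illustration only.
function xor(first, second, zeroCallback) {
  // Assumption: this sketch throws on mismatched lengths; the library's behavior is not documented here.
  if (first.length !== second.length) {
    throw new RangeError('buffers must have equal length');
  }
  var result = Buffer.alloc(first.length);
  var allZero = true;
  for (var i = 0; i < first.length; i++) {
    result[i] = first[i] ^ second[i];
    if (result[i] !== 0) {
      allZero = false;
    }
  }
  // Only call zeroCallback when one was given and every byte of the result is 0.
  if (allZero && typeof zeroCallback === 'function') {
    zeroCallback();
  }
  return result;
}

var a = Buffer.from('29e9', 'hex');
xor(a, a, function () {
  console.log('result is all 0s');
});
console.log(xor(a, Buffer.from('2500', 'hex')).toString('hex')); // 0ce9
```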
new Ack()
Creates a new Ack instance.
ack.add(tag, xorStamp)
- tag: String A unique identifier to track this ack chain.
- xorStamp: Buffer Initial stamp to start the ack chain for tag.
ack.fail(tag)
- tag: String A unique identifier of a previously added tag.
Removes the tag and associated xorStamp from the acker and emits the failed event for the tag.
ack.stamp(tag, xorStamp)
- tag: String A unique identifier of a previously added tag.
- xorStamp: Buffer Stamp to XOR into the current state of the ack chain for tag.
Event acked
- tag: String A unique identifier of a previously added tag.
Emitted when the ack chain for a previously added tag succeeds. Success is defined as the cumulative XOR of the initial add() xorStamp and all following stamp() xorStamps resulting in all 0s.
Success removes the tag and associated xorStamp from the acker and emits the acked event for the tag.
Event failed
- tag: String A unique identifier of a previously added tag.
Emitted when the ack chain for a previously added tag fails.
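To try the documented flow without installing the package, here is a minimal stand-in that mirrors the surface described above (add, stamp, fail and the acked/failed events). MiniAck is hypothetical and only a sketch of the behavior; its handling of unknown tags, for example, is an assumption, and the real module may differ.

```javascript
// MiniAck is a hypothetical stand-in that mirrors the documented Ack surface.
var EventEmitter = require('events').EventEmitter;
var crypto = require('crypto');

class MiniAck extends EventEmitter {
  constructor() {
    super();
    this.chains = new Map(); // tag -> current XOR state (Buffer)
  }

  add(tag, xorStamp) {
    // Copy the stamp so later changes to the caller's Buffer don't affect our state.
    this.chains.set(tag, Buffer.from(xorStamp));
  }

  stamp(tag, xorStamp) {
    var current = this.chains.get(tag);
    if (!current) {
      return; // assumption: stamps for unknown or finished tags are ignored
    }
    var allZero = true;
    for (var i = 0; i < current.length; i++) {
      current[i] ^= xorStamp[i];
      if (current[i] !== 0) {
        allZero = false;
      }
    }
    if (allZero) {
      this.chains.delete(tag);
      this.emit('acked', tag);
    }
  }

  fail(tag) {
    // Map#delete returns true only if the tag was being tracked.
    if (this.chains.delete(tag)) {
      this.emit('failed', tag);
    }
  }
}

var ack = new MiniAck();
ack.on('acked', function (tag) { console.log('acked', tag); });
ack.on('failed', function (tag) { console.log('failed', tag); });

var fileStamp = crypto.randomBytes(8);
var wordStamps = [crypto.randomBytes(8), crypto.randomBytes(8)];

ack.add('file-1', fileStamp);
// Finish the file and start both words with a single combined stamp.
var combined = Buffer.from(fileStamp);
wordStamps.forEach(function (w) {
  for (var i = 0; i < combined.length; i++) {
    combined[i] ^= w[i];
  }
});
ack.stamp('file-1', combined);
// Finish each word; the last one brings the state to all 0s and emits 'acked'.
wordStamps.forEach(function (w) { ack.stamp('file-1', w); });

ack.add('file-2', crypto.randomBytes(8));
ack.fail('file-2'); // emits 'failed'
```

With random 8-byte stamps, an early all-0s state is astronomically unlikely, so this normally logs "acked file-1" followed by "failed file-2".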
Sources
The implementation has been sourced from: