cqrs-eventdenormalizer
Advanced tools
Comparing version 0.3.2 to 0.3.3
@@ -176,2 +176,47 @@ var _ = require('lodash'), | ||
}, | ||
replayStreamed: function(fn, retryTimout) { | ||
var self = this; | ||
retryTimout = retryTimout || 10; | ||
var queue = []; | ||
var replay = function(evt) { | ||
queue.push(evt); | ||
if (queue.length === 1) { | ||
(function handle(e) { | ||
self.handle(e, true, function(err) { | ||
queue.splice(queue.indexOf(e), 1); | ||
if (queue.length > 0) { | ||
handle(queue[0]); | ||
} | ||
}); | ||
})(evt); | ||
} | ||
}; | ||
var done = function(callback) { | ||
(function retry() { | ||
if (queue.length > 0) { | ||
return setTimeout(retry, retryTimout); | ||
} | ||
var replVms = _.values(self.replayingVms); | ||
async.each(replVms, function(vm, callback) { | ||
self.saveViewModel(vm, callback); | ||
}, function(err) { | ||
self.replayingVms = {}; | ||
callback(err); | ||
}); | ||
})(); | ||
}; | ||
fn(replay, done); | ||
} | ||
@@ -178,0 +223,0 @@ |
@@ -9,8 +9,9 @@ var viewBuilderLoader = require('./loaders/viewBuilderLoader'), | ||
queue = require('node-queue'), | ||
replayHandler = require('./replayHandler'), | ||
repository = require('viewmodel').write.create(), | ||
revisionGuardStore = require('./revisionGuardStore'), | ||
revisionGuard = require('./revisionGuard'), | ||
eventQueue, | ||
guardStore, | ||
viewBuilders, | ||
repository = require('viewmodel').write.create(), | ||
revisionGuardStore = require('./revisionGuardStore'), | ||
revisionGuard = require('./revisionGuard'), | ||
evtDen; | ||
@@ -107,2 +108,5 @@ | ||
}); | ||
replayHandler.initialize(viewBuilders, guardStore); | ||
revisionGuard.initialize({ | ||
@@ -148,49 +152,6 @@ ignoreRevision: options.ignoreRevision, | ||
replay: function(evts, callback) { | ||
replay: replayHandler.replay, | ||
var revisionMap = {}, | ||
groupedEvents = {}; | ||
replayStreamed: replayHandler.replayStreamed | ||
_.each(evts, function(evt) { | ||
if (evt.head && evt.head.revision) { | ||
revisionMap[evt.payload.id] = evt.head.revision; | ||
} | ||
var interested = _.filter(viewBuilders, function(vB) { | ||
return _.contains(vB.registeredEventNames, evt.event); | ||
}); | ||
_.each(interested, function(inter) { | ||
groupedEvents[inter.id] = groupedEvents[inter.id] || []; | ||
groupedEvents[inter.id].push(evt); | ||
}); | ||
}); | ||
async.series([ | ||
function(callback) { | ||
async.each(viewBuilders, function(viewBuilder, callback) { | ||
if (!groupedEvents[viewBuilder.id] || groupedEvents[viewBuilder.id].length === 0) { | ||
return callback(null); | ||
} | ||
viewBuilder.replay(groupedEvents[viewBuilder.id], callback); | ||
}, callback); | ||
}, | ||
function(callback) { | ||
var ids = _.keys(revisionMap); | ||
async.each(ids, function(id, callback) { | ||
guardStore.getRevision(id, function(err, entry) { | ||
if (err) { return callback(err); } | ||
entry.revision = revisionMap[id] + 1; | ||
guardStore.saveRevision(entry, callback); | ||
}); | ||
}, callback); | ||
} | ||
], function(err) { | ||
if (callback) callback(err); | ||
}); | ||
} | ||
}); |
{ | ||
"author": "adrai", | ||
"name": "cqrs-eventdenormalizer", | ||
"version": "0.3.2", | ||
"version": "0.3.3", | ||
"private": false, | ||
@@ -6,0 +6,0 @@ "main": "index.js", |
@@ -42,2 +42,13 @@ # Introduction | ||
// to replay streamed | ||
eventDenormalizer.replayStreamed(function(replay, done) { | ||
replay(evt1); | ||
replay(evt2); | ||
replay(evt3); | ||
done(function(err) { }); | ||
}); | ||
## Define ViewBuilders... | ||
@@ -112,2 +123,6 @@ | ||
## v0.3.3 | ||
- little fix in replay streamed | ||
## v0.3.2 | ||
@@ -114,0 +129,0 @@ |
45767
20
1139
172