Comparing version 0.2.4 to 0.3.0
@@ -29,4 +29,8 @@ var util = require('util'); | ||
Aggregate.prototype._rehydrate = function (events, version) { | ||
LOG.info('rehydrating aggregate with %d events to version %d', events.length, version); | ||
Aggregate.prototype._rehydrate = function (events, version, snapshot) { | ||
LOG.info('rehydrating aggregate with %d events to version %d has snapshot %s', events.length, version, snapshot !== undefined); | ||
// do another way? | ||
if (snapshot) { | ||
this._state = snapshot; | ||
} | ||
for (var i = 0; i < events.length; i++) { | ||
@@ -38,2 +42,6 @@ this._apply(events[i], false); | ||
Aggregate.prototype._getSnapshot = function () { | ||
return this._state; | ||
} | ||
Aggregate.prototype._apply = function (event, isNew) { | ||
@@ -46,4 +54,2 @@ LOG.debug('applying event %j %s', event, isNew); | ||
if (!event.type || !event.aggregateId || event.aggregateId != this.id) { | ||
console.log(this); | ||
console.log(event); | ||
throw new Error('event is missing data', event); | ||
@@ -89,6 +95,6 @@ } | ||
var self = this; | ||
return this._commandQueue.queueCommand(function() { | ||
return this._commandQueue.queueCommand(function () { | ||
var thenned = Promise.resolve(commandToSink); | ||
thenned = thenned | ||
.then(function(command) { | ||
.then(function (command) { | ||
if (!command.id) { | ||
@@ -100,3 +106,2 @@ LOG.warn('No command id set, setting it automatiically'); | ||
if (!command.type || !command.aggregateId || command.aggregateId != self.id) { | ||
console.log(command); | ||
var error = new Error('command is missing data', command); | ||
@@ -129,5 +134,5 @@ LOG.error('Unable to sink command', error); | ||
Aggregate.prototype.getUncommittedEventsAsync = function() { | ||
Aggregate.prototype.getUncommittedEventsAsync = function () { | ||
var self = this; | ||
return self._commandQueue.empty().then(function() { | ||
return self._commandQueue.empty().then(function () { | ||
if (self._commandQueue.isProcessing()) { | ||
@@ -134,0 +139,0 @@ return self.getUncommittedEventsAsync(); |
@@ -13,4 +13,2 @@ var DefaultEventHandler = function() { | ||
} else { | ||
console.log(aggregate); | ||
console.log('Unable to apply event ' + type + " || " + funcName); | ||
throw new Error('Unable to apply event ' + type + " || " + funcName); | ||
@@ -17,0 +15,0 @@ } |
@@ -13,9 +13,25 @@ var uuid = require('uuid').v4; | ||
LOG.info('%s findById(%s)', this.aggregateType, id); | ||
var self = this; | ||
var aggregate = this._factory(id); | ||
return this._partition.openStream(id).then(function (stream) { | ||
var events = stream.getCommittedEvents(); | ||
var version = stream.getVersion(); | ||
aggregate._rehydrate(events, version); | ||
return aggregate; | ||
}).nodeify(callback); | ||
var hasSnapshot = this._partition.loadSnapshot !== undefined; | ||
if (hasSnapshot) { | ||
return self._partition.loadSnapshot(id).then(function (snapshot) { | ||
return self._partition.queryStream(id, (snapshot && snapshot.version) || 0).then(function (commits) { | ||
var events = []; | ||
commits.forEach(function (commit) { | ||
events = events.concat(commit.events); | ||
}); | ||
var version = (snapshot && snapshot.version || 0) + events.length; | ||
aggregate._rehydrate(events, version, snapshot && snapshot.snapshot); | ||
return aggregate; | ||
}) | ||
}).nodeify(callback);; | ||
} else { | ||
return this._partition.openStream(id).then(function (stream) { | ||
var events = stream.getCommittedEvents(); | ||
var version = stream.getVersion(); | ||
aggregate._rehydrate(events, version); | ||
return aggregate; | ||
}).nodeify(callback); | ||
} | ||
}; | ||
@@ -46,2 +62,6 @@ | ||
LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId); | ||
if(self._partition.storeSnapshot !== undefined) { | ||
LOG.debug('Persisting snapshot for stream %s version %s', aggregate.id, aggregate.getVersion()); | ||
self._partition.storeSnapshot(aggregate.id, aggregate._getSnapshot(), aggregate.getVersion()); | ||
} | ||
return aggregate; | ||
@@ -48,0 +68,0 @@ }).error(function (e) { |
{ | ||
"name": "demeine", | ||
"version": "0.2.4", | ||
"version": "0.3.0", | ||
"description": "DDDD - Distributed Domain Driven Design", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
var Promise = require('bluebird'); | ||
var sdebug = require('slf-debug').default; | ||
require('slf').LoggerFactory.setFactory(sdebug); | ||
require('debug').enable('*'); | ||
//require('debug').enable('*'); | ||
var should = require('should'); | ||
@@ -44,3 +44,2 @@ | ||
loc.changeName('test').then(function (result) { | ||
console.log(result); | ||
done(); | ||
@@ -79,7 +78,5 @@ }); | ||
.then(function (res) { | ||
console.log('res', res); | ||
done(new Error('Unreachable')); | ||
}) | ||
.error(function (err) { | ||
console.log('ERR', err); | ||
done(); | ||
@@ -103,5 +100,4 @@ }); | ||
}).error(function (error) { | ||
console.log('Failed to process'); | ||
throw error; | ||
}); | ||
} |
@@ -1,8 +0,9 @@ | ||
var util=require('util'); | ||
var util = require('util'); | ||
var Aggregate = require('../..').Aggregate; | ||
var Promise = require('bluebird'); | ||
var Location = function(commandSink, eventHandler) { | ||
var Location = function (commandSink, eventHandler) { | ||
this.id = 1 | ||
Aggregate.call(this, commandSink, eventHandler); | ||
Aggregate.call(this, commandSink, eventHandler); | ||
this._state = {}; | ||
} | ||
@@ -16,12 +17,13 @@ | ||
Location.prototype.changeName = function(newName) { | ||
return this._sink({type:'location.change_name.command', payload:newName, aggregateId: 1 }); | ||
Location.prototype.changeName = function (newName) { | ||
return this._sink({ type: 'location.change_name.command', payload: newName, aggregateId: 1 }); | ||
}; | ||
Location.prototype.processChangeName = function(command) { | ||
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true); | ||
Location.prototype.processChangeName = function (command) { | ||
return this._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true); | ||
}; | ||
Location.prototype.applyChangedName = function(event) { | ||
//change local state if necessary for validation | ||
Location.prototype.applyChangedName = function (event) { | ||
//change local state if necessary for validation | ||
this._state.name = event.payload; | ||
}; | ||
@@ -33,17 +35,17 @@ | ||
Location.prototype.changeNameAsync = function(newName) { | ||
Location.prototype.changeNameAsync = function (newName) { | ||
var promise = new Promise(function (resolve, reject) { | ||
setTimeout(function () { | ||
resolve({type: 'location.change_name.command', payload: newName, aggregateId: 1 }) | ||
resolve({ type: 'location.change_name.command', payload: newName, aggregateId: 1 }) | ||
}, 50) | ||
}); | ||
return this._sink(promise); | ||
return this._sink(promise); | ||
}; | ||
Location.prototype.processChangeNameAsync = function(command) { | ||
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true); | ||
Location.prototype.processChangeNameAsync = function (command) { | ||
return this._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true); | ||
}; | ||
Location.prototype.applyChangedNameAsync = function(event) { | ||
//change local state if necessary for validation | ||
Location.prototype.applyChangedNameAsync = function (event) { | ||
this._state.name = event.payload; | ||
}; | ||
@@ -55,13 +57,13 @@ | ||
Location.prototype.failName = function(newName) { | ||
return this._sink({type:'location.fail_name.command', payload:newName, aggregateId: 1 }); | ||
Location.prototype.failName = function (newName) { | ||
return this._sink({ type: 'location.fail_name.command', payload: newName, aggregateId: 1 }); | ||
}; | ||
Location.prototype.processFailName = function(command) { | ||
Location.prototype.processFailName = function (command) { | ||
var self = this; | ||
if(command.payload === 'fail early') { | ||
if (command.payload === 'fail early') { | ||
throw new Error('Failing early'); | ||
} | ||
return new Promise(function(resolve, reject) { | ||
self._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true); | ||
return new Promise(function (resolve, reject) { | ||
self._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true); | ||
reject(new Error('uh oh')) | ||
@@ -71,4 +73,4 @@ }); | ||
Location.prototype.applyFailedName = function(event) { | ||
//change local state if necessary for validation | ||
Location.prototype.applyFailedName = function (event) { | ||
//change local state if necessary for validation | ||
}; | ||
@@ -75,0 +77,0 @@ |
@@ -13,59 +13,168 @@ // var sdebug = require('slf-debug').default; | ||
describe('Repository', function() { | ||
var Partition = function() { | ||
this.openStream = function(streamId) { | ||
var Stream = function() { | ||
this.getCommittedEvents = function(){ | ||
return []; | ||
}; | ||
this.getVersion = function(){ | ||
return -1; | ||
} | ||
this.append = function() {}; | ||
this.commit = function() { return Promise.resolve(null);}; | ||
} | ||
return Promise.resolve(new Stream(streamId)); | ||
} | ||
} | ||
describe('#findById', function() { | ||
describe('Repository', function () { | ||
var Partition = function () { | ||
this.openStream = function (streamId) { | ||
var Stream = function () { | ||
this.getCommittedEvents = function () { | ||
return []; | ||
}; | ||
this.getVersion = function () { | ||
return -1; | ||
} | ||
this.append = function () { }; | ||
this.commit = function () { return Promise.resolve(null); }; | ||
} | ||
return Promise.resolve(new Stream(streamId)); | ||
} | ||
} | ||
it('returns aggregate with version = -1 if new stream', function(done) { | ||
var repo = new Repository(new Partition(), 'test_aggregate'); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(aggregate) { | ||
aggregate.getVersion().should.equal(-1); | ||
done(); | ||
}).catch(function(err) { | ||
done(err); | ||
});; | ||
}); | ||
var SnapshotPartition = function (snapshot, events) { | ||
this._snapshot = snapshot; | ||
this.loadSnapshot = function () { | ||
return Promise.resolve(this._snapshot); | ||
} | ||
this.openStream = function (streamId) { | ||
var Stream = function () { | ||
this.getCommittedEvents = function () { | ||
return []; | ||
}; | ||
this.getVersion = function () { | ||
return -1; | ||
} | ||
this.append = function () { }; | ||
this.commit = function () { return Promise.resolve(null); }; | ||
} | ||
return Promise.resolve(new Stream(streamId)); | ||
} | ||
this.storeSnapshot = function (id, snapshot, version) { | ||
this._snapshot = { id: id, snapshot: snapshot, version: version }; | ||
return Promise.resolve(this._snapshot); | ||
} | ||
this.queryStream = function (id, fromEventSequence, callback) { | ||
var result = [{ events: events }]; | ||
if (fromEventSequence > 0) { | ||
var startCommitId = 0; | ||
var foundEvents = 0; | ||
for (var i = 0; i < result.length; i++) { | ||
foundEvents += result[0].events.length; | ||
startCommitId++; | ||
if (foundEvents >= fromEventSequence) { | ||
break; | ||
} | ||
} | ||
var tooMany = foundEvents - fromEventSequence; | ||
it('creates aggregates with custom factory', function(done) { | ||
var factory = function() { return new Location();}; | ||
var repo = new Repository(new Partition(), 'location', factory); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(aggregate) { | ||
if(aggregate instanceof Location) { | ||
done(); | ||
} else { | ||
done('Wrong type created'); | ||
} | ||
}).catch(function(err) { | ||
done(err); | ||
});; | ||
}); | ||
}); | ||
describe('#save', function() { | ||
it('save should clear uncommitted events ', function(done) { | ||
var factory = function() { return new Location();}; | ||
var repo = new Repository(new Partition(), 'location', factory); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(location) { | ||
location.changeName('New Name'); | ||
repo.save(location).then(function(x) { | ||
x.getUncommittedEvents().length.should.equal(0); | ||
done(); | ||
}); | ||
}).catch(function(err) { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
result = result.slice(startCommitId - (tooMany > 0 ? 1 : 0)); | ||
if (tooMany > 0) { | ||
result[0].events = result[0].events.slice(result[0].events.length - tooMany); | ||
} | ||
} | ||
return Promise.resolve(result).nodeify(callback); | ||
} | ||
} | ||
describe('#findById', function () { | ||
it('returns aggregate with version = -1 if new stream', function (done) { | ||
var repo = new Repository(new Partition(), 'test_aggregate'); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (aggregate) { | ||
aggregate.getVersion().should.equal(-1); | ||
done(); | ||
}).catch(function (err) { | ||
done(err); | ||
});; | ||
}); | ||
it('creates aggregates with custom factory', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var repo = new Repository(new Partition(), 'location', factory); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (aggregate) { | ||
if (aggregate instanceof Location) { | ||
done(); | ||
} else { | ||
done('Wrong type created'); | ||
} | ||
}).catch(function (err) { | ||
done(err); | ||
});; | ||
}); | ||
it('hydrates aggregates with snapshot', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var repo = new Repository(new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, type: 'location.changed_name.event', payload: 'Hello' }]), 'location', factory); | ||
repo.findById('1').then(function (aggregate) { | ||
if (aggregate instanceof Location) { | ||
aggregate._getSnapshot().name.should.equal('hello'); | ||
aggregate.getVersion().should.equal(1); | ||
done(); | ||
} else { | ||
done('Wrong type created'); | ||
} | ||
}).catch(function (err) { | ||
done(err); | ||
}); | ||
}); | ||
it('hydrates aggregates with snapshot and events', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var repo = new Repository(new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello' }, { id: 2, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello, world' }]), 'location', factory); | ||
repo.findById('1').then(function (aggregate) { | ||
if (aggregate instanceof Location) { | ||
aggregate._getSnapshot().name.should.equal('Hello, world'); | ||
aggregate.getVersion().should.equal(2); | ||
done(); | ||
} else { | ||
done('Wrong type created'); | ||
} | ||
}).catch(function (err) { | ||
done(err); | ||
}); | ||
}); | ||
it('hydrates aggregates without snapshot', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var repo = new Repository(new SnapshotPartition(undefined, [{ id: 1, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello' }]), 'location', factory); | ||
repo.findById('1').then(function (aggregate) { | ||
if (aggregate instanceof Location) { | ||
aggregate._getSnapshot().name.should.equal('Hello'); | ||
done(); | ||
} else { | ||
done('Wrong type created'); | ||
} | ||
}).catch(function (err) { | ||
done(err); | ||
}); | ||
}); | ||
it('stores snapshot for aggregate on save', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var part = new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, type: 'location.changed_name.event', payload: 'Hello' }]); | ||
var repo = new Repository(part, 'location', factory); | ||
repo.findById('1').then(function (aggregate) { | ||
aggregate.changeName('Hello, World!'); | ||
repo.save(aggregate).then(function () { | ||
part.loadSnapshot('1').then(function (snapshot) { | ||
snapshot.snapshot.name.should.equal('Hello, World!'); | ||
done(); | ||
}) | ||
}); | ||
}).catch(function (err) { | ||
done(err); | ||
});; | ||
}); | ||
}); | ||
describe('#save', function () { | ||
it('save should clear uncommitted events ', function (done) { | ||
var factory = function () { return new Location(); }; | ||
var repo = new Repository(new Partition(), 'location', factory); | ||
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (location) { | ||
location.changeName('New Name'); | ||
repo.save(location).then(function (x) { | ||
x.getUncommittedEvents().length.should.equal(0); | ||
done(); | ||
}); | ||
}).catch(function (err) { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
}); |
32640
827