Comparing version 0.1.7 to 0.2.0
@@ -0,1 +1,3 @@ | ||
var util = require('util'); | ||
var Queue = require('./queue'); | ||
var Promise = require('bluebird'); | ||
@@ -24,3 +26,4 @@ var uuid = require('node-uuid').v4; | ||
this._version = 0; | ||
} | ||
this._commandQueue = new Queue(); | ||
}; | ||
@@ -60,2 +63,3 @@ Aggregate.prototype._rehydrate = function (events, version) { | ||
}*/ | ||
return this; | ||
}; | ||
@@ -68,3 +72,4 @@ | ||
try { | ||
resolve(self._commandHandler.handle(self, command)); | ||
var handler = self._commandHandler.handle(self, command); | ||
resolve(handler); | ||
} catch (error) { | ||
@@ -80,22 +85,31 @@ reject(error); | ||
Aggregate.prototype._sink = function (command) { | ||
LOG.info('sinking command %j', command); | ||
if (!command.id) { | ||
LOG.warn('No command id set, setting it automatiically'); | ||
command.id = uuid(); | ||
} | ||
//console.log(command.aggregateId + " || " + this.id + " || " + this._state.id); | ||
if (!command.type || !command.aggregateId || command.aggregateId != this.id) { | ||
console.log(command); | ||
var error = new Error('command is missing data', command); | ||
LOG.error('Unable to sink command', error); | ||
throw error; | ||
} | ||
if (this.type) { | ||
command.aggregateType = this.type; | ||
} | ||
var result = this._commandSink.sink(command, this); | ||
return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected'); | ||
} | ||
Aggregate.prototype._sink = function (commandToSink) { | ||
LOG.info('sinking command %j', commandToSink); | ||
var self = this; | ||
return this._commandQueue.queueCommand(function() { | ||
var thenned = Promise.resolve(commandToSink); | ||
thenned = thenned | ||
.then(function(command) { | ||
if (!command.id) { | ||
LOG.warn('No command id set, setting it automatiically'); | ||
command.id = uuid(); | ||
} | ||
// console.log(command.aggregateId + " || " + self.id); | ||
if (!command.type || !command.aggregateId || command.aggregateId != self.id) { | ||
console.log(command); | ||
var error = new Error('command is missing data', command); | ||
LOG.error('Unable to sink command', error); | ||
throw error; | ||
} | ||
if (self.type) { | ||
command.aggregateType = self.type; | ||
} | ||
var result = self._commandSink.sink(command, self); | ||
return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected'); | ||
}) | ||
//console.log('thenn', thenned); | ||
return thenned; | ||
}); | ||
}; | ||
@@ -107,5 +121,16 @@ Aggregate.prototype.getVersion = function () { | ||
Aggregate.prototype.getUncommittedEvents = function () { | ||
//throw if async cmd is on queue | ||
if (this._commandQueue.isProcessing()) { | ||
throw new Error("Cannot get uncommitted events while there is still commands in queue - try using getUncommittedEventsAsync()") | ||
} | ||
return this._uncommittedEvents; | ||
} | ||
Aggregate.prototype.getUncommittedEventsAsync = function() { | ||
var self = this; | ||
return self._commandQueue.empty().then(function() { | ||
return self.getUncommittedEvents(); | ||
}) | ||
} | ||
Aggregate.prototype.clearUncommittedEvents = function () { | ||
@@ -112,0 +137,0 @@ LOG.info('Clearing uncommitted events'); |
@@ -27,16 +27,19 @@ var uuid = require('node-uuid').v4; | ||
return this._partition.openStream(aggregate.id).then(function (stream) { | ||
var events = aggregate.getUncommittedEvents(); | ||
aggregate.clearUncommittedEvents(); | ||
savingWithId = savingWithId || uuid(); | ||
events.forEach(function (event) { | ||
LOG.debug('%s append event - %s', self.aggregateType, event.id); | ||
stream.append(event); | ||
}); | ||
return stream.commit(savingWithId).then(function () { | ||
LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId); | ||
return aggregate; | ||
}).error(function (e) { | ||
LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e); | ||
throw e; | ||
}); | ||
aggregate | ||
.getUncommittedEventsAsync() | ||
.then(function (events) { | ||
aggregate.clearUncommittedEvents(); | ||
savingWithId = savingWithId || uuid(); | ||
events.forEach(function (event) { | ||
LOG.debug('%s append event - %s', self.aggregateType, event.id); | ||
stream.append(event); | ||
}); | ||
return stream.commit(savingWithId).then(function () { | ||
LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId); | ||
return aggregate; | ||
}).error(function (e) { | ||
LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e); | ||
throw e; | ||
}); | ||
}); | ||
}).nodeify(callback); | ||
@@ -43,0 +46,0 @@ }; |
{ | ||
"name": "demeine", | ||
"version": "0.1.7", | ||
"version": "0.2.0", | ||
"description": "DDDD - Distributed Domain Driven Design", | ||
@@ -8,3 +8,3 @@ "main": "index.js", | ||
"test": "mocha test --recursive", | ||
"test.watch": "mocha test --watch --recursive", | ||
"test.watch": "mocha test --color --watch --recursive", | ||
"coverage": "istanbul cover node_modules/mocha/bin/_mocha -- --recursive test -u exports -R spec", | ||
@@ -55,2 +55,2 @@ "coverage-start": "istanbul cover node_modules/mocha/bin/_mocha test --recursive -- -u exports -R spec && start coverage/lcov-report/index.html", | ||
} | ||
} | ||
} |
@@ -0,2 +1,9 @@ | ||
var Promise = require('bluebird'); | ||
var sdebug = require('slf-debug').default; | ||
require('slf').LoggerFactory.setFactory(sdebug); | ||
require('debug').enable('*'); | ||
var should = require('should'); | ||
var Location = require('./aggregates/location'); | ||
describe('Aggregate', function () { | ||
@@ -7,5 +14,28 @@ describe('#_apply', function () { | ||
loc.changeName('test'); | ||
loc.getUncommittedEvents().length.should.equal(1); | ||
loc.getUncommittedEventsAsync().then(function (res) { | ||
res.length.should.equal(1); | ||
}) | ||
}); | ||
}); | ||
describe('#_sink (with promise)', function () { | ||
it('_sink with promise should resolve promise before processing', function (done) { | ||
var loc = new Location(); | ||
loc.changeNameAsync('FIRST-CHANGE'); | ||
loc.changeName('SECOND-CHANGE'); | ||
loc.changeNameAsync('THIRD-CHANGE'); | ||
should.throws(function () { | ||
// Should throw if trying to get uncommitted events while still processing | ||
loc.getUncommittedEvents(); | ||
}); | ||
loc.getUncommittedEventsAsync().then(function (res) { | ||
res[0].payload.should.equal('FIRST-CHANGE'); | ||
res[1].payload.should.equal('SECOND-CHANGE'); | ||
res[2].payload.should.equal('THIRD-CHANGE'); | ||
res.length.should.equal(3); | ||
done(); | ||
}) | ||
}); | ||
}); | ||
describe('#<promise> domain function', function () { | ||
@@ -21,20 +51,56 @@ it('should wait for promise', function (done) { | ||
var loc = new Location(); | ||
loc.failName('test').then(function (result) { | ||
done(new Error('Unreachable')); | ||
}).error(function (e) { | ||
loc.getUncommittedEvents().length.should.equal(0); | ||
done(); | ||
}); | ||
loc.failName('test') | ||
.then(function (result) { | ||
done(new Error('Unreachable')); | ||
}) | ||
.error(function (e) { | ||
loc.getUncommittedEventsAsync().then(function (res) { | ||
res.length.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it.only('should return promise error when failure in process by throwing', function (done) { | ||
it('should return promise error when failure in process by throwing', function (done) { | ||
var loc = new Location(); | ||
loc.failName('fail early').then(function (result) { | ||
console.log('fail 2'); | ||
done(new Error('Unreachable')); | ||
}).error(function (e) { | ||
loc.getUncommittedEvents().length.should.equal(0); | ||
done(); | ||
}); | ||
loc.failName('fail early') | ||
.then(function (result) { | ||
done(new Error('Unreachable')); | ||
}) | ||
.error(function (e) { | ||
loc.getUncommittedEventsAsync().then(function (res) { | ||
res.length.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('should get error', function (done) { | ||
var promise = giefPromisePlz(); | ||
promise | ||
.then(function (res) { | ||
console.log('res', res); | ||
done(new Error('Unreachable')); | ||
}) | ||
.error(function (err) { | ||
console.log('ERR', err); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
var giefPromisePlz = function () { | ||
return new Promise(function (resolve, reject) { | ||
try { | ||
var functionThatThrows = function () { | ||
throw new Error('error is thrown!'); | ||
}; | ||
var test = functionThatThrows(); | ||
resolve(test); | ||
} catch (error) { | ||
reject(error); | ||
} | ||
}).error(function (error) { | ||
console.log('Failed to process'); | ||
throw error; | ||
}); | ||
} |
@@ -12,2 +12,6 @@ var util=require('util'); | ||
// --------- CHANGE NAME | ||
Location.prototype.changeName = function(newName) { | ||
@@ -25,2 +29,27 @@ return this._sink({type:'location.change_name.command', payload:newName, aggregateId: 1 }); | ||
// --------- CHANGE NAME (PROMISE) | ||
Location.prototype.changeNameAsync = function(newName) { | ||
var promise = new Promise(function (resolve, reject) { | ||
setTimeout(function () { | ||
resolve({type: 'location.change_name.command', payload: newName, aggregateId: 1 }) | ||
}, 50) | ||
}); | ||
return this._sink(promise); | ||
}; | ||
Location.prototype.processChangeNameAsync = function(command) { | ||
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true); | ||
}; | ||
Location.prototype.applyChangedNameAsync = function(event) { | ||
//change local state if necessary for validation | ||
}; | ||
// --------- FAIL NAME | ||
Location.prototype.failName = function(newName) { | ||
@@ -32,3 +61,2 @@ return this._sink({type:'location.fail_name.command', payload:newName, aggregateId: 1 }); | ||
var self = this; | ||
console.log('PROCESS'); | ||
if(command.payload === 'fail early') { | ||
@@ -35,0 +63,0 @@ throw new Error('Failing early'); |
@@ -0,1 +1,5 @@ | ||
// var sdebug = require('slf-debug').default; | ||
// require('slf').LoggerFactory.setFactory(sdebug); | ||
// require('debug').enable('*'); | ||
var should = require('should'); | ||
@@ -2,0 +6,0 @@ var Promise = require('bluebird'); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
26641
18
692