Comparing version
@@ -0,1 +1,3 @@ | ||
| var util = require('util'); | ||
| var Queue = require('./queue'); | ||
| var Promise = require('bluebird'); | ||
@@ -24,3 +26,4 @@ var uuid = require('node-uuid').v4; | ||
| this._version = 0; | ||
| } | ||
| this._commandQueue = new Queue(); | ||
| }; | ||
@@ -60,2 +63,3 @@ Aggregate.prototype._rehydrate = function (events, version) { | ||
| }*/ | ||
| return this; | ||
| }; | ||
@@ -68,3 +72,4 @@ | ||
| try { | ||
| resolve(self._commandHandler.handle(self, command)); | ||
| var handler = self._commandHandler.handle(self, command); | ||
| resolve(handler); | ||
| } catch (error) { | ||
@@ -80,22 +85,31 @@ reject(error); | ||
| Aggregate.prototype._sink = function (command) { | ||
| LOG.info('sinking command %j', command); | ||
| if (!command.id) { | ||
| LOG.warn('No command id set, setting it automatiically'); | ||
| command.id = uuid(); | ||
| } | ||
| //console.log(command.aggregateId + " || " + this.id + " || " + this._state.id); | ||
| if (!command.type || !command.aggregateId || command.aggregateId != this.id) { | ||
| console.log(command); | ||
| var error = new Error('command is missing data', command); | ||
| LOG.error('Unable to sink command', error); | ||
| throw error; | ||
| } | ||
| if (this.type) { | ||
| command.aggregateType = this.type; | ||
| } | ||
| var result = this._commandSink.sink(command, this); | ||
| return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected'); | ||
| } | ||
| Aggregate.prototype._sink = function (commandToSink) { | ||
| LOG.info('sinking command %j', commandToSink); | ||
| var self = this; | ||
| return this._commandQueue.queueCommand(function() { | ||
| var thenned = Promise.resolve(commandToSink); | ||
| thenned = thenned | ||
| .then(function(command) { | ||
| if (!command.id) { | ||
| LOG.warn('No command id set, setting it automatiically'); | ||
| command.id = uuid(); | ||
| } | ||
| // console.log(command.aggregateId + " || " + self.id); | ||
| if (!command.type || !command.aggregateId || command.aggregateId != self.id) { | ||
| console.log(command); | ||
| var error = new Error('command is missing data', command); | ||
| LOG.error('Unable to sink command', error); | ||
| throw error; | ||
| } | ||
| if (self.type) { | ||
| command.aggregateType = self.type; | ||
| } | ||
| var result = self._commandSink.sink(command, self); | ||
| return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected'); | ||
| }) | ||
| //console.log('thenn', thenned); | ||
| return thenned; | ||
| }); | ||
| }; | ||
@@ -107,5 +121,16 @@ Aggregate.prototype.getVersion = function () { | ||
| Aggregate.prototype.getUncommittedEvents = function () { | ||
| //throw if async cmd is on queue | ||
| if (this._commandQueue.isProcessing()) { | ||
| throw new Error("Cannot get uncommitted events while there is still commands in queue - try using getUncommittedEventsAsync()") | ||
| } | ||
| return this._uncommittedEvents; | ||
| } | ||
| Aggregate.prototype.getUncommittedEventsAsync = function() { | ||
| var self = this; | ||
| return self._commandQueue.empty().then(function() { | ||
| return self.getUncommittedEvents(); | ||
| }) | ||
| } | ||
| Aggregate.prototype.clearUncommittedEvents = function () { | ||
@@ -112,0 +137,0 @@ LOG.info('Clearing uncommitted events'); |
@@ -27,16 +27,19 @@ var uuid = require('node-uuid').v4; | ||
| return this._partition.openStream(aggregate.id).then(function (stream) { | ||
| var events = aggregate.getUncommittedEvents(); | ||
| aggregate.clearUncommittedEvents(); | ||
| savingWithId = savingWithId || uuid(); | ||
| events.forEach(function (event) { | ||
| LOG.debug('%s append event - %s', self.aggregateType, event.id); | ||
| stream.append(event); | ||
| }); | ||
| return stream.commit(savingWithId).then(function () { | ||
| LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId); | ||
| return aggregate; | ||
| }).error(function (e) { | ||
| LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e); | ||
| throw e; | ||
| }); | ||
| aggregate | ||
| .getUncommittedEventsAsync() | ||
| .then(function (events) { | ||
| aggregate.clearUncommittedEvents(); | ||
| savingWithId = savingWithId || uuid(); | ||
| events.forEach(function (event) { | ||
| LOG.debug('%s append event - %s', self.aggregateType, event.id); | ||
| stream.append(event); | ||
| }); | ||
| return stream.commit(savingWithId).then(function () { | ||
| LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId); | ||
| return aggregate; | ||
| }).error(function (e) { | ||
| LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e); | ||
| throw e; | ||
| }); | ||
| }); | ||
| }).nodeify(callback); | ||
@@ -43,0 +46,0 @@ }; |
| { | ||
| "name": "demeine", | ||
| "version": "0.1.7", | ||
| "version": "0.2.0", | ||
| "description": "DDDD - Distributed Domain Driven Design", | ||
@@ -8,3 +8,3 @@ "main": "index.js", | ||
| "test": "mocha test --recursive", | ||
| "test.watch": "mocha test --watch --recursive", | ||
| "test.watch": "mocha test --color --watch --recursive", | ||
| "coverage": "istanbul cover node_modules/mocha/bin/_mocha -- --recursive test -u exports -R spec", | ||
@@ -55,2 +55,2 @@ "coverage-start": "istanbul cover node_modules/mocha/bin/_mocha test --recursive -- -u exports -R spec && start coverage/lcov-report/index.html", | ||
| } | ||
| } | ||
| } |
@@ -0,2 +1,9 @@ | ||
| var Promise = require('bluebird'); | ||
| var sdebug = require('slf-debug').default; | ||
| require('slf').LoggerFactory.setFactory(sdebug); | ||
| require('debug').enable('*'); | ||
| var should = require('should'); | ||
| var Location = require('./aggregates/location'); | ||
| describe('Aggregate', function () { | ||
@@ -7,5 +14,28 @@ describe('#_apply', function () { | ||
| loc.changeName('test'); | ||
| loc.getUncommittedEvents().length.should.equal(1); | ||
| loc.getUncommittedEventsAsync().then(function (res) { | ||
| res.length.should.equal(1); | ||
| }) | ||
| }); | ||
| }); | ||
| describe('#_sink (with promise)', function () { | ||
| it('_sink with promise should resolve promise before processing', function (done) { | ||
| var loc = new Location(); | ||
| loc.changeNameAsync('FIRST-CHANGE'); | ||
| loc.changeName('SECOND-CHANGE'); | ||
| loc.changeNameAsync('THIRD-CHANGE'); | ||
| should.throws(function () { | ||
| // Should throw if trying to get uncommitted events while still processing | ||
| loc.getUncommittedEvents(); | ||
| }); | ||
| loc.getUncommittedEventsAsync().then(function (res) { | ||
| res[0].payload.should.equal('FIRST-CHANGE'); | ||
| res[1].payload.should.equal('SECOND-CHANGE'); | ||
| res[2].payload.should.equal('THIRD-CHANGE'); | ||
| res.length.should.equal(3); | ||
| done(); | ||
| }) | ||
| }); | ||
| }); | ||
| describe('#<promise> domain function', function () { | ||
@@ -21,20 +51,56 @@ it('should wait for promise', function (done) { | ||
| var loc = new Location(); | ||
| loc.failName('test').then(function (result) { | ||
| done(new Error('Unreachable')); | ||
| }).error(function (e) { | ||
| loc.getUncommittedEvents().length.should.equal(0); | ||
| done(); | ||
| }); | ||
| loc.failName('test') | ||
| .then(function (result) { | ||
| done(new Error('Unreachable')); | ||
| }) | ||
| .error(function (e) { | ||
| loc.getUncommittedEventsAsync().then(function (res) { | ||
| res.length.should.equal(0); | ||
| done(); | ||
| }); | ||
| }); | ||
| }); | ||
| it.only('should return promise error when failure in process by throwing', function (done) { | ||
| it('should return promise error when failure in process by throwing', function (done) { | ||
| var loc = new Location(); | ||
| loc.failName('fail early').then(function (result) { | ||
| console.log('fail 2'); | ||
| done(new Error('Unreachable')); | ||
| }).error(function (e) { | ||
| loc.getUncommittedEvents().length.should.equal(0); | ||
| done(); | ||
| }); | ||
| loc.failName('fail early') | ||
| .then(function (result) { | ||
| done(new Error('Unreachable')); | ||
| }) | ||
| .error(function (e) { | ||
| loc.getUncommittedEventsAsync().then(function (res) { | ||
| res.length.should.equal(0); | ||
| done(); | ||
| }); | ||
| }); | ||
| }); | ||
| it('should get error', function (done) { | ||
| var promise = giefPromisePlz(); | ||
| promise | ||
| .then(function (res) { | ||
| console.log('res', res); | ||
| done(new Error('Unreachable')); | ||
| }) | ||
| .error(function (err) { | ||
| console.log('ERR', err); | ||
| done(); | ||
| }); | ||
| }); | ||
| }); | ||
| }); | ||
| var giefPromisePlz = function () { | ||
| return new Promise(function (resolve, reject) { | ||
| try { | ||
| var functionThatThrows = function () { | ||
| throw new Error('error is thrown!'); | ||
| }; | ||
| var test = functionThatThrows(); | ||
| resolve(test); | ||
| } catch (error) { | ||
| reject(error); | ||
| } | ||
| }).error(function (error) { | ||
| console.log('Failed to process'); | ||
| throw error; | ||
| }); | ||
| } |
@@ -12,2 +12,6 @@ var util=require('util'); | ||
| // --------- CHANGE NAME | ||
| Location.prototype.changeName = function(newName) { | ||
@@ -25,2 +29,27 @@ return this._sink({type:'location.change_name.command', payload:newName, aggregateId: 1 }); | ||
| // --------- CHANGE NAME (PROMISE) | ||
| Location.prototype.changeNameAsync = function(newName) { | ||
| var promise = new Promise(function (resolve, reject) { | ||
| setTimeout(function () { | ||
| resolve({type: 'location.change_name.command', payload: newName, aggregateId: 1 }) | ||
| }, 50) | ||
| }); | ||
| return this._sink(promise); | ||
| }; | ||
| Location.prototype.processChangeNameAsync = function(command) { | ||
| return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true); | ||
| }; | ||
| Location.prototype.applyChangedNameAsync = function(event) { | ||
| //change local state if necessary for validation | ||
| }; | ||
| // --------- FAIL NAME | ||
| Location.prototype.failName = function(newName) { | ||
@@ -32,3 +61,2 @@ return this._sink({type:'location.fail_name.command', payload:newName, aggregateId: 1 }); | ||
| var self = this; | ||
| console.log('PROCESS'); | ||
| if(command.payload === 'fail early') { | ||
@@ -35,0 +63,0 @@ throw new Error('Failing early'); |
@@ -0,1 +1,5 @@ | ||
| // var sdebug = require('slf-debug').default; | ||
| // require('slf').LoggerFactory.setFactory(sdebug); | ||
| // require('debug').enable('*'); | ||
| var should = require('should'); | ||
@@ -2,0 +6,0 @@ var Promise = require('bluebird'); |
Sorry, the diff of this file is not supported yet
26641
69.5%18
20%692
104.13%