Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

demeine

Package Overview
Dependencies
Maintainers
1
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

demeine - npm Package Compare versions

Comparing version 0.1.7 to 0.2.0

lib/promise_queue.js

67

lib/aggregate.js

@@ -0,1 +1,3 @@

var util = require('util');
var Queue = require('./queue');
var Promise = require('bluebird');

@@ -24,3 +26,4 @@ var uuid = require('node-uuid').v4;

this._version = 0;
}
this._commandQueue = new Queue();
};

@@ -60,2 +63,3 @@ Aggregate.prototype._rehydrate = function (events, version) {

}*/
return this;
};

@@ -68,3 +72,4 @@

try {
resolve(self._commandHandler.handle(self, command));
var handler = self._commandHandler.handle(self, command);
resolve(handler);
} catch (error) {

@@ -80,22 +85,31 @@ reject(error);

Aggregate.prototype._sink = function (command) {
LOG.info('sinking command %j', command);
if (!command.id) {
LOG.warn('No command id set, setting it automatiically');
command.id = uuid();
}
//console.log(command.aggregateId + " || " + this.id + " || " + this._state.id);
if (!command.type || !command.aggregateId || command.aggregateId != this.id) {
console.log(command);
var error = new Error('command is missing data', command);
LOG.error('Unable to sink command', error);
throw error;
}
if (this.type) {
command.aggregateType = this.type;
}
var result = this._commandSink.sink(command, this);
return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected');
}
Aggregate.prototype._sink = function (commandToSink) {
LOG.info('sinking command %j', commandToSink);
var self = this;
return this._commandQueue.queueCommand(function() {
var thenned = Promise.resolve(commandToSink);
thenned = thenned
.then(function(command) {
if (!command.id) {
LOG.warn('No command id set, setting it automatiically');
command.id = uuid();
}
// console.log(command.aggregateId + " || " + self.id);
if (!command.type || !command.aggregateId || command.aggregateId != self.id) {
console.log(command);
var error = new Error('command is missing data', command);
LOG.error('Unable to sink command', error);
throw error;
}
if (self.type) {
command.aggregateType = self.type;
}
var result = self._commandSink.sink(command, self);
return _promise(result, 'sinking command but not returning promise, commands status and chaining might not work as expected');
})
//console.log('thenn', thenned);
return thenned;
});
};

@@ -107,5 +121,16 @@ Aggregate.prototype.getVersion = function () {

Aggregate.prototype.getUncommittedEvents = function () {
//throw if async cmd is on queue
if (this._commandQueue.isProcessing()) {
throw new Error("Cannot get uncommitted events while there is still commands in queue - try using getUncommittedEventsAsync()")
}
return this._uncommittedEvents;
}
Aggregate.prototype.getUncommittedEventsAsync = function() {
var self = this;
return self._commandQueue.empty().then(function() {
return self.getUncommittedEvents();
})
}
Aggregate.prototype.clearUncommittedEvents = function () {

@@ -112,0 +137,0 @@ LOG.info('Clearing uncommitted events');

@@ -27,16 +27,19 @@ var uuid = require('node-uuid').v4;

return this._partition.openStream(aggregate.id).then(function (stream) {
var events = aggregate.getUncommittedEvents();
aggregate.clearUncommittedEvents();
savingWithId = savingWithId || uuid();
events.forEach(function (event) {
LOG.debug('%s append event - %s', self.aggregateType, event.id);
stream.append(event);
});
return stream.commit(savingWithId).then(function () {
LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId);
return aggregate;
}).error(function (e) {
LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e);
throw e;
});
aggregate
.getUncommittedEventsAsync()
.then(function (events) {
aggregate.clearUncommittedEvents();
savingWithId = savingWithId || uuid();
events.forEach(function (event) {
LOG.debug('%s append event - %s', self.aggregateType, event.id);
stream.append(event);
});
return stream.commit(savingWithId).then(function () {
LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId);
return aggregate;
}).error(function (e) {
LOG.error('%s unable to commit %d events with %s id', self.aggregateType, events.length, commitId, e);
throw e;
});
});
}).nodeify(callback);

@@ -43,0 +46,0 @@ };

{
"name": "demeine",
"version": "0.1.7",
"version": "0.2.0",
"description": "DDDD - Distributed Domain Driven Design",

@@ -8,3 +8,3 @@ "main": "index.js",

"test": "mocha test --recursive",
"test.watch": "mocha test --watch --recursive",
"test.watch": "mocha test --color --watch --recursive",
"coverage": "istanbul cover node_modules/mocha/bin/_mocha -- --recursive test -u exports -R spec",

@@ -55,2 +55,2 @@ "coverage-start": "istanbul cover node_modules/mocha/bin/_mocha test --recursive -- -u exports -R spec && start coverage/lcov-report/index.html",

}
}
}

@@ -0,2 +1,9 @@

var Promise = require('bluebird');
var sdebug = require('slf-debug').default;
require('slf').LoggerFactory.setFactory(sdebug);
require('debug').enable('*');
var should = require('should');
var Location = require('./aggregates/location');
describe('Aggregate', function () {

@@ -7,5 +14,28 @@ describe('#_apply', function () {

loc.changeName('test');
loc.getUncommittedEvents().length.should.equal(1);
loc.getUncommittedEventsAsync().then(function (res) {
res.length.should.equal(1);
})
});
});
describe('#_sink (with promise)', function () {
it('_sink with promise should resolve promise before processing', function (done) {
var loc = new Location();
loc.changeNameAsync('FIRST-CHANGE');
loc.changeName('SECOND-CHANGE');
loc.changeNameAsync('THIRD-CHANGE');
should.throws(function () {
// Should throw if trying to get uncommitted events while still processing
loc.getUncommittedEvents();
});
loc.getUncommittedEventsAsync().then(function (res) {
res[0].payload.should.equal('FIRST-CHANGE');
res[1].payload.should.equal('SECOND-CHANGE');
res[2].payload.should.equal('THIRD-CHANGE');
res.length.should.equal(3);
done();
})
});
});
describe('#<promise> domain function', function () {

@@ -21,20 +51,56 @@ it('should wait for promise', function (done) {

var loc = new Location();
loc.failName('test').then(function (result) {
done(new Error('Unreachable'));
}).error(function (e) {
loc.getUncommittedEvents().length.should.equal(0);
done();
});
loc.failName('test')
.then(function (result) {
done(new Error('Unreachable'));
})
.error(function (e) {
loc.getUncommittedEventsAsync().then(function (res) {
res.length.should.equal(0);
done();
});
});
});
it.only('should return promise error when failure in process by throwing', function (done) {
it('should return promise error when failure in process by throwing', function (done) {
var loc = new Location();
loc.failName('fail early').then(function (result) {
console.log('fail 2');
done(new Error('Unreachable'));
}).error(function (e) {
loc.getUncommittedEvents().length.should.equal(0);
done();
});
loc.failName('fail early')
.then(function (result) {
done(new Error('Unreachable'));
})
.error(function (e) {
loc.getUncommittedEventsAsync().then(function (res) {
res.length.should.equal(0);
done();
});
});
});
it('should get error', function (done) {
var promise = giefPromisePlz();
promise
.then(function (res) {
console.log('res', res);
done(new Error('Unreachable'));
})
.error(function (err) {
console.log('ERR', err);
done();
});
});
});
});
var giefPromisePlz = function () {
return new Promise(function (resolve, reject) {
try {
var functionThatThrows = function () {
throw new Error('error is thrown!');
};
var test = functionThatThrows();
resolve(test);
} catch (error) {
reject(error);
}
}).error(function (error) {
console.log('Failed to process');
throw error;
});
}

@@ -12,2 +12,6 @@ var util=require('util');

// --------- CHANGE NAME
Location.prototype.changeName = function(newName) {

@@ -25,2 +29,27 @@ return this._sink({type:'location.change_name.command', payload:newName, aggregateId: 1 });

// --------- CHANGE NAME (PROMISE)
Location.prototype.changeNameAsync = function(newName) {
var promise = new Promise(function (resolve, reject) {
setTimeout(function () {
resolve({type: 'location.change_name.command', payload: newName, aggregateId: 1 })
}, 50)
});
return this._sink(promise);
};
Location.prototype.processChangeNameAsync = function(command) {
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true);
};
Location.prototype.applyChangedNameAsync = function(event) {
//change local state if necessary for validation
};
// --------- FAIL NAME
Location.prototype.failName = function(newName) {

@@ -32,3 +61,2 @@ return this._sink({type:'location.fail_name.command', payload:newName, aggregateId: 1 });

var self = this;
console.log('PROCESS');
if(command.payload === 'fail early') {

@@ -35,0 +63,0 @@ throw new Error('Failing early');

@@ -0,1 +1,5 @@

// var sdebug = require('slf-debug').default;
// require('slf').LoggerFactory.setFactory(sdebug);
// require('debug').enable('*');
var should = require('should');

@@ -2,0 +6,0 @@ var Promise = require('bluebird');

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc