New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

demeine

Package Overview
Dependencies
Maintainers
1
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

demeine - npm Package Compare versions

Comparing version 0.2.4 to 0.3.0

23

lib/aggregate.js

@@ -29,4 +29,8 @@ var util = require('util');

Aggregate.prototype._rehydrate = function (events, version) {
LOG.info('rehydrating aggregate with %d events to version %d', events.length, version);
Aggregate.prototype._rehydrate = function (events, version, snapshot) {
LOG.info('rehydrating aggregate with %d events to version %d has snapshot %s', events.length, version, snapshot !== undefined);
// do another way?
if (snapshot) {
this._state = snapshot;
}
for (var i = 0; i < events.length; i++) {

@@ -38,2 +42,6 @@ this._apply(events[i], false);

Aggregate.prototype._getSnapshot = function () {
return this._state;
}
Aggregate.prototype._apply = function (event, isNew) {

@@ -46,4 +54,2 @@ LOG.debug('applying event %j %s', event, isNew);

if (!event.type || !event.aggregateId || event.aggregateId != this.id) {
console.log(this);
console.log(event);
throw new Error('event is missing data', event);

@@ -89,6 +95,6 @@ }

var self = this;
return this._commandQueue.queueCommand(function() {
return this._commandQueue.queueCommand(function () {
var thenned = Promise.resolve(commandToSink);
thenned = thenned
.then(function(command) {
.then(function (command) {
if (!command.id) {

@@ -100,3 +106,2 @@ LOG.warn('No command id set, setting it automatiically');

if (!command.type || !command.aggregateId || command.aggregateId != self.id) {
console.log(command);
var error = new Error('command is missing data', command);

@@ -129,5 +134,5 @@ LOG.error('Unable to sink command', error);

Aggregate.prototype.getUncommittedEventsAsync = function() {
Aggregate.prototype.getUncommittedEventsAsync = function () {
var self = this;
return self._commandQueue.empty().then(function() {
return self._commandQueue.empty().then(function () {
if (self._commandQueue.isProcessing()) {

@@ -134,0 +139,0 @@ return self.getUncommittedEventsAsync();

@@ -13,4 +13,2 @@ var DefaultEventHandler = function() {

} else {
console.log(aggregate);
console.log('Unable to apply event ' + type + " || " + funcName);
throw new Error('Unable to apply event ' + type + " || " + funcName);

@@ -17,0 +15,0 @@ }

@@ -13,9 +13,25 @@ var uuid = require('uuid').v4;

LOG.info('%s findById(%s)', this.aggregateType, id);
var self = this;
var aggregate = this._factory(id);
return this._partition.openStream(id).then(function (stream) {
var events = stream.getCommittedEvents();
var version = stream.getVersion();
aggregate._rehydrate(events, version);
return aggregate;
}).nodeify(callback);
var hasSnapshot = this._partition.loadSnapshot !== undefined;
if (hasSnapshot) {
return self._partition.loadSnapshot(id).then(function (snapshot) {
return self._partition.queryStream(id, (snapshot && snapshot.version) || 0).then(function (commits) {
var events = [];
commits.forEach(function (commit) {
events = events.concat(commit.events);
});
var version = (snapshot && snapshot.version || 0) + events.length;
aggregate._rehydrate(events, version, snapshot && snapshot.snapshot);
return aggregate;
})
}).nodeify(callback);;
} else {
return this._partition.openStream(id).then(function (stream) {
var events = stream.getCommittedEvents();
var version = stream.getVersion();
aggregate._rehydrate(events, version);
return aggregate;
}).nodeify(callback);
}
};

@@ -46,2 +62,6 @@

LOG.info('%s committed %d events with %s id', self.aggregateType, events.length, commitId);
if(self._partition.storeSnapshot !== undefined) {
LOG.debug('Persisting snapshot for stream %s version %s', aggregate.id, aggregate.getVersion());
self._partition.storeSnapshot(aggregate.id, aggregate._getSnapshot(), aggregate.getVersion());
}
return aggregate;

@@ -48,0 +68,0 @@ }).error(function (e) {

{
"name": "demeine",
"version": "0.2.4",
"version": "0.3.0",
"description": "DDDD - Distributed Domain Driven Design",

@@ -5,0 +5,0 @@ "main": "index.js",

var Promise = require('bluebird');
var sdebug = require('slf-debug').default;
require('slf').LoggerFactory.setFactory(sdebug);
require('debug').enable('*');
//require('debug').enable('*');
var should = require('should');

@@ -44,3 +44,2 @@

loc.changeName('test').then(function (result) {
console.log(result);
done();

@@ -79,7 +78,5 @@ });

.then(function (res) {
console.log('res', res);
done(new Error('Unreachable'));
})
.error(function (err) {
console.log('ERR', err);
done();

@@ -103,5 +100,4 @@ });

}).error(function (error) {
console.log('Failed to process');
throw error;
});
}

@@ -1,8 +0,9 @@

var util=require('util');
var util = require('util');
var Aggregate = require('../..').Aggregate;
var Promise = require('bluebird');
var Location = function(commandSink, eventHandler) {
var Location = function (commandSink, eventHandler) {
this.id = 1
Aggregate.call(this, commandSink, eventHandler);
Aggregate.call(this, commandSink, eventHandler);
this._state = {};
}

@@ -16,12 +17,13 @@

Location.prototype.changeName = function(newName) {
return this._sink({type:'location.change_name.command', payload:newName, aggregateId: 1 });
Location.prototype.changeName = function (newName) {
return this._sink({ type: 'location.change_name.command', payload: newName, aggregateId: 1 });
};
Location.prototype.processChangeName = function(command) {
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true);
Location.prototype.processChangeName = function (command) {
return this._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true);
};
Location.prototype.applyChangedName = function(event) {
//change local state if necessary for validation
Location.prototype.applyChangedName = function (event) {
//change local state if necessary for validation
this._state.name = event.payload;
};

@@ -33,17 +35,17 @@

Location.prototype.changeNameAsync = function(newName) {
Location.prototype.changeNameAsync = function (newName) {
var promise = new Promise(function (resolve, reject) {
setTimeout(function () {
resolve({type: 'location.change_name.command', payload: newName, aggregateId: 1 })
resolve({ type: 'location.change_name.command', payload: newName, aggregateId: 1 })
}, 50)
});
return this._sink(promise);
return this._sink(promise);
};
Location.prototype.processChangeNameAsync = function(command) {
return this._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true);
Location.prototype.processChangeNameAsync = function (command) {
return this._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true);
};
Location.prototype.applyChangedNameAsync = function(event) {
//change local state if necessary for validation
Location.prototype.applyChangedNameAsync = function (event) {
this._state.name = event.payload;
};

@@ -55,13 +57,13 @@

Location.prototype.failName = function(newName) {
return this._sink({type:'location.fail_name.command', payload:newName, aggregateId: 1 });
Location.prototype.failName = function (newName) {
return this._sink({ type: 'location.fail_name.command', payload: newName, aggregateId: 1 });
};
Location.prototype.processFailName = function(command) {
Location.prototype.processFailName = function (command) {
var self = this;
if(command.payload === 'fail early') {
if (command.payload === 'fail early') {
throw new Error('Failing early');
}
return new Promise(function(resolve, reject) {
self._apply({type:'location.changed_name.event', payload:command.payload, aggregateId: 1}, true);
return new Promise(function (resolve, reject) {
self._apply({ type: 'location.changed_name.event', payload: command.payload, aggregateId: 1 }, true);
reject(new Error('uh oh'))

@@ -71,4 +73,4 @@ });

Location.prototype.applyFailedName = function(event) {
//change local state if necessary for validation
Location.prototype.applyFailedName = function (event) {
//change local state if necessary for validation
};

@@ -75,0 +77,0 @@

@@ -13,59 +13,168 @@ // var sdebug = require('slf-debug').default;

describe('Repository', function() {
var Partition = function() {
this.openStream = function(streamId) {
var Stream = function() {
this.getCommittedEvents = function(){
return [];
};
this.getVersion = function(){
return -1;
}
this.append = function() {};
this.commit = function() { return Promise.resolve(null);};
}
return Promise.resolve(new Stream(streamId));
}
}
describe('#findById', function() {
describe('Repository', function () {
var Partition = function () {
this.openStream = function (streamId) {
var Stream = function () {
this.getCommittedEvents = function () {
return [];
};
this.getVersion = function () {
return -1;
}
this.append = function () { };
this.commit = function () { return Promise.resolve(null); };
}
return Promise.resolve(new Stream(streamId));
}
}
it('returns aggregate with version = -1 if new stream', function(done) {
var repo = new Repository(new Partition(), 'test_aggregate');
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(aggregate) {
aggregate.getVersion().should.equal(-1);
done();
}).catch(function(err) {
done(err);
});;
});
var SnapshotPartition = function (snapshot, events) {
this._snapshot = snapshot;
this.loadSnapshot = function () {
return Promise.resolve(this._snapshot);
}
this.openStream = function (streamId) {
var Stream = function () {
this.getCommittedEvents = function () {
return [];
};
this.getVersion = function () {
return -1;
}
this.append = function () { };
this.commit = function () { return Promise.resolve(null); };
}
return Promise.resolve(new Stream(streamId));
}
this.storeSnapshot = function (id, snapshot, version) {
this._snapshot = { id: id, snapshot: snapshot, version: version };
return Promise.resolve(this._snapshot);
}
this.queryStream = function (id, fromEventSequence, callback) {
var result = [{ events: events }];
if (fromEventSequence > 0) {
var startCommitId = 0;
var foundEvents = 0;
for (var i = 0; i < result.length; i++) {
foundEvents += result[0].events.length;
startCommitId++;
if (foundEvents >= fromEventSequence) {
break;
}
}
var tooMany = foundEvents - fromEventSequence;
it('creates aggregates with custom factory', function(done) {
var factory = function() { return new Location();};
var repo = new Repository(new Partition(), 'location', factory);
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(aggregate) {
if(aggregate instanceof Location) {
done();
} else {
done('Wrong type created');
}
}).catch(function(err) {
done(err);
});;
});
});
describe('#save', function() {
it('save should clear uncommitted events ', function(done) {
var factory = function() { return new Location();};
var repo = new Repository(new Partition(), 'location', factory);
repo.findById('ID_THAT_DO_NOT_EXIST').then(function(location) {
location.changeName('New Name');
repo.save(location).then(function(x) {
x.getUncommittedEvents().length.should.equal(0);
done();
});
}).catch(function(err) {
done(err);
});
});
});
result = result.slice(startCommitId - (tooMany > 0 ? 1 : 0));
if (tooMany > 0) {
result[0].events = result[0].events.slice(result[0].events.length - tooMany);
}
}
return Promise.resolve(result).nodeify(callback);
}
}
describe('#findById', function () {
it('returns aggregate with version = -1 if new stream', function (done) {
var repo = new Repository(new Partition(), 'test_aggregate');
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (aggregate) {
aggregate.getVersion().should.equal(-1);
done();
}).catch(function (err) {
done(err);
});;
});
it('creates aggregates with custom factory', function (done) {
var factory = function () { return new Location(); };
var repo = new Repository(new Partition(), 'location', factory);
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (aggregate) {
if (aggregate instanceof Location) {
done();
} else {
done('Wrong type created');
}
}).catch(function (err) {
done(err);
});;
});
it('hydrates aggregates with snapshot', function (done) {
var factory = function () { return new Location(); };
var repo = new Repository(new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, type: 'location.changed_name.event', payload: 'Hello' }]), 'location', factory);
repo.findById('1').then(function (aggregate) {
if (aggregate instanceof Location) {
aggregate._getSnapshot().name.should.equal('hello');
aggregate.getVersion().should.equal(1);
done();
} else {
done('Wrong type created');
}
}).catch(function (err) {
done(err);
});
});
it('hydrates aggregates with snapshot and events', function (done) {
var factory = function () { return new Location(); };
var repo = new Repository(new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello' }, { id: 2, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello, world' }]), 'location', factory);
repo.findById('1').then(function (aggregate) {
if (aggregate instanceof Location) {
aggregate._getSnapshot().name.should.equal('Hello, world');
aggregate.getVersion().should.equal(2);
done();
} else {
done('Wrong type created');
}
}).catch(function (err) {
done(err);
});
});
it('hydrates aggregates without snapshot', function (done) {
var factory = function () { return new Location(); };
var repo = new Repository(new SnapshotPartition(undefined, [{ id: 1, aggregateId: '1', type: 'location.changed_name.event', payload: 'Hello' }]), 'location', factory);
repo.findById('1').then(function (aggregate) {
if (aggregate instanceof Location) {
aggregate._getSnapshot().name.should.equal('Hello');
done();
} else {
done('Wrong type created');
}
}).catch(function (err) {
done(err);
});
});
it('stores snapshot for aggregate on save', function (done) {
var factory = function () { return new Location(); };
var part = new SnapshotPartition({ id: '1', version: 1, snapshot: { name: 'hello' } }, [{ id: 1, type: 'location.changed_name.event', payload: 'Hello' }]);
var repo = new Repository(part, 'location', factory);
repo.findById('1').then(function (aggregate) {
aggregate.changeName('Hello, World!');
repo.save(aggregate).then(function () {
part.loadSnapshot('1').then(function (snapshot) {
snapshot.snapshot.name.should.equal('Hello, World!');
done();
})
});
}).catch(function (err) {
done(err);
});;
});
});
describe('#save', function () {
it('save should clear uncommitted events ', function (done) {
var factory = function () { return new Location(); };
var repo = new Repository(new Partition(), 'location', factory);
repo.findById('ID_THAT_DO_NOT_EXIST').then(function (location) {
location.changeName('New Name');
repo.save(location).then(function (x) {
x.getUncommittedEvents().length.should.equal(0);
done();
});
}).catch(function (err) {
done(err);
});
});
});
});
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc