blue-rings
Advanced tools
Comparing version 1.1.1 to 2.0.0
@@ -0,5 +1,8 @@ | ||
counter = require './crdt-counter' | ||
register = require './crdt-register' | ||
BlueRingStore = require './store' | ||
BlueRingAxon = require './protocol' | ||
assert = require 'assert' | ||
Public API for a service storing EcmaScript numbers (transmitted as base-36 strings). | ||
Public API for a service storing EcmaScript counters and text. | ||
@@ -10,7 +13,77 @@ run = (options) -> | ||
{Value} = options | ||
{Value,host} = options | ||
{add} = Value | ||
{Counter} = counter(Value) | ||
Register = register.LWWRegister | ||
service = new BlueRingAxon options | ||
COUNTER = 'C' | ||
REGISTER = 'R' | ||
class Mux | ||
type: (type) -> | ||
@__type = type | ||
switch type | ||
when COUNTER | ||
@__value = new Counter host | ||
when REGISTER | ||
@__value = new Register() | ||
else | ||
throw new Error "Invalid type #{type}" | ||
null | ||
increment: (args...) -> | ||
unless @__value? | ||
throw new Error 'You must use `setup_counter` before using `update_counter`.' | ||
type = @__type | ||
change = @__value.increment args... | ||
return null unless change? | ||
[type,change...] | ||
assign: (args...) -> | ||
unless @__value? | ||
throw new Error 'You must use `setup_text` before using `update_text`.' | ||
type = @__type | ||
change = @__value.assign args... | ||
return null unless change? | ||
[type,change...] | ||
value: -> @__value.value() | ||
merge: ([type,rest...]) -> | ||
@type type if not @__type | ||
msg = @__value.merge rest | ||
msg[1].unshift type | ||
msg | ||
all: -> | ||
return [] unless @__value? | ||
type = @__type | ||
@__value | ||
.all() | ||
.map (rest) -> | ||
[type,rest...] | ||
@serialize: ([type,rest...]) -> | ||
switch type | ||
when COUNTER | ||
[type].concat Counter.serialize rest | ||
when REGISTER | ||
[type].concat Register.serialize rest | ||
else | ||
throw new Error "Invalid type #{type}" | ||
@deserialize: ([type,rest...]) -> | ||
switch type | ||
when COUNTER | ||
[type].concat Counter.deserialize rest | ||
when REGISTER | ||
[type].concat Register.deserialize rest | ||
new_crdt = -> new Mux() | ||
store = new BlueRingStore new_crdt, host | ||
service = new BlueRingAxon Mux, store, options | ||
once = (e) -> new Promise (resolve) -> service.ev.once e, resolve | ||
@@ -20,5 +93,5 @@ bound = once 'bind' | ||
get_counter = (name) -> | ||
get_value = (name) -> | ||
assert 'string' is typeof name, 'get_counter: name is required' | ||
value = service.get_value name | ||
value = store.get_value name | ||
coherent = service.coherent() | ||
@@ -30,3 +103,3 @@ [coherent,value] | ||
assert 'number' is typeof expire, 'setup_counter: expire is required' | ||
service.add_counter name, expire | ||
service.operation name, expire, 'type', COUNTER | ||
return | ||
@@ -38,6 +111,20 @@ | ||
service.add_amount name, amount, expire | ||
service.operation name, expire, 'increment', [amount] | ||
get_counter name | ||
get_value name | ||
setup_text = (name,expire) -> | ||
assert 'string' is typeof name, 'setup_text: name is required' | ||
assert 'number' is typeof expire, 'setup_text: expire is required' | ||
service.operation name, expire, 'type', REGISTER | ||
return | ||
update_text = (name,text,expire) -> | ||
assert 'string' is typeof name, 'update_counter: name is required' | ||
assert 'string' is typeof text, 'update_counter: text is required' | ||
service.operation name, expire, 'assign', [text] | ||
get_value name | ||
statistics = -> | ||
@@ -56,3 +143,15 @@ recv: service.recv | ||
{setup_counter,update_counter,get_counter,bound,connected,statistics,end,subscribe_to} | ||
{ | ||
setup_counter | ||
update_counter | ||
get_counter:get_value | ||
setup_text | ||
update_text | ||
get_text:get_value | ||
bound | ||
connected | ||
statistics | ||
end | ||
subscribe_to | ||
} | ||
@@ -62,4 +161,4 @@ `values` interface for integers | ||
integer_values = | ||
deserialize: (t) -> parseInt t, 36 | ||
serialize: (n) -> n.toString 36 | ||
deserialize: (t) -> parseInt t, 36 # protocol | ||
serialize: (n) -> n.toString 36 # protocol | ||
add: (n1,n2) -> n1+n2 | ||
@@ -73,3 +172,3 @@ equals: (n1,n2) -> n1 is n2 | ||
zero: 0 | ||
accept: Number | ||
accept: Number # used for testing | ||
@@ -79,4 +178,4 @@ `values` interface for big integers (require Node.js 10.7.0 or above) | ||
bigint_values = | ||
deserialize: (t) -> BigInt t | ||
serialize: (n) -> n.toString() | ||
deserialize: (t) -> BigInt t # protocol | ||
serialize: (n) -> n.toString() # protocol | ||
add: (n1,n2) -> n1+n2 | ||
@@ -90,4 +189,4 @@ equals: (n1,n2) -> n1 is n2 | ||
zero: BigInt 0 | ||
accept: BigInt | ||
accept: BigInt # used for testing | ||
module.exports = {run,integer:integer_values,bigint:bigint_values} |
161
index.js
// Generated by CoffeeScript 2.3.1 | ||
(function() { | ||
var BlueRingAxon, assert, bigint_values, integer_values, run; | ||
var BlueRingAxon, BlueRingStore, assert, bigint_values, counter, integer_values, register, run; | ||
counter = require('./crdt-counter'); | ||
register = require('./crdt-register'); | ||
BlueRingStore = require('./store'); | ||
BlueRingAxon = require('./protocol'); | ||
@@ -9,11 +15,106 @@ | ||
// Public API for a service storing EcmaScript numbers (transmitted as base-36 strings). | ||
// Public API for a service storing EcmaScript counters and text. | ||
run = function(options) { | ||
var Value, add, bound, connected, end, get_counter, once, service, setup_counter, statistics, subscribe_to, update_counter; | ||
var COUNTER, Counter, Mux, REGISTER, Register, Value, bound, connected, end, get_value, host, new_crdt, once, service, setup_counter, setup_text, statistics, store, subscribe_to, update_counter, update_text; | ||
if (options.Value == null) { | ||
options.Value = integer_values; | ||
} | ||
({Value} = options); | ||
({add} = Value); | ||
service = new BlueRingAxon(options); | ||
({Value, host} = options); | ||
({Counter} = counter(Value)); | ||
Register = register.LWWRegister; | ||
COUNTER = 'C'; | ||
REGISTER = 'R'; | ||
Mux = class Mux { | ||
type(type) { | ||
this.__type = type; | ||
switch (type) { | ||
case COUNTER: | ||
this.__value = new Counter(host); | ||
break; | ||
case REGISTER: | ||
this.__value = new Register(); | ||
break; | ||
default: | ||
throw new Error(`Invalid type ${type}`); | ||
} | ||
return null; | ||
} | ||
increment(...args) { | ||
var change, type; | ||
if (this.__value == null) { | ||
throw new Error('You must use `setup_counter` before using `update_counter`.'); | ||
} | ||
type = this.__type; | ||
change = this.__value.increment(...args); | ||
if (change == null) { | ||
return null; | ||
} | ||
return [type, ...change]; | ||
} | ||
assign(...args) { | ||
var change, type; | ||
if (this.__value == null) { | ||
throw new Error('You must use `setup_text` before using `update_text`.'); | ||
} | ||
type = this.__type; | ||
change = this.__value.assign(...args); | ||
if (change == null) { | ||
return null; | ||
} | ||
return [type, ...change]; | ||
} | ||
value() { | ||
return this.__value.value(); | ||
} | ||
merge([type, ...rest]) { | ||
var msg; | ||
if (!this.__type) { | ||
this.type(type); | ||
} | ||
msg = this.__value.merge(rest); | ||
msg[1].unshift(type); | ||
return msg; | ||
} | ||
all() { | ||
var type; | ||
if (this.__value == null) { | ||
return []; | ||
} | ||
type = this.__type; | ||
return this.__value.all().map(function(rest) { | ||
return [type, ...rest]; | ||
}); | ||
} | ||
static serialize([type, ...rest]) { | ||
switch (type) { | ||
case COUNTER: | ||
return [type].concat(Counter.serialize(rest)); | ||
case REGISTER: | ||
return [type].concat(Register.serialize(rest)); | ||
default: | ||
throw new Error(`Invalid type ${type}`); | ||
} | ||
} | ||
static deserialize([type, ...rest]) { | ||
switch (type) { | ||
case COUNTER: | ||
return [type].concat(Counter.deserialize(rest)); | ||
case REGISTER: | ||
return [type].concat(Register.deserialize(rest)); | ||
} | ||
} | ||
}; | ||
new_crdt = function() { | ||
return new Mux(); | ||
}; | ||
store = new BlueRingStore(new_crdt, host); | ||
service = new BlueRingAxon(Mux, store, options); | ||
once = function(e) { | ||
@@ -26,6 +127,6 @@ return new Promise(function(resolve) { | ||
connected = once('connected'); | ||
get_counter = function(name) { | ||
get_value = function(name) { | ||
var coherent, value; | ||
assert('string' === typeof name, 'get_counter: name is required'); | ||
value = service.get_value(name); | ||
value = store.get_value(name); | ||
coherent = service.coherent(); | ||
@@ -37,3 +138,3 @@ return [coherent, value]; | ||
assert('number' === typeof expire, 'setup_counter: expire is required'); | ||
service.add_counter(name, expire); | ||
service.operation(name, expire, 'type', COUNTER); | ||
}; | ||
@@ -43,5 +144,16 @@ update_counter = function(name, amount, expire) { | ||
assert(amount != null, 'update_counter: amount is required'); | ||
service.add_amount(name, amount, expire); | ||
return get_counter(name); | ||
service.operation(name, expire, 'increment', [amount]); | ||
return get_value(name); | ||
}; | ||
setup_text = function(name, expire) { | ||
assert('string' === typeof name, 'setup_text: name is required'); | ||
assert('number' === typeof expire, 'setup_text: expire is required'); | ||
service.operation(name, expire, 'type', REGISTER); | ||
}; | ||
update_text = function(name, text, expire) { | ||
assert('string' === typeof name, 'update_counter: name is required'); | ||
assert('string' === typeof text, 'update_counter: text is required'); | ||
service.operation(name, expire, 'assign', [text]); | ||
return get_value(name); | ||
}; | ||
statistics = function() { | ||
@@ -61,3 +173,15 @@ return { | ||
}; | ||
return {setup_counter, update_counter, get_counter, bound, connected, statistics, end, subscribe_to}; | ||
return { | ||
setup_counter, | ||
update_counter, | ||
get_counter: get_value, | ||
setup_text, | ||
update_text, | ||
get_text: get_value, | ||
bound, | ||
connected, | ||
statistics, | ||
end, | ||
subscribe_to | ||
}; | ||
}; | ||
@@ -68,6 +192,6 @@ | ||
deserialize: function(t) { | ||
return parseInt(t, 36); | ||
return parseInt(t, 36); // protocol | ||
}, | ||
serialize: function(n) { | ||
return n.toString(36); | ||
return n.toString(36); // protocol | ||
}, | ||
@@ -104,12 +228,13 @@ add: function(n1, n2) { | ||
zero: 0, | ||
accept: Number | ||
accept: Number // used for testing | ||
}; | ||
// `values` interface for big integers (require Node.js 10.7.0 or above) | ||
bigint_values = { | ||
deserialize: function(t) { | ||
return BigInt(t); | ||
return BigInt(t); // protocol | ||
}, | ||
serialize: function(n) { | ||
return n.toString(); | ||
return n.toString(); // protocol | ||
}, | ||
@@ -146,3 +271,3 @@ add: function(n1, n2) { | ||
zero: BigInt(0), | ||
accept: BigInt | ||
accept: BigInt // used for testing | ||
}; | ||
@@ -149,0 +274,0 @@ |
{ | ||
"name": "blue-rings", | ||
"version": "1.1.1", | ||
"version": "2.0.0", | ||
"description": "Blue Rings: distributed counters", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -5,4 +5,2 @@ DEFAULT_PORT = 4000 | ||
EXPIRE = 'expire' | ||
From a protocol perspective, this should use SCTP as the underlying protocol. | ||
@@ -16,3 +14,2 @@ However SCTP is not in libuv, and therefor not in Node.js. | ||
Axon = require 'axon' | ||
BlueRing = require './store' | ||
{EventEmitter} = require 'events' | ||
@@ -30,5 +27,5 @@ | ||
class BlueRingAxon extends BlueRing | ||
constructor: (options) -> | ||
super options.Value | ||
class BlueRingAxon | ||
constructor: ({@serialize,@deserialize},store,options) -> | ||
@store = store | ||
@@ -91,3 +88,3 @@ Statistics | ||
destructor: -> | ||
super() | ||
@store.destructor() | ||
@close() | ||
@@ -101,10 +98,6 @@ @ev.removeAllListeners() | ||
add_counter: (name,expire) -> | ||
data = super name, expire | ||
operation: (name,expire,op,args) -> | ||
data = @store.operation name, expire, op, args | ||
@send_data data, [], @pub if data? | ||
add_amount: (name,amount,expire) -> | ||
data = super name, amount, expire | ||
@send_data data, [], @pub if data? | ||
postpone: (name,delay,f) -> | ||
@@ -126,4 +119,2 @@ if @__sendall.has name | ||
deserialize = ([dir,host,value]) => [dir,host,(@Value.deserialize value)] | ||
receive = wrap (msg) => | ||
@@ -151,3 +142,3 @@ switch | ||
changes = msg.c.map deserialize | ||
changes = msg.c.map @deserialize | ||
@@ -160,3 +151,3 @@ # console.log 'received', @host, name, msg.e, changes, msg.R | ||
res = @on_send name, msg.e, changes, sub | ||
res = @store.on_send name, msg.e, changes, sub | ||
@postpone name, @forward_delay, => | ||
@@ -219,3 +210,3 @@ @send_data res, msg.R, null | ||
on_connect: (sub) -> | ||
@enumerate_local_counters (res) => | ||
@store.enumerate_local_values (res) => | ||
{name} = res | ||
@@ -237,8 +228,6 @@ @postpone name, @connect_delay, => | ||
serialize = ([dir,host,value]) => [dir,host,(@Value.serialize value)] | ||
msg = | ||
n: name | ||
e: expire | ||
c: changes.map serialize | ||
c: changes.map @serialize | ||
s: source | ||
@@ -245,0 +234,0 @@ R: [@host,route...] |
// Generated by CoffeeScript 2.3.1 | ||
(function() { | ||
var Axon, BlueRing, BlueRingAxon, DEFAULT_PORT, EXPIRE, EventEmitter, PING_INTERVAL, PING_PACKET, sleep, wrap, | ||
var Axon, BlueRingAxon, DEFAULT_PORT, EventEmitter, PING_INTERVAL, PING_PACKET, sleep, wrap, | ||
indexOf = [].indexOf; | ||
@@ -12,4 +12,2 @@ | ||
EXPIRE = 'expire'; | ||
// From a protocol perspective, this should use SCTP as the underlying protocol. | ||
@@ -23,4 +21,2 @@ // However SCTP is not in libuv, and therefor not in Node.js. | ||
BlueRing = require('./store'); | ||
({EventEmitter} = require('events')); | ||
@@ -48,6 +44,8 @@ | ||
BlueRingAxon = class BlueRingAxon extends BlueRing { | ||
constructor(options) { | ||
BlueRingAxon = class BlueRingAxon { | ||
constructor({serialize, deserialize}, store, options) { | ||
var flood, ping, ref, subscribe_to; | ||
super(options.Value); | ||
this.serialize = serialize; | ||
this.deserialize = deserialize; | ||
this.store = store; | ||
// Statistics | ||
@@ -96,3 +94,3 @@ this.recv = BigInt(0); | ||
destructor() { | ||
super.destructor(); | ||
this.store.destructor(); | ||
this.close(); | ||
@@ -105,5 +103,5 @@ this.ev.removeAllListeners(); | ||
// Public operations | ||
add_counter(name, expire) { | ||
operation(name, expire, op, args) { | ||
var data; | ||
data = super.add_counter(name, expire); | ||
data = this.store.operation(name, expire, op, args); | ||
if (data != null) { | ||
@@ -114,10 +112,2 @@ return this.send_data(data, [], this.pub); | ||
add_amount(name, amount, expire) { | ||
var data; | ||
data = super.add_amount(name, amount, expire); | ||
if (data != null) { | ||
return this.send_data(data, [], this.pub); | ||
} | ||
} | ||
postpone(name, delay, f) { | ||
@@ -131,3 +121,3 @@ if (this.__sendall.has(name)) { | ||
subscribe_to(o) { | ||
var connected, deserialize, monitor, ping_received, receive, sub, timer; | ||
var connected, monitor, ping_received, receive, sub, timer; | ||
sub = Axon.socket('sub'); | ||
@@ -141,5 +131,2 @@ sub.connect(o); | ||
// - `new-tickets(name,value,array-of-tickets)` is encoded as :1 | ||
deserialize = ([dir, host, value]) => { | ||
return [dir, host, this.Value.deserialize(value)]; | ||
}; | ||
receive = wrap((msg) => { | ||
@@ -168,3 +155,3 @@ var changes, name, ref, res; | ||
} | ||
changes = msg.c.map(deserialize); | ||
changes = msg.c.map(this.deserialize); | ||
// console.log 'received', @host, name, msg.e, changes, msg.R | ||
@@ -175,3 +162,3 @@ | ||
// However this is not very reliable because things are lossy. It's better to send the entire set every time we get an update, which gives a chance to server which are behind to catch up. | ||
res = this.on_send(name, msg.e, changes, sub); | ||
res = this.store.on_send(name, msg.e, changes, sub); | ||
this.postpone(name, this.forward_delay, () => { | ||
@@ -251,3 +238,3 @@ return this.send_data(res, msg.R, null); | ||
on_connect(sub) { | ||
this.enumerate_local_counters(async(res) => { | ||
this.store.enumerate_local_values(async(res) => { | ||
var name; | ||
@@ -268,11 +255,8 @@ ({name} = res); | ||
send_data({name, expire, changes, source}, route, socket) { | ||
var msg, serialize; | ||
var msg; | ||
// console.log 'send_data', @host, name, expire, changes, source, route | ||
serialize = ([dir, host, value]) => { | ||
return [dir, host, this.Value.serialize(value)]; | ||
}; | ||
msg = { | ||
n: name, | ||
e: expire, | ||
c: changes.map(serialize), | ||
c: changes.map(this.serialize), | ||
s: source, | ||
@@ -279,0 +263,0 @@ R: [this.host, ...route] |
@@ -1,74 +0,6 @@ | ||
EXPIRE = 'expire' | ||
PLUS = '+' | ||
MINUS = '-' | ||
COUNTER = 'counter' | ||
EXPIRE = 'E' | ||
VALUE = 'V' | ||
nextTick = -> new Promise process.nextTick | ||
Delta-state C(v)RDT | ||
We implement the CvRDT here. The delta-state is handled by the protocol. | ||
class GrowCounter | ||
See e.g. [GrowCounter](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type#G-Counter_(Grow-only_Counter)) | ||
We store the increments for each node we know about. | ||
constructor: (@Value,@me) -> | ||
@increments = new Map() | ||
increment: (amount) -> | ||
increment = @Value.add amount, (@increments.get @me) ? @Value.zero | ||
@increments.set @me, increment | ||
increment | ||
update: (source,increment) -> | ||
previous = (@increments.get source) ? @Value.zero | ||
new_increment = @Value.max previous, increment | ||
@increments.set source, new_increment | ||
new_increment | ||
value: -> | ||
value = @Value.zero | ||
for v from @increments.values() | ||
value = @Value.add value, v | ||
value | ||
all: -> | ||
@increments.entries() | ||
class Counter | ||
constructor: (@Value,@me) -> | ||
@pluses = new GrowCounter @Value, @me | ||
@minuses = new GrowCounter @Value, @me | ||
increment: (amount) -> | ||
switch | ||
when @Value.is_positive amount | ||
[PLUS, @me, @pluses.increment amount] | ||
when @Value.is_negative amount | ||
[MINUS, @me, @minuses.increment @Value.abs amount] | ||
else | ||
null | ||
update: (dir,source,increment) -> | ||
switch dir | ||
when PLUS | ||
@pluses.update source, increment | ||
when MINUS | ||
@minuses.update source, increment | ||
value: -> | ||
@Value.subtract @pluses.value(), @minuses.value() | ||
all: -> | ||
all = [] | ||
for [source,increment] from @pluses.all() | ||
all.push [PLUS,source,increment] | ||
for [source,increment] from @minuses.all() | ||
all.push [MINUS,source,increment] | ||
all | ||
expired = (expire) -> expire < Date.now() | ||
@@ -83,6 +15,6 @@ | ||
class BlueRing | ||
constructor: (@Value,@host) -> | ||
class BlueRingStore | ||
constructor: (@new_crdt,@host) -> | ||
@store = new Map() | ||
@__collector = setInterval collect, 3600*1000, @store | ||
@__collector = setInterval collect.bind(this), 3600*1000, @store | ||
@@ -95,8 +27,2 @@ destructor: -> | ||
add_counter: (name,expire) -> | ||
@add_local_amount name, @Value.zero, expire | ||
add_amount: (name,amount,expire) -> | ||
@add_local_amount name, amount, expire | ||
get_expire: (name) -> | ||
@@ -111,3 +37,3 @@ @store.get(name)?.get EXPIRE | ||
@store.get name | ||
L.get(COUNTER).value() | ||
L.get(VALUE).value() | ||
else | ||
@@ -119,3 +45,3 @@ @store.delete name | ||
__counter: (name,expire) -> | ||
__retrieve: (name,expire) -> | ||
L = @store.get(name) | ||
@@ -128,3 +54,3 @@ | ||
L = new Map() | ||
L.set COUNTER, new Counter(@Value,@host) | ||
L.set VALUE, @new_crdt() | ||
if expire? | ||
@@ -140,11 +66,14 @@ L.set EXPIRE, expire | ||
add_local_amount: (name,amount,expire) -> | ||
operation: (name,expire,op,args) -> | ||
L = @__counter name, expire | ||
L = @__retrieve name, expire | ||
change = L.get(COUNTER).increment amount | ||
change = L.get(VALUE)[op]? args... | ||
expire_now = L.get EXPIRE | ||
return if expire is expire_now and not change? | ||
{name,expire:expire_now,changes:[change],source:@host} | ||
if change? | ||
{name,expire:expire_now,changes:[change],source:@host} | ||
else | ||
{name,expire:expire_now,changes:[],source:@host} | ||
@@ -154,11 +83,10 @@ Message handlers | ||
on_new_changes: (name,expire,changes,source,socket) -> | ||
L = @__counter name, expire | ||
counter = L.get COUNTER | ||
L = @__retrieve name, expire | ||
value = L.get VALUE | ||
changed = false | ||
forward = changes | ||
.map ([dir,source,increment]) => | ||
new_increment = counter.update dir,source,increment | ||
unless @Value.equals increment, new_increment | ||
changed = true | ||
[ dir, source, new_increment ] | ||
.map (msg) => | ||
[ modified, msg ] = value.merge msg | ||
changed = true if modified | ||
msg | ||
@@ -169,19 +97,19 @@ expire_now = L.get EXPIRE | ||
on_send: (name,expire,changes,socket) -> | ||
L = @__counter name, expire | ||
counter = L.get COUNTER | ||
changes.forEach ([dir,source,increment]) => | ||
counter.update dir,source,increment | ||
L = @__retrieve name, expire | ||
value = L.get VALUE | ||
changes.forEach (msg) -> | ||
value.merge msg | ||
expire = L.get EXPIRE | ||
changes = counter.all() | ||
changes = value.all() | ||
{name,expire,changes,source:@host} | ||
enumerate_local_counters: (cb) -> | ||
enumerate_local_values: (cb) -> | ||
for [name,L] from @store.entries() | ||
expire = L.get EXPIRE | ||
unless expired expire | ||
changes = L.get(COUNTER).all() | ||
changes = L.get(VALUE).all() | ||
await cb {name,expire,changes,source:@host} | ||
return | ||
module.exports = BlueRing | ||
module.exports = BlueRingStore |
188
store.js
// Generated by CoffeeScript 2.3.1 | ||
(function() { | ||
var BlueRing, COUNTER, Counter, EXPIRE, GrowCounter, MINUS, PLUS, collect, expired, nextTick; | ||
var BlueRingStore, EXPIRE, VALUE, collect, expired, nextTick; | ||
EXPIRE = 'expire'; | ||
EXPIRE = 'E'; | ||
PLUS = '+'; | ||
VALUE = 'V'; | ||
MINUS = '-'; | ||
COUNTER = 'counter'; | ||
nextTick = function() { | ||
@@ -17,95 +13,2 @@ return new Promise(process.nextTick); | ||
// Delta-state C(v)RDT | ||
// We implement the CvRDT here. The delta-state is handled by the protocol. | ||
GrowCounter = class GrowCounter { | ||
// See e.g. [GrowCounter](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type#G-Counter_(Grow-only_Counter)) | ||
// We store the increments for each node we know about. | ||
constructor(Value, me) { | ||
this.Value = Value; | ||
this.me = me; | ||
this.increments = new Map(); | ||
} | ||
increment(amount) { | ||
var increment, ref; | ||
increment = this.Value.add(amount, (ref = this.increments.get(this.me)) != null ? ref : this.Value.zero); | ||
this.increments.set(this.me, increment); | ||
return increment; | ||
} | ||
update(source, increment) { | ||
var new_increment, previous, ref; | ||
previous = (ref = this.increments.get(source)) != null ? ref : this.Value.zero; | ||
new_increment = this.Value.max(previous, increment); | ||
this.increments.set(source, new_increment); | ||
return new_increment; | ||
} | ||
value() { | ||
var ref, v, value; | ||
value = this.Value.zero; | ||
ref = this.increments.values(); | ||
for (v of ref) { | ||
value = this.Value.add(value, v); | ||
} | ||
return value; | ||
} | ||
all() { | ||
return this.increments.entries(); | ||
} | ||
}; | ||
Counter = class Counter { | ||
constructor(Value, me) { | ||
this.Value = Value; | ||
this.me = me; | ||
this.pluses = new GrowCounter(this.Value, this.me); | ||
this.minuses = new GrowCounter(this.Value, this.me); | ||
} | ||
increment(amount) { | ||
switch (false) { | ||
case !this.Value.is_positive(amount): | ||
return [PLUS, this.me, this.pluses.increment(amount)]; | ||
case !this.Value.is_negative(amount): | ||
return [MINUS, this.me, this.minuses.increment(this.Value.abs(amount))]; | ||
default: | ||
return null; | ||
} | ||
} | ||
update(dir, source, increment) { | ||
switch (dir) { | ||
case PLUS: | ||
return this.pluses.update(source, increment); | ||
case MINUS: | ||
return this.minuses.update(source, increment); | ||
} | ||
} | ||
value() { | ||
return this.Value.subtract(this.pluses.value(), this.minuses.value()); | ||
} | ||
all() { | ||
var all, increment, ref, ref1, source, x, y; | ||
all = []; | ||
ref = this.pluses.all(); | ||
for (x of ref) { | ||
[source, increment] = x; | ||
all.push([PLUS, source, increment]); | ||
} | ||
ref1 = this.minuses.all(); | ||
for (y of ref1) { | ||
[source, increment] = y; | ||
all.push([MINUS, source, increment]); | ||
} | ||
return all; | ||
} | ||
}; | ||
expired = function(expire) { | ||
@@ -128,8 +31,8 @@ return expire < Date.now(); | ||
BlueRing = class BlueRing { | ||
constructor(Value, host) { | ||
this.Value = Value; | ||
BlueRingStore = class BlueRingStore { | ||
constructor(new_crdt, host) { | ||
this.new_crdt = new_crdt; | ||
this.host = host; | ||
this.store = new Map(); | ||
this.__collector = setInterval(collect, 3600 * 1000, this.store); | ||
this.__collector = setInterval(collect.bind(this), 3600 * 1000, this.store); | ||
} | ||
@@ -142,10 +45,2 @@ | ||
// Public operations | ||
add_counter(name, expire) { | ||
return this.add_local_amount(name, this.Value.zero, expire); | ||
} | ||
add_amount(name, amount, expire) { | ||
return this.add_local_amount(name, amount, expire); | ||
} | ||
get_expire(name) { | ||
@@ -165,3 +60,3 @@ var ref; | ||
this.store.get(name); | ||
return L.get(COUNTER).value(); | ||
return L.get(VALUE).value(); | ||
} else { | ||
@@ -174,3 +69,3 @@ this.store.delete(name); | ||
// Private operations | ||
__counter(name, expire) { | ||
__retrieve(name, expire) { | ||
var L; | ||
@@ -186,3 +81,3 @@ L = this.store.get(name); | ||
L = new Map(); | ||
L.set(COUNTER, new Counter(this.Value, this.host)); | ||
L.set(VALUE, this.new_crdt()); | ||
if (expire != null) { | ||
@@ -199,6 +94,6 @@ L.set(EXPIRE, expire); | ||
// Tool | ||
add_local_amount(name, amount, expire) { | ||
var L, change, expire_now; | ||
L = this.__counter(name, expire); | ||
change = L.get(COUNTER).increment(amount); | ||
operation(name, expire, op, args) { | ||
var L, base, change, expire_now; | ||
L = this.__retrieve(name, expire); | ||
change = typeof (base = L.get(VALUE))[op] === "function" ? base[op](...args) : void 0; | ||
expire_now = L.get(EXPIRE); | ||
@@ -208,8 +103,17 @@ if (expire === expire_now && (change == null)) { | ||
} | ||
return { | ||
name, | ||
expire: expire_now, | ||
changes: [change], | ||
source: this.host | ||
}; | ||
if (change != null) { | ||
return { | ||
name, | ||
expire: expire_now, | ||
changes: [change], | ||
source: this.host | ||
}; | ||
} else { | ||
return { | ||
name, | ||
expire: expire_now, | ||
changes: [], | ||
source: this.host | ||
}; | ||
} | ||
} | ||
@@ -219,13 +123,13 @@ | ||
on_new_changes(name, expire, changes, source, socket) { | ||
var L, changed, counter, expire_now, forward; | ||
L = this.__counter(name, expire); | ||
counter = L.get(COUNTER); | ||
var L, changed, expire_now, forward, value; | ||
L = this.__retrieve(name, expire); | ||
value = L.get(VALUE); | ||
changed = false; | ||
forward = changes.map(([dir, source, increment]) => { | ||
var new_increment; | ||
new_increment = counter.update(dir, source, increment); | ||
if (!this.Value.equals(increment, new_increment)) { | ||
forward = changes.map((msg) => { | ||
var modified; | ||
[modified, msg] = value.merge(msg); | ||
if (modified) { | ||
changed = true; | ||
} | ||
return [dir, source, new_increment]; | ||
return msg; | ||
}); | ||
@@ -243,10 +147,10 @@ expire_now = L.get(EXPIRE); | ||
on_send(name, expire, changes, socket) { | ||
var L, counter; | ||
L = this.__counter(name, expire); | ||
counter = L.get(COUNTER); | ||
changes.forEach(([dir, source, increment]) => { | ||
return counter.update(dir, source, increment); | ||
var L, value; | ||
L = this.__retrieve(name, expire); | ||
value = L.get(VALUE); | ||
changes.forEach(function(msg) { | ||
return value.merge(msg); | ||
}); | ||
expire = L.get(EXPIRE); | ||
changes = counter.all(); | ||
changes = value.all(); | ||
return { | ||
@@ -260,3 +164,3 @@ name, | ||
async enumerate_local_counters(cb) { | ||
async enumerate_local_values(cb) { | ||
var L, changes, expire, name, ref, x; | ||
@@ -268,3 +172,3 @@ ref = this.store.entries(); | ||
if (!expired(expire)) { | ||
changes = L.get(COUNTER).all(); | ||
changes = L.get(VALUE).all(); | ||
await cb({ | ||
@@ -282,4 +186,4 @@ name, | ||
module.exports = BlueRing; | ||
module.exports = BlueRingStore; | ||
}).call(this); |
@@ -415,2 +415,4 @@ {expect} = chai = require 'chai' | ||
NAME = 'lion' | ||
for m in ms | ||
m.setup_counter NAME, Date.now()+80000 | ||
@@ -417,0 +419,0 @@ start = Date.now() |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
83842
16
805