@daaku/kombat
Advanced tools
Comparing version 2.4.0 to 2.5.0
import { customAlphabet } from 'nanoid'; | ||
import { Mutex } from 'async-mutex'; | ||
import { murmurHashV3 } from './murmurhash.js'; | ||
@@ -228,2 +229,3 @@ const nanoid = customAlphabet('1234567890abcdefghijklmnopqrstuvwxyz', 16); | ||
#pending = new Set(); | ||
#syncMutex = new Mutex(); | ||
constructor(clock, remote, local) { | ||
@@ -311,19 +313,29 @@ this.clock = clock; | ||
async sync(since) { | ||
// Capture this at the onset to later send() calls don't affect us. | ||
const lastSync = this.clock.send().toJSON(); | ||
await this.saveClock(); | ||
// either the given since, or the last sync, or zero | ||
if (!since) { | ||
since = await this.local.get(kLastSync); | ||
// we use a mutex to prevent running multiple sync calls concurrently | ||
// because otherwise we may try to write the same messages in two separate | ||
// sync requests. one may be pending before we update the local clock, and | ||
// so the queryMessages below would include messages from that pending | ||
// request. a backend may choose to be idempotent and ignore the exact same | ||
// messages, which it should if possible. but for example with the | ||
// kombat-firestore backend this is not possible. | ||
const syncResponse = await this.#syncMutex.runExclusive(async () => { | ||
// Capture this at the onset to later send() calls don't affect us. | ||
const lastSync = this.clock.send().toJSON(); | ||
await this.saveClock(); | ||
// either the given since, or the last sync, or zero | ||
if (!since) { | ||
since = new Timestamp(0, 0, '0').toJSON(); // the begining of time | ||
since = await this.local.get(kLastSync); | ||
if (!since) { | ||
since = new Timestamp(0, 0, '0').toJSON(); // the begining of time | ||
} | ||
} | ||
} | ||
const toSend = await this.local.queryMessages(since); | ||
const syncResponse = await this.remote.sync({ | ||
merkle: this.clock.merkle.clone(), | ||
messages: toSend, | ||
const toSend = await this.local.queryMessages(since); | ||
const syncResponse = await this.remote.sync({ | ||
merkle: this.clock.merkle.clone(), | ||
messages: toSend, | ||
}); | ||
await this.recv(syncResponse.messages); | ||
await this.local.set(kLastSync, lastSync); | ||
return syncResponse; | ||
}); | ||
await this.recv(syncResponse.messages); | ||
await this.local.set(kLastSync, lastSync); | ||
// if we still have diffferences, we may need another sync. | ||
@@ -330,0 +342,0 @@ const diffTime = syncResponse.merkle.diff(this.clock.merkle); |
{ | ||
"name": "@daaku/kombat", | ||
"author": "Naitik Shah <n@daaku.org>", | ||
"version": "2.4.0", | ||
"version": "2.5.0", | ||
"description": "Infrastructure for CRDT powered applications.", | ||
@@ -22,6 +22,5 @@ "repository": "git@github.com:daaku/kombat", | ||
"scripts": { | ||
"build": "tsc", | ||
"test": "NODE_OPTIONS='--import tsx/esm' jasmine", | ||
"deploy": "npm run test && npm publish --access=public", | ||
"prepare": "npm run build" | ||
"prepare": "tsc" | ||
}, | ||
@@ -40,4 +39,5 @@ "keywords": [ | ||
"dependencies": { | ||
"async-mutex": "^0.4.0", | ||
"nanoid": "^5.0.1" | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
36151
505
2
+ Addedasync-mutex@^0.4.0
+ Addedasync-mutex@0.4.1(transitive)
+ Addedtslib@2.8.1(transitive)