cojson-storage-sqlite
Advanced tools
Comparing version
@@ -1,5 +0,2 @@ | ||
import { cojsonInternals, | ||
// CojsonInternalTypes, | ||
// SessionID, | ||
} from "cojson"; | ||
import { cojsonInternals, MAX_RECOMMENDED_TX_SIZE, } from "cojson"; | ||
import Database from "better-sqlite3"; | ||
@@ -37,3 +34,3 @@ export class SQLiteStorage { | ||
idx INTEGER, | ||
tx TEXT NOT NULL , | ||
tx TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
@@ -62,3 +59,5 @@ ) WITHOUT ROWID;`).run(); | ||
console.log("Migration 1 -> 2: Fix off-by-one error for transaction indices"); | ||
const txs = db.prepare(`SELECT * FROM transactions`).all(); | ||
const txs = db | ||
.prepare(`SELECT * FROM transactions`) | ||
.all(); | ||
for (const tx of txs) { | ||
@@ -72,2 +71,14 @@ db.prepare(`DELETE FROM transactions WHERE ses = ? AND idx = ?`).run(tx.ses, tx.idx); | ||
} | ||
if (oldVersion <= 2) { | ||
console.log("Migration 2 -> 3: Add signatureAfter"); | ||
db.prepare(`CREATE TABLE IF NOT EXISTS signatureAfter ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
signature TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;`).run(); | ||
db.prepare(`ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;`).run(); | ||
db.pragma("user_version = 3"); | ||
console.log("Migration 2 -> 3: Add signatureAfter - done"); | ||
} | ||
return new SQLiteStorage(db, fromLocalNode, toLocalNode); | ||
@@ -107,8 +118,10 @@ } | ||
JSON.parse(coValueRow.header)); | ||
const newContent = { | ||
action: "content", | ||
id: theirKnown.id, | ||
header: theirKnown.header ? undefined : parsedHeader, | ||
new: {}, | ||
}; | ||
const newContentPieces = [ | ||
{ | ||
action: "content", | ||
id: theirKnown.id, | ||
header: theirKnown.header ? undefined : parsedHeader, | ||
new: {}, | ||
}, | ||
]; | ||
for (const sessionRow of allOurSessions) { | ||
@@ -119,14 +132,52 @@ ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx; | ||
const firstNewTxIdx = theirKnown.sessions[sessionRow.sessionID] || 0; | ||
const signaturesAndIdxs = this.db | ||
.prepare(`SELECT * FROM signatureAfter WHERE ses = ? AND idx >= ?`) | ||
.all(sessionRow.rowID, firstNewTxIdx); | ||
// console.log( | ||
// theirKnown.id, | ||
// "signaturesAndIdxs", | ||
// JSON.stringify(signaturesAndIdxs) | ||
// ); | ||
const newTxInSession = this.db | ||
.prepare(`SELECT * FROM transactions WHERE ses = ? AND idx >= ?`) | ||
.all(sessionRow.rowID, firstNewTxIdx); | ||
newContent.new[sessionRow.sessionID] = { | ||
after: firstNewTxIdx, | ||
lastSignature: sessionRow.lastSignature, | ||
newTransactions: newTxInSession.map((row) => JSON.parse(row.tx)), | ||
}; | ||
let idx = firstNewTxIdx; | ||
// console.log( | ||
// theirKnown.id, | ||
// "newTxInSession", | ||
// newTxInSession.length | ||
// ); | ||
for (const tx of newTxInSession) { | ||
let sessionEntry = newContentPieces[newContentPieces.length - 1].new[sessionRow.sessionID]; | ||
if (!sessionEntry) { | ||
sessionEntry = { | ||
after: idx, | ||
lastSignature: "WILL_BE_REPLACED", | ||
newTransactions: [], | ||
}; | ||
newContentPieces[newContentPieces.length - 1].new[sessionRow.sessionID] = sessionEntry; | ||
} | ||
sessionEntry.newTransactions.push(JSON.parse(tx.tx)); | ||
if (signaturesAndIdxs[0] && | ||
idx === signaturesAndIdxs[0].idx) { | ||
sessionEntry.lastSignature = | ||
signaturesAndIdxs[0].signature; | ||
signaturesAndIdxs.shift(); | ||
newContentPieces.push({ | ||
action: "content", | ||
id: theirKnown.id, | ||
new: {}, | ||
}); | ||
} | ||
else if (idx === | ||
firstNewTxIdx + newTxInSession.length - 1) { | ||
sessionEntry.lastSignature = sessionRow.lastSignature; | ||
} | ||
idx += 1; | ||
} | ||
} | ||
} | ||
const dependedOnCoValues = parsedHeader?.ruleset.type === "group" | ||
? Object.values(newContent.new).flatMap((sessionEntry) => sessionEntry.newTransactions.flatMap((tx) => { | ||
? newContentPieces | ||
.flatMap((piece) => Object.values(piece.new)).flatMap((sessionEntry) => sessionEntry.newTransactions.flatMap((tx) => { | ||
if (tx.privacy !== "trusting") | ||
@@ -157,4 +208,7 @@ return []; | ||
}); | ||
if (newContent.header || Object.keys(newContent.new).length > 0) { | ||
await this.toLocalNode.write(newContent); | ||
const nonEmptyNewContentPieces = newContentPieces.filter((piece) => piece.header || Object.keys(piece.new).length > 0); | ||
// console.log(theirKnown.id, nonEmptyNewContentPieces); | ||
for (const piece of nonEmptyNewContentPieces) { | ||
await this.toLocalNode.write(piece); | ||
await new Promise((resolve) => setTimeout(resolve, 0)); | ||
} | ||
@@ -214,2 +268,14 @@ } | ||
const actuallyNewTransactions = newTransactions.slice(actuallyNewOffset); | ||
let newBytesSinceLastSignature = (sessionRow?.bytesSinceLastSignature || 0) + | ||
actuallyNewTransactions.reduce((sum, tx) => sum + | ||
(tx.privacy === "private" | ||
? tx.encryptedChanges.length | ||
: tx.changes.length), 0); | ||
const newLastIdx = (sessionRow?.lastIdx || 0) + | ||
actuallyNewTransactions.length; | ||
let shouldWriteSignature = false; | ||
if (newBytesSinceLastSignature > MAX_RECOMMENDED_TX_SIZE) { | ||
shouldWriteSignature = true; | ||
newBytesSinceLastSignature = 0; | ||
} | ||
let nextIdx = sessionRow?.lastIdx || 0; | ||
@@ -219,12 +285,19 @@ const sessionUpdate = { | ||
sessionID: sessionID, | ||
lastIdx: (sessionRow?.lastIdx || 0) + | ||
actuallyNewTransactions.length, | ||
lastIdx: newLastIdx, | ||
lastSignature: msg.new[sessionID].lastSignature, | ||
bytesSinceLastSignature: newBytesSinceLastSignature, | ||
}; | ||
const upsertedSession = this.db | ||
.prepare(`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature) VALUES (?, ?, ?, ?) | ||
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature | ||
.prepare(`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature, bytesSinceLastSignature) VALUES (?, ?, ?, ?, ?) | ||
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature, bytesSinceLastSignature=excluded.bytesSinceLastSignature | ||
RETURNING rowID`) | ||
.get(sessionUpdate.coValue, sessionUpdate.sessionID, sessionUpdate.lastIdx, sessionUpdate.lastSignature); | ||
.get(sessionUpdate.coValue, sessionUpdate.sessionID, sessionUpdate.lastIdx, sessionUpdate.lastSignature, sessionUpdate.bytesSinceLastSignature); | ||
const sessionRowID = upsertedSession.rowID; | ||
if (shouldWriteSignature) { | ||
this.db | ||
.prepare(`INSERT INTO signatureAfter (ses, idx, signature) VALUES (?, ?, ?)`) | ||
.run(sessionRowID, | ||
// TODO: newLastIdx is a misnomer, it's actually more like nextIdx or length | ||
newLastIdx - 1, msg.new[sessionID].lastSignature); | ||
} | ||
for (const newTransaction of actuallyNewTransactions) { | ||
@@ -231,0 +304,0 @@ this.db |
{ | ||
"name": "cojson-storage-sqlite", | ||
"type": "module", | ||
"version": "0.2.4", | ||
"version": "0.2.5", | ||
"main": "dist/index.js", | ||
@@ -21,3 +21,3 @@ "types": "dist/index.d.ts", | ||
}, | ||
"gitHead": "ee7e3ee5a7ad3fa2b0e85c791f9ff05ca87669ba" | ||
"gitHead": "6720c192335d1c12c6bb50b7c52ff0d9ed1d5396" | ||
} |
212
src/index.ts
@@ -7,4 +7,3 @@ import { | ||
SessionID, | ||
// CojsonInternalTypes, | ||
// SessionID, | ||
MAX_RECOMMENDED_TX_SIZE, | ||
} from "cojson"; | ||
@@ -19,3 +18,2 @@ import { | ||
import Database, { Database as DatabaseT } from "better-sqlite3"; | ||
import { RawCoID } from "cojson/dist/ids"; | ||
@@ -34,2 +32,3 @@ type CoValueRow = { | ||
lastSignature: CojsonInternalTypes.Signature; | ||
bytesSinceLastSignature?: number; | ||
}; | ||
@@ -45,2 +44,8 @@ | ||
type SignatureAfterRow = { | ||
ses: number; | ||
idx: number; | ||
signature: CojsonInternalTypes.Signature; | ||
}; | ||
export class SQLiteStorage { | ||
@@ -105,3 +110,5 @@ fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>; | ||
const oldVersion = (db.pragma("user_version") as [{user_version: number}])[0].user_version as number; | ||
const oldVersion = ( | ||
db.pragma("user_version") as [{ user_version: number }] | ||
)[0].user_version as number; | ||
@@ -116,3 +123,3 @@ console.log("DB version", oldVersion); | ||
idx INTEGER, | ||
tx TEXT NOT NULL , | ||
tx TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
@@ -155,16 +162,46 @@ ) WITHOUT ROWID;` | ||
// fix embarrassing off-by-one error for transaction indices | ||
console.log("Migration 1 -> 2: Fix off-by-one error for transaction indices"); | ||
console.log( | ||
"Migration 1 -> 2: Fix off-by-one error for transaction indices" | ||
); | ||
const txs = db.prepare(`SELECT * FROM transactions`).all() as TransactionRow[]; | ||
const txs = db | ||
.prepare(`SELECT * FROM transactions`) | ||
.all() as TransactionRow[]; | ||
for (const tx of txs) { | ||
db.prepare(`DELETE FROM transactions WHERE ses = ? AND idx = ?`).run(tx.ses, tx.idx); | ||
db.prepare( | ||
`DELETE FROM transactions WHERE ses = ? AND idx = ?` | ||
).run(tx.ses, tx.idx); | ||
tx.idx -= 1; | ||
db.prepare(`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)`).run(tx.ses, tx.idx, tx.tx); | ||
db.prepare( | ||
`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)` | ||
).run(tx.ses, tx.idx, tx.tx); | ||
} | ||
db.pragma("user_version = 2"); | ||
console.log("Migration 1 -> 2: Fix off-by-one error for transaction indices - done"); | ||
console.log( | ||
"Migration 1 -> 2: Fix off-by-one error for transaction indices - done" | ||
); | ||
} | ||
if (oldVersion <= 2) { | ||
console.log("Migration 2 -> 3: Add signatureAfter"); | ||
db.prepare( | ||
`CREATE TABLE IF NOT EXISTS signatureAfter ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
signature TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;` | ||
).run(); | ||
db.prepare( | ||
`ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;` | ||
).run(); | ||
db.pragma("user_version = 3"); | ||
console.log("Migration 2 -> 3: Add signatureAfter - done"); | ||
} | ||
return new SQLiteStorage(db, fromLocalNode, toLocalNode); | ||
@@ -215,8 +252,10 @@ } | ||
const newContent: CojsonInternalTypes.NewContentMessage = { | ||
action: "content", | ||
id: theirKnown.id, | ||
header: theirKnown.header ? undefined : parsedHeader, | ||
new: {}, | ||
}; | ||
const newContentPieces: CojsonInternalTypes.NewContentMessage[] = [ | ||
{ | ||
action: "content", | ||
id: theirKnown.id, | ||
header: theirKnown.header ? undefined : parsedHeader, | ||
new: {}, | ||
}, | ||
]; | ||
@@ -233,2 +272,14 @@ for (const sessionRow of allOurSessions) { | ||
const signaturesAndIdxs = this.db | ||
.prepare<[number, number]>( | ||
`SELECT * FROM signatureAfter WHERE ses = ? AND idx >= ?` | ||
) | ||
.all(sessionRow.rowID, firstNewTxIdx) as SignatureAfterRow[]; | ||
// console.log( | ||
// theirKnown.id, | ||
// "signaturesAndIdxs", | ||
// JSON.stringify(signaturesAndIdxs) | ||
// ); | ||
const newTxInSession = this.db | ||
@@ -240,9 +291,48 @@ .prepare<[number, number]>( | ||
newContent.new[sessionRow.sessionID] = { | ||
after: firstNewTxIdx, | ||
lastSignature: sessionRow.lastSignature, | ||
newTransactions: newTxInSession.map((row) => | ||
JSON.parse(row.tx) | ||
), | ||
}; | ||
let idx = firstNewTxIdx; | ||
// console.log( | ||
// theirKnown.id, | ||
// "newTxInSession", | ||
// newTxInSession.length | ||
// ); | ||
for (const tx of newTxInSession) { | ||
let sessionEntry = | ||
newContentPieces[newContentPieces.length - 1]!.new[ | ||
sessionRow.sessionID | ||
]; | ||
if (!sessionEntry) { | ||
sessionEntry = { | ||
after: idx, | ||
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature, | ||
newTransactions: [], | ||
}; | ||
newContentPieces[newContentPieces.length - 1]!.new[ | ||
sessionRow.sessionID | ||
] = sessionEntry; | ||
} | ||
sessionEntry.newTransactions.push(JSON.parse(tx.tx)); | ||
if ( | ||
signaturesAndIdxs[0] && | ||
idx === signaturesAndIdxs[0].idx | ||
) { | ||
sessionEntry.lastSignature = | ||
signaturesAndIdxs[0].signature; | ||
signaturesAndIdxs.shift(); | ||
newContentPieces.push({ | ||
action: "content", | ||
id: theirKnown.id, | ||
new: {}, | ||
}); | ||
} else if ( | ||
idx === | ||
firstNewTxIdx + newTxInSession.length - 1 | ||
) { | ||
sessionEntry.lastSignature = sessionRow.lastSignature; | ||
} | ||
idx += 1; | ||
} | ||
} | ||
@@ -253,3 +343,4 @@ } | ||
parsedHeader?.ruleset.type === "group" | ||
? Object.values(newContent.new).flatMap((sessionEntry) => | ||
? newContentPieces | ||
.flatMap((piece) => Object.values(piece.new)).flatMap((sessionEntry) => | ||
sessionEntry.newTransactions.flatMap((tx) => { | ||
@@ -293,4 +384,11 @@ if (tx.privacy !== "trusting") return []; | ||
if (newContent.header || Object.keys(newContent.new).length > 0) { | ||
await this.toLocalNode.write(newContent); | ||
const nonEmptyNewContentPieces = newContentPieces.filter( | ||
(piece) => piece.header || Object.keys(piece.new).length > 0 | ||
); | ||
// console.log(theirKnown.id, nonEmptyNewContentPieces); | ||
for (const piece of nonEmptyNewContentPieces) { | ||
await this.toLocalNode.write(piece); | ||
await new Promise((resolve) => setTimeout(resolve, 0)); | ||
} | ||
@@ -306,3 +404,5 @@ } | ||
this.db | ||
.prepare<RawCoID>(`SELECT rowID FROM coValues WHERE id = ?`) | ||
.prepare<CojsonInternalTypes.RawCoID>( | ||
`SELECT rowID FROM coValues WHERE id = ?` | ||
) | ||
.get(msg.id) as StoredCoValueRow | undefined | ||
@@ -326,3 +426,3 @@ )?.rowID; | ||
storedCoValueRowID = this.db | ||
.prepare<[RawCoID, string]>( | ||
.prepare<[CojsonInternalTypes.RawCoID, string]>( | ||
`INSERT INTO coValues (id, header) VALUES (?, ?)` | ||
@@ -369,5 +469,28 @@ ) | ||
(msg.new[sessionID]?.after || 0); | ||
const actuallyNewTransactions = | ||
newTransactions.slice(actuallyNewOffset); | ||
let newBytesSinceLastSignature = | ||
(sessionRow?.bytesSinceLastSignature || 0) + | ||
actuallyNewTransactions.reduce( | ||
(sum, tx) => | ||
sum + | ||
(tx.privacy === "private" | ||
? tx.encryptedChanges.length | ||
: tx.changes.length), | ||
0 | ||
); | ||
const newLastIdx = | ||
(sessionRow?.lastIdx || 0) + | ||
actuallyNewTransactions.length; | ||
let shouldWriteSignature = false; | ||
if (newBytesSinceLastSignature > MAX_RECOMMENDED_TX_SIZE) { | ||
shouldWriteSignature = true; | ||
newBytesSinceLastSignature = 0; | ||
} | ||
let nextIdx = sessionRow?.lastIdx || 0; | ||
@@ -378,12 +501,11 @@ | ||
sessionID: sessionID, | ||
lastIdx: | ||
(sessionRow?.lastIdx || 0) + | ||
actuallyNewTransactions.length, | ||
lastIdx: newLastIdx, | ||
lastSignature: msg.new[sessionID]!.lastSignature, | ||
bytesSinceLastSignature: newBytesSinceLastSignature, | ||
}; | ||
const upsertedSession = this.db | ||
.prepare<[number, string, number, string]>( | ||
`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature) VALUES (?, ?, ?, ?) | ||
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature | ||
.prepare<[number, string, number, string, number]>( | ||
`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature, bytesSinceLastSignature) VALUES (?, ?, ?, ?, ?) | ||
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature, bytesSinceLastSignature=excluded.bytesSinceLastSignature | ||
RETURNING rowID` | ||
@@ -395,3 +517,4 @@ ) | ||
sessionUpdate.lastIdx, | ||
sessionUpdate.lastSignature | ||
sessionUpdate.lastSignature, | ||
sessionUpdate.bytesSinceLastSignature, | ||
) as { rowID: number }; | ||
@@ -401,6 +524,19 @@ | ||
if (shouldWriteSignature) { | ||
this.db | ||
.prepare<[number, number, string]>( | ||
`INSERT INTO signatureAfter (ses, idx, signature) VALUES (?, ?, ?)` | ||
) | ||
.run( | ||
sessionRowID, | ||
// TODO: newLastIdx is a misnomer, it's actually more like nextIdx or length | ||
newLastIdx - 1, | ||
msg.new[sessionID]!.lastSignature | ||
); | ||
} | ||
for (const newTransaction of actuallyNewTransactions) { | ||
this.db | ||
.prepare<[number, number, string]>( | ||
`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)` | ||
.prepare<[number, number, string]>( | ||
`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)` | ||
) | ||
@@ -411,3 +547,3 @@ .run( | ||
JSON.stringify(newTransaction) | ||
); | ||
); | ||
nextIdx++; | ||
@@ -414,0 +550,0 @@ } |
Sorry, the diff of this file is not supported yet
46981
31.61%841
29.19%