cojson-storage-sqlite
Advanced tools
Comparing version
# cojson-storage-sqlite | ||
## 0.14.18 | ||
### Patch Changes | ||
- Updated dependencies [0d5ee3e] | ||
- Updated dependencies [be7c4c2] | ||
- cojson@0.14.18 | ||
- cojson-storage@0.14.18 | ||
## 0.14.16 | ||
@@ -4,0 +13,0 @@ |
@@ -1,7 +0,4 @@ | ||
import { type Database as DatabaseT } from "better-sqlite3"; | ||
import { type IncomingSyncStream, type OutgoingSyncQueue, type Peer } from "cojson"; | ||
export declare class SQLiteNode { | ||
private readonly syncManager; | ||
private readonly dbClient; | ||
constructor(db: DatabaseT, fromLocalNode: IncomingSyncStream, toLocalNode: OutgoingSyncQueue); | ||
import type { Peer } from "cojson"; | ||
import { SQLiteNodeBase } from "cojson-storage"; | ||
export declare class SQLiteNode extends SQLiteNodeBase { | ||
static asPeer({ filename, localNodeName, }: { | ||
@@ -11,4 +8,3 @@ filename: string; | ||
}): Promise<Peer>; | ||
static open(filename: string, fromLocalNode: IncomingSyncStream, toLocalNode: OutgoingSyncQueue): Promise<SQLiteNode>; | ||
} | ||
//# sourceMappingURL=sqliteNode.d.ts.map |
@@ -1,102 +0,13 @@ | ||
import Database from "better-sqlite3"; | ||
import { cojsonInternals, logger, } from "cojson"; | ||
import { StorageManagerSync } from "cojson-storage"; | ||
import { SQLiteClient } from "./sqliteClient.js"; | ||
export class SQLiteNode { | ||
constructor(db, fromLocalNode, toLocalNode) { | ||
this.dbClient = new SQLiteClient(db, toLocalNode); | ||
this.syncManager = new StorageManagerSync(this.dbClient, toLocalNode); | ||
const processMessages = async () => { | ||
let lastTimer = performance.now(); | ||
let runningTimer = false; | ||
for await (const msg of fromLocalNode) { | ||
try { | ||
if (msg === "Disconnected" || msg === "PingTimeout") { | ||
throw new Error("Unexpected Disconnected message"); | ||
} | ||
if (!runningTimer) { | ||
runningTimer = true; | ||
lastTimer = performance.now(); | ||
setTimeout(() => { | ||
runningTimer = false; | ||
}, 10); | ||
} | ||
this.syncManager.handleSyncMessage(msg); | ||
// Since the DB APIs are synchronous there may be the case | ||
// where a bulk of messages are processed without interruptions | ||
// which may block other peers from sending messages. | ||
// To avoid this we schedule a timer to downgrade the priority of the storage peer work | ||
if (performance.now() - lastTimer > 500) { | ||
lastTimer = performance.now(); | ||
await new Promise((resolve) => setTimeout(resolve, 0)); | ||
} | ||
} | ||
catch (e) { | ||
logger.error("Error reading from localNode, handling msg", { | ||
msg, | ||
err: e, | ||
}); | ||
} | ||
} | ||
}; | ||
processMessages().catch((e) => logger.error("Error in processMessages in sqlite", { err: e })); | ||
} | ||
import { SQLiteNodeBase } from "cojson-storage"; | ||
import { BetterSqliteDriver } from "./betterSqliteDriver.js"; | ||
export class SQLiteNode extends SQLiteNodeBase { | ||
static async asPeer({ filename, localNodeName = "local", }) { | ||
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(localNodeName, "storage", { peer1role: "client", peer2role: "storage", crashOnClose: true }); | ||
await SQLiteNode.open(filename, localNodeAsPeer.incoming, localNodeAsPeer.outgoing); | ||
return { ...storageAsPeer, priority: 100 }; | ||
const db = new BetterSqliteDriver(filename); | ||
return SQLiteNodeBase.create({ | ||
db, | ||
localNodeName, | ||
maxBlockingTime: 500, | ||
}); | ||
} | ||
static async open(filename, fromLocalNode, toLocalNode) { | ||
const db = Database(filename); | ||
db.pragma("journal_mode = WAL"); | ||
const oldVersion = db.pragma("user_version")[0].user_version; | ||
if (oldVersion === 0) { | ||
db.prepare(`CREATE TABLE IF NOT EXISTS transactions ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
tx TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;`).run(); | ||
db.prepare(`CREATE TABLE IF NOT EXISTS sessions ( | ||
rowID INTEGER PRIMARY KEY, | ||
coValue INTEGER NOT NULL, | ||
sessionID TEXT NOT NULL, | ||
lastIdx INTEGER, | ||
lastSignature TEXT, | ||
UNIQUE (sessionID, coValue) | ||
);`).run(); | ||
db.prepare("CREATE INDEX IF NOT EXISTS sessionsByCoValue ON sessions (coValue);").run(); | ||
db.prepare(`CREATE TABLE IF NOT EXISTS coValues ( | ||
rowID INTEGER PRIMARY KEY, | ||
id TEXT NOT NULL UNIQUE, | ||
header TEXT NOT NULL UNIQUE | ||
);`).run(); | ||
db.prepare("CREATE INDEX IF NOT EXISTS coValuesByID ON coValues (id);").run(); | ||
db.pragma("user_version = 1"); | ||
} | ||
if (oldVersion <= 1) { | ||
// fix embarrassing off-by-one error for transaction indices | ||
const txs = db | ||
.prepare("SELECT * FROM transactions") | ||
.all(); | ||
for (const tx of txs) { | ||
db.prepare("DELETE FROM transactions WHERE ses = ? AND idx = ?").run(tx.ses, tx.idx); | ||
tx.idx -= 1; | ||
db.prepare("INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)").run(tx.ses, tx.idx, tx.tx); | ||
} | ||
db.pragma("user_version = 2"); | ||
} | ||
if (oldVersion <= 2) { | ||
db.prepare(`CREATE TABLE IF NOT EXISTS signatureAfter ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
signature TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;`).run(); | ||
db.prepare("ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;").run(); | ||
db.pragma("user_version = 3"); | ||
} | ||
return new SQLiteNode(db, fromLocalNode, toLocalNode); | ||
} | ||
} | ||
//# sourceMappingURL=sqliteNode.js.map |
{ | ||
"name": "cojson-storage-sqlite", | ||
"type": "module", | ||
"version": "0.14.16", | ||
"version": "0.14.18", | ||
"main": "dist/index.js", | ||
@@ -10,4 +10,4 @@ "types": "dist/index.d.ts", | ||
"better-sqlite3": "^11.7.0", | ||
"cojson": "0.14.16", | ||
"cojson-storage": "0.14.16" | ||
"cojson": "0.14.18", | ||
"cojson-storage": "0.14.18" | ||
}, | ||
@@ -14,0 +14,0 @@ "devDependencies": { |
@@ -1,67 +0,6 @@ | ||
import Database, { type Database as DatabaseT } from "better-sqlite3"; | ||
import { | ||
type IncomingSyncStream, | ||
type OutgoingSyncQueue, | ||
type Peer, | ||
cojsonInternals, | ||
logger, | ||
} from "cojson"; | ||
import { StorageManagerSync, type TransactionRow } from "cojson-storage"; | ||
import { SQLiteClient } from "./sqliteClient.js"; | ||
import type { Peer } from "cojson"; | ||
import { SQLiteNodeBase } from "cojson-storage"; | ||
import { BetterSqliteDriver } from "./betterSqliteDriver.js"; | ||
export class SQLiteNode { | ||
private readonly syncManager: StorageManagerSync; | ||
private readonly dbClient: SQLiteClient; | ||
constructor( | ||
db: DatabaseT, | ||
fromLocalNode: IncomingSyncStream, | ||
toLocalNode: OutgoingSyncQueue, | ||
) { | ||
this.dbClient = new SQLiteClient(db, toLocalNode); | ||
this.syncManager = new StorageManagerSync(this.dbClient, toLocalNode); | ||
const processMessages = async () => { | ||
let lastTimer = performance.now(); | ||
let runningTimer = false; | ||
for await (const msg of fromLocalNode) { | ||
try { | ||
if (msg === "Disconnected" || msg === "PingTimeout") { | ||
throw new Error("Unexpected Disconnected message"); | ||
} | ||
if (!runningTimer) { | ||
runningTimer = true; | ||
lastTimer = performance.now(); | ||
setTimeout(() => { | ||
runningTimer = false; | ||
}, 10); | ||
} | ||
this.syncManager.handleSyncMessage(msg); | ||
// Since the DB APIs are synchronous there may be the case | ||
// where a bulk of messages are processed without interruptions | ||
// which may block other peers from sending messages. | ||
// To avoid this we schedule a timer to downgrade the priority of the storage peer work | ||
if (performance.now() - lastTimer > 500) { | ||
lastTimer = performance.now(); | ||
await new Promise((resolve) => setTimeout(resolve, 0)); | ||
} | ||
} catch (e) { | ||
logger.error("Error reading from localNode, handling msg", { | ||
msg, | ||
err: e, | ||
}); | ||
} | ||
} | ||
}; | ||
processMessages().catch((e) => | ||
logger.error("Error in processMessages in sqlite", { err: e }), | ||
); | ||
} | ||
export class SQLiteNode extends SQLiteNodeBase { | ||
static async asPeer({ | ||
@@ -74,108 +13,10 @@ filename, | ||
}): Promise<Peer> { | ||
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers( | ||
const db = new BetterSqliteDriver(filename); | ||
return SQLiteNodeBase.create({ | ||
db, | ||
localNodeName, | ||
"storage", | ||
{ peer1role: "client", peer2role: "storage", crashOnClose: true }, | ||
); | ||
await SQLiteNode.open( | ||
filename, | ||
localNodeAsPeer.incoming, | ||
localNodeAsPeer.outgoing, | ||
); | ||
return { ...storageAsPeer, priority: 100 }; | ||
maxBlockingTime: 500, | ||
}); | ||
} | ||
static async open( | ||
filename: string, | ||
fromLocalNode: IncomingSyncStream, | ||
toLocalNode: OutgoingSyncQueue, | ||
) { | ||
const db = Database(filename); | ||
db.pragma("journal_mode = WAL"); | ||
const oldVersion = ( | ||
db.pragma("user_version") as [{ user_version: number }] | ||
)[0].user_version as number; | ||
if (oldVersion === 0) { | ||
db.prepare( | ||
`CREATE TABLE IF NOT EXISTS transactions ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
tx TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;`, | ||
).run(); | ||
db.prepare( | ||
`CREATE TABLE IF NOT EXISTS sessions ( | ||
rowID INTEGER PRIMARY KEY, | ||
coValue INTEGER NOT NULL, | ||
sessionID TEXT NOT NULL, | ||
lastIdx INTEGER, | ||
lastSignature TEXT, | ||
UNIQUE (sessionID, coValue) | ||
);`, | ||
).run(); | ||
db.prepare( | ||
"CREATE INDEX IF NOT EXISTS sessionsByCoValue ON sessions (coValue);", | ||
).run(); | ||
db.prepare( | ||
`CREATE TABLE IF NOT EXISTS coValues ( | ||
rowID INTEGER PRIMARY KEY, | ||
id TEXT NOT NULL UNIQUE, | ||
header TEXT NOT NULL UNIQUE | ||
);`, | ||
).run(); | ||
db.prepare( | ||
"CREATE INDEX IF NOT EXISTS coValuesByID ON coValues (id);", | ||
).run(); | ||
db.pragma("user_version = 1"); | ||
} | ||
if (oldVersion <= 1) { | ||
// fix embarrassing off-by-one error for transaction indices | ||
const txs = db | ||
.prepare("SELECT * FROM transactions") | ||
.all() as TransactionRow[]; | ||
for (const tx of txs) { | ||
db.prepare("DELETE FROM transactions WHERE ses = ? AND idx = ?").run( | ||
tx.ses, | ||
tx.idx, | ||
); | ||
tx.idx -= 1; | ||
db.prepare( | ||
"INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)", | ||
).run(tx.ses, tx.idx, tx.tx); | ||
} | ||
db.pragma("user_version = 2"); | ||
} | ||
if (oldVersion <= 2) { | ||
db.prepare( | ||
`CREATE TABLE IF NOT EXISTS signatureAfter ( | ||
ses INTEGER, | ||
idx INTEGER, | ||
signature TEXT NOT NULL, | ||
PRIMARY KEY (ses, idx) | ||
) WITHOUT ROWID;`, | ||
).run(); | ||
db.prepare( | ||
"ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;", | ||
).run(); | ||
db.pragma("user_version = 3"); | ||
} | ||
return new SQLiteNode(db, fromLocalNode, toLocalNode); | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
89416
-21.2%1321
-25.99%+ Added
+ Added
- Removed
- Removed
Updated
Updated