aedes-persistence-cassandra
Advanced tools
Comparing version 0.1.1 to 0.1.2
{ | ||
"name": "aedes-persistence-cassandra", | ||
"version": "0.1.1", | ||
"version": "0.1.2", | ||
"description": "Cassandra persistence for Aedes", | ||
@@ -34,4 +34,5 @@ "main": "persistence-cassandra.js", | ||
"qlobber": "^5.0.3", | ||
"through2": "^4.0.2" | ||
"through2": "^4.0.2", | ||
"uuid": "^8.3.2" | ||
} | ||
} |
@@ -10,2 +10,3 @@ "use strict"; | ||
const Qlobber = require("qlobber").Qlobber; | ||
const uuidv4 = require("uuid").v4; | ||
@@ -27,14 +28,24 @@ const qlobberOpts = { | ||
opts = opts || {}; | ||
opts.ttl = opts.ttl || {}; | ||
if (typeof opts.ttl.packets === "number") { | ||
opts.ttl.packets = { | ||
retained: opts.ttl.packets, | ||
will: opts.ttl.packets, | ||
outgoing: opts.ttl.packets, | ||
incoming: opts.ttl.packets | ||
let ttl = opts.ttl != null ? { ...opts.ttl } : {}; | ||
if (typeof ttl.packets === "number") { | ||
ttl.packets = { | ||
retained: ttl.packets, | ||
will: ttl.packets, | ||
outgoing: ttl.packets, | ||
incoming: ttl.packets | ||
}; | ||
} | ||
else if (ttl.packets == null) { | ||
ttl.packets = {}; | ||
} | ||
ttl.packets.retained = ttl.packets.retained || 0; | ||
ttl.packets.will = ttl.packets.will || 0; | ||
ttl.packets.outgoing = ttl.packets.outgoing || 0; | ||
ttl.packets.incoming = ttl.packets.incoming || 0; | ||
ttl.subscriptions = ttl.subscriptions || 0; | ||
this._opts = opts; | ||
this._opts.ttl = ttl; | ||
this._shutdownClient = false; | ||
@@ -108,3 +119,3 @@ this._client = null; | ||
batch.push({ | ||
query: "INSERT INTO retained (topic, broker_id, broker_counter, cmd, dup, qos, payload) VALUES (?, ?, ?, ?, ?, ?, ?)", | ||
query: "INSERT INTO retained (topic, broker_id, broker_counter, cmd, dup, qos, payload) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params: [ | ||
@@ -117,10 +128,6 @@ p.packet.topic, | ||
p.packet.qos, | ||
p.packet.payload | ||
p.packet.payload, | ||
that._opts.ttl.packets.retained | ||
] | ||
}); | ||
if (that._opts.ttl.packets && that._opts.ttl.packets.retained) { | ||
batch[batch.length - 1].query += " USING TTL ?"; | ||
batch[batch.length - 1].params.push(that._opts.ttl.packets.retained); | ||
} | ||
} | ||
@@ -193,27 +200,17 @@ else { | ||
.forEach(function(sub) { | ||
const params = [client.id, sub.topic, sub.qos]; | ||
const params = [client.id, sub.topic, sub.qos, that._opts.ttl.subscriptions]; | ||
batch.push({ | ||
query: "INSERT INTO subscription (client_id, topic, qos) VALUES (?, ?, ?)", | ||
query: "INSERT INTO subscription (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?", | ||
params | ||
}, { | ||
query: "INSERT INTO subscription_by_topic (client_id, topic, qos) VALUES (?, ?, ?)", | ||
query: "INSERT INTO subscription_by_topic (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
if (that._opts.ttl.subscriptions) { | ||
batch[batch.length - 2].query += " USING TTL ?"; | ||
batch[batch.length - 1].query += " USING TTL ?"; | ||
params.push(that._opts.ttl.subscriptions); | ||
} | ||
if (sub.qos > 0) { | ||
batch.push({ | ||
query: "INSERT INTO subscription_qos12 (client_id, topic, qos) VALUES (?, ?, ?)", | ||
query: "INSERT INTO subscription_qos12 (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
if (that._opts.ttl.subscriptions) { | ||
batch[batch.length - 1].query += " USING TTL ?"; | ||
} | ||
} | ||
@@ -255,3 +252,2 @@ }); | ||
// Rimuove la subscription con client id e topic | ||
batch.push({ | ||
@@ -371,15 +367,14 @@ query: "DELETE FROM subscription WHERE client_id = ? AND topic = ?", | ||
subs.map(function(sub) { | ||
const params = [sub.clientId, newp.messageId, newp.brokerId, newp.brokerCounter, newp.cmd, newp.topic, newp.qos, newp.retain, newp.dup, newp.payload]; | ||
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) { | ||
params.push(that._opts.ttl.packets.outgoing); | ||
} | ||
const params = [sub.clientId, uuidv4(), newp.messageId, newp.brokerId, newp.brokerCounter, newp.cmd, newp.topic, newp.qos, newp.retain, newp.dup, newp.payload, that._opts.ttl.packets.outgoing]; | ||
batch.push({ | ||
query: "INSERT INTO outgoing (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
if (newp.messageId != null) { | ||
batch.push({ | ||
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) { | ||
batch[batch.length - 1].query += " USING TTL ?"; | ||
} | ||
} | ||
@@ -389,8 +384,5 @@ | ||
batch.push({ | ||
query: "INSERT INTO outgoing_by_broker (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
query: "INSERT INTO outgoing_by_broker (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) { | ||
batch[batch.length - 1].query += " USING TTL ?"; | ||
} | ||
} | ||
@@ -433,2 +425,8 @@ }); | ||
const oldRow = result.rows[0]; | ||
if (oldRow == null) { | ||
cb(new Error("Existing outgoing message not found")); | ||
return; | ||
} | ||
const batch = [ | ||
@@ -446,18 +444,22 @@ { | ||
const oldRow = result.rows[0]; | ||
if (oldRow || packet.messageId) { | ||
if (oldRow && oldRow.messageId != null && oldRow.message_id.toNumber() != packet.messageId) { | ||
batch.push({ | ||
query: "DELETE FROM outgoing WHERE client_id = ? AND message_id = ?", | ||
params: [oldRow.client_id, oldRow.message_id.toNumber()] | ||
}); | ||
} | ||
const messageId = packet.messageId != null ? packet.messageId : (oldRow.message_id != null ? oldRow.message_id.toNumber() : null); | ||
if (oldRow.messageId != null && oldRow.message_id.toNumber() != packet.messageId) { | ||
batch.push({ | ||
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
params: [client.id, messageId, oldRow.broker_id, oldRow.broker_counter, oldRow.cmd, oldRow.topic, oldRow.qos, oldRow.retain, oldRow.dup, oldRow.payload] | ||
query: "DELETE FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", | ||
params: [oldRow.client_id, oldRow.message_id.toNumber()] | ||
}); | ||
} | ||
const messageId = packet.messageId != null ? packet.messageId : (oldRow.message_id != null ? oldRow.message_id.toNumber() : null); | ||
batch.push({ | ||
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params: [client.id, oldRow.ref, messageId, oldRow.broker_id, oldRow.broker_counter, oldRow.cmd, oldRow.topic, oldRow.qos, oldRow.retain, oldRow.dup, oldRow.payload, that._opts.ttl.packets.outgoing] | ||
}, { | ||
query: "UPDATE outgoing SET message_id = ? WHERE client_id = ? AND ref = ?", | ||
params: [ | ||
packet.messageId, | ||
client.id, | ||
oldRow.ref | ||
] | ||
}); | ||
that._client.batch(batch, { prepare: true }, function(err) { | ||
@@ -469,3 +471,3 @@ cb(err, client, packet); | ||
async function updatePacket(that, client, packet, cb) { | ||
const result = await that._client.execute("SELECT * FROM outgoing WHERE client_id = ? AND message_id = ?", [ | ||
const result = await that._client.execute("SELECT * FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", [ | ||
client.id, | ||
@@ -475,26 +477,36 @@ packet.messageId | ||
const batch = [ | ||
{ | ||
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
params: [client.id, packet.messageId, packet.brokerId, packet.brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload] | ||
} | ||
]; | ||
const oldRow = result.rows[0]; | ||
if (oldRow || (packet.brokerId && packet.brokerCounter)) { | ||
if (oldRow && oldRow.broker_id!= null && oldRow.broker_counter != null && (oldRow.broker_id != packet.brokerId || oldRow.broker_counter.toNumber() != packet.brokerCounter)) { | ||
batch.push({ | ||
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?", | ||
params: [oldRow.client_id, oldRow.broker_id, oldRow.broker_counter.toNumber()] | ||
}); | ||
} | ||
if (oldRow == null) { | ||
cb(new Error("Existing outgoing message not found")); | ||
return; | ||
} | ||
const brokerId = packet.brokerId != null ? packet.brokerId : oldRow.broker_id; | ||
const brokerCounter = packet.brokerCounter != null ? packet.brokerCounter : (oldRow.broker_counter != null ? oldRow.broker_counter.toNumber() : null); | ||
const brokerId = packet.brokerId != null ? packet.brokerId : oldRow.broker_id; | ||
const brokerCounter = packet.brokerCounter != null ? packet.brokerCounter : (oldRow.broker_counter != null ? oldRow.broker_counter.toNumber() : null); | ||
const params = [client.id, oldRow.ref, packet.messageId, brokerId, brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload, that._opts.ttl.packets.outgoing]; | ||
const batch = [{ | ||
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params | ||
}, { | ||
query: "UPDATE outgoing SET message_id = ? WHERE client_id = ? AND ref = ?", | ||
params: [ | ||
packet.messageId, | ||
client.id, | ||
oldRow.ref | ||
] | ||
}]; | ||
if (oldRow.broker_id!= null && oldRow.broker_counter != null && (oldRow.broker_id != packet.brokerId || oldRow.broker_counter.toNumber() != packet.brokerCounter)) { | ||
batch.push({ | ||
query: "INSERT INTO outgoing_by_broker (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", | ||
params: [client.id, packet.messageId, brokerId, brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload] | ||
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?", | ||
params: [oldRow.client_id, oldRow.broker_id, oldRow.broker_counter.toNumber()] | ||
}); | ||
} | ||
batch.push({ | ||
query: "INSERT INTO outgoing_by_broker (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?", | ||
params | ||
}); | ||
that._client.batch(batch, { prepare: true }, function(err) { | ||
@@ -524,5 +536,5 @@ cb(err, client, packet); | ||
let outgoingPacket; | ||
let oldRow; | ||
try { | ||
const result = await this._client.execute("SELECT * FROM outgoing WHERE client_id = ? AND message_id = ?", [client.id, packet.messageId], { prepare: true }); | ||
const result = await this._client.execute("SELECT * FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", [client.id, packet.messageId], { prepare: true }); | ||
@@ -533,3 +545,3 @@ if (!result.rows.length) { | ||
outgoingPacket = asPacket(result.rows[0]); | ||
oldRow = result.rows[0]; | ||
} | ||
@@ -540,13 +552,27 @@ catch (err) { | ||
this._client.batch([ | ||
if (oldRow == null) { | ||
cb(new Error("Existing outgoing message not found")); | ||
return; | ||
} | ||
const batch = [ | ||
{ | ||
query: "DELETE FROM outgoing WHERE client_id = ? AND message_id = ?", | ||
params: [client.id, outgoingPacket.messageId] | ||
query: "DELETE FROM outgoing WHERE client_id = ? AND ref = ?", | ||
params: [client.id, oldRow.ref] | ||
}, | ||
{ | ||
query: "DELETE FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", | ||
params: [client.id, oldRow.message_id.toNumber()] | ||
} | ||
]; | ||
if (oldRow.broker_id != null && oldRow.broker_counter != null) { | ||
batch.push({ | ||
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?", | ||
params: [client.id, outgoingPacket.brokerId, outgoingPacket.brokerCounter] | ||
} | ||
], { prepare: true }, function(err) { | ||
cb(err, outgoingPacket); | ||
params: [client.id, oldRow.broker_id, oldRow.broker_counter.toNumber()] | ||
}); | ||
} | ||
this._client.batch(batch, { prepare: true }, function(err) { | ||
cb(err, asPacket(oldRow)); | ||
}); | ||
@@ -564,3 +590,3 @@ }; | ||
let query = "INSERT INTO incoming (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; | ||
let query = "INSERT INTO incoming (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?"; | ||
const params = [ | ||
@@ -576,8 +602,5 @@ client.id, | ||
newp.dup, | ||
newp.payload | ||
newp.payload, | ||
this._opts.ttl.packets.incoming | ||
]; | ||
if (this._opts.ttl.packets && this._opts.ttl.packets.incoming) { | ||
query += " USING TTL ?"; | ||
params.push(this._opts.ttl.packets.incoming); | ||
} | ||
@@ -632,3 +655,3 @@ this._client.execute(query, params, { prepare: true }, cb); | ||
let query = "INSERT INTO last_will (client_id, broker_id, topic, qos, retain, payload) VALUES (?, ?, ?, ?, ?, ?)"; | ||
let query = "INSERT INTO last_will (client_id, broker_id, topic, qos, retain, payload) VALUES (?, ?, ?, ?, ?, ?) USING TTL ?"; | ||
const params = [ | ||
@@ -640,8 +663,5 @@ packet.clientId, | ||
packet.retain, | ||
packet.payload | ||
packet.payload, | ||
this._opts.ttl.packets.will | ||
]; | ||
if (this._opts.ttl.packets && this._opts.ttl.packets.will) { | ||
query += " USING TTL ?"; | ||
params.push(this._opts.ttl.packets.will); | ||
} | ||
@@ -648,0 +668,0 @@ this._client.execute(query, params, { prepare: true }, function(err) { |
Sorry, the diff of this file is not supported yet
27787
704
6
+ Addeduuid@^8.3.2
+ Addeduuid@8.3.2(transitive)