New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

aedes-persistence-cassandra

Package Overview
Dependencies
Maintainers
1
Versions
5
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

aedes-persistence-cassandra - npm Package Compare versions

Comparing version 0.1.1 to 0.1.2

5

package.json
{
"name": "aedes-persistence-cassandra",
"version": "0.1.1",
"version": "0.1.2",
"description": "Cassandra persistence for Aedes",

@@ -34,4 +34,5 @@ "main": "persistence-cassandra.js",

"qlobber": "^5.0.3",
"through2": "^4.0.2"
"through2": "^4.0.2",
"uuid": "^8.3.2"
}
}

208

persistence-cassandra.js

@@ -10,2 +10,3 @@ "use strict";

const Qlobber = require("qlobber").Qlobber;
const uuidv4 = require("uuid").v4;

@@ -27,14 +28,24 @@ const qlobberOpts = {

opts = opts || {};
opts.ttl = opts.ttl || {};
if (typeof opts.ttl.packets === "number") {
opts.ttl.packets = {
retained: opts.ttl.packets,
will: opts.ttl.packets,
outgoing: opts.ttl.packets,
incoming: opts.ttl.packets
let ttl = opts.ttl != null ? { ...opts.ttl } : {};
if (typeof ttl.packets === "number") {
ttl.packets = {
retained: ttl.packets,
will: ttl.packets,
outgoing: ttl.packets,
incoming: ttl.packets
};
}
else if (ttl.packets == null) {
ttl.packets = {};
}
ttl.packets.retained = ttl.packets.retained || 0;
ttl.packets.will = ttl.packets.will || 0;
ttl.packets.outgoing = ttl.packets.outgoing || 0;
ttl.packets.incoming = ttl.packets.incoming || 0;
ttl.subscriptions = ttl.subscriptions || 0;
this._opts = opts;
this._opts.ttl = ttl;
this._shutdownClient = false;

@@ -108,3 +119,3 @@ this._client = null;

batch.push({
query: "INSERT INTO retained (topic, broker_id, broker_counter, cmd, dup, qos, payload) VALUES (?, ?, ?, ?, ?, ?, ?)",
query: "INSERT INTO retained (topic, broker_id, broker_counter, cmd, dup, qos, payload) VALUES (?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params: [

@@ -117,10 +128,6 @@ p.packet.topic,

p.packet.qos,
p.packet.payload
p.packet.payload,
that._opts.ttl.packets.retained
]
});
if (that._opts.ttl.packets && that._opts.ttl.packets.retained) {
batch[batch.length - 1].query += " USING TTL ?";
batch[batch.length - 1].params.push(that._opts.ttl.packets.retained);
}
}

@@ -193,27 +200,17 @@ else {

.forEach(function(sub) {
const params = [client.id, sub.topic, sub.qos];
const params = [client.id, sub.topic, sub.qos, that._opts.ttl.subscriptions];
batch.push({
query: "INSERT INTO subscription (client_id, topic, qos) VALUES (?, ?, ?)",
query: "INSERT INTO subscription (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?",
params
}, {
query: "INSERT INTO subscription_by_topic (client_id, topic, qos) VALUES (?, ?, ?)",
query: "INSERT INTO subscription_by_topic (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?",
params
});
if (that._opts.ttl.subscriptions) {
batch[batch.length - 2].query += " USING TTL ?";
batch[batch.length - 1].query += " USING TTL ?";
params.push(that._opts.ttl.subscriptions);
}
if (sub.qos > 0) {
batch.push({
query: "INSERT INTO subscription_qos12 (client_id, topic, qos) VALUES (?, ?, ?)",
query: "INSERT INTO subscription_qos12 (client_id, topic, qos) VALUES (?, ?, ?) USING TTL ?",
params
});
if (that._opts.ttl.subscriptions) {
batch[batch.length - 1].query += " USING TTL ?";
}
}

@@ -255,3 +252,2 @@ });

// Rimuove la subscription con client id e topic
batch.push({

@@ -371,15 +367,14 @@ query: "DELETE FROM subscription WHERE client_id = ? AND topic = ?",

subs.map(function(sub) {
const params = [sub.clientId, newp.messageId, newp.brokerId, newp.brokerCounter, newp.cmd, newp.topic, newp.qos, newp.retain, newp.dup, newp.payload];
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) {
params.push(that._opts.ttl.packets.outgoing);
}
const params = [sub.clientId, uuidv4(), newp.messageId, newp.brokerId, newp.brokerCounter, newp.cmd, newp.topic, newp.qos, newp.retain, newp.dup, newp.payload, that._opts.ttl.packets.outgoing];
batch.push({
query: "INSERT INTO outgoing (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params
});
if (newp.messageId != null) {
batch.push({
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params
});
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) {
batch[batch.length - 1].query += " USING TTL ?";
}
}

@@ -389,8 +384,5 @@

batch.push({
query: "INSERT INTO outgoing_by_broker (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
query: "INSERT INTO outgoing_by_broker (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params
});
if (that._opts.ttl.packets && that._opts.ttl.packets.outgoing) {
batch[batch.length - 1].query += " USING TTL ?";
}
}

@@ -433,2 +425,8 @@ });

const oldRow = result.rows[0];
if (oldRow == null) {
cb(new Error("Existing outgoing message not found"));
return;
}
const batch = [

@@ -446,18 +444,22 @@ {

const oldRow = result.rows[0];
if (oldRow || packet.messageId) {
if (oldRow && oldRow.messageId != null && oldRow.message_id.toNumber() != packet.messageId) {
batch.push({
query: "DELETE FROM outgoing WHERE client_id = ? AND message_id = ?",
params: [oldRow.client_id, oldRow.message_id.toNumber()]
});
}
const messageId = packet.messageId != null ? packet.messageId : (oldRow.message_id != null ? oldRow.message_id.toNumber() : null);
if (oldRow.messageId != null && oldRow.message_id.toNumber() != packet.messageId) {
batch.push({
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params: [client.id, messageId, oldRow.broker_id, oldRow.broker_counter, oldRow.cmd, oldRow.topic, oldRow.qos, oldRow.retain, oldRow.dup, oldRow.payload]
query: "DELETE FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?",
params: [oldRow.client_id, oldRow.message_id.toNumber()]
});
}
const messageId = packet.messageId != null ? packet.messageId : (oldRow.message_id != null ? oldRow.message_id.toNumber() : null);
batch.push({
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params: [client.id, oldRow.ref, messageId, oldRow.broker_id, oldRow.broker_counter, oldRow.cmd, oldRow.topic, oldRow.qos, oldRow.retain, oldRow.dup, oldRow.payload, that._opts.ttl.packets.outgoing]
}, {
query: "UPDATE outgoing SET message_id = ? WHERE client_id = ? AND ref = ?",
params: [
packet.messageId,
client.id,
oldRow.ref
]
});
that._client.batch(batch, { prepare: true }, function(err) {

@@ -469,3 +471,3 @@ cb(err, client, packet);

async function updatePacket(that, client, packet, cb) {
const result = await that._client.execute("SELECT * FROM outgoing WHERE client_id = ? AND message_id = ?", [
const result = await that._client.execute("SELECT * FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", [
client.id,

@@ -475,26 +477,36 @@ packet.messageId

const batch = [
{
query: "INSERT INTO outgoing (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params: [client.id, packet.messageId, packet.brokerId, packet.brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload]
}
];
const oldRow = result.rows[0];
if (oldRow || (packet.brokerId && packet.brokerCounter)) {
if (oldRow && oldRow.broker_id!= null && oldRow.broker_counter != null && (oldRow.broker_id != packet.brokerId || oldRow.broker_counter.toNumber() != packet.brokerCounter)) {
batch.push({
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?",
params: [oldRow.client_id, oldRow.broker_id, oldRow.broker_counter.toNumber()]
});
}
if (oldRow == null) {
cb(new Error("Existing outgoing message not found"));
return;
}
const brokerId = packet.brokerId != null ? packet.brokerId : oldRow.broker_id;
const brokerCounter = packet.brokerCounter != null ? packet.brokerCounter : (oldRow.broker_counter != null ? oldRow.broker_counter.toNumber() : null);
const brokerId = packet.brokerId != null ? packet.brokerId : oldRow.broker_id;
const brokerCounter = packet.brokerCounter != null ? packet.brokerCounter : (oldRow.broker_counter != null ? oldRow.broker_counter.toNumber() : null);
const params = [client.id, oldRow.ref, packet.messageId, brokerId, brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload, that._opts.ttl.packets.outgoing];
const batch = [{
query: "INSERT INTO outgoing_by_message_id (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params
}, {
query: "UPDATE outgoing SET message_id = ? WHERE client_id = ? AND ref = ?",
params: [
packet.messageId,
client.id,
oldRow.ref
]
}];
if (oldRow.broker_id!= null && oldRow.broker_counter != null && (oldRow.broker_id != packet.brokerId || oldRow.broker_counter.toNumber() != packet.brokerCounter)) {
batch.push({
query: "INSERT INTO outgoing_by_broker (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params: [client.id, packet.messageId, brokerId, brokerCounter, packet.cmd, packet.topic, packet.qos, packet.retain, packet.dup, packet.payload]
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?",
params: [oldRow.client_id, oldRow.broker_id, oldRow.broker_counter.toNumber()]
});
}
batch.push({
query: "INSERT INTO outgoing_by_broker (client_id, ref, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?",
params
});
that._client.batch(batch, { prepare: true }, function(err) {

@@ -524,5 +536,5 @@ cb(err, client, packet);

let outgoingPacket;
let oldRow;
try {
const result = await this._client.execute("SELECT * FROM outgoing WHERE client_id = ? AND message_id = ?", [client.id, packet.messageId], { prepare: true });
const result = await this._client.execute("SELECT * FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?", [client.id, packet.messageId], { prepare: true });

@@ -533,3 +545,3 @@ if (!result.rows.length) {

outgoingPacket = asPacket(result.rows[0]);
oldRow = result.rows[0];
}

@@ -540,13 +552,27 @@ catch (err) {

this._client.batch([
if (oldRow == null) {
cb(new Error("Existing outgoing message not found"));
return;
}
const batch = [
{
query: "DELETE FROM outgoing WHERE client_id = ? AND message_id = ?",
params: [client.id, outgoingPacket.messageId]
query: "DELETE FROM outgoing WHERE client_id = ? AND ref = ?",
params: [client.id, oldRow.ref]
},
{
query: "DELETE FROM outgoing_by_message_id WHERE client_id = ? AND message_id = ?",
params: [client.id, oldRow.message_id.toNumber()]
}
];
if (oldRow.broker_id != null && oldRow.broker_counter != null) {
batch.push({
query: "DELETE FROM outgoing_by_broker WHERE client_id = ? AND broker_id = ? AND broker_counter = ?",
params: [client.id, outgoingPacket.brokerId, outgoingPacket.brokerCounter]
}
], { prepare: true }, function(err) {
cb(err, outgoingPacket);
params: [client.id, oldRow.broker_id, oldRow.broker_counter.toNumber()]
});
}
this._client.batch(batch, { prepare: true }, function(err) {
cb(err, asPacket(oldRow));
});

@@ -564,3 +590,3 @@ };

let query = "INSERT INTO incoming (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
let query = "INSERT INTO incoming (client_id, message_id, broker_id, broker_counter, cmd, topic, qos, retain, dup, payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?";
const params = [

@@ -576,8 +602,5 @@ client.id,

newp.dup,
newp.payload
newp.payload,
this._opts.ttl.packets.incoming
];
if (this._opts.ttl.packets && this._opts.ttl.packets.incoming) {
query += " USING TTL ?";
params.push(this._opts.ttl.packets.incoming);
}

@@ -632,3 +655,3 @@ this._client.execute(query, params, { prepare: true }, cb);

let query = "INSERT INTO last_will (client_id, broker_id, topic, qos, retain, payload) VALUES (?, ?, ?, ?, ?, ?)";
let query = "INSERT INTO last_will (client_id, broker_id, topic, qos, retain, payload) VALUES (?, ?, ?, ?, ?, ?) USING TTL ?";
const params = [

@@ -640,8 +663,5 @@ packet.clientId,

packet.retain,
packet.payload
packet.payload,
this._opts.ttl.packets.will
];
if (this._opts.ttl.packets && this._opts.ttl.packets.will) {
query += " USING TTL ?";
params.push(this._opts.ttl.packets.will);
}

@@ -648,0 +668,0 @@ this._client.execute(query, params, { prepare: true }, function(err) {

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc