Comparing version 0.0.1 to 0.1.0
{ | ||
"name": "iomem", | ||
"version": "0.0.1", | ||
"version": "0.1.0", | ||
"description": "Memcached client with binary protocol support and multi keys commands", | ||
@@ -34,3 +34,5 @@ "keywords": [ | ||
"postversion": "git push --tags && yarn publish . --new-version $npm_package_version && git push && echo Successfully released version $npm_package_version!", | ||
"cleanup": "git tag -d $(git tag) && git fetch --all --tags && git clean --force -d -x && git reset --hard origin/main && git checkout main" | ||
"cleanup": "git tag -d $(git tag) && git fetch --all --tags && git clean --force -d -x && git reset --hard origin/main && git checkout main", | ||
"docker:build": "docker build -t memcached-sasl .", | ||
"docker:run": "docker run -d --name iomem-memcached-sasl -e SASL_PASSWORD=test -p 11211:11211 -it memcached-sasl" | ||
}, | ||
@@ -40,2 +42,3 @@ "files": [ | ||
], | ||
"main": "src/client.js", | ||
"devDependencies": { | ||
@@ -42,0 +45,0 @@ "eslint": "^8.31.0", |
260
README.md
# `iomem` | ||
**WARNING! THIS SOFTWARE IS A WORK IN PROGRESS! DO NOT USE IT!** | ||
**WARNING! THIS SOFTWARE IS UNFINISHED! ANYTHING CAN CHANGE AT ANY MOMENT!** | ||
@@ -32,7 +32,11 @@ Memcached client implementing binary protocol with native multiple keys support. | ||
```js | ||
const Memcached = require('iomem') | ||
const iomem = new Memcached() | ||
const Mem = require('iomem') | ||
const iomem = new Mem() | ||
await iomem.set('test:key', 'hello') | ||
console.log(await iomem.get('test:key')) | ||
const value = await iomem.get('test:key') | ||
console.log(value) | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
@@ -43,21 +47,41 @@ | ||
```js | ||
const Memcached = require('iomem') | ||
const iomem = new Memcached() | ||
const Mem = require('iomem') | ||
const iomem = new Mem() | ||
await iomem.set(['test:key1', 'test:key2'], ['hello', 'world']) | ||
console.log(await iomem.get(['test:key1', 'test:key2'])) | ||
// set the same value for multiple keys | ||
await iomem.set(['test:key1', 'test:key2'], 'test') | ||
// set different values with `key => value` object | ||
await iomem.setk({ 'test:key1': 'hello', 'test:key2': 'world' }) | ||
// get values as an array | ||
await iomem.get(['test:key1', 'test:key2']) | ||
// get values as a `key => value` object | ||
await iomem.getk(['test:key1', 'test:key2']) | ||
// delete keys | ||
await iomem.del(['test:key1', 'test:key2']) | ||
... | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
For more details please see [Commands](#commands) section. | ||
### Custom servers | ||
```js | ||
const Memcached = require('iomem') | ||
const iomem = new Memcached(['127.0.0.1:11211', '127.0.0.2:11211']) | ||
const Mem = require('iomem') | ||
const iomem = new Mem(['127.0.0.1:11211', '127.0.0.2:11211']) | ||
... | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
Address formats: | ||
Supported address formats: | ||
```js | ||
// host | ||
or | ||
// host:port | ||
@@ -67,10 +91,128 @@ or | ||
or | ||
// username:password@host | ||
or | ||
// /path/to/memcached.sock | ||
``` | ||
### Commands | ||
Please note that the library automatically performs value serialization and deserialization. Here is a list of the possible value types: | ||
`value: string|String|Number|BigInt|Boolean|Date|Array|Buffer|Object|null` | ||
Be aware that any `value` in the below commands list refers to a value of any type specified above. | ||
The following data types for `key` and `expiry` are must by ensured by the library user: | ||
`key: string` - storage key, 250 bytes max as defined in [Memcached](https://github.com/memcached/memcached/blob/master/memcached.h#L68) | ||
`expiry: unsigned integer` - time interval in seconds, defaults to `expiry` from the client config. | ||
For more details please [Memcached commands](https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#commands). | ||
#### GET | ||
`get(key): value|null` - get a value for a single key. | ||
`get([key1, ...]): [value, ...]` - get an array of values for multiple keys. | ||
`getk(key): {key: value}|null` - get a `key => value` object for a single key. | ||
`getk([key1, ...]): {key: value, ...}` - get a `key => value` object for multiple keys. | ||
`gets(key): {key: cas}|null` - get a `key => cas` object for a single key. | ||
`gets([key1, ...]): {key: cas, ...}` - get a `key => cas` object for multiple keys. | ||
`getsv(key): {key: {value, cas}}|null` - get a `key => { value, cas }` object for a single key. | ||
`getsv([key1, ...]): {key: {value, cas}}, ...}` - get a `key => { value, cas }` object for multiple keys. | ||
#### SET | ||
Set methods return `true` when all values were successfully set. Otherwise, when client receives `0x0001` or `0x0002` [statuses ](https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#response-status) from Memcached (this is abnormal behavior for set commands), the returned value will be `false`. | ||
`set(key, value, expiry): true|false` - set a value for a single key. | ||
`set([key1, ...], value, expiry): true|false` - set the same value for multiple keys. | ||
`setk({key: value, ...}, expiry): true|false` - set multiple values with `key => value` object. | ||
#### ADD | ||
Add commands set a key only when it is not set yet (a key does not exist in the Memcached). The methods will return `false` when at least one key was not successfully set (meaning a key was already set with some value, so it was not set with the value you provided with a command). | ||
`add(key, value, expiry): true|false` - add a value for a single key. | ||
`add([key1, ...], value, expiry): true|false` - add the same value for multiple keys. | ||
`addk({key: value, ...}, expiry): true|false` - add multiple values with `key => value` object. | ||
#### REPLACE | ||
Replace commands set a new value for a key only when it is already set with some value (a key does exist in the Memcached). The methods will return `false` when at least one key was not successfully set (meaning a key did not exist in the Memcached when you ran a command). | ||
`replace(key, value, expiry): true|false` - replace a value for a single key. | ||
`replace([key1, ...], value, expiry): true|false` - replace the same value for multiple keys. | ||
`replacek({key: value, ...}, expiry): true|false` - replace multiple values with `key => value` object. | ||
#### CAS | ||
The `cas` command sets a key with a new value only when `cas` parameter matches `cas` value stored in the key. To retrieve the current `cas` value for a key please see [GET](#get) commands. | ||
`cas(key, value, cas, expiry): true|false` - set a value if the cas matches. | ||
#### DEL | ||
Delete commands delete a key only when it exists. The methods will return `false` when at least one key does not exist. | ||
`del(key): true|false` - delete a key. | ||
`del([key1, ...]): true|false` - delete multiple keys. | ||
#### INCERMENT AND DECREMENT | ||
Increment and decrement commands add or substract the specified `delta` value from the current counter value initialized with `initial` value. You can use `SET`, `ADD`, `REPLACE` commands to set a counter value. | ||
`incr(key, initial, delta, expiry): value` - increments counter and returns its value. | ||
`decr(key, initial, delta, expiry): value` - decrements coutner and returns its value. | ||
Paramters: | ||
`initial: BigInt` - initial counter value | ||
`delta: BigInt` - amount to add or substruct from a counter | ||
`expiry` - see [Commands](#commands) | ||
If you want to get the current value from a counter without changing its value, use [GET](#get) commands and manually deserialize the response buffer. | ||
```js | ||
... | ||
const FLAGS = require('iomem/src/flags') | ||
const { deserialize } = require('iomem/src/serializer') | ||
deserialize(await iomem.get('test:foo'), FLAGS.bigint) | ||
... | ||
``` | ||
#### FLUSH | ||
`flush()` - flush cached items. | ||
`flush(expiry)` - flush cached items in `expiry` seconds. | ||
### Streams | ||
#### Case #1: | ||
Force `iomem` methods to return a stream instead of a promise by passing `stream: true` flag. | ||
```js | ||
const Memcached = require('iomem') | ||
const iomem = new Memcached(['127.0.0.1:11211'], { stream: true }) | ||
const Mem = require('iomem') | ||
const iomem = new Mem(['127.0.0.1:11211'], { stream: true }) | ||
@@ -80,2 +222,6 @@ const { pipeline, Writable } = require('node:stream') | ||
class Echo extends Writable { | ||
constructor (opts) { | ||
super({ objectMode: true, ...opts }) | ||
} | ||
_write (data, _, cb) { | ||
@@ -87,9 +233,79 @@ console.log(data) | ||
pipeline(iomem.get('test:a'), new Echo({ objectMode: true }), err => { | ||
pipeline(iomem.get('test:a'), new Echo(), err => { | ||
if (err) { | ||
console.log(err) | ||
console.error(err) | ||
} | ||
}) | ||
... | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
#### Case #2: | ||
Create a stream with special method called `stream` and supply data with readable stream. Do not care about `stream` flag. | ||
**This is the recommended approach** | ||
```js | ||
const Mem = require('iomem') | ||
const iomem = new Mem(['127.0.0.1:11211']) | ||
const { pipeline, Readable, Writable } = require('node:stream') | ||
class Echo extends Writable { | ||
constructor (opts) { | ||
super({ objectMode: true, ...opts }) | ||
} | ||
_write (data, _, cb) { | ||
console.log(data) | ||
cb() | ||
} | ||
} | ||
pipeline(Readable.from([Mem.get('test:a')][Symbol.iterator]()), iomem.stream(), new Echo(), err => { | ||
if (err) { | ||
console.error(err) | ||
} | ||
}) | ||
... | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
#### Case #3: | ||
Combine case #1 with readable stream to supply extra data into the stream. | ||
```js | ||
const Mem = require('iomem') | ||
const iomem = new Mem(['127.0.0.1:11211'], { stream: true }) | ||
const { pipeline, Readable, Writable } = require('node:stream') | ||
class Echo extends Writable { | ||
constructor (opts) { | ||
super({ objectMode: true, ...opts }) | ||
} | ||
_write (data, _, cb) { | ||
console.log(data) | ||
cb() | ||
} | ||
} | ||
pipeline(Readable.from([Mem.get('test:a')][Symbol.iterator]()), iomem.get('test:b'), new Echo(), err => { | ||
if (err) { | ||
console.error(err) | ||
} | ||
}) | ||
... | ||
iomem.end() // call end() when your script or web server exits | ||
``` | ||
## Options | ||
@@ -99,9 +315,13 @@ | ||
{ | ||
stream: false, // set true to use streams instead of promises | ||
stream: false, // set true to force client methods return streams instead of promises | ||
expiry: 60 * 60 * 24 * 1, // 1 day, time interval in seconds | ||
maxConnection: 10, // max connections per server | ||
connectionTimeout: 1000, // connection timeout | ||
timeout: 500, // request timeout | ||
retries: 2 // request retries | ||
maxConnections: 10, // max connections per server | ||
connectionTimeout: 1000, // connection timeout in milliseconds | ||
timeout: 500, // request timeout in milliseconds | ||
retries: 2, // request retries - max retries | ||
retriesDelay: 100, // request retries - initial delay | ||
retriesFactor: 2 // request retries - exponential factor | ||
} | ||
``` | ||
Please take a look at [Case #2](#case-2) for a better approach before enabling `stream` flag. |
@@ -5,9 +5,13 @@ 'use strict' | ||
const DEFAULT_EXPIRY = 60 * 60 * 24 * 1 // 1 day | ||
const DEFAULT_OPTIONS = { | ||
stream: false, // set true to use streams instead of promises | ||
expiry: 60 * 60 * 24 * 1, // 1 day, time interval in seconds | ||
maxConnection: 10, // max connections per server | ||
connectionTimeout: 1000, // connection timeout | ||
timeout: 500, // request timeout | ||
retries: 2 // request retries | ||
expiry: DEFAULT_EXPIRY, // the time interval in seconds | ||
maxConnections: 10, // max connections per server | ||
connectionTimeout: 1000, // connection timeout in milliseconds | ||
timeout: 500, // request timeout in milliseconds | ||
retries: 2, // request retries - max retries | ||
retriesDelay: 100, // request retries - initial delay | ||
retriesFactor: 2 // request retries - exponential factor | ||
} | ||
@@ -19,24 +23,133 @@ | ||
this._net = new Net(servers, this._options) | ||
this._opaque = 0 | ||
} | ||
set (key, value, expiry) { | ||
return this.query('set', key, value, expiry || this._options.expiry, 0) | ||
static get (key) { | ||
return ['get', key] | ||
} | ||
static getk (key) { | ||
return ['getk', key] | ||
} | ||
static gets (key) { | ||
return ['gets', key] | ||
} | ||
static getsv (key) { | ||
return ['getsv', key] | ||
} | ||
static set (key, value, expiry = DEFAULT_EXPIRY) { | ||
return ['set', key, value, expiry] | ||
} | ||
static setk (key, expiry = DEFAULT_EXPIRY) { | ||
return ['set', key, expiry] | ||
} | ||
static add (key, value, expiry = DEFAULT_EXPIRY) { | ||
return ['add', key, value, expiry] | ||
} | ||
static addk (key, expiry = DEFAULT_EXPIRY) { | ||
return ['add', key, expiry] | ||
} | ||
static replace (key, value, expiry = DEFAULT_EXPIRY) { | ||
return ['replace', key, value, expiry] | ||
} | ||
static replacek (key, expiry = DEFAULT_EXPIRY) { | ||
return ['replace', key, expiry] | ||
} | ||
static cas (key, value, cas, expiry = DEFAULT_EXPIRY) { | ||
return ['cas', key, value, expiry, cas] | ||
} | ||
static del (key) { | ||
return ['del', key] | ||
} | ||
static incr (key, initial, delta, expiry = DEFAULT_EXPIRY) { | ||
return ['incr', key, initial, delta, expiry] | ||
} | ||
static decr (key, initial, delta, expiry = DEFAULT_EXPIRY) { | ||
return ['decr', key, initial, delta, expiry] | ||
} | ||
static flush (expiry) { | ||
return ['flush', expiry] | ||
} | ||
get (key) { | ||
return this.query('get', key) | ||
return this._net.query(Client.get(key)) | ||
} | ||
getk (key) { | ||
return this._net.query(Client.getk(key)) | ||
} | ||
gets (key) { | ||
return this._net.query(Client.gets(key)) | ||
} | ||
getsv (key) { | ||
return this._net.query(Client.getsv(key)) | ||
} | ||
set (key, value, expiry) { | ||
return this._net.query(Client.set(key, value, expiry || this._options.expiry)) | ||
} | ||
setk (key, expiry) { | ||
return this._net.query(Client.setk(key, expiry || this._options.expiry)) | ||
} | ||
add (key, value, expiry) { | ||
return this._net.query(Client.add(key, value, expiry || this._options.expiry)) | ||
} | ||
addk (key, expiry) { | ||
return this._net.query(Client.addk(key, expiry || this._options.expiry)) | ||
} | ||
replace (key, value, expiry) { | ||
return this._net.query(Client.replace(key, value, expiry || this._options.expiry)) | ||
} | ||
replacek (key, expiry) { | ||
return this._net.query(Client.replacek(key, expiry || this._options.expiry)) | ||
} | ||
cas (key, value, cas, expiry) { | ||
return this._net.query(Client.cas(key, value, cas, expiry || this._options.expiry)) | ||
} | ||
del (key) { | ||
return this.query('del', key) | ||
return this._net.query(Client.del(key)) | ||
} | ||
query (...args) { | ||
this._opaque++ | ||
this._opaque = this._opaque % 0xffffffff | ||
return this._net.query(...args, this._opaque) | ||
incr (key, initial, delta, expiry) { | ||
return this._net.query(Client.incr(key, initial, delta, expiry || this._options.expiry)) | ||
} | ||
decr (key, initial, delta, expiry) { | ||
return this._net.query(Client.decr(key, initial, delta, expiry || this._options.expiry)) | ||
} | ||
flush (expiry) { | ||
return this._net.query(Client.flush(expiry)) | ||
} | ||
stream () { | ||
return this._net.query() | ||
} | ||
end () { | ||
this._net.end() | ||
} | ||
} | ||
module.exports = Client |
'use strict' | ||
// Flags: | ||
// https://github.com/memcached/memcached/blob/master/memcached.h#L546 | ||
// The flags are client specific. | ||
// We use them to encode/decode stored values by type. | ||
/* eslint-disable key-spacing */ | ||
const FLAGS = { | ||
ITEM_LINKED: 1, | ||
ITEM_CAS: 2, | ||
/* temp */ | ||
ITEM_SLABBED: 4, | ||
/* Item was fetched at least once in its lifetime */ | ||
ITEM_FETCHED: 8, | ||
/* Appended on fetch, removed on LRU shuffling */ | ||
ITEM_ACTIVE: 16, | ||
/* If an item's storage are chained chunks. */ | ||
ITEM_CHUNKED: 32, | ||
ITEM_CHUNK: 64, | ||
/* ITEM_data bulk is external to item */ | ||
ITEM_HDR: 128, | ||
/* additional 4 bytes for item client flags */ | ||
ITEM_CFLAGS: 256, | ||
/* item has sent out a token already */ | ||
ITEM_TOKEN_SENT: 512, | ||
/* reserved, in case tokens should be a 2-bit count in future */ | ||
ITEM_TOKEN_RESERVED: 1024, | ||
/* if item has been marked as a stale value */ | ||
ITEM_STALE: 2048, | ||
/* if item key was sent in binary */ | ||
ITEM_KEY_BINARY: 4096 | ||
// Types | ||
// We don't care about number, boolean, null. They will be handled as object type by JSON.stringify | ||
// Function, Symbol, and undefined are not stringifiable. | ||
string: 1 << 0, // we keep it here to not have "" wrappers by JSON.stringify | ||
bigint: 1 << 1, // we keep it here b/c JSON.stringify does not support BigInt | ||
object: 1 << 2, // this is the JSON.stringify product, the default serialization method | ||
// Instances | ||
Buffer: 1 << 3, // buffers are handled as they are | ||
Date: 1 << 4, // date is a common type that we want to handle | ||
String: 1 << 5, // string object must remain a string object | ||
// Compression flag | ||
compressed: 0b10000000000000000000000000000000 | ||
} | ||
module.exports = FLAGS |
165
src/net.js
@@ -6,6 +6,9 @@ 'use strict' | ||
const HashRing = require('hashring') | ||
const Server = require('./server') | ||
const protocol = require('./protocol') | ||
const { getQuietOpcodeByName } = require('./opcodes') | ||
const { buildPacket, parsePacket, parseHeader, HEADER_LENGTH, REQUEST_MAGIC, RESPONSE_MAGIC } = require('./packet') | ||
const { STATUS_MESSAGE_MAP, STATUS_MESSAGE_UNKOWN, STATUS_SUCCESS, STATUS_NOT_FOUND, STATUS_EXISTS } = require('./statuses') | ||
const { getKeyFlags } = require('./keys') | ||
@@ -19,20 +22,44 @@ const HASHRING_ALGORITHM = 'md5' | ||
constructor (options = {}) { | ||
const { getKeysByServer, ...opts } = options | ||
const { getKeysSetByServer, getKeysMapByServer, getKeysSetByAllServers, config = {}, ...opts } = options | ||
super({ objectMode: true, ...opts }) | ||
this.getKeysByServer = getKeysByServer | ||
this.getKeysSetByServer = getKeysSetByServer | ||
this.getKeysMapByServer = getKeysMapByServer | ||
this.getKeysSetByAllServers = getKeysSetByAllServers | ||
this.config = config | ||
} | ||
_transform (data, _, cb) { | ||
_try (data, cb) { | ||
const [method, key, ...args] = data | ||
const multikey = Array.isArray(key) | ||
const opaque = args[args.length - 1] | ||
const keysByServer = this.getKeysByServer(multikey ? key : [key]) | ||
const keyFlags = getKeyFlags(key) | ||
const quietOpcode = getQuietOpcodeByName(method) | ||
const keysByServer = keyFlags.isEmpty | ||
? this.getKeysSetByAllServers(key) | ||
: keyFlags.isObject | ||
? this.getKeysMapByServer(key) | ||
: this.getKeysSetByServer(keyFlags.isArray ? key : [key]) | ||
const buffer = [] | ||
const buffer = protocol[method].bykeys ? {} : [] | ||
let serversHit = 0 // server got hit when we received a response with the last opaque sent to the server | ||
const keysStat = { length: 0, exists: 0, misses: 0 } | ||
keysByServer.forEach((keys, server) => { | ||
// build request packet | ||
const opaques = new Set() | ||
let lastOpaque | ||
let packet = Buffer.alloc(0) | ||
keys.forEach(key => | ||
(packet = Buffer.concat([packet, buildPacket(REQUEST_MAGIC, ...protocol[method](key, ...args))]))) | ||
// process keys | ||
let counter = 0 | ||
keys.forEach((value, key) => { | ||
lastOpaque = Net.opaque() | ||
opaques.add(lastOpaque) | ||
const params = keyFlags.isObject | ||
? protocol[method](key, value, ...args, lastOpaque) | ||
: protocol[method](key, ...args, lastOpaque) | ||
if (keyFlags.isMultikey && quietOpcode && keys.size !== ++counter) { | ||
params[0] = quietOpcode | ||
} | ||
packet = Buffer.concat([packet, buildPacket(REQUEST_MAGIC, ...params)]) | ||
keysStat.length++ | ||
}) | ||
// get socket from server | ||
@@ -48,4 +75,4 @@ const sock = server.getSocket() | ||
sock.unpipe(pass) | ||
cb(new Error(`iomem: request timeout (${1000})`)) | ||
}, 1000) | ||
cb(new Error(`iomem: request timeout (${this.config.timeout})`)) | ||
}, this.config.timeout) // maybe use connectionTimeout when sock.readyState === 'opening' || sock.readyState === 'closed'? | ||
@@ -62,3 +89,4 @@ const done = (err, data) => { | ||
chunks = Buffer.concat([chunks, chunk]) | ||
while (chunks.length >= HEADER_LENGTH) { | ||
let error | ||
while (chunks.length >= HEADER_LENGTH && !error) { | ||
const header = parseHeader(chunks.slice(0, HEADER_LENGTH)) | ||
@@ -71,5 +99,18 @@ if (header[0] !== RESPONSE_MAGIC) { // wrong magic | ||
const packet = parsePacket(chunks.slice(0, packetSize), header) | ||
// TODO: check status | ||
if (packet && packet[6] === opaque) { // check packet and opaque | ||
buffer.push(packet) | ||
if (packet) { // check packet | ||
const opaque = packet[packet.length - 1] | ||
serversHit += Number(opaque === lastOpaque) | ||
if (opaques.has(opaque)) { // check opaque | ||
if (packet[5] === STATUS_SUCCESS) { // success | ||
if (protocol[method].format) { | ||
protocol[method].format(packet, buffer) | ||
} | ||
} else if (packet[5] === STATUS_EXISTS) { // exists | ||
keysStat.exists++ | ||
} else if (packet[5] === STATUS_NOT_FOUND) { // not found | ||
keysStat.misses++ | ||
} else { | ||
error = new Error(`iomem: response error: ${STATUS_MESSAGE_MAP[packet[5]] || `${STATUS_MESSAGE_UNKOWN} (${packet[5]})`}`) | ||
} | ||
} | ||
} | ||
@@ -81,15 +122,13 @@ chunks = chunks.slice(packetSize) | ||
} | ||
if (buffer.length >= keysByServer.size) { // this will not work for multi-get with getkq and getk | ||
done(null, multikey ? buffer : buffer[0]) | ||
if (error) { | ||
done(error) | ||
} else if (serversHit === keysByServer.size) { | ||
done(null, protocol[method].result ? protocol[method].result(keyFlags, buffer, keysStat) : null) | ||
} | ||
}) | ||
// socket disconnected | ||
// socket end | ||
pass.on('end', () => { | ||
console.log('>>>>end') | ||
done() | ||
// console.log('???????end', d, sock.read()) | ||
// this.push(sock.read()) | ||
// counter++ | ||
}) // TODO: check opaque and have timeout | ||
done(new Error('iomem: socket closed unexpectedly')) | ||
}) | ||
@@ -106,14 +145,35 @@ // socket error | ||
} | ||
_transform (data, _, cb) { | ||
const retry = (retries, ms) => { | ||
this._try(data, (err, data) => { | ||
if (err && retries > 0) { | ||
setTimeout(() => { | ||
retry(retries - 1, ms * this.config.retriesFactor) | ||
}, ms) | ||
} else { | ||
cb(err, data) | ||
} | ||
}) | ||
} | ||
retry(this.config.retries, this.config.retriesDelay) | ||
} | ||
} | ||
class Net { | ||
static _opaque = 0 | ||
static opaque () { | ||
Net._opaque = ++Net._opaque % 0xffffffff | ||
return Net._opaque | ||
} | ||
constructor (servers = [DEFAULT_ADDRESS], options = {}) { | ||
this._options = options | ||
this._servers = new Map() | ||
// this._sockets = | ||
this._opaque = 0 | ||
servers.forEach(address => { | ||
const server = new Server(address) | ||
this._servers.set(server.hostname, new Server(address)) | ||
const server = new Server(address, this._options.maxConnections, this._options.connectionTimeout) | ||
this._servers.set(server.hostname, server) | ||
}) | ||
@@ -127,3 +187,3 @@ | ||
getKeysByServer = keys => { | ||
getKeysSetByServer = keys => { | ||
let server | ||
@@ -143,17 +203,44 @@ return keys.reduce((map, key) => { | ||
query (method, key, ...args) { | ||
getKeysMapByServer = keys => { | ||
let server | ||
const map = new Map() | ||
for (const key in keys) { | ||
server = this._servers.length === 1 | ||
? this._servers.values().next().value | ||
: this._servers.get(this._ring.get(key)) | ||
if (map.has(server)) { | ||
map.get(server).set(key, keys[key]) | ||
} else { | ||
map.set(server, new Map([[key, keys[key]]])) | ||
} | ||
} | ||
return map | ||
} | ||
getKeysSetByAllServers = (key) => { | ||
const map = new Map() | ||
this._servers.forEach(server => map.set(server, new Set([key]))) | ||
return map | ||
} | ||
query (args = []) { | ||
const net = new NetStream({ | ||
getKeysByServer: this.getKeysByServer | ||
getKeysSetByServer: this.getKeysSetByServer, | ||
getKeysMapByServer: this.getKeysMapByServer, | ||
getKeysSetByAllServers: this.getKeysSetByAllServers, | ||
config: this._options | ||
}) | ||
if (method && key) { | ||
const pass = new PassThrough({ objectMode: true }) | ||
let pass | ||
if (args[0]) { | ||
pass = new PassThrough({ objectMode: true }) | ||
pass.pipe(net) | ||
pass.end([method, key, ...args]) | ||
} | ||
if (this._options.stream) { | ||
if (this._options.stream || !args[0]) { | ||
pass && pass.write(args) | ||
return net | ||
} | ||
pass && pass.end(args) | ||
return new Promise((resolve, reject) => { | ||
@@ -165,4 +252,8 @@ net.on('data', resolve) | ||
} | ||
end () { | ||
this._servers.forEach(server => server.end()) | ||
} | ||
} | ||
module.exports = Net |
@@ -5,4 +5,6 @@ 'use strict' | ||
// https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#command-opcodes | ||
module.exports = { | ||
const OPCODES = { | ||
// Opcodes having a quiet version | ||
get: 0x00, | ||
getk: 0x0c, | ||
set: 0x01, | ||
@@ -14,11 +16,58 @@ add: 0x02, | ||
decrement: 0x06, | ||
quit: 0x07, | ||
flush: 0x08, | ||
noop: 0x0a, | ||
getk: 0x0c, | ||
append: 0x0e, | ||
prepend: 0x0f, | ||
gat: 0x1d, | ||
// Quiet version of the above opcodes | ||
getq: 0x09, | ||
getkq: 0x0d, | ||
stat: 0x10, | ||
setq: 0x11, | ||
addq: 0x12, | ||
sasl_list: 0x20, | ||
sasl_auth: 0x21 | ||
replaceq: 0x13, | ||
deleteq: 0x14, | ||
incrementq: 0x15, | ||
decrementq: 0x16, | ||
quitq: 0x17, | ||
flushq: 0x18, | ||
appendq: 0x19, | ||
prependq: 0x1a, | ||
gatq: 0x1e, | ||
// Opcodes not having a quiet equivalent | ||
stat: 0x10, | ||
noop: 0x0a, | ||
version: 0x0b, | ||
touch: 0x1c, | ||
// SASL: | ||
// https://github.com/memcached/memcached/wiki/SASLHowto | ||
// https://github.com/memcached/memcached/wiki/SASLAuthProtocol | ||
// https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer | ||
saslauth: 0x21 | ||
} | ||
// Opcodes to quiet version opcodes map | ||
/* eslint-disable no-multi-spaces */ | ||
const OPCODES_QUIET_MAP = new Map([ | ||
[OPCODES.get, OPCODES.getq], | ||
[OPCODES.getk, OPCODES.getkq], | ||
[OPCODES.set, OPCODES.setq], | ||
[OPCODES.add, OPCODES.addq], | ||
[OPCODES.replace, OPCODES.replaceq], | ||
[OPCODES.delete, OPCODES.deleteq], | ||
[OPCODES.increment, OPCODES.incrementq], | ||
[OPCODES.decrement, OPCODES.decrementq], | ||
[OPCODES.quit, OPCODES.quitq], | ||
[OPCODES.flush, OPCODES.flushq], | ||
[OPCODES.append, OPCODES.appendq], | ||
[OPCODES.prepend, OPCODES.prependq], | ||
[OPCODES.gat, OPCODES.gatq] | ||
]) | ||
module.exports = { | ||
OPCODES, | ||
getQuietOpcode: opcode => OPCODES_QUIET_MAP.get(opcode), | ||
getQuietOpcodeByName: name => OPCODES_QUIET_MAP.get(OPCODES[name]) | ||
} |
@@ -8,6 +8,7 @@ 'use strict' | ||
const DEFAULT_VALUE = '' | ||
const DEFAULT_KEY = '' | ||
const DEFAULT_VALUE = Buffer.alloc(0) | ||
const DEFAULT_EXTRAS = Buffer.alloc(0) | ||
const DEFAULT_OPAQUE = 0x00 | ||
const DEFAULT_CAS = 0x00 | ||
const DEFAULT_CAS = BigInt(0x00) | ||
const DEFAULT_DATA_TYPE = 0x00 // Reserved for future use https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#data-types | ||
@@ -43,3 +44,3 @@ const DEFAULT_STATUS = 0x00 | ||
const buildHeader = (magic, opcode, keyLength, valueLength, extrasLength, status, opaque, cas) => { | ||
const buildHeader = (magic, opcode, keyLength, valueLength, extrasLength, status, cas, opaque) => { | ||
const header = Buffer.alloc(24) | ||
@@ -64,3 +65,3 @@ | ||
// 16 | ||
header.writeUInt32BE(cas, 16) | ||
header.writeBigInt64BE(cas, 16) | ||
@@ -72,10 +73,10 @@ return header | ||
[ | ||
header.readUInt8(0), // magic | ||
header.readUInt8(1), // opcode | ||
header.readUInt16BE(2), // key length | ||
header.readUInt8(4), // extras length | ||
header.readUInt16BE(6), // status | ||
header.readUInt32BE(8), // total body length | ||
header.readUInt32BE(12), // opaque | ||
header.readUInt32BE(16) // cas | ||
header.readUInt8(0), // magic | ||
header.readUInt8(1), // opcode | ||
header.readUInt16BE(2), // key length | ||
header.readUInt8(4), // extras length | ||
header.readUInt16BE(6), // status | ||
header.readUInt32BE(8), // total body length | ||
header.readBigUint64BE(16), // cas | ||
header.readUInt32BE(12) // opaque | ||
] | ||
@@ -107,9 +108,7 @@ | ||
const buildPacket = (magic, opcode, key, value = DEFAULT_VALUE, extras = DEFAULT_EXTRAS, status = DEFAULT_STATUS, opaque = DEFAULT_OPAQUE, cas = DEFAULT_CAS) => { | ||
const buildPacket = (magic, opcode, key = DEFAULT_KEY, value = DEFAULT_VALUE, extras = DEFAULT_EXTRAS, status = DEFAULT_STATUS, cas = DEFAULT_CAS, opaque = DEFAULT_OPAQUE) => { | ||
key = Buffer.from(key) | ||
value = Buffer.from(value) | ||
return Buffer.concat([ | ||
// 0 | ||
buildHeader(magic, opcode, key.length, value.length, extras.length, status, opaque, cas), | ||
buildHeader(magic, opcode, key.length, value.length, extras.length, status, cas, opaque), | ||
// 24 | ||
@@ -125,3 +124,3 @@ extras, key, value | ||
const [magic, opcode, keyLength, extrasLength, status, totalBodyLength, opaque, cas] = | ||
const [magic, opcode, keyLength, extrasLength, status, totalBodyLength, cas, opaque] = | ||
header || parseHeader(packet.slice(0, HEADER_LENGTH)) | ||
@@ -149,7 +148,7 @@ | ||
packet.slice(keyOffset, valueOffset).toString('utf8'), // key | ||
packet.slice(valueOffset, valueOffset + valueLength).toString('utf8'), // value | ||
packet.slice(valueOffset, valueOffset + valueLength), // value | ||
packet.slice(HEADER_LENGTH, keyOffset), // extras | ||
status, // status or vbucket id | ||
opaque, // opaque | ||
cas // cas | ||
cas, // cas | ||
opaque // opaque | ||
] | ||
@@ -166,2 +165,3 @@ } | ||
RESPONSE_MAGIC, | ||
DEFAULT_KEY, | ||
DEFAULT_VALUE, | ||
@@ -168,0 +168,0 @@ DEFAULT_EXTRAS, |
'use strict' | ||
const { DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS } = require('./packet') | ||
const OPCODES = require('./opcodes') | ||
const { DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, DEFAULT_OPAQUE } = require('./packet') | ||
const { OPCODES } = require('./opcodes') | ||
const { serialize, deserialize } = require('./serializer') | ||
@@ -9,23 +10,164 @@ // Commands: | ||
// GET | ||
const get = (key, opaque) => | ||
[OPCODES.get, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, opaque] | ||
// SET | ||
const set = (key, value, expiry = 0, flags = 0, opaque) => { | ||
// add, set, replace mutations | ||
const mutation = (opcode, key, value, expiry = 0, cas = DEFAULT_CAS, opaque = DEFAULT_OPAQUE) => { | ||
const [buffer, flags] = serialize(value) | ||
const extras = Buffer.alloc(8) | ||
extras.writeUInt32BE(flags, 0) | ||
extras.writeUInt32BE(expiry, 4) | ||
return [opcode, key, buffer, extras, DEFAULT_STATUS, cas, opaque] | ||
} | ||
return [OPCODES.set, key, value, extras, DEFAULT_STATUS, opaque] | ||
// increment and decrement | ||
const counter = (opcode, key, initial, delta, expiry = 0, opaque = DEFAULT_OPAQUE) => { | ||
const extras = Buffer.alloc(20) | ||
extras.writeBigInt64BE(delta, 0) | ||
extras.writeBigInt64BE(initial, 8) | ||
extras.writeUInt32BE(expiry, 16) | ||
return [opcode, key, DEFAULT_VALUE, extras, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
} | ||
// DEL | ||
const del = (key, opaque) => | ||
[OPCODES.delete, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, opaque] | ||
// flush, touch, gat | ||
const expiring = (opcode, key, expiry, opaque = DEFAULT_OPAQUE) => { | ||
let extras = DEFAULT_EXTRAS | ||
if (expiry !== undefined) { | ||
extras = Buffer.alloc(4) | ||
extras.writeUInt32BE(expiry, 0) | ||
} | ||
return [opcode, key, DEFAULT_VALUE, extras, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
} | ||
// creates protocol method function and extend it with format(), result(), and bykeys flag | ||
const createMethod = (method, format, result, bykeys = false) => { | ||
method.format = format | ||
method.result = result | ||
method.bykeys = bykeys | ||
return method | ||
} | ||
// get or multi get with value or [value, ...] response | ||
const get = createMethod( | ||
(key, opaque) => [OPCODES.get, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque], | ||
(packet, buffer) => buffer.push(deserialize(packet[3], packet[4].readUInt32BE(0))), | ||
(keyFlags, buffer) => keyFlags.isArray ? buffer : (buffer[0] || null) | ||
) | ||
// get or multi get with { key: value } or { key: value, ... } response | ||
const getk = createMethod( | ||
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque], | ||
(packet, buffer) => (buffer[packet[2]] = deserialize(packet[3], packet[4].readUInt32BE(0))), | ||
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer, | ||
true | ||
) | ||
// get or multi get with { key: cas } or { key: cas, ... } response | ||
const gets = createMethod( | ||
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque], | ||
(packet, buffer) => (buffer[packet[2]] = packet[6]), | ||
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer, | ||
true | ||
) | ||
// get or multi get with { key: { value, cas } } or { key: { value, cas }, ... } response | ||
const getsv = createMethod( | ||
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque], | ||
(packet, buffer) => (buffer[packet[2]] = { value: deserialize(packet[3], packet[4].readUInt32BE(0)), cas: packet[6] }), | ||
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer, | ||
true | ||
) | ||
// set or multi set with key, value or [key, ...], [value, ...] pairs | ||
const set = createMethod( | ||
(key, value, expiry, opaque) => mutation(OPCODES.set, key, value, expiry, DEFAULT_CAS, opaque), | ||
null, | ||
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists | ||
) | ||
const add = createMethod( | ||
(key, value, expiry, opaque) => mutation(OPCODES.add, key, value, expiry, DEFAULT_CAS, opaque), | ||
null, | ||
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists | ||
) | ||
const replace = createMethod( | ||
(key, value, expiry, opaque) => mutation(OPCODES.replace, key, value, expiry, DEFAULT_CAS, opaque), | ||
null, | ||
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists | ||
) | ||
const cas = createMethod( | ||
(key, value, expiry, cas, opaque) => mutation(OPCODES.set, key, value, expiry, cas, opaque), | ||
null, | ||
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists | ||
) | ||
const del = createMethod( | ||
(key, opaque) => [OPCODES.delete, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque], | ||
null, | ||
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists | ||
) | ||
const incr = createMethod( | ||
(key, initial, delta, expiry, opaque) => counter(OPCODES.increment, key, initial, delta, expiry, opaque), | ||
(packet, buffer) => buffer.push(packet[3].readBigInt64BE(0)), | ||
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : (buffer[0] || null) | ||
) | ||
const decr = createMethod( | ||
(key, initial, delta, expiry, opaque) => counter(OPCODES.decrement, key, initial, delta, expiry, opaque), | ||
(packet, buffer) => buffer.push(packet[3].readBigInt64BE(0)), | ||
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : (buffer[0] || null) | ||
) | ||
const quit = (opaque) => | ||
[OPCODES.quit, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const flush = (expiry, opaque) => | ||
expiring(OPCODES.flush, DEFAULT_KEY, expiry, opaque) | ||
const append = (key, value, opaque) => | ||
[OPCODES.append, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const prepend = (key, value, opaque) => | ||
[OPCODES.prepend, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const gat = (key, expiry, opaque) => | ||
expiring(OPCODES.gat, key, expiry, opaque) | ||
const touch = (key, expiry, opaque) => | ||
expiring(OPCODES.touch, key, expiry, opaque) | ||
const stat = (key, opaque) => | ||
[OPCODES.stat, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const noop = (opaque) => | ||
[OPCODES.noop, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const version = (opaque) => | ||
[OPCODES.version, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque] | ||
const saslauth = (key, value) => | ||
[OPCODES.saslauth, key, value, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, DEFAULT_OPAQUE] | ||
module.exports = { | ||
get, | ||
getk, | ||
gets, | ||
getsv, | ||
set, | ||
del | ||
add, | ||
replace, | ||
cas, | ||
del, | ||
incr, | ||
decr, | ||
quit, | ||
flush, | ||
append, | ||
prepend, | ||
gat, | ||
touch, | ||
stat, | ||
noop, | ||
version, | ||
saslauth | ||
} |
'use strict' | ||
const net = require('node:net') | ||
const protocol = require('./protocol') | ||
const { buildPacket, REQUEST_MAGIC } = require('./packet') | ||
@@ -11,8 +13,5 @@ class Server { | ||
// Server address formats: | ||
// username:password@host:port | ||
// host:port | ||
// /path/to/memcached.sock | ||
constructor (address, maxSockets = 10) { | ||
// TODO: move maxSockets to options on prev layers and validate it to be >= 1 | ||
// - [username:password@]host[:port] | ||
// - /path/to/memcached.sock | ||
constructor (address, maxSockets, timeout) { | ||
let [auth, hostname] = address.split('@') | ||
@@ -39,23 +38,59 @@ if (!hostname) { | ||
this._socketIndex = -1 | ||
this._busySockets = 0 | ||
this._maxSockets = maxSockets | ||
this._timeout = timeout | ||
} | ||
createSocket (index) { | ||
const sock = this.ipc | ||
? net.createConnection(this.host) | ||
: net.createConnection(this.port, this.host) | ||
sock.on('error', () => { | ||
this.destroySocket(sock.index) | ||
}) | ||
sock.on('timeout', () => { | ||
sock.end() | ||
this.destroySocket(sock.index) | ||
}) | ||
sock.on('end', () => { | ||
this.destroySocket(sock.index) | ||
}) | ||
sock.setTimeout(this._timeout) | ||
sock.index = index === undefined | ||
? ++this._socketIndex | ||
: index | ||
this._sockets.push(sock) | ||
this.username && this.password && | ||
sock.write(buildPacket(REQUEST_MAGIC, ...protocol.saslauth('PLAIN', Buffer.from(`\x00${this.username}\x00${this.password}`)))) | ||
return sock | ||
} | ||
destroySocket (index) { | ||
if (this._sockets[index]) { | ||
this._sockets[index].removeAllListeners() | ||
this._sockets[index].destroy() | ||
delete this._sockets[index] | ||
} | ||
} | ||
getSocket () { | ||
// create a new socket and return | ||
// create new socket and return it | ||
if (this._sockets.length < this._maxSockets) { | ||
const sock = this.ipc | ||
? net.createConnection(this.host) // TODO: use params instead of two calls | ||
: net.createConnection(this.port, this.host) | ||
// TODO: remove sock on error and decrease this._socketIndex | ||
this._socketIndex++ | ||
this._sockets.push(sock) | ||
return sock | ||
return this.createSocket() | ||
} | ||
// pick the next socket in a ring | ||
// pick the next socket in the sockets ring | ||
this._socketIndex = (this._socketIndex + 1) % this._maxSockets | ||
return this._sockets[this._socketIndex] // TODO: check for existence before return? | ||
if (!this._sockets[this._socketIndex]) { // recreate when destroyed | ||
this._sockets[this._socketIndex] = this.createSocket(this._socketIndex) | ||
} | ||
return this._sockets[this._socketIndex] | ||
} | ||
end () { | ||
this._sockets.forEach(sock => sock.end()) | ||
this._sockets = [] | ||
this._socketIndex = -1 | ||
} | ||
} | ||
module.exports = Server |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
44760
13
950
322
1