Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

iomem

Package Overview
Dependencies
Maintainers
1
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

iomem - npm Package Compare versions

Comparing version 0.0.1 to 0.1.0

src/keys.js

7

package.json
{
"name": "iomem",
"version": "0.0.1",
"version": "0.1.0",
"description": "Memcached client with binary protocol support and multi keys commands",

@@ -34,3 +34,5 @@ "keywords": [

"postversion": "git push --tags && yarn publish . --new-version $npm_package_version && git push && echo Successfully released version $npm_package_version!",
"cleanup": "git tag -d $(git tag) && git fetch --all --tags && git clean --force -d -x && git reset --hard origin/main && git checkout main"
"cleanup": "git tag -d $(git tag) && git fetch --all --tags && git clean --force -d -x && git reset --hard origin/main && git checkout main",
"docker:build": "docker build -t memcached-sasl .",
"docker:run": "docker run -d --name iomem-memcached-sasl -e SASL_PASSWORD=test -p 11211:11211 -it memcached-sasl"
},

@@ -40,2 +42,3 @@ "files": [

],
"main": "src/client.js",
"devDependencies": {

@@ -42,0 +45,0 @@ "eslint": "^8.31.0",

# `iomem`
**WARNING! THIS SOFTWARE IS A WORK IN PROGRESS! DO NOT USE IT!**
**WARNING! THIS SOFTWARE IS UNFINISHED! ANYTHING CAN CHANGE AT ANY MOMENT!**

@@ -32,7 +32,11 @@ Memcached client implementing binary protocol with native multiple keys support.

```js
const Memcached = require('iomem')
const iomem = new Memcached()
const Mem = require('iomem')
const iomem = new Mem()
await iomem.set('test:key', 'hello')
console.log(await iomem.get('test:key'))
const value = await iomem.get('test:key')
console.log(value)
iomem.end() // call end() when your script or web server exits
```

@@ -43,21 +47,41 @@

```js
const Memcached = require('iomem')
const iomem = new Memcached()
const Mem = require('iomem')
const iomem = new Mem()
await iomem.set(['test:key1', 'test:key2'], ['hello', 'world'])
console.log(await iomem.get(['test:key1', 'test:key2']))
// set the same value for multiple keys
await iomem.set(['test:key1', 'test:key2'], 'test')
// set different values with `key => value` object
await iomem.setk({ 'test:key1': 'hello', 'test:key2': 'world' })
// get values as an array
await iomem.get(['test:key1', 'test:key2'])
// get values as a `key => value` object
await iomem.getk(['test:key1', 'test:key2'])
// delete keys
await iomem.del(['test:key1', 'test:key2'])
...
iomem.end() // call end() when your script or web server exits
```
For more details please see [Commands](#commands) section.
### Custom servers
```js
const Memcached = require('iomem')
const iomem = new Memcached(['127.0.0.1:11211', '127.0.0.2:11211'])
const Mem = require('iomem')
const iomem = new Mem(['127.0.0.1:11211', '127.0.0.2:11211'])
...
iomem.end() // call end() when your script or web server exits
```
Address formats:
Supported address formats:
```js
// host
or
// host:port

@@ -67,10 +91,128 @@ or

or
// username:password@host
or
// /path/to/memcached.sock
```
### Commands
Please note that the library automatically performs value serialization and deserialization. Here is a list of the possible value types:
`value: string|String|Number|BigInt|Boolean|Date|Array|Buffer|Object|null`
Be aware that any `value` in the below commands list refers to a value of any type specified above.
The following data types for `key` and `expiry` are must by ensured by the library user:
`key: string` - storage key, 250 bytes max as defined in [Memcached](https://github.com/memcached/memcached/blob/master/memcached.h#L68)
`expiry: unsigned integer` - time interval in seconds, defaults to `expiry` from the client config.
For more details please [Memcached commands](https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#commands).
#### GET
`get(key): value|null` - get a value for a single key.
`get([key1, ...]): [value, ...]` - get an array of values for multiple keys.
`getk(key): {key: value}|null` - get a `key => value` object for a single key.
`getk([key1, ...]): {key: value, ...}` - get a `key => value` object for multiple keys.
`gets(key): {key: cas}|null` - get a `key => cas` object for a single key.
`gets([key1, ...]): {key: cas, ...}` - get a `key => cas` object for multiple keys.
`getsv(key): {key: {value, cas}}|null` - get a `key => { value, cas }` object for a single key.
`getsv([key1, ...]): {key: {value, cas}}, ...}` - get a `key => { value, cas }` object for multiple keys.
#### SET
Set methods return `true` when all values were successfully set. Otherwise, when client receives `0x0001` or `0x0002` [statuses ](https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#response-status) from Memcached (this is abnormal behavior for set commands), the returned value will be `false`.
`set(key, value, expiry): true|false` - set a value for a single key.
`set([key1, ...], value, expiry): true|false` - set the same value for multiple keys.
`setk({key: value, ...}, expiry): true|false` - set multiple values with `key => value` object.
#### ADD
Add commands set a key only when it is not set yet (a key does not exist in the Memcached). The methods will return `false` when at least one key was not successfully set (meaning a key was already set with some value, so it was not set with the value you provided with a command).
`add(key, value, expiry): true|false` - add a value for a single key.
`add([key1, ...], value, expiry): true|false` - add the same value for multiple keys.
`addk({key: value, ...}, expiry): true|false` - add multiple values with `key => value` object.
#### REPLACE
Replace commands set a new value for a key only when it is already set with some value (a key does exist in the Memcached). The methods will return `false` when at least one key was not successfully set (meaning a key did not exist in the Memcached when you ran a command).
`replace(key, value, expiry): true|false` - replace a value for a single key.
`replace([key1, ...], value, expiry): true|false` - replace the same value for multiple keys.
`replacek({key: value, ...}, expiry): true|false` - replace multiple values with `key => value` object.
#### CAS
The `cas` command sets a key with a new value only when `cas` parameter matches `cas` value stored in the key. To retrieve the current `cas` value for a key please see [GET](#get) commands.
`cas(key, value, cas, expiry): true|false` - set a value if the cas matches.
#### DEL
Delete commands delete a key only when it exists. The methods will return `false` when at least one key does not exist.
`del(key): true|false` - delete a key.
`del([key1, ...]): true|false` - delete multiple keys.
#### INCERMENT AND DECREMENT
Increment and decrement commands add or substract the specified `delta` value from the current counter value initialized with `initial` value. You can use `SET`, `ADD`, `REPLACE` commands to set a counter value.
`incr(key, initial, delta, expiry): value` - increments counter and returns its value.
`decr(key, initial, delta, expiry): value` - decrements coutner and returns its value.
Paramters:
`initial: BigInt` - initial counter value
`delta: BigInt` - amount to add or substruct from a counter
`expiry` - see [Commands](#commands)
If you want to get the current value from a counter without changing its value, use [GET](#get) commands and manually deserialize the response buffer.
```js
...
const FLAGS = require('iomem/src/flags')
const { deserialize } = require('iomem/src/serializer')
deserialize(await iomem.get('test:foo'), FLAGS.bigint)
...
```
#### FLUSH
`flush()` - flush cached items.
`flush(expiry)` - flush cached items in `expiry` seconds.
### Streams
#### Case #1:
Force `iomem` methods to return a stream instead of a promise by passing `stream: true` flag.
```js
const Memcached = require('iomem')
const iomem = new Memcached(['127.0.0.1:11211'], { stream: true })
const Mem = require('iomem')
const iomem = new Mem(['127.0.0.1:11211'], { stream: true })

@@ -80,2 +222,6 @@ const { pipeline, Writable } = require('node:stream')

class Echo extends Writable {
constructor (opts) {
super({ objectMode: true, ...opts })
}
_write (data, _, cb) {

@@ -87,9 +233,79 @@ console.log(data)

pipeline(iomem.get('test:a'), new Echo({ objectMode: true }), err => {
pipeline(iomem.get('test:a'), new Echo(), err => {
if (err) {
console.log(err)
console.error(err)
}
})
...
iomem.end() // call end() when your script or web server exits
```
#### Case #2:
Create a stream with special method called `stream` and supply data with readable stream. Do not care about `stream` flag.
**This is the recommended approach**
```js
const Mem = require('iomem')
const iomem = new Mem(['127.0.0.1:11211'])
const { pipeline, Readable, Writable } = require('node:stream')
class Echo extends Writable {
constructor (opts) {
super({ objectMode: true, ...opts })
}
_write (data, _, cb) {
console.log(data)
cb()
}
}
pipeline(Readable.from([Mem.get('test:a')][Symbol.iterator]()), iomem.stream(), new Echo(), err => {
if (err) {
console.error(err)
}
})
...
iomem.end() // call end() when your script or web server exits
```
#### Case #3:
Combine case #1 with readable stream to supply extra data into the stream.
```js
const Mem = require('iomem')
const iomem = new Mem(['127.0.0.1:11211'], { stream: true })
const { pipeline, Readable, Writable } = require('node:stream')
class Echo extends Writable {
constructor (opts) {
super({ objectMode: true, ...opts })
}
_write (data, _, cb) {
console.log(data)
cb()
}
}
pipeline(Readable.from([Mem.get('test:a')][Symbol.iterator]()), iomem.get('test:b'), new Echo(), err => {
if (err) {
console.error(err)
}
})
...
iomem.end() // call end() when your script or web server exits
```
## Options

@@ -99,9 +315,13 @@

{
stream: false, // set true to use streams instead of promises
stream: false, // set true to force client methods return streams instead of promises
expiry: 60 * 60 * 24 * 1, // 1 day, time interval in seconds
maxConnection: 10, // max connections per server
connectionTimeout: 1000, // connection timeout
timeout: 500, // request timeout
retries: 2 // request retries
maxConnections: 10, // max connections per server
connectionTimeout: 1000, // connection timeout in milliseconds
timeout: 500, // request timeout in milliseconds
retries: 2, // request retries - max retries
retriesDelay: 100, // request retries - initial delay
retriesFactor: 2 // request retries - exponential factor
}
```
Please take a look at [Case #2](#case-2) for a better approach before enabling `stream` flag.

@@ -5,9 +5,13 @@ 'use strict'

const DEFAULT_EXPIRY = 60 * 60 * 24 * 1 // 1 day
const DEFAULT_OPTIONS = {
stream: false, // set true to use streams instead of promises
expiry: 60 * 60 * 24 * 1, // 1 day, time interval in seconds
maxConnection: 10, // max connections per server
connectionTimeout: 1000, // connection timeout
timeout: 500, // request timeout
retries: 2 // request retries
expiry: DEFAULT_EXPIRY, // the time interval in seconds
maxConnections: 10, // max connections per server
connectionTimeout: 1000, // connection timeout in milliseconds
timeout: 500, // request timeout in milliseconds
retries: 2, // request retries - max retries
retriesDelay: 100, // request retries - initial delay
retriesFactor: 2 // request retries - exponential factor
}

@@ -19,24 +23,133 @@

this._net = new Net(servers, this._options)
this._opaque = 0
}
set (key, value, expiry) {
return this.query('set', key, value, expiry || this._options.expiry, 0)
static get (key) {
return ['get', key]
}
static getk (key) {
return ['getk', key]
}
static gets (key) {
return ['gets', key]
}
static getsv (key) {
return ['getsv', key]
}
static set (key, value, expiry = DEFAULT_EXPIRY) {
return ['set', key, value, expiry]
}
static setk (key, expiry = DEFAULT_EXPIRY) {
return ['set', key, expiry]
}
static add (key, value, expiry = DEFAULT_EXPIRY) {
return ['add', key, value, expiry]
}
static addk (key, expiry = DEFAULT_EXPIRY) {
return ['add', key, expiry]
}
static replace (key, value, expiry = DEFAULT_EXPIRY) {
return ['replace', key, value, expiry]
}
static replacek (key, expiry = DEFAULT_EXPIRY) {
return ['replace', key, expiry]
}
static cas (key, value, cas, expiry = DEFAULT_EXPIRY) {
return ['cas', key, value, expiry, cas]
}
static del (key) {
return ['del', key]
}
static incr (key, initial, delta, expiry = DEFAULT_EXPIRY) {
return ['incr', key, initial, delta, expiry]
}
static decr (key, initial, delta, expiry = DEFAULT_EXPIRY) {
return ['decr', key, initial, delta, expiry]
}
static flush (expiry) {
return ['flush', expiry]
}
get (key) {
return this.query('get', key)
return this._net.query(Client.get(key))
}
getk (key) {
return this._net.query(Client.getk(key))
}
gets (key) {
return this._net.query(Client.gets(key))
}
getsv (key) {
return this._net.query(Client.getsv(key))
}
set (key, value, expiry) {
return this._net.query(Client.set(key, value, expiry || this._options.expiry))
}
setk (key, expiry) {
return this._net.query(Client.setk(key, expiry || this._options.expiry))
}
add (key, value, expiry) {
return this._net.query(Client.add(key, value, expiry || this._options.expiry))
}
addk (key, expiry) {
return this._net.query(Client.addk(key, expiry || this._options.expiry))
}
replace (key, value, expiry) {
return this._net.query(Client.replace(key, value, expiry || this._options.expiry))
}
replacek (key, expiry) {
return this._net.query(Client.replacek(key, expiry || this._options.expiry))
}
cas (key, value, cas, expiry) {
return this._net.query(Client.cas(key, value, cas, expiry || this._options.expiry))
}
del (key) {
return this.query('del', key)
return this._net.query(Client.del(key))
}
query (...args) {
this._opaque++
this._opaque = this._opaque % 0xffffffff
return this._net.query(...args, this._opaque)
incr (key, initial, delta, expiry) {
return this._net.query(Client.incr(key, initial, delta, expiry || this._options.expiry))
}
decr (key, initial, delta, expiry) {
return this._net.query(Client.decr(key, initial, delta, expiry || this._options.expiry))
}
flush (expiry) {
return this._net.query(Client.flush(expiry))
}
stream () {
return this._net.query()
}
end () {
this._net.end()
}
}
module.exports = Client
'use strict'
// Flags:
// https://github.com/memcached/memcached/blob/master/memcached.h#L546
// The flags are client specific.
// We use them to encode/decode stored values by type.
/* eslint-disable key-spacing */
const FLAGS = {
ITEM_LINKED: 1,
ITEM_CAS: 2,
/* temp */
ITEM_SLABBED: 4,
/* Item was fetched at least once in its lifetime */
ITEM_FETCHED: 8,
/* Appended on fetch, removed on LRU shuffling */
ITEM_ACTIVE: 16,
/* If an item's storage are chained chunks. */
ITEM_CHUNKED: 32,
ITEM_CHUNK: 64,
/* ITEM_data bulk is external to item */
ITEM_HDR: 128,
/* additional 4 bytes for item client flags */
ITEM_CFLAGS: 256,
/* item has sent out a token already */
ITEM_TOKEN_SENT: 512,
/* reserved, in case tokens should be a 2-bit count in future */
ITEM_TOKEN_RESERVED: 1024,
/* if item has been marked as a stale value */
ITEM_STALE: 2048,
/* if item key was sent in binary */
ITEM_KEY_BINARY: 4096
// Types
// We don't care about number, boolean, null. They will be handled as object type by JSON.stringify
// Function, Symbol, and undefined are not stringifiable.
string: 1 << 0, // we keep it here to not have "" wrappers by JSON.stringify
bigint: 1 << 1, // we keep it here b/c JSON.stringify does not support BigInt
object: 1 << 2, // this is the JSON.stringify product, the default serialization method
// Instances
Buffer: 1 << 3, // buffers are handled as they are
Date: 1 << 4, // date is a common type that we want to handle
String: 1 << 5, // string object must remain a string object
// Compression flag
compressed: 0b10000000000000000000000000000000
}
module.exports = FLAGS

@@ -6,6 +6,9 @@ 'use strict'

const HashRing = require('hashring')
const Server = require('./server')
const protocol = require('./protocol')
const { getQuietOpcodeByName } = require('./opcodes')
const { buildPacket, parsePacket, parseHeader, HEADER_LENGTH, REQUEST_MAGIC, RESPONSE_MAGIC } = require('./packet')
const { STATUS_MESSAGE_MAP, STATUS_MESSAGE_UNKOWN, STATUS_SUCCESS, STATUS_NOT_FOUND, STATUS_EXISTS } = require('./statuses')
const { getKeyFlags } = require('./keys')

@@ -19,20 +22,44 @@ const HASHRING_ALGORITHM = 'md5'

constructor (options = {}) {
const { getKeysByServer, ...opts } = options
const { getKeysSetByServer, getKeysMapByServer, getKeysSetByAllServers, config = {}, ...opts } = options
super({ objectMode: true, ...opts })
this.getKeysByServer = getKeysByServer
this.getKeysSetByServer = getKeysSetByServer
this.getKeysMapByServer = getKeysMapByServer
this.getKeysSetByAllServers = getKeysSetByAllServers
this.config = config
}
_transform (data, _, cb) {
_try (data, cb) {
const [method, key, ...args] = data
const multikey = Array.isArray(key)
const opaque = args[args.length - 1]
const keysByServer = this.getKeysByServer(multikey ? key : [key])
const keyFlags = getKeyFlags(key)
const quietOpcode = getQuietOpcodeByName(method)
const keysByServer = keyFlags.isEmpty
? this.getKeysSetByAllServers(key)
: keyFlags.isObject
? this.getKeysMapByServer(key)
: this.getKeysSetByServer(keyFlags.isArray ? key : [key])
const buffer = []
const buffer = protocol[method].bykeys ? {} : []
let serversHit = 0 // server got hit when we received a response with the last opaque sent to the server
const keysStat = { length: 0, exists: 0, misses: 0 }
keysByServer.forEach((keys, server) => {
// build request packet
const opaques = new Set()
let lastOpaque
let packet = Buffer.alloc(0)
keys.forEach(key =>
(packet = Buffer.concat([packet, buildPacket(REQUEST_MAGIC, ...protocol[method](key, ...args))])))
// process keys
let counter = 0
keys.forEach((value, key) => {
lastOpaque = Net.opaque()
opaques.add(lastOpaque)
const params = keyFlags.isObject
? protocol[method](key, value, ...args, lastOpaque)
: protocol[method](key, ...args, lastOpaque)
if (keyFlags.isMultikey && quietOpcode && keys.size !== ++counter) {
params[0] = quietOpcode
}
packet = Buffer.concat([packet, buildPacket(REQUEST_MAGIC, ...params)])
keysStat.length++
})
// get socket from server

@@ -48,4 +75,4 @@ const sock = server.getSocket()

sock.unpipe(pass)
cb(new Error(`iomem: request timeout (${1000})`))
}, 1000)
cb(new Error(`iomem: request timeout (${this.config.timeout})`))
}, this.config.timeout) // maybe use connectionTimeout when sock.readyState === 'opening' || sock.readyState === 'closed'?

@@ -62,3 +89,4 @@ const done = (err, data) => {

chunks = Buffer.concat([chunks, chunk])
while (chunks.length >= HEADER_LENGTH) {
let error
while (chunks.length >= HEADER_LENGTH && !error) {
const header = parseHeader(chunks.slice(0, HEADER_LENGTH))

@@ -71,5 +99,18 @@ if (header[0] !== RESPONSE_MAGIC) { // wrong magic

const packet = parsePacket(chunks.slice(0, packetSize), header)
// TODO: check status
if (packet && packet[6] === opaque) { // check packet and opaque
buffer.push(packet)
if (packet) { // check packet
const opaque = packet[packet.length - 1]
serversHit += Number(opaque === lastOpaque)
if (opaques.has(opaque)) { // check opaque
if (packet[5] === STATUS_SUCCESS) { // success
if (protocol[method].format) {
protocol[method].format(packet, buffer)
}
} else if (packet[5] === STATUS_EXISTS) { // exists
keysStat.exists++
} else if (packet[5] === STATUS_NOT_FOUND) { // not found
keysStat.misses++
} else {
error = new Error(`iomem: response error: ${STATUS_MESSAGE_MAP[packet[5]] || `${STATUS_MESSAGE_UNKOWN} (${packet[5]})`}`)
}
}
}

@@ -81,15 +122,13 @@ chunks = chunks.slice(packetSize)

}
if (buffer.length >= keysByServer.size) { // this will not work for multi-get with getkq and getk
done(null, multikey ? buffer : buffer[0])
if (error) {
done(error)
} else if (serversHit === keysByServer.size) {
done(null, protocol[method].result ? protocol[method].result(keyFlags, buffer, keysStat) : null)
}
})
// socket disconnected
// socket end
pass.on('end', () => {
console.log('>>>>end')
done()
// console.log('???????end', d, sock.read())
// this.push(sock.read())
// counter++
}) // TODO: check opaque and have timeout
done(new Error('iomem: socket closed unexpectedly'))
})

@@ -106,14 +145,35 @@ // socket error

}
_transform (data, _, cb) {
const retry = (retries, ms) => {
this._try(data, (err, data) => {
if (err && retries > 0) {
setTimeout(() => {
retry(retries - 1, ms * this.config.retriesFactor)
}, ms)
} else {
cb(err, data)
}
})
}
retry(this.config.retries, this.config.retriesDelay)
}
}
class Net {
static _opaque = 0
static opaque () {
Net._opaque = ++Net._opaque % 0xffffffff
return Net._opaque
}
constructor (servers = [DEFAULT_ADDRESS], options = {}) {
this._options = options
this._servers = new Map()
// this._sockets =
this._opaque = 0
servers.forEach(address => {
const server = new Server(address)
this._servers.set(server.hostname, new Server(address))
const server = new Server(address, this._options.maxConnections, this._options.connectionTimeout)
this._servers.set(server.hostname, server)
})

@@ -127,3 +187,3 @@

getKeysByServer = keys => {
getKeysSetByServer = keys => {
let server

@@ -143,17 +203,44 @@ return keys.reduce((map, key) => {

query (method, key, ...args) {
getKeysMapByServer = keys => {
let server
const map = new Map()
for (const key in keys) {
server = this._servers.length === 1
? this._servers.values().next().value
: this._servers.get(this._ring.get(key))
if (map.has(server)) {
map.get(server).set(key, keys[key])
} else {
map.set(server, new Map([[key, keys[key]]]))
}
}
return map
}
getKeysSetByAllServers = (key) => {
const map = new Map()
this._servers.forEach(server => map.set(server, new Set([key])))
return map
}
query (args = []) {
const net = new NetStream({
getKeysByServer: this.getKeysByServer
getKeysSetByServer: this.getKeysSetByServer,
getKeysMapByServer: this.getKeysMapByServer,
getKeysSetByAllServers: this.getKeysSetByAllServers,
config: this._options
})
if (method && key) {
const pass = new PassThrough({ objectMode: true })
let pass
if (args[0]) {
pass = new PassThrough({ objectMode: true })
pass.pipe(net)
pass.end([method, key, ...args])
}
if (this._options.stream) {
if (this._options.stream || !args[0]) {
pass && pass.write(args)
return net
}
pass && pass.end(args)
return new Promise((resolve, reject) => {

@@ -165,4 +252,8 @@ net.on('data', resolve)

}
end () {
this._servers.forEach(server => server.end())
}
}
module.exports = Net

@@ -5,4 +5,6 @@ 'use strict'

// https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#command-opcodes
module.exports = {
const OPCODES = {
// Opcodes having a quiet version
get: 0x00,
getk: 0x0c,
set: 0x01,

@@ -14,11 +16,58 @@ add: 0x02,

decrement: 0x06,
quit: 0x07,
flush: 0x08,
noop: 0x0a,
getk: 0x0c,
append: 0x0e,
prepend: 0x0f,
gat: 0x1d,
// Quiet version of the above opcodes
getq: 0x09,
getkq: 0x0d,
stat: 0x10,
setq: 0x11,
addq: 0x12,
sasl_list: 0x20,
sasl_auth: 0x21
replaceq: 0x13,
deleteq: 0x14,
incrementq: 0x15,
decrementq: 0x16,
quitq: 0x17,
flushq: 0x18,
appendq: 0x19,
prependq: 0x1a,
gatq: 0x1e,
// Opcodes not having a quiet equivalent
stat: 0x10,
noop: 0x0a,
version: 0x0b,
touch: 0x1c,
// SASL:
// https://github.com/memcached/memcached/wiki/SASLHowto
// https://github.com/memcached/memcached/wiki/SASLAuthProtocol
// https://en.wikipedia.org/wiki/Simple_Authentication_and_Security_Layer
saslauth: 0x21
}
// Opcodes to quiet version opcodes map
/* eslint-disable no-multi-spaces */
const OPCODES_QUIET_MAP = new Map([
[OPCODES.get, OPCODES.getq],
[OPCODES.getk, OPCODES.getkq],
[OPCODES.set, OPCODES.setq],
[OPCODES.add, OPCODES.addq],
[OPCODES.replace, OPCODES.replaceq],
[OPCODES.delete, OPCODES.deleteq],
[OPCODES.increment, OPCODES.incrementq],
[OPCODES.decrement, OPCODES.decrementq],
[OPCODES.quit, OPCODES.quitq],
[OPCODES.flush, OPCODES.flushq],
[OPCODES.append, OPCODES.appendq],
[OPCODES.prepend, OPCODES.prependq],
[OPCODES.gat, OPCODES.gatq]
])
module.exports = {
OPCODES,
getQuietOpcode: opcode => OPCODES_QUIET_MAP.get(opcode),
getQuietOpcodeByName: name => OPCODES_QUIET_MAP.get(OPCODES[name])
}

@@ -8,6 +8,7 @@ 'use strict'

const DEFAULT_VALUE = ''
const DEFAULT_KEY = ''
const DEFAULT_VALUE = Buffer.alloc(0)
const DEFAULT_EXTRAS = Buffer.alloc(0)
const DEFAULT_OPAQUE = 0x00
const DEFAULT_CAS = 0x00
const DEFAULT_CAS = BigInt(0x00)
const DEFAULT_DATA_TYPE = 0x00 // Reserved for future use https://github.com/memcached/memcached/wiki/BinaryProtocolRevamped#data-types

@@ -43,3 +44,3 @@ const DEFAULT_STATUS = 0x00

const buildHeader = (magic, opcode, keyLength, valueLength, extrasLength, status, opaque, cas) => {
const buildHeader = (magic, opcode, keyLength, valueLength, extrasLength, status, cas, opaque) => {
const header = Buffer.alloc(24)

@@ -64,3 +65,3 @@

// 16
header.writeUInt32BE(cas, 16)
header.writeBigInt64BE(cas, 16)

@@ -72,10 +73,10 @@ return header

[
header.readUInt8(0), // magic
header.readUInt8(1), // opcode
header.readUInt16BE(2), // key length
header.readUInt8(4), // extras length
header.readUInt16BE(6), // status
header.readUInt32BE(8), // total body length
header.readUInt32BE(12), // opaque
header.readUInt32BE(16) // cas
header.readUInt8(0), // magic
header.readUInt8(1), // opcode
header.readUInt16BE(2), // key length
header.readUInt8(4), // extras length
header.readUInt16BE(6), // status
header.readUInt32BE(8), // total body length
header.readBigUint64BE(16), // cas
header.readUInt32BE(12) // opaque
]

@@ -107,9 +108,7 @@

const buildPacket = (magic, opcode, key, value = DEFAULT_VALUE, extras = DEFAULT_EXTRAS, status = DEFAULT_STATUS, opaque = DEFAULT_OPAQUE, cas = DEFAULT_CAS) => {
const buildPacket = (magic, opcode, key = DEFAULT_KEY, value = DEFAULT_VALUE, extras = DEFAULT_EXTRAS, status = DEFAULT_STATUS, cas = DEFAULT_CAS, opaque = DEFAULT_OPAQUE) => {
key = Buffer.from(key)
value = Buffer.from(value)
return Buffer.concat([
// 0
buildHeader(magic, opcode, key.length, value.length, extras.length, status, opaque, cas),
buildHeader(magic, opcode, key.length, value.length, extras.length, status, cas, opaque),
// 24

@@ -125,3 +124,3 @@ extras, key, value

const [magic, opcode, keyLength, extrasLength, status, totalBodyLength, opaque, cas] =
const [magic, opcode, keyLength, extrasLength, status, totalBodyLength, cas, opaque] =
header || parseHeader(packet.slice(0, HEADER_LENGTH))

@@ -149,7 +148,7 @@

packet.slice(keyOffset, valueOffset).toString('utf8'), // key
packet.slice(valueOffset, valueOffset + valueLength).toString('utf8'), // value
packet.slice(valueOffset, valueOffset + valueLength), // value
packet.slice(HEADER_LENGTH, keyOffset), // extras
status, // status or vbucket id
opaque, // opaque
cas // cas
cas, // cas
opaque // opaque
]

@@ -166,2 +165,3 @@ }

RESPONSE_MAGIC,
DEFAULT_KEY,
DEFAULT_VALUE,

@@ -168,0 +168,0 @@ DEFAULT_EXTRAS,

'use strict'
const { DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS } = require('./packet')
const OPCODES = require('./opcodes')
const { DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, DEFAULT_OPAQUE } = require('./packet')
const { OPCODES } = require('./opcodes')
const { serialize, deserialize } = require('./serializer')

@@ -9,23 +10,164 @@ // Commands:

// GET
const get = (key, opaque) =>
[OPCODES.get, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, opaque]
// SET
const set = (key, value, expiry = 0, flags = 0, opaque) => {
// add, set, replace mutations
const mutation = (opcode, key, value, expiry = 0, cas = DEFAULT_CAS, opaque = DEFAULT_OPAQUE) => {
const [buffer, flags] = serialize(value)
const extras = Buffer.alloc(8)
extras.writeUInt32BE(flags, 0)
extras.writeUInt32BE(expiry, 4)
return [opcode, key, buffer, extras, DEFAULT_STATUS, cas, opaque]
}
return [OPCODES.set, key, value, extras, DEFAULT_STATUS, opaque]
// increment and decrement
const counter = (opcode, key, initial, delta, expiry = 0, opaque = DEFAULT_OPAQUE) => {
const extras = Buffer.alloc(20)
extras.writeBigInt64BE(delta, 0)
extras.writeBigInt64BE(initial, 8)
extras.writeUInt32BE(expiry, 16)
return [opcode, key, DEFAULT_VALUE, extras, DEFAULT_STATUS, DEFAULT_CAS, opaque]
}
// DEL
const del = (key, opaque) =>
[OPCODES.delete, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, opaque]
// flush, touch, gat
const expiring = (opcode, key, expiry, opaque = DEFAULT_OPAQUE) => {
let extras = DEFAULT_EXTRAS
if (expiry !== undefined) {
extras = Buffer.alloc(4)
extras.writeUInt32BE(expiry, 0)
}
return [opcode, key, DEFAULT_VALUE, extras, DEFAULT_STATUS, DEFAULT_CAS, opaque]
}
// creates protocol method function and extend it with format(), result(), and bykeys flag
const createMethod = (method, format, result, bykeys = false) => {
method.format = format
method.result = result
method.bykeys = bykeys
return method
}
// get or multi get with value or [value, ...] response
const get = createMethod(
(key, opaque) => [OPCODES.get, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque],
(packet, buffer) => buffer.push(deserialize(packet[3], packet[4].readUInt32BE(0))),
(keyFlags, buffer) => keyFlags.isArray ? buffer : (buffer[0] || null)
)
// get or multi get with { key: value } or { key: value, ... } response
const getk = createMethod(
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque],
(packet, buffer) => (buffer[packet[2]] = deserialize(packet[3], packet[4].readUInt32BE(0))),
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer,
true
)
// get or multi get with { key: cas } or { key: cas, ... } response
const gets = createMethod(
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque],
(packet, buffer) => (buffer[packet[2]] = packet[6]),
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer,
true
)
// get or multi get with { key: { value, cas } } or { key: { value, cas }, ... } response
const getsv = createMethod(
(key, opaque) => [OPCODES.getk, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque],
(packet, buffer) => (buffer[packet[2]] = { value: deserialize(packet[3], packet[4].readUInt32BE(0)), cas: packet[6] }),
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : keysStat.misses ? null : buffer,
true
)
// set or multi set with key, value or [key, ...], [value, ...] pairs
const set = createMethod(
(key, value, expiry, opaque) => mutation(OPCODES.set, key, value, expiry, DEFAULT_CAS, opaque),
null,
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists
)
const add = createMethod(
(key, value, expiry, opaque) => mutation(OPCODES.add, key, value, expiry, DEFAULT_CAS, opaque),
null,
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists
)
const replace = createMethod(
(key, value, expiry, opaque) => mutation(OPCODES.replace, key, value, expiry, DEFAULT_CAS, opaque),
null,
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists
)
const cas = createMethod(
(key, value, expiry, cas, opaque) => mutation(OPCODES.set, key, value, expiry, cas, opaque),
null,
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists
)
const del = createMethod(
(key, opaque) => [OPCODES.delete, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque],
null,
(keyFlags, buffer, keysStat) => !keysStat.misses && !keysStat.exists
)
const incr = createMethod(
(key, initial, delta, expiry, opaque) => counter(OPCODES.increment, key, initial, delta, expiry, opaque),
(packet, buffer) => buffer.push(packet[3].readBigInt64BE(0)),
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : (buffer[0] || null)
)
const decr = createMethod(
(key, initial, delta, expiry, opaque) => counter(OPCODES.decrement, key, initial, delta, expiry, opaque),
(packet, buffer) => buffer.push(packet[3].readBigInt64BE(0)),
(keyFlags, buffer, keysStat) => keyFlags.isArray ? buffer : (buffer[0] || null)
)
const quit = (opaque) =>
[OPCODES.quit, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const flush = (expiry, opaque) =>
expiring(OPCODES.flush, DEFAULT_KEY, expiry, opaque)
const append = (key, value, opaque) =>
[OPCODES.append, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const prepend = (key, value, opaque) =>
[OPCODES.prepend, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const gat = (key, expiry, opaque) =>
expiring(OPCODES.gat, key, expiry, opaque)
const touch = (key, expiry, opaque) =>
expiring(OPCODES.touch, key, expiry, opaque)
const stat = (key, opaque) =>
[OPCODES.stat, key, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const noop = (opaque) =>
[OPCODES.noop, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const version = (opaque) =>
[OPCODES.version, DEFAULT_KEY, DEFAULT_VALUE, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, opaque]
const saslauth = (key, value) =>
[OPCODES.saslauth, key, value, DEFAULT_EXTRAS, DEFAULT_STATUS, DEFAULT_CAS, DEFAULT_OPAQUE]
module.exports = {
get,
getk,
gets,
getsv,
set,
del
add,
replace,
cas,
del,
incr,
decr,
quit,
flush,
append,
prepend,
gat,
touch,
stat,
noop,
version,
saslauth
}
'use strict'
const net = require('node:net')
const protocol = require('./protocol')
const { buildPacket, REQUEST_MAGIC } = require('./packet')

@@ -11,8 +13,5 @@ class Server {

// Server address formats:
// username:password@host:port
// host:port
// /path/to/memcached.sock
constructor (address, maxSockets = 10) {
// TODO: move maxSockets to options on prev layers and validate it to be >= 1
// - [username:password@]host[:port]
// - /path/to/memcached.sock
constructor (address, maxSockets, timeout) {
let [auth, hostname] = address.split('@')

@@ -39,23 +38,59 @@ if (!hostname) {

this._socketIndex = -1
this._busySockets = 0
this._maxSockets = maxSockets
this._timeout = timeout
}
createSocket (index) {
const sock = this.ipc
? net.createConnection(this.host)
: net.createConnection(this.port, this.host)
sock.on('error', () => {
this.destroySocket(sock.index)
})
sock.on('timeout', () => {
sock.end()
this.destroySocket(sock.index)
})
sock.on('end', () => {
this.destroySocket(sock.index)
})
sock.setTimeout(this._timeout)
sock.index = index === undefined
? ++this._socketIndex
: index
this._sockets.push(sock)
this.username && this.password &&
sock.write(buildPacket(REQUEST_MAGIC, ...protocol.saslauth('PLAIN', Buffer.from(`\x00${this.username}\x00${this.password}`))))
return sock
}
destroySocket (index) {
if (this._sockets[index]) {
this._sockets[index].removeAllListeners()
this._sockets[index].destroy()
delete this._sockets[index]
}
}
getSocket () {
// create a new socket and return
// create new socket and return it
if (this._sockets.length < this._maxSockets) {
const sock = this.ipc
? net.createConnection(this.host) // TODO: use params instead of two calls
: net.createConnection(this.port, this.host)
// TODO: remove sock on error and decrease this._socketIndex
this._socketIndex++
this._sockets.push(sock)
return sock
return this.createSocket()
}
// pick the next socket in a ring
// pick the next socket in the sockets ring
this._socketIndex = (this._socketIndex + 1) % this._maxSockets
return this._sockets[this._socketIndex] // TODO: check for existence before return?
if (!this._sockets[this._socketIndex]) { // recreate when destroyed
this._sockets[this._socketIndex] = this.createSocket(this._socketIndex)
}
return this._sockets[this._socketIndex]
}
end () {
this._sockets.forEach(sock => sock.end())
this._sockets = []
this._socketIndex = -1
}
}
module.exports = Server
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc