Socket
Socket
Sign inDemoInstall

multines

Package Overview
Dependencies
62
Maintainers
1
Versions
7
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.2.1 to 1.0.0

18

examples/a.js
'use strict'
const Hapi = require('hapi')
const routes = require('./routes')
const server = new Hapi.Server()
server.connection({ port: 3000 })
async function init () {
const server = new Hapi.Server({ port: 3000 })
require('./routes')(server)
await routes(server)
await server.start()
console.log('[a] server started')
}
server.start((err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
console.log('server started', server.info.uri)
})
init()
'use strict'
const Hapi = require('hapi')
const routes = require('./routes')
const server = new Hapi.Server()
server.connection({ port: 3001 })
async function init () {
const server = new Hapi.Server({ port: 3001 })
require('./routes')(server)
await routes(server)
await server.start()
console.log('[b] server started')
}
server.start((err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
console.log('server started', server.info.uri)
})
init()

@@ -5,24 +5,16 @@ 'use strict'

const client = new Nes.Client('ws://localhost:3001')
async function pub () {
const client = new Nes.Client('ws://localhost:3001')
await client.connect()
client.connect((err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
console.log('connected')
client.request({
console.log('pub connected')
await client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
})
client.disconnect()
})
})
await client.disconnect()
}
pub()

@@ -6,5 +6,8 @@ 'use strict'

module.exports = (server) => {
module.exports = async (server) => {
const plugin = {
register: Multines.register,
plugin: {
name: 'multines',
register: Multines.register
},
options: {

@@ -15,7 +18,3 @@ type: 'redis'

server.register([Nes, plugin], (err) => {
if (err) {
throw err
}
})
await server.register([Nes, plugin])

@@ -26,7 +25,10 @@ server.subscriptionFar('/echo')

method: 'POST',
handler: (req, reply) => {
handler: async (req, h) => {
server.publishFar('/echo', req.payload)
reply(req.payload)
return req.payload
}
})
return server
}

@@ -5,22 +5,14 @@ 'use strict'

const client = new Nes.Client('ws://localhost:3000')
async function sub () {
const client = new Nes.Client('ws://localhost:3000')
await client.connect()
client.connect((err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
console.log('connected')
console.log('sub connected')
client.subscribe('/echo', (message) => {
console.log(message)
}, (err) => {
if (err) {
console.error(err.message)
process.exit(1)
}
})
console.log('subscribed')
})
})
console.log('subscribed')
}
sub()
'use strict'
const util = require('util')
const mqemitter = require('mqemitter')

@@ -9,5 +10,13 @@ const redis = require('mqemitter-redis')

function register (server, options, next) {
server.dependency('nes')
function buildDeliver (socket, topic) {
return async function deliver (message, done) {
if (topic === message.topic) {
await socket.publish('/' + topic, message.body)
} else {
await socket.publish('/' + topic, message)
}
}
}
function getMq (options) {
let mq

@@ -23,23 +32,26 @@

break
default:
default: {
mq = options.mq || mqemitter(options)
}
}
function buildDeliver (socket, topic) {
return function deliver (message, done) {
if (topic === message.topic) {
socket.publish('/' + topic, message.body, done)
} else {
socket.publish('/' + topic, message, done)
}
}
return {
removeListener: util.promisify(mq.removeListener.bind(mq)),
on: util.promisify(mq.on.bind(mq)),
emit: util.promisify(mq.emit.bind(mq)),
close: util.promisify(mq.close.bind(mq))
}
}
async function register (server, options) {
server.dependency('nes')
const mq = getMq(options)
server.decorate('server', 'subscriptionFar', (path, options) => {
options = options || {}
const wrapSubscribe = options.onSubscribe || ((socket, path, params, next) => next())
const wrapUnsubscribe = options.onUnsubscribe || ((socket, path, params, next) => next())
const wrapSubscribe = options.onSubscribe || (async (socket, path, params) => null)
const wrapUnsubscribe = options.onUnsubscribe || (async (socket, path, params) => null)
options.onSubscribe = (socket, path, params, next) => {
options.onSubscribe = async (socket, path, params) => {
const deliverMap = socket[kDeliver] || {}

@@ -54,29 +66,17 @@ socket[kDeliver] = deliverMap

wrapSubscribe(socket, path, params, (err) => {
if (err) {
return next(err)
}
mq.on(topic, deliverMap[path], next)
})
await wrapSubscribe(socket, path, params)
await mq.on(topic, deliverMap[path])
}
options.onUnsubscribe = (socket, path, params, next) => {
wrapUnsubscribe(socket, path, params, (err) => {
if (err) {
return next(err)
}
options.onUnsubscribe = async (socket, path, params) => {
await wrapUnsubscribe(socket, path, params)
const deliverMap = socket[kDeliver] || {}
socket[kDeliver] = deliverMap
const deliverMap = socket[kDeliver] || {}
socket[kDeliver] = deliverMap
if (!deliverMap[path]) {
next()
return
}
if (!deliverMap[path]) {
return
}
mq.removeListener(path.replace(/^\//, ''), deliverMap[path], function () {
setImmediate(next)
})
})
await mq.removeListener(path.replace(/^\//, ''), deliverMap[path])
}

@@ -87,6 +87,6 @@

server.decorate('server', 'publishFar', (path, body) => {
server.decorate('server', 'publishFar', async (path, body) => {
options = options || {}
mq.emit({
await mq.emit({
topic: path.replace(/^\//, ''), // the first is always a '/'

@@ -97,12 +97,5 @@ body

server.ext({
type: 'onPostStop',
method: function (event, done) {
mq.close(function () {
setImmediate(done)
})
}
server.ext('onPostStop', async () => {
await mq.close()
})
next()
}

@@ -109,0 +102,0 @@

{
"name": "multines",
"version": "0.2.1",
"version": "1.0.0",
"description": "Multi-process nes backend, turn nes into a fully scalable solution",

@@ -30,14 +30,14 @@ "main": "multines.js",

"devDependencies": {
"code": "^4.0.0",
"hapi": "^16.4.0",
"lab": "^13.1.0",
"nes": "^6.4.2",
"code": "^5.2.0",
"hapi": "^17.5.0",
"lab": "^15.4.5",
"nes": "^8.1.0",
"pre-commit": "^1.1.2",
"standard": "^10.0.0"
"standard": "^11.0.0"
},
"dependencies": {
"mqemitter": "^2.0.0",
"mqemitter": "^2.2.0",
"mqemitter-mongodb": "^3.0.2",
"mqemitter-redis": "^2.1.0"
"mqemitter-redis": "^2.3.0"
}
}

@@ -12,2 +12,4 @@ # multines  [![Build Status](https://travis-ci.org/mcollina/multines.svg)](https://travis-ci.org/mcollina/multines)

**Important note:** this library needs nodejs 8 or greater.
## Install

@@ -62,1 +64,2 @@

[mqmongo]: https://github.com/mcollina/mqemitter-mongodb

@@ -19,26 +19,20 @@ 'use strict'

port = port || 4000
const server = new Hapi.Server()
server.connection({ port: port })
const server = new Hapi.Server({ port: port })
return server
}
function start (server, opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
}
async function start (server, opts) {
opts = opts || {}
const plugin = {
register: Multines.register,
plugin: {
name: 'multines',
register: Multines.register
},
options: opts
}
server.register([Nes, plugin], (err) => {
if (err) {
return cb(err)
}
await server.register([Nes, plugin])
server.start(cb)
})
server.subscriptionFar('/echo')

@@ -48,8 +42,11 @@ server.route({

method: 'POST',
handler: (req, reply) => {
handler: async (req, h) => {
server.publishFar('/echo', req.payload)
reply(req.payload)
return req.payload
}
})
await server.start()
return server

@@ -59,118 +56,83 @@ }

function pubSubTest () {
test('pub/sub', (done) => {
test('pub/sub', async () => {
let done
let error
const client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.connect((err) => {
if (err) {
return done(err)
}
function handler (message, flags) {
expect(message).to.equal({ hello: 'world' })
client.subscribe('/echo', (message) => {
expect(message).to.equal({ hello: 'world' })
setTimeout(function () {
client.disconnect()
done()
}, 100)
}, (err) => {
if (err) {
return done(err)
}
client.disconnect().then(done).catch(error)
}
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
return done(err)
}
})
})
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
await client.subscribe('/echo', handler)
await Promise.all([
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}),
waitForHandler
])
})
test('sub/unsub/pub', (done) => {
test('sub/unsub/pub', async () => {
const client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.connect((err) => {
if (err) {
return done(err)
}
const handler = (message) => {
throw new Error('this should never happen')
}
const handler = (message) => {
done(new Error('this should never happen'))
}
await client.subscribe('/echo', handler)
await client.unsubscribe('/echo', handler)
client.subscribe('/echo', handler, (err) => {
if (err) {
return done(err)
}
await client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
})
client.unsubscribe('/echo', handler, (err) => {
if (err) {
return done(err)
}
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
return done(err)
}
client.disconnect(() => setImmediate(done))
})
})
})
})
await client.disconnect()
})
test('sub/disconnect/sub/pub', (done) => {
test('sub/disconnect/sub/pub', async () => {
let client = new Nes.Client('ws://localhost:4000')
let done
let error
client.connect((err) => {
if (err) {
return done(err)
}
await client.connect()
await client.subscribe('/echo', (message) => {})
await client.disconnect()
client.subscribe('/echo', (message) => {}, (err) => {
if (err) {
return done(err)
}
client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.disconnect()
function handler (message, flags) {
expect(message).to.equal({ hello: 'world' })
client = new Nes.Client('ws://localhost:4000')
client.disconnect().then(done).catch(error)
}
client.connect((err) => {
if (err) {
return done(err)
}
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
client.subscribe('/echo', (message) => {
expect(message).to.equal({ hello: 'world' })
setTimeout(function () {
client.disconnect()
done()
}, 100)
}, (err) => {
if (err) {
return done(err)
}
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
return done(err)
}
})
})
})
})
})
await client.subscribe('/echo', handler)
await Promise.all([
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}),
waitForHandler
])
})

@@ -180,50 +142,45 @@ }

function scalablePubSubTest () {
test('scalable pub/sub', (done) => {
test('scalable pub/sub', async () => {
let done
let error
const client1 = new Nes.Client('ws://localhost:4000')
const client2 = new Nes.Client('ws://localhost:4001')
client1.connect((err) => {
if (err) {
return done(err)
}
await client1.connect()
await client2.connect()
client2.connect((err) => {
if (err) {
return done(err)
}
function handler (message, flags) {
expect(message).to.equal({ hello: 'world' })
client1.subscribe('/echo', (message) => {
expect(message).to.equal({ hello: 'world' })
client1.disconnect(() => done())
}, (err) => {
if (err) {
return done(err)
}
client1.disconnect().then(done).catch(error)
}
client2.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
return done(err)
}
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
client2.disconnect()
})
})
})
})
await client1.subscribe('/echo', handler)
await Promise.all([
client2.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}),
waitForHandler
])
await client2.disconnect()
})
}
experiment('nes work as normal', () => {
experiment('nes work as normal', async () => {
let server
beforeEach((done) => {
server = start(getServer(), done)
beforeEach(async () => {
server = await start(getServer())
})
afterEach((done) => {
server.stop(done)
afterEach(async () => {
await server.stop()
server = null

@@ -240,32 +197,22 @@ })

beforeEach((done) => {
beforeEach(async () => {
mq = mqemitter()
server1 = start(getServer(), {
mq: mq
}, function (err) {
if (err) {
return done(err)
}
server2 = start(getServer(4001), {
mq: mq
}, done)
})
server1 = await start(getServer(), { mq: mq })
server2 = await start(getServer(4001), { mq: mq })
})
afterEach((done) => {
server1.stop((err) => {
if (err) {
return done(err)
}
server2.stop((err) => {
if (err) {
return done(err)
}
mq.close(done)
afterEach(async () => {
await server1.stop()
server1 = null
await server2.stop()
server2 = null
await new Promise((resolve, reject) => {
mq.close((err) => {
if (err) return reject(err)
resolve()
})
server2 = null
})
server1 = null
})

@@ -276,34 +223,31 @@

test('remove the / at the beginning', (done) => {
test('remove the / at the beginning', async () => {
let done
let error
const client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.connect((err) => {
if (err) {
return done(err)
}
function handler (message, cb) {
expect(message.body).to.equal({ hello: 'world' })
mq.removeListener('echo', handler)
cb()
client.disconnect().then(done).catch(error)
}
const handler = (message, cb) => {
expect(message.body).to.equal({ hello: 'world' })
mq.removeListener('echo', handler)
cb()
}
mq.on('echo', handler)
mq.on('echo', handler, (err) => {
if (err) {
return done(err)
}
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}, (err) => {
if (err) {
return done(err)
}
client.disconnect(() => setImmediate(done))
})
})
})
await Promise.all([
client.request({
path: '/echo',
method: 'POST',
payload: { hello: 'world' }
}),
waitForHandler
])
})

@@ -316,24 +260,11 @@ })

beforeEach((done) => {
server1 = start(getServer(), {
type: 'redis'
}, function (err) {
if (err) {
return done(err)
}
server2 = start(getServer(4001), {
type: 'redis'
}, done)
})
beforeEach(async () => {
server1 = await start(getServer(), { type: 'redis' })
server2 = await start(getServer(4001), { type: 'redis' })
})
afterEach((done) => {
server1.stop((err) => {
if (err) {
return done(err)
}
server2.stop(done)
server2 = null
})
afterEach(async () => {
await server1.stop()
await server2.stop()
server2 = null
server1 = null

@@ -350,24 +281,11 @@ })

beforeEach((done) => {
server1 = start(getServer(), {
type: 'redis'
}, function (err) {
if (err) {
return done(err)
}
server2 = start(getServer(4001), {
type: 'redis'
}, done)
})
beforeEach(async () => {
server1 = await start(getServer(), { type: 'redis' })
server2 = await start(getServer(4001), { type: 'redis' })
})
afterEach((done) => {
server1.stop((err) => {
if (err) {
return done(err)
}
server2.stop(done)
server2 = null
})
afterEach(async () => {
await server1.stop()
await server2.stop()
server2 = null
server1 = null

@@ -383,98 +301,85 @@ })

beforeEach((done) => {
beforeEach(async () => {
server = getServer()
const plugin = {
name: 'multines',
register: Multines.register
}
server.register([Nes, plugin], (err) => {
if (err) {
return done(err)
}
await server.register([Nes, plugin])
server.subscriptionFar('/{parts*}')
server.route({
path: '/publish',
method: 'POST',
handler: (req, reply) => {
const topic = req.payload.topic
const body = req.payload.body
server.publishFar(topic, body)
reply()
}
})
server.subscriptionFar('/{parts*}')
server.route({
path: '/publish',
method: 'POST',
handler: (req) => {
const topic = req.payload.topic
const body = req.payload.body
server.publishFar(topic, body)
server.start(done)
return { topic, body }
}
})
await server.start()
})
afterEach((done) => {
server.stop(done)
server = null
afterEach(async () => {
await server.stop()
})
test('a + wildcard work', (done) => {
test('a + wildcard work', async () => {
let done
let error
const client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.connect((err) => {
if (err) {
return done(err)
}
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
client.subscribe('/+', (message) => {
expect(message).to.equal({ topic: 'hello', body: { hello: 'world' } })
setTimeout(() => {
client.disconnect()
done()
}, 100)
}, (err) => {
if (err) {
return done(err)
}
await client.subscribe('/+', (message) => {
expect(message).to.equal({ topic: 'hello', body: { hello: 'world' } })
client.disconnect().then(done).catch(error)
})
client.request({
path: '/publish',
method: 'POST',
payload: { topic: 'hello', body: { hello: 'world' } }
}, (err) => {
if (err) {
return done(err)
}
})
})
})
await Promise.all([
client.request({
path: '/publish',
method: 'POST',
payload: { topic: 'hello', body: { hello: 'world' } }
}),
waitForHandler
])
})
test('a # wildcard work', (done) => {
test('a # wildcard work', async () => {
let done
let error
const client = new Nes.Client('ws://localhost:4000')
await client.connect()
client.connect((err) => {
if (err) {
return done(err)
}
const waitForHandler = new Promise((resolve, reject) => {
done = resolve
error = reject
})
client.subscribe('/#', (message) => {
expect(message).to.equal({ topic: 'hello/new/world', body: { hello: 'world' } })
setTimeout(() => {
client.disconnect()
done()
}, 100)
}, (err) => {
if (err) {
return done(err)
}
await client.subscribe('/#', (message) => {
expect(message).to.equal({ topic: 'hello/new/world', body: { hello: 'world' } })
client.disconnect().then(done).catch(error)
})
client.request({
path: '/publish',
method: 'POST',
payload: { topic: 'hello/new/world', body: { hello: 'world' } }
}, (err) => {
if (err) {
return done(err)
}
})
})
})
await Promise.all([
client.request({
path: '/publish',
method: 'POST',
payload: { topic: 'hello/new/world', body: { hello: 'world' } }
}),
waitForHandler
])
})
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc