New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@xmpp/connection

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@xmpp/connection - npm Package Compare versions

Comparing version 0.3.0 to 0.5.0

lib/StreamError.js

344

index.js

@@ -5,35 +5,27 @@ 'use strict'

const jid = require('@xmpp/jid')
const url = require('url')
const xml = require('@xmpp/xml')
const StreamError = require('./lib/StreamError')
class XMPPError extends Error {
constructor(condition, text, element) {
super(condition + (text ? ` - ${text}` : ''))
this.name = 'XMPPError'
this.condition = condition
this.text = text
this.element = element
}
}
const NS_STREAM = 'urn:ietf:params:xml:ns:xmpp-streams'
class StreamError extends XMPPError {
constructor(...args) {
super(...args)
this.name = 'StreamError'
}
}
function socketConnect(socket, ...params) {
return new Promise((resolve, reject) => {
function onError(err) {
socket.removeListener('connect', onConnect)
reject(err)
}
// We ignore url module from the browser bundle to reduce its size
function getHostname(uri) {
if (url.parse) {
const parsed = url.parse(uri)
return parsed.hostname || parsed.pathname
}
const el = document.createElement('a') // eslint-disable-line no-undef
el.href = uri
return el.hostname
function onConnect(value) {
socket.removeListener('error', onError)
resolve(value)
}
socket.once('error', onError)
socket.once('connect', onConnect)
socket.connect(...params)
})
}
class Connection extends EventEmitter {
constructor(options) {
constructor(options = {}) {
super()

@@ -44,10 +36,8 @@ this.domain = ''

this.timeout = 2000
this.options = typeof options === 'object' ? options : {}
this.plugins = Object.create(null)
this.startOptions = null
this.openOptions = null
this.connectOptions = null
this.options = options
this.socketListeners = Object.create(null)
this.parserListeners = Object.create(null)
this.status = 'offline'
this.socket = null
this.parser = null
}

@@ -59,7 +49,36 @@

this.jid = null
this.status = 'offline'
this._detachSocket()
this._detachParser()
this.socket = null
}
async _streamError(condition) {
try {
await this.send(
// prettier-ignore
xml('stream:error', {}, [
xml(condition, {xmlns: NS_STREAM}),
])
)
} catch (err) {}
return this._end()
}
async _onData(data) {
const str = data.toString('utf8')
this.emit('input', str)
try {
await this.parser.write(str)
} catch (err) {
// https://xmpp.org/rfcs/rfc6120.html#streams-error-conditions-bad-format
// "This error can be used instead of the more specific XML-related errors,
// such as <bad-namespace-prefix/>, <invalid-xml/>, <not-well-formed/>, <restricted-xml/>,
// and <unsupported-encoding/>. However, the more specific errors are RECOMMENDED."
try {
this._streamError('bad-format')
} catch (err) {}
}
}
_attachSocket(socket) {

@@ -69,21 +88,15 @@ const sock = (this.socket = socket)

listeners.data = data => {
const str = data.toString('utf8')
this.emit('input', str)
this.parser.write(str)
this._onData(data)
}
listeners.close = (...args) => {
listeners.close = (dirty, event) => {
this._reset()
this._status('disconnect', ...args)
this._status('disconnect', {clean: !dirty, event})
}
listeners.connect = () => {
this._status('connect')
sock.once('close', listeners.close)
}
listeners.error = error => {
this._reset()
if (this.status === 'connecting') {
this._status('offline')
}
this.emit('error', error)
}
sock.on('close', listeners.close)
sock.on('data', listeners.data)

@@ -95,10 +108,22 @@ sock.on('error', listeners.error)

_detachSocket() {
const listeners = this.socketListeners
Object.getOwnPropertyNames(listeners).forEach(k => {
this.socket.removeListener(k, listeners[k])
delete listeners[k]
const {socketListeners, socket} = this
Object.getOwnPropertyNames(socketListeners).forEach(k => {
socket.removeListener(k, socketListeners[k])
delete socketListeners[k]
})
delete this.socket
this.socket = null
return socket
}
_onElement(element) {
this.emit('element', element)
this.emit(this.isStanza(element) ? 'stanza' : 'nonza', element)
// https://xmpp.org/rfcs/rfc6120.html#streams-error
if (element.name !== 'stream:error') return
this.emit('error', StreamError.fromElement(element))
// "Stream Errors Are Unrecoverable"
// "The entity that receives the stream error then SHALL close the stream"
this._end()
}
_attachParser(p) {

@@ -108,26 +133,13 @@ const parser = (this.parser = p)

listeners.element = element => {
if (element.name === 'stream:error') {
this.close().then(() => this.disconnect())
this.emit(
'error',
new StreamError(
element.children[0].name,
element.getChildText(
'text',
'urn:ietf:params:xml:ns:xmpp-streams'
) || '',
element
)
)
}
this.emit('element', element)
this.emit(this.isStanza(element) ? 'stanza' : 'nonza', element)
this._onElement(element)
}
listeners.error = error => {
this._detachParser()
this.emit('error', error)
}
listeners.end = element => {
this._detachParser()
this._status('close', element)
}
parser.once('error', listeners.error)
parser.on('error', listeners.error)
parser.on('element', listeners.element)

@@ -143,3 +155,3 @@ parser.on('end', listeners.end)

})
delete this.parser
this.parser = null
}

@@ -158,27 +170,30 @@

async _end() {
let el
try {
el = await this.close()
} catch (err) {}
try {
await this.disconnect()
} catch (err) {}
return el
}
/**
* Opens the socket then opens the stream
*/
start(options) {
async start() {
if (this.status !== 'offline') {
return Promise.reject(new Error('Connection is not offline'))
throw new Error('Connection is not offline')
}
this.startOptions = options
const {service, domain, lang} = this.options
if (typeof options === 'string') {
options = {uri: options}
}
await this.connect(service)
if (!options.domain) {
options.domain = getHostname(options.uri)
}
const promiseOnline = promise(this, 'online')
return Promise.all([
this.promise('online'),
this.connect(options.uri).then(() => {
const {domain, lang} = options
return this.open({domain, lang})
}),
]).then(([addr]) => addr)
await this.open({domain, lang})
return promiseOnline
}

@@ -189,15 +204,8 @@

*/
connect(options) {
// eslint-disable-next-line require-await
async connect(service) {
this._status('connecting')
this.connectOptions = options
return new Promise((resolve, reject) => {
this._attachParser(new this.Parser())
this._attachSocket(new this.Socket())
this.socket.once('error', reject)
this.socket.connect(this.socketParameters(options), () => {
this.socket.removeListener('error', reject)
resolve()
// The 'connect' status is emitted by the socket 'connect' listener
})
})
this._attachSocket(new this.Socket())
// The 'connect' status is set by the socket 'connect' listener
return socketConnect(this.socket, this.socketParameters(service))
}

@@ -210,7 +218,9 @@

*/
disconnect(ms = this.timeout) {
this._status('disconnecting')
async disconnect(ms = this.timeout) {
if (this.socket) this._status('disconnecting')
this.socket.end()
return timeout(promise(this.socket, 'close'), ms)
// The 'disconnect' status is emitted by the socket 'close' listener
// The 'disconnect' status is set by the socket 'close' listener
await timeout(promise(this.socket, 'close'), ms)
}

@@ -221,6 +231,4 @@

*/
open(options) {
async open(options) {
this._status('opening')
// Useful for stream-features restart
this.openOptions = options
if (typeof options === 'string') {

@@ -236,21 +244,26 @@ options = {domain: options}

return Promise.all([
this.write(this.header(headerElement)),
promise(this.parser, 'start').then(el => {
// FIXME what about version and xmlns:stream ?
if (
el.name !== headerElement.name ||
el.attrs.xmlns !== headerElement.attrs.xmlns ||
el.attrs.from !== headerElement.attrs.to ||
!el.attrs.id
) {
return this.promise('error')
}
this._attachParser(new this.Parser())
this.domain = domain
this.lang = el.attrs['xml:lang']
this._status('open', el)
return el
}),
]).then(([, el]) => el)
await this.write(this.header(headerElement))
const promiseStart = async () => {
const el = await promise(this.parser, 'start')
// FIXME what about version and xmlns:stream ?
if (
el.name !== headerElement.name ||
el.attrs.xmlns !== headerElement.attrs.xmlns ||
el.attrs.from !== headerElement.attrs.to ||
!el.attrs.id
) {
return promise(this, 'error')
}
this.domain = domain
this.lang = el.attrs['xml:lang']
this._status('open', el)
return el
}
return timeout(promiseStart(), options.timeout || this.timeout)
}

@@ -263,12 +276,6 @@

*/
stop() {
if (!this.socket) {
return Promise.resolve()
}
return this.close().then(el =>
this.disconnect().then(() => {
this._status('offline')
return el
})
)
async stop() {
const el = await this._end()
if (this.status !== 'offline') this._status('offline', el)
return el
}

@@ -281,10 +288,12 @@

*/
close(ms = this.timeout) {
this._status('closing')
return Promise.all([
async close(ms = this.timeout) {
const p = Promise.all([
timeout(promise(this.parser, 'end'), ms),
this.write(this.footer(this.footerElement())),
]).then(([el]) => el)
// The 'close' status is emitted by the parser 'end' listener
])
if (this.parser && this.socket) this._status('closing')
const [el] = await p
return el
// The 'close' status is set by the parser 'end' listener
}

@@ -296,15 +305,23 @@

*/
restart() {
// eslint-disable-next-line require-await
async restart() {
this._detachParser()
this._attachParser(new this.Parser())
this._status('restarting')
return this.open(this.openOptions).then(() => {
this._status('restart')
})
const {domain, lang} = this.options
return this.open({domain, lang})
}
send(element) {
return this.write(element).then(() => {
this.emit('send', element)
})
// eslint-disable-next-line require-await
async send(element) {
this.emit('outgoing', element)
const proceed = () => {
return this.write(element).then(() => {
this.emit('send', element)
})
}
return this.hookOutgoing
? this.hookOutgoing(element).then(proceed)
: proceed()
}

@@ -315,3 +332,3 @@

this.send(element),
timeout(this.promise('element'), ms),
timeout(promise(this, 'element'), ms),
]).then(([, el]) => el)

@@ -322,2 +339,8 @@ }

return new Promise((resolve, reject) => {
// https://xmpp.org/rfcs/rfc6120.html#streams-close
// "Refrain from sending any further data over its outbound stream to the other entity"
if (this.status === 'closing') {
reject(new Error('Connection is closing'))
return
}
const str = data.toString('utf8')

@@ -348,16 +371,2 @@ this.socket.write(str, err => {

plugin(plugin) {
if (!this.plugins[plugin.name]) {
this.plugins[plugin.name] = plugin.plugin(this)
const p = this.plugins[plugin.name]
if (p && p.start) {
p.start()
} else if (p && p.register) {
p.register()
}
}
return this.plugins[plugin.name]
}
// Override

@@ -367,2 +376,4 @@ header(el) {

}
// Override
headerElement() {

@@ -374,12 +385,13 @@ return new xml.Element('', {

}
// Override
footer(el) {
return el.toString()
}
// Override
footerElement() {}
socketParameters(uri) {
const parsed = url.parse(uri)
parsed.port = Number(parsed.port)
parsed.host = parsed.hostname
return parsed
}
// Override
socketParameters() {}
}

@@ -393,4 +405,2 @@

module.exports = Connection
module.exports.getHostname = getHostname
module.exports.XMPPError = XMPPError
module.exports.StreamError = StreamError
module.exports.socketConnect = socketConnect

@@ -7,3 +7,3 @@ {

"bugs": "http://github.com/xmppjs/xmpp.js/issues",
"version": "0.3.0",
"version": "0.5.0",
"license": "ISC",

@@ -15,14 +15,16 @@ "keywords": [

"dependencies": {
"@xmpp/events": "^0.3.0",
"@xmpp/jid": "^0.3.0",
"@xmpp/error": "^0.5.0",
"@xmpp/events": "^0.5.0",
"@xmpp/jid": "^0.5.0",
"@xmpp/streamparser": "^0.0.6",
"@xmpp/xml": "^0.3.0"
"@xmpp/xml": "^0.5.0"
},
"browser": {
"url": false
"engines": {
"node": ">= 10.0.0",
"yarn": ">= 1.0.0"
},
"engines": {
"node": ">= 6",
"npm": ">= 2"
}
"publishConfig": {
"access": "public"
},
"gitHead": "896f432d22dd726d158bd39a46aed7d53a2b7b1b"
}

@@ -5,3 +5,3 @@ 'use strict'

const Connection = require('..')
const {EventEmitter} = require('@xmpp/events')
const {EventEmitter, promise, timeout} = require('@xmpp/events')
const xml = require('@xmpp/xml')

@@ -44,2 +44,22 @@

test.cb('error on status closing', t => {
t.plan(2)
const conn = new Connection()
conn.parser = new EventEmitter()
conn.footerElement = () => {
return xml('hello')
}
conn.socket = new EventEmitter()
conn.socket.write = (data, cb) => {
return cb()
}
conn.status = 'closing'
conn.close().catch(err => {
t.is(err.name, 'Error')
t.is(err.message, 'Connection is closing')
t.end()
})
conn.parser.emit('end')
})
test.cb('resolves', t => {

@@ -65,1 +85,41 @@ t.plan(2)

})
test('emits closing status', t => {
const conn = new Connection()
conn.parser = new EventEmitter()
conn.footerElement = () => {
return xml('hello')
}
conn.socket = new EventEmitter()
conn.socket.write = (data, cb) => {
return cb()
}
const p = Promise.all([
promise(conn, 'status').then(status => t.is(status, 'closing')),
conn.close(),
])
conn.parser.emit('end')
return p
})
test('do not emit closing status if parser property is missing', t => {
t.plan(2)
const conn = new Connection()
conn.parser = null
conn.footerElement = () => {
return xml('hello')
}
conn.socket = new EventEmitter()
conn.socket.write = (data, cb) => {
return cb()
}
return Promise.all([
timeout(promise(conn, 'status'), 500).catch(err =>
t.is(err.name, 'TimeoutError')
),
conn.close().catch(err => t.pass(err)),
])
})

@@ -7,4 +7,4 @@ 'use strict'

test.cb('timeout', t => {
t.plan(1)
test.cb("rejects with TimeoutError if socket doesn't close", t => {
t.plan(2)
const conn = new Connection()

@@ -17,12 +17,33 @@ const sock = (conn.socket = new EventEmitter())

})
t.is(conn.status, 'disconnecting')
})
test.cb('resolves', t => {
t.plan(3)
const conn = new Connection()
const sock = (conn.socket = new EventEmitter())
const sock = new EventEmitter()
conn._attachSocket(sock)
sock.emit('connect')
sock.end = () => {}
conn.disconnect().then(() => {
t.is(conn.status, 'disconnect')
t.end()
})
t.is(conn.status, 'disconnecting')
sock.emit('close')
t.is(conn.status, 'disconnect')
})
test.cb('rejects if socket.end throws', t => {
t.plan(1)
const conn = new Connection()
const sock = (conn.socket = new EventEmitter())
const error = new Error('foobar')
sock.end = () => {
throw error
}
conn.disconnect().catch(err => {
t.is(err, error)
t.end()
})
})

@@ -7,23 +7,10 @@ 'use strict'

test('resets properties on error event', t => {
test('emit error on socket error', t => {
const conn = new Connection()
conn._attachSocket(new EventEmitter())
conn.domain = 'example.com'
conn.lang = 'en'
conn.jid = {}
conn.on('error', () => {})
conn.socket.emit('error', {})
t.is(conn.domain, '')
t.is(conn.lang, '')
t.is(conn.jid, null)
t.is(conn.socket, null)
const error = new Error('foobar')
conn.on('error', err => {
t.is(err, error)
})
conn.socket.emit('error', error)
})
test('sets status to offline if status is connecting', t => {
const conn = new Connection()
conn._attachSocket(new EventEmitter())
conn.status = 'connecting'
conn.on('error', () => {})
conn.socket.emit('error', {})
t.is(conn.status, 'offline')
})

@@ -8,2 +8,5 @@ 'use strict'

const conn = new Connection()
conn.footerElement = () => {
return <foo />
}
return conn.stop().then(() => {

@@ -15,3 +18,3 @@ t.pass()

test.cb('close rejects', t => {
t.plan(0)
t.plan(1)
const conn = new Connection()

@@ -23,6 +26,5 @@ conn.socket = {}

conn.disconnect = () => {
t.fail()
return Promise.resolve()
t.pass()
}
conn.stop().catch(() => {
conn.stop().then(() => {
t.end()

@@ -49,1 +51,39 @@ })

})
test.cb('disconnect rejects', t => {
t.plan(3)
const conn = new Connection()
conn.socket = {}
const el = {}
conn.close = () => {
t.pass()
return Promise.resolve(el)
}
conn.disconnect = () => {
t.pass()
return Promise.reject()
}
conn.stop().then(element => {
t.is(el, element)
t.end()
})
})
test.cb('disconnect resolves', t => {
t.plan(3)
const conn = new Connection()
conn.socket = {}
const el = {}
conn.close = () => {
t.pass()
return Promise.resolve(el)
}
conn.disconnect = () => {
t.pass()
return Promise.resolve()
}
conn.stop().then(element => {
t.is(el, element)
t.end()
})
})
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc