diffusion
Advanced tools
Comparing version 5.6.0 to 5.6.1
{ | ||
"name": "diffusion", | ||
"version": "5.6.0", | ||
"version": "5.6.1", | ||
"description": "Diffusion Javascript UCI client", | ||
@@ -5,0 +5,0 @@ "keywords" : ["diffusion", "reappt", "websockets", "data"], |
@@ -27,5 +27,5 @@ var SessionImpl = require('session/session-impl'); | ||
*/ | ||
version : '5.6.0', | ||
version : '5.6.1', | ||
build : '0_dev#internal', | ||
build : '1_dev#internal', | ||
@@ -32,0 +32,0 @@ /** |
@@ -5,18 +5,57 @@ var interface = require('util/interface').interface; | ||
/** | ||
* Represents a selection to one or more topics. Will emit events related to topics matched | ||
* by this instance's selector. | ||
* Provides a stream of topic events, specific to the topic selector that this Subscription was created for. | ||
* Subscription inherits all functions defined on {@link Stream}. | ||
* <P> | ||
* <br /> | ||
* A subscription will emit a <code>subscribed</code> event when this subscription's selector is resolved against a | ||
* topic by the server. | ||
* <P> | ||
* Updates from a topic are emitted with the <code>update</code> event. Event values are provided as <code>Buffer</code> | ||
* objects. | ||
* <P> | ||
* An <code>unsubscribed</code> event will be emitted when this subscription is unsubscribed from a particular topic. | ||
* This may occur because the topic was removed, or because the session unsubscribed via {@link Session#unsubscribe}. | ||
* <p> | ||
* Subscription inherits all functions defined on {@link Stream}. | ||
* <h5>Events:</h5> | ||
* <table class="table striped"> | ||
* <thead> | ||
* <tr> | ||
* <th>Event</th> | ||
* <th>Arguments</th> | ||
* <th>Description</th> | ||
* </tr> | ||
* </thead> | ||
* <tbody> | ||
* <tr> | ||
* <td><code>open</code></td> | ||
* <td>subscription</td> | ||
* <td>Emitted when the subscription is initially opened, passing a reference to the subscription itself. This will | ||
* only be fired once.</td> | ||
* </tr> | ||
* <tr> | ||
* <td><code>subscribe</code></td> | ||
* <td>details, topic</td> | ||
* <td>Emitted when a topic that is selected by this Subscription's topic selector is subscribed to by this session. | ||
* Once subscribed, <code>update</code> update events may be received for this topic</td> | ||
* </tr> | ||
* <tr> | ||
* <td><code>unsubscribe</code></td> | ||
* <td>reason, topic</td> | ||
* <td>Emitted when a topic that was previously subscribed, has been unsubscribed. No further update events will be | ||
* received from this topic until subscribed again. Unsubscriptions may occur due to the topic being removed, or | ||
* through calling {@link Session#unsubscribe} - an object containing the reason is provided.</td> | ||
* </tr> | ||
* <tr> | ||
* <td><code>update</code></td> | ||
* <td>value, topic</td> | ||
* <td>Emitted when an update has been received for a topic's value. By default, values are provided as | ||
* <code>Buffer</code> objects, unless {@link Subscription#transform} has been used to convert values to a specific | ||
* type.</td> | ||
* </tr> | ||
* <tr> | ||
* <td><code>close</code></td> | ||
* <td></td> | ||
* <td>Emitted when the subscription has been closed using {@link Subscription#close}.</td> | ||
* </tr> | ||
* </tbody> | ||
* </table> | ||
* | ||
* @example | ||
* // Get a reference to the subscription | ||
* session.subscribe('foo').on('open', function(subscription) { | ||
* // Get selector & subscription instance | ||
* }); | ||
* | ||
* @example | ||
* // Subscribe to a single topic | ||
@@ -40,3 +79,3 @@ * var subscription = session.subscribe('foo'); | ||
* | ||
* subscription.on('subscribed', function(details, topic) { | ||
* subscription.on('subscribe', function(details, topic) { | ||
* // Receive notifications when we are subscribed to a topic | ||
@@ -48,3 +87,3 @@ * }); | ||
* | ||
* subscription.on('unsubscribed', function(reason, topic) { | ||
* subscription.on('unsubscribe', function(reason, topic) { | ||
* // Receive notifications when we are unsubscribed from a topic | ||
@@ -54,3 +93,5 @@ * }); | ||
* @class Subscription | ||
* | ||
* | ||
* @fires Subscription#open | ||
* @fires Subscription#close | ||
* @fires Subscription#update | ||
@@ -101,3 +142,10 @@ * @fires Subscription#subscribe | ||
*/ | ||
'transform' | ||
'transform', | ||
/** | ||
* Close the subscription. No further events will be emitted. | ||
* | ||
* @function Subscription#close | ||
*/ | ||
'close' | ||
]); |
@@ -9,5 +9,5 @@ var interface = require('util/interface').interface; | ||
* @example | ||
* var client = session.client; | ||
* var clients = session.clients; | ||
* | ||
* @namespace Session.client | ||
* @namespace Session.clients | ||
*/ | ||
@@ -19,4 +19,4 @@ | ||
* <p> | ||
* | ||
* See {@link ClientControl.PropertyKeys} for a list of the available | ||
* <br /> | ||
* See {@link Session.clients.PropertyKeys} for a list of the available | ||
* fixed property keys. | ||
@@ -38,8 +38,13 @@ * <p> | ||
* // Get values of the 'FOO' and 'BAR' properties for client whose session id is 'id'. | ||
* session.clients.getSessionProperties(id, ['FOO', 'BAR']); | ||
* session.clients.getSessionProperties(id, ['FOO', 'BAR']).then(function(session, properties) { | ||
* console.log('Received properties for session', session, properties); | ||
* }, function(err) { | ||
* console.log('Unable to receive properties: ', err'); | ||
* }); | ||
* | ||
* | ||
* @param {SessionID} sessionID - Identifies the client session. | ||
* @param {Array} [requiredProperties] - Specifies the keys of the property values required. | ||
* @returns {Result} A {@link Result} for this operation. | ||
* @function Session.client#getSessionProperties | ||
* @function Session.clients#getSessionProperties | ||
*/ | ||
@@ -64,64 +69,108 @@ 'getSessionProperties', | ||
* <p> | ||
* To request all fixed properties ALL_FIXED_PROPERTIES should be included | ||
* To request all fixed properties <code>ALL_FIXED_PROPERTIES</code> should be included | ||
* as a key and any other fixed property keys would be ignored. | ||
* To request all user properties ALL_USER_PROPERTIES should be included | ||
* To request all user properties <code>ALL_USER_PROPERTIES</code> should be included | ||
* as a key and any other user property keys supplied would be ignored. | ||
* | ||
* @example | ||
* // Register a listener for all fixed system properties. | ||
* session.clients.registerSessionPropertiesListener( | ||
* diffusion.clients.PropertyKeys.ALL_FIXED_PROPERTIES) | ||
* .then(function() { ... }); | ||
* // Specify desired properties to listen to | ||
* var props = diffusion.clients.PropertyKeys.ALL_FIXED_PROPERTIES; | ||
* | ||
* // Create the listener | ||
* var listener = { | ||
* onActive : function(deregister) { | ||
* // Listener is active | ||
* }, | ||
* onSessionOpen : function(session, properties) { | ||
* // A session has been opened | ||
* }, | ||
* onSessionEvent : function(session, event, properties, previous) { | ||
* // A session's properties have changed (specified by 'event') | ||
* }, | ||
* onSessionClose : function(session, properties, reason) { | ||
* // A session has closed | ||
* }, | ||
* onClose : function() { | ||
* // Listener is closed | ||
* } | ||
* } | ||
* session.clients.setSessionPropertiesListener(props, listener).then(function() { | ||
* // Registration was succesful | ||
* }, function(err) { | ||
* // There was an error registering the session listener | ||
* }); | ||
* | ||
* @param {Array} requiredProperties - A set of required property keys. | ||
* @param {Session.clients.SessionPropertiesListener} listener - The listener to register | ||
* @returns {Result} A {@link Result} for this operation. | ||
* @function Session.client#registerSessionPropertiesListener | ||
* @function Session.clients#setSessionPropertiesListener | ||
*/ | ||
'registerSessionPropertiesListener', | ||
'setSessionPropertiesListener', | ||
]); | ||
/** | ||
* The Session Properties Listener interface for receiving session property events. This interface must be implemented | ||
* by the user, to be registered via {@link Session.clients#setSessionPropertiesListener}. | ||
* <P> | ||
* <br /> | ||
* A session properties listener has a lifecycle that reflects the registration state on the server. This is expressed | ||
* through the callback methods. Once {@link Session.clients.SessionPropertiesListener#onClose onClose} has been | ||
* called, no further interactions will occur. | ||
* | ||
* @class Session.clients.SessionPropertiesListener | ||
*/ | ||
ClientControl.SessionPropertiesListener = interface('SessionPropertiesListener', [ | ||
/** | ||
* Returns a listener which provides a stream of session property | ||
* events which {@link registerSessionPropertiesListener} has | ||
* requested. | ||
* <p> | ||
* @example | ||
* var listener = session.clients.getSessionListener(); | ||
* Called when the listener has been registered at the server and is now active. | ||
* @param {Function} deregister - A function to call that will deregister and close this handler. | ||
* @function Session.clients.SessionPropertiesListener#onActive | ||
*/ | ||
'onActive', | ||
/** | ||
* Called when the listener is deregistered, or the session is closed. | ||
* | ||
* // Listen for new sessions and receive their properties. | ||
* listener.on('onSessionOpen', function(event) { | ||
* console.log(event.sessionId); | ||
* console.log(event.properties); | ||
* }); | ||
* @function Session.clients.SessionPropertiesListener#onClose | ||
*/ | ||
'onClose', | ||
/** | ||
* Notification that a new client session has been opened. | ||
* <P> | ||
* <br /> | ||
* When the listener is registered, this will be called for all existing sessions. It will then be | ||
* called for every client session that opens whilst the listener is registered. | ||
* <P> | ||
* This will be called for client session regardless of requested session properties. | ||
* | ||
* // Listen for changes to properties for an established session. | ||
* listener.on('onSessionUpdate', function(event) { | ||
* console.log(event.sessionid); | ||
* console.log(event.type); | ||
* console.log(event.oldProperties); | ||
* console.log(event.newProperties); | ||
* }); | ||
* @param {String} session - The session identifier | ||
* @param {Object} properties - The map of requested session property values. | ||
* @function Session.clients.SessionPropertiesListener#onSessionOpen | ||
*/ | ||
'onSessionOpen', | ||
/** | ||
* Notification of a session event that can result in a change of properties. | ||
* | ||
* // Listen for sessions closing, receive the close reason | ||
* // and the properties for the session at time of close. | ||
* listener.on('onSessionClose', function(event) { | ||
* console.log(event.sessionId); | ||
* console.log(event.reason); | ||
* console.log(event.properties); | ||
* }); | ||
* @param {String} session - The session identifier | ||
* @param {Session.clients.SessionEventType} type - The type of event | ||
* @param {Object} properties - The map of requested property values | ||
* @param {Object} previous - A map of previous values for keys that have changed. This will only contain changed | ||
* values and not the whole required property set. | ||
* @function Session.clients.SessionPropertiesListener#onSessionEvent | ||
*/ | ||
'getSessionPropertiesListener' | ||
]); | ||
'onSessionEvent', | ||
/** | ||
* A stream of session properties events sent to this session for the | ||
* registered listener. | ||
*/ | ||
ClientControl.SessionPropertiesStream = interface('SessionPropertiesStream'); | ||
/** | ||
* Events which may be fired by the session properties listener. | ||
*/ | ||
ClientControl.SessionPropertiesListener = interface('SessionPropertiesListener', [ | ||
'onSessionOpen', | ||
'onSessionUpdate', | ||
/** | ||
* Notification that a client session has closed. | ||
* <P> | ||
* <br /> | ||
* This will be called for every client that closes whilst the listener is registered, regardless of | ||
* requested session properties. | ||
* | ||
* @param {String} session - The session identifier | ||
* @param {Object} properties - The map of requested property values | ||
* @param {Object} reason - The reason why the session was closed | ||
* @function Session.clients.SessionPropertiesListener#onSessionClose | ||
*/ | ||
'onSessionClose' | ||
@@ -128,0 +177,0 @@ ]); |
@@ -11,3 +11,3 @@ var interface = require('util/interface').interface; | ||
* <P> | ||
* Subscription causes the server to establish a subscription for this | ||
* Subscription causes the server to establish a subscription for this | ||
* session to any topic that matches the specified selector, including topics | ||
@@ -18,15 +18,11 @@ * that are added after the initial call to {@link Session#subscribe}. | ||
* defined by {@link TopicSelectors}, it will be treated as a direct topic | ||
* path. | ||
* path. | ||
* <P> | ||
* When a session is subscribed to a topic, the returned {@link Subscription} | ||
* will emit a <code>subscribe</code> event that contains the topic path | ||
* the subscription is for. | ||
* <P> | ||
* Whenever any topic selected by the subscription is updated, the | ||
* {@link Subscription} will emit a <code>update</code> event with the topic | ||
* path and the updated value. | ||
* <P> | ||
* An optional callback function may be provided, which will be automatically | ||
* bound to the returned {@link Subscription}'s <code>update</code> event. | ||
* | ||
* <P> | ||
* A {@link Subscription} will emit all topic events that match the provided | ||
* selector. Documentation of these events is available on the {@link Subscription} | ||
* page. | ||
* | ||
* @example | ||
@@ -37,24 +33,27 @@ * // Subscribe to a single topic with a callback function for updates | ||
* }); | ||
* | ||
* | ||
* @example | ||
* // Subscribe to multiple topics and handle subscription events | ||
* session.subscribe('?foo/.*').on({ | ||
* update : function(update, topic) { | ||
* open : function(subscription) { | ||
* console.log('Opened subscription for: ' + subscription.selector); | ||
* }, | ||
* update : function(update, topic) { | ||
* console.log('Update for ' + topic + ' : ' + update); | ||
* }, | ||
* subscribe : function(topic) { | ||
* subscribe : function(details, topic) { | ||
* console.log('Subscribed to : ' + topic); | ||
* }, | ||
* unsubscribe : function(topic) { | ||
* unsubscribe : function(reason, topic) { | ||
* console.log('Unsubscribed from : ' + topic); | ||
* } | ||
* }); | ||
* | ||
* | ||
* @example | ||
* // Assign subscription and bind multiple listeners | ||
* var subscription = session.subscribe('?foo/.*'); | ||
* | ||
* | ||
* subscription.on('update', callback1); | ||
* subscription.on('update', callback2); | ||
* | ||
* | ||
* @param {String} selector - The topic selector to subscribe to. | ||
@@ -66,3 +65,3 @@ * @param {Function} [callback] - An optional callback for update events | ||
'subscribe', | ||
/** | ||
@@ -74,6 +73,9 @@ * Unsubscribe the client from a given topic selector. | ||
* <P> | ||
* Each topic that this session is unsubscribed from will cause any | ||
* Each topic that this session is unsubscribed from will cause any | ||
* associated {@link Subscription}s to emit an <code>unsubscribe</code> | ||
* event. | ||
* | ||
* | ||
* @example | ||
* session.unsubscribe('foo'); | ||
* | ||
* @param {String} selector - The topic selector to unsubscribe from. | ||
@@ -84,6 +86,50 @@ * @returns {Result} A {@link Result Result} for this operation | ||
'unsubscribe', | ||
/** | ||
* Produce a {@link View} of multiple topics from a topic selector. | ||
* Create a {@link Subscription} stream to receive updates from topics that match the provided topic selector. | ||
* <P> | ||
* This method operates in a similar way to {@link Session#subscribe}, except it will not cause the server to | ||
* send any topic updates. This allows the registration of listeners prior to actually subscribing, or to | ||
* add/remove listeners independently of subscriptions on the server. | ||
* <P> | ||
* If no selector is provided, the {@link Subscription} stream created will receive all topic events that have not | ||
* been handled by other subscription streams registered to specific selectors. This allows the handling of topic | ||
* events for topics that this session has been subscribed to by other control sessions. | ||
* <P> | ||
* An optional callback function may be provided, which will be automatically bound to the returned | ||
* {@link Subscription}'s <code>update</code> event. | ||
* | ||
* @example | ||
* // Establish a listener, but will not receive updates immediately | ||
* session.stream('foo', function(update, topic) { | ||
* console.log('Received an update for : ' + topic, update); | ||
* }); | ||
* | ||
* // Once subscribed, the listener established above will receive topic updates | ||
* session.subscribe('foo'); | ||
* | ||
* @example | ||
* // Receive topic events for topics that aren't handled by any other listeners | ||
* session.stream().on({ | ||
* update : function(value, topic) { | ||
* console.log('Update from: ', topic, value); | ||
* }, | ||
* subscribe : function(details, topic) { | ||
* console.log('Subscribed to: ', topic); | ||
* }, | ||
* unsubscribe : function(reason, topic) { | ||
* console.log('Unsubscribed from: ', topic); | ||
* } | ||
* }); | ||
* | ||
* @param {String} [selector] - The topic selector to receive updates for | ||
* @param {Function} [callback] - An optional callback for update events | ||
* @returns {Subscription} A representation of the subscription stream | ||
* @function Session#stream | ||
*/ | ||
'stream', | ||
/** | ||
* Produce a {@link View} of multiple topics from a topic selector. | ||
* <P> | ||
* A view is a subscription, but instead of an update being issued for | ||
@@ -97,3 +143,3 @@ * each topic in a selector set, the view presents all participating | ||
* Update events will still be emitted by the {@link View}. | ||
* | ||
* | ||
* @example | ||
@@ -103,6 +149,6 @@ * // For an existing topic tree containing foo/bar and foo/baz, | ||
* var view = session.view('?foo//'); | ||
* | ||
* | ||
* // Log the current value of the topic 'foo'. | ||
* console.log(view.foo); | ||
* | ||
* | ||
* // Log the current value of the topic 'foo/bar'. | ||
@@ -129,3 +175,3 @@ * console.log(view.foo.bar); | ||
* The reason that an unsubscription occurred. | ||
* | ||
* | ||
* @readonly | ||
@@ -132,0 +178,0 @@ * @enum |
@@ -9,3 +9,5 @@ var implements = require('util/interface').implements; | ||
var registration = require('control/registration'); | ||
var responseHandler = require('control/registration').responseHandler; | ||
var registrationCallback = require('control/registration').registrationCallback; | ||
var api = require('../../../features/client-control'); | ||
@@ -18,7 +20,3 @@ | ||
function PropertiesEventStream() { | ||
var emitter = new Emitter(undefined, undefined, ['onSessionOpen', 'onSessionUpdate', 'onSessionClose']); | ||
emitter.assign(this); | ||
this.e = emitter; | ||
} | ||
var logger = require('util/logger').create('Session.Clients'); | ||
@@ -29,45 +27,12 @@ module.exports = implements(api, function ClientControlImpl(internal) { | ||
var GET_SESSION_PROPERTIES_SERVICE = serviceLocator.obtain(Services.GET_SESSION_PROPERTIES); | ||
var SET_SESSION_PROPERTIES_LISTENER_SERVICE = serviceLocator.obtain(Services.SET_SESSION_PROPERTIES_LISTENER); | ||
var getProperties = serviceLocator.obtain(Services.GET_SESSION_PROPERTIES); | ||
var registration = serviceLocator.obtain(Services.SESSION_PROPERTIES_REGISTRATION); | ||
var stream; | ||
internal.getServiceRegistry().add(Services.SESSION_PROPERTIES_EVENT, { | ||
onRequest : function(internal, message, callback) { | ||
conversationSet.respond(message.cid, message); | ||
callback.respond(); | ||
if (stream) { | ||
switch (message.type) { | ||
case SessionPropertiesEventType.OPEN: | ||
stream.e.emit('onSessionOpen', { | ||
sessionId : message.sessionId, | ||
properties : message.oldProperties | ||
}); | ||
break; | ||
case SessionPropertiesEventType.UPDATE: | ||
stream.e.emit('onSessionUpdate', { | ||
sessionId : message.sessionId, | ||
type : message.updateType, | ||
oldProperties : message.oldProperties, | ||
newProperties : message.newProperties | ||
}); | ||
break; | ||
case SessionPropertiesEventType.CLOSE: | ||
stream.e.emit('onSessionClose', { | ||
sessionId : message.sessionId, | ||
reason : message.closeReason, | ||
properties : message.oldProperties | ||
}); | ||
break; | ||
} | ||
} | ||
} | ||
}); | ||
internal.getServiceRegistry().add(Services.SET_SESSION_PROPERTIES_LISTENER, { | ||
onRequest : function(internal, message, callback) { | ||
callback.respond(); | ||
conversationSet.respond(message.cid, message); | ||
} | ||
}); | ||
this.getSessionProperties = function(sid, propertyKeys) { | ||
@@ -82,3 +47,3 @@ var emitter = new Emitter(); | ||
if (internal.checkConnected(emitter)) { | ||
GET_SESSION_PROPERTIES_SERVICE.send({ | ||
getProperties.send({ | ||
sessionID : sid, | ||
@@ -93,3 +58,3 @@ propertyKeys : propertyKeys | ||
} else { | ||
emitter.emit('complete', result.properties); | ||
emitter.emit('complete', sid, result.properties); | ||
} | ||
@@ -103,44 +68,57 @@ } | ||
this.registerSessionPropertiesListener = function(requiredProperties, callbacks) { | ||
this.setSessionPropertiesListener = function(requiredProperties, handler) { | ||
var emitter = new Emitter(); | ||
var result = new Result(emitter); | ||
if (!stream) { | ||
stream = new PropertiesEventStream(); | ||
if (!handler) { | ||
emitter.error(new Error('Session Properties listener is null or undefined')); | ||
} | ||
stream.on('close', function() { | ||
stream = undefined; | ||
}); | ||
if (internal.checkConnected(emitter)) { | ||
logger.debug('Adding Session Properties Listener'); | ||
if (internal.checkConnected(emitter)) { | ||
var params = { | ||
definition : Services.SESSION_PROPERTIES_EVENT, | ||
requiredProperties : requiredProperties | ||
}; | ||
var adapter = { | ||
active : function(close) { | ||
if (callbacks && callbacks.onActive) { | ||
callbacks.onActive(close); | ||
logger.debug('Session Properties Listener active'); | ||
handler.onActive(close); | ||
}, | ||
respond : function(message) { | ||
switch (message.type) { | ||
case SessionPropertiesEventType.OPEN: | ||
handler.onSessionOpen(message.sessionId, message.oldProperties); | ||
break; | ||
case SessionPropertiesEventType.UPDATE: | ||
handler.onSessionEvent( | ||
message.sessionId, | ||
message.updateType, | ||
message.newProperties, | ||
message.oldProperties); | ||
break; | ||
case SessionPropertiesEventType.CLOSE: | ||
handler.onSessionClose( | ||
message.sessionId, | ||
message.oldProperties, | ||
message.closeReason); | ||
break; | ||
default : | ||
logger.debug('Unknown event type received for session properties listener', message.type); | ||
} | ||
}, | ||
respond : function(response) { | ||
return false; | ||
}, | ||
close : function() { | ||
if (callbacks && callbacks.onClose) { | ||
callbacks.onClose(); | ||
} | ||
logger.debug('Session Properties Listener closed'); | ||
handler.onClose(); | ||
} | ||
}; | ||
return registration.registerSessionPropertiesListener(internal, params, adapter); | ||
} else { | ||
return result; | ||
var cid = conversationSet.new(responseHandler(internal, adapter, function(cid, callback) { | ||
registration.send({ cid : cid }, callback); | ||
})); | ||
registration.send( | ||
{ cid : cid, properties : requiredProperties }, | ||
registrationCallback(conversationSet, cid, emitter)); | ||
} | ||
}; | ||
this.getSessionPropertiesListener = function() { | ||
return stream; | ||
return result; | ||
}; | ||
}); |
@@ -22,2 +22,4 @@ var HashMap = require('hashmap').HashMap; | ||
var subscriptions = []; | ||
var fallbacks = []; | ||
var details_cache = new HashMap(); | ||
@@ -29,7 +31,17 @@ | ||
return function(msg) { | ||
var handled = false; | ||
subscriptions.forEach(function(sub) { | ||
if (sub.sub.selector.selects(msg.topic)) { | ||
handled = true; | ||
sub.emitter.emit(evt, msg[field], msg.topic); | ||
} | ||
} | ||
}); | ||
if (!handled) { | ||
fallbacks.forEach(function(sub) { | ||
sub.emitter.emit(evt, msg[field], msg.topic); | ||
}); | ||
} | ||
}; | ||
@@ -48,29 +60,34 @@ }; | ||
this.subscribe = function(topic, callback) { | ||
logger.debug('Subscribing', topic); | ||
function createSubscription(selector, callback) { | ||
var e = new Emitter(undefined, undefined, ['open', 'update', 'subscribe', 'unsubscribe']); | ||
var s = new Subscription(e, selector); | ||
var selector = topicSelectorParser(topic); | ||
if (callback) { | ||
s.on('update', callback); | ||
} | ||
var e = new Emitter(undefined, undefined, ['update', 'subscribe', 'unsubscribe']); | ||
var s = new Subscription(e, selector); | ||
var ref = { | ||
return { | ||
emitter : e, | ||
sub : s | ||
}; | ||
s.on('close', function() { | ||
} | ||
this.subscribe = function(topic, callback) { | ||
logger.debug('Subscribing', topic); | ||
var selector = topicSelectorParser(topic); | ||
var ref = createSubscription(selector, callback); | ||
ref.sub.on('close', function() { | ||
arrays.remove(subscriptions, ref); | ||
}); | ||
if (callback) { | ||
s.on('update', callback); | ||
} | ||
if (internal.checkConnected(e)) { | ||
if (internal.checkConnected(ref.emitter)) { | ||
subscribe.send(selector); | ||
subscriptions.push(ref); | ||
ref.emitter.emit('open', ref.sub); | ||
} | ||
return s; | ||
return ref.sub; | ||
}; | ||
@@ -84,2 +101,28 @@ | ||
this.stream = function(topic, callback) { | ||
logger.debug('Establishing topic stream', topic); | ||
var collection = subscriptions; | ||
if (topic && typeof topic === 'function' || !topic) { | ||
collection = fallbacks; | ||
callback = topic; | ||
topic = '*/'; | ||
} | ||
var selector = topicSelectorParser(topic); | ||
var ref = createSubscription(selector, callback); | ||
ref.sub.on('close', function() { | ||
arrays.remove(collection, ref); | ||
}); | ||
if (internal.checkConnected(ref.emitter)) { | ||
collection.push(ref); | ||
ref.emitter.emit('open', ref.sub); | ||
} | ||
return ref.sub; | ||
}; | ||
this.view = function(selector, callback) { | ||
@@ -86,0 +129,0 @@ var s = this.subscribe(selector); |
@@ -48,4 +48,4 @@ var Emitter = require('events/emitter'); | ||
if (info) { | ||
topicToInfo[info.path] = null; | ||
idToInfo[id] = null; | ||
topicToInfo[info.path] = undefined; | ||
idToInfo[id] = undefined; | ||
@@ -52,0 +52,0 @@ emitter.emit('unsubscribe', { |
@@ -60,2 +60,14 @@ var Services = require('services/services'); | ||
function registrationCallback(conversationSet, cid, emitter) { | ||
return function(err) { | ||
if (err) { | ||
conversationSet.discard(cid, err); | ||
emitter.error(err); | ||
} else { | ||
conversationSet.respond(cid, ResponseHandlerState.ACTIVE); | ||
emitter.emit('complete'); | ||
} | ||
}; | ||
} | ||
function registerHandler(internal, params, adapter, reg, dereg) { | ||
@@ -75,11 +87,4 @@ var conversationSet = internal.getConversationSet(); | ||
registration.send({ params : params, cid : cid }, function(err) { | ||
if (err) { | ||
conversationSet.discard(cid, err); | ||
emitter.error(err); | ||
} else { | ||
conversationSet.respond(cid, ResponseHandlerState.ACTIVE); | ||
emitter.emit('complete'); | ||
} | ||
}); | ||
registration.send({ params : params, cid : cid }, | ||
registrationCallback(conversationSet, cid, emitter)); | ||
@@ -89,8 +94,10 @@ return result; | ||
module.exports.responseHandler = responseHandler; | ||
module.exports.registrationCallback = registrationCallback; | ||
module.exports.registerMessageHandler = function registerMessageHandler(internal, params, adapter) { | ||
var handler = registerHandler(internal, params, adapter, | ||
return registerHandler(internal, params, adapter, | ||
Services.MESSAGE_RECEIVER_CONTROL_REGISTRATION, | ||
Services.MESSAGE_RECEIVER_CONTROL_DEREGISTRATION); | ||
return handler; | ||
}; | ||
@@ -104,10 +111,2 @@ | ||
module.exports.registerSessionPropertiesListener = | ||
function registerSessionPropertiesListener(internal, params, adapter) { | ||
var listener = registerHandler(internal, params, adapter, | ||
Services.SET_SESSION_PROPERTIES_LISTENER, | ||
Services.SET_SESSION_PROPERTIES_LISTENER); // TODO: Deregister? | ||
return listener; | ||
}; | ||
module.exports.ResponseHandlerState = ResponseHandlerState; |
@@ -24,13 +24,13 @@ var functions = require('util/function'); | ||
args = args || []; | ||
if (listeners[event]) { | ||
var dispatch = function(listener) { | ||
functions.callWithArguments(listener, args); | ||
}; | ||
process.nextTick(function() { | ||
var dispatch = function(listener) { | ||
functions.callWithArguments(listener, args); | ||
}; | ||
process.nextTick(function() { | ||
if (listeners[event]) { | ||
listeners[event].forEach(dispatch); | ||
}); | ||
} | ||
} | ||
}); | ||
listener(event, args); | ||
@@ -37,0 +37,0 @@ }; |
@@ -64,7 +64,3 @@ var implements = require('util/interface').implements; | ||
if (events) { | ||
if (! allowedEvents) { | ||
// These events are always allowed. | ||
allowedEvents = ['close', 'error', 'complete']; | ||
} | ||
allowedEvents = allowedEvents.concat(events); | ||
allowedEvents = ['close', 'error', 'complete'].concat(events); | ||
} | ||
@@ -71,0 +67,0 @@ |
@@ -27,2 +27,10 @@ var implements = require('util/interface').implements; | ||
stream.on('open', function(selector, s) { | ||
e.emit('open', selector, this); | ||
}); | ||
stream.on('close', function() { | ||
e.emit('close'); | ||
}); | ||
stream.on('update', function(update, topic) { | ||
@@ -29,0 +37,0 @@ e.emit('update', fn(update), topic); |
@@ -13,3 +13,3 @@ var ClientControlOptions = require('services/control/client-control-options'); | ||
var result = { | ||
conversationId : ConversationIdSerialiser.read(input), | ||
cid : ConversationIdSerialiser.read(input), | ||
sessionId : SessionIdSerialiser.read(input), | ||
@@ -16,0 +16,0 @@ type : BEES.read(input, SessionPropertiesEventType) |
@@ -8,6 +8,5 @@ var ConversationIDSerialiser = require('conversation/conversation-id-serialiser'); | ||
write : function(output, request) { | ||
if (request.params.requiredProperties !== undefined && | ||
request.params.requiredProperties !== null) { | ||
if (request.properties) { | ||
Codec.writeInt32(output, 0); | ||
Codec.writeCollection(output, request.params.requiredProperties, Codec.writeString); | ||
Codec.writeCollection(output, request.properties, Codec.writeString); | ||
} else { | ||
@@ -14,0 +13,0 @@ Codec.writeInt32(output, 1); |
@@ -214,7 +214,7 @@ var TopicSelector = require('../../topics/topic-selector'); | ||
}, | ||
SET_SESSION_PROPERTIES_LISTENER : { | ||
SESSION_PROPERTIES_REGISTRATION : { | ||
id : 69, | ||
name : "Set session properties listener", | ||
request : SetSessionPropertiesListener, | ||
response : SetSessionPropertiesListener | ||
response : null | ||
}, | ||
@@ -225,4 +225,4 @@ SESSION_PROPERTIES_EVENT : { | ||
request : SessionPropertiesEvent, | ||
response : SessionPropertiesEvent | ||
response : null | ||
} | ||
}; |
339825
9433