rx.wamp
A Reactive wrapper library for the autobahn wamp v1/v2 library in the browser/node
If you have been using below version 0.2.0 please see below for important API changes!
Installation
Regular browser
<script type="application/javascript" src="javascripts/lib/autobahn.js"></script>
<script type="application/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/2.3.22/rx.lite.js"></script>
<script type="application/javascript" src="javascripts/rx.wamp.js" ></script>
RequireJS
require(['rx.wamp'], function(Rx) {
});
NodeJS
var rxwamp = require('rx.wamp');
Rx.Observable
.fromConnection({url: 'ws://localhost:9000', realm: 'realm1'});
Connection
function newSession(session) {
console.log("A new session was created");
}
var connectionSubscription = Rx.Observable.fromConnection("ws://localhost:9000")
.subscribe(newSession);
connectionSubscription.dispose();
Subscribing to topics
function validateArgs(value) {
return value.args && value.args.length > 1;
}
function getResultValue(value) {
return value.args[1];
}
var openObserver = Rx.Observer.create();
var topic = Rx.Observable.subscribeAsObservable(session, "wamp.my.foo", options, openObserver);
var topicSubscription = topic
.filter(validateArgs)
.map(getResultValue)
.subscribe(console.log);
topicSubscription.dispose();
New in version 0.3!
You can now pass your connection observable directly into your subscription so that it will persist across sessions
Rx.Observable.subscribeAsObservable(Rx.Observable.fromConnection("ws://myconnectionurl:9090"), "wamp.my.foo", onResult);
New in version 0.5!
You can now use an even shorter-hand for subscription. These will automatically persist across sessions if you use the
fromConnection()
overload.
var connection = Rx.Observable.fromConnection("ws://myconnectionurl:9090");
var subscriber =
Rx.Observable.subscriber(connection)
.to("wamp.my.foo", {}, fooObserver)
.to("wamp.my.other.foo", function(message) {}, function(ex) {}, function(){});
subscriber.dispose();
Publishing to topic
var published = Rx.Observable.publishAsObservable(session, "wamp.my.foo", [42], {key : "value"}, {});
Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, true);
Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, [12345678]);
Rx.Observable.publishAsObservable(session, "wamp.my.foo", { args : [42], kwargs : { key : "value" } }, [12345678], [87654321]);
Or use them together
var topic = Rx.Observable.fromPubSubPattern(session, "wamp.pubsub.topic", {});
topic.opened.subscribe(function(){
console.log("subscribed to topic");
});
topic.errors.subscribe(function(err){
console.log("There was an error publishing the message");
});
topic.subscribe(observer);
Rx.Observable.generateWithRelativeTime(0,
function(x) {return x < 42; },
function(x) {return x + 1; },
function(x) {return {args : [x]}; },
function(x) {return 15; })
.subscribe(topic);
Registering methods
Note that this will only work in version 2
function endpoint(args, kwargs, details) {
if (args === undefined || args.length < 1)
throw new autobahn.Error("No values to sum!");
else if (args.length > 2) {
throw new autobahn.Error("Too many values!");
} else {
return args[0] + args[1];
}
}
function onError(e) {
}
var registration =
Rx.Observable
.registerAsObservable(session, "wamp.my.add", endpoint, {})
.subscribeOnError(onError);
registration.dispose();
New in version 0.3!
You can now pass your connection observable directly into your registration so that it will persist across sessions
var connection = Rx.Observable.fromConnection({url : myUrl, realm : 'realm1'});
Rx.Observable.registerAsObservable(connection, "wamp.my.add", endpoint, {});
Calling methods
We can call methods, like the one in the example above, as well.
var caller = session.callAsObservable("wamp.my.add", {});
caller([2, 3], {})
.subscribe(function(value){
console.log("Result was %s", value.args[0]);
});
addResult.subscribe(function(value) {
console.log("Result was %s", value.args[0]);
});
caller(2, 3)
.subscribe(function(value) {});
Authentication
Currently only available in V1
Rx.Observable.authreqAsObservable(session,
function onchallenge(challenge){
var signature = this.authsign(challenge, "");
return this.auth(signature);
},
"blahsomeauthenticationkeyblah",
{});
Advanced
Weather Station Monitor
var sensorReadings = Rx.Observable.subscribeAsObservable(session, "weather.sensor");
var analyzer = Rx.Observable.callAsObservable(session, "weather.forecast.compute");
var desiredTemperature = Rx.Observable.subscribeAsObservable(session, "temperature.indoors.desired");
var dailyForecast =
sensorReadings
.map(function(rawValue){
return rawValue.kwargs || rawValue.event;
})
.throttleFirst(1000)
.bufferWithTime(1000 * 60 * 60 * 24)
.tap(function(readings) {
Rx.Observable.publishAsObservable(session, "weather.visualizer.daily", readings);
})
.flatMap(function(readings) {
return analyzer(readings);
})
.publish().refCount();
dailyForecast
.filter(function(weather) {
return weather.warnings.length > 0;
})
.map(function(weather) {
var warning = weather.warnings[0];
return {type : warning.type, severity : warning.severity, message : "GET TO DA CHOPPA!!"};
})
.subscribe(Rx.Observable.publishAsObservable.bind(null, session, "weather.warnings.klaxon"));
dailyForecast
.map(function(weather) {
return weather.temperature.average;
})
.combineLatest(desiredTemperature, function(actual, desired) {
return Math.abs(desired - actual);
})
.map(function(difference) {
return {state : difference > 4};
})
.subscribe(Rx.Observable.publishAsObservable.bind(null, session, "indoor.climatecontrol.active"));
var adder = session.caller("wamp.my.add");
var multiplier = session.caller("wamp.my.multiply");
var pipeline =
adder([2, 3])
.zip(adder([3, 4]), function(value1, value2) {
return [value1.args[0], value2.args[0]];
})
.flatMap(function(value) {
return multiplier(value[0], value[1]);
});
pipeline.subscribe(function(value){
console.log("Result was %d", value.args[0]);
})
###TODO