rxjs-ws-channels-filters
Advanced tools
Comparing version 0.0.7 to 0.0.8
@@ -67,14 +67,9 @@ 'use strict'; | ||
function reSubscribe(open) { | ||
var channels = Object.keys(vm.channels.subs); | ||
var i = channels.length - 1; | ||
for (; i > -1; i--) { | ||
var channel = channels[i]; | ||
if (open === 1) { | ||
if (open === 1) { | ||
var channels = Object.keys(vm.channels.subs); | ||
var i = channels.length - 1; | ||
for (; i > -1; i--) { | ||
var channel = channels[i]; | ||
var filters = getChannelFilters(channel); | ||
joinChannel(channel, filters); | ||
vm.channels.subs[channel].initialized = true; | ||
} else if (open === true) { | ||
vm.channels.subs[channel].initialized = true; | ||
} else if (open === false) { | ||
vm.channels.subs[channel].initialized = false; | ||
} | ||
@@ -98,5 +93,6 @@ } | ||
filters: [], | ||
initialized: false | ||
filtersToSend: [] | ||
}; | ||
channelSub.observable = createNewChannelObservable(channel, channelSub); | ||
channelSub.serverNotification = createNewChannelServerSubscriptionObservable(channelSub); | ||
} | ||
@@ -106,7 +102,11 @@ return channelSub; | ||
function createNewChannelServerSubscriptionObservable(channelSub) { | ||
return new Rx.Observable.create(function (observer) { | ||
channelSub.observer = observer; | ||
}).debounceTime(10).subscribe(debounceChannelNotifications); | ||
} | ||
function createNewChannelObservable(channel, channelSub) { | ||
return new Rx.Observable.create(function (observer) { | ||
var filters = getChannelFilters(channel); | ||
channelSub.initialized = true; | ||
joinChannel(channel, filters); | ||
joinChannel(channel); | ||
var channelsSubscription = vm.channels.observable.filter(createChannelFilter(channel)).subscribe(observer); | ||
@@ -116,6 +116,6 @@ return function () { | ||
channelsSubscription.unsubscribe(); | ||
channelSub.initialized = false; | ||
channelSub.serverNotification.unsubscribe(); | ||
delete vm.channels.subs[channel]; | ||
}; | ||
}).share(); | ||
}).publishReplay(1).refCount(); | ||
} | ||
@@ -168,14 +168,18 @@ | ||
return new Rx.Observable.create(function (observer) { | ||
if (channelSub.initialized) { | ||
changeChannelFilters(channelSub.name, vm.connection.options.filterJoinAction, filter); | ||
} | ||
channelSub.observer.next(channelSub.name); | ||
channelSub.filtersToSend.push({ | ||
action: vm.connection.options.filterJoinAction, | ||
filter: filter | ||
}); | ||
var filterSubscription = channelSub.observable.filter(createFilterFunction(filter)).subscribe(observer); | ||
return function () { | ||
channelSub.filters.splice(channelSub.filters.indexOf(filterSub), 1); | ||
if (channelSub.initialized) { | ||
changeChannelFilters(channelSub.name, vm.connection.options.filterLeaveAction, filter); | ||
} | ||
channelSub.observer.next(channelSub.name); | ||
channelSub.filtersToSend.push({ | ||
action: vm.connection.options.filterLeaveAction, | ||
filter: filter | ||
}); | ||
filterSubscription.unsubscribe(); | ||
}; | ||
}).share(); | ||
}).publishReplay(1).refCount(); | ||
} | ||
@@ -202,2 +206,25 @@ | ||
function debounceChannelNotifications(channel) { | ||
var filtersToSend = []; | ||
var filters = vm.channels.subs[channel].filtersToSend; | ||
var i = 0, l = filters.length; | ||
for (; i < l; i++) { | ||
var filterToSend = filters[i]; | ||
var existingFilter = findExistingFilter(filtersToSend, filterToSend.filter); | ||
if (existingFilter) { | ||
filtersToSend.splice(filtersToSend.indexOf(existingFilter), 1); | ||
} else { | ||
filtersToSend.push({ | ||
action: filterToSend.action, | ||
filter: filterToSend.filter | ||
}); | ||
} | ||
} | ||
vm.channels.subs[channel].filtersToSend = []; | ||
if (filtersToSend.length) { | ||
joinChannel(channel, filtersToSend); | ||
} | ||
} | ||
function leaveChannel(channel) { | ||
@@ -210,13 +237,2 @@ vm.connection.subject.next({ | ||
function changeChannelFilters(channel, actionType, filter) { | ||
vm.connection.subject.next({ | ||
action: vm.connection.options.channelJoinAction, | ||
channel: channel, | ||
filters: [{ | ||
action: actionType, | ||
filter: filter | ||
}] | ||
}); | ||
} | ||
function createChannelFilter(channel) { | ||
@@ -223,0 +239,0 @@ return function (data) { |
{ | ||
"name": "rxjs-ws-channels-filters", | ||
"version": "0.0.7", | ||
"version": "0.0.8", | ||
"description": "RxJS websockets implementation with channels and filters", | ||
@@ -5,0 +5,0 @@ "main": "dist/index.js", |
@@ -96,2 +96,3 @@ # rxjs-ws-channels-filters | ||
* make better documentation | ||
* make demo | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16312
324
98