@oracle/coherence
Advanced tools
Comparing version 1.1.3 to 1.1.4
@@ -501,4 +501,8 @@ import { extractor } from './extractors'; | ||
* The {@link Comparator} to apply against the extracted values. | ||
* @private | ||
*/ | ||
protected comparator?: AggregatorComparator; | ||
/** | ||
* The property that results will be ordered by. | ||
*/ | ||
protected property?: string; | ||
@@ -598,3 +602,3 @@ /** | ||
* statically, so their use is strongly encouraged in lieu of direct construction | ||
* of EntryAggregator} classes. | ||
* of {@link EntryAggregator} classes. | ||
*/ | ||
@@ -601,0 +605,0 @@ export declare class Aggregators { |
@@ -688,3 +688,3 @@ "use strict"; | ||
* statically, so their use is strongly encouraged in lieu of direct construction | ||
* of EntryAggregator} classes. | ||
* of {@link EntryAggregator} classes. | ||
*/ | ||
@@ -691,0 +691,0 @@ class Aggregators { |
@@ -9,2 +9,3 @@ /// <reference types="node" /> | ||
import { util } from './util'; | ||
import { Session } from "./session"; | ||
export declare namespace event { | ||
@@ -146,3 +147,3 @@ import Filter = filter.Filter; | ||
* | ||
* 1. A Map of stringified key => ListenerGroup, which is used to identify the | ||
* 1. A Map of string keys mapped to a ListenerGroup, which is used to identify the | ||
* group of callbacks for a single key. We stringify the key since Javascript | ||
@@ -184,3 +185,3 @@ * is not the same as Java's equals(). | ||
/** | ||
* The `NamedMap` that will used as the *source* of the events. | ||
* The `NamedMap` used as the *source* of the events. | ||
*/ | ||
@@ -230,15 +231,31 @@ protected namedMap: NamedMap<K, V>; | ||
/** | ||
* The `Session` associated with this event stream. | ||
* | ||
* @private | ||
*/ | ||
private session; | ||
/** | ||
* Callback for session reconnect events. | ||
* @private | ||
*/ | ||
private readonly onReconnect; | ||
/** | ||
* Callback for session disconnect events. | ||
* @private | ||
*/ | ||
private readonly onDisconnect; | ||
/** | ||
* Constructs a new `MapEventsManager` | ||
* | ||
* @param namedMap the {@link NamedMap} to manage events for | ||
* @param session the associated {@link Session} | ||
* @param client the `gRPC` interface for making requests | ||
* @param scope the {@link NamedMap} scope | ||
* @param serializer the {@link Serializer} used by this map | ||
* @param emitter the {@link EventEmitter} to use | ||
*/ | ||
constructor(namedMap: NamedMap<K, V>, scope: string, client: NamedCacheServiceClient, serializer: util.Serializer, emitter: EventEmitter); | ||
constructor(namedMap: NamedMap<K, V>, session: Session, client: NamedCacheServiceClient, serializer: util.Serializer, emitter: EventEmitter); | ||
/** | ||
* Create a BiDi stream lazily. | ||
*/ | ||
ensureStream(): Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse>>; | ||
ensureStream(): Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null>; | ||
/** | ||
@@ -319,14 +336,6 @@ * Process incoming `gRPC` {@link MapListenerResponse}s. | ||
private onError; | ||
/** | ||
* Handles the end of an event stream. | ||
*/ | ||
private onEnd; | ||
/** | ||
* Handles a stream being cancelled. | ||
*/ | ||
private onCancel; | ||
} | ||
/** | ||
* Manages a collection of MapEventListeners. Handles sending out | ||
* MapListenerRequest subscriptions / unsubscriptions. Also, handles | ||
* MapListenerRequest subscriptions / un-subscriptions. Also, handles | ||
* notification of all the registered listeners. | ||
@@ -340,7 +349,2 @@ */ | ||
/** | ||
* Active status will be true if the subscribe request has been sent. | ||
* It will be false if a unsubscribe request has been sent. | ||
*/ | ||
isActive: boolean; | ||
/** | ||
* The key or the filter for which this group of callbacks will | ||
@@ -355,3 +359,3 @@ * receive events. | ||
* | ||
* Similarly if a listener is removed whose isLite == false but if all the | ||
* Similarly, if a listener is removed whose isLite == false but if all the | ||
* remaining listeners are interested in only isLite == true, then a | ||
@@ -477,3 +481,6 @@ * re-registration occurs. | ||
export enum SessionLifecycleEvent { | ||
CLOSED = "session_closed" | ||
CLOSED = "session_closed", | ||
DISCONNECTED = "session_disconnected", | ||
RECONNECTED = "session_reconnected", | ||
CONNECTED = "session_connected" | ||
} | ||
@@ -480,0 +487,0 @@ export {}; |
"use strict"; | ||
/* | ||
* Copyright (c) 2020 Oracle and/or its affiliates. | ||
* Copyright (c) 2020, 2022 Oracle and/or its affiliates. | ||
* | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
@@ -23,2 +23,3 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
const util_1 = require("./util"); | ||
const connectivity_state_1 = require("@grpc/grpc-js/build/src/connectivity-state"); | ||
var event; | ||
@@ -167,3 +168,3 @@ (function (event_1) { | ||
* | ||
* 1. A Map of stringified key => ListenerGroup, which is used to identify the | ||
* 1. A Map of string keys mapped to a ListenerGroup, which is used to identify the | ||
* group of callbacks for a single key. We stringify the key since Javascript | ||
@@ -192,8 +193,8 @@ * is not the same as Java's equals(). | ||
* @param namedMap the {@link NamedMap} to manage events for | ||
* @param session the associated {@link Session} | ||
* @param client the `gRPC` interface for making requests | ||
* @param scope the {@link NamedMap} scope | ||
* @param serializer the {@link Serializer} used by this map | ||
* @param emitter the {@link EventEmitter} to use | ||
*/ | ||
constructor(namedMap, scope, client, serializer, emitter) { | ||
constructor(namedMap, session, client, serializer, emitter) { | ||
/** | ||
@@ -221,2 +222,3 @@ * A Promise for lazily creating the duplex stream. The streamPromise | ||
this.emitter = emitter; | ||
this.session = session; | ||
// Initialize internal data structures. | ||
@@ -226,4 +228,27 @@ this.keyMap = new Map(); | ||
this.filterId2ListenerGroup = new Map(); | ||
this.reqFactory = new util_1.util.RequestFactory(this.mapName, scope, serializer); | ||
this.reqFactory = new util_1.util.RequestFactory(this.mapName, session.scope, serializer); | ||
this.streamPromise = this.ensureStream(); | ||
this.onDisconnect = () => __awaiter(this, void 0, void 0, function* () { | ||
let st = null; | ||
try { | ||
st = yield this.streamPromise; | ||
} | ||
catch (err) { | ||
// ignore | ||
} | ||
if (st !== null) { | ||
st.cancel(); | ||
} | ||
this.streamPromise = null; | ||
}); | ||
session.on(SessionLifecycleEvent.DISCONNECTED, this.onDisconnect); | ||
this.onReconnect = () => { | ||
this.keyMap.forEach((value) => __awaiter(this, void 0, void 0, function* () { | ||
yield value.doSubscribe(value.registeredIsLite); | ||
})); | ||
this.filterMap.forEach((value) => __awaiter(this, void 0, void 0, function* () { | ||
yield value.doSubscribe(value.registeredIsLite); | ||
})); | ||
}; | ||
session.on(SessionLifecycleEvent.RECONNECTED, this.onReconnect); | ||
} | ||
@@ -236,28 +261,36 @@ /** | ||
if (self.streamPromise == null) { | ||
const bidiStream = self.client.events(); | ||
bidiStream.on('data', (resp) => self.handleResponse(resp)); | ||
bidiStream.on('end', () => self.onEnd()); | ||
bidiStream.on('error', (err) => self.onError(err)); | ||
bidiStream.on('cancelled', () => self.onCancel()); | ||
// Create a SubscribeRequest (with RequestType.INIT) | ||
const request = self.reqFactory.mapEventSubscribe(); | ||
const initUid = request.getUid(); | ||
self.streamPromise = new Promise((resolve, reject) => { | ||
// Setup pending subscriptions map so that when the | ||
// subscribe response comes back, or an error occurs | ||
// we can resolve or reject the connection. | ||
self.pendingSubscriptions.set(initUid, (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid); | ||
self.streamPromise = new Promise((resolve) => { | ||
self.client.waitForReady(Date.now() + this.session.options.readyTimeoutInMillis, (err) => { | ||
if (err) { | ||
reject(err); | ||
this.streamPromise = this.ensureStream(); | ||
resolve(null); | ||
} | ||
else { | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream); | ||
} | ||
const bidiStream = self.client.events(); | ||
bidiStream.on('data', (resp) => self.handleResponse(resp)); | ||
bidiStream.on('error', (err) => self.onError(err)); | ||
// Create a SubscribeRequest (with RequestType.INIT) | ||
const request = self.reqFactory.mapEventSubscribe(); | ||
const initUid = request.getUid(); | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream); | ||
// Setup pending subscriptions map so that when the | ||
// subscribe response comes back, or an error occurs | ||
// we can resolve or reject the connection. | ||
self.pendingSubscriptions.set(initUid, (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid); | ||
if (err) { | ||
this.streamPromise = this.ensureStream(); | ||
resolve(null); | ||
} | ||
else { | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream); | ||
} | ||
}); | ||
// Now that we have set up the pending subscriptions map, | ||
// write the init request. | ||
bidiStream.write(request); | ||
}); | ||
// Now that we have set up the pending subscriptions map, | ||
// write the init request. | ||
bidiStream.write(request); | ||
}); | ||
@@ -388,13 +421,16 @@ } | ||
.then((stream) => { | ||
return new Promise((resolve, reject) => { | ||
self.pendingSubscriptions.set(request.getUid(), (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid); | ||
if (err) { | ||
reject(err); | ||
} | ||
else { | ||
return new Promise((resolve) => { | ||
if (!stream) { | ||
resolve(); | ||
} | ||
else { | ||
self.pendingSubscriptions.set(request.getUid(), (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid); | ||
if (err) { | ||
this.streamPromise = this.ensureStream(); | ||
} | ||
resolve(); | ||
} | ||
}); | ||
stream.write(request); | ||
}); | ||
stream.write(request); | ||
} | ||
}); | ||
@@ -411,13 +447,24 @@ }); | ||
self.markedForClose = true; | ||
const bidiStream = yield self.streamPromise; | ||
let bidiStream = null; | ||
try { | ||
bidiStream = yield self.streamPromise; | ||
} | ||
catch (err) { | ||
} | ||
self.session.removeListener(SessionLifecycleEvent.CONNECTED, self.onReconnect); | ||
self.session.removeListener(SessionLifecycleEvent.RECONNECTED, self.onReconnect); | ||
self.session.removeListener(SessionLifecycleEvent.DISCONNECTED, self.onDisconnect); | ||
yield new Promise((resolve) => __awaiter(this, void 0, void 0, function* () { | ||
// Setup an event handler for 'error' as calling cancel() on | ||
// Add an event handler for 'error' as calling cancel() on | ||
// the bidi stream will result in a CANCELLED status. | ||
bidiStream.on('error', (err) => { | ||
if (err.toString().indexOf('CANCELLED')) { | ||
self.streamPromise = null; | ||
resolve(); | ||
} | ||
}); | ||
bidiStream.end(); | ||
if (bidiStream) { | ||
bidiStream.on('error', (err) => { | ||
if (err.toString().indexOf('CANCELLED')) { | ||
self.streamPromise = null; | ||
resolve(); | ||
} | ||
}); | ||
bidiStream.end(); | ||
} | ||
resolve(); | ||
})); | ||
@@ -462,21 +509,14 @@ } | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', this.mapName + ': Received onError', err); | ||
if (this.client.getChannel().getConnectivityState(false) == connectivity_state_1.ConnectivityState.READY) { | ||
// stream cancellation, but channel still okay | ||
this.streamPromise = null; | ||
this.keyMap.forEach((value) => __awaiter(this, void 0, void 0, function* () { | ||
yield value.doSubscribe(value.registeredIsLite); | ||
})); | ||
this.filterMap.forEach((value) => __awaiter(this, void 0, void 0, function* () { | ||
yield value.doSubscribe(value.registeredIsLite); | ||
})); | ||
} | ||
} | ||
} | ||
/** | ||
* Handles the end of an event stream. | ||
*/ | ||
onEnd() { | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', this.mapName + ': Received onEnd'); | ||
} | ||
} | ||
/** | ||
* Handles a stream being cancelled. | ||
*/ | ||
onCancel() { | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', '** Received onCancel'); | ||
} | ||
} | ||
} | ||
@@ -494,3 +534,3 @@ /** | ||
* Manages a collection of MapEventListeners. Handles sending out | ||
* MapListenerRequest subscriptions / unsubscriptions. Also, handles | ||
* MapListenerRequest subscriptions / un-subscriptions. Also, handles | ||
* notification of all the registered listeners. | ||
@@ -507,7 +547,2 @@ */ | ||
/** | ||
* Active status will be true if the subscribe request has been sent. | ||
* It will be false if a unsubscribe request has been sent. | ||
*/ | ||
this.isActive = true; // Initially active. | ||
/** | ||
* The current value of isLite that is registered with the cache. | ||
@@ -517,3 +552,3 @@ * If a new listener is added to the group that requires isLite == false | ||
* | ||
* Similarly if a listener is removed whose isLite == false but if all the | ||
* Similarly, if a listener is removed whose isLite == false but if all the | ||
* remaining listeners are interested in only isLite == true, then a | ||
@@ -762,4 +797,7 @@ * re-registration occurs. | ||
SessionLifecycleEvent["CLOSED"] = "session_closed"; | ||
SessionLifecycleEvent["DISCONNECTED"] = "session_disconnected"; | ||
SessionLifecycleEvent["RECONNECTED"] = "session_reconnected"; | ||
SessionLifecycleEvent["CONNECTED"] = "session_connected"; | ||
})(SessionLifecycleEvent = event_1.SessionLifecycleEvent || (event_1.SessionLifecycleEvent = {})); | ||
})(event = exports.event || (exports.event = {})); | ||
//# sourceMappingURL=events.js.map |
@@ -655,3 +655,3 @@ import { extractor } from './extractors'; | ||
/** | ||
* A {@code java.util.function.Predicate} based {@link ExtractorFilter}. | ||
* A predicate based {@link ExtractorFilter}. | ||
*/ | ||
@@ -658,0 +658,0 @@ class PredicateFilter extends ExtractorFilter { |
@@ -752,3 +752,3 @@ "use strict"; | ||
/** | ||
* A {@code java.util.function.Predicate} based {@link ExtractorFilter}. | ||
* A predicate based {@link ExtractorFilter}. | ||
*/ | ||
@@ -755,0 +755,0 @@ class PredicateFilter extends ExtractorFilter { |
/// <reference types="node" /> | ||
import { ClientReadableStream, ServiceError } from '@grpc/grpc-js'; | ||
import { ClientReadableStream, Deadline, ServiceError } from '@grpc/grpc-js'; | ||
import { EventEmitter } from 'events'; | ||
@@ -56,3 +56,3 @@ import { BytesValue } from 'google-protobuf/google/protobuf/wrappers_pb'; | ||
/** | ||
* Signifies whether or not this `NamedMap` has been destroyed. | ||
* Signifies if this `NamedMap` has been destroyed. | ||
*/ | ||
@@ -104,3 +104,3 @@ readonly destroyed: boolean; | ||
* @return a `Promise` resolving to `true` if the key is mapped | ||
* to the specified value value, or `false` if it does not | ||
* to the specified value, or `false` if it does not | ||
*/ | ||
@@ -148,3 +148,3 @@ hasEntry(key: K, value: V): Promise<boolean>; | ||
/** | ||
* Copies all of the mappings from the specified map to this map | ||
* Copies all mappings from the specified map to this map | ||
* | ||
@@ -420,3 +420,3 @@ * @param map the map to copy from | ||
* Unlike the {@link keySet()} method, the set returned by this method may | ||
* not be backed by the map, so changes to the set may not reflected in the | ||
* not be backed by the map, so changes to the set may not be reflected in the | ||
* map, and vice-versa. | ||
@@ -426,7 +426,6 @@ * | ||
* entries of this map should satisfy | ||
* @param comparator the comparator for sorting | ||
* | ||
* @return a set of keys for entries that satisfy the specified criteria | ||
*/ | ||
keys(filter: Filter, comparator?: Comparator): Promise<RemoteSet<K>>; | ||
keys(filter: Filter): Promise<RemoteSet<K>>; | ||
/** | ||
@@ -560,3 +559,3 @@ * Returns a Set view of the mappings contained in this map. | ||
* | ||
* @return a `Promise` resolving to the the previous value associated with the specified key, or | ||
* @return a `Promise` resolving to the previous value associated with the specified key, or | ||
* `null` if there was no mapping for the key. (A `null` return can also indicate that the map previously | ||
@@ -589,3 +588,3 @@ * associated `null` with the key, if the implementation supports `null` values.) | ||
*/ | ||
private session; | ||
private readonly session; | ||
/** | ||
@@ -714,3 +713,3 @@ * The name of the Coherence `NamedCache`. | ||
*/ | ||
keys(filter?: Filter, comparator?: Comparator): Promise<RemoteSet<K>>; | ||
keys(filter?: Filter): Promise<RemoteSet<K>>; | ||
/** | ||
@@ -789,2 +788,13 @@ * @inheritDoc | ||
/** | ||
* Create a promise wrapping the provided execution logic. This logic will be called | ||
* when the `gRPC` client's channel is ready. | ||
* | ||
* @param logic execution logic | ||
*/ | ||
promisify<T>(logic: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void): Promise<T>; | ||
/** | ||
* The deadline for a `gRPC` channel to be ready. | ||
*/ | ||
readyTimeout(): Deadline; | ||
/** | ||
* Obtain the next page of entries from the cache. | ||
@@ -791,0 +801,0 @@ * |
@@ -6,4 +6,13 @@ "use strict"; | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
function adopt(value) { return value instanceof P ? value : new P(function (resolve) { resolve(value); }); } | ||
return new (P || (P = Promise))(function (resolve, reject) { | ||
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } } | ||
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } } | ||
function step(result) { result.done ? resolve(result.value) : adopt(result.value).then(fulfilled, rejected); } | ||
step((generator = generator.apply(thisArg, _arguments || [])).next()); | ||
}); | ||
}; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
@@ -97,3 +106,3 @@ exports.NamedCacheClient = void 0; | ||
// Now open the events channel. | ||
this.mapEventsHandler = new MapEventsManager(this, this.session.scope, this.client, this.serializer, this.internalEmitter); | ||
this.mapEventsHandler = new MapEventsManager(this, this.session, this.client, this.serializer, this.internalEmitter); | ||
} | ||
@@ -117,3 +126,3 @@ /** | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
const request = new messages_pb_1.IsEmptyRequest(); | ||
@@ -131,3 +140,3 @@ request.setCache(this.cacheName); | ||
get size() { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
const request = new messages_pb_1.SizeRequest(); | ||
@@ -178,3 +187,3 @@ request.setCache(this.cacheName); | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
const request = self.requestFactory.containsEntry(key, value); | ||
@@ -200,3 +209,3 @@ self.client.containsEntry(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
} | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.aggregate(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -230,3 +239,3 @@ if (err) { | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.invoke(self.requestFactory.invoke(key, processor), (err, resp) => { | ||
@@ -275,3 +284,3 @@ if (err) { | ||
if (util_1.util.isIterableType(keysOrFilter)) { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.getAll(keysOrFilter) | ||
@@ -284,3 +293,3 @@ .then(entries => entries.forEach((value, key) => action(value, key, this))) | ||
else { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.entries(keysOrFilter) | ||
@@ -297,3 +306,3 @@ .then(entries => { | ||
} | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.entries(filters_1.Filters.always()) | ||
@@ -350,3 +359,3 @@ .then(entries => { | ||
const request = this.requestFactory.addIndex(extractor, ordered, comparator); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.addIndex(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err) => { | ||
@@ -368,3 +377,3 @@ self.resolveValue(resolve, reject, err); | ||
const call = self.client.entrySet(request, this.session.callOptions()); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (e) { | ||
@@ -383,3 +392,3 @@ const entry = new NamedCacheEntry(e.getKey_asU8(), e.getValue_asU8(), self.getRequestFactory().serializer); | ||
*/ | ||
keys(filter, comparator) { | ||
keys(filter) { | ||
const self = this; | ||
@@ -392,3 +401,3 @@ if (!filter) { | ||
const call = self.client.keySet(request, this.session.callOptions()); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (r) { | ||
@@ -412,3 +421,3 @@ const k = self.getRequestFactory().serializer.deserialize(r.getValue_asU8()); | ||
const request = this.requestFactory.removeIndex(extractor); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.removeIndex(request, (err) => { | ||
@@ -430,3 +439,3 @@ self.resolveValue(resolve, reject, err); | ||
const call = self.client.values(request, this.session.callOptions()); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (b) { | ||
@@ -446,3 +455,3 @@ set.add(self.getRequestFactory().serializer.deserialize(b.getValue_asU8())); | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.clear(self.requestFactory.clear(), new grpc_js_1.Metadata(), this.session.callOptions(), (err) => { | ||
@@ -459,3 +468,3 @@ self.resolveValue(resolve, reject, err); | ||
const request = self.requestFactory.containsKey(key); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.containsKey(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -473,3 +482,3 @@ // @ts-ignore | ||
const request = this.requestFactory.containsValue(value); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.containsValue(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -500,3 +509,3 @@ // @ts-ignore | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.get(self.requestFactory.get(key), new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -518,3 +527,3 @@ if (resp && resp.getPresent()) { | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.put(self.requestFactory.put(key, value, ttl), new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -531,3 +540,3 @@ // @ts-ignore | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.putAll(self.requestFactory.putAll(map), new grpc_js_1.Metadata(), this.session.callOptions(), (err) => { | ||
@@ -544,3 +553,3 @@ self.resolveValue(resolve, reject, err); | ||
const request = self.requestFactory.putIfAbsent(key, value, ttl); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.putIfAbsent(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -557,3 +566,3 @@ // @ts-ignore | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.remove(this.requestFactory.remove(key), new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -571,3 +580,3 @@ // @ts-ignore | ||
const request = this.requestFactory.removeMapping(key, value); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.removeMapping(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -585,3 +594,3 @@ // @ts-ignore | ||
const request = this.requestFactory.replace(key, value); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.replace(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -599,3 +608,3 @@ // @ts-ignore | ||
const request = this.requestFactory.replaceMapping(key, value, newValue); | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.replaceMapping(request, new grpc_js_1.Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -613,18 +622,18 @@ // @ts-ignore | ||
if (this.active) { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
// be notified!! | ||
self.internalEmitter.once(MapLifecycleEvent.DESTROYED, () => resolve()); | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can now send out the 'truncate' request. The handleResponse() | ||
// method will generate the appropriate event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
const request = self.requestFactory.destroy(); | ||
self.client.destroy(request, new grpc_js_1.Metadata(), self.session.callOptions(), (err) => { | ||
self.client.destroy(request, new grpc_js_1.Metadata(), self.session.callOptions(), (err) => __awaiter(this, void 0, void 0, function* () { | ||
if (err) { | ||
reject(err); | ||
} | ||
}); | ||
})); | ||
}); | ||
@@ -639,11 +648,11 @@ } | ||
const self = this; | ||
return new Promise((resolve) => { | ||
return this.promisify((resolve) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
// be notified!! | ||
self.internalEmitter.once(MapLifecycleEvent.RELEASED, () => resolve()); | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can emit the MapLifecycleEvent.RELEASED event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
self.internalEmitter.emit(MapLifecycleEvent.RELEASED, self.cacheName); | ||
@@ -657,5 +666,5 @@ }); | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
@@ -666,6 +675,6 @@ // be notified!! | ||
}); | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can now send out the 'truncate' request. The handleResponse() | ||
// method will generate the appropriate event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
const request = new messages_pb_1.TruncateRequest(); | ||
@@ -682,2 +691,29 @@ request.setCache(this.cacheName); | ||
/** | ||
* Create a promise wrapping the provided execution logic. This logic will be called | ||
* when the `gRPC` client's channel is ready. | ||
* | ||
* @param logic execution logic | ||
*/ | ||
promisify(logic) { | ||
const self = this; | ||
return new Promise((resolve, reject) => { | ||
if (!self.active) { | ||
let message = 'Cache [' + this.cacheName + '] has been ' + (this.released ? 'release.' : 'destroyed.'); | ||
reject(new Error(message)); | ||
} | ||
self.client.waitForReady(self.readyTimeout(), error => { | ||
if (error) { | ||
reject(error); | ||
} | ||
logic(resolve, reject); | ||
}); | ||
}); | ||
} | ||
/** | ||
* The deadline for a `gRPC` channel to be ready. | ||
*/ | ||
readyTimeout() { | ||
return Date.now() + this.session.options.readyTimeoutInMillis; | ||
} | ||
/** | ||
* Obtain the next page of entries from the cache. | ||
@@ -710,2 +746,3 @@ * | ||
if (cacheName == self.cacheName) { | ||
// noinspection JSIgnoredPromiseFromCall | ||
self.mapEventsHandler.closeEventStream(); | ||
@@ -723,2 +760,3 @@ self._destroyed = true; | ||
if (cacheName == self.cacheName) { | ||
// noinspection JSIgnoredPromiseFromCall | ||
self.mapEventsHandler.closeEventStream(); | ||
@@ -725,0 +763,0 @@ self._released = true; |
@@ -33,2 +33,9 @@ /// <reference types="node" /> | ||
/** | ||
* Defines the timeout, in `milliseconds`, that will be applied when waiting for | ||
* the `gRPC` stream to become ready. | ||
* | ||
* If not explicitly set, this defaults to `60000`. | ||
*/ | ||
private _readyTimeoutInMillis; | ||
/** | ||
* The serialization format. Currently, this is always `json`. | ||
@@ -38,2 +45,8 @@ */ | ||
/** | ||
* The `gRPC` `ChannelOptions`. | ||
* | ||
* @see https://github.com/grpc/grpc-node/tree/master/packages/grpc-js | ||
*/ | ||
private _channelOptions; | ||
/** | ||
* A function taking no arguments returning a gRPC CallOptions instance. | ||
@@ -78,2 +91,14 @@ * If not explicitly configured, then the call options will simply define | ||
/** | ||
* Returns the ready timeout in `milliseconds`. | ||
* | ||
* @return the ready timeout in `milliseconds` | ||
*/ | ||
get readyTimeoutInMillis(): number; | ||
/** | ||
* Set the ready timeout in `milliseconds`. If the timeout value is zero or less, then no timeout will be applied. | ||
* | ||
* @param timeout the request timeout in `milliseconds` | ||
*/ | ||
set readyTimeoutInMillis(timeout: number); | ||
/** | ||
* Return the scope name used to link this `Session` with to the corresponding | ||
@@ -95,2 +120,18 @@ * `ConfigurableCacheFactory` on the server. | ||
/** | ||
* Return the `gRPC` `ChannelOptions`. | ||
*/ | ||
get channelOptions(): { | ||
[p: string]: any; | ||
}; | ||
/** | ||
* Set the `gRPC` `ChannelOptions`. | ||
* | ||
* @param value the `gRPC` `ChannelOptions` | ||
* | ||
* @see https://github.com/grpc/grpc-node/tree/master/packages/grpc-js | ||
*/ | ||
set channelOptions(value: { | ||
[p: string]: any; | ||
}); | ||
/** | ||
* The serialization format used by this session. This library currently supports JSON serialization only, thus | ||
@@ -167,7 +208,2 @@ * this always returns 'json'. | ||
/** | ||
* The gRPC channel options. See [documentation](https://grpc.github.io/grpc/core/group__grpc__arg__keys.html) | ||
* to obtain a list of possible options | ||
*/ | ||
private _channelOptions?; | ||
/** | ||
* Returns `true` if TLS is to be enabled. | ||
@@ -221,17 +257,2 @@ * | ||
/** | ||
* Return the defined gRPC channel options. | ||
*/ | ||
get channelOptions(): { | ||
[key: string]: string | number; | ||
} | undefined; | ||
/** | ||
* Set the gRPC channel options. See [documentation](https://grpc.github.io/grpc/core/group__grpc__arg__keys.html) | ||
* to obtain a list of possible options | ||
* | ||
* @param value the gRPC channel options | ||
*/ | ||
set channelOptions(value: { | ||
[key: string]: string | number; | ||
} | undefined); | ||
/** | ||
* Once called, no further mutations can be made. | ||
@@ -251,2 +272,6 @@ * @hidden | ||
* 3. {@link MapLifecycleEvent.RELEASED}: When the underlying cache is released | ||
* 4. {@link event.SessionLifecycleEvent.CONNECT}`: when the Session detects the underlying `gRPC` channel has connected. | ||
* 4. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has disconnected | ||
* 5. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has re-connected | ||
* 5. {@link event.SessionLifecycleEvent.CLOSED}`: when the Session has been closed | ||
*/ | ||
@@ -263,2 +288,6 @@ export declare class Session extends EventEmitter { | ||
/** | ||
* The default `gRPC` stream ready timeout. | ||
*/ | ||
static readonly DEFAULT_READY_TIMEOUT = 30000; | ||
/** | ||
* The default scope. | ||
@@ -298,6 +327,2 @@ */ | ||
/** | ||
* The set of options to use while creating a gRPC Channel. | ||
*/ | ||
private readonly _channelOptions; | ||
/** | ||
* The set of options to use while creating a {@link NamedCacheClient}. | ||
@@ -304,0 +329,0 @@ */ |
"use strict"; | ||
/* | ||
* Copyright (c) 2020 Oracle and/or its affiliates. | ||
* Copyright (c) 2020, 2022 Oracle and/or its affiliates. | ||
* | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
@@ -25,2 +25,3 @@ var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) { | ||
const util_1 = require("./util"); | ||
const connectivity_state_1 = require("@grpc/grpc-js/build/src/connectivity-state"); | ||
/** | ||
@@ -40,4 +41,6 @@ * Supported {@link Session} options. | ||
this._requestTimeoutInMillis = Session.DEFAULT_REQUEST_TIMEOUT; | ||
this._readyTimeoutInMillis = Session.DEFAULT_READY_TIMEOUT; | ||
this._format = Session.DEFAULT_FORMAT; | ||
this._scope = Session.DEFAULT_SCOPE; | ||
this._channelOptions = {}; | ||
this._tls = new TlsOptions(); | ||
@@ -97,2 +100,24 @@ const self = this; | ||
/** | ||
* Returns the ready timeout in `milliseconds`. | ||
* | ||
* @return the ready timeout in `milliseconds` | ||
*/ | ||
get readyTimeoutInMillis() { | ||
return this._readyTimeoutInMillis; | ||
} | ||
/** | ||
* Set the ready timeout in `milliseconds`. If the timeout value is zero or less, then no timeout will be applied. | ||
* | ||
* @param timeout the request timeout in `milliseconds` | ||
*/ | ||
set readyTimeoutInMillis(timeout) { | ||
if (this.locked) { | ||
return; | ||
} | ||
if (timeout <= 0) { | ||
timeout = Number.POSITIVE_INFINITY; | ||
} | ||
this._readyTimeoutInMillis = timeout; | ||
} | ||
/** | ||
* Return the scope name used to link this `Session` with to the corresponding | ||
@@ -118,2 +143,21 @@ * `ConfigurableCacheFactory` on the server. | ||
/** | ||
* Return the `gRPC` `ChannelOptions`. | ||
*/ | ||
get channelOptions() { | ||
return this._channelOptions; | ||
} | ||
/** | ||
* Set the `gRPC` `ChannelOptions`. | ||
* | ||
* @param value the `gRPC` `ChannelOptions` | ||
* | ||
* @see https://github.com/grpc/grpc-node/tree/master/packages/grpc-js | ||
*/ | ||
set channelOptions(value) { | ||
if (this.locked) { | ||
return; | ||
} | ||
this._channelOptions = value; | ||
} | ||
/** | ||
* The serialization format used by this session. This library currently supports JSON serialization only, thus | ||
@@ -275,20 +319,2 @@ * this always returns 'json'. | ||
/** | ||
* Return the defined gRPC channel options. | ||
*/ | ||
get channelOptions() { | ||
return this._channelOptions; | ||
} | ||
/** | ||
* Set the gRPC channel options. See [documentation](https://grpc.github.io/grpc/core/group__grpc__arg__keys.html) | ||
* to obtain a list of possible options | ||
* | ||
* @param value the gRPC channel options | ||
*/ | ||
set channelOptions(value) { | ||
if (this.locked) { | ||
return; | ||
} | ||
this._channelOptions = value; | ||
} | ||
/** | ||
* Once called, no further mutations can be made. | ||
@@ -311,2 +337,6 @@ * @hidden | ||
* 3. {@link MapLifecycleEvent.RELEASED}: When the underlying cache is released | ||
* 4. {@link event.SessionLifecycleEvent.CONNECT}`: when the Session detects the underlying `gRPC` channel has connected. | ||
* 4. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has disconnected | ||
* 5. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has re-connected | ||
* 5. {@link event.SessionLifecycleEvent.CLOSED}`: when the Session has been closed | ||
*/ | ||
@@ -336,6 +366,2 @@ class Session extends events_1.EventEmitter { | ||
/** | ||
* The set of options to use while creating a gRPC Channel. | ||
*/ | ||
this._channelOptions = {}; | ||
/** | ||
* The set of options to use while creating a {@link NamedCacheClient}. | ||
@@ -352,10 +378,49 @@ */ | ||
} | ||
// If TLS is enabled then create a SSL channel credentials object. | ||
// If TLS is enabled then create an SSL channel credentials object. | ||
this._channelCredentials = this.options.tls.enabled | ||
? grpc_js_1.credentials.createSsl(Session.readFile('caCert', this.options.tls.caCertPath), Session.readFile('clientKey', this.options.tls.clientKeyPath), Session.readFile('clientCert', this.options.tls.clientCertPath)) | ||
: grpc_js_1.credentials.createInsecure(); | ||
this._channel = new grpc_js_1.Channel(this.options.address, this.channelCredentials, this._channelOptions); | ||
let channel = this._channel = new grpc_js_1.Channel(this.options.address, this.channelCredentials, this.options.channelOptions); | ||
// register handler to monitoring gRPC channel state. | ||
// When transitioning from READY to any other state, other than SHUTDOWN, | ||
// emit the `disconnect` event. | ||
// When transitioning from any other state, | ||
// other than SHUTDOWN, to READY, emit the 'reconnect' event. | ||
let connected = false; | ||
let firstConnect = true; | ||
let lastState = 0; | ||
let callback = () => __awaiter(this, void 0, void 0, function* () { | ||
let state = channel.getConnectivityState(false); | ||
lastState = state; | ||
if (state === connectivity_state_1.ConnectivityState.SHUTDOWN) { | ||
// nothing to do | ||
return; | ||
} | ||
else if (state === connectivity_state_1.ConnectivityState.READY) { | ||
if (!firstConnect && !connected) { | ||
this.emit(events_2.event.SessionLifecycleEvent.RECONNECTED); | ||
connected = true; | ||
} | ||
else if (firstConnect && !connected) { | ||
this.emit(events_2.event.SessionLifecycleEvent.CONNECTED); | ||
firstConnect = false; | ||
connected = true; | ||
} | ||
} | ||
else { | ||
if (connected) { | ||
this.emit(events_2.event.SessionLifecycleEvent.DISCONNECTED); | ||
connected = false; | ||
} | ||
} | ||
let deadline = Number.POSITIVE_INFINITY; | ||
if (state !== connectivity_state_1.ConnectivityState.READY) { | ||
deadline = Date.now() + this._sessionOptions.readyTimeoutInMillis; | ||
} | ||
channel.watchConnectivityState(state, deadline, callback); | ||
}); | ||
channel.watchConnectivityState(connectivity_state_1.ConnectivityState.READY, Number.POSITIVE_INFINITY, callback); | ||
// channel will now be shared by all caches created by this session | ||
this._clientOptions = { | ||
channelOverride: this._channel | ||
channelOverride: channel | ||
}; | ||
@@ -600,2 +665,6 @@ this.sessionClosedPromise = new Promise((resolve) => { | ||
/** | ||
* The default `gRPC` stream ready timeout. | ||
*/ | ||
Session.DEFAULT_READY_TIMEOUT = 30000; | ||
/** | ||
* The default scope. | ||
@@ -602,0 +671,0 @@ */ |
{ | ||
"name": "@oracle/coherence", | ||
"version": "1.1.3", | ||
"version": "1.1.4", | ||
"license": "UPL-1.0", | ||
@@ -34,7 +34,7 @@ "main": "lib/index.js", | ||
"clean": "rm -rf lib docs coverage .nyc_output oracle-*tgz", | ||
"test": "npm run compile; mocha 'test/**.js' --recursive", | ||
"test": "npm run compile; mocha 'test/**.js' --recursive --exit", | ||
"coh-up": "bin/docker-utils.sh -u", | ||
"coh-down": "bin/docker-utils.sh -d", | ||
"coh-clean": "bin/docker-utils.sh -c", | ||
"coverage": "nyc mocha 'test/**.js'", | ||
"coverage": "nyc mocha 'test/**.js' --exit", | ||
"wait": "sleep 30", | ||
@@ -41,0 +41,0 @@ "dist": "npm run compile; typedoc; npm pack", |
@@ -30,3 +30,3 @@ <!-- | ||
```bash | ||
docker run -d -p 1408:1408 oraclecoherence/coherence-ce:22.06 | ||
docker run -d -p 1408:1408 oraclecoherence/coherence-ce:22.06.2 | ||
``` | ||
@@ -33,0 +33,0 @@ |
@@ -657,4 +657,9 @@ /* | ||
* The {@link Comparator} to apply against the extracted values. | ||
* @private | ||
*/ | ||
protected comparator?: AggregatorComparator | ||
/** | ||
* The property that results will be ordered by. | ||
*/ | ||
protected property?: string | ||
@@ -808,3 +813,3 @@ | ||
* statically, so their use is strongly encouraged in lieu of direct construction | ||
* of EntryAggregator} classes. | ||
* of {@link EntryAggregator} classes. | ||
*/ | ||
@@ -811,0 +816,0 @@ export class Aggregators { |
/* | ||
* Copyright (c) 2020 Oracle and/or its affiliates. | ||
* Copyright (c) 2020, 2022 Oracle and/or its affiliates. | ||
* | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
import { ClientDuplexStream } from '@grpc/grpc-js' | ||
import { EventEmitter } from 'events' | ||
import { filter } from './filters' | ||
import { MapEventResponse, MapListenerRequest, MapListenerResponse } from './grpc/messages_pb' | ||
import { NamedCacheServiceClient } from './grpc/services_grpc_pb' | ||
import { NamedCache, NamedMap } from './named-cache-client' | ||
import { util } from './util' | ||
import {ClientDuplexStream} from '@grpc/grpc-js' | ||
import {EventEmitter} from 'events' | ||
import {filter} from './filters' | ||
import {MapEventResponse, MapListenerRequest, MapListenerResponse} from './grpc/messages_pb' | ||
import {NamedCacheServiceClient} from './grpc/services_grpc_pb' | ||
import {NamedCache, NamedMap} from './named-cache-client' | ||
import {util} from './util' | ||
import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state"; | ||
import {Session} from "./session"; | ||
@@ -225,3 +227,3 @@ export namespace event { | ||
* | ||
* 1. A Map of stringified key => ListenerGroup, which is used to identify the | ||
* 1. A Map of string keys mapped to a ListenerGroup, which is used to identify the | ||
* group of callbacks for a single key. We stringify the key since Javascript | ||
@@ -267,3 +269,3 @@ * is not the same as Java's equals(). | ||
/** | ||
* The `NamedMap` that will used as the *source* of the events. | ||
* The `NamedMap` used as the *source* of the events. | ||
*/ | ||
@@ -282,3 +284,3 @@ protected namedMap: NamedMap<K, V> | ||
*/ | ||
private streamPromise: Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse>> | null = null | ||
private streamPromise: Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null> | null = null | ||
@@ -324,11 +326,30 @@ /** | ||
/** | ||
* The `Session` associated with this event stream. | ||
* | ||
* @private | ||
*/ | ||
private session: Session; | ||
/** | ||
* Callback for session reconnect events. | ||
* @private | ||
*/ | ||
private readonly onReconnect: () => void | ||
/** | ||
* Callback for session disconnect events. | ||
* @private | ||
*/ | ||
private readonly onDisconnect: () => void | ||
/** | ||
* Constructs a new `MapEventsManager` | ||
* | ||
* @param namedMap the {@link NamedMap} to manage events for | ||
* @param session the associated {@link Session} | ||
* @param client the `gRPC` interface for making requests | ||
* @param scope the {@link NamedMap} scope | ||
* @param serializer the {@link Serializer} used by this map | ||
* @param emitter the {@link EventEmitter} to use | ||
*/ | ||
constructor (namedMap: NamedMap<K, V>, scope: string, client: NamedCacheServiceClient, serializer: util.Serializer, emitter: EventEmitter) { | ||
constructor (namedMap: NamedMap<K, V>, session: Session, client: NamedCacheServiceClient, serializer: util.Serializer, emitter: EventEmitter) { | ||
this.mapName = namedMap.name | ||
@@ -339,2 +360,3 @@ this.client = client | ||
this.emitter = emitter | ||
this.session = session; | ||
@@ -345,4 +367,28 @@ // Initialize internal data structures. | ||
this.filterId2ListenerGroup = new Map() | ||
this.reqFactory = new util.RequestFactory(this.mapName, scope, serializer) | ||
this.reqFactory = new util.RequestFactory(this.mapName, session.scope, serializer) | ||
this.streamPromise = this.ensureStream() | ||
this.onDisconnect = async () => { | ||
let st: ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null = null; | ||
try { | ||
st = await this.streamPromise | ||
} catch (err: any) { | ||
// ignore | ||
} | ||
if (st !== null) { | ||
st.cancel() | ||
} | ||
this.streamPromise = null; | ||
} | ||
session.on(SessionLifecycleEvent.DISCONNECTED, this.onDisconnect) | ||
this.onReconnect = () => { | ||
this.keyMap.forEach(async value => { | ||
await value.doSubscribe(value.registeredIsLite) | ||
}) | ||
this.filterMap.forEach(async value => { | ||
await value.doSubscribe(value.registeredIsLite) | ||
}) | ||
} | ||
session.on(SessionLifecycleEvent.RECONNECTED, this.onReconnect) | ||
} | ||
@@ -353,33 +399,43 @@ | ||
*/ | ||
ensureStream (): Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse>> { | ||
ensureStream (): Promise<ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null> { | ||
const self = this | ||
if (self.streamPromise == null) { | ||
const bidiStream = self.client.events() | ||
self.streamPromise = new Promise((resolve) => { | ||
self.client.waitForReady(Date.now() + this.session.options.readyTimeoutInMillis, (err) => { | ||
if (err) { | ||
this.streamPromise = this.ensureStream() | ||
resolve(null) | ||
} | ||
const bidiStream = self.client.events() | ||
bidiStream.on('data', (resp) => self.handleResponse(resp)) | ||
bidiStream.on('end', () => self.onEnd()) | ||
bidiStream.on('error', (err) => self.onError(err)) | ||
bidiStream.on('cancelled', () => self.onCancel()) | ||
bidiStream.on('data', (resp) => self.handleResponse(resp)) | ||
bidiStream.on('error', (err) => self.onError(err)) | ||
// Create a SubscribeRequest (with RequestType.INIT) | ||
const request = self.reqFactory.mapEventSubscribe() | ||
const initUid = request.getUid() | ||
self.streamPromise = new Promise((resolve, reject) => { | ||
// Setup pending subscriptions map so that when the | ||
// subscribe response comes back, or an error occurs | ||
// we can resolve or reject the connection. | ||
self.pendingSubscriptions.set(initUid, (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid) | ||
if (err) { | ||
reject(err) | ||
} else { | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream) | ||
} | ||
// Create a SubscribeRequest (with RequestType.INIT) | ||
const request = self.reqFactory.mapEventSubscribe() | ||
const initUid = request.getUid() | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream) | ||
// Setup pending subscriptions map so that when the | ||
// subscribe response comes back, or an error occurs | ||
// we can resolve or reject the connection. | ||
self.pendingSubscriptions.set(initUid, (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid) | ||
if (err) { | ||
this.streamPromise = this.ensureStream() | ||
resolve(null) | ||
} else { | ||
// If we received a successful subscribed response, | ||
// the connection is initialized. So resolve it. | ||
resolve(bidiStream) | ||
} | ||
}) | ||
// Now that we have set up the pending subscriptions map, | ||
// write the init request. | ||
bidiStream.write(request) | ||
}) | ||
// Now that we have set up the pending subscriptions map, | ||
// write the init request. | ||
bidiStream.write(request) | ||
}) | ||
@@ -528,13 +584,16 @@ } | ||
return this.ensureStream() | ||
.then((stream: ClientDuplexStream<MapListenerRequest, MapListenerResponse>) => { | ||
return new Promise<void>((resolve, reject) => { | ||
self.pendingSubscriptions.set(request.getUid(), (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid) | ||
if (err) { | ||
reject(err) | ||
} else { | ||
.then((stream: ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null) => { | ||
return new Promise<void>((resolve) => { | ||
if (!stream) { | ||
resolve() | ||
} else { | ||
self.pendingSubscriptions.set(request.getUid(), (uid, resp, err) => { | ||
self.pendingSubscriptions.delete(uid) | ||
if (err) { | ||
this.streamPromise = this.ensureStream() | ||
} | ||
resolve() | ||
} | ||
}) | ||
stream.write(request) | ||
}) | ||
stream.write(request) | ||
} | ||
}) | ||
@@ -551,13 +610,25 @@ }) | ||
self.markedForClose = true | ||
const bidiStream = await self.streamPromise | ||
let bidiStream: ClientDuplexStream<MapListenerRequest, MapListenerResponse> | null = null; | ||
try { | ||
bidiStream = await self.streamPromise; | ||
} catch (err: any) { | ||
} | ||
self.session.removeListener(SessionLifecycleEvent.CONNECTED, self.onReconnect) | ||
self.session.removeListener(SessionLifecycleEvent.RECONNECTED, self.onReconnect) | ||
self.session.removeListener(SessionLifecycleEvent.DISCONNECTED, self.onDisconnect) | ||
await new Promise<void>(async (resolve) => { | ||
// Setup an event handler for 'error' as calling cancel() on | ||
// Add an event handler for 'error' as calling cancel() on | ||
// the bidi stream will result in a CANCELLED status. | ||
bidiStream.on('error', (err) => { | ||
if (err.toString().indexOf('CANCELLED')) { | ||
self.streamPromise = null | ||
resolve() | ||
} | ||
}) | ||
bidiStream.end() | ||
if (bidiStream) { | ||
bidiStream.on('error', (err) => { | ||
if (err.toString().indexOf('CANCELLED')) { | ||
self.streamPromise = null | ||
resolve() | ||
} | ||
}) | ||
bidiStream.end() | ||
} | ||
resolve() | ||
}) | ||
@@ -603,25 +674,16 @@ } | ||
*/ | ||
private onError (err: Error) { | ||
private onError(err: Error) { | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', this.mapName + ': Received onError', err) | ||
if (this.client.getChannel().getConnectivityState(false) == ConnectivityState.READY) { | ||
// stream cancellation, but channel still okay | ||
this.streamPromise = null; | ||
this.keyMap.forEach(async value => { | ||
await value.doSubscribe(value.registeredIsLite) | ||
}) | ||
this.filterMap.forEach(async value => { | ||
await value.doSubscribe(value.registeredIsLite) | ||
}) | ||
} | ||
} | ||
} | ||
/** | ||
* Handles the end of an event stream. | ||
*/ | ||
private onEnd () { | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', this.mapName + ': Received onEnd') | ||
} | ||
} | ||
/** | ||
* Handles a stream being cancelled. | ||
*/ | ||
private onCancel () { | ||
if (!this.markedForClose) { | ||
this.emitter.emit('error', '** Received onCancel') | ||
} | ||
} | ||
} | ||
@@ -631,3 +693,3 @@ | ||
* Manages a collection of MapEventListeners. Handles sending out | ||
* MapListenerRequest subscriptions / unsubscriptions. Also, handles | ||
* MapListenerRequest subscriptions / un-subscriptions. Also, handles | ||
* notification of all the registered listeners. | ||
@@ -642,8 +704,2 @@ */ | ||
/** | ||
* Active status will be true if the subscribe request has been sent. | ||
* It will be false if a unsubscribe request has been sent. | ||
*/ | ||
isActive: boolean = true // Initially active. | ||
/** | ||
* The key or the filter for which this group of callbacks will | ||
@@ -659,3 +715,3 @@ * receive events. | ||
* | ||
* Similarly if a listener is removed whose isLite == false but if all the | ||
* Similarly, if a listener is removed whose isLite == false but if all the | ||
* remaining listeners are interested in only isLite == true, then a | ||
@@ -951,4 +1007,7 @@ * re-registration occurs. | ||
export enum SessionLifecycleEvent { | ||
CLOSED = 'session_closed' | ||
CLOSED = 'session_closed', | ||
DISCONNECTED = 'session_disconnected', | ||
RECONNECTED = 'session_reconnected', | ||
CONNECTED = 'session_connected' | ||
} | ||
} |
@@ -875,3 +875,3 @@ /* | ||
/** | ||
* A {@code java.util.function.Predicate} based {@link ExtractorFilter}. | ||
* A predicate based {@link ExtractorFilter}. | ||
*/ | ||
@@ -878,0 +878,0 @@ export class PredicateFilter |
@@ -5,6 +5,6 @@ /* | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
import { ClientReadableStream, Metadata, ServiceError } from '@grpc/grpc-js' | ||
import {ClientReadableStream, Deadline, Metadata, ServiceError} from '@grpc/grpc-js' | ||
import { EventEmitter } from 'events' | ||
@@ -91,3 +91,3 @@ import { BytesValue } from 'google-protobuf/google/protobuf/wrappers_pb' | ||
/** | ||
* Signifies whether or not this `NamedMap` has been destroyed. | ||
* Signifies if this `NamedMap` has been destroyed. | ||
*/ | ||
@@ -144,3 +144,3 @@ readonly destroyed: boolean | ||
* @return a `Promise` resolving to `true` if the key is mapped | ||
* to the specified value value, or `false` if it does not | ||
* to the specified value, or `false` if it does not | ||
*/ | ||
@@ -193,3 +193,3 @@ hasEntry (key: K, value: V): Promise<boolean> | ||
/** | ||
* Copies all of the mappings from the specified map to this map | ||
* Copies all mappings from the specified map to this map | ||
* | ||
@@ -487,3 +487,3 @@ * @param map the map to copy from | ||
* Unlike the {@link keySet()} method, the set returned by this method may | ||
* not be backed by the map, so changes to the set may not reflected in the | ||
* not be backed by the map, so changes to the set may not be reflected in the | ||
* map, and vice-versa. | ||
@@ -493,7 +493,6 @@ * | ||
* entries of this map should satisfy | ||
* @param comparator the comparator for sorting | ||
* | ||
* @return a set of keys for entries that satisfy the specified criteria | ||
*/ | ||
keys (filter: Filter, comparator?: Comparator): Promise<RemoteSet<K>> | ||
keys (filter: Filter): Promise<RemoteSet<K>> | ||
@@ -639,3 +638,3 @@ /** | ||
* | ||
* @return a `Promise` resolving to the the previous value associated with the specified key, or | ||
* @return a `Promise` resolving to the previous value associated with the specified key, or | ||
* `null` if there was no mapping for the key. (A `null` return can also indicate that the map previously | ||
@@ -673,3 +672,3 @@ * associated `null` with the key, if the implementation supports `null` values.) | ||
*/ | ||
private session: Session | ||
private readonly session: Session | ||
/** | ||
@@ -740,3 +739,4 @@ * The name of the Coherence `NamedCache`. | ||
// Now open the events channel. | ||
this.mapEventsHandler = new MapEventsManager(this as NamedMap<K, V>, this.session.scope, this.client, this.serializer, this.internalEmitter) | ||
this.mapEventsHandler = new MapEventsManager(this as NamedMap<K, V>, | ||
this.session, this.client, this.serializer, this.internalEmitter) | ||
} | ||
@@ -777,3 +777,3 @@ | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
const request = new IsEmptyRequest() | ||
@@ -792,3 +792,3 @@ request.setCache(this.cacheName) | ||
get size () { | ||
return new Promise<number>((resolve, reject) => { | ||
return this.promisify<number>((resolve, reject) => { | ||
const request = new SizeRequest() | ||
@@ -843,3 +843,3 @@ request.setCache(this.cacheName) | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
const request = self.requestFactory.containsEntry(key, value) | ||
@@ -868,3 +868,3 @@ self.client.containsEntry(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.aggregate(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -898,3 +898,3 @@ if (err) { | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.invoke(self.requestFactory.invoke(key, processor), (err, resp) => { | ||
@@ -945,3 +945,3 @@ if (err) { | ||
if (util.isIterableType(keysOrFilter)) { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.getAll(keysOrFilter as Iterable<K>) | ||
@@ -953,3 +953,3 @@ .then(entries => entries.forEach((value: V, key: K) => action(value, key, this))) | ||
} else { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.entries(keysOrFilter as filter.Filter) | ||
@@ -966,3 +966,3 @@ .then(entries => { | ||
} | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
this.entries(Filters.always()) | ||
@@ -1022,3 +1022,3 @@ .then(entries => { | ||
const request = this.requestFactory.addIndex(extractor, ordered, comparator) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.addIndex(request, new Metadata(), this.session.callOptions(), (err: ServiceError | null) => { | ||
@@ -1043,3 +1043,3 @@ self.resolveValue(resolve, reject, err) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (e: GrpcEntry) { | ||
@@ -1059,3 +1059,3 @@ const entry = new NamedCacheEntry<K, V>(e.getKey_asU8(), e.getValue_asU8(), self.getRequestFactory().serializer) | ||
*/ | ||
keys (filter?: Filter, comparator?: Comparator): Promise<RemoteSet<K>> { | ||
keys (filter?: Filter): Promise<RemoteSet<K>> { | ||
const self = this | ||
@@ -1070,3 +1070,3 @@ if (!filter) { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (r: BytesValue) { | ||
@@ -1091,3 +1091,3 @@ const k = self.getRequestFactory().serializer.deserialize(r.getValue_asU8()) | ||
const request = this.requestFactory.removeIndex(extractor) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.removeIndex(request, (err: ServiceError | null) => { | ||
@@ -1112,3 +1112,3 @@ self.resolveValue(resolve, reject, err) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
call.on(RequestStateEvent.DATA, function (b: BytesValue) { | ||
@@ -1129,3 +1129,3 @@ set.add(self.getRequestFactory().serializer.deserialize(b.getValue_asU8())) | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.clear(self.requestFactory.clear(), new Metadata(), this.session.callOptions(), (err: ServiceError | null) => { | ||
@@ -1143,3 +1143,3 @@ self.resolveValue(resolve, reject, err) | ||
const request = self.requestFactory.containsKey(key) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.containsKey(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1158,3 +1158,3 @@ // @ts-ignore | ||
const request = this.requestFactory.containsValue(value) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.containsValue(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1188,3 +1188,3 @@ // @ts-ignore | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.get(self.requestFactory.get(key), new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1206,3 +1206,3 @@ if (resp && resp.getPresent()) { | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.put(self.requestFactory.put(key, value, ttl), new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1220,3 +1220,3 @@ // @ts-ignore | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.putAll(self.requestFactory.putAll(map), new Metadata(), this.session.callOptions(), (err: ServiceError | null) => { | ||
@@ -1234,3 +1234,3 @@ self.resolveValue(resolve, reject, err) | ||
const request = self.requestFactory.putIfAbsent(key, value, ttl) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.putIfAbsent(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1248,3 +1248,3 @@ // @ts-ignore | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.remove(this.requestFactory.remove(key), new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1263,3 +1263,3 @@ // @ts-ignore | ||
const request = this.requestFactory.removeMapping(key, value) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.removeMapping(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1278,3 +1278,3 @@ // @ts-ignore | ||
const request = this.requestFactory.replace(key, value) | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.replace(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1294,3 +1294,3 @@ // @ts-ignore | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
self.client.replaceMapping(request, new Metadata(), this.session.callOptions(), (err, resp) => { | ||
@@ -1310,5 +1310,5 @@ // @ts-ignore | ||
if (this.active) { | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
@@ -1318,8 +1318,8 @@ // be notified!! | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can now send out the 'truncate' request. The handleResponse() | ||
// method will generate the appropriate event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
const request = self.requestFactory.destroy() | ||
self.client.destroy(request, new Metadata(), self.session.callOptions(), (err: ServiceError | null) => { | ||
self.client.destroy(request, new Metadata(), self.session.callOptions(), async (err: ServiceError | null) => { | ||
if (err) { | ||
@@ -1340,5 +1340,5 @@ reject(err) | ||
const self = this | ||
return new Promise((resolve) => { | ||
return this.promisify((resolve) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
@@ -1348,5 +1348,5 @@ // be notified!! | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can emit the MapLifecycleEvent.RELEASED event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
self.internalEmitter.emit(MapLifecycleEvent.RELEASED, self.cacheName) | ||
@@ -1361,5 +1361,5 @@ }) | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
return this.promisify((resolve, reject) => { | ||
// Note that this listener will be after the default listeners | ||
// that were setup in the constructor. So once this receives | ||
// that were registered in the constructor. So once this receives | ||
// the event, we can be sure that *all other* listeners have | ||
@@ -1371,6 +1371,6 @@ // be notified!! | ||
// Now that we have setup our 'once & only once' listener, we | ||
// Now that we have registered our 'once & only once' listener, we | ||
// can now send out the 'truncate' request. The handleResponse() | ||
// method will generate the appropriate event on the internalEmitter | ||
// for which our 'once & only once' listener is setup. | ||
// for which our 'once & only once' listener is registered. | ||
const request = new TruncateRequest() | ||
@@ -1389,2 +1389,32 @@ request.setCache(this.cacheName) | ||
/** | ||
* Create a promise wrapping the provided execution logic. This logic will be called | ||
* when the `gRPC` client's channel is ready. | ||
* | ||
* @param logic execution logic | ||
*/ | ||
promisify<T> (logic: (resolve: (value: T | PromiseLike<T>) => void, reject: (reason?: any) => void) => void): Promise<T> { | ||
const self = this | ||
return new Promise((resolve, reject) => { | ||
if (!self.active) { | ||
let message: string = 'Cache [' + this.cacheName + '] has been ' + (this.released ? 'release.' : 'destroyed.') | ||
reject(new Error(message)) | ||
} | ||
self.client.waitForReady(self.readyTimeout(), error => { | ||
if (error) { | ||
reject(error) | ||
} | ||
logic(resolve, reject) | ||
}) | ||
}) | ||
} | ||
/** | ||
* The deadline for a `gRPC` channel to be ready. | ||
*/ | ||
readyTimeout(): Deadline { | ||
return Date.now() + this.session.options.readyTimeoutInMillis; | ||
} | ||
/** | ||
* Obtain the next page of entries from the cache. | ||
@@ -1419,2 +1449,3 @@ * | ||
if (cacheName == self.cacheName) { | ||
// noinspection JSIgnoredPromiseFromCall | ||
self.mapEventsHandler.closeEventStream() | ||
@@ -1434,2 +1465,3 @@ self._destroyed = true | ||
if (cacheName == self.cacheName) { | ||
// noinspection JSIgnoredPromiseFromCall | ||
self.mapEventsHandler.closeEventStream() | ||
@@ -1436,0 +1468,0 @@ self._released = true |
/* | ||
* Copyright (c) 2020 Oracle and/or its affiliates. | ||
* Copyright (c) 2020, 2022 Oracle and/or its affiliates. | ||
* | ||
* Licensed under the Universal Permissive License v 1.0 as shown at | ||
* http://oss.oracle.com/licenses/upl. | ||
* https://oss.oracle.com/licenses/upl. | ||
*/ | ||
import { CallOptions, Channel, ChannelCredentials, credentials } from '@grpc/grpc-js' | ||
import { EventEmitter } from 'events' | ||
import { PathLike, readFileSync } from 'fs' | ||
import { event } from './events' | ||
import {CallOptions, Channel, ChannelCredentials, credentials} from '@grpc/grpc-js' | ||
import {EventEmitter} from 'events' | ||
import {PathLike, readFileSync} from 'fs' | ||
import {event} from './events' | ||
import { NamedCache, NamedCacheClient, NamedMap } from './named-cache-client' | ||
import { util } from './util' | ||
import {NamedCache, NamedCacheClient, NamedMap} from './named-cache-client' | ||
import {util} from './util' | ||
import {ConnectivityState} from "@grpc/grpc-js/build/src/connectivity-state"; | ||
@@ -46,2 +47,10 @@ /** | ||
/** | ||
* Defines the timeout, in `milliseconds`, that will be applied when waiting for | ||
* the `gRPC` stream to become ready. | ||
* | ||
* If not explicitly set, this defaults to `60000`. | ||
*/ | ||
private _readyTimeoutInMillis: number | ||
/** | ||
* The serialization format. Currently, this is always `json`. | ||
@@ -52,2 +61,9 @@ */ | ||
/** | ||
* The `gRPC` `ChannelOptions`. | ||
* | ||
* @see https://github.com/grpc/grpc-node/tree/master/packages/grpc-js | ||
*/ | ||
private _channelOptions: { [key: string]: any } | ||
/** | ||
* A function taking no arguments returning a gRPC CallOptions instance. | ||
@@ -121,2 +137,26 @@ * If not explicitly configured, then the call options will simply define | ||
/** | ||
* Returns the ready timeout in `milliseconds`. | ||
* | ||
* @return the ready timeout in `milliseconds` | ||
*/ | ||
get readyTimeoutInMillis(): number { | ||
return this._readyTimeoutInMillis; | ||
} | ||
/** | ||
* Set the ready timeout in `milliseconds`. If the timeout value is zero or less, then no timeout will be applied. | ||
* | ||
* @param timeout the request timeout in `milliseconds` | ||
*/ | ||
set readyTimeoutInMillis(timeout: number) { | ||
if (this.locked) { | ||
return; | ||
} | ||
if (timeout <= 0) { | ||
timeout = Number.POSITIVE_INFINITY | ||
} | ||
this._readyTimeoutInMillis = timeout; | ||
} | ||
/** | ||
* Return the scope name used to link this `Session` with to the corresponding | ||
@@ -144,2 +184,23 @@ * `ConfigurableCacheFactory` on the server. | ||
/** | ||
* Return the `gRPC` `ChannelOptions`. | ||
*/ | ||
get channelOptions(): { [p: string]: any } { | ||
return this._channelOptions; | ||
} | ||
/** | ||
* Set the `gRPC` `ChannelOptions`. | ||
* | ||
* @param value the `gRPC` `ChannelOptions` | ||
* | ||
* @see https://github.com/grpc/grpc-node/tree/master/packages/grpc-js | ||
*/ | ||
set channelOptions(value: { [p: string]: any }) { | ||
if (this.locked) { | ||
return; | ||
} | ||
this._channelOptions = value; | ||
} | ||
/** | ||
* The serialization format used by this session. This library currently supports JSON serialization only, thus | ||
@@ -217,4 +278,6 @@ * this always returns 'json'. | ||
this._requestTimeoutInMillis = Session.DEFAULT_REQUEST_TIMEOUT | ||
this._readyTimeoutInMillis = Session.DEFAULT_READY_TIMEOUT | ||
this._format = Session.DEFAULT_FORMAT | ||
this._scope = Session.DEFAULT_SCOPE | ||
this._channelOptions = {} | ||
this._tls = new TlsOptions() | ||
@@ -261,8 +324,2 @@ | ||
/** | ||
* The gRPC channel options. See [documentation](https://grpc.github.io/grpc/core/group__grpc__arg__keys.html) | ||
* to obtain a list of possible options | ||
*/ | ||
private _channelOptions?: { [key: string]: string | number } | ||
/** | ||
* Returns `true` if TLS is to be enabled. | ||
@@ -352,22 +409,2 @@ * | ||
/** | ||
* Return the defined gRPC channel options. | ||
*/ | ||
get channelOptions (): { [key: string]: string | number } | undefined { | ||
return this._channelOptions | ||
} | ||
/** | ||
* Set the gRPC channel options. See [documentation](https://grpc.github.io/grpc/core/group__grpc__arg__keys.html) | ||
* to obtain a list of possible options | ||
* | ||
* @param value the gRPC channel options | ||
*/ | ||
set channelOptions (value: { [key: string]: string | number } | undefined) { | ||
if (this.locked) { | ||
return; | ||
} | ||
this._channelOptions = value | ||
} | ||
/** | ||
* Once called, no further mutations can be made. | ||
@@ -390,2 +427,6 @@ * @hidden | ||
* 3. {@link MapLifecycleEvent.RELEASED}: When the underlying cache is released | ||
* 4. {@link event.SessionLifecycleEvent.CONNECT}`: when the Session detects the underlying `gRPC` channel has connected. | ||
* 4. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has disconnected | ||
* 5. {@link event.SessionLifecycleEvent.DISCONNECT}`: when the Session detects the underlying `gRPC` channel has re-connected | ||
* 5. {@link event.SessionLifecycleEvent.CLOSED}`: when the Session has been closed | ||
*/ | ||
@@ -405,2 +446,7 @@ export class Session | ||
/** | ||
* The default `gRPC` stream ready timeout. | ||
*/ | ||
public static readonly DEFAULT_READY_TIMEOUT = 30000 | ||
/** | ||
* The default scope. | ||
@@ -448,7 +494,2 @@ */ | ||
/** | ||
* The set of options to use while creating a gRPC Channel. | ||
*/ | ||
private readonly _channelOptions: { [key: string]: string | number } = {} | ||
/** | ||
* The set of options to use while creating a {@link NamedCacheClient}. | ||
@@ -478,3 +519,3 @@ */ | ||
// If TLS is enabled then create a SSL channel credentials object. | ||
// If TLS is enabled then create an SSL channel credentials object. | ||
this._channelCredentials = this.options.tls.enabled | ||
@@ -486,7 +527,46 @@ ? credentials.createSsl(Session.readFile('caCert', this.options.tls.caCertPath), | ||
this._channel = new Channel(this.options.address, this.channelCredentials, this._channelOptions) | ||
let channel = this._channel = new Channel(this.options.address, | ||
this.channelCredentials, | ||
this.options.channelOptions) | ||
// register handler to monitoring gRPC channel state. | ||
// When transitioning from READY to any other state, other than SHUTDOWN, | ||
// emit the `disconnect` event. | ||
// When transitioning from any other state, | ||
// other than SHUTDOWN, to READY, emit the 'reconnect' event. | ||
let connected: boolean = false; | ||
let firstConnect: boolean = true; | ||
let lastState: number = 0; | ||
let callback = async () => { | ||
let state = channel.getConnectivityState(false); | ||
lastState = state; | ||
if (state === ConnectivityState.SHUTDOWN) { | ||
// nothing to do | ||
return; | ||
} else if (state === ConnectivityState.READY) { | ||
if (!firstConnect && !connected) { | ||
this.emit(event.SessionLifecycleEvent.RECONNECTED) | ||
connected = true | ||
} else if (firstConnect && !connected){ | ||
this.emit(event.SessionLifecycleEvent.CONNECTED) | ||
firstConnect = false | ||
connected = true | ||
} | ||
} else { | ||
if (connected) { | ||
this.emit(event.SessionLifecycleEvent.DISCONNECTED) | ||
connected = false; | ||
} | ||
} | ||
let deadline = Number.POSITIVE_INFINITY | ||
if (state !== ConnectivityState.READY) { | ||
deadline = Date.now() + this._sessionOptions.readyTimeoutInMillis | ||
} | ||
channel.watchConnectivityState(state, deadline, callback) | ||
} | ||
channel.watchConnectivityState(ConnectivityState.READY, Number.POSITIVE_INFINITY, callback) | ||
// channel will now be shared by all caches created by this session | ||
this._clientOptions = { | ||
channelOverride: this._channel | ||
channelOverride: channel | ||
} | ||
@@ -493,0 +573,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Native code
Supply chain riskContains native code (e.g., compiled binaries or shared libraries). Including native code can obscure malicious behavior.
Found 1 instance in 1 package
10066933
187
33061