Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@aws-amplify/api-graphql

Package Overview
Dependencies
Maintainers
9
Versions
1710
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@aws-amplify/api-graphql - npm Package Compare versions

Comparing version 4.4.0 to 4.4.1-events.619318.0

dist/cjs/internals/events/appsyncRequest.js

5

./dist/cjs/index.js

@@ -6,4 +6,6 @@ 'use strict';

Object.defineProperty(exports, "__esModule", { value: true });
exports.CONNECTION_STATE_CHANGE = exports.graphqlOperation = exports.GraphQLAPIClass = exports.GraphQLAPI = void 0;
exports.CONNECTION_STATE_CHANGE = exports.graphqlOperation = exports.GraphQLAPIClass = exports.GraphQLAPI = exports.events = void 0;
const tslib_1 = require("tslib");
const events = tslib_1.__importStar(require("./internals/events"));
exports.events = events;
var GraphQLAPI_1 = require("./GraphQLAPI");

@@ -16,2 +18,3 @@ Object.defineProperty(exports, "GraphQLAPI", { enumerable: true, get: function () { return GraphQLAPI_1.GraphQLAPI; } });

Object.defineProperty(exports, "CONNECTION_STATE_CHANGE", { enumerable: true, get: function () { return constants_1.CONNECTION_STATE_CHANGE; } });
tslib_1.__exportStar(require("./internals/events/types"), exports);
//# sourceMappingURL=index.js.map

@@ -6,4 +6,6 @@ 'use strict';

Object.defineProperty(exports, "__esModule", { value: true });
exports.CONNECTION_STATE_CHANGE = exports.graphqlOperation = exports.GraphQLAPIClass = exports.GraphQLAPI = void 0;
exports.CONNECTION_STATE_CHANGE = exports.graphqlOperation = exports.GraphQLAPIClass = exports.GraphQLAPI = exports.events = void 0;
const tslib_1 = require("tslib");
const events = tslib_1.__importStar(require("./internals/events"));
exports.events = events;
var GraphQLAPI_1 = require("./GraphQLAPI");

@@ -16,2 +18,3 @@ Object.defineProperty(exports, "GraphQLAPI", { enumerable: true, get: function () { return GraphQLAPI_1.GraphQLAPI; } });

Object.defineProperty(exports, "CONNECTION_STATE_CHANGE", { enumerable: true, get: function () { return constants_1.CONNECTION_STATE_CHANGE; } });
tslib_1.__exportStar(require("./internals/events/types"), exports);
//# sourceMappingURL=index.js.map

75

dist/cjs/internals/InternalGraphQLAPI.js

@@ -18,2 +18,3 @@ 'use strict';

const isGraphQLResponseWithErrors_1 = require("./utils/runtimeTypeGuards/isGraphQLResponseWithErrors");
const graphqlAuth_1 = require("./graphqlAuth");
const USER_AGENT_HEADER = 'x-amz-user-agent';

@@ -42,57 +43,2 @@ const isAmplifyInstance = (amplify) => {

}
async _headerBasedAuth(amplify, authMode, additionalHeaders = {}) {
const { apiKey } = (0, utils_2.resolveConfig)(amplify);
let headers = {};
switch (authMode) {
case 'apiKey':
if (!apiKey) {
throw new errors_1.GraphQLApiError(constants_1.NO_API_KEY);
}
headers = {
'X-Api-Key': apiKey,
};
break;
case 'iam': {
const session = await amplify.Auth.fetchAuthSession();
if (session.credentials === undefined) {
throw new errors_1.GraphQLApiError(constants_1.NO_VALID_CREDENTIALS);
}
break;
}
case 'oidc':
case 'userPool': {
let token;
try {
token = (await amplify.Auth.fetchAuthSession()).tokens?.accessToken.toString();
}
catch (e) {
// fetchAuthSession failed
throw new errors_1.GraphQLApiError({
...constants_1.NO_SIGNED_IN_USER,
underlyingError: e,
});
}
// `fetchAuthSession()` succeeded but didn't return `tokens`.
// This may happen when unauthenticated access is enabled and there is
// no user signed in.
if (!token) {
throw new errors_1.GraphQLApiError(constants_1.NO_VALID_AUTH_TOKEN);
}
headers = {
Authorization: token,
};
break;
}
case 'lambda':
if (typeof additionalHeaders === 'object' &&
!additionalHeaders.Authorization) {
throw new errors_1.GraphQLApiError(constants_1.NO_AUTH_TOKEN_HEADER);
}
headers = {
Authorization: additionalHeaders.Authorization,
};
break;
}
return headers;
}
/**

@@ -149,3 +95,3 @@ * to get the operation type

async _graphql(amplify, { query, variables, authMode: explicitAuthMode }, additionalHeaders = {}, abortController, customUserAgentDetails, authToken) {
const { region, endpoint: appSyncGraphqlEndpoint, customEndpoint, customEndpointRegion, defaultAuthMode, } = (0, utils_2.resolveConfig)(amplify);
const { apiKey, region, endpoint: appSyncGraphqlEndpoint, customEndpoint, customEndpointRegion, defaultAuthMode, } = (0, utils_2.resolveConfig)(amplify);
const initialAuthMode = explicitAuthMode || defaultAuthMode || 'iam';

@@ -186,6 +132,5 @@ // identityPool is an alias for iam. TODO: remove 'iam' in v7

}
// TODO: Figure what we need to do to remove `!`'s.
const authHeaders = await (0, graphqlAuth_1.headerBasedAuth)(amplify, authMode, apiKey, additionalCustomHeaders);
const headers = {
...(!customEndpoint &&
(await this._headerBasedAuth(amplify, authMode, additionalCustomHeaders))),
...(!customEndpoint && authHeaders),
/**

@@ -197,7 +142,3 @@ * Custom endpoint headers.

*/
...((customEndpoint &&
(customEndpointRegion
? await this._headerBasedAuth(amplify, authMode, additionalCustomHeaders)
: {})) ||
{}),
...((customEndpoint && (customEndpointRegion ? authHeaders : {})) || {}),
// Custom headers included in Amplify configuration options:

@@ -247,5 +188,5 @@ ...(customHeaders &&

try {
// See the inline doc of the REST `post()` API for possible errors to be thrown.
// As these errors are catastrophic they should be caught and handled by GraphQL
// API consumers.
// // // See the inline doc of the REST `post()` API for possible errors to be thrown.
// // // As these errors are catastrophic they should be caught and handled by GraphQL
// // // API consumers.
const { body: responseBody } = await this._api.post(amplify, {

@@ -252,0 +193,0 @@ url: new utils_1.AmplifyUrl(endpoint),

'use strict';
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
Object.defineProperty(exports, "__esModule", { value: true });
exports.AWSAppSyncRealTimeProvider = void 0;
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
const rxjs_1 = require("rxjs");
const graphql_1 = require("graphql");
const core_1 = require("@aws-amplify/core");
const aws_client_utils_1 = require("@aws-amplify/core/internals/aws-client-utils");
const utils_1 = require("@aws-amplify/core/internals/utils");
const PubSub_1 = require("../../types/PubSub");
const constants_1 = require("../constants");
const ConnectionStateMonitor_1 = require("../../utils/ConnectionStateMonitor");
const ReconnectionMonitor_1 = require("../../utils/ReconnectionMonitor");
const logger = new core_1.ConsoleLogger('AWSAppSyncRealTimeProvider');
const dispatchApiEvent = (payload) => {
core_1.Hub.dispatch('api', payload, 'PubSub', constants_1.AMPLIFY_SYMBOL);
};
const standardDomainPattern = /^https:\/\/\w{26}\.appsync-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/graphql$/i;
const customDomainPath = '/realtime';
class AWSAppSyncRealTimeProvider {
const AWSWebSocketProvider_1 = require("../AWSWebSocketProvider");
const authHeaders_1 = require("../AWSWebSocketProvider/authHeaders");
const PROVIDER_NAME = 'AWSAppSyncRealTimeProvider';
// get rid of generic. Just map the options from Gogi-specific to general
class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider_1.AWSWebSocketProvider {
constructor() {
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
this.keepAliveTimeout = constants_1.DEFAULT_KEEP_ALIVE_TIMEOUT;
this.subscriptionObserverMap = new Map();
this.promiseArray = [];
this.connectionStateMonitor = new ConnectionStateMonitor_1.ConnectionStateMonitor();
this.reconnectionMonitor = new ReconnectionMonitor_1.ReconnectionMonitor();
// Monitor the connection state and pass changes along to Hub
this.connectionStateMonitorSubscription =
this.connectionStateMonitor.connectionStateObservable.subscribe(connectionState => {
dispatchApiEvent({
event: constants_1.CONNECTION_STATE_CHANGE,
data: {
provider: this,
connectionState,
},
message: `Connection state is ${connectionState}`,
});
this.connectionState = connectionState;
// Trigger START_RECONNECT when the connection is disrupted
if (connectionState === PubSub_1.ConnectionState.ConnectionDisrupted) {
this.reconnectionMonitor.record(ReconnectionMonitor_1.ReconnectEvent.START_RECONNECT);
}
// Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than
// ConnectionDisrupted or Connecting
if ([
PubSub_1.ConnectionState.Connected,
PubSub_1.ConnectionState.ConnectedPendingDisconnect,
PubSub_1.ConnectionState.ConnectedPendingKeepAlive,
PubSub_1.ConnectionState.ConnectedPendingNetwork,
PubSub_1.ConnectionState.ConnectionDisruptedPendingNetwork,
PubSub_1.ConnectionState.Disconnected,
].includes(connectionState)) {
this.reconnectionMonitor.record(ReconnectionMonitor_1.ReconnectEvent.HALT_RECONNECT);
}
});
super(PROVIDER_NAME);
}
/**
* Mark the socket closed and release all active listeners
*/
close() {
// Mark the socket closed both in status and the connection monitor
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CONNECTION_FAILED);
// Turn off the subscription monitor Hub publishing
this.connectionStateMonitorSubscription.unsubscribe();
// Complete all reconnect observers
this.reconnectionMonitor.close();
subscribe(options, customUserAgentDetails) {
return super.subscribe(options, customUserAgentDetails);
}
getNewWebSocket(url, protocol) {
return new WebSocket(url, protocol);
}
getProviderName() {
return 'AWSAppSyncRealTimeProvider';
return PROVIDER_NAME;
}
// Check if url matches standard domain pattern
isCustomDomain(url) {
return url.match(standardDomainPattern) === null;
}
subscribe(options, customUserAgentDetails) {
const { appSyncGraphqlEndpoint, region, query, variables, authenticationType, additionalHeaders, apiKey, authToken, libraryConfigHeaders, } = options || {};
return new rxjs_1.Observable(observer => {
if (!options || !appSyncGraphqlEndpoint) {
observer.error({
errors: [
{
...new graphql_1.GraphQLError(`Subscribe only available for AWS AppSync endpoint`),
},
],
});
observer.complete();
}
else {
let subscriptionStartActive = false;
const subscriptionId = (0, utils_1.amplifyUuid)();
const startSubscription = () => {
if (!subscriptionStartActive) {
subscriptionStartActive = true;
const startSubscriptionPromise = this._startSubscriptionWithAWSAppSyncRealTime({
options: {
query,
variables,
region,
authenticationType,
appSyncGraphqlEndpoint,
additionalHeaders,
apiKey,
authToken,
libraryConfigHeaders,
},
observer,
subscriptionId,
customUserAgentDetails,
}).catch(err => {
logger.debug(`${PubSub_1.CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`);
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
});
startSubscriptionPromise.finally(() => {
subscriptionStartActive = false;
});
}
};
// Add an observable to the reconnection list to manage reconnection for this subscription
const reconnectSubscription = new rxjs_1.Observable(reconnectSubscriptionObserver => {
this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver);
}).subscribe(() => {
startSubscription();
});
startSubscription();
return async () => {
// Cleanup reconnection subscription
reconnectSubscription?.unsubscribe();
// Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime
try {
// Waiting that subscription has been connected before trying to unsubscribe
await this._waitForSubscriptionToBeConnected(subscriptionId);
const { subscriptionState } = this.subscriptionObserverMap.get(subscriptionId) || {};
if (!subscriptionState) {
// subscription already unsubscribed
return;
}
if (subscriptionState === constants_1.SUBSCRIPTION_STATUS.CONNECTED) {
this._sendUnsubscriptionMessage(subscriptionId);
}
else {
throw new Error('Subscription never connected');
}
}
catch (err) {
logger.debug(`Error while unsubscribing ${err}`);
}
finally {
this._removeSubscriptionObserver(subscriptionId);
}
};
}
});
}
async _startSubscriptionWithAWSAppSyncRealTime({ options, observer, subscriptionId, customUserAgentDetails, }) {
const { appSyncGraphqlEndpoint, authenticationType, query, variables, apiKey, region, libraryConfigHeaders = () => ({}), additionalHeaders = {}, authToken, } = options;
let additionalCustomHeaders = {};
if (typeof additionalHeaders === 'function') {
const requestOptions = {
url: appSyncGraphqlEndpoint || '',
queryString: query || '',
};
additionalCustomHeaders = await additionalHeaders(requestOptions);
}
else {
additionalCustomHeaders = additionalHeaders;
}
// if an authorization header is set, have the explicit authToken take precedence
if (authToken) {
additionalCustomHeaders = {
...additionalCustomHeaders,
Authorization: authToken,
};
}
const subscriptionState = constants_1.SUBSCRIPTION_STATUS.PENDING;
async _prepareSubscriptionPayload({ options, subscriptionId, customUserAgentDetails, additionalCustomHeaders, libraryConfigHeaders, }) {
const { appSyncGraphqlEndpoint, authenticationType, query, variables, apiKey, region, } = options;
const data = {

@@ -186,18 +29,9 @@ query,

};
// Having a subscription id map will make it simple to forward messages received
this.subscriptionObserverMap.set(subscriptionId, {
observer,
query: query ?? '',
variables: variables ?? {},
subscriptionState,
startAckTimeoutId: undefined,
});
// Preparing payload for subscription message
const dataString = JSON.stringify(data);
const headerObj = {
...(await this._awsRealTimeHeaderBasedAuth({
const serializedData = JSON.stringify(data);
const headers = {
...(await (0, authHeaders_1.awsRealTimeHeaderBasedAuth)({
apiKey,
appSyncGraphqlEndpoint,
authenticationType,
payload: dataString,
payload: serializedData,
canonicalUri: '',

@@ -207,3 +41,3 @@ region,

})),
...(await libraryConfigHeaders()),
...libraryConfigHeaders,
...additionalCustomHeaders,

@@ -215,6 +49,6 @@ [utils_1.USER_AGENT_HEADER]: (0, utils_1.getAmplifyUserAgent)(customUserAgentDetails),

payload: {
data: dataString,
data: serializedData,
extensions: {
authorization: {
...headerObj,
...headers,
},

@@ -225,151 +59,11 @@ },

};
const stringToAWSRealTime = JSON.stringify(subscriptionMessage);
try {
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.OPENING_CONNECTION);
await this._initializeWebSocketConnection({
apiKey,
appSyncGraphqlEndpoint,
authenticationType,
region,
additionalCustomHeaders,
});
}
catch (err) {
this._logStartSubscriptionError(subscriptionId, observer, err);
return;
}
// Potential race condition can occur when unsubscribe is called during _initializeWebSocketConnection.
// E.g.unsubscribe gets invoked prior to finishing WebSocket handshake or START_ACK.
// Both subscriptionFailedCallback and subscriptionReadyCallback are used to synchronized this.
const { subscriptionFailedCallback, subscriptionReadyCallback } = this.subscriptionObserverMap.get(subscriptionId) ?? {};
// This must be done before sending the message in order to be listening immediately
this.subscriptionObserverMap.set(subscriptionId, {
observer,
subscriptionState,
query: query ?? '',
variables: variables ?? {},
subscriptionReadyCallback,
subscriptionFailedCallback,
startAckTimeoutId: setTimeout(() => {
this._timeoutStartSubscriptionAck(subscriptionId);
}, constants_1.START_ACK_TIMEOUT),
});
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.send(stringToAWSRealTime);
}
const serializedSubscriptionMessage = JSON.stringify(subscriptionMessage);
return serializedSubscriptionMessage;
}
// Log logic for start subscription failures
_logStartSubscriptionError(subscriptionId, observer, err) {
logger.debug({ err });
const message = String(err.message ?? '');
// Resolving to give the state observer time to propogate the update
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
// Capture the error only when the network didn't cause disruption
if (this.connectionState !== PubSub_1.ConnectionState.ConnectionDisruptedPendingNetwork) {
// When the error is non-retriable, error out the observable
if ((0, utils_1.isNonRetryableError)(err)) {
observer.error({
errors: [
{
...new graphql_1.GraphQLError(`${PubSub_1.CONTROL_MSG.CONNECTION_FAILED}: ${message}`),
},
],
});
}
else {
logger.debug(`${PubSub_1.CONTROL_MSG.CONNECTION_FAILED}: ${message}`);
}
const { subscriptionFailedCallback } = this.subscriptionObserverMap.get(subscriptionId) || {};
// Notify concurrent unsubscription
if (typeof subscriptionFailedCallback === 'function') {
subscriptionFailedCallback();
}
}
}
// Waiting that subscription has been connected before trying to unsubscribe
async _waitForSubscriptionToBeConnected(subscriptionId) {
const subscriptionObserver = this.subscriptionObserverMap.get(subscriptionId);
if (subscriptionObserver) {
const { subscriptionState } = subscriptionObserver;
// This in case unsubscribe is invoked before sending start subscription message
if (subscriptionState === constants_1.SUBSCRIPTION_STATUS.PENDING) {
return new Promise((resolve, reject) => {
const { observer, subscriptionState: observedSubscriptionState, variables, query, } = subscriptionObserver;
this.subscriptionObserverMap.set(subscriptionId, {
observer,
subscriptionState: observedSubscriptionState,
variables,
query,
subscriptionReadyCallback: resolve,
subscriptionFailedCallback: reject,
});
});
}
}
}
_sendUnsubscriptionMessage(subscriptionId) {
try {
if (this.awsRealTimeSocket &&
this.awsRealTimeSocket.readyState === WebSocket.OPEN &&
this.socketStatus === constants_1.SOCKET_STATUS.READY) {
// Preparing unsubscribe message to stop receiving messages for that subscription
const unsubscribeMessage = {
id: subscriptionId,
type: constants_1.MESSAGE_TYPES.GQL_STOP,
};
const stringToAWSRealTime = JSON.stringify(unsubscribeMessage);
this.awsRealTimeSocket.send(stringToAWSRealTime);
}
}
catch (err) {
// If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do
logger.debug({ err });
}
}
_removeSubscriptionObserver(subscriptionId) {
this.subscriptionObserverMap.delete(subscriptionId);
// Verifying 1000ms after removing subscription in case there are new subscription unmount/mount
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
}
_closeSocketIfRequired() {
if (this.subscriptionObserverMap.size > 0) {
// Active subscriptions on the WebSocket
return;
}
if (!this.awsRealTimeSocket) {
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
return;
}
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSING_CONNECTION);
if (this.awsRealTimeSocket.bufferedAmount > 0) {
// Still data on the WebSocket
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
}
else {
logger.debug('closing WebSocket...');
if (this.keepAliveTimeoutId) {
clearTimeout(this.keepAliveTimeoutId);
}
if (this.keepAliveAlertTimeoutId) {
clearTimeout(this.keepAliveAlertTimeoutId);
}
const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = null;
tempSocket.onerror = null;
tempSocket.close(1000);
this.awsRealTimeSocket = undefined;
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
}
}
_handleIncomingSubscriptionMessage(message) {
if (typeof message.data !== 'string') {
return;
}
logger.debug(`subscription message from AWS AppSync RealTime: ${message.data}`);
const { id = '', payload, type, } = JSON.parse(String(message.data));
const { observer = null, query = '', variables = {}, startAckTimeoutId, subscriptionReadyCallback, subscriptionFailedCallback, } = this.subscriptionObserverMap.get(id) || {};
logger.debug({ id, observer, query, variables });
if (type === constants_1.MESSAGE_TYPES.GQL_DATA && payload && payload.data) {
_handleSubscriptionData(message) {
this.logger.debug(`subscription message from AWS AppSync RealTime: ${message.data}`);
const { id = '', payload, type } = JSON.parse(String(message.data));
const { observer = null, query = '', variables = {}, } = this.subscriptionObserverMap.get(id) || {};
this.logger.debug({ id, observer, query, variables });
if (type === constants_1.MESSAGE_TYPES.DATA && payload && payload.data) {
if (observer) {

@@ -379,407 +73,16 @@ observer.next(payload);

else {
logger.debug(`observer not found for id: ${id}`);
this.logger.debug(`observer not found for id: ${id}`);
}
return;
return [true, { id, type, payload }];
}
if (type === constants_1.MESSAGE_TYPES.GQL_START_ACK) {
logger.debug(`subscription ready for ${JSON.stringify({ query, variables })}`);
if (typeof subscriptionReadyCallback === 'function') {
subscriptionReadyCallback();
}
if (startAckTimeoutId)
clearTimeout(startAckTimeoutId);
dispatchApiEvent({
event: PubSub_1.CONTROL_MSG.SUBSCRIPTION_ACK,
data: { query, variables },
message: 'Connection established for subscription',
});
const subscriptionState = constants_1.SUBSCRIPTION_STATUS.CONNECTED;
if (observer) {
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId: undefined,
subscriptionState,
subscriptionReadyCallback,
subscriptionFailedCallback,
});
}
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CONNECTION_ESTABLISHED);
return;
}
if (type === constants_1.MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
if (this.keepAliveTimeoutId)
clearTimeout(this.keepAliveTimeoutId);
if (this.keepAliveAlertTimeoutId)
clearTimeout(this.keepAliveAlertTimeoutId);
this.keepAliveTimeoutId = setTimeout(() => {
this._errorDisconnect(PubSub_1.CONTROL_MSG.TIMEOUT_DISCONNECT);
}, this.keepAliveTimeout);
this.keepAliveAlertTimeoutId = setTimeout(() => {
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
}, constants_1.DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT);
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.KEEP_ALIVE);
return;
}
if (type === constants_1.MESSAGE_TYPES.GQL_ERROR) {
const subscriptionState = constants_1.SUBSCRIPTION_STATUS.FAILED;
if (observer) {
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId,
subscriptionReadyCallback,
subscriptionFailedCallback,
subscriptionState,
});
logger.debug(`${PubSub_1.CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`);
observer.error({
errors: [
{
...new graphql_1.GraphQLError(`${PubSub_1.CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`),
},
],
});
if (startAckTimeoutId)
clearTimeout(startAckTimeoutId);
if (typeof subscriptionFailedCallback === 'function') {
subscriptionFailedCallback();
}
}
}
return [false, { id, type, payload }];
}
_errorDisconnect(msg) {
logger.debug(`Disconnect error: ${msg}`);
if (this.awsRealTimeSocket) {
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
this.awsRealTimeSocket.close();
}
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
}
_timeoutStartSubscriptionAck(subscriptionId) {
const subscriptionObserver = this.subscriptionObserverMap.get(subscriptionId);
if (subscriptionObserver) {
const { observer, query, variables } = subscriptionObserver;
if (!observer) {
return;
}
this.subscriptionObserverMap.set(subscriptionId, {
observer,
query,
variables,
subscriptionState: constants_1.SUBSCRIPTION_STATUS.FAILED,
});
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CLOSED);
logger.debug('timeoutStartSubscription', JSON.stringify({ query, variables }));
}
}
/**
* Strips out `Authorization` header if present
*/
_extractNonAuthHeaders(headers) {
if (!headers) {
return {};
}
if ('Authorization' in headers) {
const { Authorization: _, ...nonAuthHeaders } = headers;
return nonAuthHeaders;
}
return headers;
}
/**
*
* @param headers - http headers
* @returns uri-encoded query parameters derived from custom headers
*/
_queryParamsFromCustomHeaders(headers) {
const nonAuthHeaders = this._extractNonAuthHeaders(headers);
const params = new utils_1.AmplifyUrlSearchParams();
Object.entries(nonAuthHeaders).forEach(([k, v]) => {
params.append(k, v);
});
return params;
}
/**
* Normalizes AppSync realtime endpoint URL
*
* @param appSyncGraphqlEndpoint - AppSync endpointUri from config
* @param urlParams - URLSearchParams
* @returns fully resolved string realtime endpoint URL
*/
_realtimeUrlWithQueryString(appSyncGraphqlEndpoint, urlParams) {
const protocol = 'wss://';
let realtimeEndpoint = appSyncGraphqlEndpoint ?? '';
if (this.isCustomDomain(realtimeEndpoint)) {
realtimeEndpoint = realtimeEndpoint.concat(customDomainPath);
}
else {
realtimeEndpoint = realtimeEndpoint
.replace('appsync-api', 'appsync-realtime-api')
.replace('gogi-beta', 'grt-beta');
}
realtimeEndpoint = realtimeEndpoint
.replace('https://', protocol)
.replace('http://', protocol);
const realtimeEndpointUrl = new utils_1.AmplifyUrl(realtimeEndpoint);
// preserves any query params a customer might manually set in the configuration
const existingParams = new utils_1.AmplifyUrlSearchParams(realtimeEndpointUrl.search);
for (const [k, v] of urlParams.entries()) {
existingParams.append(k, v);
}
realtimeEndpointUrl.search = existingParams.toString();
return realtimeEndpointUrl.toString();
}
_initializeWebSocketConnection({ appSyncGraphqlEndpoint, authenticationType, apiKey, region, additionalCustomHeaders, }) {
if (this.socketStatus === constants_1.SOCKET_STATUS.READY) {
return;
}
// TODO(Eslint): refactor to now use async function as the promise executor
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
this.promiseArray.push({ res: resolve, rej: reject });
if (this.socketStatus === constants_1.SOCKET_STATUS.CLOSED) {
try {
this.socketStatus = constants_1.SOCKET_STATUS.CONNECTING;
const payloadString = '{}';
const authHeader = await this._awsRealTimeHeaderBasedAuth({
authenticationType,
payload: payloadString,
canonicalUri: '/connect',
apiKey,
appSyncGraphqlEndpoint,
region,
additionalCustomHeaders,
});
const headerString = authHeader ? JSON.stringify(authHeader) : '';
// base64url-encoded string
const encodedHeader = utils_1.base64Encoder.convert(headerString, {
urlSafe: true,
skipPadding: true,
});
const authTokenSubprotocol = `header-${encodedHeader}`;
const queryParams = this._queryParamsFromCustomHeaders(additionalCustomHeaders);
const awsRealTimeUrl = this._realtimeUrlWithQueryString(appSyncGraphqlEndpoint, queryParams);
await this._initializeRetryableHandshake(awsRealTimeUrl, authTokenSubprotocol);
this.promiseArray.forEach(({ res }) => {
logger.debug('Notifying connection successful');
res();
});
this.socketStatus = constants_1.SOCKET_STATUS.READY;
this.promiseArray = [];
}
catch (err) {
logger.debug('Connection exited with', err);
this.promiseArray.forEach(({ rej }) => {
rej(err);
});
this.promiseArray = [];
if (this.awsRealTimeSocket &&
this.awsRealTimeSocket.readyState === WebSocket.OPEN) {
this.awsRealTimeSocket.close(3001);
}
this.awsRealTimeSocket = undefined;
this.socketStatus = constants_1.SOCKET_STATUS.CLOSED;
}
}
});
}
async _initializeRetryableHandshake(awsRealTimeUrl, subprotocol) {
logger.debug(`Initializaling retryable Handshake`);
await (0, utils_1.jitteredExponentialRetry)(this._initializeHandshake.bind(this), [awsRealTimeUrl, subprotocol], constants_1.MAX_DELAY_MS);
}
/**
*
* @param subprotocol -
*/
async _initializeHandshake(awsRealTimeUrl, subprotocol) {
logger.debug(`Initializing handshake ${awsRealTimeUrl}`);
// Because connecting the socket is async, is waiting until connection is open
// Step 1: connect websocket
try {
await (() => {
return new Promise((resolve, reject) => {
const newSocket = this.getNewWebSocket(awsRealTimeUrl, [
'graphql-ws',
subprotocol,
]);
newSocket.onerror = () => {
logger.debug(`WebSocket connection error`);
};
newSocket.onclose = () => {
reject(new Error('Connection handshake error'));
};
newSocket.onopen = () => {
this.awsRealTimeSocket = newSocket;
resolve();
};
});
})();
// Step 2: wait for ack from AWS AppSyncReaTime after sending init
await (() => {
return new Promise((resolve, reject) => {
if (this.awsRealTimeSocket) {
let ackOk = false;
this.awsRealTimeSocket.onerror = error => {
logger.debug(`WebSocket error ${JSON.stringify(error)}`);
};
this.awsRealTimeSocket.onclose = event => {
logger.debug(`WebSocket closed ${event.reason}`);
reject(new Error(JSON.stringify(event)));
};
this.awsRealTimeSocket.onmessage = (message) => {
if (typeof message.data !== 'string') {
return;
}
logger.debug(`subscription message from AWS AppSyncRealTime: ${message.data} `);
const data = JSON.parse(message.data);
const { type, payload: { connectionTimeoutMs = constants_1.DEFAULT_KEEP_ALIVE_TIMEOUT, } = {}, } = data;
if (type === constants_1.MESSAGE_TYPES.GQL_CONNECTION_ACK) {
ackOk = true;
if (this.awsRealTimeSocket) {
this.keepAliveTimeout = connectionTimeoutMs;
this.awsRealTimeSocket.onmessage =
this._handleIncomingSubscriptionMessage.bind(this);
this.awsRealTimeSocket.onerror = err => {
logger.debug(err);
this._errorDisconnect(PubSub_1.CONTROL_MSG.CONNECTION_CLOSED);
};
this.awsRealTimeSocket.onclose = event => {
logger.debug(`WebSocket closed ${event.reason}`);
this._errorDisconnect(PubSub_1.CONTROL_MSG.CONNECTION_CLOSED);
};
}
resolve('Cool, connected to AWS AppSyncRealTime');
return;
}
if (type === constants_1.MESSAGE_TYPES.GQL_CONNECTION_ERROR) {
const { payload: { errors: [{ errorType = '', errorCode = 0 } = {}] = [], } = {}, } = data;
// TODO(Eslint): refactor to reject an Error object instead of a plain object
// eslint-disable-next-line prefer-promise-reject-errors
reject({ errorType, errorCode });
}
};
const gqlInit = {
type: constants_1.MESSAGE_TYPES.GQL_CONNECTION_INIT,
};
this.awsRealTimeSocket.send(JSON.stringify(gqlInit));
const checkAckOk = (targetAckOk) => {
if (!targetAckOk) {
this.connectionStateMonitor.record(ConnectionStateMonitor_1.CONNECTION_CHANGE.CONNECTION_FAILED);
reject(new Error(`Connection timeout: ack from AWSAppSyncRealTime was not received after ${constants_1.CONNECTION_INIT_TIMEOUT} ms`));
}
};
setTimeout(() => {
checkAckOk(ackOk);
}, constants_1.CONNECTION_INIT_TIMEOUT);
}
});
})();
}
catch (err) {
const { errorType, errorCode } = err;
if (constants_1.NON_RETRYABLE_CODES.includes(errorCode)) {
throw new utils_1.NonRetryableError(errorType);
}
else if (errorType) {
throw new Error(errorType);
}
else {
throw err;
}
}
}
async _awsRealTimeHeaderBasedAuth({ apiKey, authenticationType, payload, canonicalUri, appSyncGraphqlEndpoint, region, additionalCustomHeaders, }) {
const headerHandler = {
apiKey: this._awsRealTimeApiKeyHeader.bind(this),
iam: this._awsRealTimeIAMHeader.bind(this),
oidc: this._awsAuthTokenHeader.bind(this),
userPool: this._awsAuthTokenHeader.bind(this),
lambda: this._customAuthHeader,
none: this._customAuthHeader,
};
if (!authenticationType || !headerHandler[authenticationType]) {
logger.debug(`Authentication type ${authenticationType} not supported`);
return undefined;
}
else {
const handler = headerHandler[authenticationType];
const host = appSyncGraphqlEndpoint
? new utils_1.AmplifyUrl(appSyncGraphqlEndpoint).host
: undefined;
logger.debug(`Authenticating with ${JSON.stringify(authenticationType)}`);
let resolvedApiKey;
if (authenticationType === 'apiKey') {
resolvedApiKey = apiKey;
}
const result = await handler({
payload,
canonicalUri,
appSyncGraphqlEndpoint,
apiKey: resolvedApiKey,
region,
host,
additionalCustomHeaders,
});
return result;
}
}
async _awsAuthTokenHeader({ host }) {
const session = await (0, core_1.fetchAuthSession)();
_unsubscribeMessage(subscriptionId) {
return {
Authorization: session?.tokens?.accessToken?.toString(),
host,
id: subscriptionId,
type: constants_1.MESSAGE_TYPES.GQL_STOP,
};
}
async _awsRealTimeApiKeyHeader({ apiKey, host, }) {
const dt = new Date();
const dtStr = dt.toISOString().replace(/[:-]|\.\d{3}/g, '');
return {
host,
'x-amz-date': dtStr,
'x-api-key': apiKey,
};
}
async _awsRealTimeIAMHeader({ payload, canonicalUri, appSyncGraphqlEndpoint, region, }) {
const endpointInfo = {
region,
service: 'appsync',
};
const creds = (await (0, core_1.fetchAuthSession)()).credentials;
const request = {
url: `${appSyncGraphqlEndpoint}${canonicalUri}`,
data: payload,
method: 'POST',
headers: { ...constants_1.AWS_APPSYNC_REALTIME_HEADERS },
};
const signedParams = (0, aws_client_utils_1.signRequest)({
headers: request.headers,
method: request.method,
url: new utils_1.AmplifyUrl(request.url),
body: request.data,
}, {
// TODO: What do we need to do to remove these !'s?
credentials: creds,
signingRegion: endpointInfo.region,
signingService: endpointInfo.service,
});
return signedParams.headers;
}
_customAuthHeader({ host, additionalCustomHeaders, }) {
/**
* If `additionalHeaders` was provided to the subscription as a function,
* the headers that are returned by that function will already have been
* provided before this function is called.
*/
if (!additionalCustomHeaders?.Authorization) {
throw new Error('No auth token specified');
}
return {
Authorization: additionalCustomHeaders.Authorization,
host,
};
}
}
exports.AWSAppSyncRealTimeProvider = AWSAppSyncRealTimeProvider;
//# sourceMappingURL=index.js.map

@@ -40,5 +40,5 @@ 'use strict';

* Server -> Client message.
* This message type is for subscription message from AWS AppSync RealTime
* This message type is for subscription message from AWS AppSync RealTime or Events
*/
MESSAGE_TYPES["GQL_DATA"] = "data";
MESSAGE_TYPES["DATA"] = "data";
/**

@@ -64,2 +64,32 @@ * Server -> Client message.

MESSAGE_TYPES["GQL_ERROR"] = "error";
/**
* Client -> Server message.
* This message type is for registering subscriptions with Events
*/
MESSAGE_TYPES["EVENT_SUBSCRIBE"] = "subscribe";
/**
* Client -> Server message.
* This message type is for publishing a message with Events
*/
MESSAGE_TYPES["EVENT_PUBLISH"] = "publish";
/**
* Server -> Client message.
* Server acknowledges successful subscription
*/
MESSAGE_TYPES["EVENT_SUBSCRIBE_ACK"] = "subscribe_success";
/**
* Server -> Client message.
* Server acknowledges successful publish
*/
MESSAGE_TYPES["EVENT_PUBLISH_ACK"] = "publish_success";
/**
* Client -> Server message.
* This message type is for unregister subscriptions with AWS AppSync RealTime
*/
MESSAGE_TYPES["EVENT_STOP"] = "unsubscribe";
/**
* Server -> Client message.
* This is the ack response from AWS AppSync Events to EVENT_STOP message
*/
MESSAGE_TYPES["EVENT_COMPLETE"] = "unsubscribe_success";
})(exports.MESSAGE_TYPES || (exports.MESSAGE_TYPES = {}));

@@ -66,0 +96,0 @@ (function (SUBSCRIPTION_STATUS) {

@@ -13,3 +13,3 @@ 'use strict';

* Captures the reconnect event logic used to determine when to reconnect to PubSub providers.
* Reconnnect attempts are delayed by 5 seconds to let the interface settle.
* Reconnect attempts are delayed by 5 seconds to let the interface settle.
* Attempting to reconnect only once creates unrecoverable states when the network state isn't

@@ -16,0 +16,0 @@ * supported by the browser, so this keeps retrying every minute until halted.

@@ -32,2 +32,21 @@ 'use strict';

exports.resolveConfig = resolveConfig;
// /**
// * @internal
// */
// export const resolveEventsConfig = (amplify: AmplifyClassV6) => {
// const config = amplify.getConfig();
// if (!config.API?.Events) {
// logger.warn(
// 'The Events configuration is missing. This is likely due to Amplify.configure() not being called prior to using events.connect() or events.post().',
// );
// }
// const { apiKey, endpoint, defaultAuthMode, region } =
// config.API?.Events ?? {};
// return {
// apiKey,
// defaultAuthMode,
// appSyncGraphqlEndpoint: endpoint,
// region,
// };
// };
//# sourceMappingURL=resolveConfig.js.map

@@ -0,3 +1,6 @@

import * as events from './internals/events';
export { events };
export { GraphQLAPI, GraphQLAPIClass, graphqlOperation } from './GraphQLAPI';
export * from './types';
export { CONNECTION_STATE_CHANGE } from './Providers/constants';
export * from './internals/events/types';

@@ -17,3 +17,2 @@ import { OperationTypeNode } from 'graphql';

getModuleName(): string;
private _headerBasedAuth;
/**

@@ -20,0 +19,0 @@ * to get the operation type

@@ -1,16 +0,5 @@

import { Observable } from 'rxjs';
import { CustomUserAgentDetails, DocumentType, GraphQLAuthMode } from '@aws-amplify/core/internals/utils';
import { CustomHeaders } from '@aws-amplify/data-schema/runtime';
import { PubSubContentObserver } from '../../types/PubSub';
import { SUBSCRIPTION_STATUS } from '../constants';
import { AWSWebSocketProvider } from '../AWSWebSocketProvider';
type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
export interface ObserverQuery {
observer: PubSubContentObserver;
query: string;
variables: Record<string, DocumentType>;
subscriptionState: SUBSCRIPTION_STATUS;
subscriptionReadyCallback?(): void;
subscriptionFailedCallback?(reason?: any): void;
startAckTimeoutId?: ReturnType<typeof setTimeout>;
}
export interface AWSAppSyncRealTimeProviderOptions {

@@ -20,3 +9,3 @@ appSyncGraphqlEndpoint?: string;

query?: string;
variables?: Record<string, DocumentType>;
variables?: DocumentType;
apiKey?: string;

@@ -29,63 +18,27 @@ region?: string;

}
export declare class AWSAppSyncRealTimeProvider {
private awsRealTimeSocket?;
private socketStatus;
private keepAliveTimeoutId?;
private keepAliveTimeout;
private keepAliveAlertTimeoutId?;
private subscriptionObserverMap;
private promiseArray;
private connectionState;
private readonly connectionStateMonitor;
private readonly reconnectionMonitor;
private connectionStateMonitorSubscription;
interface DataObject extends Record<string, unknown> {
data: Record<string, unknown>;
}
interface DataPayload {
id: string;
payload: DataObject;
type: string;
}
export declare class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider {
constructor();
/**
* Mark the socket closed and release all active listeners
*/
close(): void;
getNewWebSocket(url: string, protocol: string[]): WebSocket;
subscribe(options?: AWSAppSyncRealTimeProviderOptions, customUserAgentDetails?: CustomUserAgentDetails): import("rxjs").Observable<Record<string, unknown>>;
getProviderName(): string;
private isCustomDomain;
subscribe(options?: AWSAppSyncRealTimeProviderOptions, customUserAgentDetails?: CustomUserAgentDetails): Observable<Record<string, unknown>>;
private _startSubscriptionWithAWSAppSyncRealTime;
private _logStartSubscriptionError;
private _waitForSubscriptionToBeConnected;
private _sendUnsubscriptionMessage;
private _removeSubscriptionObserver;
private _closeSocketIfRequired;
private _handleIncomingSubscriptionMessage;
private _errorDisconnect;
private _timeoutStartSubscriptionAck;
/**
* Strips out `Authorization` header if present
*/
private _extractNonAuthHeaders;
/**
*
* @param headers - http headers
* @returns uri-encoded query parameters derived from custom headers
*/
private _queryParamsFromCustomHeaders;
/**
* Normalizes AppSync realtime endpoint URL
*
* @param appSyncGraphqlEndpoint - AppSync endpointUri from config
* @param urlParams - URLSearchParams
* @returns fully resolved string realtime endpoint URL
*/
private _realtimeUrlWithQueryString;
private _initializeWebSocketConnection;
private _initializeRetryableHandshake;
/**
*
* @param subprotocol -
*/
private _initializeHandshake;
private _awsRealTimeHeaderBasedAuth;
private _awsAuthTokenHeader;
private _awsRealTimeApiKeyHeader;
private _awsRealTimeIAMHeader;
private _customAuthHeader;
protected _prepareSubscriptionPayload({ options, subscriptionId, customUserAgentDetails, additionalCustomHeaders, libraryConfigHeaders, }: {
options: AWSAppSyncRealTimeProviderOptions;
subscriptionId: string;
customUserAgentDetails: CustomUserAgentDetails | undefined;
additionalCustomHeaders: Record<string, string>;
libraryConfigHeaders: Record<string, string>;
}): Promise<string>;
protected _handleSubscriptionData(message: MessageEvent): [boolean, DataPayload];
protected _unsubscribeMessage(subscriptionId: string): {
id: string;
type: string;
};
}
export {};

@@ -33,5 +33,5 @@ export { AMPLIFY_SYMBOL } from '@aws-amplify/core/internals/utils';

* Server -> Client message.
* This message type is for subscription message from AWS AppSync RealTime
* This message type is for subscription message from AWS AppSync RealTime or Events
*/
GQL_DATA = "data",
DATA = "data",
/**

@@ -56,3 +56,33 @@ * Server -> Client message.

*/
GQL_ERROR = "error"
GQL_ERROR = "error",
/**
* Client -> Server message.
* This message type is for registering subscriptions with Events
*/
EVENT_SUBSCRIBE = "subscribe",
/**
* Client -> Server message.
* This message type is for publishing a message with Events
*/
EVENT_PUBLISH = "publish",
/**
* Server -> Client message.
* Server acknowledges successful subscription
*/
EVENT_SUBSCRIBE_ACK = "subscribe_success",
/**
* Server -> Client message.
* Server acknowledges successful publish
*/
EVENT_PUBLISH_ACK = "publish_success",
/**
* Client -> Server message.
* This message type is for unregister subscriptions with AWS AppSync RealTime
*/
EVENT_STOP = "unsubscribe",
/**
* Server -> Client message.
* This is the ack response from AWS AppSync Events to EVENT_STOP message
*/
EVENT_COMPLETE = "unsubscribe_success"
}

@@ -59,0 +89,0 @@ export declare enum SUBSCRIPTION_STATUS {

@@ -12,3 +12,31 @@ import { Observable } from 'rxjs';

export declare const CONNECTION_CHANGE: {
[key in 'KEEP_ALIVE_MISSED' | 'KEEP_ALIVE' | 'CONNECTION_ESTABLISHED' | 'CONNECTION_FAILED' | 'CLOSING_CONNECTION' | 'OPENING_CONNECTION' | 'CLOSED' | 'ONLINE' | 'OFFLINE']: Partial<LinkedConnectionStates>;
readonly KEEP_ALIVE_MISSED: {
readonly keepAliveState: "unhealthy";
};
readonly KEEP_ALIVE: {
readonly keepAliveState: "healthy";
};
readonly CONNECTION_ESTABLISHED: {
readonly connectionState: "connected";
};
readonly CONNECTION_FAILED: {
readonly intendedConnectionState: "disconnected";
readonly connectionState: "disconnected";
};
readonly CLOSING_CONNECTION: {
readonly intendedConnectionState: "disconnected";
};
readonly OPENING_CONNECTION: {
readonly intendedConnectionState: "connected";
readonly connectionState: "connecting";
};
readonly CLOSED: {
readonly connectionState: "disconnected";
};
readonly ONLINE: {
readonly networkState: "connected";
};
readonly OFFLINE: {
readonly networkState: "disconnected";
};
};

@@ -15,0 +43,0 @@ export declare class ConnectionStateMonitor {

@@ -8,3 +8,3 @@ import { Observer } from 'rxjs';

* Captures the reconnect event logic used to determine when to reconnect to PubSub providers.
* Reconnnect attempts are delayed by 5 seconds to let the interface settle.
* Reconnect attempts are delayed by 5 seconds to let the interface settle.
* Attempting to reconnect only once creates unrecoverable states when the network state isn't

@@ -11,0 +11,0 @@ * supported by the browser, so this keeps retrying every minute until halted.

{
"name": "@aws-amplify/api-graphql",
"version": "4.4.0",
"version": "4.4.1-events.0619318.0+0619318",
"description": "Api-graphql category of aws-amplify",

@@ -87,4 +87,4 @@ "main": "./dist/cjs/index.js",

"dependencies": {
"@aws-amplify/api-rest": "4.0.51",
"@aws-amplify/core": "6.4.4",
"@aws-amplify/api-rest": "4.0.52-events.0619318.0+0619318",
"@aws-amplify/core": "6.4.5-events.0619318.0+0619318",
"@aws-amplify/data-schema": "^1.7.0",

@@ -105,3 +105,3 @@ "@aws-sdk/types": "3.387.0",

],
"gitHead": "1d4c5c8c5da7e2567b3dfe62b788d3c5f54bedb7"
"gitHead": "06193189d7eb5d64880eb70d94c0a82748cc2097"
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import * as events from './internals/events';
export { events };
export { GraphQLAPI, GraphQLAPIClass, graphqlOperation } from './GraphQLAPI';

@@ -8,1 +12,2 @@ export * from './types';

export { CONNECTION_STATE_CHANGE } from './Providers/constants';
export * from './internals/events/types';

@@ -15,3 +15,2 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

CustomUserAgentDetails,
GraphQLAuthMode,
getAmplifyUserAgent,

@@ -34,13 +33,7 @@ } from '@aws-amplify/core/internals/utils';

import { repackageUnauthorizedError } from '../utils/errors/repackageAuthError';
import {
NO_API_KEY,
NO_AUTH_TOKEN_HEADER,
NO_ENDPOINT,
NO_SIGNED_IN_USER,
NO_VALID_AUTH_TOKEN,
NO_VALID_CREDENTIALS,
} from '../utils/errors/constants';
import { NO_ENDPOINT } from '../utils/errors/constants';
import { GraphQLApiError, createGraphQLResultWithError } from '../utils/errors';
import { isGraphQLResponseWithErrors } from './utils/runtimeTypeGuards/isGraphQLResponseWithErrors';
import { headerBasedAuth } from './graphqlAuth';

@@ -77,76 +70,2 @@ const USER_AGENT_HEADER = 'x-amz-user-agent';

private async _headerBasedAuth(
amplify: AmplifyClassV6,
authMode: GraphQLAuthMode,
additionalHeaders: Record<string, string> = {},
) {
const { apiKey } = resolveConfig(amplify);
let headers = {};
switch (authMode) {
case 'apiKey':
if (!apiKey) {
throw new GraphQLApiError(NO_API_KEY);
}
headers = {
'X-Api-Key': apiKey,
};
break;
case 'iam': {
const session = await amplify.Auth.fetchAuthSession();
if (session.credentials === undefined) {
throw new GraphQLApiError(NO_VALID_CREDENTIALS);
}
break;
}
case 'oidc':
case 'userPool': {
let token: string | undefined;
try {
token = (
await amplify.Auth.fetchAuthSession()
).tokens?.accessToken.toString();
} catch (e) {
// fetchAuthSession failed
throw new GraphQLApiError({
...NO_SIGNED_IN_USER,
underlyingError: e,
});
}
// `fetchAuthSession()` succeeded but didn't return `tokens`.
// This may happen when unauthenticated access is enabled and there is
// no user signed in.
if (!token) {
throw new GraphQLApiError(NO_VALID_AUTH_TOKEN);
}
headers = {
Authorization: token,
};
break;
}
case 'lambda':
if (
typeof additionalHeaders === 'object' &&
!additionalHeaders.Authorization
) {
throw new GraphQLApiError(NO_AUTH_TOKEN_HEADER);
}
headers = {
Authorization: additionalHeaders.Authorization,
};
break;
case 'none':
break;
default:
break;
}
return headers;
}
/**

@@ -258,2 +177,3 @@ * to get the operation type

const {
apiKey,
region,

@@ -310,10 +230,11 @@ endpoint: appSyncGraphqlEndpoint,

// TODO: Figure what we need to do to remove `!`'s.
const authHeaders = await headerBasedAuth(
amplify,
authMode,
apiKey,
additionalCustomHeaders,
);
const headers = {
...(!customEndpoint &&
(await this._headerBasedAuth(
amplify,
authMode!,
additionalCustomHeaders,
))),
...(!customEndpoint && authHeaders),
/**

@@ -325,11 +246,3 @@ * Custom endpoint headers.

*/
...((customEndpoint &&
(customEndpointRegion
? await this._headerBasedAuth(
amplify,
authMode!,
additionalCustomHeaders,
)
: {})) ||
{}),
...((customEndpoint && (customEndpointRegion ? authHeaders : {})) || {}),
// Custom headers included in Amplify configuration options:

@@ -387,5 +300,5 @@ ...(customHeaders &&

try {
// See the inline doc of the REST `post()` API for possible errors to be thrown.
// As these errors are catastrophic they should be caught and handled by GraphQL
// API consumers.
// // // See the inline doc of the REST `post()` API for possible errors to be thrown.
// // // As these errors are catastrophic they should be caught and handled by GraphQL
// // // API consumers.
const { body: responseBody } = await this._api.post(amplify, {

@@ -392,0 +305,0 @@ url: new AmplifyUrl(endpoint),

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import { Observable, SubscriptionLike } from 'rxjs';
import { GraphQLError } from 'graphql';
import {
ConsoleLogger,
Hub,
HubPayload,
fetchAuthSession,
} from '@aws-amplify/core';
import { signRequest } from '@aws-amplify/core/internals/aws-client-utils';
import {
AmplifyUrl,
AmplifyUrlSearchParams,
CustomUserAgentDetails,
DocumentType,
GraphQLAuthMode,
NonRetryableError,
USER_AGENT_HEADER,
amplifyUuid,
base64Encoder,
getAmplifyUserAgent,
isNonRetryableError,
jitteredExponentialRetry,
} from '@aws-amplify/core/internals/utils';
import {
CustomHeaders,
RequestOptions,
} from '@aws-amplify/data-schema/runtime';
import { CustomHeaders } from '@aws-amplify/data-schema/runtime';
import {
CONTROL_MSG,
ConnectionState,
PubSubContentObserver,
} from '../../types/PubSub';
import {
AMPLIFY_SYMBOL,
AWS_APPSYNC_REALTIME_HEADERS,
CONNECTION_INIT_TIMEOUT,
CONNECTION_STATE_CHANGE,
DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT,
DEFAULT_KEEP_ALIVE_TIMEOUT,
MAX_DELAY_MS,
MESSAGE_TYPES,
NON_RETRYABLE_CODES,
SOCKET_STATUS,
START_ACK_TIMEOUT,
SUBSCRIPTION_STATUS,
} from '../constants';
import {
CONNECTION_CHANGE,
ConnectionStateMonitor,
} from '../../utils/ConnectionStateMonitor';
import {
ReconnectEvent,
ReconnectionMonitor,
} from '../../utils/ReconnectionMonitor';
import { MESSAGE_TYPES } from '../constants';
import { AWSWebSocketProvider } from '../AWSWebSocketProvider';
import { awsRealTimeHeaderBasedAuth } from '../AWSWebSocketProvider/authHeaders';
const logger = new ConsoleLogger('AWSAppSyncRealTimeProvider');
const dispatchApiEvent = (payload: HubPayload) => {
Hub.dispatch('api', payload, 'PubSub', AMPLIFY_SYMBOL);
};
// resolved/actual AuthMode values. identityPool gets resolves to IAM upstream in InternalGraphQLAPI._graphqlSubscribe
type ResolvedGraphQLAuthModes = Exclude<GraphQLAuthMode, 'identityPool'>;
export interface ObserverQuery {
observer: PubSubContentObserver;
query: string;
variables: Record<string, DocumentType>;
subscriptionState: SUBSCRIPTION_STATUS;
subscriptionReadyCallback?(): void;
subscriptionFailedCallback?(reason?: any): void;
startAckTimeoutId?: ReturnType<typeof setTimeout>;
}
const standardDomainPattern =
/^https:\/\/\w{26}\.appsync-api\.\w{2}(?:(?:-\w{2,})+)-\d\.amazonaws.com(?:\.cn)?\/graphql$/i;
const customDomainPath = '/realtime';
interface DataObject extends Record<string, unknown> {
data: Record<string, unknown>;
}
interface DataPayload {
id: string;
payload: DataObject;
type: string;
}
interface ParsedMessagePayload {
type: string;
payload: {
connectionTimeoutMs: number;
errors?: [{ errorType: string; errorCode: number }];
};
}
export interface AWSAppSyncRealTimeProviderOptions {

@@ -104,3 +24,3 @@ appSyncGraphqlEndpoint?: string;

query?: string;
variables?: Record<string, DocumentType>;
variables?: DocumentType;
apiKey?: string;

@@ -114,205 +34,44 @@ region?: string;

type AWSAppSyncRealTimeAuthInput =
Partial<AWSAppSyncRealTimeProviderOptions> & {
canonicalUri: string;
payload: string;
host?: string | undefined;
};
interface DataObject extends Record<string, unknown> {
data: Record<string, unknown>;
}
export class AWSAppSyncRealTimeProvider {
private awsRealTimeSocket?: WebSocket;
private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED;
private keepAliveTimeoutId?: ReturnType<typeof setTimeout>;
private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT;
private keepAliveAlertTimeoutId?: ReturnType<typeof setTimeout>;
private subscriptionObserverMap = new Map<string, ObserverQuery>();
private promiseArray: { res(): void; rej(reason?: any): void }[] = [];
private connectionState: ConnectionState | undefined;
private readonly connectionStateMonitor = new ConnectionStateMonitor();
private readonly reconnectionMonitor = new ReconnectionMonitor();
private connectionStateMonitorSubscription: SubscriptionLike;
interface DataPayload {
id: string;
payload: DataObject;
type: string;
}
const PROVIDER_NAME = 'AWSAppSyncRealTimeProvider';
// get rid of generic. Just map the options from Gogi-specific to general
export class AWSAppSyncRealTimeProvider extends AWSWebSocketProvider {
constructor() {
// Monitor the connection state and pass changes along to Hub
this.connectionStateMonitorSubscription =
this.connectionStateMonitor.connectionStateObservable.subscribe(
connectionState => {
dispatchApiEvent({
event: CONNECTION_STATE_CHANGE,
data: {
provider: this,
connectionState,
},
message: `Connection state is ${connectionState}`,
});
this.connectionState = connectionState;
// Trigger START_RECONNECT when the connection is disrupted
if (connectionState === ConnectionState.ConnectionDisrupted) {
this.reconnectionMonitor.record(ReconnectEvent.START_RECONNECT);
}
// Trigger HALT_RECONNECT to halt reconnection attempts when the state is anything other than
// ConnectionDisrupted or Connecting
if (
[
ConnectionState.Connected,
ConnectionState.ConnectedPendingDisconnect,
ConnectionState.ConnectedPendingKeepAlive,
ConnectionState.ConnectedPendingNetwork,
ConnectionState.ConnectionDisruptedPendingNetwork,
ConnectionState.Disconnected,
].includes(connectionState)
) {
this.reconnectionMonitor.record(ReconnectEvent.HALT_RECONNECT);
}
},
);
super(PROVIDER_NAME);
}
/**
* Mark the socket closed and release all active listeners
*/
close() {
// Mark the socket closed both in status and the connection monitor
this.socketStatus = SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(CONNECTION_CHANGE.CONNECTION_FAILED);
// Turn off the subscription monitor Hub publishing
this.connectionStateMonitorSubscription.unsubscribe();
// Complete all reconnect observers
this.reconnectionMonitor.close();
public subscribe(
options?: AWSAppSyncRealTimeProviderOptions,
customUserAgentDetails?: CustomUserAgentDetails,
) {
return super.subscribe(options, customUserAgentDetails);
}
getNewWebSocket(url: string, protocol: string[]) {
return new WebSocket(url, protocol);
}
getProviderName() {
return 'AWSAppSyncRealTimeProvider';
return PROVIDER_NAME;
}
// Check if url matches standard domain pattern
private isCustomDomain(url: string): boolean {
return url.match(standardDomainPattern) === null;
}
subscribe(
options?: AWSAppSyncRealTimeProviderOptions,
customUserAgentDetails?: CustomUserAgentDetails,
): Observable<Record<string, unknown>> {
const {
appSyncGraphqlEndpoint,
region,
query,
variables,
authenticationType,
additionalHeaders,
apiKey,
authToken,
libraryConfigHeaders,
} = options || {};
return new Observable(observer => {
if (!options || !appSyncGraphqlEndpoint) {
observer.error({
errors: [
{
...new GraphQLError(
`Subscribe only available for AWS AppSync endpoint`,
),
},
],
});
observer.complete();
} else {
let subscriptionStartActive = false;
const subscriptionId = amplifyUuid();
const startSubscription = () => {
if (!subscriptionStartActive) {
subscriptionStartActive = true;
const startSubscriptionPromise =
this._startSubscriptionWithAWSAppSyncRealTime({
options: {
query,
variables,
region,
authenticationType,
appSyncGraphqlEndpoint,
additionalHeaders,
apiKey,
authToken,
libraryConfigHeaders,
},
observer,
subscriptionId,
customUserAgentDetails,
}).catch<any>(err => {
logger.debug(
`${CONTROL_MSG.REALTIME_SUBSCRIPTION_INIT_ERROR}: ${err}`,
);
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
});
startSubscriptionPromise.finally(() => {
subscriptionStartActive = false;
});
}
};
// Add an observable to the reconnection list to manage reconnection for this subscription
const reconnectSubscription = new Observable(
reconnectSubscriptionObserver => {
this.reconnectionMonitor.addObserver(reconnectSubscriptionObserver);
},
).subscribe(() => {
startSubscription();
});
startSubscription();
return async () => {
// Cleanup reconnection subscription
reconnectSubscription?.unsubscribe();
// Cleanup after unsubscribing or observer.complete was called after _startSubscriptionWithAWSAppSyncRealTime
try {
// Waiting that subscription has been connected before trying to unsubscribe
await this._waitForSubscriptionToBeConnected(subscriptionId);
const { subscriptionState } =
this.subscriptionObserverMap.get(subscriptionId) || {};
if (!subscriptionState) {
// subscription already unsubscribed
return;
}
if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) {
this._sendUnsubscriptionMessage(subscriptionId);
} else {
throw new Error('Subscription never connected');
}
} catch (err) {
logger.debug(`Error while unsubscribing ${err}`);
} finally {
this._removeSubscriptionObserver(subscriptionId);
}
};
}
});
}
private async _startSubscriptionWithAWSAppSyncRealTime({
protected async _prepareSubscriptionPayload({
options,
observer,
subscriptionId,
customUserAgentDetails,
additionalCustomHeaders,
libraryConfigHeaders,
}: {
options: AWSAppSyncRealTimeProviderOptions;
observer: PubSubContentObserver;
subscriptionId: string;
customUserAgentDetails: CustomUserAgentDetails | undefined;
}) {
additionalCustomHeaders: Record<string, string>;
libraryConfigHeaders: Record<string, string>;
}): Promise<string> {
const {

@@ -325,28 +84,3 @@ appSyncGraphqlEndpoint,

region,
libraryConfigHeaders = () => ({}),
additionalHeaders = {},
authToken,
} = options;
let additionalCustomHeaders: Record<string, string> = {};
if (typeof additionalHeaders === 'function') {
const requestOptions: RequestOptions = {
url: appSyncGraphqlEndpoint || '',
queryString: query || '',
};
additionalCustomHeaders = await additionalHeaders(requestOptions);
} else {
additionalCustomHeaders = additionalHeaders;
}
// if an authorization header is set, have the explicit authToken take precedence
if (authToken) {
additionalCustomHeaders = {
...additionalCustomHeaders,
Authorization: authToken,
};
}
const subscriptionState: SUBSCRIPTION_STATUS = SUBSCRIPTION_STATUS.PENDING;
const data = {

@@ -356,20 +90,10 @@ query,

};
// Having a subscription id map will make it simple to forward messages received
this.subscriptionObserverMap.set(subscriptionId, {
observer,
query: query ?? '',
variables: variables ?? {},
subscriptionState,
startAckTimeoutId: undefined,
});
const serializedData = JSON.stringify(data);
// Preparing payload for subscription message
const dataString = JSON.stringify(data);
const headerObj = {
...(await this._awsRealTimeHeaderBasedAuth({
const headers = {
...(await awsRealTimeHeaderBasedAuth({
apiKey,
appSyncGraphqlEndpoint,
authenticationType,
payload: dataString,
payload: serializedData,
canonicalUri: '',

@@ -379,3 +103,3 @@ region,

})),
...(await libraryConfigHeaders()),
...libraryConfigHeaders,
...additionalCustomHeaders,

@@ -388,6 +112,6 @@ [USER_AGENT_HEADER]: getAmplifyUserAgent(customUserAgentDetails),

payload: {
data: dataString,
data: serializedData,
extensions: {
authorization: {
...headerObj,
...headers,
},

@@ -399,763 +123,46 @@ },

const stringToAWSRealTime = JSON.stringify(subscriptionMessage);
const serializedSubscriptionMessage = JSON.stringify(subscriptionMessage);
try {
this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION);
await this._initializeWebSocketConnection({
apiKey,
appSyncGraphqlEndpoint,
authenticationType,
region,
additionalCustomHeaders,
});
} catch (err: any) {
this._logStartSubscriptionError(subscriptionId, observer, err);
return;
}
// Potential race condition can occur when unsubscribe is called during _initializeWebSocketConnection.
// E.g.unsubscribe gets invoked prior to finishing WebSocket handshake or START_ACK.
// Both subscriptionFailedCallback and subscriptionReadyCallback are used to synchronized this.
const { subscriptionFailedCallback, subscriptionReadyCallback } =
this.subscriptionObserverMap.get(subscriptionId) ?? {};
// This must be done before sending the message in order to be listening immediately
this.subscriptionObserverMap.set(subscriptionId, {
observer,
subscriptionState,
query: query ?? '',
variables: variables ?? {},
subscriptionReadyCallback,
subscriptionFailedCallback,
startAckTimeoutId: setTimeout(() => {
this._timeoutStartSubscriptionAck(subscriptionId);
}, START_ACK_TIMEOUT),
});
if (this.awsRealTimeSocket) {
this.awsRealTimeSocket.send(stringToAWSRealTime);
}
return serializedSubscriptionMessage;
}
// Log logic for start subscription failures
private _logStartSubscriptionError(
subscriptionId: string,
observer: PubSubContentObserver,
err: { message?: string },
) {
logger.debug({ err });
const message = String(err.message ?? '');
// Resolving to give the state observer time to propogate the update
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
protected _handleSubscriptionData(
message: MessageEvent,
): [boolean, DataPayload] {
this.logger.debug(
`subscription message from AWS AppSync RealTime: ${message.data}`,
);
// Capture the error only when the network didn't cause disruption
if (
this.connectionState !== ConnectionState.ConnectionDisruptedPendingNetwork
) {
// When the error is non-retriable, error out the observable
if (isNonRetryableError(err)) {
observer.error({
errors: [
{
...new GraphQLError(
`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`,
),
},
],
});
} else {
logger.debug(`${CONTROL_MSG.CONNECTION_FAILED}: ${message}`);
}
const { id = '', payload, type } = JSON.parse(String(message.data));
const { subscriptionFailedCallback } =
this.subscriptionObserverMap.get(subscriptionId) || {};
// Notify concurrent unsubscription
if (typeof subscriptionFailedCallback === 'function') {
subscriptionFailedCallback();
}
}
}
// Waiting that subscription has been connected before trying to unsubscribe
private async _waitForSubscriptionToBeConnected(subscriptionId: string) {
const subscriptionObserver =
this.subscriptionObserverMap.get(subscriptionId);
if (subscriptionObserver) {
const { subscriptionState } = subscriptionObserver;
// This in case unsubscribe is invoked before sending start subscription message
if (subscriptionState === SUBSCRIPTION_STATUS.PENDING) {
return new Promise<void>((resolve, reject) => {
const {
observer,
subscriptionState: observedSubscriptionState,
variables,
query,
} = subscriptionObserver;
this.subscriptionObserverMap.set(subscriptionId, {
observer,
subscriptionState: observedSubscriptionState,
variables,
query,
subscriptionReadyCallback: resolve,
subscriptionFailedCallback: reject,
});
});
}
}
}
private _sendUnsubscriptionMessage(subscriptionId: string) {
try {
if (
this.awsRealTimeSocket &&
this.awsRealTimeSocket.readyState === WebSocket.OPEN &&
this.socketStatus === SOCKET_STATUS.READY
) {
// Preparing unsubscribe message to stop receiving messages for that subscription
const unsubscribeMessage = {
id: subscriptionId,
type: MESSAGE_TYPES.GQL_STOP,
};
const stringToAWSRealTime = JSON.stringify(unsubscribeMessage);
this.awsRealTimeSocket.send(stringToAWSRealTime);
}
} catch (err) {
// If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do
logger.debug({ err });
}
}
private _removeSubscriptionObserver(subscriptionId: string) {
this.subscriptionObserverMap.delete(subscriptionId);
// Verifying 1000ms after removing subscription in case there are new subscription unmount/mount
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
}
private _closeSocketIfRequired() {
if (this.subscriptionObserverMap.size > 0) {
// Active subscriptions on the WebSocket
return;
}
if (!this.awsRealTimeSocket) {
this.socketStatus = SOCKET_STATUS.CLOSED;
return;
}
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION);
if (this.awsRealTimeSocket.bufferedAmount > 0) {
// Still data on the WebSocket
setTimeout(this._closeSocketIfRequired.bind(this), 1000);
} else {
logger.debug('closing WebSocket...');
if (this.keepAliveTimeoutId) {
clearTimeout(this.keepAliveTimeoutId);
}
if (this.keepAliveAlertTimeoutId) {
clearTimeout(this.keepAliveAlertTimeoutId);
}
const tempSocket = this.awsRealTimeSocket;
// Cleaning callbacks to avoid race condition, socket still exists
tempSocket.onclose = null;
tempSocket.onerror = null;
tempSocket.close(1000);
this.awsRealTimeSocket = undefined;
this.socketStatus = SOCKET_STATUS.CLOSED;
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
}
}
private _handleIncomingSubscriptionMessage(message: MessageEvent) {
if (typeof message.data !== 'string') {
return;
}
logger.debug(
`subscription message from AWS AppSync RealTime: ${message.data}`,
);
const {
id = '',
payload,
type,
}: DataPayload = JSON.parse(String(message.data));
const {
observer = null,
query = '',
variables = {},
startAckTimeoutId,
subscriptionReadyCallback,
subscriptionFailedCallback,
} = this.subscriptionObserverMap.get(id) || {};
logger.debug({ id, observer, query, variables });
this.logger.debug({ id, observer, query, variables });
if (type === MESSAGE_TYPES.GQL_DATA && payload && payload.data) {
if (type === MESSAGE_TYPES.DATA && payload && payload.data) {
if (observer) {
observer.next(payload);
} else {
logger.debug(`observer not found for id: ${id}`);
this.logger.debug(`observer not found for id: ${id}`);
}
return;
return [true, { id, type, payload }];
}
if (type === MESSAGE_TYPES.GQL_START_ACK) {
logger.debug(
`subscription ready for ${JSON.stringify({ query, variables })}`,
);
if (typeof subscriptionReadyCallback === 'function') {
subscriptionReadyCallback();
}
if (startAckTimeoutId) clearTimeout(startAckTimeoutId);
dispatchApiEvent({
event: CONTROL_MSG.SUBSCRIPTION_ACK,
data: { query, variables },
message: 'Connection established for subscription',
});
const subscriptionState = SUBSCRIPTION_STATUS.CONNECTED;
if (observer) {
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId: undefined,
subscriptionState,
subscriptionReadyCallback,
subscriptionFailedCallback,
});
}
this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_ESTABLISHED,
);
return;
}
if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) {
if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId);
if (this.keepAliveAlertTimeoutId)
clearTimeout(this.keepAliveAlertTimeoutId);
this.keepAliveTimeoutId = setTimeout(() => {
this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT);
}, this.keepAliveTimeout);
this.keepAliveAlertTimeoutId = setTimeout(() => {
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED);
}, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT);
this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE);
return;
}
if (type === MESSAGE_TYPES.GQL_ERROR) {
const subscriptionState = SUBSCRIPTION_STATUS.FAILED;
if (observer) {
this.subscriptionObserverMap.set(id, {
observer,
query,
variables,
startAckTimeoutId,
subscriptionReadyCallback,
subscriptionFailedCallback,
subscriptionState,
});
logger.debug(
`${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`,
);
observer.error({
errors: [
{
...new GraphQLError(
`${CONTROL_MSG.CONNECTION_FAILED}: ${JSON.stringify(payload)}`,
),
},
],
});
if (startAckTimeoutId) clearTimeout(startAckTimeoutId);
if (typeof subscriptionFailedCallback === 'function') {
subscriptionFailedCallback();
}
}
}
return [false, { id, type, payload }];
}
private _errorDisconnect(msg: string) {
logger.debug(`Disconnect error: ${msg}`);
if (this.awsRealTimeSocket) {
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
this.awsRealTimeSocket.close();
}
this.socketStatus = SOCKET_STATUS.CLOSED;
}
private _timeoutStartSubscriptionAck(subscriptionId: string) {
const subscriptionObserver =
this.subscriptionObserverMap.get(subscriptionId);
if (subscriptionObserver) {
const { observer, query, variables } = subscriptionObserver;
if (!observer) {
return;
}
this.subscriptionObserverMap.set(subscriptionId, {
observer,
query,
variables,
subscriptionState: SUBSCRIPTION_STATUS.FAILED,
});
this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED);
logger.debug(
'timeoutStartSubscription',
JSON.stringify({ query, variables }),
);
}
}
/**
* Strips out `Authorization` header if present
*/
private _extractNonAuthHeaders(
headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'],
): Record<string, string> {
if (!headers) {
return {};
}
if ('Authorization' in headers) {
const { Authorization: _, ...nonAuthHeaders } = headers;
return nonAuthHeaders;
}
return headers;
}
/**
*
* @param headers - http headers
* @returns uri-encoded query parameters derived from custom headers
*/
private _queryParamsFromCustomHeaders(
headers?: AWSAppSyncRealTimeProviderOptions['additionalCustomHeaders'],
): URLSearchParams {
const nonAuthHeaders = this._extractNonAuthHeaders(headers);
const params = new AmplifyUrlSearchParams();
Object.entries(nonAuthHeaders).forEach(([k, v]) => {
params.append(k, v);
});
return params;
}
/**
* Normalizes AppSync realtime endpoint URL
*
* @param appSyncGraphqlEndpoint - AppSync endpointUri from config
* @param urlParams - URLSearchParams
* @returns fully resolved string realtime endpoint URL
*/
private _realtimeUrlWithQueryString(
appSyncGraphqlEndpoint: string | undefined,
urlParams: URLSearchParams,
): string {
const protocol = 'wss://';
let realtimeEndpoint = appSyncGraphqlEndpoint ?? '';
if (this.isCustomDomain(realtimeEndpoint)) {
realtimeEndpoint = realtimeEndpoint.concat(customDomainPath);
} else {
realtimeEndpoint = realtimeEndpoint
.replace('appsync-api', 'appsync-realtime-api')
.replace('gogi-beta', 'grt-beta');
}
realtimeEndpoint = realtimeEndpoint
.replace('https://', protocol)
.replace('http://', protocol);
const realtimeEndpointUrl = new AmplifyUrl(realtimeEndpoint);
// preserves any query params a customer might manually set in the configuration
const existingParams = new AmplifyUrlSearchParams(
realtimeEndpointUrl.search,
);
for (const [k, v] of urlParams.entries()) {
existingParams.append(k, v);
}
realtimeEndpointUrl.search = existingParams.toString();
return realtimeEndpointUrl.toString();
}
private _initializeWebSocketConnection({
appSyncGraphqlEndpoint,
authenticationType,
apiKey,
region,
additionalCustomHeaders,
}: AWSAppSyncRealTimeProviderOptions) {
if (this.socketStatus === SOCKET_STATUS.READY) {
return;
}
// TODO(Eslint): refactor to now use async function as the promise executor
// eslint-disable-next-line no-async-promise-executor
return new Promise<void>(async (resolve, reject) => {
this.promiseArray.push({ res: resolve, rej: reject });
if (this.socketStatus === SOCKET_STATUS.CLOSED) {
try {
this.socketStatus = SOCKET_STATUS.CONNECTING;
const payloadString = '{}';
const authHeader = await this._awsRealTimeHeaderBasedAuth({
authenticationType,
payload: payloadString,
canonicalUri: '/connect',
apiKey,
appSyncGraphqlEndpoint,
region,
additionalCustomHeaders,
});
const headerString = authHeader ? JSON.stringify(authHeader) : '';
// base64url-encoded string
const encodedHeader = base64Encoder.convert(headerString, {
urlSafe: true,
skipPadding: true,
});
const authTokenSubprotocol = `header-${encodedHeader}`;
const queryParams = this._queryParamsFromCustomHeaders(
additionalCustomHeaders,
);
const awsRealTimeUrl = this._realtimeUrlWithQueryString(
appSyncGraphqlEndpoint,
queryParams,
);
await this._initializeRetryableHandshake(
awsRealTimeUrl,
authTokenSubprotocol,
);
this.promiseArray.forEach(({ res }) => {
logger.debug('Notifying connection successful');
res();
});
this.socketStatus = SOCKET_STATUS.READY;
this.promiseArray = [];
} catch (err) {
logger.debug('Connection exited with', err);
this.promiseArray.forEach(({ rej }) => {
rej(err);
});
this.promiseArray = [];
if (
this.awsRealTimeSocket &&
this.awsRealTimeSocket.readyState === WebSocket.OPEN
) {
this.awsRealTimeSocket.close(3001);
}
this.awsRealTimeSocket = undefined;
this.socketStatus = SOCKET_STATUS.CLOSED;
}
}
});
}
private async _initializeRetryableHandshake(
awsRealTimeUrl: string,
subprotocol: string,
) {
logger.debug(`Initializaling retryable Handshake`);
await jitteredExponentialRetry(
this._initializeHandshake.bind(this),
[awsRealTimeUrl, subprotocol],
MAX_DELAY_MS,
);
}
/**
*
* @param subprotocol -
*/
private async _initializeHandshake(
awsRealTimeUrl: string,
subprotocol: string,
) {
logger.debug(`Initializing handshake ${awsRealTimeUrl}`);
// Because connecting the socket is async, is waiting until connection is open
// Step 1: connect websocket
try {
await (() => {
return new Promise<void>((resolve, reject) => {
const newSocket = this.getNewWebSocket(awsRealTimeUrl, [
'graphql-ws',
subprotocol,
]);
newSocket.onerror = () => {
logger.debug(`WebSocket connection error`);
};
newSocket.onclose = () => {
reject(new Error('Connection handshake error'));
};
newSocket.onopen = () => {
this.awsRealTimeSocket = newSocket;
resolve();
};
});
})();
// Step 2: wait for ack from AWS AppSyncReaTime after sending init
await (() => {
return new Promise((resolve, reject) => {
if (this.awsRealTimeSocket) {
let ackOk = false;
this.awsRealTimeSocket.onerror = error => {
logger.debug(`WebSocket error ${JSON.stringify(error)}`);
};
this.awsRealTimeSocket.onclose = event => {
logger.debug(`WebSocket closed ${event.reason}`);
reject(new Error(JSON.stringify(event)));
};
this.awsRealTimeSocket.onmessage = (message: MessageEvent) => {
if (typeof message.data !== 'string') {
return;
}
logger.debug(
`subscription message from AWS AppSyncRealTime: ${message.data} `,
);
const data = JSON.parse(message.data) as ParsedMessagePayload;
const {
type,
payload: {
connectionTimeoutMs = DEFAULT_KEEP_ALIVE_TIMEOUT,
} = {},
} = data;
if (type === MESSAGE_TYPES.GQL_CONNECTION_ACK) {
ackOk = true;
if (this.awsRealTimeSocket) {
this.keepAliveTimeout = connectionTimeoutMs;
this.awsRealTimeSocket.onmessage =
this._handleIncomingSubscriptionMessage.bind(this);
this.awsRealTimeSocket.onerror = err => {
logger.debug(err);
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
this.awsRealTimeSocket.onclose = event => {
logger.debug(`WebSocket closed ${event.reason}`);
this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED);
};
}
resolve('Cool, connected to AWS AppSyncRealTime');
return;
}
if (type === MESSAGE_TYPES.GQL_CONNECTION_ERROR) {
const {
payload: {
errors: [{ errorType = '', errorCode = 0 } = {}] = [],
} = {},
} = data;
// TODO(Eslint): refactor to reject an Error object instead of a plain object
// eslint-disable-next-line prefer-promise-reject-errors
reject({ errorType, errorCode });
}
};
const gqlInit = {
type: MESSAGE_TYPES.GQL_CONNECTION_INIT,
};
this.awsRealTimeSocket.send(JSON.stringify(gqlInit));
const checkAckOk = (targetAckOk: boolean) => {
if (!targetAckOk) {
this.connectionStateMonitor.record(
CONNECTION_CHANGE.CONNECTION_FAILED,
);
reject(
new Error(
`Connection timeout: ack from AWSAppSyncRealTime was not received after ${CONNECTION_INIT_TIMEOUT} ms`,
),
);
}
};
setTimeout(() => {
checkAckOk(ackOk);
}, CONNECTION_INIT_TIMEOUT);
}
});
})();
} catch (err) {
const { errorType, errorCode } = err as {
errorType: string;
errorCode: number;
};
if (NON_RETRYABLE_CODES.includes(errorCode)) {
throw new NonRetryableError(errorType);
} else if (errorType) {
throw new Error(errorType);
} else {
throw err;
}
}
}
private async _awsRealTimeHeaderBasedAuth({
apiKey,
authenticationType,
payload,
canonicalUri,
appSyncGraphqlEndpoint,
region,
additionalCustomHeaders,
}: AWSAppSyncRealTimeAuthInput): Promise<
Record<string, unknown> | undefined
> {
const headerHandler: {
[key in ResolvedGraphQLAuthModes]: (
arg0: AWSAppSyncRealTimeAuthInput,
) => Promise<Record<string, unknown>> | Record<string, unknown>;
} = {
apiKey: this._awsRealTimeApiKeyHeader.bind(this),
iam: this._awsRealTimeIAMHeader.bind(this),
oidc: this._awsAuthTokenHeader.bind(this),
userPool: this._awsAuthTokenHeader.bind(this),
lambda: this._customAuthHeader,
none: this._customAuthHeader,
};
if (!authenticationType || !headerHandler[authenticationType]) {
logger.debug(`Authentication type ${authenticationType} not supported`);
return undefined;
} else {
const handler = headerHandler[authenticationType];
const host = appSyncGraphqlEndpoint
? new AmplifyUrl(appSyncGraphqlEndpoint).host
: undefined;
logger.debug(`Authenticating with ${JSON.stringify(authenticationType)}`);
let resolvedApiKey;
if (authenticationType === 'apiKey') {
resolvedApiKey = apiKey;
}
const result = await handler({
payload,
canonicalUri,
appSyncGraphqlEndpoint,
apiKey: resolvedApiKey,
region,
host,
additionalCustomHeaders,
});
return result;
}
}
private async _awsAuthTokenHeader({ host }: AWSAppSyncRealTimeAuthInput) {
const session = await fetchAuthSession();
protected _unsubscribeMessage(subscriptionId: string): {
id: string;
type: string;
} {
return {
Authorization: session?.tokens?.accessToken?.toString(),
host,
id: subscriptionId,
type: MESSAGE_TYPES.GQL_STOP,
};
}
private async _awsRealTimeApiKeyHeader({
apiKey,
host,
}: AWSAppSyncRealTimeAuthInput) {
const dt = new Date();
const dtStr = dt.toISOString().replace(/[:-]|\.\d{3}/g, '');
return {
host,
'x-amz-date': dtStr,
'x-api-key': apiKey,
};
}
private async _awsRealTimeIAMHeader({
payload,
canonicalUri,
appSyncGraphqlEndpoint,
region,
}: AWSAppSyncRealTimeAuthInput) {
const endpointInfo = {
region,
service: 'appsync',
};
const creds = (await fetchAuthSession()).credentials;
const request = {
url: `${appSyncGraphqlEndpoint}${canonicalUri}`,
data: payload,
method: 'POST',
headers: { ...AWS_APPSYNC_REALTIME_HEADERS },
};
const signedParams = signRequest(
{
headers: request.headers,
method: request.method,
url: new AmplifyUrl(request.url),
body: request.data,
},
{
// TODO: What do we need to do to remove these !'s?
credentials: creds!,
signingRegion: endpointInfo.region!,
signingService: endpointInfo.service,
},
);
return signedParams.headers;
}
private _customAuthHeader({
host,
additionalCustomHeaders,
}: AWSAppSyncRealTimeAuthInput) {
/**
* If `additionalHeaders` was provided to the subscription as a function,
* the headers that are returned by that function will already have been
* provided before this function is called.
*/
if (!additionalCustomHeaders?.Authorization) {
throw new Error('No auth token specified');
}
return {
Authorization: additionalCustomHeaders.Authorization,
host,
};
}
}

@@ -39,5 +39,5 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

* Server -> Client message.
* This message type is for subscription message from AWS AppSync RealTime
* This message type is for subscription message from AWS AppSync RealTime or Events
*/
GQL_DATA = 'data',
DATA = 'data',
/**

@@ -63,2 +63,32 @@ * Server -> Client message.

GQL_ERROR = 'error', // Server -> Client
/**
* Client -> Server message.
* This message type is for registering subscriptions with Events
*/
EVENT_SUBSCRIBE = 'subscribe',
/**
* Client -> Server message.
* This message type is for publishing a message with Events
*/
EVENT_PUBLISH = 'publish',
/**
* Server -> Client message.
* Server acknowledges successful subscription
*/
EVENT_SUBSCRIBE_ACK = 'subscribe_success',
/**
* Server -> Client message.
* Server acknowledges successful publish
*/
EVENT_PUBLISH_ACK = 'publish_success',
/**
* Client -> Server message.
* This message type is for unregister subscriptions with AWS AppSync RealTime
*/
EVENT_STOP = 'unsubscribe',
/**
* Server -> Client message.
* This is the ack response from AWS AppSync Events to EVENT_STOP message
*/
EVENT_COMPLETE = 'unsubscribe_success',
}

@@ -65,0 +95,0 @@

@@ -20,14 +20,3 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

export const CONNECTION_CHANGE: {
[key in
| 'KEEP_ALIVE_MISSED'
| 'KEEP_ALIVE'
| 'CONNECTION_ESTABLISHED'
| 'CONNECTION_FAILED'
| 'CLOSING_CONNECTION'
| 'OPENING_CONNECTION'
| 'CLOSED'
| 'ONLINE'
| 'OFFLINE']: Partial<LinkedConnectionStates>;
} = {
export const CONNECTION_CHANGE = {
KEEP_ALIVE_MISSED: { keepAliveState: 'unhealthy' },

@@ -48,3 +37,3 @@ KEEP_ALIVE: { keepAliveState: 'healthy' },

OFFLINE: { networkState: 'disconnected' },
};
} as const;

@@ -51,0 +40,0 @@ export class ConnectionStateMonitor {

@@ -14,3 +14,3 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

* Captures the reconnect event logic used to determine when to reconnect to PubSub providers.
* Reconnnect attempts are delayed by 5 seconds to let the interface settle.
* Reconnect attempts are delayed by 5 seconds to let the interface settle.
* Attempting to reconnect only once creates unrecoverable states when the network state isn't

@@ -17,0 +17,0 @@ * supported by the browser, so this keeps retrying every minute until halted.

@@ -47,1 +47,24 @@ // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

};
// /**
// * @internal
// */
// export const resolveEventsConfig = (amplify: AmplifyClassV6) => {
// const config = amplify.getConfig();
// if (!config.API?.Events) {
// logger.warn(
// 'The Events configuration is missing. This is likely due to Amplify.configure() not being called prior to using events.connect() or events.post().',
// );
// }
// const { apiKey, endpoint, defaultAuthMode, region } =
// config.API?.Events ?? {};
// return {
// apiKey,
// defaultAuthMode,
// appSyncGraphqlEndpoint: endpoint,
// region,
// };
// };

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc