Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@uppy/companion-client

Package Overview
Dependencies
Maintainers
6
Versions
89
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@uppy/companion-client - npm Package Compare versions

Comparing version 3.5.0 to 3.6.0

3

lib/index.js

@@ -8,3 +8,2 @@ 'use strict';

export { default as Provider } from './Provider.js';
export { default as SearchProvider } from './SearchProvider.js';
export { default as Socket } from './Socket.js';
export { default as SearchProvider } from './SearchProvider.js';

@@ -32,4 +32,4 @@ 'use strict';

export default class Provider extends RequestClient {
constructor(uppy, opts, getQueue) {
super(uppy, opts, getQueue);
constructor(uppy, opts) {
super(uppy, opts);
Object.defineProperty(this, _removeAuthToken, {

@@ -147,4 +147,3 @@ value: _removeAuthToken2

window.removeEventListener('message', handleToken);
this.setAuthToken(data.token);
resolve();
this.setAuthToken(data.token).then(() => resolve()).catch(reject);
};

@@ -165,4 +164,5 @@ window.addEventListener('message', handleToken);

try {
// throw Object.assign(new Error(), { isAuthError: true }) // testing simulate access token expired (to refresh token)
// A better way to test this is for example with Google Drive:
// to test simulate access token expired (leading to a token token refresh),
// see mockAccessTokenExpiredError in companion/drive.
// If you want to test refresh token *and* access token invalid, do this for example with Google Drive:
// While uploading, go to your google account settings,

@@ -174,3 +174,4 @@ // "Third-party apps & services", then click "Companion" and "Remove access".

// only handle auth errors (401 from provider), and only handle them if we have a (refresh) token
if (!err.isAuthError || !(await _classPrivateFieldLooseBase(this, _getAuthToken)[_getAuthToken]())) throw err;
const authTokenAfter = await _classPrivateFieldLooseBase(this, _getAuthToken)[_getAuthToken]();
if (!err.isAuthError || !authTokenAfter) throw err;
if (_classPrivateFieldLooseBase(this, _refreshingTokenPromise)[_refreshingTokenPromise] == null) {

@@ -181,2 +182,3 @@ // Many provider requests may be starting at once, however refresh token should only be called once.

try {
this.uppy.log(`[CompanionClient] Refreshing expired auth token`, 'info');
const response = await super.request({

@@ -183,0 +185,0 @@ path: this.refreshTokenUrl(),

'use strict';
// eslint-disable-next-line import/no-extraneous-dependencies
let _Symbol$for;

@@ -7,2 +8,3 @@ function _classPrivateFieldLooseBase(receiver, privateKey) { if (!Object.prototype.hasOwnProperty.call(receiver, privateKey)) { throw new TypeError("attempted to use private field on non-instance"); } return receiver; }

function _classPrivateFieldLooseKey(name) { return "__private_" + id++ + "_" + name; }
import pRetry, { AbortError } from 'p-retry';
import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError';

@@ -12,7 +14,5 @@ import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause';

import getSocketHost from '@uppy/utils/lib/getSocketHost';
import EventManager from '@uppy/utils/lib/EventManager';
import AuthError from './AuthError.js';
import Socket from './Socket.js';
const packageJson = {
"version": "3.5.0"
"version": "3.6.0"
}; // Remove the trailing slash so we can always safely append /xyz.

@@ -22,13 +22,27 @@ function stripSlash(url) {

}
const retryCount = 10; // set to a low number, like 2 to test manual user retries
const socketActivityTimeoutMs = 5 * 60 * 1000; // set to a low number like 10000 to test this
const authErrorStatusCode = 401;
class HttpError extends Error {
constructor(_ref) {
let {
statusCode,
message
} = _ref;
super(message);
this.statusCode = void 0;
this.statusCode = statusCode;
}
}
async function handleJSONResponse(res) {
if (res.status === 401) {
if (res.status === authErrorStatusCode) {
throw new AuthError();
}
const jsonPromise = res.json();
if (res.ok) {
return jsonPromise;
return res.json();
}
let errMsg = `Failed request with status: ${res.status}. ${res.statusText}`;
try {
const errData = await jsonPromise;
const errData = await res.json();
errMsg = errData.message ? `${errMsg} message: ${errData.message}` : errMsg;

@@ -39,3 +53,6 @@ errMsg = errData.requestId ? `${errMsg} request-Id: ${errData.requestId}` : errMsg;

}
throw new Error(errMsg);
throw new HttpError({
statusCode: res.status,
message: errMsg
});
}

@@ -48,5 +65,16 @@

var _requestSocketToken = /*#__PURE__*/_classPrivateFieldLooseKey("requestSocketToken");
var _awaitRemoteFileUpload = /*#__PURE__*/_classPrivateFieldLooseKey("awaitRemoteFileUpload");
_Symbol$for = Symbol.for('uppy test: getCompanionHeaders');
export default class RequestClient {
constructor(uppy, opts, getQueue) {
constructor(uppy, opts) {
/**
* This method will ensure a websocket for the specified file and returns a promise that resolves
* when the file has finished downloading, or rejects if it fails.
* It will retry if the websocket gets disconnected
*
* @param {{ file: UppyFile, queue: RateLimitedQueue, signal: AbortSignal }} file
*/
Object.defineProperty(this, _awaitRemoteFileUpload, {
value: _awaitRemoteFileUpload2
});
Object.defineProperty(this, _getUrl, {

@@ -61,3 +89,8 @@ value: _getUrl2

writable: true,
value: async (file, postBody) => {
value: async _ref2 => {
let {
file,
postBody,
signal
} = _ref2;
if (file.remote.url == null) {

@@ -69,3 +102,3 @@ throw new Error('Cannot connect to an undefined URL');

...postBody
});
}, signal);
return res.token;

@@ -76,3 +109,2 @@ }

this.opts = opts;
this.getQueue = getQueue;
this.onReceiveResponse = this.onReceiveResponse.bind(this);

@@ -105,6 +137,6 @@ _classPrivateFieldLooseBase(this, _companionHeaders)[_companionHeaders] = opts == null ? void 0 : opts.companionHeaders;

}
onReceiveResponse(_ref) {
onReceiveResponse(_ref3) {
let {
headers
} = _ref;
} = _ref3;
const state = this.uppy.getState();

@@ -169,4 +201,4 @@ const companion = state.companion || {};

// filter to keep only allowed Headers
return Object.fromEntries(Object.entries(headers).filter(_ref2 => {
let [header] = _ref2;
return Object.fromEntries(Object.entries(headers).filter(_ref4 => {
let [header] = _ref4;
if (!allowedHeaders.includes(header.toLowerCase())) {

@@ -181,3 +213,3 @@ this.uppy.log(`[CompanionClient] excluding disallowed header ${header}`);

/** @protected */
async request(_ref3) {
async request(_ref5) {
let {

@@ -189,3 +221,3 @@ path,

signal
} = _ref3;
} = _ref5;
try {

@@ -201,5 +233,6 @@ const headers = await this.preflightAndHeaders(path);

if (!skipPostResponse) this.onReceiveResponse(response);
return handleJSONResponse(response);
return await handleJSONResponse(response);
} catch (err) {
if (err != null && err.isAuthError) throw err;
// pass these through
if (err instanceof AuthError || err.name === 'AbortError') throw err;
throw new ErrorWithCause(`Could not ${method} ${_classPrivateFieldLooseBase(this, _getUrl)[_getUrl](path)}`, {

@@ -256,3 +289,18 @@ cause: err

}
/**
* Remote uploading consists of two steps:
* 1. #requestSocketToken which starts the download/upload in companion and returns a unique token for the upload.
* Then companion will halt the upload until:
* 2. #awaitRemoteFileUpload is called, which will open/ensure a websocket connection towards companion, with the
* previously generated token provided. It returns a promise that will resolve/reject once the file has finished
* uploading or is otherwise done (failed, canceled)
*
* @param {*} file
* @param {*} reqBody
* @param {*} options
* @returns
*/
async uploadRemoteFile(file, reqBody, options) {
var _this = this;
if (options === void 0) {

@@ -262,23 +310,64 @@ options = {};

try {
if (file.serverToken) {
return await this.connectToServerSocket(file, this.getQueue());
}
const queueRequestSocketToken = this.getQueue().wrapPromiseFunction(_classPrivateFieldLooseBase(this, _requestSocketToken)[_requestSocketToken], {
priority: -1
const {
signal,
getQueue
} = options;
return await pRetry(async () => {
var _this$uppy$getFile;
// if we already have a serverToken, assume that we are resuming the existing server upload id
const existingServerToken = (_this$uppy$getFile = this.uppy.getFile(file.id)) == null ? void 0 : _this$uppy$getFile.serverToken;
if (existingServerToken != null) {
this.uppy.log(`Connecting to exiting websocket ${existingServerToken}`);
return _classPrivateFieldLooseBase(this, _awaitRemoteFileUpload)[_awaitRemoteFileUpload]({
file,
queue: getQueue(),
signal
});
}
const queueRequestSocketToken = getQueue().wrapPromiseFunction(async function () {
try {
return await _classPrivateFieldLooseBase(_this, _requestSocketToken)[_requestSocketToken](...arguments);
} catch (outerErr) {
// throwing AbortError will cause p-retry to stop retrying
if (outerErr instanceof AuthError) throw new AbortError(outerErr);
if (outerErr.cause == null) throw outerErr;
const err = outerErr.cause;
const isRetryableHttpError = () => [408, 409, 429, 418, 423].includes(err.statusCode) || err.statusCode >= 500 && err.statusCode <= 599 && ![501, 505].includes(err.statusCode);
if (err instanceof HttpError && !isRetryableHttpError()) throw new AbortError(err);
// p-retry will retry most other errors,
// but it will not retry TypeError (except network error TypeErrors)
throw err;
}
}, {
priority: -1
});
const serverToken = await queueRequestSocketToken({
file,
postBody: reqBody,
signal
}).abortOn(signal);
if (!this.uppy.getFile(file.id)) return undefined; // has file since been removed?
this.uppy.setFileState(file.id, {
serverToken
});
return _classPrivateFieldLooseBase(this, _awaitRemoteFileUpload)[_awaitRemoteFileUpload]({
file: this.uppy.getFile(file.id),
// re-fetching file because it might have changed in the meantime
queue: getQueue(),
signal
});
}, {
retries: retryCount,
signal,
onFailedAttempt: err => this.uppy.log(`Retrying upload due to: ${err.message}`, 'warning')
});
const serverToken = await queueRequestSocketToken(file, reqBody).abortOn(options.signal);
if (!this.uppy.getState().files[file.id]) return undefined;
this.uppy.setFileState(file.id, {
serverToken
});
return await this.connectToServerSocket(this.uppy.getFile(file.id), this.getQueue());
} catch (err) {
var _err$cause;
if ((err == null || (_err$cause = err.cause) == null ? void 0 : _err$cause.name) === 'AbortError') {
// this is a bit confusing, but note that an error with the `name` prop set to 'AbortError' (from AbortController)
// is not the same as `p-retry` `AbortError`
if (err.name === 'AbortError') {
// The file upload was aborted, it’s not an error
return undefined;
}
this.uppy.setFileState(file.id, {
serverToken: undefined
});
this.uppy.emit('upload-error', file, err);

@@ -288,127 +377,210 @@ throw err;

}
/**
* @param {UppyFile} file
*/
async connectToServerSocket(file, queue) {
return new Promise((resolve, reject) => {
}
function _getUrl2(url) {
if (/^(https?:|)\/\//.test(url)) {
return url;
}
return `${this.hostname}/${url}`;
}
async function _awaitRemoteFileUpload2(_ref6) {
let {
file,
queue,
signal
} = _ref6;
let removeEventHandlers;
const {
capabilities
} = this.uppy.getState();
try {
return await new Promise((resolve, reject) => {
const token = file.serverToken;
const host = getSocketHost(file.remote.companionUrl);
const socket = new Socket({
target: `${host}/api/${token}`,
autoOpen: false
});
const eventManager = new EventManager(this.uppy);
let queuedRequest;
eventManager.onFileRemove(file.id, () => {
socket.send('cancel', {});
queuedRequest.abort();
resolve(`upload ${file.id} was removed`);
});
eventManager.onPause(file.id, isPaused => {
if (isPaused) {
/** @type {WebSocket} */
let socket;
/** @type {AbortController?} */
let socketAbortController;
let activityTimeout;
let {
isPaused
} = file;
const socketSend = (action, payload) => {
if (socket == null || socket.readyState !== socket.OPEN) {
var _socket;
this.uppy.log(`Cannot send "${action}" to socket ${file.id} because the socket state was ${String((_socket = socket) == null ? void 0 : _socket.readyState)}`, 'warning');
return;
}
socket.send(JSON.stringify({
action,
payload: payload != null ? payload : {}
}));
};
function sendState() {
if (!capabilities.resumableUploads) return;
if (isPaused) socketSend('pause');else socketSend('resume');
}
const createWebsocket = async () => {
if (socketAbortController) socketAbortController.abort();
socketAbortController = new AbortController();
const onFatalError = err => {
var _socketAbortControlle;
// Remove the serverToken so that a new one will be created for the retry.
this.uppy.setFileState(file.id, {
serverToken: null
});
(_socketAbortControlle = socketAbortController) == null || _socketAbortControlle.abort == null ? void 0 : _socketAbortControlle.abort();
reject(err);
};
// todo instead implement the ability for users to cancel / retry *currently uploading files* in the UI
function resetActivityTimeout() {
clearTimeout(activityTimeout);
if (isPaused) return;
activityTimeout = setTimeout(() => onFatalError(new Error('Timeout waiting for message from Companion socket')), socketActivityTimeoutMs);
}
try {
await queue.wrapPromiseFunction(async () => {
// eslint-disable-next-line promise/param-names
const reconnectWebsocket = async () => new Promise((resolveSocket, rejectSocket) => {
socket = new WebSocket(`${host}/api/${token}`);
resetActivityTimeout();
socket.addEventListener('close', () => {
socket = undefined;
rejectSocket(new Error('Socket closed unexpectedly'));
});
socket.addEventListener('error', error => {
this.uppy.log(`Companion socket error ${JSON.stringify(error)}, closing socket`, 'warning');
socket.close(); // will 'close' event to be emitted
});
socket.addEventListener('open', () => {
sendState();
});
socket.addEventListener('message', e => {
resetActivityTimeout();
try {
const {
action,
payload
} = JSON.parse(e.data);
switch (action) {
case 'progress':
{
emitSocketProgress(this, payload, file);
break;
}
case 'success':
{
var _socketAbortControlle2;
this.uppy.emit('upload-success', file, {
uploadURL: payload.url
});
(_socketAbortControlle2 = socketAbortController) == null || _socketAbortControlle2.abort == null ? void 0 : _socketAbortControlle2.abort();
resolve();
break;
}
case 'error':
{
const {
message
} = payload.error;
throw Object.assign(new Error(message), {
cause: payload.error
});
}
default:
this.uppy.log(`Companion socket unknown action ${action}`, 'warning');
}
} catch (err) {
onFatalError(err);
}
});
const closeSocket = () => {
this.uppy.log(`Closing socket ${file.id}`, 'info');
clearTimeout(activityTimeout);
if (socket) socket.close();
socket = undefined;
};
socketAbortController.signal.addEventListener('abort', () => {
closeSocket();
});
});
await pRetry(reconnectWebsocket, {
retries: retryCount,
signal: socketAbortController.signal,
onFailedAttempt: () => {
if (socketAbortController.signal.aborted) return; // don't log in this case
this.uppy.log(`Retrying websocket ${file.id}`, 'info');
}
});
})().abortOn(socketAbortController.signal);
} catch (err) {
if (socketAbortController.signal.aborted) return;
onFatalError(err);
}
};
const pause = newPausedState => {
if (!capabilities.resumableUploads) return;
isPaused = newPausedState;
if (socket) sendState();
if (newPausedState) {
var _socketAbortControlle3;
// Remove this file from the queue so another file can start in its place.
socket.send('pause', {});
queuedRequest.abort();
(_socketAbortControlle3 = socketAbortController) == null || _socketAbortControlle3.abort == null ? void 0 : _socketAbortControlle3.abort(); // close socket to free up the request for other uploads
} else {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
queuedRequest.abort();
queuedRequest = queue.run(() => {
socket.open();
socket.send('resume', {});
return () => {};
});
createWebsocket();
}
});
eventManager.onPauseAll(file.id, () => {
socket.send('pause', {});
queuedRequest.abort();
});
eventManager.onCancelAll(file.id, function (_temp) {
};
const onFileRemove = targetFile => {
var _socketAbortControlle4;
if (!capabilities.individualCancellation) return;
if (targetFile.id !== file.id) return;
socketSend('cancel');
(_socketAbortControlle4 = socketAbortController) == null || _socketAbortControlle4.abort == null ? void 0 : _socketAbortControlle4.abort();
this.uppy.log(`upload ${file.id} was removed`, 'info');
resolve();
};
const onCancelAll = _ref7 => {
var _socketAbortControlle5;
let {
reason
} = _temp === void 0 ? {} : _temp;
} = _ref7;
if (reason === 'user') {
socket.send('cancel', {});
queuedRequest.abort();
socketSend('cancel');
}
resolve(`upload ${file.id} was canceled`);
});
eventManager.onResumeAll(file.id, () => {
queuedRequest.abort();
if (file.error) {
socket.send('pause', {});
}
queuedRequest = queue.run(() => {
socket.open();
socket.send('resume', {});
return () => {};
});
});
eventManager.onRetry(file.id, () => {
// Only do the retry if the upload is actually in progress;
// else we could try to send these messages when the upload is still queued.
// We may need a better check for this since the socket may also be closed
// for other reasons, like network failures.
if (socket.isOpen) {
socket.send('pause', {});
socket.send('resume', {});
}
});
eventManager.onRetryAll(file.id, () => {
// See the comment in the onRetry() call
if (socket.isOpen) {
socket.send('pause', {});
socket.send('resume', {});
}
});
socket.on('progress', progressData => emitSocketProgress(this, progressData, file));
socket.on('error', errData => {
const {
message
} = errData.error;
const error = Object.assign(new Error(message), {
cause: errData.error
});
// If the remote retry optimisation should not be used,
// close the socket—this will tell companion to clear state and delete the file.
if (!this.opts.useFastRemoteRetry) {
// Remove the serverToken so that a new one will be created for the retry.
this.uppy.setFileState(file.id, {
serverToken: null
});
} else {
socket.close();
}
this.uppy.emit('upload-error', file, error);
queuedRequest.done();
reject(error);
});
socket.on('success', data => {
const uploadResp = {
uploadURL: data.url
};
this.uppy.emit('upload-success', file, uploadResp);
queuedRequest.done();
socket.close();
(_socketAbortControlle5 = socketAbortController) == null || _socketAbortControlle5.abort == null ? void 0 : _socketAbortControlle5.abort();
this.uppy.log(`upload ${file.id} was canceled`, 'info');
resolve();
};
const onFilePausedChange = (targetFileId, newPausedState) => {
if (targetFileId !== file.id) return;
pause(newPausedState);
};
const onPauseAll = () => pause(true);
const onResumeAll = () => pause(false);
this.uppy.on('file-removed', onFileRemove);
this.uppy.on('cancel-all', onCancelAll);
this.uppy.on('upload-pause', onFilePausedChange);
this.uppy.on('pause-all', onPauseAll);
this.uppy.on('resume-all', onResumeAll);
removeEventHandlers = () => {
this.uppy.off('file-removed', onFileRemove);
this.uppy.off('cancel-all', onCancelAll);
this.uppy.off('upload-pause', onFilePausedChange);
this.uppy.off('pause-all', onPauseAll);
this.uppy.off('resume-all', onResumeAll);
};
signal.addEventListener('abort', () => {
var _socketAbortControlle6;
(_socketAbortControlle6 = socketAbortController) == null ? void 0 : _socketAbortControlle6.abort();
});
queuedRequest = queue.run(() => {
if (file.isPaused) {
socket.send('pause', {});
} else {
socket.open();
}
return () => {};
});
createWebsocket();
});
} finally {
removeEventHandlers == null ? void 0 : removeEventHandlers();
}
}
function _getUrl2(url) {
if (/^(https?:|)\/\//.test(url)) {
return url;
}
return `${this.hostname}/${url}`;
}
RequestClient.VERSION = packageJson.version;
{
"name": "@uppy/companion-client",
"description": "Client library for communication with Companion. Intended for use in Uppy plugins.",
"version": "3.5.0",
"version": "3.6.0",
"license": "MIT",

@@ -25,4 +25,5 @@ "main": "lib/index.js",

"dependencies": {
"@uppy/utils": "^5.5.2",
"namespace-emitter": "^2.0.1"
"@uppy/utils": "^5.6.0",
"namespace-emitter": "^2.0.1",
"p-retry": "^6.1.0"
},

@@ -29,0 +30,0 @@ "devDependencies": {

@@ -10,2 +10,1 @@ 'use strict'

export { default as SearchProvider } from './SearchProvider.js'
export { default as Socket } from './Socket.js'

@@ -33,4 +33,4 @@ 'use strict'

constructor (uppy, opts, getQueue) {
super(uppy, opts, getQueue)
constructor (uppy, opts) {
super(uppy, opts)
this.provider = opts.provider

@@ -144,4 +144,3 @@ this.id = this.provider

window.removeEventListener('message', handleToken)
this.setAuthToken(data.token)
resolve()
this.setAuthToken(data.token).then(() => resolve()).catch(reject)
}

@@ -165,4 +164,5 @@ window.addEventListener('message', handleToken)

try {
// throw Object.assign(new Error(), { isAuthError: true }) // testing simulate access token expired (to refresh token)
// A better way to test this is for example with Google Drive:
// to test simulate access token expired (leading to a token token refresh),
// see mockAccessTokenExpiredError in companion/drive.
// If you want to test refresh token *and* access token invalid, do this for example with Google Drive:
// While uploading, go to your google account settings,

@@ -174,3 +174,4 @@ // "Third-party apps & services", then click "Companion" and "Remove access".

// only handle auth errors (401 from provider), and only handle them if we have a (refresh) token
if (!err.isAuthError || !(await this.#getAuthToken())) throw err
const authTokenAfter = await this.#getAuthToken()
if (!err.isAuthError || !authTokenAfter) throw err

@@ -182,2 +183,3 @@ if (this.#refreshingTokenPromise == null) {

try {
this.uppy.log(`[CompanionClient] Refreshing expired auth token`, 'info')
const response = await super.request({ path: this.refreshTokenUrl(), method: 'POST' })

@@ -184,0 +186,0 @@ await this.setAuthToken(response.uppyAuthToken)

'use strict'
// eslint-disable-next-line import/no-extraneous-dependencies
import pRetry, { AbortError } from 'p-retry'
import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'

@@ -7,6 +10,4 @@ import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'

import getSocketHost from '@uppy/utils/lib/getSocketHost'
import EventManager from '@uppy/utils/lib/EventManager'
import AuthError from './AuthError.js'
import Socket from './Socket.js'

@@ -20,10 +21,23 @@ import packageJson from '../package.json'

const retryCount = 10 // set to a low number, like 2 to test manual user retries
const socketActivityTimeoutMs = 5 * 60 * 1000 // set to a low number like 10000 to test this
const authErrorStatusCode = 401
class HttpError extends Error {
statusCode
constructor({ statusCode, message }) {
super(message)
this.statusCode = statusCode
}
}
async function handleJSONResponse (res) {
if (res.status === 401) {
if (res.status === authErrorStatusCode) {
throw new AuthError()
}
const jsonPromise = res.json()
if (res.ok) {
return jsonPromise
return res.json()
}

@@ -33,3 +47,4 @@

try {
const errData = await jsonPromise
const errData = await res.json()
errMsg = errData.message ? `${errMsg} message: ${errData.message}` : errMsg

@@ -42,3 +57,4 @@ errMsg = errData.requestId

}
throw new Error(errMsg)
throw new HttpError({ statusCode: res.status, message: errMsg })
}

@@ -54,6 +70,5 @@

constructor (uppy, opts, getQueue) {
constructor (uppy, opts) {
this.uppy = uppy
this.opts = opts
this.getQueue = getQueue
this.onReceiveResponse = this.onReceiveResponse.bind(this)

@@ -200,5 +215,8 @@ this.#companionHeaders = opts?.companionHeaders

if (!skipPostResponse) this.onReceiveResponse(response)
return handleJSONResponse(response)
return await handleJSONResponse(response)
} catch (err) {
if (err?.isAuthError) throw err
// pass these through
if (err instanceof AuthError || err.name === 'AbortError') throw err
throw new ErrorWithCause(`Could not ${method} ${this.#getUrl(path)}`, {

@@ -231,24 +249,65 @@ cause: err,

/**
* Remote uploading consists of two steps:
* 1. #requestSocketToken which starts the download/upload in companion and returns a unique token for the upload.
* Then companion will halt the upload until:
* 2. #awaitRemoteFileUpload is called, which will open/ensure a websocket connection towards companion, with the
* previously generated token provided. It returns a promise that will resolve/reject once the file has finished
* uploading or is otherwise done (failed, canceled)
*
* @param {*} file
* @param {*} reqBody
* @param {*} options
* @returns
*/
async uploadRemoteFile (file, reqBody, options = {}) {
try {
if (file.serverToken) {
return await this.connectToServerSocket(file, this.getQueue())
}
const queueRequestSocketToken = this.getQueue().wrapPromiseFunction(
this.#requestSocketToken,
{ priority: -1 },
)
const serverToken = await queueRequestSocketToken(file, reqBody).abortOn(
options.signal,
)
const { signal, getQueue } = options
if (!this.uppy.getState().files[file.id]) return undefined
return await pRetry(async () => {
// if we already have a serverToken, assume that we are resuming the existing server upload id
const existingServerToken = this.uppy.getFile(file.id)?.serverToken;
if (existingServerToken != null) {
this.uppy.log(`Connecting to exiting websocket ${existingServerToken}`)
return this.#awaitRemoteFileUpload({ file, queue: getQueue(), signal })
}
this.uppy.setFileState(file.id, { serverToken })
return await this.connectToServerSocket(
this.uppy.getFile(file.id),
this.getQueue(),
)
const queueRequestSocketToken = getQueue().wrapPromiseFunction(async (...args) => {
try {
return await this.#requestSocketToken(...args)
} catch (outerErr) {
// throwing AbortError will cause p-retry to stop retrying
if (outerErr instanceof AuthError) throw new AbortError(outerErr)
if (outerErr.cause == null) throw outerErr
const err = outerErr.cause
const isRetryableHttpError = () => (
[408, 409, 429, 418, 423].includes(err.statusCode)
|| (err.statusCode >= 500 && err.statusCode <= 599 && ![501, 505].includes(err.statusCode))
)
if (err instanceof HttpError && !isRetryableHttpError()) throw new AbortError(err);
// p-retry will retry most other errors,
// but it will not retry TypeError (except network error TypeErrors)
throw err
}
}, { priority: -1 })
const serverToken = await queueRequestSocketToken({ file, postBody: reqBody, signal }).abortOn(signal)
if (!this.uppy.getFile(file.id)) return undefined // has file since been removed?
this.uppy.setFileState(file.id, { serverToken })
return this.#awaitRemoteFileUpload({
file: this.uppy.getFile(file.id), // re-fetching file because it might have changed in the meantime
queue: getQueue(),
signal
})
}, { retries: retryCount, signal, onFailedAttempt: (err) => this.uppy.log(`Retrying upload due to: ${err.message}`, 'warning') });
} catch (err) {
if (err?.cause?.name === 'AbortError') {
// this is a bit confusing, but note that an error with the `name` prop set to 'AbortError' (from AbortController)
// is not the same as `p-retry` `AbortError`
if (err.name === 'AbortError') {
// The file upload was aborted, it’s not an error

@@ -258,3 +317,2 @@ return undefined

this.uppy.setFileState(file.id, { serverToken: undefined })
this.uppy.emit('upload-error', file, err)

@@ -265,3 +323,3 @@ throw err

#requestSocketToken = async (file, postBody) => {
#requestSocketToken = async ({ file, postBody, signal }) => {
if (file.remote.url == null) {

@@ -274,3 +332,3 @@ throw new Error('Cannot connect to an undefined URL')

...postBody,
})
}, signal)

@@ -281,131 +339,207 @@ return res.token

/**
* @param {UppyFile} file
* This method will ensure a websocket for the specified file and returns a promise that resolves
* when the file has finished downloading, or rejects if it fails.
* It will retry if the websocket gets disconnected
*
* @param {{ file: UppyFile, queue: RateLimitedQueue, signal: AbortSignal }} file
*/
async connectToServerSocket (file, queue) {
return new Promise((resolve, reject) => {
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({
target: `${host}/api/${token}`,
autoOpen: false,
})
const eventManager = new EventManager(this.uppy)
async #awaitRemoteFileUpload ({ file, queue, signal }) {
let removeEventHandlers
let queuedRequest
const { capabilities } = this.uppy.getState()
eventManager.onFileRemove(file.id, () => {
socket.send('cancel', {})
queuedRequest.abort()
resolve(`upload ${file.id} was removed`)
})
try {
return await new Promise((resolve, reject) => {
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
eventManager.onPause(file.id, (isPaused) => {
if (isPaused) {
// Remove this file from the queue so another file can start in its place.
socket.send('pause', {})
queuedRequest.abort()
} else {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
queuedRequest.abort()
queuedRequest = queue.run(() => {
socket.open()
socket.send('resume', {})
/** @type {WebSocket} */
let socket
/** @type {AbortController?} */
let socketAbortController
let activityTimeout
return () => {}
})
}
})
let { isPaused } = file
eventManager.onPauseAll(file.id, () => {
socket.send('pause', {})
queuedRequest.abort()
})
const socketSend = (action, payload) => {
if (socket == null || socket.readyState !== socket.OPEN) {
this.uppy.log(`Cannot send "${action}" to socket ${file.id} because the socket state was ${String(socket?.readyState)}`, 'warning')
return
}
eventManager.onCancelAll(file.id, ({ reason } = {}) => {
if (reason === 'user') {
socket.send('cancel', {})
queuedRequest.abort()
}
resolve(`upload ${file.id} was canceled`)
})
socket.send(JSON.stringify({
action,
payload: payload ?? {},
}))
};
eventManager.onResumeAll(file.id, () => {
queuedRequest.abort()
if (file.error) {
socket.send('pause', {})
function sendState() {
if (!capabilities.resumableUploads) return;
if (isPaused) socketSend('pause')
else socketSend('resume')
}
queuedRequest = queue.run(() => {
socket.open()
socket.send('resume', {})
return () => {}
})
})
const createWebsocket = async () => {
if (socketAbortController) socketAbortController.abort()
socketAbortController = new AbortController()
eventManager.onRetry(file.id, () => {
// Only do the retry if the upload is actually in progress;
// else we could try to send these messages when the upload is still queued.
// We may need a better check for this since the socket may also be closed
// for other reasons, like network failures.
if (socket.isOpen) {
socket.send('pause', {})
socket.send('resume', {})
}
})
const onFatalError = (err) => {
// Remove the serverToken so that a new one will be created for the retry.
this.uppy.setFileState(file.id, { serverToken: null })
socketAbortController?.abort?.()
reject(err)
}
// todo instead implement the ability for users to cancel / retry *currently uploading files* in the UI
function resetActivityTimeout() {
clearTimeout(activityTimeout)
if (isPaused) return
activityTimeout = setTimeout(() => onFatalError(new Error('Timeout waiting for message from Companion socket')), socketActivityTimeoutMs)
}
eventManager.onRetryAll(file.id, () => {
// See the comment in the onRetry() call
if (socket.isOpen) {
socket.send('pause', {})
socket.send('resume', {})
try {
await queue.wrapPromiseFunction(async () => {
// eslint-disable-next-line promise/param-names
const reconnectWebsocket = async () => new Promise((resolveSocket, rejectSocket) => {
socket = new WebSocket(`${host}/api/${token}`)
resetActivityTimeout()
socket.addEventListener('close', () => {
socket = undefined
rejectSocket(new Error('Socket closed unexpectedly'))
})
socket.addEventListener('error', (error) => {
this.uppy.log(`Companion socket error ${JSON.stringify(error)}, closing socket`, 'warning')
socket.close() // will 'close' event to be emitted
})
socket.addEventListener('open', () => {
sendState()
})
socket.addEventListener('message', (e) => {
resetActivityTimeout()
try {
const { action, payload } = JSON.parse(e.data)
switch (action) {
case 'progress': {
emitSocketProgress(this, payload, file)
break;
}
case 'success': {
this.uppy.emit('upload-success', file, { uploadURL: payload.url })
socketAbortController?.abort?.()
resolve()
break;
}
case 'error': {
const { message } = payload.error
throw Object.assign(new Error(message), { cause: payload.error })
}
default:
this.uppy.log(`Companion socket unknown action ${action}`, 'warning')
}
} catch (err) {
onFatalError(err)
}
})
const closeSocket = () => {
this.uppy.log(`Closing socket ${file.id}`, 'info')
clearTimeout(activityTimeout)
if (socket) socket.close()
socket = undefined
}
socketAbortController.signal.addEventListener('abort', () => {
closeSocket()
})
})
await pRetry(reconnectWebsocket, {
retries: retryCount,
signal: socketAbortController.signal,
onFailedAttempt: () => {
if (socketAbortController.signal.aborted) return // don't log in this case
this.uppy.log(`Retrying websocket ${file.id}`, 'info')
},
});
})().abortOn(socketAbortController.signal);
} catch (err) {
if (socketAbortController.signal.aborted) return
onFatalError(err)
}
}
})
socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file))
const pause = (newPausedState) => {
if (!capabilities.resumableUploads) return;
socket.on('error', (errData) => {
const { message } = errData.error
const error = Object.assign(new Error(message), {
cause: errData.error,
})
isPaused = newPausedState
if (socket) sendState()
// If the remote retry optimisation should not be used,
// close the socket—this will tell companion to clear state and delete the file.
if (!this.opts.useFastRemoteRetry) {
// Remove the serverToken so that a new one will be created for the retry.
this.uppy.setFileState(file.id, {
serverToken: null,
})
} else {
socket.close()
if (newPausedState) {
// Remove this file from the queue so another file can start in its place.
socketAbortController?.abort?.() // close socket to free up the request for other uploads
} else {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
createWebsocket()
}
}
this.uppy.emit('upload-error', file, error)
queuedRequest.done()
reject(error)
})
const onFileRemove = (targetFile) => {
if (!capabilities.individualCancellation) return
if (targetFile.id !== file.id) return
socketSend('cancel')
socketAbortController?.abort?.()
this.uppy.log(`upload ${file.id} was removed`, 'info')
resolve()
}
socket.on('success', (data) => {
const uploadResp = {
uploadURL: data.url,
const onCancelAll = ({ reason }) => {
if (reason === 'user') {
socketSend('cancel')
}
socketAbortController?.abort?.()
this.uppy.log(`upload ${file.id} was canceled`, 'info')
resolve()
};
const onFilePausedChange = (targetFileId, newPausedState) => {
if (targetFileId !== file.id) return
pause(newPausedState)
}
this.uppy.emit('upload-success', file, uploadResp)
queuedRequest.done()
socket.close()
resolve()
})
const onPauseAll = () => pause(true)
const onResumeAll = () => pause(false)
queuedRequest = queue.run(() => {
if (file.isPaused) {
socket.send('pause', {})
} else {
socket.open()
this.uppy.on('file-removed', onFileRemove)
this.uppy.on('cancel-all', onCancelAll)
this.uppy.on('upload-pause', onFilePausedChange)
this.uppy.on('pause-all', onPauseAll)
this.uppy.on('resume-all', onResumeAll)
removeEventHandlers = () => {
this.uppy.off('file-removed', onFileRemove)
this.uppy.off('cancel-all', onCancelAll)
this.uppy.off('upload-pause', onFilePausedChange)
this.uppy.off('pause-all', onPauseAll)
this.uppy.off('resume-all', onResumeAll)
}
return () => {}
signal.addEventListener('abort', () => {
socketAbortController?.abort();
})
createWebsocket()
})
})
} finally {
removeEventHandlers?.()
}
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc