@uppy/companion-client
Advanced tools
Comparing version 3.5.0 to 3.6.0
@@ -8,3 +8,2 @@ 'use strict'; | ||
export { default as Provider } from './Provider.js'; | ||
export { default as SearchProvider } from './SearchProvider.js'; | ||
export { default as Socket } from './Socket.js'; | ||
export { default as SearchProvider } from './SearchProvider.js'; |
@@ -32,4 +32,4 @@ 'use strict'; | ||
export default class Provider extends RequestClient { | ||
constructor(uppy, opts, getQueue) { | ||
super(uppy, opts, getQueue); | ||
constructor(uppy, opts) { | ||
super(uppy, opts); | ||
Object.defineProperty(this, _removeAuthToken, { | ||
@@ -147,4 +147,3 @@ value: _removeAuthToken2 | ||
window.removeEventListener('message', handleToken); | ||
this.setAuthToken(data.token); | ||
resolve(); | ||
this.setAuthToken(data.token).then(() => resolve()).catch(reject); | ||
}; | ||
@@ -165,4 +164,5 @@ window.addEventListener('message', handleToken); | ||
try { | ||
// throw Object.assign(new Error(), { isAuthError: true }) // testing simulate access token expired (to refresh token) | ||
// A better way to test this is for example with Google Drive: | ||
// to test simulate access token expired (leading to a token token refresh), | ||
// see mockAccessTokenExpiredError in companion/drive. | ||
// If you want to test refresh token *and* access token invalid, do this for example with Google Drive: | ||
// While uploading, go to your google account settings, | ||
@@ -174,3 +174,4 @@ // "Third-party apps & services", then click "Companion" and "Remove access". | ||
// only handle auth errors (401 from provider), and only handle them if we have a (refresh) token | ||
if (!err.isAuthError || !(await _classPrivateFieldLooseBase(this, _getAuthToken)[_getAuthToken]())) throw err; | ||
const authTokenAfter = await _classPrivateFieldLooseBase(this, _getAuthToken)[_getAuthToken](); | ||
if (!err.isAuthError || !authTokenAfter) throw err; | ||
if (_classPrivateFieldLooseBase(this, _refreshingTokenPromise)[_refreshingTokenPromise] == null) { | ||
@@ -181,2 +182,3 @@ // Many provider requests may be starting at once, however refresh token should only be called once. | ||
try { | ||
this.uppy.log(`[CompanionClient] Refreshing expired auth token`, 'info'); | ||
const response = await super.request({ | ||
@@ -183,0 +185,0 @@ path: this.refreshTokenUrl(), |
'use strict'; | ||
// eslint-disable-next-line import/no-extraneous-dependencies | ||
let _Symbol$for; | ||
@@ -7,2 +8,3 @@ function _classPrivateFieldLooseBase(receiver, privateKey) { if (!Object.prototype.hasOwnProperty.call(receiver, privateKey)) { throw new TypeError("attempted to use private field on non-instance"); } return receiver; } | ||
function _classPrivateFieldLooseKey(name) { return "__private_" + id++ + "_" + name; } | ||
import pRetry, { AbortError } from 'p-retry'; | ||
import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError'; | ||
@@ -12,7 +14,5 @@ import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause'; | ||
import getSocketHost from '@uppy/utils/lib/getSocketHost'; | ||
import EventManager from '@uppy/utils/lib/EventManager'; | ||
import AuthError from './AuthError.js'; | ||
import Socket from './Socket.js'; | ||
const packageJson = { | ||
"version": "3.5.0" | ||
"version": "3.6.0" | ||
}; // Remove the trailing slash so we can always safely append /xyz. | ||
@@ -22,13 +22,27 @@ function stripSlash(url) { | ||
} | ||
const retryCount = 10; // set to a low number, like 2 to test manual user retries | ||
const socketActivityTimeoutMs = 5 * 60 * 1000; // set to a low number like 10000 to test this | ||
const authErrorStatusCode = 401; | ||
class HttpError extends Error { | ||
constructor(_ref) { | ||
let { | ||
statusCode, | ||
message | ||
} = _ref; | ||
super(message); | ||
this.statusCode = void 0; | ||
this.statusCode = statusCode; | ||
} | ||
} | ||
async function handleJSONResponse(res) { | ||
if (res.status === 401) { | ||
if (res.status === authErrorStatusCode) { | ||
throw new AuthError(); | ||
} | ||
const jsonPromise = res.json(); | ||
if (res.ok) { | ||
return jsonPromise; | ||
return res.json(); | ||
} | ||
let errMsg = `Failed request with status: ${res.status}. ${res.statusText}`; | ||
try { | ||
const errData = await jsonPromise; | ||
const errData = await res.json(); | ||
errMsg = errData.message ? `${errMsg} message: ${errData.message}` : errMsg; | ||
@@ -39,3 +53,6 @@ errMsg = errData.requestId ? `${errMsg} request-Id: ${errData.requestId}` : errMsg; | ||
} | ||
throw new Error(errMsg); | ||
throw new HttpError({ | ||
statusCode: res.status, | ||
message: errMsg | ||
}); | ||
} | ||
@@ -48,5 +65,16 @@ | ||
var _requestSocketToken = /*#__PURE__*/_classPrivateFieldLooseKey("requestSocketToken"); | ||
var _awaitRemoteFileUpload = /*#__PURE__*/_classPrivateFieldLooseKey("awaitRemoteFileUpload"); | ||
_Symbol$for = Symbol.for('uppy test: getCompanionHeaders'); | ||
export default class RequestClient { | ||
constructor(uppy, opts, getQueue) { | ||
constructor(uppy, opts) { | ||
/** | ||
* This method will ensure a websocket for the specified file and returns a promise that resolves | ||
* when the file has finished downloading, or rejects if it fails. | ||
* It will retry if the websocket gets disconnected | ||
* | ||
* @param {{ file: UppyFile, queue: RateLimitedQueue, signal: AbortSignal }} file | ||
*/ | ||
Object.defineProperty(this, _awaitRemoteFileUpload, { | ||
value: _awaitRemoteFileUpload2 | ||
}); | ||
Object.defineProperty(this, _getUrl, { | ||
@@ -61,3 +89,8 @@ value: _getUrl2 | ||
writable: true, | ||
value: async (file, postBody) => { | ||
value: async _ref2 => { | ||
let { | ||
file, | ||
postBody, | ||
signal | ||
} = _ref2; | ||
if (file.remote.url == null) { | ||
@@ -69,3 +102,3 @@ throw new Error('Cannot connect to an undefined URL'); | ||
...postBody | ||
}); | ||
}, signal); | ||
return res.token; | ||
@@ -76,3 +109,2 @@ } | ||
this.opts = opts; | ||
this.getQueue = getQueue; | ||
this.onReceiveResponse = this.onReceiveResponse.bind(this); | ||
@@ -105,6 +137,6 @@ _classPrivateFieldLooseBase(this, _companionHeaders)[_companionHeaders] = opts == null ? void 0 : opts.companionHeaders; | ||
} | ||
onReceiveResponse(_ref) { | ||
onReceiveResponse(_ref3) { | ||
let { | ||
headers | ||
} = _ref; | ||
} = _ref3; | ||
const state = this.uppy.getState(); | ||
@@ -169,4 +201,4 @@ const companion = state.companion || {}; | ||
// filter to keep only allowed Headers | ||
return Object.fromEntries(Object.entries(headers).filter(_ref2 => { | ||
let [header] = _ref2; | ||
return Object.fromEntries(Object.entries(headers).filter(_ref4 => { | ||
let [header] = _ref4; | ||
if (!allowedHeaders.includes(header.toLowerCase())) { | ||
@@ -181,3 +213,3 @@ this.uppy.log(`[CompanionClient] excluding disallowed header ${header}`); | ||
/** @protected */ | ||
async request(_ref3) { | ||
async request(_ref5) { | ||
let { | ||
@@ -189,3 +221,3 @@ path, | ||
signal | ||
} = _ref3; | ||
} = _ref5; | ||
try { | ||
@@ -201,5 +233,6 @@ const headers = await this.preflightAndHeaders(path); | ||
if (!skipPostResponse) this.onReceiveResponse(response); | ||
return handleJSONResponse(response); | ||
return await handleJSONResponse(response); | ||
} catch (err) { | ||
if (err != null && err.isAuthError) throw err; | ||
// pass these through | ||
if (err instanceof AuthError || err.name === 'AbortError') throw err; | ||
throw new ErrorWithCause(`Could not ${method} ${_classPrivateFieldLooseBase(this, _getUrl)[_getUrl](path)}`, { | ||
@@ -256,3 +289,18 @@ cause: err | ||
} | ||
/** | ||
* Remote uploading consists of two steps: | ||
* 1. #requestSocketToken which starts the download/upload in companion and returns a unique token for the upload. | ||
* Then companion will halt the upload until: | ||
* 2. #awaitRemoteFileUpload is called, which will open/ensure a websocket connection towards companion, with the | ||
* previously generated token provided. It returns a promise that will resolve/reject once the file has finished | ||
* uploading or is otherwise done (failed, canceled) | ||
* | ||
* @param {*} file | ||
* @param {*} reqBody | ||
* @param {*} options | ||
* @returns | ||
*/ | ||
async uploadRemoteFile(file, reqBody, options) { | ||
var _this = this; | ||
if (options === void 0) { | ||
@@ -262,23 +310,64 @@ options = {}; | ||
try { | ||
if (file.serverToken) { | ||
return await this.connectToServerSocket(file, this.getQueue()); | ||
} | ||
const queueRequestSocketToken = this.getQueue().wrapPromiseFunction(_classPrivateFieldLooseBase(this, _requestSocketToken)[_requestSocketToken], { | ||
priority: -1 | ||
const { | ||
signal, | ||
getQueue | ||
} = options; | ||
return await pRetry(async () => { | ||
var _this$uppy$getFile; | ||
// if we already have a serverToken, assume that we are resuming the existing server upload id | ||
const existingServerToken = (_this$uppy$getFile = this.uppy.getFile(file.id)) == null ? void 0 : _this$uppy$getFile.serverToken; | ||
if (existingServerToken != null) { | ||
this.uppy.log(`Connecting to exiting websocket ${existingServerToken}`); | ||
return _classPrivateFieldLooseBase(this, _awaitRemoteFileUpload)[_awaitRemoteFileUpload]({ | ||
file, | ||
queue: getQueue(), | ||
signal | ||
}); | ||
} | ||
const queueRequestSocketToken = getQueue().wrapPromiseFunction(async function () { | ||
try { | ||
return await _classPrivateFieldLooseBase(_this, _requestSocketToken)[_requestSocketToken](...arguments); | ||
} catch (outerErr) { | ||
// throwing AbortError will cause p-retry to stop retrying | ||
if (outerErr instanceof AuthError) throw new AbortError(outerErr); | ||
if (outerErr.cause == null) throw outerErr; | ||
const err = outerErr.cause; | ||
const isRetryableHttpError = () => [408, 409, 429, 418, 423].includes(err.statusCode) || err.statusCode >= 500 && err.statusCode <= 599 && ![501, 505].includes(err.statusCode); | ||
if (err instanceof HttpError && !isRetryableHttpError()) throw new AbortError(err); | ||
// p-retry will retry most other errors, | ||
// but it will not retry TypeError (except network error TypeErrors) | ||
throw err; | ||
} | ||
}, { | ||
priority: -1 | ||
}); | ||
const serverToken = await queueRequestSocketToken({ | ||
file, | ||
postBody: reqBody, | ||
signal | ||
}).abortOn(signal); | ||
if (!this.uppy.getFile(file.id)) return undefined; // has file since been removed? | ||
this.uppy.setFileState(file.id, { | ||
serverToken | ||
}); | ||
return _classPrivateFieldLooseBase(this, _awaitRemoteFileUpload)[_awaitRemoteFileUpload]({ | ||
file: this.uppy.getFile(file.id), | ||
// re-fetching file because it might have changed in the meantime | ||
queue: getQueue(), | ||
signal | ||
}); | ||
}, { | ||
retries: retryCount, | ||
signal, | ||
onFailedAttempt: err => this.uppy.log(`Retrying upload due to: ${err.message}`, 'warning') | ||
}); | ||
const serverToken = await queueRequestSocketToken(file, reqBody).abortOn(options.signal); | ||
if (!this.uppy.getState().files[file.id]) return undefined; | ||
this.uppy.setFileState(file.id, { | ||
serverToken | ||
}); | ||
return await this.connectToServerSocket(this.uppy.getFile(file.id), this.getQueue()); | ||
} catch (err) { | ||
var _err$cause; | ||
if ((err == null || (_err$cause = err.cause) == null ? void 0 : _err$cause.name) === 'AbortError') { | ||
// this is a bit confusing, but note that an error with the `name` prop set to 'AbortError' (from AbortController) | ||
// is not the same as `p-retry` `AbortError` | ||
if (err.name === 'AbortError') { | ||
// The file upload was aborted, it’s not an error | ||
return undefined; | ||
} | ||
this.uppy.setFileState(file.id, { | ||
serverToken: undefined | ||
}); | ||
this.uppy.emit('upload-error', file, err); | ||
@@ -288,127 +377,210 @@ throw err; | ||
} | ||
/** | ||
* @param {UppyFile} file | ||
*/ | ||
async connectToServerSocket(file, queue) { | ||
return new Promise((resolve, reject) => { | ||
} | ||
function _getUrl2(url) { | ||
if (/^(https?:|)\/\//.test(url)) { | ||
return url; | ||
} | ||
return `${this.hostname}/${url}`; | ||
} | ||
async function _awaitRemoteFileUpload2(_ref6) { | ||
let { | ||
file, | ||
queue, | ||
signal | ||
} = _ref6; | ||
let removeEventHandlers; | ||
const { | ||
capabilities | ||
} = this.uppy.getState(); | ||
try { | ||
return await new Promise((resolve, reject) => { | ||
const token = file.serverToken; | ||
const host = getSocketHost(file.remote.companionUrl); | ||
const socket = new Socket({ | ||
target: `${host}/api/${token}`, | ||
autoOpen: false | ||
}); | ||
const eventManager = new EventManager(this.uppy); | ||
let queuedRequest; | ||
eventManager.onFileRemove(file.id, () => { | ||
socket.send('cancel', {}); | ||
queuedRequest.abort(); | ||
resolve(`upload ${file.id} was removed`); | ||
}); | ||
eventManager.onPause(file.id, isPaused => { | ||
if (isPaused) { | ||
/** @type {WebSocket} */ | ||
let socket; | ||
/** @type {AbortController?} */ | ||
let socketAbortController; | ||
let activityTimeout; | ||
let { | ||
isPaused | ||
} = file; | ||
const socketSend = (action, payload) => { | ||
if (socket == null || socket.readyState !== socket.OPEN) { | ||
var _socket; | ||
this.uppy.log(`Cannot send "${action}" to socket ${file.id} because the socket state was ${String((_socket = socket) == null ? void 0 : _socket.readyState)}`, 'warning'); | ||
return; | ||
} | ||
socket.send(JSON.stringify({ | ||
action, | ||
payload: payload != null ? payload : {} | ||
})); | ||
}; | ||
function sendState() { | ||
if (!capabilities.resumableUploads) return; | ||
if (isPaused) socketSend('pause');else socketSend('resume'); | ||
} | ||
const createWebsocket = async () => { | ||
if (socketAbortController) socketAbortController.abort(); | ||
socketAbortController = new AbortController(); | ||
const onFatalError = err => { | ||
var _socketAbortControlle; | ||
// Remove the serverToken so that a new one will be created for the retry. | ||
this.uppy.setFileState(file.id, { | ||
serverToken: null | ||
}); | ||
(_socketAbortControlle = socketAbortController) == null || _socketAbortControlle.abort == null ? void 0 : _socketAbortControlle.abort(); | ||
reject(err); | ||
}; | ||
// todo instead implement the ability for users to cancel / retry *currently uploading files* in the UI | ||
function resetActivityTimeout() { | ||
clearTimeout(activityTimeout); | ||
if (isPaused) return; | ||
activityTimeout = setTimeout(() => onFatalError(new Error('Timeout waiting for message from Companion socket')), socketActivityTimeoutMs); | ||
} | ||
try { | ||
await queue.wrapPromiseFunction(async () => { | ||
// eslint-disable-next-line promise/param-names | ||
const reconnectWebsocket = async () => new Promise((resolveSocket, rejectSocket) => { | ||
socket = new WebSocket(`${host}/api/${token}`); | ||
resetActivityTimeout(); | ||
socket.addEventListener('close', () => { | ||
socket = undefined; | ||
rejectSocket(new Error('Socket closed unexpectedly')); | ||
}); | ||
socket.addEventListener('error', error => { | ||
this.uppy.log(`Companion socket error ${JSON.stringify(error)}, closing socket`, 'warning'); | ||
socket.close(); // will 'close' event to be emitted | ||
}); | ||
socket.addEventListener('open', () => { | ||
sendState(); | ||
}); | ||
socket.addEventListener('message', e => { | ||
resetActivityTimeout(); | ||
try { | ||
const { | ||
action, | ||
payload | ||
} = JSON.parse(e.data); | ||
switch (action) { | ||
case 'progress': | ||
{ | ||
emitSocketProgress(this, payload, file); | ||
break; | ||
} | ||
case 'success': | ||
{ | ||
var _socketAbortControlle2; | ||
this.uppy.emit('upload-success', file, { | ||
uploadURL: payload.url | ||
}); | ||
(_socketAbortControlle2 = socketAbortController) == null || _socketAbortControlle2.abort == null ? void 0 : _socketAbortControlle2.abort(); | ||
resolve(); | ||
break; | ||
} | ||
case 'error': | ||
{ | ||
const { | ||
message | ||
} = payload.error; | ||
throw Object.assign(new Error(message), { | ||
cause: payload.error | ||
}); | ||
} | ||
default: | ||
this.uppy.log(`Companion socket unknown action ${action}`, 'warning'); | ||
} | ||
} catch (err) { | ||
onFatalError(err); | ||
} | ||
}); | ||
const closeSocket = () => { | ||
this.uppy.log(`Closing socket ${file.id}`, 'info'); | ||
clearTimeout(activityTimeout); | ||
if (socket) socket.close(); | ||
socket = undefined; | ||
}; | ||
socketAbortController.signal.addEventListener('abort', () => { | ||
closeSocket(); | ||
}); | ||
}); | ||
await pRetry(reconnectWebsocket, { | ||
retries: retryCount, | ||
signal: socketAbortController.signal, | ||
onFailedAttempt: () => { | ||
if (socketAbortController.signal.aborted) return; // don't log in this case | ||
this.uppy.log(`Retrying websocket ${file.id}`, 'info'); | ||
} | ||
}); | ||
})().abortOn(socketAbortController.signal); | ||
} catch (err) { | ||
if (socketAbortController.signal.aborted) return; | ||
onFatalError(err); | ||
} | ||
}; | ||
const pause = newPausedState => { | ||
if (!capabilities.resumableUploads) return; | ||
isPaused = newPausedState; | ||
if (socket) sendState(); | ||
if (newPausedState) { | ||
var _socketAbortControlle3; | ||
// Remove this file from the queue so another file can start in its place. | ||
socket.send('pause', {}); | ||
queuedRequest.abort(); | ||
(_socketAbortControlle3 = socketAbortController) == null || _socketAbortControlle3.abort == null ? void 0 : _socketAbortControlle3.abort(); // close socket to free up the request for other uploads | ||
} else { | ||
// Resuming an upload should be queued, else you could pause and then | ||
// resume a queued upload to make it skip the queue. | ||
queuedRequest.abort(); | ||
queuedRequest = queue.run(() => { | ||
socket.open(); | ||
socket.send('resume', {}); | ||
return () => {}; | ||
}); | ||
createWebsocket(); | ||
} | ||
}); | ||
eventManager.onPauseAll(file.id, () => { | ||
socket.send('pause', {}); | ||
queuedRequest.abort(); | ||
}); | ||
eventManager.onCancelAll(file.id, function (_temp) { | ||
}; | ||
const onFileRemove = targetFile => { | ||
var _socketAbortControlle4; | ||
if (!capabilities.individualCancellation) return; | ||
if (targetFile.id !== file.id) return; | ||
socketSend('cancel'); | ||
(_socketAbortControlle4 = socketAbortController) == null || _socketAbortControlle4.abort == null ? void 0 : _socketAbortControlle4.abort(); | ||
this.uppy.log(`upload ${file.id} was removed`, 'info'); | ||
resolve(); | ||
}; | ||
const onCancelAll = _ref7 => { | ||
var _socketAbortControlle5; | ||
let { | ||
reason | ||
} = _temp === void 0 ? {} : _temp; | ||
} = _ref7; | ||
if (reason === 'user') { | ||
socket.send('cancel', {}); | ||
queuedRequest.abort(); | ||
socketSend('cancel'); | ||
} | ||
resolve(`upload ${file.id} was canceled`); | ||
}); | ||
eventManager.onResumeAll(file.id, () => { | ||
queuedRequest.abort(); | ||
if (file.error) { | ||
socket.send('pause', {}); | ||
} | ||
queuedRequest = queue.run(() => { | ||
socket.open(); | ||
socket.send('resume', {}); | ||
return () => {}; | ||
}); | ||
}); | ||
eventManager.onRetry(file.id, () => { | ||
// Only do the retry if the upload is actually in progress; | ||
// else we could try to send these messages when the upload is still queued. | ||
// We may need a better check for this since the socket may also be closed | ||
// for other reasons, like network failures. | ||
if (socket.isOpen) { | ||
socket.send('pause', {}); | ||
socket.send('resume', {}); | ||
} | ||
}); | ||
eventManager.onRetryAll(file.id, () => { | ||
// See the comment in the onRetry() call | ||
if (socket.isOpen) { | ||
socket.send('pause', {}); | ||
socket.send('resume', {}); | ||
} | ||
}); | ||
socket.on('progress', progressData => emitSocketProgress(this, progressData, file)); | ||
socket.on('error', errData => { | ||
const { | ||
message | ||
} = errData.error; | ||
const error = Object.assign(new Error(message), { | ||
cause: errData.error | ||
}); | ||
// If the remote retry optimisation should not be used, | ||
// close the socket—this will tell companion to clear state and delete the file. | ||
if (!this.opts.useFastRemoteRetry) { | ||
// Remove the serverToken so that a new one will be created for the retry. | ||
this.uppy.setFileState(file.id, { | ||
serverToken: null | ||
}); | ||
} else { | ||
socket.close(); | ||
} | ||
this.uppy.emit('upload-error', file, error); | ||
queuedRequest.done(); | ||
reject(error); | ||
}); | ||
socket.on('success', data => { | ||
const uploadResp = { | ||
uploadURL: data.url | ||
}; | ||
this.uppy.emit('upload-success', file, uploadResp); | ||
queuedRequest.done(); | ||
socket.close(); | ||
(_socketAbortControlle5 = socketAbortController) == null || _socketAbortControlle5.abort == null ? void 0 : _socketAbortControlle5.abort(); | ||
this.uppy.log(`upload ${file.id} was canceled`, 'info'); | ||
resolve(); | ||
}; | ||
const onFilePausedChange = (targetFileId, newPausedState) => { | ||
if (targetFileId !== file.id) return; | ||
pause(newPausedState); | ||
}; | ||
const onPauseAll = () => pause(true); | ||
const onResumeAll = () => pause(false); | ||
this.uppy.on('file-removed', onFileRemove); | ||
this.uppy.on('cancel-all', onCancelAll); | ||
this.uppy.on('upload-pause', onFilePausedChange); | ||
this.uppy.on('pause-all', onPauseAll); | ||
this.uppy.on('resume-all', onResumeAll); | ||
removeEventHandlers = () => { | ||
this.uppy.off('file-removed', onFileRemove); | ||
this.uppy.off('cancel-all', onCancelAll); | ||
this.uppy.off('upload-pause', onFilePausedChange); | ||
this.uppy.off('pause-all', onPauseAll); | ||
this.uppy.off('resume-all', onResumeAll); | ||
}; | ||
signal.addEventListener('abort', () => { | ||
var _socketAbortControlle6; | ||
(_socketAbortControlle6 = socketAbortController) == null ? void 0 : _socketAbortControlle6.abort(); | ||
}); | ||
queuedRequest = queue.run(() => { | ||
if (file.isPaused) { | ||
socket.send('pause', {}); | ||
} else { | ||
socket.open(); | ||
} | ||
return () => {}; | ||
}); | ||
createWebsocket(); | ||
}); | ||
} finally { | ||
removeEventHandlers == null ? void 0 : removeEventHandlers(); | ||
} | ||
} | ||
function _getUrl2(url) { | ||
if (/^(https?:|)\/\//.test(url)) { | ||
return url; | ||
} | ||
return `${this.hostname}/${url}`; | ||
} | ||
RequestClient.VERSION = packageJson.version; |
{ | ||
"name": "@uppy/companion-client", | ||
"description": "Client library for communication with Companion. Intended for use in Uppy plugins.", | ||
"version": "3.5.0", | ||
"version": "3.6.0", | ||
"license": "MIT", | ||
@@ -25,4 +25,5 @@ "main": "lib/index.js", | ||
"dependencies": { | ||
"@uppy/utils": "^5.5.2", | ||
"namespace-emitter": "^2.0.1" | ||
"@uppy/utils": "^5.6.0", | ||
"namespace-emitter": "^2.0.1", | ||
"p-retry": "^6.1.0" | ||
}, | ||
@@ -29,0 +30,0 @@ "devDependencies": { |
@@ -10,2 +10,1 @@ 'use strict' | ||
export { default as SearchProvider } from './SearchProvider.js' | ||
export { default as Socket } from './Socket.js' |
@@ -33,4 +33,4 @@ 'use strict' | ||
constructor (uppy, opts, getQueue) { | ||
super(uppy, opts, getQueue) | ||
constructor (uppy, opts) { | ||
super(uppy, opts) | ||
this.provider = opts.provider | ||
@@ -144,4 +144,3 @@ this.id = this.provider | ||
window.removeEventListener('message', handleToken) | ||
this.setAuthToken(data.token) | ||
resolve() | ||
this.setAuthToken(data.token).then(() => resolve()).catch(reject) | ||
} | ||
@@ -165,4 +164,5 @@ window.addEventListener('message', handleToken) | ||
try { | ||
// throw Object.assign(new Error(), { isAuthError: true }) // testing simulate access token expired (to refresh token) | ||
// A better way to test this is for example with Google Drive: | ||
// to test simulate access token expired (leading to a token token refresh), | ||
// see mockAccessTokenExpiredError in companion/drive. | ||
// If you want to test refresh token *and* access token invalid, do this for example with Google Drive: | ||
// While uploading, go to your google account settings, | ||
@@ -174,3 +174,4 @@ // "Third-party apps & services", then click "Companion" and "Remove access". | ||
// only handle auth errors (401 from provider), and only handle them if we have a (refresh) token | ||
if (!err.isAuthError || !(await this.#getAuthToken())) throw err | ||
const authTokenAfter = await this.#getAuthToken() | ||
if (!err.isAuthError || !authTokenAfter) throw err | ||
@@ -182,2 +183,3 @@ if (this.#refreshingTokenPromise == null) { | ||
try { | ||
this.uppy.log(`[CompanionClient] Refreshing expired auth token`, 'info') | ||
const response = await super.request({ path: this.refreshTokenUrl(), method: 'POST' }) | ||
@@ -184,0 +186,0 @@ await this.setAuthToken(response.uppyAuthToken) |
'use strict' | ||
// eslint-disable-next-line import/no-extraneous-dependencies | ||
import pRetry, { AbortError } from 'p-retry' | ||
import fetchWithNetworkError from '@uppy/utils/lib/fetchWithNetworkError' | ||
@@ -7,6 +10,4 @@ import ErrorWithCause from '@uppy/utils/lib/ErrorWithCause' | ||
import getSocketHost from '@uppy/utils/lib/getSocketHost' | ||
import EventManager from '@uppy/utils/lib/EventManager' | ||
import AuthError from './AuthError.js' | ||
import Socket from './Socket.js' | ||
@@ -20,10 +21,23 @@ import packageJson from '../package.json' | ||
const retryCount = 10 // set to a low number, like 2 to test manual user retries | ||
const socketActivityTimeoutMs = 5 * 60 * 1000 // set to a low number like 10000 to test this | ||
const authErrorStatusCode = 401 | ||
class HttpError extends Error { | ||
statusCode | ||
constructor({ statusCode, message }) { | ||
super(message) | ||
this.statusCode = statusCode | ||
} | ||
} | ||
async function handleJSONResponse (res) { | ||
if (res.status === 401) { | ||
if (res.status === authErrorStatusCode) { | ||
throw new AuthError() | ||
} | ||
const jsonPromise = res.json() | ||
if (res.ok) { | ||
return jsonPromise | ||
return res.json() | ||
} | ||
@@ -33,3 +47,4 @@ | ||
try { | ||
const errData = await jsonPromise | ||
const errData = await res.json() | ||
errMsg = errData.message ? `${errMsg} message: ${errData.message}` : errMsg | ||
@@ -42,3 +57,4 @@ errMsg = errData.requestId | ||
} | ||
throw new Error(errMsg) | ||
throw new HttpError({ statusCode: res.status, message: errMsg }) | ||
} | ||
@@ -54,6 +70,5 @@ | ||
constructor (uppy, opts, getQueue) { | ||
constructor (uppy, opts) { | ||
this.uppy = uppy | ||
this.opts = opts | ||
this.getQueue = getQueue | ||
this.onReceiveResponse = this.onReceiveResponse.bind(this) | ||
@@ -200,5 +215,8 @@ this.#companionHeaders = opts?.companionHeaders | ||
if (!skipPostResponse) this.onReceiveResponse(response) | ||
return handleJSONResponse(response) | ||
return await handleJSONResponse(response) | ||
} catch (err) { | ||
if (err?.isAuthError) throw err | ||
// pass these through | ||
if (err instanceof AuthError || err.name === 'AbortError') throw err | ||
throw new ErrorWithCause(`Could not ${method} ${this.#getUrl(path)}`, { | ||
@@ -231,24 +249,65 @@ cause: err, | ||
/** | ||
* Remote uploading consists of two steps: | ||
* 1. #requestSocketToken which starts the download/upload in companion and returns a unique token for the upload. | ||
* Then companion will halt the upload until: | ||
* 2. #awaitRemoteFileUpload is called, which will open/ensure a websocket connection towards companion, with the | ||
* previously generated token provided. It returns a promise that will resolve/reject once the file has finished | ||
* uploading or is otherwise done (failed, canceled) | ||
* | ||
* @param {*} file | ||
* @param {*} reqBody | ||
* @param {*} options | ||
* @returns | ||
*/ | ||
async uploadRemoteFile (file, reqBody, options = {}) { | ||
try { | ||
if (file.serverToken) { | ||
return await this.connectToServerSocket(file, this.getQueue()) | ||
} | ||
const queueRequestSocketToken = this.getQueue().wrapPromiseFunction( | ||
this.#requestSocketToken, | ||
{ priority: -1 }, | ||
) | ||
const serverToken = await queueRequestSocketToken(file, reqBody).abortOn( | ||
options.signal, | ||
) | ||
const { signal, getQueue } = options | ||
if (!this.uppy.getState().files[file.id]) return undefined | ||
return await pRetry(async () => { | ||
// if we already have a serverToken, assume that we are resuming the existing server upload id | ||
const existingServerToken = this.uppy.getFile(file.id)?.serverToken; | ||
if (existingServerToken != null) { | ||
this.uppy.log(`Connecting to exiting websocket ${existingServerToken}`) | ||
return this.#awaitRemoteFileUpload({ file, queue: getQueue(), signal }) | ||
} | ||
this.uppy.setFileState(file.id, { serverToken }) | ||
return await this.connectToServerSocket( | ||
this.uppy.getFile(file.id), | ||
this.getQueue(), | ||
) | ||
const queueRequestSocketToken = getQueue().wrapPromiseFunction(async (...args) => { | ||
try { | ||
return await this.#requestSocketToken(...args) | ||
} catch (outerErr) { | ||
// throwing AbortError will cause p-retry to stop retrying | ||
if (outerErr instanceof AuthError) throw new AbortError(outerErr) | ||
if (outerErr.cause == null) throw outerErr | ||
const err = outerErr.cause | ||
const isRetryableHttpError = () => ( | ||
[408, 409, 429, 418, 423].includes(err.statusCode) | ||
|| (err.statusCode >= 500 && err.statusCode <= 599 && ![501, 505].includes(err.statusCode)) | ||
) | ||
if (err instanceof HttpError && !isRetryableHttpError()) throw new AbortError(err); | ||
// p-retry will retry most other errors, | ||
// but it will not retry TypeError (except network error TypeErrors) | ||
throw err | ||
} | ||
}, { priority: -1 }) | ||
const serverToken = await queueRequestSocketToken({ file, postBody: reqBody, signal }).abortOn(signal) | ||
if (!this.uppy.getFile(file.id)) return undefined // has file since been removed? | ||
this.uppy.setFileState(file.id, { serverToken }) | ||
return this.#awaitRemoteFileUpload({ | ||
file: this.uppy.getFile(file.id), // re-fetching file because it might have changed in the meantime | ||
queue: getQueue(), | ||
signal | ||
}) | ||
}, { retries: retryCount, signal, onFailedAttempt: (err) => this.uppy.log(`Retrying upload due to: ${err.message}`, 'warning') }); | ||
} catch (err) { | ||
if (err?.cause?.name === 'AbortError') { | ||
// this is a bit confusing, but note that an error with the `name` prop set to 'AbortError' (from AbortController) | ||
// is not the same as `p-retry` `AbortError` | ||
if (err.name === 'AbortError') { | ||
// The file upload was aborted, it’s not an error | ||
@@ -258,3 +317,2 @@ return undefined | ||
this.uppy.setFileState(file.id, { serverToken: undefined }) | ||
this.uppy.emit('upload-error', file, err) | ||
@@ -265,3 +323,3 @@ throw err | ||
#requestSocketToken = async (file, postBody) => { | ||
#requestSocketToken = async ({ file, postBody, signal }) => { | ||
if (file.remote.url == null) { | ||
@@ -274,3 +332,3 @@ throw new Error('Cannot connect to an undefined URL') | ||
...postBody, | ||
}) | ||
}, signal) | ||
@@ -281,131 +339,207 @@ return res.token | ||
/** | ||
* @param {UppyFile} file | ||
* This method will ensure a websocket for the specified file and returns a promise that resolves | ||
* when the file has finished downloading, or rejects if it fails. | ||
* It will retry if the websocket gets disconnected | ||
* | ||
* @param {{ file: UppyFile, queue: RateLimitedQueue, signal: AbortSignal }} file | ||
*/ | ||
async connectToServerSocket (file, queue) { | ||
return new Promise((resolve, reject) => { | ||
const token = file.serverToken | ||
const host = getSocketHost(file.remote.companionUrl) | ||
const socket = new Socket({ | ||
target: `${host}/api/${token}`, | ||
autoOpen: false, | ||
}) | ||
const eventManager = new EventManager(this.uppy) | ||
async #awaitRemoteFileUpload ({ file, queue, signal }) { | ||
let removeEventHandlers | ||
let queuedRequest | ||
const { capabilities } = this.uppy.getState() | ||
eventManager.onFileRemove(file.id, () => { | ||
socket.send('cancel', {}) | ||
queuedRequest.abort() | ||
resolve(`upload ${file.id} was removed`) | ||
}) | ||
try { | ||
return await new Promise((resolve, reject) => { | ||
const token = file.serverToken | ||
const host = getSocketHost(file.remote.companionUrl) | ||
eventManager.onPause(file.id, (isPaused) => { | ||
if (isPaused) { | ||
// Remove this file from the queue so another file can start in its place. | ||
socket.send('pause', {}) | ||
queuedRequest.abort() | ||
} else { | ||
// Resuming an upload should be queued, else you could pause and then | ||
// resume a queued upload to make it skip the queue. | ||
queuedRequest.abort() | ||
queuedRequest = queue.run(() => { | ||
socket.open() | ||
socket.send('resume', {}) | ||
/** @type {WebSocket} */ | ||
let socket | ||
/** @type {AbortController?} */ | ||
let socketAbortController | ||
let activityTimeout | ||
return () => {} | ||
}) | ||
} | ||
}) | ||
let { isPaused } = file | ||
eventManager.onPauseAll(file.id, () => { | ||
socket.send('pause', {}) | ||
queuedRequest.abort() | ||
}) | ||
const socketSend = (action, payload) => { | ||
if (socket == null || socket.readyState !== socket.OPEN) { | ||
this.uppy.log(`Cannot send "${action}" to socket ${file.id} because the socket state was ${String(socket?.readyState)}`, 'warning') | ||
return | ||
} | ||
eventManager.onCancelAll(file.id, ({ reason } = {}) => { | ||
if (reason === 'user') { | ||
socket.send('cancel', {}) | ||
queuedRequest.abort() | ||
} | ||
resolve(`upload ${file.id} was canceled`) | ||
}) | ||
socket.send(JSON.stringify({ | ||
action, | ||
payload: payload ?? {}, | ||
})) | ||
}; | ||
eventManager.onResumeAll(file.id, () => { | ||
queuedRequest.abort() | ||
if (file.error) { | ||
socket.send('pause', {}) | ||
function sendState() { | ||
if (!capabilities.resumableUploads) return; | ||
if (isPaused) socketSend('pause') | ||
else socketSend('resume') | ||
} | ||
queuedRequest = queue.run(() => { | ||
socket.open() | ||
socket.send('resume', {}) | ||
return () => {} | ||
}) | ||
}) | ||
const createWebsocket = async () => { | ||
if (socketAbortController) socketAbortController.abort() | ||
socketAbortController = new AbortController() | ||
eventManager.onRetry(file.id, () => { | ||
// Only do the retry if the upload is actually in progress; | ||
// else we could try to send these messages when the upload is still queued. | ||
// We may need a better check for this since the socket may also be closed | ||
// for other reasons, like network failures. | ||
if (socket.isOpen) { | ||
socket.send('pause', {}) | ||
socket.send('resume', {}) | ||
} | ||
}) | ||
const onFatalError = (err) => { | ||
// Remove the serverToken so that a new one will be created for the retry. | ||
this.uppy.setFileState(file.id, { serverToken: null }) | ||
socketAbortController?.abort?.() | ||
reject(err) | ||
} | ||
// todo instead implement the ability for users to cancel / retry *currently uploading files* in the UI | ||
function resetActivityTimeout() { | ||
clearTimeout(activityTimeout) | ||
if (isPaused) return | ||
activityTimeout = setTimeout(() => onFatalError(new Error('Timeout waiting for message from Companion socket')), socketActivityTimeoutMs) | ||
} | ||
eventManager.onRetryAll(file.id, () => { | ||
// See the comment in the onRetry() call | ||
if (socket.isOpen) { | ||
socket.send('pause', {}) | ||
socket.send('resume', {}) | ||
try { | ||
await queue.wrapPromiseFunction(async () => { | ||
// eslint-disable-next-line promise/param-names | ||
const reconnectWebsocket = async () => new Promise((resolveSocket, rejectSocket) => { | ||
socket = new WebSocket(`${host}/api/${token}`) | ||
resetActivityTimeout() | ||
socket.addEventListener('close', () => { | ||
socket = undefined | ||
rejectSocket(new Error('Socket closed unexpectedly')) | ||
}) | ||
socket.addEventListener('error', (error) => { | ||
this.uppy.log(`Companion socket error ${JSON.stringify(error)}, closing socket`, 'warning') | ||
socket.close() // will 'close' event to be emitted | ||
}) | ||
socket.addEventListener('open', () => { | ||
sendState() | ||
}) | ||
socket.addEventListener('message', (e) => { | ||
resetActivityTimeout() | ||
try { | ||
const { action, payload } = JSON.parse(e.data) | ||
switch (action) { | ||
case 'progress': { | ||
emitSocketProgress(this, payload, file) | ||
break; | ||
} | ||
case 'success': { | ||
this.uppy.emit('upload-success', file, { uploadURL: payload.url }) | ||
socketAbortController?.abort?.() | ||
resolve() | ||
break; | ||
} | ||
case 'error': { | ||
const { message } = payload.error | ||
throw Object.assign(new Error(message), { cause: payload.error }) | ||
} | ||
default: | ||
this.uppy.log(`Companion socket unknown action ${action}`, 'warning') | ||
} | ||
} catch (err) { | ||
onFatalError(err) | ||
} | ||
}) | ||
const closeSocket = () => { | ||
this.uppy.log(`Closing socket ${file.id}`, 'info') | ||
clearTimeout(activityTimeout) | ||
if (socket) socket.close() | ||
socket = undefined | ||
} | ||
socketAbortController.signal.addEventListener('abort', () => { | ||
closeSocket() | ||
}) | ||
}) | ||
await pRetry(reconnectWebsocket, { | ||
retries: retryCount, | ||
signal: socketAbortController.signal, | ||
onFailedAttempt: () => { | ||
if (socketAbortController.signal.aborted) return // don't log in this case | ||
this.uppy.log(`Retrying websocket ${file.id}`, 'info') | ||
}, | ||
}); | ||
})().abortOn(socketAbortController.signal); | ||
} catch (err) { | ||
if (socketAbortController.signal.aborted) return | ||
onFatalError(err) | ||
} | ||
} | ||
}) | ||
socket.on('progress', (progressData) => emitSocketProgress(this, progressData, file)) | ||
const pause = (newPausedState) => { | ||
if (!capabilities.resumableUploads) return; | ||
socket.on('error', (errData) => { | ||
const { message } = errData.error | ||
const error = Object.assign(new Error(message), { | ||
cause: errData.error, | ||
}) | ||
isPaused = newPausedState | ||
if (socket) sendState() | ||
// If the remote retry optimisation should not be used, | ||
// close the socket—this will tell companion to clear state and delete the file. | ||
if (!this.opts.useFastRemoteRetry) { | ||
// Remove the serverToken so that a new one will be created for the retry. | ||
this.uppy.setFileState(file.id, { | ||
serverToken: null, | ||
}) | ||
} else { | ||
socket.close() | ||
if (newPausedState) { | ||
// Remove this file from the queue so another file can start in its place. | ||
socketAbortController?.abort?.() // close socket to free up the request for other uploads | ||
} else { | ||
// Resuming an upload should be queued, else you could pause and then | ||
// resume a queued upload to make it skip the queue. | ||
createWebsocket() | ||
} | ||
} | ||
this.uppy.emit('upload-error', file, error) | ||
queuedRequest.done() | ||
reject(error) | ||
}) | ||
const onFileRemove = (targetFile) => { | ||
if (!capabilities.individualCancellation) return | ||
if (targetFile.id !== file.id) return | ||
socketSend('cancel') | ||
socketAbortController?.abort?.() | ||
this.uppy.log(`upload ${file.id} was removed`, 'info') | ||
resolve() | ||
} | ||
socket.on('success', (data) => { | ||
const uploadResp = { | ||
uploadURL: data.url, | ||
const onCancelAll = ({ reason }) => { | ||
if (reason === 'user') { | ||
socketSend('cancel') | ||
} | ||
socketAbortController?.abort?.() | ||
this.uppy.log(`upload ${file.id} was canceled`, 'info') | ||
resolve() | ||
}; | ||
const onFilePausedChange = (targetFileId, newPausedState) => { | ||
if (targetFileId !== file.id) return | ||
pause(newPausedState) | ||
} | ||
this.uppy.emit('upload-success', file, uploadResp) | ||
queuedRequest.done() | ||
socket.close() | ||
resolve() | ||
}) | ||
const onPauseAll = () => pause(true) | ||
const onResumeAll = () => pause(false) | ||
queuedRequest = queue.run(() => { | ||
if (file.isPaused) { | ||
socket.send('pause', {}) | ||
} else { | ||
socket.open() | ||
this.uppy.on('file-removed', onFileRemove) | ||
this.uppy.on('cancel-all', onCancelAll) | ||
this.uppy.on('upload-pause', onFilePausedChange) | ||
this.uppy.on('pause-all', onPauseAll) | ||
this.uppy.on('resume-all', onResumeAll) | ||
removeEventHandlers = () => { | ||
this.uppy.off('file-removed', onFileRemove) | ||
this.uppy.off('cancel-all', onCancelAll) | ||
this.uppy.off('upload-pause', onFilePausedChange) | ||
this.uppy.off('pause-all', onPauseAll) | ||
this.uppy.off('resume-all', onResumeAll) | ||
} | ||
return () => {} | ||
signal.addEventListener('abort', () => { | ||
socketAbortController?.abort(); | ||
}) | ||
createWebsocket() | ||
}) | ||
}) | ||
} finally { | ||
removeEventHandlers?.() | ||
} | ||
} | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
134172
3
25
1683
+ Addedp-retry@^6.1.0
+ Added@types/retry@0.12.2(transitive)
+ Addedis-network-error@1.1.0(transitive)
+ Addedp-retry@6.2.0(transitive)
+ Addedretry@0.13.1(transitive)
Updated@uppy/utils@^5.6.0