@filecoin-shipyard/lotus-client-provider-browser
Advanced tools
Comparing version 0.0.3 to 0.0.4
135
index.js
class BrowserProvider { | ||
constructor (url, options = {}) { | ||
this.url = url | ||
this.httpUrl = url.replace(/^wss:/, 'https:') | ||
this.httpUrl = options.httpUrl || url.replace(/^wss:/, 'https:') | ||
this.importUrl = | ||
options.importUrl || this.httpUrl.replace(/\/rpc\//, '/rest/') + '/import' | ||
this.id = 0 | ||
this.inflight = new Map() | ||
this.cancelled = new Map() | ||
this.subscriptions = new Map() | ||
this.token = options.token | ||
if (this.token && this.token !== '') { | ||
this.url += `?token=${this.token}` | ||
if (typeof options.token === 'function') { | ||
this.tokenCallback = options.token | ||
} else { | ||
this.token = options.token | ||
if (this.token && this.token !== '') { | ||
this.url += `?token=${this.token}` | ||
} | ||
} | ||
@@ -16,10 +23,28 @@ } | ||
if (!this.connectPromise) { | ||
this.connectPromise = new Promise((resolve, reject) => { | ||
this.ws = new WebSocket(this.url) | ||
// FIXME: reject on error or timeout | ||
this.ws.onopen = function () { | ||
resolve() | ||
const getConnectPromise = () => { | ||
return new Promise((resolve, reject) => { | ||
this.ws = new WebSocket(this.url) | ||
// FIXME: reject on error or timeout | ||
this.ws.onopen = function () { | ||
resolve() | ||
} | ||
this.ws.onerror = function () { | ||
console.error('ws error') | ||
reject() | ||
} | ||
this.ws.onmessage = this.receive.bind(this) | ||
}) | ||
} | ||
if (this.tokenCallback) { | ||
const getToken = async () => { | ||
this.token = await this.tokenCallback() | ||
delete this.tokenCallback | ||
if (this.token && this.token !== '') { | ||
this.url += `?token=${this.token}` | ||
} | ||
} | ||
this.ws.onmessage = this.receive.bind(this) | ||
}) | ||
this.connectPromise = getToken().then(() => getConnectPromise()) | ||
} else { | ||
this.connectPromise = getConnectPromise() | ||
} | ||
} | ||
@@ -62,2 +87,5 @@ return this.connectPromise | ||
const promise = new Promise((resolve, reject) => { | ||
if (this.destroyed) { | ||
reject(new Error('WebSocket has already been destroyed')) | ||
} | ||
this.ws.send(JSON.stringify(jsonRpcRequest)) | ||
@@ -83,22 +111,38 @@ // FIXME: Add timeout | ||
} | ||
const promise = new Promise((resolve, reject) => { | ||
const promise = this.connect().then(() => { | ||
this.ws.send(JSON.stringify(json)) | ||
// FIXME: Add timeout | ||
this.inflight.set(json.id, (err, result) => { | ||
chanId = result | ||
this.subscriptions.set(chanId, subscriptionCb) | ||
if (err) { | ||
reject(err) | ||
} else { | ||
resolve(cancel) | ||
} | ||
return new Promise((resolve, reject) => { | ||
this.inflight.set(json.id, (err, result) => { | ||
chanId = result | ||
// console.info(`New subscription ${json.id} using channel ${chanId}`) | ||
this.subscriptions.set(chanId, subscriptionCb) | ||
if (err) { | ||
reject(err) | ||
} else { | ||
resolve() | ||
} | ||
}) | ||
}) | ||
}) | ||
return promise | ||
function cancel () { | ||
return [cancel.bind(this), promise] | ||
async function cancel () { | ||
await promise | ||
this.inflight.delete(json.id) | ||
if (chanId !== null) { | ||
this.subscriptions.delete(chanId) | ||
await new Promise(resolve => { | ||
// FIXME: Add timeout | ||
this.cancelled.set(chanId, { | ||
cancelledAt: Date.now(), | ||
closeCb: resolve | ||
}) | ||
this.sendWs({ | ||
jsonrpc: '2.0', | ||
method: 'xrpc.cancel', | ||
params: [json.id] | ||
}) | ||
}) | ||
// console.info(`Subscription ${json.id} cancelled, channel ${chanId} closed.`) | ||
} | ||
// FIXME: Send cancel message to Lotus? | ||
} | ||
@@ -118,4 +162,24 @@ } | ||
} else { | ||
console.warn('Could not find subscription for channel', chanId) | ||
const { cancelledAt } = this.cancelled.get(chanId) | ||
if (cancelledAt) { | ||
if (Date.now() - cancelledAt > 2000) { | ||
console.warn( | ||
'Received stale response for cancelled subscription on channel', | ||
chanId | ||
) | ||
} | ||
} else { | ||
console.warn('Could not find subscription for channel', chanId) | ||
} | ||
} | ||
} else if (method === 'xrpc.ch.close') { | ||
// FIXME: Check return code, errors | ||
const [chanId] = params | ||
const { closeCb } = this.cancelled.get(chanId) | ||
if (!closeCb) { | ||
console.warn(`Channel ${chanId} was closed before being cancelled`) | ||
} else { | ||
// console.info(`Channel ${chanId} was closed, calling callback`) | ||
closeCb() | ||
} | ||
} else { | ||
@@ -131,3 +195,3 @@ const cb = this.inflight.get(id) | ||
} else { | ||
console.warn(`Couldn't find subscription for ${id}`) | ||
console.warn(`Couldn't find request for ${id}`) | ||
} | ||
@@ -140,5 +204,24 @@ } | ||
close () { | ||
async import (body) { | ||
const headers = { | ||
'Content-Type': body.type, | ||
Accept: '*/*', | ||
Authorization: `Bearer ${this.token}` | ||
} | ||
const response = await fetch(this.importUrl, { | ||
method: 'PUT', | ||
headers, | ||
body | ||
}) | ||
// FIXME: Check return code, errors | ||
const result = await response.json() | ||
const { Cid: { "/": cid }} = result | ||
return cid | ||
} | ||
async destroy () { | ||
if (this.ws) { | ||
this.ws.close() | ||
this.destroyed = true | ||
} | ||
@@ -145,0 +228,0 @@ } |
{ | ||
"name": "@filecoin-shipyard/lotus-client-provider-browser", | ||
"version": "0.0.3", | ||
"version": "0.0.4", | ||
"type": "module", | ||
@@ -5,0 +5,0 @@ "scripts": { |
9252
215
2