@bangle.dev/collab-server
Advanced tools
Comparing version 0.15.0 to 0.16.0
@@ -5,6 +5,7 @@ export class CollabError extends Error { | ||
body: any; | ||
constructor(errorCode: number, body: any) { | ||
super(body); | ||
constructor(errorCode: number, message: any) { | ||
super(message); | ||
this.errorCode = errorCode; | ||
this.body = body; | ||
this.body = message; | ||
// Error.captureStackTrace is a v8-specific method so not avilable on | ||
@@ -11,0 +12,0 @@ // Firefox or Safari |
@@ -5,4 +5,4 @@ export declare class CollabError extends Error { | ||
body: any; | ||
constructor(errorCode: number, body: any); | ||
constructor(errorCode: number, message: any); | ||
} | ||
//# sourceMappingURL=collab-error.d.ts.map |
export class CollabError extends Error { | ||
constructor(errorCode, body) { | ||
super(body); | ||
constructor(errorCode, message) { | ||
super(message); | ||
Object.defineProperty(this, "from", { | ||
@@ -23,3 +23,3 @@ enumerable: true, | ||
this.errorCode = errorCode; | ||
this.body = body; | ||
this.body = message; | ||
if (Error.captureStackTrace) { | ||
@@ -26,0 +26,0 @@ Error.captureStackTrace(this, CollabError); |
@@ -0,1 +1,2 @@ | ||
import type { PullEventResponse, PullEvents, GetDocument, PushEvents } from './types'; | ||
export * from './instance'; | ||
@@ -5,2 +6,4 @@ export * from './manager'; | ||
export * from './utils'; | ||
export * from './parse-collab-response'; | ||
export type { PullEventResponse, PullEvents, GetDocument, PushEvents }; | ||
//# sourceMappingURL=index.d.ts.map |
@@ -5,1 +5,2 @@ export * from './instance'; | ||
export * from './utils'; | ||
export * from './parse-collab-response'; |
@@ -27,3 +27,3 @@ import { Step } from 'prosemirror-transform'; | ||
stop(): void; | ||
addEvents(version: number, steps: StepBigger[], clientID: string): false | { | ||
addEvents(version: number, steps: Step[], clientID: string): false | { | ||
version: number; | ||
@@ -30,0 +30,0 @@ }; |
@@ -107,2 +107,3 @@ import { CollabError } from './collab-error'; | ||
this.checkVersion(version); | ||
const biggerSteps = steps.map((s) => Object.assign(s, { clientID })); | ||
if (this.version !== version) { | ||
@@ -112,4 +113,3 @@ return false; | ||
let doc = this.doc, maps = []; | ||
for (const step of steps) { | ||
step.clientID = clientID; | ||
for (const step of biggerSteps) { | ||
let result = step.apply(doc); | ||
@@ -123,4 +123,4 @@ if (result.doc == null) { | ||
this.doc = doc; | ||
this.version += steps.length; | ||
this.steps = this.steps.concat(steps); | ||
this.version += biggerSteps.length; | ||
this.steps = this.steps.concat(biggerSteps); | ||
if (this.steps.length > MAX_STEP_HISTORY) { | ||
@@ -127,0 +127,0 @@ this.steps = this.steps.slice(this.steps.length - MAX_STEP_HISTORY); |
import { Instance } from './instance'; | ||
import { Schema, Node } from 'prosemirror-model'; | ||
import { CollabRequestHandler } from './collab-request-handler'; | ||
import { CollabResponse, CollabRequestType } from './types'; | ||
declare type HandleResponseOk = { | ||
status: 'ok'; | ||
body: CollabResponse; | ||
}; | ||
declare type HandleResponseError = { | ||
status: 'error'; | ||
body: { | ||
message: string; | ||
errorCode: number; | ||
}; | ||
}; | ||
export declare class Manager { | ||
@@ -13,13 +26,3 @@ private schema; | ||
}; | ||
routes: { | ||
[k: string]: (...args: { | ||
clientID: string; | ||
steps: any[]; | ||
docName: string; | ||
version: number; | ||
userId: string; | ||
}[]) => Promise<{ | ||
body: OutputData; | ||
}>; | ||
}; | ||
routes: CollabRequestHandler; | ||
disk: { | ||
@@ -44,21 +47,10 @@ load: (_docName: string) => Promise<any>; | ||
}); | ||
_stopInstance(docName: string): void; | ||
_cleanup(): void; | ||
handleRequest(path: CollabRequestType, payload: any): Promise<HandleResponseError | HandleResponseOk>; | ||
private _stopInstance; | ||
private _cleanup; | ||
destroy(): void; | ||
_newInstance(docName: string, doc?: Node): Promise<Instance>; | ||
handleRequest(path: string, payload: any): Promise<any>; | ||
_getInstanceQueued(docName: string, userId: string): Promise<Instance>; | ||
private _newInstance; | ||
private _getInstanceQueued; | ||
} | ||
interface OutputData { | ||
doc?: { | ||
[key: string]: any; | ||
}; | ||
users?: number; | ||
version?: number; | ||
steps?: Array<{ | ||
[key: string]: any; | ||
}>; | ||
clientIDs?: string[]; | ||
} | ||
export {}; | ||
//# sourceMappingURL=manager.d.ts.map |
@@ -1,4 +0,4 @@ | ||
import { Step } from 'prosemirror-transform'; | ||
import { objectMapValues, serialExecuteQueue, raceTimeout } from './utils'; | ||
import { serialExecuteQueue } from './utils'; | ||
import { Instance } from './instance'; | ||
import { CollabRequestHandler } from './collab-request-handler'; | ||
import { CollabError } from './collab-error'; | ||
@@ -77,3 +77,3 @@ const LOG = false; | ||
this.interceptRequests = interceptRequests; | ||
this.routes = generateRoutes(schema, this._getInstanceQueued, userWaitTimeout); | ||
this.routes = new CollabRequestHandler(this._getInstanceQueued, userWaitTimeout, schema); | ||
if (instanceCleanupTimeout > 0) { | ||
@@ -83,2 +83,51 @@ this.cleanUpInterval = setInterval(() => this._cleanup(), instanceCleanupTimeout); | ||
} | ||
async handleRequest(path, payload) { | ||
if (!payload.userId) { | ||
throw new Error('Must have user id'); | ||
} | ||
log(`request to ${path} from `, payload.userId); | ||
let data; | ||
try { | ||
if (this.interceptRequests) { | ||
await this.interceptRequests(path, payload); | ||
} | ||
switch (path) { | ||
case 'pull_events': { | ||
data = await this.routes.pullEvents(payload); | ||
break; | ||
} | ||
case 'push_events': { | ||
data = await this.routes.pushEvents(payload); | ||
break; | ||
} | ||
case 'get_document': { | ||
data = await this.routes.getDocument(payload); | ||
break; | ||
} | ||
} | ||
return { | ||
status: 'ok', | ||
body: data, | ||
}; | ||
} | ||
catch (err) { | ||
if (err instanceof CollabError) { | ||
return { | ||
status: 'error', | ||
body: { | ||
errorCode: err.errorCode, | ||
message: err.message, | ||
}, | ||
}; | ||
} | ||
console.error(err); | ||
return { | ||
status: 'error', | ||
body: { | ||
errorCode: 500, | ||
message: err.message || 'Unknown error occurred', | ||
}, | ||
}; | ||
} | ||
} | ||
_stopInstance(docName) { | ||
@@ -143,16 +192,2 @@ const instance = this.instances[docName]; | ||
} | ||
async handleRequest(path, payload) { | ||
if (!this.routes[path]) { | ||
throw new Error('Path not found'); | ||
} | ||
if (!payload.userId) { | ||
throw new Error('Must have user id'); | ||
} | ||
if (this.interceptRequests) { | ||
await this.interceptRequests(path, payload); | ||
} | ||
log(`request to ${path} from `, payload.userId); | ||
const route = this.routes[path]; | ||
return route(payload); | ||
} | ||
async _getInstanceQueued(docName, userId) { | ||
@@ -172,135 +207,1 @@ if (!userId) { | ||
} | ||
function nonNegInteger(str) { | ||
let num = Number(str); | ||
if (!isNaN(num) && Math.floor(num) === num && num >= 0) { | ||
return num; | ||
} | ||
throw new CollabError(400, 'Not a non-negative integer: ' + str); | ||
} | ||
class Output { | ||
constructor(body) { | ||
Object.defineProperty(this, "body", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: body | ||
}); | ||
Object.defineProperty(this, "responded", { | ||
enumerable: true, | ||
configurable: true, | ||
writable: true, | ||
value: false | ||
}); | ||
} | ||
static outputEvents(inst, data) { | ||
return Output.json({ | ||
version: inst.version, | ||
steps: data.steps.map((step) => step.toJSON()), | ||
clientIDs: data.steps.map((step) => step.clientID), | ||
users: data.users, | ||
}); | ||
} | ||
static json(data) { | ||
return new Output(data); | ||
} | ||
resp() { | ||
if (this.responded) { | ||
throw new Error('already responded'); | ||
} | ||
this.responded = true; | ||
return { body: this.body }; | ||
} | ||
} | ||
function generateRoutes(schema, getInstance, userWaitTimeout) { | ||
const routes = { | ||
get_document: async ({ docName, userId, }) => { | ||
log('get_document', { docName, userId }); | ||
let inst = await getInstance(docName, userId); | ||
if (!inst) { | ||
throw new Error('Instance not found'); | ||
} | ||
return Output.json({ | ||
doc: inst.doc.toJSON(), | ||
users: inst.userCount, | ||
version: inst.version, | ||
}); | ||
}, | ||
get_events: async ({ docName, version, userId, }) => { | ||
version = nonNegInteger(version); | ||
let instance = await getInstance(docName, userId); | ||
if (instance == null) { | ||
throw new Error('Instance not found'); | ||
} | ||
let data = instance.getEvents(version); | ||
if (data === false) { | ||
throw new CollabError(410, 'History no longer available'); | ||
} | ||
if (data.steps.length) { | ||
return Output.outputEvents(instance, data); | ||
} | ||
let abort; | ||
const waitForChanges = new Promise((res) => { | ||
const inst = instance; | ||
if (inst == null) { | ||
res(); | ||
return; | ||
} | ||
let waiter = { | ||
userId, | ||
onFinish: () => { | ||
res(); | ||
}, | ||
}; | ||
inst.waiting.push(waiter); | ||
abort = () => { | ||
let found = inst.waiting.indexOf(waiter); | ||
log('in abort waiting =', inst.waiting.length); | ||
if (found > -1) { | ||
inst.waiting.splice(found, 1); | ||
} | ||
abort = null; | ||
}; | ||
}); | ||
try { | ||
await raceTimeout(waitForChanges, userWaitTimeout); | ||
log('finished'); | ||
return Output.outputEvents(instance, instance.getEvents(version)); | ||
} | ||
catch (err) { | ||
if (err.timeout === true) { | ||
log('timeout aborting'); | ||
if (abort) { | ||
abort(); | ||
} | ||
return Output.json({}); | ||
} | ||
throw err; | ||
} | ||
}, | ||
push_events: async ({ clientID, version, steps, docName, userId, }) => { | ||
version = nonNegInteger(version); | ||
steps = steps.map((s) => Step.fromJSON(schema, s)); | ||
const instance = await getInstance(docName, userId); | ||
if (!instance) { | ||
throw new Error('Instance not found'); | ||
} | ||
log('received version =', version, 'server version', instance.version); | ||
let result = instance.addEvents(version, steps, clientID); | ||
if (!result) { | ||
throw new CollabError(409, `Version ${version} not current. Currently on ${instance.version}`); | ||
} | ||
else { | ||
return Output.json(result); | ||
} | ||
}, | ||
}; | ||
function mapRoutes(routes) { | ||
return objectMapValues(routes, (route) => { | ||
return async (...args) => { | ||
let result = await route(...args); | ||
return result.resp(); | ||
}; | ||
}); | ||
} | ||
return mapRoutes(routes); | ||
} |
@@ -0,1 +1,8 @@ | ||
import type { | ||
PullEventResponse, | ||
PullEvents, | ||
GetDocument, | ||
PushEvents, | ||
} from './types'; | ||
export * from './instance'; | ||
@@ -5,1 +12,3 @@ export * from './manager'; | ||
export * from './utils'; | ||
export * from './parse-collab-response'; | ||
export type { PullEventResponse, PullEvents, GetDocument, PushEvents }; |
@@ -52,6 +52,10 @@ import { Step, StepMap } from 'prosemirror-transform'; | ||
addEvents(version: number, steps: StepBigger[], clientID: string) { | ||
addEvents(version: number, steps: Step[], clientID: string) { | ||
// TODO this checkversion is not covered | ||
this.checkVersion(version); | ||
const biggerSteps: StepBigger[] = steps.map((s) => | ||
Object.assign(s, { clientID }), | ||
); | ||
if (this.version !== version) { | ||
@@ -65,4 +69,3 @@ // TODO returning false gives 409 but if we donot give 409 error | ||
for (const step of steps) { | ||
step.clientID = clientID; | ||
for (const step of biggerSteps) { | ||
let result = step.apply(doc); | ||
@@ -78,4 +81,4 @@ if (result.doc == null) { | ||
this.doc = doc; | ||
this.version += steps.length; | ||
this.steps = this.steps.concat(steps); | ||
this.version += biggerSteps.length; | ||
this.steps = this.steps.concat(biggerSteps); | ||
if (this.steps.length > MAX_STEP_HISTORY) { | ||
@@ -82,0 +85,0 @@ this.steps = this.steps.slice(this.steps.length - MAX_STEP_HISTORY); |
316
manager.ts
@@ -1,6 +0,7 @@ | ||
import { Step } from 'prosemirror-transform'; | ||
import { objectMapValues, serialExecuteQueue, raceTimeout } from './utils'; | ||
import { Instance, StepBigger } from './instance'; | ||
import { serialExecuteQueue } from './utils'; | ||
import { Instance } from './instance'; | ||
import { Schema, Node } from 'prosemirror-model'; | ||
import { CollabRequestHandler } from './collab-request-handler'; | ||
import { CollabResponse, CollabRequestType } from './types'; | ||
import { CollabError } from './collab-error'; | ||
import { Schema, Node } from 'prosemirror-model'; | ||
@@ -11,2 +12,14 @@ const LOG = false; | ||
type HandleResponseOk = { | ||
status: 'ok'; | ||
body: CollabResponse; | ||
}; | ||
type HandleResponseError = { | ||
status: 'error'; | ||
body: { | ||
message: string; | ||
errorCode: number; | ||
}; | ||
}; | ||
export class Manager { | ||
@@ -44,6 +57,6 @@ instanceCount = 0; | ||
// for example two requests parallely comming and creating two new instances of the same doc | ||
this.routes = generateRoutes( | ||
schema, | ||
this.routes = new CollabRequestHandler( | ||
this._getInstanceQueued, | ||
userWaitTimeout, | ||
schema, | ||
); | ||
@@ -59,3 +72,58 @@ | ||
_stopInstance(docName: string) { | ||
public async handleRequest( | ||
path: CollabRequestType, | ||
payload: any, | ||
): Promise<HandleResponseError | HandleResponseOk> { | ||
if (!payload.userId) { | ||
throw new Error('Must have user id'); | ||
} | ||
log(`request to ${path} from `, payload.userId); | ||
let data; | ||
try { | ||
if (this.interceptRequests) { | ||
await this.interceptRequests(path, payload); | ||
} | ||
switch (path) { | ||
case 'pull_events': { | ||
data = await this.routes.pullEvents(payload); | ||
break; | ||
} | ||
case 'push_events': { | ||
data = await this.routes.pushEvents(payload); | ||
break; | ||
} | ||
case 'get_document': { | ||
data = await this.routes.getDocument(payload); | ||
break; | ||
} | ||
} | ||
return { | ||
status: 'ok', | ||
body: data, | ||
}; | ||
} catch (err) { | ||
if (err instanceof CollabError) { | ||
return { | ||
status: 'error', | ||
body: { | ||
errorCode: err.errorCode, | ||
message: err.message, | ||
}, | ||
}; | ||
} | ||
console.error(err); | ||
return { | ||
status: 'error', | ||
body: { | ||
errorCode: 500, | ||
message: err.message || 'Unknown error occurred', | ||
}, | ||
}; | ||
} | ||
} | ||
private _stopInstance(docName: string) { | ||
const instance = this.instances[docName]; | ||
@@ -70,3 +138,3 @@ if (instance) { | ||
_cleanup() { | ||
private _cleanup() { | ||
log('Cleaning up'); | ||
@@ -81,5 +149,5 @@ const instances = Object.values(this.instances); | ||
destroy() { | ||
public destroy() { | ||
log('destroy called'); | ||
// todo need to abort `get_events` pending requests | ||
// todo need to abort `pull_events` pending requests | ||
for (const i of Object.values(this.instances)) { | ||
@@ -94,3 +162,3 @@ this._stopInstance(i.docName); | ||
async _newInstance(docName: string, doc?: Node) { | ||
private async _newInstance(docName: string, doc?: Node) { | ||
log('creating new instance', docName); | ||
@@ -137,21 +205,3 @@ const { instances } = this; | ||
async handleRequest(path: string, payload: any) { | ||
if (!this.routes[path]) { | ||
throw new Error('Path not found'); | ||
} | ||
if (!payload.userId) { | ||
throw new Error('Must have user id'); | ||
} | ||
if (this.interceptRequests) { | ||
await this.interceptRequests(path, payload); | ||
} | ||
log(`request to ${path} from `, payload.userId); | ||
const route: any = this.routes[path]; | ||
return route(payload); | ||
} | ||
async _getInstanceQueued(docName: string, userId: string) { | ||
private async _getInstanceQueued(docName: string, userId: string) { | ||
if (!userId) { | ||
@@ -170,207 +220,1 @@ throw new Error('userId is required'); | ||
} | ||
function nonNegInteger(str: any) { | ||
let num = Number(str); | ||
if (!isNaN(num) && Math.floor(num) === num && num >= 0) { | ||
return num; | ||
} | ||
throw new CollabError(400, 'Not a non-negative integer: ' + str); | ||
} | ||
// Object that represents an HTTP response. | ||
class Output { | ||
responded: boolean = false; | ||
constructor(public body: OutputData) {} | ||
static outputEvents(inst: Instance, data: any) { | ||
return Output.json({ | ||
version: inst.version, | ||
steps: data.steps.map((step: StepBigger) => step.toJSON()), | ||
clientIDs: data.steps.map((step: StepBigger) => step.clientID), | ||
users: data.users, | ||
}); | ||
} | ||
static json(data: OutputData) { | ||
return new Output(data); | ||
} | ||
resp() { | ||
if (this.responded) { | ||
throw new Error('already responded'); | ||
} | ||
this.responded = true; | ||
return { body: this.body }; | ||
} | ||
} | ||
interface OutputData { | ||
doc?: { [key: string]: any }; | ||
// TODO users cannot be a number lol | ||
users?: number; | ||
version?: number; | ||
steps?: Array<{ [key: string]: any }>; | ||
clientIDs?: string[]; | ||
} | ||
function generateRoutes( | ||
schema: Schema, | ||
getInstance: ( | ||
docName: string, | ||
userId: string, | ||
) => Promise<Instance | undefined>, | ||
userWaitTimeout: number, | ||
) { | ||
const routes = { | ||
get_document: async ({ | ||
docName, | ||
userId, | ||
}: { | ||
docName: string; | ||
userId: string; | ||
}) => { | ||
log('get_document', { docName, userId }); | ||
let inst = await getInstance(docName, userId); | ||
// TODO better propogating of these errors | ||
if (!inst) { | ||
throw new Error('Instance not found'); | ||
} | ||
return Output.json({ | ||
doc: inst.doc.toJSON(), | ||
users: inst.userCount, | ||
version: inst.version, | ||
}); | ||
}, | ||
get_events: async ({ | ||
docName, | ||
version, | ||
userId, | ||
}: { | ||
docName: string; | ||
version: number; | ||
userId: string; | ||
}) => { | ||
// An endpoint for a collaborative document instance which | ||
// returns all events between a given version and the server's | ||
// current version of the document. | ||
version = nonNegInteger(version); | ||
let instance = await getInstance(docName, userId); | ||
if (instance == null) { | ||
throw new Error('Instance not found'); | ||
} | ||
let data = instance.getEvents(version); | ||
if (data === false) { | ||
throw new CollabError(410, 'History no longer available'); | ||
} | ||
// If the server version is greater than the given version, | ||
// return the data immediately. | ||
if (data.steps.length) { | ||
return Output.outputEvents(instance, data); | ||
} | ||
// If the server version matches the given version, | ||
// wait until a new version is published to return the event data. | ||
// TODO we need to expose this abort in case the client themself | ||
// decide to close the get_events request. | ||
let abort; | ||
const waitForChanges = new Promise<void>((res) => { | ||
const inst = instance; | ||
if (inst == null) { | ||
res(); | ||
return; | ||
} | ||
// An object to assist in waiting for a collaborative editing | ||
// instance to publish a new version before sending the version | ||
// event data to the client. | ||
let waiter = { | ||
userId, | ||
onFinish: () => { | ||
res(); | ||
}, | ||
}; | ||
inst.waiting.push(waiter); | ||
abort = () => { | ||
// note instance.js removes item from the waiting array | ||
// before calling onFinish | ||
let found = inst.waiting.indexOf(waiter); | ||
log('in abort waiting =', inst.waiting.length); | ||
if (found > -1) { | ||
inst.waiting.splice(found, 1); | ||
} | ||
abort = null; | ||
}; | ||
}); | ||
try { | ||
await raceTimeout(waitForChanges, userWaitTimeout); | ||
log('finished'); | ||
return Output.outputEvents(instance, instance.getEvents(version)); | ||
} catch (err) { | ||
if (err.timeout === true) { | ||
log('timeout aborting'); | ||
if (abort) { | ||
// TODO fix this | ||
(abort as any)(); | ||
} | ||
return Output.json({}); | ||
} | ||
throw err; | ||
} | ||
}, | ||
push_events: async ({ | ||
clientID, | ||
version, | ||
steps, | ||
docName, | ||
userId, | ||
}: { | ||
clientID: string; | ||
steps: any[]; | ||
docName: string; | ||
version: number; | ||
userId: string; | ||
}) => { | ||
version = nonNegInteger(version); | ||
steps = steps.map((s) => Step.fromJSON(schema, s)); | ||
const instance = await getInstance(docName, userId); | ||
if (!instance) { | ||
throw new Error('Instance not found'); | ||
} | ||
log('received version =', version, 'server version', instance.version); | ||
let result = instance.addEvents(version, steps, clientID); | ||
if (!result) { | ||
throw new CollabError( | ||
409, | ||
`Version ${version} not current. Currently on ${instance.version}`, | ||
); | ||
} else { | ||
return Output.json(result); | ||
} | ||
}, | ||
}; | ||
function mapRoutes<T>(routes: { | ||
[key: string]: (...args: T[]) => Promise<Output>; | ||
}) { | ||
return objectMapValues(routes, (route) => { | ||
return async (...args: T[]) => { | ||
let result = await route(...args); | ||
return result.resp(); | ||
}; | ||
}); | ||
} | ||
return mapRoutes(routes); | ||
} |
{ | ||
"name": "@bangle.dev/collab-server", | ||
"version": "0.15.0", | ||
"version": "0.16.0", | ||
"homepage": "https://bangle.dev", | ||
@@ -23,3 +23,2 @@ "authors": [ | ||
"types": "dist/index.d.ts", | ||
"sideEffects": false, | ||
"scripts": { | ||
@@ -31,3 +30,3 @@ "compile-ts": "yarn g:tsc --build $INIT_CWD", | ||
"devDependencies": { | ||
"@bangle.dev/core": "0.15.0", | ||
"@bangle.dev/core": "0.16.0", | ||
"@types/jest": "^26.0.23", | ||
@@ -34,0 +33,0 @@ "@types/node": "^15.6.1", |
@@ -6,3 +6,2 @@ { | ||
"types": [], | ||
"baseUrl": ".", | ||
"outDir": "./dist", | ||
@@ -9,0 +8,0 @@ "rootDir": "." |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
179440
41
1591