@hocuspocus/server
Advanced tools
Comparing version 2.1.0 to 2.2.0
import Document from './Document.js'; | ||
import type { Hocuspocus } from './Hocuspocus.js'; | ||
export declare class DirectConnection { | ||
import type { DirectConnection as DirectConnectionInterface } from './types'; | ||
export declare class DirectConnection implements DirectConnectionInterface { | ||
document: Document | null; | ||
@@ -5,0 +6,0 @@ instance: Hocuspocus; |
/// <reference types="node" /> | ||
/// <reference types="node" /> | ||
import { IncomingMessage, Server as HTTPServer } from 'http'; | ||
import { Server as HTTPServer, IncomingMessage } from 'http'; | ||
import WebSocket, { AddressInfo, WebSocketServer } from 'ws'; | ||
import { Configuration, HookName, HookPayload, onListenPayload, onStoreDocumentPayload } from './types.js'; | ||
import Document from './Document.js'; | ||
import { Debugger } from './Debugger.js'; | ||
import { DirectConnection } from './DirectConnection.js'; | ||
import Document from './Document.js'; | ||
import { Configuration, ConnectionConfiguration, HookName, HookPayload, onListenPayload, onStoreDocumentPayload } from './types.js'; | ||
export declare const defaultConfiguration: { | ||
@@ -72,3 +72,3 @@ name: null; | ||
*/ | ||
handleConnection(incoming: WebSocket, request: IncomingMessage, context?: any): void; | ||
handleConnection(incoming: WebSocket, request: IncomingMessage, defaultContext?: any): void; | ||
/** | ||
@@ -89,7 +89,3 @@ * Handle update of the given document | ||
*/ | ||
private createDocument; | ||
/** | ||
* Create a new connection by the given request and document | ||
*/ | ||
private createConnection; | ||
createDocument(documentName: string, request: Partial<Pick<IncomingMessage, 'headers' | 'url'>>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise<Document>; | ||
storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload): void; | ||
@@ -101,6 +97,2 @@ /** | ||
hooks(name: HookName, payload: HookPayload, callback?: Function | null): Promise<any>; | ||
/** | ||
* Get parameters by the given request | ||
*/ | ||
private static getParameters; | ||
enableDebugging(): void; | ||
@@ -107,0 +99,0 @@ enableMessageLogging(): void; |
@@ -267,1 +267,5 @@ /// <reference types="node" /> | ||
} | ||
export interface DirectConnection { | ||
transact(transaction: (document: Document) => void): Promise<void>; | ||
disconnect(): void; | ||
} |
{ | ||
"name": "@hocuspocus/server", | ||
"description": "plug & play collaboration backend", | ||
"version": "2.1.0", | ||
"version": "2.2.0", | ||
"homepage": "https://hocuspocus.dev", | ||
@@ -32,3 +32,3 @@ "keywords": [ | ||
"dependencies": { | ||
"@hocuspocus/common": "^2.1.0", | ||
"@hocuspocus/common": "^2.2.0", | ||
"async-lock": "^1.3.1", | ||
@@ -35,0 +35,0 @@ "kleur": "^4.1.4", |
import { URLSearchParams } from 'url' | ||
import Document from './Document.js' | ||
import type { Hocuspocus } from './Hocuspocus.js' | ||
import type { DirectConnection as DirectConnectionInterface } from './types' | ||
export class DirectConnection { | ||
export class DirectConnection implements DirectConnectionInterface { | ||
document: Document | null = null | ||
@@ -7,0 +8,0 @@ |
@@ -1,35 +0,30 @@ | ||
import { createServer, IncomingMessage, Server as HTTPServer } from 'http' | ||
import { URLSearchParams } from 'url' | ||
import { Server as HTTPServer, IncomingMessage, createServer } from 'http' | ||
import { ListenOptions } from 'net' | ||
import * as decoding from 'lib0/decoding' | ||
import WebSocket, { AddressInfo, WebSocketServer } from 'ws' | ||
import { Doc, encodeStateAsUpdate, applyUpdate } from 'yjs' | ||
import { v4 as uuid } from 'uuid' | ||
import kleur from 'kleur' | ||
import { | ||
ResetConnection, | ||
Unauthorized, | ||
Forbidden, | ||
awarenessStatesToArray, | ||
WsReadyStates, | ||
ResetConnection, awarenessStatesToArray, | ||
} from '@hocuspocus/common' | ||
import meta from '../package.json' assert {type: 'json'} | ||
import { IncomingMessage as SocketIncomingMessage } from './IncomingMessage.js' | ||
import kleur from 'kleur' | ||
import { v4 as uuid } from 'uuid' | ||
import WebSocket, { AddressInfo, WebSocketServer } from 'ws' | ||
import { Doc, applyUpdate, encodeStateAsUpdate } from 'yjs' | ||
import meta from '../package.json' assert { type: 'json' } | ||
import { ClientConnection } from './ClientConnection' | ||
// TODO: would be nice to only have a dependency on ClientConnection, and not on Connection | ||
import Connection from './Connection.js' | ||
import { Debugger } from './Debugger.js' | ||
import { DirectConnection } from './DirectConnection.js' | ||
import Document from './Document.js' | ||
import { | ||
MessageType, | ||
AwarenessUpdate, | ||
Configuration, | ||
ConnectionConfiguration, | ||
HookName, | ||
AwarenessUpdate, | ||
HookPayload, | ||
beforeHandleMessagePayload, | ||
beforeBroadcastStatelessPayload, | ||
onChangePayload, | ||
onDisconnectPayload, | ||
onListenPayload, | ||
onStoreDocumentPayload, | ||
} from './types.js' | ||
import Document from './Document.js' | ||
import Connection from './Connection.js' | ||
import { OutgoingMessage } from './OutgoingMessage.js' | ||
import { Debugger } from './Debugger.js' | ||
import { DirectConnection } from './DirectConnection.js' | ||
import { getParameters } from './util/getParameters' | ||
@@ -189,43 +184,45 @@ export const defaultConfiguration = { | ||
const server = createServer((request, response) => { | ||
this.hooks('onRequest', { request, response, instance: this }) | ||
.then(() => { | ||
// default response if all prior hooks don't interfere | ||
response.writeHead(200, { 'Content-Type': 'text/plain' }) | ||
response.end('OK') | ||
}) | ||
.catch(error => { | ||
// if a hook rejects and the error is empty, do nothing | ||
// this is only meant to prevent later hooks and the | ||
// default handler to do something. if a error is present | ||
// just rethrow it | ||
if (error) { | ||
throw error | ||
} | ||
}) | ||
const server = createServer(async (request, response) => { | ||
try { | ||
await this.hooks('onRequest', { request, response, instance: this }) | ||
// default response if all prior hooks don't interfere | ||
response.writeHead(200, { 'Content-Type': 'text/plain' }) | ||
response.end('OK') | ||
} catch (error) { | ||
// if a hook rejects and the error is empty, do nothing | ||
// this is only meant to prevent later hooks and the | ||
// default handler to do something. if a error is present | ||
// just rethrow it | ||
if (error) { | ||
throw error | ||
} | ||
} | ||
}) | ||
server.on('upgrade', (request, socket, head) => { | ||
this.hooks('onUpgrade', { | ||
request, | ||
socket, | ||
head, | ||
instance: this, | ||
}) | ||
.then(() => { | ||
// let the default websocket server handle the connection if | ||
// prior hooks don't interfere | ||
webSocketServer.handleUpgrade(request, socket, head, ws => { | ||
webSocketServer.emit('connection', ws, request) | ||
}) | ||
server.on('upgrade', async (request, socket, head) => { | ||
try { | ||
await this.hooks('onUpgrade', { | ||
request, | ||
socket, | ||
head, | ||
instance: this, | ||
}) | ||
.catch(error => { | ||
// if a hook rejects and the error is empty, do nothing | ||
// this is only meant to prevent later hooks and the | ||
// default handler to do something. if a error is present | ||
// just rethrow it | ||
if (error) { | ||
throw error | ||
} | ||
// let the default websocket server handle the connection if | ||
// prior hooks don't interfere | ||
webSocketServer.handleUpgrade(request, socket, head, ws => { | ||
webSocketServer.emit('connection', ws, request) | ||
}) | ||
} catch (error) { | ||
// if a hook rejects and the error is empty, do nothing | ||
// this is only meant to prevent later hooks and the | ||
// default handler to do something. if a error is present | ||
// just rethrow it | ||
// TODO: why? | ||
if (error) { | ||
throw error | ||
} | ||
} | ||
}) | ||
@@ -240,3 +237,3 @@ | ||
host: this.configuration.address, | ||
} as ListenOptions, () => { | ||
} as ListenOptions, async () => { | ||
if (!this.configuration.quiet && process.env.NODE_ENV !== 'testing') { | ||
@@ -252,5 +249,8 @@ this.showStartScreen() | ||
this.hooks('onListen', onListenPayload) | ||
.then(() => resolve(this)) | ||
.catch(error => reject(error)) | ||
try { | ||
await this.hooks('onListen', onListenPayload) | ||
resolve(this) | ||
} catch (e) { | ||
reject(e) | ||
} | ||
}) | ||
@@ -379,210 +379,31 @@ }) | ||
*/ | ||
handleConnection(incoming: WebSocket, request: IncomingMessage, context: any = null): void { | ||
// Make sure to close an idle connection after a while. | ||
const closeIdleConnection = setTimeout(() => { | ||
incoming.close(Unauthorized.code, Unauthorized.reason) | ||
}, this.configuration.timeout) | ||
// Every new connection gets a unique identifier. | ||
const socketId = uuid() | ||
// To override settings for specific connections, we’ll | ||
// keep track of a few things in the `ConnectionConfiguration`. | ||
const connection: ConnectionConfiguration = { | ||
readOnly: false, | ||
handleConnection(incoming: WebSocket, request: IncomingMessage, defaultContext: any = {}): void { | ||
const clientConnection = new ClientConnection(incoming, request, this, this.hooks.bind(this), this.debugger, { | ||
requiresAuthentication: this.requiresAuthentication, | ||
isAuthenticated: false, | ||
} | ||
// The `onConnect` and `onAuthenticate` hooks need some context | ||
// to decide who’s connecting, so let’s put it together: | ||
const hookPayload = { | ||
instance: this, | ||
request, | ||
requestHeaders: request.headers, | ||
requestParameters: Hocuspocus.getParameters(request), | ||
socketId, | ||
connection, | ||
} | ||
// this map indicates whether a `Connection` instance has already taken over for incoming message for the key (i.e. documentName) | ||
const documentConnections: Record<string, boolean> = {} | ||
// While the connection will be establishing messages will | ||
// be queued and handled later. | ||
const incomingMessageQueue: Record<string, Uint8Array[]> = {} | ||
// While the connection is establishing | ||
const connectionEstablishing: Record<string, boolean> = {} | ||
// Once all hooks are run, we’ll fully establish the connection: | ||
const setUpNewConnection = async (documentName: string) => { | ||
// Not an idle connection anymore, no need to close it then. | ||
clearTimeout(closeIdleConnection) | ||
// If no hook interrupts, create a document and connection | ||
const document = await this.createDocument(documentName, request, socketId, connection, context) | ||
const instance = this.createConnection(incoming, request, document, socketId, connection.readOnly, context) | ||
instance.onClose((document, event) => { | ||
delete documentConnections[documentName] | ||
delete incomingMessageQueue[documentName] | ||
delete connectionEstablishing[documentName] | ||
if (Object.keys(documentConnections).length === 0) { | ||
instance.webSocket.close(event?.code, event?.reason) // TODO: Move this to Hocuspocus connection handler | ||
} | ||
}) | ||
documentConnections[documentName] = true | ||
// There’s no need to queue messages anymore. | ||
// Let’s work through queued messages. | ||
incomingMessageQueue[documentName].forEach(input => { | ||
incoming.emit('message', input) | ||
}) | ||
this.hooks('connected', { | ||
...hookPayload, | ||
documentName, | ||
context, | ||
connectionInstance: instance, | ||
}) | ||
} | ||
// This listener handles authentication messages and queues everything else. | ||
const handleQueueingMessage = (data: Uint8Array) => { | ||
try { | ||
const tmpMsg = new SocketIncomingMessage(data) | ||
const documentName = decoding.readVarString(tmpMsg.decoder) | ||
const type = decoding.readVarUint(tmpMsg.decoder) | ||
// Okay, we’ve got the authentication message we’re waiting for: | ||
if (type === MessageType.Auth && !connectionEstablishing[documentName]) { | ||
connectionEstablishing[documentName] = true | ||
// The 2nd integer contains the submessage type | ||
// which will always be authentication when sent from client -> server | ||
decoding.readVarUint(tmpMsg.decoder) | ||
const token = decoding.readVarString(tmpMsg.decoder) | ||
this.debugger.log({ | ||
direction: 'in', | ||
type, | ||
category: 'Token', | ||
}) | ||
this.hooks('onAuthenticate', { | ||
token, | ||
...hookPayload, | ||
documentName, | ||
}, (contextAdditions: any) => { | ||
// Hooks are allowed to give us even more context and we’ll merge everything together. | ||
// We’ll pass the context to other hooks then. | ||
context = { ...context, ...contextAdditions } | ||
}) | ||
.then(() => { | ||
// All `onAuthenticate` hooks passed. | ||
connection.isAuthenticated = true | ||
// Let the client know that authentication was successful. | ||
const message = new OutgoingMessage(documentName).writeAuthenticated(connection.readOnly) | ||
this.debugger.log({ | ||
direction: 'out', | ||
type: message.type, | ||
category: message.category, | ||
}) | ||
incoming.send(message.toUint8Array()) | ||
}) | ||
.then(() => { | ||
// Time to actually establish the connection. | ||
return setUpNewConnection(documentName) | ||
}) | ||
.catch((error = Forbidden) => { | ||
const message = new OutgoingMessage(documentName).writePermissionDenied(error.reason ?? 'permission-denied') | ||
this.debugger.log({ | ||
direction: 'out', | ||
type: message.type, | ||
category: message.category, | ||
}) | ||
// Ensure that the permission denied message is sent before the | ||
// connection is closed | ||
incoming.send(message.toUint8Array(), () => { | ||
if (Object.keys(documentConnections).length === 0) { | ||
try { | ||
incoming.close(error.code ?? Forbidden.code, error.reason ?? Forbidden.reason) | ||
} catch (closeError) { | ||
// catch is needed in case invalid error code is returned by hook (that would fail sending the close message) | ||
console.error(closeError) | ||
incoming.close(Forbidden.code, Forbidden.reason) | ||
} | ||
} | ||
}) | ||
}) | ||
} else { | ||
incomingMessageQueue[documentName].push(data) | ||
} | ||
// Catch errors due to failed decoding of data | ||
} catch (error) { | ||
console.error(error) | ||
incoming.close(Unauthorized.code, Unauthorized.reason) | ||
timeout: this.configuration.timeout, | ||
}, defaultContext) | ||
clientConnection.onClose((document: Document, hookPayload: onDisconnectPayload) => { | ||
// Check if there are still no connections to the document, as these hooks | ||
// may take some time to resolve (e.g. database queries). If a | ||
// new connection were to come in during that time it would rely on the | ||
// document in the map that we remove now. | ||
if (document.getConnectionsCount() > 0) { | ||
return | ||
} | ||
} | ||
const messageHandler = (data: Uint8Array) => { | ||
try { | ||
const tmpMsg = new SocketIncomingMessage(data) | ||
const documentName = decoding.readVarString(tmpMsg.decoder) | ||
if (documentConnections[documentName] === true) { | ||
// we already have a `Connection` set up for this document | ||
return | ||
} | ||
// if this is the first message, trigger onConnect & check if we can start the connection (only if no auth is required) | ||
if (incomingMessageQueue[documentName] === undefined) { | ||
incomingMessageQueue[documentName] = [] | ||
this.hooks('onConnect', { ...hookPayload, documentName }, (contextAdditions: any) => { | ||
// merge context from all hooks | ||
context = { ...context, ...contextAdditions } | ||
}) | ||
.then(() => { | ||
// Authentication is required, we’ll need to wait for the Authentication message. | ||
if (connection.requiresAuthentication || connectionEstablishing[documentName]) { | ||
return | ||
} | ||
connectionEstablishing[documentName] = true | ||
return setUpNewConnection(documentName) | ||
}) | ||
.catch((error = Forbidden) => { | ||
// if a hook interrupts, close the websocket connection | ||
try { | ||
incoming.close(error.code ?? Forbidden.code, error.reason ?? Forbidden.reason) | ||
} catch (closeError) { | ||
// catch is needed in case invalid error code is returned by hook (that would fail sending the close message) | ||
console.error(closeError) | ||
incoming.close(Unauthorized.code, Unauthorized.reason) | ||
} | ||
}) | ||
} | ||
handleQueueingMessage(data) | ||
} catch (closeError) { | ||
// catch is needed in case an invalid payload crashes the parsing of the Uint8Array | ||
console.error(closeError) | ||
incoming.close(Unauthorized.code, Unauthorized.reason) | ||
// If it’s the last connection, we need to make sure to store the | ||
// document. Use the debounce helper, to clear running timers, | ||
// but make it run immediately (`true`). | ||
// Only run this if the document has finished loading earlier (i.e. not to persist the empty | ||
// ydoc if the onLoadDocument hook returned an error) | ||
if (!document.isLoading) { | ||
this.debounce(`onStoreDocument-${document.name}`, () => { | ||
this.storeDocumentHooks(document, hookPayload) | ||
}, true) | ||
} else { | ||
// Remove document from memory immediately | ||
this.documents.delete(document.name) | ||
document.destroy() | ||
} | ||
} | ||
incoming.on('message', messageHandler) | ||
}) | ||
} | ||
@@ -594,3 +415,3 @@ | ||
private handleDocumentUpdate(document: Document, connection: Connection | undefined, update: Uint8Array, request?: IncomingMessage): void { | ||
const hookPayload = { | ||
const hookPayload: onChangePayload | onStoreDocumentPayload = { | ||
instance: this, | ||
@@ -602,3 +423,3 @@ clientsCount: document.getConnectionsCount(), | ||
requestHeaders: request?.headers ?? {}, | ||
requestParameters: Hocuspocus.getParameters(request), | ||
requestParameters: getParameters(request), | ||
socketId: connection?.socketId ?? '', | ||
@@ -609,2 +430,3 @@ update, | ||
this.hooks('onChange', hookPayload).catch(error => { | ||
// TODO: what's the intention of this catch -> throw? | ||
throw error | ||
@@ -663,3 +485,3 @@ }) | ||
*/ | ||
private async createDocument(documentName: string, request: Partial<Pick<IncomingMessage, 'headers' | 'url'>>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise<Document> { | ||
public async createDocument(documentName: string, request: Partial<Pick<IncomingMessage, 'headers' | 'url'>>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise<Document> { | ||
if (this.documents.has(documentName)) { | ||
@@ -684,3 +506,3 @@ const document = this.documents.get(documentName) | ||
requestHeaders: request.headers, | ||
requestParameters: Hocuspocus.getParameters(request), | ||
requestParameters: getParameters(request), | ||
} | ||
@@ -735,95 +557,2 @@ | ||
/** | ||
* Create a new connection by the given request and document | ||
*/ | ||
private createConnection(connection: WebSocket, request: IncomingMessage, document: Document, socketId: string, readOnly = false, context?: any): Connection { | ||
const instance = new Connection( | ||
connection, | ||
request, | ||
document, | ||
this.configuration.timeout, | ||
socketId, | ||
context, | ||
readOnly, | ||
this.debugger, | ||
) | ||
instance.onClose(document => { | ||
const hookPayload = { | ||
instance: this, | ||
clientsCount: document.getConnectionsCount(), | ||
context, | ||
document, | ||
socketId, | ||
documentName: document.name, | ||
requestHeaders: request.headers, | ||
requestParameters: Hocuspocus.getParameters(request), | ||
} | ||
this.hooks('onDisconnect', hookPayload).then(() => { | ||
// Check if there are still no connections to the document, as these hooks | ||
// may take some time to resolve (e.g. database queries). If a | ||
// new connection were to come in during that time it would rely on the | ||
// document in the map that we remove now. | ||
if (document.getConnectionsCount() > 0) { | ||
return | ||
} | ||
// If it’s the last connection, we need to make sure to store the | ||
// document. Use the debounce helper, to clear running timers, | ||
// but make it run immediately (`true`). | ||
// Only run this if the document has finished loading earlier (i.e. not to persist the empty | ||
// ydoc if the onLoadDocument hook returned an error) | ||
if (!document.isLoading) { | ||
this.debounce(`onStoreDocument-${document.name}`, () => { | ||
this.storeDocumentHooks(document, hookPayload) | ||
}, true) | ||
} else { | ||
// Remove document from memory immediately | ||
this.documents.delete(document.name) | ||
document.destroy() | ||
} | ||
}) | ||
}) | ||
instance.onStatelessCallback(payload => { | ||
return this.hooks('onStateless', payload) | ||
.catch(error => { | ||
if (error?.message) { | ||
throw error | ||
} | ||
}) | ||
}) | ||
instance.beforeHandleMessage((connection, update) => { | ||
const hookPayload: beforeHandleMessagePayload = { | ||
instance: this, | ||
clientsCount: document.getConnectionsCount(), | ||
context, | ||
document, | ||
socketId, | ||
connection, | ||
documentName: document.name, | ||
requestHeaders: request.headers, | ||
requestParameters: Hocuspocus.getParameters(request), | ||
update, | ||
} | ||
return this.hooks('beforeHandleMessage', hookPayload) | ||
}) | ||
// If the WebSocket has already disconnected (wow, that was fast) – then | ||
// immediately call close to cleanup the connection and document in memory. | ||
if ( | ||
connection.readyState === WsReadyStates.Closing | ||
|| connection.readyState === WsReadyStates.Closed | ||
) { | ||
instance.close() | ||
} | ||
return instance | ||
} | ||
storeDocumentHooks(document: Document, hookPayload: onStoreDocumentPayload) { | ||
@@ -885,10 +614,2 @@ this.hooks('onStoreDocument', hookPayload) | ||
/** | ||
* Get parameters by the given request | ||
*/ | ||
private static getParameters(request?: Pick<IncomingMessage, 'url'>): URLSearchParams { | ||
const query = request?.url?.split('?') || [] | ||
return new URLSearchParams(query[1] ? query[1] : '') | ||
} | ||
enableDebugging() { | ||
@@ -895,0 +616,0 @@ this.debugger.enable() |
@@ -340,1 +340,6 @@ import { | ||
} | ||
export interface DirectConnection { | ||
transact(transaction: (document: Document) => void): Promise<void>, | ||
disconnect(): void | ||
} |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
785546
141
9376
Updated@hocuspocus/common@^2.2.0