pg-logical-replication
1. Install
$ npm install pg-logical-replication
2. Usage
- This is an example using wal2json. A replication slot (test_slot_wal2json) must be created on the PostgreSQL server.
SELECT * FROM pg_create_logical_replication_slot('test_slot_wal2json', 'wal2json')
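import { LogicalReplicationService, Wal2Json, Wal2JsonPlugin } from 'pg-logical-replication';

const slotName = 'test_slot_wal2json';

const service = new LogicalReplicationService(
  // node-postgres client options for the connection
  {
    database: 'playground',
  },
  // Logical replication service config
  {
    acknowledge: {
      auto: true,
      timeoutSeconds: 10,
    },
  }
);

// Plugin options for wal2json go here.
const plugin = new Wal2JsonPlugin({});

service.on('data', (lsn: string, log: Wal2Json.Output) => {
  // Process the change set here.
});

// Start subscribing to data change events; resubscribe 100 ms after subscribe() settles.
(function proc() {
  service
    .subscribe(plugin, slotName)
    .catch((e) => {
      console.error(e);
    })
    .then(() => {
      setTimeout(proc, 100);
    });
})();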
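The restart loop in the example resubscribes 100 ms after subscribe() settles, whether it succeeded or failed. The same pattern can be sketched as a reusable helper with a stop condition. This is only an illustration: resubscribeLoop, shouldContinue, delayMs and onError are hypothetical names that are not part of pg-logical-replication, and subscribeOnce stands in for () => service.subscribe(plugin, slotName).

```typescript
// Hypothetical helper, not part of pg-logical-replication.
// `subscribeOnce` stands in for () => service.subscribe(plugin, slotName).
async function resubscribeLoop(
  subscribeOnce: () => Promise<unknown>,
  shouldContinue: () => boolean,
  delayMs = 100,
  onError: (err: unknown) => void = (err) => console.error(err),
): Promise<number> {
  let attempts = 0;
  while (shouldContinue()) {
    attempts += 1;
    try {
      // Assumed to settle when the replication stream ends or fails.
      await subscribeOnce();
    } catch (err) {
      onError(err);
    }
    // Wait before reconnecting, like the setTimeout(proc, 100) in the example.
    await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
  }
  // Number of subscribe attempts made before the loop was asked to stop.
  return attempts;
}
```

Passing a shouldContinue callback that returns false after a shutdown flag is set lets the loop end instead of reconnecting forever.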
3. LogicalReplicationService
3-1. Constructor(clientConfig: ClientConfig, config?: Partial<LogicalReplicationConfig>)
const service = new LogicalReplicationService(
  clientConfig: {
    user?: string | undefined;
    database?: string | undefined;
    password?: string | (() => string | Promise<string>) | undefined;
    port?: number | undefined;
    host?: string | undefined;
    connectionString?: string | undefined;
    keepAlive?: boolean | undefined;
    stream?: stream.Duplex | undefined;
    statement_timeout?: false | number | undefined;
    parseInputDatesAsUTC?: boolean | undefined;
    ssl?: boolean | ConnectionOptions | undefined;
    query_timeout?: number | undefined;
    keepAliveInitialDelayMillis?: number | undefined;
    idle_in_transaction_session_timeout?: number | undefined;
    application_name?: string | undefined;
    connectionTimeoutMillis?: number | undefined;
    types?: CustomTypesConfig | undefined;
    options?: string | undefined;
  },
  config?: Partial<{
    acknowledge?: {
      auto: boolean;
      timeoutSeconds: 0 | 10 | number;
    };
  }>
)
3-2. subscribe(plugin: AbstractPlugin, slotName: string, uptoLsn?: string): Promise<this>
3-3. acknowledge(lsn: string): Promise<boolean>
- After the data has been processed, this signals the PostgreSQL server that it is OK to clear the WAL.
- Usually this is done automatically.
- Call it manually only when automatic acknowledgement is disabled: new LogicalReplicationService({}, {acknowledge: {auto: false}}).
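With auto set to false, one possible approach is to acknowledge an LSN only after its change set has been handled successfully. The sketch below illustrates that idea under stated assumptions: processThenAcknowledge and AckCapable are hypothetical names, and the interface covers only the acknowledge(lsn) signature documented in 3-3, so a real LogicalReplicationService instance should fit it structurally.

```typescript
// Minimal structural type covering only the documented acknowledge() method.
interface AckCapable {
  acknowledge(lsn: string): Promise<boolean>;
}

// Hypothetical helper: run the handler first and acknowledge only on success.
async function processThenAcknowledge<T>(
  service: AckCapable,
  lsn: string,
  log: T,
  handler: (log: T) => Promise<void> | void,
): Promise<boolean> {
  try {
    await handler(log);
  } catch (err) {
    // Processing failed: skip the acknowledgement so the server keeps the WAL.
    console.error(`processing failed at ${lsn}`, err);
    return false;
  }
  // Returns whatever acknowledge() reports.
  return service.acknowledge(lsn);
}
```

A 'data' listener could then await processThenAcknowledge(service, lsn, log, handle), where handle is your own processing function.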
3-4. Event
on(event: 'start', listener: () => Promise<void> | void)
- Emitted when replication starts.
on(event: 'data', listener: (lsn: string, log: any) => Promise<void> | void)
- Emitted when PostgreSQL data changes. The log value type varies depending on the plugin.
on(event: 'error', listener: (err: Error) => void)
on(event: 'acknowledge', listener: (lsn: string) => Promise<void> | void)
- Emitted when acknowledging automatically.
on(event: 'heartbeat', listener: (lsn: string, timestamp: number, shouldRespond: boolean) => Promise<void> | void)
- A heartbeat check signal has been received from the server. You may need to run service.acknowledge().
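As an illustration of the heartbeat note, the sketch below builds a listener that acknowledges only when the server asks for a response (shouldRespond is true). makeHeartbeatResponder, AckCapable and pickLsn are hypothetical names. By default the listener acknowledges the LSN carried by the heartbeat, which assumes no earlier changes are still waiting to be processed; pass pickLsn to acknowledge your last processed LSN instead.

```typescript
// Minimal structural type covering only the documented acknowledge() method.
interface AckCapable {
  acknowledge(lsn: string): Promise<boolean>;
}

// Listener shape taken from the documented 'heartbeat' event signature.
type HeartbeatListener = (
  lsn: string,
  timestamp: number,
  shouldRespond: boolean,
) => Promise<void>;

// Hypothetical factory: respond to heartbeats only when the server requests it.
function makeHeartbeatResponder(
  service: AckCapable,
  // Assumption: acknowledging the heartbeat LSN is acceptable unless overridden.
  pickLsn: (heartbeatLsn: string) => string = (heartbeatLsn) => heartbeatLsn,
): HeartbeatListener {
  return async (lsn, _timestamp, shouldRespond) => {
    if (!shouldRespond) return;
    await service.acknowledge(pickLsn(lsn));
  };
}
```

The result can be registered with service.on('heartbeat', makeHeartbeatResponder(service)).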
3-5. Misc. method
stop(): Promise<this>
- Terminates the connection to the server and stops replication.
isStop(): boolean
- Returns false once replication from the server has started.
lastLsn(): string
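The three methods above can be combined into a small shutdown routine. This is a minimal sketch, not something the package provides: stopAndReportLsn and Stoppable are hypothetical names, and the interface lists only the signatures shown in this section.

```typescript
// Structural type with only the methods listed in section 3-5.
interface Stoppable {
  stop(): Promise<unknown>;
  isStop(): boolean;
  lastLsn(): string;
}

// Hypothetical helper: stop the service if it is running, then return lastLsn().
async function stopAndReportLsn(service: Stoppable): Promise<string> {
  if (!service.isStop()) {
    await service.stop();
  }
  return service.lastLsn();
}
```

Calling it from a process signal handler (for example on SIGINT) gives you a value to log or persist before exiting.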
4. Output Plugins
4-1. PgoutputPlugin
for pgoutput (Native to PostgreSQL)
4-2. Wal2JsonPlugin
for wal2json
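With wal2json in its format-version 1 layout, each 'data' payload carries a change array whose entries list column names and values in parallel arrays. The sketch below turns inserted entries into plain row objects. Wal2JsonChangeLike is an assumed subset written for this example and may differ in detail from the package's own Wal2Json.Output type; changeToRow and insertedRows are hypothetical helpers.

```typescript
// Assumed subset of a wal2json format-version 1 change entry.
interface Wal2JsonChangeLike {
  kind: string; // e.g. "insert", "update", "delete"
  schema: string;
  table: string;
  columnnames?: string[];
  columnvalues?: unknown[];
}

// Zip columnnames with columnvalues into a plain object.
function changeToRow(change: Wal2JsonChangeLike): Record<string, unknown> {
  const names = change.columnnames ?? [];
  const values = change.columnvalues ?? [];
  const row: Record<string, unknown> = {};
  names.forEach((name, i) => {
    row[name] = values[i];
  });
  return row;
}

// Collect the inserted rows from one 'data' event payload.
function insertedRows(log: { change: Wal2JsonChangeLike[] }): Record<string, unknown>[] {
  return log.change.filter((c) => c.kind === "insert").map(changeToRow);
}
```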
4-3. ProtocolBuffersPlugin
for decoderbufs
4-4. TestDecodingPlugin
for test_decoding (Not recommended)