mongochangestream - npm Package Compare versions

Comparing version 0.43.2 to 0.44.0

CHANGELOG.md

@@ -1,1 +1,5 @@
+# 0.44.0
+
+- Bump dependencies.
+
 # 0.43.2


dist/mongoChangeStream.d.ts

@@ -33,3 +33,3 @@ import { Collection, Db } from 'mongodb';
     change: (newState: State) => void;
-    waitForChange: (...newStates: State[]) => Promise<boolean>;
+    waitForChange: (...newStates: State[]) => Promise<void>;
     is: (...states: State[]) => boolean;
@@ -55,3 +55,3 @@ canChange: (newState: State) => boolean;
     change: (newState: State) => void;
-    waitForChange: (...newStates: State[]) => Promise<boolean>;
+    waitForChange: (...newStates: State[]) => Promise<void>;
     is: (...states: State[]) => boolean;
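Note on the typing change above: waitForChange now resolves to void instead of boolean, so 0.43.x callers that branched on the resolved value need adjusting. A minimal before/after sketch of the call sites (the declarations and state names here are illustrative, not from the package):

// 0.43.2-style signature: callers could branch on the resolved boolean.
declare const waitForChangeOld: (...newStates: string[]) => Promise<boolean>
// 0.44.0-style signature: the promise is awaited purely for sequencing.
declare const waitForChangeNew: (...newStates: string[]) => Promise<void>

async function example() {
  if (await waitForChangeOld('started')) {
    // 0.43.x: the resolved value could gate logic here
  }
  // 0.44.0: just await the transition, then proceed
  await waitForChangeNew('started')
}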

package.json

 {
   "name": "mongochangestream",
-  "version": "0.43.2",
+  "version": "0.44.0",
   "description": "Sync MongoDB collections via change streams into any database.",
@@ -10,3 +10,3 @@ "author": "GovSpend",
   "scripts": {
-    "prepare": "npm run lint && npm run build",
+    "prepare": "npm run lint && npm run test && npm run build",
     "clean": "rm -rf dist",
@@ -16,3 +16,5 @@ "build": "npm run fmt && npm run clean && tsc --declaration --project ./tsconfig-prod.json",
     "lint": "eslint src/**",
-    "fmt": "prettier --ignore-path .gitignore --write './'"
+    "fmt": "prettier --ignore-path .gitignore --write './'",
+    "test": "node --test --test-force-exit",
+    "test:only": "DEBUG=* node --test --test-only --test-force-exit"
   },
@@ -39,10 +41,10 @@ "keywords": [
   "devDependencies": {
-    "@faker-js/faker": "^7.6.0",
-    "@types/debug": "^4.1.7",
-    "@types/lodash": "^4.14.182",
-    "@typescript-eslint/eslint-plugin": "^5.59.1",
-    "eslint": "^8.39.0",
-    "ioredis": "5.3.2",
-    "prettier": "^2.8.8",
-    "typescript": "^5.0.4"
+    "@faker-js/faker": "^8.4.1",
+    "@types/debug": "^4.1.12",
+    "@types/lodash": "^4.17.1",
+    "@types/node": "^20.12.12",
+    "@typescript-eslint/eslint-plugin": "^7.9.0",
+    "dotenv": "^16.4.5",
+    "prettier": "^3.2.5",
+    "typescript": "^5.4.5"
   },
@@ -54,5 +56,5 @@ "dependencies": {
     "ms": "^2.1.3",
-    "obj-walker": "^1.7.0",
-    "prom-utils": "^0.5.0",
-    "simple-machines": "^0.3.0"
+    "obj-walker": "^2.2.0",
+    "prom-utils": "^0.8.0",
+    "simple-machines": "^0.4.0"
   },
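The new "test" scripts above invoke Node's built-in test runner rather than an external framework. A minimal sketch of a suite in that style (illustrative only, not from the package):

// Run with: node --test
import { describe, test } from 'node:test'
import assert from 'node:assert'

describe('example', () => {
  test('adds numbers', () => {
    assert.equal(1 + 1, 2)
  })
})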

src/mongoChangeStream.test.ts

 /**
- * To run: MONGO_CONN="[conn string]" node dist/mongoChangeStream.test.js
+ * To run add a local .env file with MONGO_CONN
  */
+import 'dotenv/config'
 import _ from 'lodash/fp.js'
-import { test } from 'node:test'
+import { describe, test } from 'node:test'
 import assert from 'node:assert'

@@ -50,7 +51,7 @@ import { initSync } from './mongoChangeStream.js'

 const genUser = () => ({
-  name: faker.name.fullName(),
+  name: faker.person.fullName(),
   address: {
-    city: faker.address.city(),
-    state: faker.address.state(),
-    zipCode: faker.address.zipCode(),
+    city: faker.location.city(),
+    state: faker.location.state(),
+    zipCode: faker.location.zipCode(),
   },
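The genUser changes above track the Faker v7 to v8 upgrade shown in the devDependencies hunk: the name module became person, and address became location. A small sketch of the equivalent calls (illustrative only):

import { faker } from '@faker-js/faker'

const fullName = faker.person.fullName() // was faker.name.fullName()
const city = faker.location.city() // was faker.address.city()
const zip = faker.location.zipCode() // was faker.address.zipCode()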

@@ -105,642 +106,645 @@ createdAt: faker.date.past(),

-test('should complete initial scan', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(50)
-    processed.push(...docs)
-  }
-  const scanOptions = { batchSize: 100 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Wait for initial scan to complete
-  await initialScan.start()
-  assert.equal(processed.length, numDocs)
-  // Stop
-  await initialScan.stop()
-})
+describe('syncing', () => {
+  test('starting change stream is idempotent', async () => {
+    const sync = await getSync()
+    // Change stream
+    const processRecords = async () => {
+      await setTimeout(5)
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    // Start twice
+    changeStream.start()
+    changeStream.start()
+    await changeStream.stop()
+  })
+  test('stopping change stream is idempotent', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    // Change stream
+    const processRecords = async () => {
+      await setTimeout(5)
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    changeStream.start()
+    // Change documents
+    await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } })
+    // Stop twice
+    await changeStream.stop()
+    await changeStream.stop()
+  })
-test('should exit cleanly if initial scan is already complete', async () => {
-  const { coll, db, redis } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  // Mark initial scan complete
-  await redis.set(sync.keys.scanCompletedKey, new Date().toString())
-  const processRecords = async () => {
-    await setTimeout(50)
-  }
-  const scanOptions = { batchSize: 100 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  await initialScan.start()
-  await initialScan.stop()
-})
+  test('starting initial scan is idempotent', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processRecords = async () => {
+      await setTimeout(50)
+    }
+    const initialScan = await sync.runInitialScan(processRecords)
+    // Start twice
+    initialScan.start()
+    initialScan.start()
+    await initialScan.stop()
+  })
-test('should run initial scan in reverse sort order', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(50)
-    processed.push(...docs)
-  }
-  const sortField: SortField<ObjectId> = {
-    field: '_id',
-    serialize: _.toString,
-    deserialize: (x: string) => new ObjectId(x),
-    order: 'desc',
-  }
-  const scanOptions = { batchSize: 100, sortField }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Wait for initial scan to complete
-  await initialScan.start()
-  assert.equal(processed.length, numDocs)
-  // Stop
-  await initialScan.stop()
-})
+  test('stopping initial scan is idempotent', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processRecords = async () => {
+      await setTimeout(50)
+    }
+    const initialScan = await sync.runInitialScan(processRecords)
+    initialScan.start()
+    await setTimeout(500)
+    // Stop twice
+    await initialScan.stop()
+    await initialScan.stop()
+  })
-test('should omit fields from initial scan', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync({ omit: ['name'] })
-  await initState(sync, db, coll)
-  const documents: Document[] = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(50)
-    documents.push(docs[0].fullDocument)
-  }
-  const scanOptions = { batchSize: 100 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Wait for initial scan to complete
-  await initialScan.start()
-  assert.equal(documents[0].name, undefined)
-  // Stop
-  await initialScan.stop()
-})
+  test('should complete initial scan', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(50)
+      processed.push(...docs)
+    }
+    const scanOptions = { batchSize: 100 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Wait for initial scan to complete
+    await initialScan.start()
+    assert.equal(processed.length, numDocs)
+    // Stop
+    await initialScan.stop()
+  })
-test('should complete initial scan if collection is empty', async () => {
-  const { coll } = await getConns()
-  const sync = await getSync()
-  // Reset state
-  await sync.reset()
-  await coll.deleteMany({})
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(50)
-    processed.push(...docs)
-  }
-  let completed = false
-  sync.emitter.on('initialScanComplete', () => {
-    completed = true
-  })
-  const scanOptions = { batchSize: 100 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Wait for initial scan to complete
-  await initialScan.start()
-  assert.ok(completed)
-  assert.equal(processed.length, 0)
-  // Stop
-  await initialScan.stop()
-})
+  test('should exit cleanly if initial scan is already complete', async () => {
+    const { coll, db, redis } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    // Mark initial scan complete
+    await redis.set(sync.keys.scanCompletedKey, new Date().toString())
+    const processRecords = async () => {
+      await setTimeout(50)
+    }
+    const scanOptions = { batchSize: 100 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    await initialScan.start()
+    await initialScan.stop()
+  })
-test('initial scan should resume after stop', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(10)
-    processed.push(...docs)
-  }
-  const scanOptions = { batchSize: 50 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  let completed = false
-  sync.emitter.on('initialScanComplete', () => {
-    completed = true
-  })
-  let cursorError = false
-  sync.emitter.on('cursorError', () => {
-    cursorError = true
-  })
-  // Start
-  initialScan.start()
-  // Allow for some records to be processed
-  await setTimeout(500)
-  // Stop the initial scan
-  await initialScan.stop()
-  // Should not emit cursorError when stopping
-  assert.equal(cursorError, false)
-  // Wait for the initial scan to complete
-  initialScan.start()
-  // Add some more records
-  await populateCollection(coll, 10)
-  await setTimeout(ms('5s'))
-  assert.ok(completed)
-  assert.equal(processed.length, numDocs + 10)
-  // Stop
-  await initialScan.stop()
-})
+  test('should run initial scan in reverse sort order', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(50)
+      processed.push(...docs)
+    }
+    const sortField: SortField<ObjectId> = {
+      field: '_id',
+      serialize: _.toString,
+      deserialize: (x: string) => new ObjectId(x),
+      order: 'desc',
+    }
+    const scanOptions = { batchSize: 100, sortField }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Wait for initial scan to complete
+    await initialScan.start()
+    assert.equal(processed.length, numDocs)
+    // Stop
+    await initialScan.stop()
+  })
+  test('should omit fields from initial scan', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync({ omit: ['name'] })
+    await initState(sync, db, coll)
+    const documents: Document[] = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(50)
+      documents.push(docs[0].fullDocument)
+    }
+    const scanOptions = { batchSize: 100 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Wait for initial scan to complete
+    await initialScan.start()
+    assert.equal(documents[0].name, undefined)
+    // Stop
+    await initialScan.stop()
+  })
-test('initial scan should not be marked as completed if connection is closed', async () => {
-  // Get a new connection since we're closing the connection in the test
-  const { coll, redis, db, client } = await getConns({})
-  const sync = initSync(redis, coll)
-  sync.emitter.on('stateChange', console.log)
-  sync.emitter.on('cursorError', console.log)
-  await initState(sync, db, coll)
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(10)
-    processed.push(...docs)
-  }
-  const scanOptions = { batchSize: 50 }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Start
-  initialScan.start()
-  // Allow for some records to be processed
-  await setTimeout(200)
-  // Close the connection.
-  await client.close()
-  // Allow for some time
-  await setTimeout(100)
-  // Check if completed
-  const completedAt = await redis.get(sync.keys.scanCompletedKey)
-  assert.equal(completedAt, null)
-  // Stop
-  await initialScan.stop()
-})
+  test('should complete initial scan if collection is empty', async () => {
+    const { coll } = await getConns()
+    const sync = await getSync()
+    // Reset state
+    await sync.reset()
+    await coll.deleteMany({})
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(50)
+      processed.push(...docs)
+    }
+    let completed = false
+    sync.emitter.on('initialScanComplete', () => {
+      completed = true
+    })
+    const scanOptions = { batchSize: 100 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Wait for initial scan to complete
+    await initialScan.start()
+    assert.ok(completed)
+    assert.equal(processed.length, 0)
+    // Stop
+    await initialScan.stop()
+  })
-test('initial scan should support custom pipeline', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  sync.emitter.on('stateChange', console.log)
-  const documents: Document[] = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(10)
-    documents.push(docs[0].fullDocument)
-  }
-  const scanOptions: QueueOptions & ScanOptions = {
-    batchSize: 50,
-    pipeline: [
-      {
-        $addFields: {
-          cityState: { $concat: ['$address.city', '-', '$address.state'] },
-        },
-      },
-    ],
-  }
-  const initialScan = await sync.runInitialScan(processRecords, scanOptions)
-  // Start
-  initialScan.start()
-  // Allow for some records to be processed
-  await setTimeout(500)
-  // Stop
-  await initialScan.stop()
-  assert.ok(documents[0].cityState)
-})
+  test('initial scan should resume after stop', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(15)
+      processed.push(...docs)
+    }
+    const scanOptions = { batchSize: 25 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    let completed = false
+    sync.emitter.on('initialScanComplete', () => {
+      completed = true
+    })
+    let cursorError = false
+    sync.emitter.on('cursorError', () => {
+      cursorError = true
+    })
+    // Start
+    initialScan.start()
+    // Allow for some records to be processed
+    await setTimeout(500)
+    // Stop the initial scan
+    await initialScan.stop()
+    // Only a subset of the documents were processed
+    assert.ok(processed.length < numDocs)
+    // Should not emit cursorError when stopping
+    assert.equal(cursorError, false)
+    // Wait for the initial scan to complete
+    await initialScan.start()
+    assert.ok(completed)
+    assert.equal(processed.length, numDocs)
+    // Stop
+    await initialScan.stop()
+  })
-test('should process records via change stream', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  let cursorError = false
-  sync.emitter.on('cursorError', () => {
-    cursorError = true
-  })
-  const processed = []
-  const processRecords = async (docs: ChangeStreamDocument[]) => {
-    for (const doc of docs) {
-      await setTimeout(5)
-      processed.push(doc)
-    }
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  // Start
-  changeStream.start()
-  await setTimeout(ms('1s'))
-  // Update records
-  coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
-  // Wait for the change stream events to be processed
-  await setTimeout(ms('10s'))
-  assert.equal(processed.length, numDocs)
-  // Stop
-  await changeStream.stop()
-  // Should not emit cursorError when stopping
-  assert.equal(cursorError, false)
-})
+  test('initial scan should not be marked as completed if connection is closed', async () => {
+    // Get a new connection since we're closing the connection in the test
+    const { coll, redis, db, client } = await getConns({})
+    const sync = initSync(redis, coll)
+    sync.emitter.on('stateChange', console.log)
+    sync.emitter.on('cursorError', console.log)
+    await initState(sync, db, coll)
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(10)
+      processed.push(...docs)
+    }
+    const scanOptions = { batchSize: 50 }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Start
+    initialScan.start()
+    // Allow for some records to be processed
+    await setTimeout(200)
+    // Close the connection.
+    await client.close()
+    // Allow for some time
+    await setTimeout(100)
+    // Check if completed
+    const completedAt = await redis.get(sync.keys.scanCompletedKey)
+    assert.equal(completedAt, null)
+    // Stop
+    await initialScan.stop()
+  })
+  test('initial scan should support custom pipeline', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    sync.emitter.on('stateChange', console.log)
+    const documents: Document[] = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(10)
+      documents.push(docs[0].fullDocument)
+    }
+    const scanOptions: QueueOptions & ScanOptions = {
+      batchSize: 50,
+      pipeline: [
+        {
+          $addFields: {
+            cityState: { $concat: ['$address.city', '-', '$address.state'] },
+          },
+        },
+      ],
+    }
+    const initialScan = await sync.runInitialScan(processRecords, scanOptions)
+    // Start
+    initialScan.start()
+    // Allow for some records to be processed
+    await setTimeout(500)
+    // Stop
+    await initialScan.stop()
+    assert.ok(documents[0].cityState)
+  })
-test('should omit fields from change stream', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync({ omit: ['address.city'] })
-  await initState(sync, db, coll)
-  const documents: Document[] = []
-  const processRecords = async (docs: ChangeStreamDocument[]) => {
-    for (const doc of docs) {
-      await setTimeout(5)
-      if (doc.operationType === 'update' && doc.fullDocument) {
-        documents.push(doc)
-      }
-    }
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  // Start
-  changeStream.start()
-  await setTimeout(ms('1s'))
-  // Update records
-  coll.updateMany(
-    {},
-    { $set: { name: 'unknown', 'address.city': 'San Diego' } }
-  )
-  // Wait for the change stream events to be processed
-  await setTimeout(ms('2s'))
-  assert.equal(documents[0].fullDocument.address.city, undefined)
-  assert.equal(
-    documents[0].updateDescription.updatedFields['address.city'],
-    undefined
-  )
-  // Stop
-  await changeStream.stop()
-})
+  test('should process records via change stream', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    let cursorError = false
+    sync.emitter.on('cursorError', () => {
+      cursorError = true
+    })
+    const processed = []
+    const processRecords = async (docs: ChangeStreamDocument[]) => {
+      for (const doc of docs) {
+        await setTimeout(5)
+        processed.push(doc)
+      }
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    // Start
+    changeStream.start()
+    await setTimeout(ms('1s'))
+    // Update records
+    coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
+    // Wait for the change stream events to be processed
+    await setTimeout(ms('10s'))
+    assert.equal(processed.length, numDocs)
+    // Stop
+    await changeStream.stop()
+    // Should not emit cursorError when stopping
+    assert.equal(cursorError, false)
+  })
-test('should omit nested fields when parent field is omitted from change stream', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync({ omit: ['address'] })
-  await initState(sync, db, coll)
-  const documents: Document[] = []
-  const processRecords = async (docs: ChangeStreamDocument[]) => {
-    for (const doc of docs) {
-      await setTimeout(5)
-      if (doc.operationType === 'update' && doc.fullDocument) {
-        documents.push(doc.fullDocument)
-      }
-    }
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  // Start
-  changeStream.start()
-  await setTimeout(ms('1s'))
-  // Update records
-  coll.updateMany({}, { $set: { 'address.zipCode': '90210' } })
-  // Wait for the change stream events to be processed
-  await setTimeout(ms('2s'))
-  assert.equal(documents[0].address?.zipCode, undefined)
-  // Stop
-  await changeStream.stop()
-})
+  test('should omit fields from change stream', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync({ omit: ['address.city'] })
+    await initState(sync, db, coll)
+    const documents: Document[] = []
+    const processRecords = async (docs: ChangeStreamDocument[]) => {
+      for (const doc of docs) {
+        await setTimeout(5)
+        if (doc.operationType === 'update' && doc.fullDocument) {
+          documents.push(doc)
+        }
+      }
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    // Start
+    changeStream.start()
+    await setTimeout(ms('1s'))
+    // Update records
+    coll.updateMany(
+      {},
+      { $set: { name: 'unknown', 'address.city': 'San Diego' } }
+    )
+    // Wait for the change stream events to be processed
+    await setTimeout(ms('2s'))
+    assert.equal(documents[0].fullDocument.address.city, undefined)
+    assert.equal(
+      documents[0].updateDescription.updatedFields['address.city'],
+      undefined
+    )
+    // Stop
+    await changeStream.stop()
+  })
-test('should omit operation types from change stream', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const operations: string[] = []
-  const processRecords = async (docs: ChangeStreamDocument[]) => {
-    for (const doc of docs) {
-      await setTimeout(5)
-      operations.push(doc.operationType)
-    }
-  }
-  const changeStream = await sync.processChangeStream(processRecords, {
-    operationTypes: ['insert'],
-    // Short timeout since only event will be queued
-    timeout: 500,
-  })
-  // Start
-  changeStream.start()
-  await setTimeout(ms('1s'))
-  // Update records
-  await coll.updateMany({}, { $set: { name: 'unknown' } })
-  // Insert record
-  await coll.insertOne(genUser())
-  // Wait for the change stream events to be processed
-  await setTimeout(ms('2s'))
-  assert.deepEqual(_.uniq(operations), ['insert'])
-  // Stop
-  await changeStream.stop()
-})
+  test('should omit nested fields when parent field is omitted from change stream', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync({ omit: ['address'] })
+    await initState(sync, db, coll)
+    const documents: Document[] = []
+    const processRecords = async (docs: ChangeStreamDocument[]) => {
+      for (const doc of docs) {
+        await setTimeout(5)
+        if (doc.operationType === 'update' && doc.fullDocument) {
+          documents.push(doc.fullDocument)
+        }
+      }
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    // Start
+    changeStream.start()
+    await setTimeout(ms('1s'))
+    // Update records
+    coll.updateMany({}, { $set: { 'address.zipCode': '90210' } })
+    // Wait for the change stream events to be processed
+    await setTimeout(ms('2s'))
+    assert.equal(documents[0].address?.zipCode, undefined)
+    // Stop
+    await changeStream.stop()
+  })
-test('change stream should resume properly', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processed = []
-  // Change stream
-  const processRecords = async (docs: ChangeStreamDocument[]) => {
-    for (const doc of docs) {
-      await setTimeout(5)
-      processed.push(doc)
-    }
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  changeStream.start()
-  // Let change stream connect
-  await setTimeout(ms('1s'))
-  // Change documents
-  coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } })
-  // Wait for some change stream events to be processed
-  await setTimeout(ms('2s'))
-  // Stop
-  await changeStream.stop()
-  // Resume change stream
-  changeStream.start()
-  // Wait for all documents to be processed
-  await setTimeout(ms('10s'))
-  // All change stream docs were processed
-  assert.equal(processed.length, numDocs)
-  await changeStream.stop()
-})
+  test('should omit operation types from change stream', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const operations: string[] = []
+    const processRecords = async (docs: ChangeStreamDocument[]) => {
+      for (const doc of docs) {
+        await setTimeout(5)
+        operations.push(doc.operationType)
+      }
+    }
+    const changeStream = await sync.processChangeStream(processRecords, {
+      operationTypes: ['insert'],
+      // Short timeout since only event will be queued
+      timeout: 500,
+    })
+    // Start
+    changeStream.start()
+    await setTimeout(ms('1s'))
+    // Update records
+    await coll.updateMany({}, { $set: { name: 'unknown' } })
+    // Insert record
+    await coll.insertOne(genUser())
+    // Wait for the change stream events to be processed
+    await setTimeout(ms('2s'))
+    assert.deepEqual(_.uniq(operations), ['insert'])
+    // Stop
+    await changeStream.stop()
+  })
-test('change stream handle missing oplog entry properly', async () => {
-  const { coll, db, redis } = await getConns()
-  const sync = await getSync()
-  let cursorError: any
-  sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
-    cursorError = event
-  })
-  await initState(sync, db, coll)
-  // Set missing token key
-  await redis.set(
-    sync.keys.changeStreamTokenKey,
-    '{"_data":"8263F51B8F000000012B022C0100296E5A1004F852F6C89F924F0A8711460F0C1FBD8846645F6964006463F51B8FD1AACE003022EFC80004"}'
-  )
-  // Change stream
-  const processRecords = async () => {
-    await setTimeout(5)
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  changeStream.start()
-  // Let change stream connect
-  await setTimeout(ms('1s'))
-  assert.ok(missingOplogEntry(cursorError.error))
-  await changeStream.stop()
-})
+  test('change stream should resume properly', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    const processed = []
+    // Change stream
+    const processRecords = async (docs: ChangeStreamDocument[]) => {
+      for (const doc of docs) {
+        await setTimeout(8)
+        processed.push(doc)
+      }
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    changeStream.start()
+    // Let change stream connect
+    await setTimeout(ms('1s'))
+    // Change documents
+    coll.updateMany({}, { $set: { createdAt: new Date('2022-01-02') } })
+    // Wait for some change stream events to be processed
+    await setTimeout(ms('2s'))
+    // Only a subset of the documents were processed
+    assert.ok(processed.length < numDocs)
+    // Stop
+    await changeStream.stop()
+    // Resume change stream
+    changeStream.start()
+    // Wait for all documents to be processed
+    await setTimeout(ms('8s'))
+    // All change stream docs were processed
+    assert.equal(processed.length, numDocs)
+    await changeStream.stop()
+  })
-test('change stream handle invalid oplog entry properly', async () => {
-  const { coll, db, redis } = await getConns()
-  const sync = await getSync()
-  let cursorError: any
-  sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
-    cursorError = event
-  })
-  await initState(sync, db, coll)
-  // Set missing token key
-  await redis.set(sync.keys.changeStreamTokenKey, '{"_data":"123"}')
-  // Change stream
-  const processRecord = async () => {
-    await setTimeout(5)
-  }
-  const changeStream = await sync.processChangeStream(processRecord)
-  changeStream.start()
-  // Let change stream connect
-  await setTimeout(ms('1s'))
-  assert.ok(missingOplogEntry(cursorError.error))
-  await changeStream.stop()
-})
+  test('change stream handle missing oplog entry properly', async () => {
+    const { coll, db, redis } = await getConns()
+    const sync = await getSync()
+    let cursorError: any
+    sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
+      cursorError = event
+    })
+    await initState(sync, db, coll)
+    // Set missing token key
+    await redis.set(
+      sync.keys.changeStreamTokenKey,
+      '{"_data":"8263F51B8F000000012B022C0100296E5A1004F852F6C89F924F0A8711460F0C1FBD8846645F6964006463F51B8FD1AACE003022EFC80004"}'
+    )
+    // Change stream
+    const processRecords = async () => {
+      await setTimeout(5)
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    changeStream.start()
+    // Let change stream connect
+    await setTimeout(ms('1s'))
+    assert.ok(missingOplogEntry(cursorError.error))
+    await changeStream.stop()
+  })
-test('change stream should handle empty collection', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  let cursorError = false
-  sync.emitter.on('cursorError', () => {
-    cursorError = true
-  })
-  await initState(sync, db, coll)
-  // Delete all documents
-  await coll.deleteMany({})
-  // Change stream
-  const processRecords = async () => {
-    await setTimeout(5)
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  changeStream.start()
-  // Let change stream connect
-  await setTimeout(ms('1s'))
-  assert.equal(cursorError, false)
-  await changeStream.stop()
-})
+  test('change stream handle invalid oplog entry properly', async () => {
+    const { coll, db, redis } = await getConns()
+    const sync = await getSync()
+    let cursorError: any
+    sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
+      cursorError = event
+    })
+    await initState(sync, db, coll)
+    // Set missing token key
+    await redis.set(sync.keys.changeStreamTokenKey, '{"_data":"123"}')
+    // Change stream
+    const processRecord = async () => {
+      await setTimeout(5)
+    }
+    const changeStream = await sync.processChangeStream(processRecord)
+    changeStream.start()
+    // Let change stream connect
+    await setTimeout(ms('1s'))
+    assert.ok(missingOplogEntry(cursorError.error))
+    await changeStream.stop()
+  })
-test('should emit cursorError if change stream is closed', async () => {
-  // Get a new connection since we're closing the connection in the test
-  const { redis, coll, db, client } = await getConns({})
-  const sync = initSync(redis, coll)
-  let error: any
-  sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
-    console.log(event)
-    error = event.error
-  })
-  await initState(sync, db, coll)
-  const processRecords = async () => {
-    await setTimeout(500)
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  // Start
-  changeStream.start()
-  await setTimeout(ms('1s'))
-  // Update records
-  await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
-  // Close the connection.
-  await client.close()
-  await setTimeout(ms('8s'))
-  assert.ok(error?.message)
-})
+  test('change stream should handle empty collection', async () => {
+    const { coll, db } = await getConns()
+    const sync = await getSync()
+    let cursorError = false
+    sync.emitter.on('cursorError', () => {
+      cursorError = true
+    })
+    await initState(sync, db, coll)
+    // Delete all documents
+    await coll.deleteMany({})
+    // Change stream
+    const processRecords = async () => {
+      await setTimeout(5)
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    changeStream.start()
+    // Let change stream connect
+    await setTimeout(ms('1s'))
+    assert.equal(cursorError, false)
+    await changeStream.stop()
+  })
-test('starting change stream is idempotent', async () => {
-  const sync = await getSync()
-  // Change stream
-  const processRecords = async () => {
-    await setTimeout(5)
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  // Start twice
-  changeStream.start()
-  changeStream.start()
-  await changeStream.stop()
-})
-test('stopping change stream is idempotent', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  // Change stream
-  const processRecords = async () => {
-    await setTimeout(5)
-  }
-  const changeStream = await sync.processChangeStream(processRecords)
-  changeStream.start()
-  // Let change stream connect
-  await setTimeout(ms('1s'))
-  // Change documents
-  await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-03') } })
-  // Stop twice
-  await changeStream.stop()
-  await changeStream.stop()
-})
-test('starting initial scan is idempotent', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processRecords = async () => {
-    await setTimeout(50)
-  }
-  const initialScan = await sync.runInitialScan(processRecords)
-  // Start twice
-  initialScan.start()
-  initialScan.start()
-  await initialScan.stop()
-})
+  test('should emit cursorError if change stream is closed', async () => {
+    // Get a new connection since we're closing the connection in the test
+    const { redis, coll, db, client } = await getConns({})
+    const sync = initSync(redis, coll)
+    let error: any
+    sync.emitter.on('cursorError', (event: CursorErrorEvent) => {
+      console.log(event)
+      error = event.error
+    })
+    await initState(sync, db, coll)
+    const processRecords = async () => {
+      await setTimeout(500)
+    }
+    const changeStream = await sync.processChangeStream(processRecords)
+    // Start
+    changeStream.start()
+    await setTimeout(ms('1s'))
+    // Update records
+    await coll.updateMany({}, { $set: { createdAt: new Date('2022-01-01') } })
+    // Close the connection.
+    await client.close()
+    await setTimeout(ms('8s'))
+    assert.ok(error?.message)
+  })
-test('stopping initial scan is idempotent', async () => {
-  const { coll, db } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  const processRecords = async () => {
-    await setTimeout(50)
-  }
-  const initialScan = await sync.runInitialScan(processRecords)
-  initialScan.start()
-  await setTimeout(500)
-  // Stop twice
-  await initialScan.stop()
-  await initialScan.stop()
-})
-test('Should resync when resync flag is set', async () => {
-  const { coll, db, redis } = await getConns()
-  const sync = await getSync()
-  await initState(sync, db, coll)
-  let resyncTriggered = false
-  const processed = []
-  const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
-    await setTimeout(50)
-    processed.push(...docs)
-  }
-  const initialScan = await sync.runInitialScan(processRecords)
-  const resync = sync.detectResync(250)
-  sync.emitter.on('resync', async () => {
-    // Stop checking for resync
-    resync.stop()
-    resyncTriggered = true
-    // Stop the initial scan
-    await initialScan.stop()
-    // Reset keys
-    await sync.reset()
-    // Reset processed
-    processed.length = 0
-    // Start initial scan
-    initialScan.start()
-  })
-  // Start initial scan
-  initialScan.start()
-  // Start resync detection
-  resync.start()
-  // Allow for initial scan to start
-  await setTimeout(500)
-  // Trigger resync
-  await redis.set(sync.keys.resyncKey, 1)
-  // Wait for initial scan to complete
-  await setTimeout(ms('5s'))
-  assert.ok(resyncTriggered)
-  assert.equal(processed.length, numDocs)
-  await initialScan.stop()
-})
+  test('Should resync when resync flag is set', async () => {
+    const { coll, db, redis } = await getConns()
+    const sync = await getSync()
+    await initState(sync, db, coll)
+    let resyncTriggered = false
+    const processed = []
+    const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
+      await setTimeout(50)
+      processed.push(...docs)
+    }
+    const initialScan = await sync.runInitialScan(processRecords)
+    const resync = sync.detectResync(250)
+    sync.emitter.on('resync', async () => {
+      // Stop checking for resync
+      resync.stop()
+      resyncTriggered = true
+      // Stop the initial scan
+      await initialScan.stop()
+      // Reset keys
+      await sync.reset()
+      // Reset processed
+      processed.length = 0
+      // Start initial scan
+      initialScan.start()
+    })
+    // Start initial scan
+    initialScan.start()
+    // Start resync detection
+    resync.start()
+    // Allow for initial scan to start
+    await setTimeout(500)
+    // Trigger resync
+    await redis.set(sync.keys.resyncKey, 1)
+    // Wait for initial scan to complete
+    await setTimeout(ms('5s'))
+    assert.ok(resyncTriggered)
+    assert.equal(processed.length, numDocs)
+    await initialScan.stop()
+  })
-test('Resync start/stop is idempotent', async () => {
-  const sync = await getSync()
-  const resync = sync.detectResync()
-  resync.start()
-  resync.start()
-  resync.stop()
-  resync.stop()
-})
+  test('Resync start/stop is idempotent', async () => {
+    const sync = await getSync()
+    const resync = sync.detectResync()
+    resync.start()
+    resync.start()
+    resync.stop()
+    resync.stop()
+  })
-test('Detect schema change', async () => {
-  const { db, coll } = await getConns()
-  const sync = await getSync()
-  // Set schema
-  await db.command({
-    collMod: coll.collectionName,
-    validator: { $jsonSchema: schema },
-  })
-  // Look for a new schema every 250 ms
-  const schemaChange = await sync.detectSchemaChange(db, {
-    shouldRemoveMetadata: true,
-    interval: 250,
-  })
-  let newSchema: object = {}
-  sync.emitter.on('schemaChange', ({ currentSchema }: SchemaChangeEvent) => {
-    console.dir(currentSchema, { depth: 10 })
-    newSchema = currentSchema
-  })
-  // Start detecting schema changes
-  schemaChange.start()
-  // Modify the schema
-  const modifiedSchema = _.set(
-    'properties.email',
-    { bsonType: 'string' },
-    schema
-  )
-  await db.command({
-    collMod: coll.collectionName,
-    validator: { $jsonSchema: modifiedSchema },
-  })
-  await setTimeout(ms('1s'))
-  assert.deepEqual(modifiedSchema, newSchema)
-  schemaChange.stop()
-})
+  test('Detect schema change', async () => {
+    const { db, coll } = await getConns()
+    const sync = await getSync()
+    // Set schema
+    await db.command({
+      collMod: coll.collectionName,
+      validator: { $jsonSchema: schema },
+    })
+    // Look for a new schema every 250 ms
+    const schemaChange = await sync.detectSchemaChange(db, {
+      shouldRemoveMetadata: true,
+      interval: 250,
+    })
+    let newSchema: object = {}
+    sync.emitter.on('schemaChange', ({ currentSchema }: SchemaChangeEvent) => {
+      console.dir(currentSchema, { depth: 10 })
+      newSchema = currentSchema
+    })
+    // Start detecting schema changes
+    schemaChange.start()
+    // Modify the schema
+    const modifiedSchema = _.set(
+      'properties.email',
+      { bsonType: 'string' },
+      schema
+    )
+    await db.command({
+      collMod: coll.collectionName,
+      validator: { $jsonSchema: modifiedSchema },
+    })
+    await setTimeout(ms('1s'))
+    assert.deepEqual(modifiedSchema, newSchema)
+    schemaChange.stop()
+  })
-test('Schema change start/stop is idempotent', async () => {
-  const { db } = await getConns()
-  const sync = await getSync()
-  const schemaChange = await sync.detectSchemaChange(db)
-  schemaChange.start()
-  schemaChange.start()
-  schemaChange.stop()
-  schemaChange.stop()
-})
+  test('Schema change start/stop is idempotent', async () => {
+    const { db } = await getConns()
+    const sync = await getSync()
+    const schemaChange = await sync.detectSchemaChange(db)
+    schemaChange.start()
+    schemaChange.start()
+    schemaChange.stop()
+    schemaChange.stop()
+  })
-test('can extend events', async () => {
-  const { coll, redis } = await getConns({})
-  const sync = initSync<'foo' | 'bar'>(redis, coll)
-  let emitted = ''
-  sync.emitter.on('foo', (x: string) => {
-    emitted = x
-  })
-  sync.emitter.emit('foo', 'bar')
-  assert.equal(emitted, 'bar')
-})
+  test('can extend events', async () => {
+    const { coll, redis } = await getConns({})
+    const sync = initSync<'foo' | 'bar'>(redis, coll)
+    let emitted = ''
+    sync.emitter.on('foo', (x: string) => {
+      emitted = x
+    })
+    sync.emitter.emit('foo', 'bar')
+    assert.equal(emitted, 'bar')
+  })
+})

src/mongoChangeStream.ts

@@ -97,3 +97,3 @@ import _ from 'lodash/fp.js'
 const detectResync = (resyncCheckInterval = ms('1m')) => {
-  let resyncTimer: NodeJS.Timer
+  let resyncTimer: NodeJS.Timeout
   const state = fsm(simpleStateTransistions, 'stopped', {
@@ -462,3 +462,3 @@ name: 'detectResync',
-  let timer: NodeJS.Timer
+  let timer: NodeJS.Timeout
   // Check for a cached schema
@@ -479,5 +479,4 @@ let previousSchema = await getCachedCollectionSchema().then((schema) => {
   const checkForSchemaChange = async () => {
-    const currentSchema = await getCollectionSchema(db).then(
-      maybeRemoveMetadata
-    )
+    const currentSchema =
+      await getCollectionSchema(db).then(maybeRemoveMetadata)
     // Schemas are no longer the same
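The NodeJS.Timer to NodeJS.Timeout change above follows from the @types/node bump: in current @types/node, setInterval and setTimeout in the Node runtime return NodeJS.Timeout, and NodeJS.Timer is a deprecated alias. A minimal sketch:

// With @types/node >= 20, this typing matches what setInterval returns.
let timer: NodeJS.Timeout
timer = setInterval(() => {
  // periodic work
}, 250)
clearInterval(timer)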

src/util.test.ts

@@ -1,47 +1,49 @@
-import { test } from 'node:test'
+import { describe, test } from 'node:test'
 import assert from 'node:assert'
 import { generatePipelineFromOmit } from './util.js'
-test('should generate pipeline from omit with no dotted fields', async () => {
-  const pipeline = generatePipelineFromOmit(['documents', 'createdAt'])
-  assert.deepEqual(pipeline, [
-    {
-      $unset: [
-        'fullDocument.documents',
-        'updateDescription.updatedFields.documents',
-        'fullDocument.createdAt',
-        'updateDescription.updatedFields.createdAt',
-      ],
-    },
-  ])
-})
+describe('util', () => {
+  test('should generate pipeline from omit with no dotted fields', () => {
+    const pipeline = generatePipelineFromOmit(['documents', 'createdAt'])
+    assert.deepEqual(pipeline, [
+      {
+        $unset: [
+          'fullDocument.documents',
+          'updateDescription.updatedFields.documents',
+          'fullDocument.createdAt',
+          'updateDescription.updatedFields.createdAt',
+        ],
+      },
+    ])
+  })
-test('should generate pipeline from omit with dotted fields', async () => {
-  const pipeline = generatePipelineFromOmit([
-    'documents.agenda.parsedText',
-    'documents.agenda.contentType',
-    'createdAt',
-  ])
-  assert.deepEqual(pipeline, [
-    {
-      $unset: [
-        'fullDocument.documents.agenda.parsedText',
-        'updateDescription.updatedFields.documents.agenda.parsedText',
-        'fullDocument.documents.agenda.contentType',
-        'updateDescription.updatedFields.documents.agenda.contentType',
-        'fullDocument.createdAt',
-        'updateDescription.updatedFields.createdAt',
-      ],
-    },
-    {
-      $set: {
-        'updateDescription.updatedFields': {
-          $arrayToObject: {
-            $filter: {
-              input: { $objectToArray: '$updateDescription.updatedFields' },
-              cond: {
-                $regexMatch: {
-                  input: '$$this.k',
-                  regex:
-                    '^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)',
+  test('should generate pipeline from omit with dotted fields', () => {
+    const pipeline = generatePipelineFromOmit([
+      'documents.agenda.parsedText',
+      'documents.agenda.contentType',
+      'createdAt',
+    ])
+    assert.deepEqual(pipeline, [
+      {
+        $unset: [
+          'fullDocument.documents.agenda.parsedText',
+          'updateDescription.updatedFields.documents.agenda.parsedText',
+          'fullDocument.documents.agenda.contentType',
+          'updateDescription.updatedFields.documents.agenda.contentType',
+          'fullDocument.createdAt',
+          'updateDescription.updatedFields.createdAt',
+        ],
+      },
+      {
+        $set: {
+          'updateDescription.updatedFields': {
+            $arrayToObject: {
+              $filter: {
+                input: { $objectToArray: '$updateDescription.updatedFields' },
+                cond: {
+                  $regexMatch: {
+                    input: '$$this.k',
+                    regex:
+                      '^(?!documents\\.agenda\\.parsedText|documents\\.agenda\\.contentType)',
+                  },
+                },
@@ -53,4 +55,4 @@ },
       },
     },
-  ])
-})
+    ])
+  })
+})