mongochangestream
Changelog
0.26.1: Document type should refer to the type from mongodb, not the DOM.
Sync a MongoDB collection to any database. Requires Redis for state management. An initial scan is performed while change stream events are handled. To prevent a potential race condition, see the strategies section below.
If the initial scan doesn't complete for any reason (e.g., server restart), the scan
will resume where it left off. This is deterministic since the collection scan is sorted
by _id. Change streams will likewise resume from the last resume token upon server
restarts. See the official MongoDB docs for more information on change stream resumption:
https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume
WARNING: If the Node process is stopped prior to receiving the initial change event for the collection, changes made to documents while the server was restarting may be missed.
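The resume behavior of the scan can be illustrated without MongoDB or Redis. The sketch below is not the library's implementation: Doc, KeyValueStore, LAST_ID_KEY, and scanBatch are hypothetical names, an in-memory Map stands in for Redis, and numeric _id values are assumed for simplicity. It only shows why sorting by _id and persisting the last processed _id makes resumption deterministic.

```typescript
// Illustrative stand-ins only: Doc, KeyValueStore, LAST_ID_KEY, and scanBatch
// are hypothetical names, not part of mongochangestream.
type Doc = { _id: number; name: string }
type KeyValueStore = Map<string, string> // plays the role of Redis here

const LAST_ID_KEY = 'someColl:lastScanId' // assumed key name

// Handle up to `limit` documents whose _id is greater than the saved position,
// then save the last _id handled so a later call can continue from there.
const scanBatch = (
  docs: Doc[],
  store: KeyValueStore,
  limit: number,
  processRecords: (batch: Doc[]) => void
): number => {
  const saved = store.get(LAST_ID_KEY)
  const lastId = saved === undefined ? -Infinity : Number(saved)
  const batch = [...docs]
    .sort((a, b) => a._id - b._id)
    .filter((doc) => doc._id > lastId)
    .slice(0, limit)
  const last = batch[batch.length - 1]
  if (last !== undefined) {
    processRecords(batch)
    // Persist the position only after the batch has been handled.
    store.set(LAST_ID_KEY, String(last._id))
  }
  return batch.length
}

const docs: Doc[] = [
  { _id: 3, name: 'c' },
  { _id: 1, name: 'a' },
  { _id: 4, name: 'd' },
  { _id: 2, name: 'b' },
]
const store: KeyValueStore = new Map()
const seen: number[] = []
const record = (batch: Doc[]): void => {
  seen.push(...batch.map((doc) => doc._id))
}

scanBatch(docs, store, 2, record) // handles _id 1 and 2
// Simulated restart: only `store` survives, so the scan continues after _id 2.
scanBatch(docs, store, 2, record) // handles _id 3 and 4
```

Because the saved position is written after the batch is handled, a crash between the two steps would reprocess that batch, which is one reason the write strategies described later need to tolerate repeats.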
This library uses debug. To enable it, you can do something like:
DEBUG=mongochangestream node myfile.js
import { ChangeStreamDocument, ChangeStreamInsertDocument, MongoClient } from 'mongodb'
import { default as Redis } from 'ioredis'
import { initSync } from 'mongochangestream'
const redis = new Redis()
const mongoUrl = 'mongodb+srv://...'
const client = await MongoClient.connect(mongoUrl)
const db = client.db('someDb')
const coll = db.collection('someColl')
const processRecord = async (doc: ChangeStreamDocument) => {
  console.dir(doc, { depth: 10 })
}
const processRecords = async (docs: ChangeStreamInsertDocument[]) => {
  console.dir(docs, { depth: 10 })
}
// Sync collection
const sync = initSync(redis, coll)
const initialScan = await sync.runInitialScan(processRecords)
initialScan.start()
// Process change stream
const changeStream = await sync.processChangeStream(processRecord)
changeStream.start()
setTimeout(changeStream.stop, 30000)
// Detect schema changes and ignore metadata fields (i.e., title and description)
const schemaChange = await sync.detectSchemaChange(db, {
  shouldRemoveMetadata: true,
})
schemaChange.start()
schemaChange.emitter.on('change', () => {
  initialScan.stop()
  changeStream.stop()
})
Below are the available methods.
The processChangeStream method will never complete, but runInitialScan will complete
once it has scanned all documents in the collection. runInitialScan batches records for
efficiency.
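To make the batching idea concrete, here is a minimal sketch of grouping records before handing them to a processRecords-style callback. The helper name toBatches and the batch size are assumptions for illustration; the library's own batching is configured through its options and is not reproduced here.

```typescript
// Hypothetical helper, not part of mongochangestream: split records into
// consecutive batches of at most `batchSize` items.
const toBatches = <T>(records: T[], batchSize: number): T[][] => {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer')
  }
  const batches: T[][] = []
  for (let i = 0; i < records.length; i += batchSize) {
    batches.push(records.slice(i, i + batchSize))
  }
  return batches
}

// Five records with a batch size of two yield batches of 2, 2, and 1, so the
// callback runs three times instead of five.
const batches = toBatches(['a', 'b', 'c', 'd', 'e'], 2)
for (const batch of batches) {
  console.dir(batch)
}
```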
The reset method will delete all relevant keys for a given collection in Redis.
import {
  ChangeStreamDocument,
  ChangeStreamInsertDocument,
  Collection,
  Db,
  Document,
} from 'mongodb'
export type ProcessRecord = (doc: ChangeStreamDocument) => void | Promise<void>
export type ProcessRecords = (
  docs: ChangeStreamInsertDocument[]
) => void | Promise<void>
const runInitialScan = async (
  processRecords: ProcessRecords,
  options: QueueOptions & ScanOptions = {}
)
const processChangeStream = async (
  processRecord: ProcessRecord,
  options: ChangeStreamOptions = {}
)
const detectSchemaChange = async (db: Db, options: ChangeOptions = {})
Sometimes things stop working and a restart seems to fix the issue. In order
to automate this process you can pass maintainHealth: true in the options
for runInitialScan and processChangeStream. This will run a health check
every healthCheckInterval (defaults to 1m) and call restart if necessary.
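The general shape of such a health check can be sketched as follows. This is an assumption-laden illustration, not the library's logic: the criterion used here (no activity within one interval) and the names HealthState, isStalled, and checkHealth are hypothetical.

```typescript
// Hypothetical names throughout; the library's actual health criteria are not
// shown in this README. Timestamps are in milliseconds.
type HealthState = { lastActivityAt: number }

// Assumed criterion: unhealthy when nothing was processed within one interval.
const isStalled = (
  state: HealthState,
  now: number,
  healthCheckInterval: number
): boolean => now - state.lastActivityAt > healthCheckInterval

// Run one check and call `restart` only when the process looks stalled.
const checkHealth = (
  state: HealthState,
  now: number,
  healthCheckInterval: number,
  restart: () => void
): boolean => {
  if (!isStalled(state, now, healthCheckInterval)) return false
  restart()
  return true
}

const ONE_MINUTE = 60_000
let restarts = 0
const restart = (): void => {
  restarts++
}
const state: HealthState = { lastActivityAt: 0 }

checkHealth(state, 30_000, ONE_MINUTE, restart) // recent activity, no restart
checkHealth(state, 90_000, ONE_MINUTE, restart) // stalled, restart is called
```

In a long-running process, a check like this would typically be scheduled with setInterval using the same interval value.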
The idea behind these strategies is to prevent overwriting a document with an out-of-date version of the document. To prevent that scenario, inserts must only succeed if the document doesn't already exist. Likewise, updates must be capable of inserting the full document if it doesn't already exist (i.e., perform a replace or an upsert).
The initial scan returns a simulated change event document with operationType
set to insert. An actual update change event will include the field-level changes
in addition to the full document after the change.
NOTE: Exceptions are not caught by this library. You must catch them in your
processRecord callback and handle them accordingly. One example is an insert
that fails because the primary key already exists in the destination datastore.
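The strategy and the note on exceptions can be sketched together with an in-memory Map standing in for the destination datastore. Everything here is illustrative: Row, ChangeEvent, insertRow, and applyEvent are hypothetical names, and the event shape is simplified rather than the MongoDB driver's ChangeStreamDocument type.

```typescript
// Hypothetical, simplified types; not the MongoDB driver's event types.
type Row = { id: string; [field: string]: unknown }
type ChangeEvent =
  | { operationType: 'insert' | 'update'; fullDocument: Row }
  | { operationType: 'delete'; id: string }

const duplicateKeyError = (id: string): Error => {
  const err = new Error(`duplicate key: ${id}`)
  err.name = 'DuplicateKeyError'
  return err
}

// Insert succeeds only if the row does not exist yet, like a primary key constraint.
const insertRow = (dest: Map<string, Row>, row: Row): void => {
  if (dest.has(row.id)) throw duplicateKeyError(row.id)
  dest.set(row.id, row)
}

// Apply one event. A duplicate-key failure on insert is caught and skipped so a
// stale scan record never overwrites a newer version; other errors are rethrown.
const applyEvent = (dest: Map<string, Row>, event: ChangeEvent): 'applied' | 'skipped' => {
  try {
    switch (event.operationType) {
      case 'insert':
        insertRow(dest, event.fullDocument)
        break
      case 'update':
        // Upsert: writes the full document whether or not the row exists.
        dest.set(event.fullDocument.id, event.fullDocument)
        break
      case 'delete':
        dest.delete(event.id)
        break
    }
    return 'applied'
  } catch (err) {
    if (err instanceof Error && err.name === 'DuplicateKeyError') return 'skipped'
    throw err
  }
}

const dest = new Map<string, Row>()
// The change stream delivers an update before the initial scan reaches the document.
const first = applyEvent(dest, {
  operationType: 'update',
  fullDocument: { id: '1', title: 'new' },
})
// The scan then emits its simulated insert carrying the older version.
const second = applyEvent(dest, {
  operationType: 'insert',
  fullDocument: { id: '1', title: 'old' },
})
```

With these rules the order in which the scan and the change stream deliver a document does not matter: the newer version is retained in both cases.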
Elasticsearch
Insert
POST /index/_create/id
...
Update
POST /index/_doc/id
...
Remove
DELETE /index/_doc/id
SQL
Insert
INSERT INTO table ...
Update
MySQL
INSERT INTO table ... ON DUPLICATE KEY UPDATE changedField = someValue
CrateDB
INSERT INTO table ... ON CONFLICT DO UPDATE SET changedField = someValue
Remove
DELETE FROM table WHERE id = someId
MongoDB
Insert
db.collection('someColl').insertOne(...)
Update
db.collection('someColl').replaceOne({_id: ObjectId(...)}, ..., {upsert: true})
Remove
db.collection('someColl').deleteOne({_id: ObjectId(...)})