Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

mongochangestream

Package Overview
Dependencies
Maintainers
1
Versions
59
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

mongochangestream

Sync MongoDB collections via change streams into any database.

  • 0.3.0
  • Source
  • npm
  • Socket score

Version published
Weekly downloads
50
decreased by-80.39%
Maintainers
1
Weekly downloads
 
Created
Source

Mongo Change Stream

Sync a MongoDB collection to any database. Requires Redis for state management. An initial scan is performed while change stream events are handled. In order to prevent a potential race condition see the strategies section below.

If the inital scan doesn't complete for any reason (e.g., server restart) the scan will resume where it left off. This is deterministic since the collection scan is sorted by _id. Change streams will likewise resume from the last resume token upon server restarts. See the official MongoDB docs for more information on change stream resumption:

https://www.mongodb.com/docs/manual/changeStreams/#std-label-change-stream-resume

WARNING: If the Node process is stopped prior to receiving the initial change event for the collection there is a risk that changes to documents that took place while the server was restarting would be missed.

import { ChangeStreamDocument, MongoClient } from 'mongodb'
import { default as Redis } from 'ioredis'
import { initSync } from 'mongochangestream'

const redis = new Redis()

const mongoUrl = 'mongodb+srv://...'
const client = await MongoClient.connect(mongoUrl)
const db = client.db('someDb')
const coll = db.collection('someColl')

const processRecord = async (doc: ChangeStreamDocument) => {
  console.dir(doc, { depth: 10 })
}

// Sync collection
const sync = initSync(redis)
await sync.syncCollection(coll, processRecord)

Below are the available methods.

The processChangeStream method will never complete, but runInitialScan will complete once it has scanned all documents in the collection. runInitialScan batches records for efficiency.

The reset method will delete all relevant keys for a given collection in Redis.

import { ChangeStreamDocument, Collection, Document } from 'mongodb'

export type ProcessRecord = (
  doc: ChangeStreamDocument | ChangeStreamDocument[]
) => void | Promise<void>

const runInitialScan = async (
  collection: Collection,
  processRecord: ProcessRecord,
  batchSize = 100
): Promise<void> => ...

const processChangeStream = async (
  collection: Collection,
  processRecord: ProcessRecord,
  pipeline: Document[] = []
): Promise<void> => ...

const reset = async (collection: Collection): Promise<void> => ...

Change Stream Strategies

The idea behind these strategies is to prevent overwriting a document with an out-of-date version of the document. In order to prevent that scenario inserts must only succeed if the document doesn't already exist. Likewise, updates must be capable of inserting the full document if it doesn't already exist (i.e., perform a replace or an upsert).

The initial scan returns a simulated change event document with operationType set to insert. An actual update change event will include the field-level changes in addition to the full document after the change.

NOTE: Exceptions are not caught by this library. You must catch them in your processRecord callback and handle them accordingly. For example, an insert that fails due to a primary key already existing in the destination datastore.

Elasticsearch

Insert

POST /index/_create/id
...

Update

POST /index/_doc/id
...

Remove

DELETE /index/_doc/id

SQL (MySQL, CrateDB)

Insert

INSERT INTO table ...

Update

MySQL

INSERT INTO table ... ON DUPLICATE KEY UPDATE changedField = someValue

CrateDB

INSERT INTO table ... ON CONFLICT DO UPDATE SET changedField = someValue

Remove

DELETE FROM table WHERE id = someId

MongoDB

Insert

db.collection('someColl').insertOne(...)

Update

db.collection('someColl').replaceOne({_id: ObjectId(...)}, ..., {upsert: true})

Remove

db.collection('someColl').deleteOne({_id: ObjectId(...)})

Keywords

FAQs

Package last updated on 05 Aug 2022

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc