New Case Study: See how Anthropic automated 95% of dependency reviews with Socket. Learn More
Socket
Sign inDemoInstall
Socket

@targetprocess/balancer-core

Package Overview
Dependencies
Maintainers
0
Versions
37
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@targetprocess/balancer-core

Message balancer that processes messages grouped by partition key in a fair way

  • 1.5.3
  • latest
  • npm
  • Socket score

Version published
Weekly downloads
739
increased by 13.34%
Maintainers
0
Weekly downloads
 
Created
Source

Balancer core

Usage sample

import {
  MessageBalancer,
  MessageCache,
  MessageStorage,
  Db,
  migrateDb,
  createMethodPerLevelLoggerAdapter,
  DefaultMessageBalancerDiagnosticsAdapter,
  DefaultMessageCacheDiagnosticsAdapter,
  DefaultMessageStorageDiagnosticsAdapter,
  ProcessMessageBatchResultItem
} from '@targetprocess/balancer-core'
import {makeLogger} from 'loggerism'
import {Pool} from 'pg'
import * as promClient from 'prom-client'

// Balancer whose message properties carry an optional `data` string and an optional `retry` number.
type MessageBalancerA = MessageBalancer<{data?: string; retry?: number}>

// Balancer whose message properties carry only an optional `data` string.
type MessageBalancerB = MessageBalancer<{data?: string}>

/**
 * Usage walkthrough: creates two balancers, stores one message in each, then
 * processes a single message through balancer A and a batch of up to 10
 * messages through balancer B.
 *
 * Every balancer call is awaited so that a failure rejects the promise
 * returned by `main` instead of being lost.
 */
async function main() {
  const [balancerA, balancerB] = await createAndInitBalancers()

  await balancerA.storeMessage({
    partitionKey: 'partition#1',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  await balancerB.storeMessage({
    partitionKey: 'partition#2',
    content: Buffer.alloc(128),
    properties: {data: 'some arbitrary data'}
  })

  await balancerA.processNextMessage(async message => {
    try {
      const {partitionGroup, partitionKey} = message
      console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
      return {type: 'Ok'}
    } catch {
      // Count this attempt; a message without a retry counter starts from 0
      const properties = {
        ...message.properties,
        retry: (message.properties?.retry ?? 0) + 1
      }
      // Push message back with updated properties
      return {type: 'Requeue', update: {properties}}
    }
  })

  // Using batch API.
  // Awaited (presumably returns a promise, like processNextMessage — confirm against the package typings)
  // so main() does not finish before the batch is processed.
  await balancerB.processNextMessageBatch(
    async messages => {
      // One result entry is produced per message, keyed by messageId
      const results: ProcessMessageBatchResultItem<{data?: string}>[] = []

      for (const message of messages) {
        try {
          const {partitionGroup, partitionKey} = message
          console.log(`Processed message from partition "${partitionGroup}/${partitionKey}"`)
          results.push({messageId: message.messageId, type: 'Ok'})
        } catch {
          // Push message back with no properties update
          results.push({messageId: message.messageId, type: 'Requeue'})
        }
      }

      return results
    },
    {maxBatchSize: 10}
  )
}

/**
 * Builds the PostgreSQL pool, runs `migrateDb` against it, and creates two
 * message balancers (partition groups 'A' and 'B') that share one storage and
 * one cache instance.
 *
 * Configuration is read from environment variables:
 * POSTGRES_CONNECTION_STRING, POSTGRES_POOL_MAX, LOG_LEVEL,
 * MESSAGE_CACHE_MAX_MESSAGE_SIZE, MESSAGE_CACHE_MAX_SIZE, PARTITION_SIZE_LIMIT.
 *
 * @returns a tuple `[balancerA, balancerB]`, both already initialised via `init()`.
 */
async function createAndInitBalancers(): Promise<[MessageBalancerA, MessageBalancerB]> {
  // Environment variables are strings; convert the pool size to a number and
  // leave it undefined when unset so the pool falls back to its own default.
  const poolMax = process.env.POSTGRES_POOL_MAX
  const pool = new Pool({
    connectionString: process.env.POSTGRES_CONNECTION_STRING,
    max: poolMax ? Number(poolMax) : undefined
  })

  pool.on('error', error => {
    // Handle error here
    console.error(error)
  })

  await migrateDb({pool})

  const logger = createMethodPerLevelLoggerAdapter(
    makeLogger({
      logLevel: process.env.LOG_LEVEL,
      handleExceptions: false
    })
  )

  const db = new Db({pool})
  const storage = new MessageStorage({
    db,
    diagnostics: new DefaultMessageStorageDiagnosticsAdapter({
      logger,
      createMessagesDurationMetric: summaryMetric('create_messages_duration_in_ms'),
      updateMessagesDurationMetric: summaryMetric('update_messages_duration_in_ms'),
      removeMessagesDurationMetric: summaryMetric('remove_messages_duration_in_ms'),
      readMessagesDurationMetric: summaryMetric('read_messages_duration_in_ms')
    })
  })
  const cache = new MessageCache({
    maxMessageSize: Number(process.env.MESSAGE_CACHE_MAX_MESSAGE_SIZE),
    maxSize: Number(process.env.MESSAGE_CACHE_MAX_SIZE),
    diagnostics: new DefaultMessageCacheDiagnosticsAdapter({
      logger,
      messageCountMetric: gaugeMetric('cache_message_count'),
      messageSizeMetric: gaugeMetric('cache_message_size')
    })
  })
  // Both balancers share the same storage and cache; each gets its own partition group and metric names.
  const balancerA = new MessageBalancer<{data?: string; retry?: number}>({
    partitionGroup: 'A',
    lockPartition: true,
    storage,
    cache,
    diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
      logger,
      endToEndMessageProcessingDurationMetric: summaryMetric('balancer_a_end_to_end_processing_duration_in_ms'),
      centrifugePartitionCountMetric: gaugeMetric('balancer_a_centrifuge_partition_count'),
      centrifugeMessageCountMetric: gaugeMetric('balancer_a_centrifuge_message_count')
    })
  })
  const balancerB = new MessageBalancer<{data?: string}>({
    partitionGroup: 'B',
    partitionSizeLimit: Number(process.env.PARTITION_SIZE_LIMIT),
    lockPartition: true,
    storage,
    cache,
    diagnostics: new DefaultMessageBalancerDiagnosticsAdapter({
      logger,
      endToEndMessageProcessingDurationMetric: summaryMetric('balancer_b_end_to_end_processing_duration_in_ms'),
      centrifugePartitionCountMetric: gaugeMetric('balancer_b_centrifuge_partition_count'),
      centrifugeMessageCountMetric: gaugeMetric('balancer_b_centrifuge_message_count')
    })
  })

  await balancerA.init()
  await balancerB.init()

  // Return each balancer once, in the order the tuple type declares
  return [balancerA, balancerB]
}

/**
 * Creates a prom-client Summary registered under the given metric name.
 *
 * @param name metric name passed through to prom-client
 * @returns the newly constructed Summary
 */
function summaryMetric(name: string) {
  const summary = new promClient.Summary({
    name,
    help: 'Write it yourself',
    percentiles: [0.1, 0.5, 0.9, 0.99],
    maxAgeSeconds: 10 * 60,
    ageBuckets: 10
  })

  return summary
}

/**
 * Creates a prom-client Gauge registered under the given metric name.
 *
 * @param name metric name passed through to prom-client
 * @returns the newly constructed Gauge
 */
function gaugeMetric(name: string) {
  const gauge = new promClient.Gauge({name, help: 'Write it yourself'})

  return gauge
}

// Entry point: report a failed run explicitly instead of leaving the promise returned by main() unhandled
main().catch(error => {
  console.error(error)
  process.exitCode = 1
})

FAQs

Package last updated on 04 Dec 2024

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc