@synonymdev/blocktank-worker2
Microservice module based on Grenache DHT and amqplib RabbitMQ messages. Written in TypeScript, supports JavaScript.
Run DHT for service discovery
npm i -g grenache-grape # Only once
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'
Run RabbitMQ for events
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management
Open the dashboard at http://localhost:15672/ and log in with guest/guest.
A Worker consists of
import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';

const client = new GrenacheClient()

class HelloWorldWorkerImplementation extends WorkerImplementation {
    /**
     * Every method defined in here can be called by other workers/clients.
     */
    async helloWorld(name1: string, name2: string) {
        return `Hello ${name1} and ${name2}`;
    }

    async callOtherWorkerUsdToBtc(usd: number) {
        const exchangeRate = client.encapsulateWorker('exchange_rate') // Get exchangeRate worker
        const btcUsd = await exchangeRate.getRate("BTCUSD") // Call method on exchangeRate worker.
        console.log('Current BTCUSD price is', btcUsd)
        // Current BTCUSD price is $30,000
        return usd/btcUsd
    }
}

const runner = new Worker(new HelloWorldWorkerImplementation(), {
    name: 'HelloWorldService', // Name of the worker.
})
try {
    await runner.start();
    await waitOnSigint() // Wait on Ctrl+C
} finally {
    await runner.stop()
}
callbackSupport: true.
Errors.
constructor(worker, config?)
  worker: WorkerImplementation
  config?: GrenacheServerConfig
    name?: string. Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.
    grapeUrl?: string. URL to the grape DHT. Default: http://127.0.0.1:30001.
    port?: integer. Server port. Default: Random port between 10,000 and 40,000.
    callbackSupport?: boolean. Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: false.
    connection?: amp.Connection. RabbitMQ connection. Mutually exclusive with amqpUrl.
    amqpUrl: string. RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    namespace: string. RabbitMQ namespace. Default: blocktank. All objects like exchanges, queues will start with "{namespace}."
async start(): Starts the worker. Listens on given port.
async stop(options?): Stops the worker. Graceful shutdown.
  options?: WorkerStopOptions
    cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
GrenacheClient lets you call other workers without exposing your own server.
constructor(grapeUrl?)
  grapeUrl: string. URL to the DHT. Default: http://127.0.0.1:30001.
import { GrenacheClient } from '@synonymdev/blocktank-worker2'
const client = new GrenacheClient()
// Method 1 - Call function
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1) // Hello Sepp and Pirmin
async call(workerName, method, args?, opts?): Calls a method of another worker. Returns the worker response.
  workerName: string. Name of the worker you want to call.
  method: string. Method name you want to call.
  args?: any[]. List of arguments. Default: [].
  opts?: Partial<GrenacheClientCallOptions>
    timeoutMs?: Request timeout in milliseconds. Default: 60,000.
encapsulateWorker(workerName): Convenience wrapper. Returns a worker object that can be called with any worker method.
// Example
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
// Hello Sepp and Pirmin
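To illustrate how a convenience wrapper of this kind can work, here is a minimal sketch built on a JavaScript Proxy. It is not the library's implementation. The CallClient interface, the encapsulate function, and the stub client are hypothetical names made up for this example; only the shape of call(workerName, method, args) is taken from the description above.

```typescript
// Hypothetical minimal client shape, modelled on client.call(workerName, method, args).
interface CallClient {
  call(workerName: string, method: string, args?: unknown[]): Promise<unknown>;
}

// A handle on which any method name can be invoked.
type WorkerHandle = Record<string, (...args: unknown[]) => Promise<unknown>>;

// Returns a handle that forwards every method access to client.call.
function encapsulate(client: CallClient, workerName: string): WorkerHandle {
  return new Proxy({} as WorkerHandle, {
    get(_target, property) {
      // Ignore symbols, and never look like a thenable when the handle itself is awaited.
      if (typeof property !== 'string' || property === 'then') {
        return undefined;
      }
      return (...args: unknown[]) => client.call(workerName, property, args);
    },
  });
}

// Stub client (assumption for the sketch): records calls instead of using the network.
const recordedCalls: Array<{ workerName: string; method: string; args: unknown[] }> = [];
const stubClient: CallClient = {
  async call(workerName, method, args = []) {
    recordedCalls.push({ workerName, method, args });
    return `Hello ${args.join(' and ')}`;
  },
};

const helloWorld = encapsulate(stubClient, 'HelloWorldService');
const pending = helloWorld.helloWorld('Sepp', 'Pirmin');
pending.then((response) => console.log(response));
```

The handle has no fixed list of methods, so a typo in a method name only surfaces when the remote worker rejects the call.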
RabbitPublisher and RabbitConsumer manage all events around the worker.
Events work on an "at least once" delivery basis. If an error is thrown, the event is retried with an exponential backoff.
Check out the RabbitMQ docs to get an overview of the exchange/queue structure.
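As a rough illustration of what an exponential backoff looks like, the sketch below builds a function with the (attempt: number) => number shape that the onMessage backoffFunction option expects. The base delay, the cap, and the assumption that attempts are counted from 1 are choices made for this example; the library's default values are not stated in this README.

```typescript
// Same shape as the backoffFunction option: attempt number in, delay in milliseconds out.
type BackoffFunction = (attempt: number) => number;

// Doubles the delay on every attempt, starting at baseMs and never exceeding maxMs.
// Assumption: the first retry is attempt 1.
function exponentialBackoff(baseMs: number, maxMs: number): BackoffFunction {
  return (attempt) => {
    const exponent = Math.max(0, attempt - 1);
    return Math.min(maxMs, baseMs * 2 ** exponent);
  };
}

const backoff = exponentialBackoff(1000, 60000);
const delays = [1, 2, 3, 4, 5, 6, 7, 8].map(backoff);
// delays: 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
```

The cap keeps a long-failing event from waiting arbitrarily long between retries.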
Consume events from RabbitMQ.
import { RabbitConsumer, waitOnSigint } from '@synonymdev/blocktank-worker2';

const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()
try {
    await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
        console.log('HelloWorldService.invoicePaid event:', event)
    })
    await waitOnSigint() // Wait on Ctrl+C
} finally {
    await consumer.stop() // Graceful shutdown
}
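Because delivery is "at least once", a callback may see the same event more than once, so it helps to make handlers idempotent. The following is a minimal sketch of that idea. The InvoicePaidEvent shape, the invoiceId field, and the in-memory Set are assumptions for illustration; a real service would typically keep the processed IDs in its database.

```typescript
// Hypothetical event payload for the invoicePaid event.
interface InvoicePaidEvent {
  invoiceId: string;
  amountSat: number;
}

// IDs of events that were already handled (in-memory only in this sketch).
const processedInvoiceIds = new Set<string>();
let totalReceivedSat = 0;

// Returns true if the event was applied, false if it was a duplicate delivery.
function handleInvoicePaid(event: InvoicePaidEvent): boolean {
  if (processedInvoiceIds.has(event.invoiceId)) {
    return false; // Already handled, so a redelivery changes nothing.
  }
  totalReceivedSat += event.amountSat;
  processedInvoiceIds.add(event.invoiceId);
  return true;
}

// The same event delivered twice is only counted once.
const firstDelivery = handleInvoicePaid({ invoiceId: 'inv-1', amountSat: 5000 });
const secondDelivery = handleInvoicePaid({ invoiceId: 'inv-1', amountSat: 5000 });
```

Inside an onMessage callback the same check would run before any side effect, so that a retry after a thrown error does not apply the event twice.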
async init(): Initializes the consumer. Creates the RabbitMQ exchanges and queues.
async stop(cleanupRabbitMq?, timeoutMs?): Stops consuming messages. Graceful shutdown.
  cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
  timeoutMs?: number. Timeout in milliseconds to wait on currently consumed messages to finish. Default: 20,000.
async onMessage(sourceWorkerName, eventName, callback, options?)
  sourceWorkerName: string. Name of the worker that emits the event.
  eventName: string. Name of the event.
  callback: function. Callback function that is called when the event is received.
  options?: RabbitConsumeOptions. Options for this specific event type.
    backoffFunction: (attempt: number) => number. Function that returns the backoff time in milliseconds. Default: exponential backoff.
Important properties of onMessage
Publish events without a consumer.
constructor(myWorkerName, options?)
  myWorkerName: string. Name of the worker that emits the event.
  options?: RabbitConnectionOptions
    connection?: amp.Connection. RabbitMQ connection. Mutually exclusive with amqpUrl.
    amqpUrl: string. RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
    namespace: string. RabbitMQ namespace. Default: blocktank. All objects like exchanges, queues will start with "{namespace}."
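The rules for these options (connection and amqpUrl are mutually exclusive, defaults for amqpUrl and namespace, names prefixed with the namespace) can be sketched as follows. This is an illustration under stated assumptions, not the library's code: ConnectionOptionsSketch, resolveConnectionOptions, and namespacedName are hypothetical names, and the connection is typed as unknown to keep the example free of external imports.

```typescript
// Hypothetical stand-in for the connection options described above.
interface ConnectionOptionsSketch {
  connection?: unknown; // An already open RabbitMQ connection.
  amqpUrl?: string;
  namespace?: string;
}

// Applies the documented defaults and rejects the mutually exclusive combination.
function resolveConnectionOptions(options: ConnectionOptionsSketch = {}) {
  if (options.connection !== undefined && options.amqpUrl !== undefined) {
    throw new Error('connection and amqpUrl are mutually exclusive');
  }
  return {
    connection: options.connection,
    // The URL default only matters when no existing connection is passed in.
    amqpUrl: options.connection === undefined ? options.amqpUrl ?? 'amqp://localhost:5672' : undefined,
    namespace: options.namespace ?? 'blocktank',
  };
}

// Prefixes a RabbitMQ object name with "{namespace}.".
function namespacedName(namespace: string, objectName: string): string {
  return `${namespace}.${objectName}`;
}

const defaults = resolveConnectionOptions();
const exampleName = namespacedName(defaults.namespace, 'HelloWorldService');
```

Keeping every exchange and queue under one prefix is what makes it possible to isolate tests by giving each one its own namespace.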
async init(): Initializes the producer. Creates the main RabbitMQ exchange.
async stop(cleanupRabbitMq?): Stops the connection.
  cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
async publish(eventName: string, data: any): Publishes an event.
  eventName: string. Name of the event.
  data: any. Any JSON-serializable data that is sent with the event.
The Worker and RabbitConsumer take a logger option.
  logger: pino.Logger | boolean. Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.
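The three cases of the logger option can be sketched like this. To stay self-contained the example does not import pino; MinimalLogger is a hypothetical stand-in with only two methods, and resolveLogger is an illustrative helper, not a function exported by this package.

```typescript
// Hypothetical stand-in for a pino logger, reduced to two methods.
interface MinimalLogger {
  info(message: string): void;
  error(message: string): void;
}

// Logger that discards everything, used when logging is switched off.
const silentLogger: MinimalLogger = {
  info: () => {},
  error: () => {},
};

// false or undefined -> no logging, true -> default logger, logger instance -> that instance.
function resolveLogger(
  option: MinimalLogger | boolean | undefined,
  createDefault: () => MinimalLogger,
): MinimalLogger {
  if (option === undefined || option === false) {
    return silentLogger;
  }
  if (option === true) {
    return createDefault();
  }
  return option;
}

// Example default logger that collects lines in memory.
const logLines: string[] = [];
const collectingLogger: MinimalLogger = {
  info: (message) => { logLines.push(`info: ${message}`); },
  error: (message) => { logLines.push(`error: ${message}`); },
};

resolveLogger(true, () => collectingLogger).info('worker started');
resolveLogger(false, () => collectingLogger).info('this line is dropped');
```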
Experimental: The goal of MongoDatabase is to provide convenient methods for testing. This class is not mature yet, so it might change in the future.
Run mongo locally:
docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest
Check out MongoDB Compass if you need a UI.
Define entities in their own folder:
import { Entity, PrimaryKey, Property, SerializedPrimaryKey } from "@mikro-orm/core";
import {randomUUID} from 'crypto'
import { ObjectId } from "@mikro-orm/mongodb";
@Entity()
export class SampleAuthor {
    @PrimaryKey({ name: "_id" })
    id: string = randomUUID();

    @Property()
    name!: string;
}
Create a mikro-orm.config.ts file to configure your database connection.
import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';
const appConfig = AppConfig.get()
const config: Partial<MikroORMOptions<MongoDriver>> = {
    entities: entities,
    clientUrl: appConfig.dbUrl,
    metadataProvider: ReflectMetadataProvider,
    debug: false,
    type: 'mongo',
    migrations: {
        path: 'dist/1_database/migrations',
        pathTs: 'src/1_database/migrations',
        transactional: false
    }
};
export default config;
import {MongoDatabase} from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config'
// SampleAuthor is the entity defined above; import it from your entities folder.

try {
    await MongoDatabase.connect(config)
    await MongoDatabase.migrateUp()
    const em = MongoDatabase.createEntityManager()
    const author = new SampleAuthor()
    author.name = 'Sepp'
    await em.persistAndFlush(author)
} finally {
    await MongoDatabase.close()
}
MongoDatabase provides an in-memory database for testing. Check out the example MongoDatabase.test.ts for more details on how to use the in-memory database to run independent tests.
MikroORM comes with a CLI. To use the CLI, add this config to your package.json:
"mikro-orm": {
"useTsNode": true,
"configPaths": [
"./src/mikro-orm.config.ts",
"./dist/mikro-orm.config.js"
]
},
npx mikro-orm migration:create to create a new migration.
npx mikro-orm migration:up to run migrations.
npm run test. Check out vscode jest to selectively run tests.
Make tests independent + cleanup RabbitMQ:
import { Worker } from "@synonymdev/blocktank-worker2";
// Use a random RabbitMQ namespace to avoid any conflicts between tests:
const runner = new Worker(worker, {
    namespace: Worker.randomNamespace()
});
try {
    await runner.start()
    // Do your tests here
} finally {
    // Cleanup all existing rabbitMQ objects
    await runner.stop({cleanupRabbitMq: true})
}
package.json
CHANGELOG.md
git tag v0.1.0
git push origin v0.1.0
npm publish
We found that @synonymdev/blocktank-worker2 demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 6 open source maintainers collaborating on the project.