blocktank-worker2
Microservice module based on Grenache DHT and amqplib RabbitMQ messages. Written in TypeScript, supports JavaScript.
Usage
Run DHT for service discovery
npm i -g grenache-grape
grape --dp 20001 --aph 30001 --bn '127.0.0.1:20002'
grape --dp 20002 --aph 40001 --bn '127.0.0.1:20001'
Run RabbitMQ for events
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 heidiks/rabbitmq-delayed-message-exchange:3.10.2-management
Open the dashboard at http://localhost:15672/ and log in with guest/guest.
Worker
A Worker consists of
- a server that listens for method calls.
- a RabbitMQ event publisher (fanout exchange).
import { Worker, WorkerImplementation, waitOnSigint, GrenacheClient } from '@synonymdev/blocktank-worker2';

const client = new GrenacheClient()

class HelloWorldWorkerImplementation extends WorkerImplementation {
    async helloWorld(name1: string, name2: string) {
        return `Hello ${name1} and ${name2}`;
    }

    async callOtherWorkerUsdToBtc(usd: number) {
        const exchangeRate = client.encapsulateWorker('exchange_rate')
        const btcUsd = await exchangeRate.getRate("BTCUSD")
        console.log('Current BTCUSD price is', btcUsd)
        return usd/btcUsd
    }
}

const runner = new Worker(new HelloWorldWorkerImplementation(), {
    name: 'HelloWorldService',
})
try {
    await runner.start();
    await waitOnSigint()
} finally {
    await runner.stop()
}
Class WorkerImplementation
- Supports async, sync, and callback functions.
- If callback functions are used, initialize the Worker with callbackSupport: true.
- Automatically returns Errors.
Class Worker
constructor(worker, config?)
- worker: WorkerImplementation
- config?: GrenacheServerConfig
  - name?: string. Name of the worker. Announced on DHT. Used to name RabbitMQ queues. Default: Random name.
  - grapeUrl?: string. URL to the grape DHT. Default: http://127.0.0.1:30001.
  - port?: integer. Server port. Default: Random port between 10,000 and 40,000.
  - callbackSupport?: boolean. Allows WorkerImplementation functions to be written with callbacks. Disables the method argument count check. Default: false.
  - connection?: amp.Connection. RabbitMQ connection. Mutually exclusive with amqpUrl.
  - amqpUrl: string. RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
  - namespace: string. RabbitMQ namespace. All objects like exchanges and queues are prefixed with "{namespace}.". Default: blocktank.
async start() Starts the worker. Listens on the given port.
async stop(options?) Stops the worker. Graceful shutdown.
- options?: WorkerStopOptions
  - cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
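To make the documented defaults easier to see in one place, here is a minimal sketch that mirrors the options above in a local type and fills in the default values stated in this section. The names WorkerConfigSketch, randomPort, and withDocumentedDefaults are hypothetical and not part of the package, and the format of the random worker name is an assumption.

```typescript
// Hypothetical local mirror of the documented Worker options (not the package's own type).
interface WorkerConfigSketch {
  name?: string;
  grapeUrl?: string;
  port?: number;
  callbackSupport?: boolean;
  amqpUrl?: string;
  namespace?: string;
}

// Returns a random integer in [10000, 40000], the port range documented above.
function randomPort(): number {
  return 10000 + Math.floor(Math.random() * 30001);
}

// Fills in the defaults stated in this README.
// The random name format below is an assumption made for illustration.
function withDocumentedDefaults(config: WorkerConfigSketch = {}): Required<WorkerConfigSketch> {
  return {
    name: config.name ?? `worker-${Math.random().toString(36).slice(2, 10)}`,
    grapeUrl: config.grapeUrl ?? 'http://127.0.0.1:30001',
    port: config.port ?? randomPort(),
    callbackSupport: config.callbackSupport ?? false,
    amqpUrl: config.amqpUrl ?? 'amqp://localhost:5672',
    namespace: config.namespace ?? 'blocktank',
  };
}

const resolvedConfig = withDocumentedDefaults({ name: 'HelloWorldService' });
console.log(resolvedConfig);
```

Only the name is set explicitly here, so every other field falls back to its documented default.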
Class GrenacheClient
GrenacheClient allows calling other workers without exposing your own server.
constructor(grapeUrl?)
- grapeUrl?: string. URL to the DHT. Default: http://127.0.0.1:30001.
import { GrenacheClient } from '@synonymdev/blocktank-worker2'
const client = new GrenacheClient()
const method = 'helloWorld';
const args = ['Sepp', 'Pirmin'];
const response1 = await client.call('HelloWorldService', method, args)
console.log(response1)
async call(workerName, method, args?, opts?) Calls a method of another worker. Returns the worker response.
- workerName: string. Name of the worker you want to call.
- method: string. Method name you want to call.
- args?: any[]. List of arguments. Default: [].
- opts?: Partial<GrenacheClientCallOptions>
  - timeoutMs?: Request timeout in milliseconds. Default: 60,000.
encapsulateWorker(workerName) Convenience wrapper. Returns a worker object that can be called with any worker method.
const helloWorldService = client.encapsulateWorker('HelloWorldService')
const response = await helloWorldService.helloWorld('Sepp', 'Pirmin')
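For readers wondering how a wrapper like this can accept any method name, one plausible approach is a JavaScript Proxy that forwards each property access to call(workerName, method, args). The sketch below is an illustration under that assumption, not the package's actual implementation; CallableClient, RemoteWorker, encapsulateWorkerSketch, and fakeClient are hypothetical names, and the fake client stands in for a real GrenacheClient so the example runs without a DHT.

```typescript
// Minimal stand-in for a client exposing call(workerName, method, args).
// This is a local fake for illustration, not the package's GrenacheClient.
interface CallableClient {
  call(workerName: string, method: string, args?: unknown[]): Promise<unknown>;
}

type RemoteWorker = Record<string, (...args: unknown[]) => Promise<unknown>>;

// Returns an object on which any property is a function that forwards
// to client.call(workerName, <property name>, <arguments>).
function encapsulateWorkerSketch(client: CallableClient, workerName: string): RemoteWorker {
  return new Proxy({} as RemoteWorker, {
    get: (_target, property) => {
      // Avoid being mistaken for a thenable if the proxy itself is ever awaited.
      if (property === 'then') return undefined;
      return (...args: unknown[]) => client.call(workerName, String(property), args);
    },
  });
}

// Fake client that records calls instead of going over the network.
const recordedCalls: Array<{ workerName: string; method: string; args: unknown[] }> = [];
const fakeClient: CallableClient = {
  async call(workerName, method, args = []) {
    recordedCalls.push({ workerName, method, args });
    return `Hello ${args.join(' and ')}`;
  },
};

const helloWorldSketch = encapsulateWorkerSketch(fakeClient, 'HelloWorldService');
helloWorldSketch.helloWorld('Sepp', 'Pirmin').then((response) => console.log(response));
```

The design trade-off is that method names are not checked at compile time: a misspelled method is only detected when the remote worker rejects the call.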
RabbitMQ / Events
RabbitPublisher and RabbitConsumer manage all events around the worker.
Events work on an "at least once" delivery basis. If an error is thrown, the event is retried with an exponential backoff.
Check out the RabbitMQ docs to get an overview of the exchange/queue structure.
Class RabbitConsumer
Consume events from RabbitMQ.
import { RabbitConsumer, waitOnSigint } from '@synonymdev/blocktank-worker2';

const myServiceName = 'MyServiceName'
const consumer = new RabbitConsumer(myServiceName)
await consumer.init()
try {
    await consumer.onMessage('HelloWorldService', 'invoicePaid', async event => {
        console.log('HelloWorldService.invoicePaid event:', event)
    })
    // Keep consuming until the process receives SIGINT (Ctrl+C).
    await waitOnSigint()
} finally {
    await consumer.stop()
}
async init() Initializes the consumer. Creates the RabbitMQ exchanges and queues.
async stop(cleanupRabbitMq?, timeoutMs?) Stops consuming messages. Graceful shutdown.
- cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
- timeoutMs?: number. Timeout in milliseconds to wait for currently consumed messages to finish. Default: 20,000.
async onMessage(sourceWorkerName, eventName, callback, options?)
- sourceWorkerName: string. Name of the worker that emits the event.
- eventName: string. Name of the event.
- callback: function. Callback function that is called when the event is received.
  - Type: (msg: RabbitEventMessage) => any
  - May be async or sync.
- options?: RabbitConsumeOptions. Options for this specific event type.
  - backoffFunction: (attempt: number) => number. Function that returns the backoff time in milliseconds. Default: exponential backoff.
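As an illustration of the (attempt: number) => number shape, here is a minimal sketch of a capped exponential backoff. The helper name cappedExponentialBackoff and the base and cap values are assumptions chosen for the example; this README does not state whether the library counts attempts from 0 or 1, so the sketch tolerates both.

```typescript
// Builds a capped exponential backoff: baseMs * 2^attempt, never above maxMs.
// Negative attempts are treated as 0 so the result is never below baseMs.
function cappedExponentialBackoff(baseMs: number, maxMs: number): (attempt: number) => number {
  return (attempt: number) => Math.min(maxMs, baseMs * Math.pow(2, Math.max(0, attempt)));
}

// Example values (assumptions): start at 1 second, never wait longer than 60 seconds.
const backoffSketch = cappedExponentialBackoff(1000, 60000);

for (const attempt of [0, 1, 2, 3, 10]) {
  console.log(`attempt ${attempt}: wait ${backoffSketch(attempt)} ms`);
}
```

A function of this shape could be supplied as the backoffFunction option of onMessage. The cap keeps a long-failing event from being delayed indefinitely.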
Important properties of onMessage
- At-Least-Once-Delivery: Messages can be delivered multiple times and potentially in a different order.
- Retries: If an error is thrown, the event is retried with an exponential backoff. The backoff function can be customized.
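Because the same event can arrive more than once, handlers are usually written to be idempotent. The sketch below shows one common way to do that by remembering which events were already processed. The SketchEvent shape and its eventId field are assumptions for illustration (the real RabbitEventMessage may look different), and makeIdempotent is a hypothetical helper, not part of the package.

```typescript
// Hypothetical event shape for this sketch; the real RabbitEventMessage may differ.
interface SketchEvent {
  eventId: string; // assumed unique identifier used for deduplication
  content: unknown;
}

type SketchHandler = (event: SketchEvent) => Promise<void>;

// Wraps a handler so that events with an already-processed eventId are skipped.
// The in-memory Set is for illustration only; it is lost on restart.
function makeIdempotent(handler: SketchHandler): SketchHandler {
  const processed = new Set<string>();
  return async (event) => {
    if (processed.has(event.eventId)) return; // duplicate delivery: ignore
    await handler(event);
    // Mark as processed only after success, so a thrown error still leads to a retry.
    processed.add(event.eventId);
  };
}

let handledCount = 0;
const onInvoicePaidSketch = makeIdempotent(async () => {
  handledCount += 1;
});

async function demo(): Promise<number> {
  await onInvoicePaidSketch({ eventId: 'a', content: {} });
  await onInvoicePaidSketch({ eventId: 'a', content: {} }); // redelivery of the same event
  await onInvoicePaidSketch({ eventId: 'b', content: {} });
  return handledCount;
}

demo().then((count) => console.log('handled', count)); // handled 2
```

In a real service the set of processed ids would more likely live in a database, ideally written in the same transaction as the handler's side effects.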
Class RabbitPublisher
Publish events without a consumer.
constructor(myWorkerName, options?)
- myWorkerName: string. Name of the worker that emits the event.
- options?: RabbitConnectionOptions
  - connection?: amp.Connection. RabbitMQ connection. Mutually exclusive with amqpUrl.
  - amqpUrl: string. RabbitMQ connection URL. Mutually exclusive with connection. Default: amqp://localhost:5672.
  - namespace: string. RabbitMQ namespace. All objects like exchanges and queues are prefixed with "{namespace}.". Default: blocktank.
async init() Initializes the publisher. Creates the main RabbitMQ exchange.
async stop(cleanupRabbitMq?) Stops the connection.
- cleanupRabbitMq?: boolean. Deletes all RabbitMQ queues and exchanges that were created by this worker. Used for testing. Default: false.
async publish(eventName: string, data: any) Publishes an event.
- eventName: string. Name of the event.
- data: any. Any JSON-serializable data that is sent with the event.
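Since event data has to be JSON-serializable, it can help to check what a payload looks like after a JSON round trip before publishing it. The sketch below assumes the payload is serialized in the same way as JSON.stringify does. That is an assumption about the transport, and jsonRoundTrip is a hypothetical helper.

```typescript
// Round-trips a payload through JSON, approximating what a consumer would
// receive if the data is serialized with JSON.stringify (an assumption).
function jsonRoundTrip(data: unknown): unknown {
  return JSON.parse(JSON.stringify(data));
}

const payload = {
  invoiceId: 'abc',
  amountSat: 1000,
  paidAt: new Date(0), // Dates arrive as ISO strings, not Date objects
  memo: undefined,     // undefined properties are dropped entirely
};

console.log(jsonRoundTrip(payload));

// Values such as BigInt cannot be serialized at all and make JSON.stringify throw.
try {
  JSON.stringify({ amount: BigInt(10) });
} catch (error) {
  console.log('not serializable:', (error as Error).message);
}
```

Converting such values explicitly before publishing, for example dates to ISO strings and big integers to strings, avoids surprises on the consumer side.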
Logging
The Worker and RabbitConsumer take a logger option.
- logger: pino.Logger | boolean. Default: false. If set to true, a default logger is used. If set to false, no logging is done. If set to a pino logger, this logger is used.
MongoDatabase
Experimental. The goal of MongoDatabase is to provide convenient methods for testing. This class is not yet mature, so it may change in the future.
Run mongo locally:
docker run -it --rm -p 27017:27017 --name ma-mongo-db mongo:latest
Check out MongoDB Compass if you need a UI.
Define entities in their own folder:
import { Entity, PrimaryKey, Property, SerializedPrimaryKey } from "@mikro-orm/core";
import {randomUUID} from 'crypto'
import { ObjectId } from "@mikro-orm/mongodb";

@Entity()
export class SampleAuthor {
    @PrimaryKey({ name: "_id" })
    id: string = randomUUID();

    @Property()
    name!: string;
}
Create a mikro-orm.config.ts file to configure your database connection.
import { MikroORMOptions, ReflectMetadataProvider } from '@mikro-orm/core';
import { MongoDriver } from '@mikro-orm/mongodb';
import entities from './1_database/entities';
import { AppConfig } from './0_config/AppConfig';

const appConfig = AppConfig.get()

const config: Partial<MikroORMOptions<MongoDriver>> = {
    entities: entities,
    clientUrl: appConfig.dbUrl,
    metadataProvider: ReflectMetadataProvider,
    debug: false,
    type: 'mongo',
    migrations: {
        path: 'dist/1_database/migrations',
        pathTs: 'src/1_database/migrations',
        transactional: false
    }
};

export default config;
- See this mikro-orm.config.ts for an example config.
- Check out the mikro-orm docs for more information on setting up the ORM.
- You may choose to use another ORM. In that case, make sure you manage test integrations yourself.
- Check out this example entity, SampleAuthor.ts.
import {MongoDatabase} from '@synonymdev/blocktank-worker2';
import config from './mikro-orm.config'
// Assumed location of the entity defined above; adjust the path to your project layout.
import { SampleAuthor } from './1_database/entities/SampleAuthor'

try {
    await MongoDatabase.connect(config)
    await MongoDatabase.migrateUp()
    const em = MongoDatabase.createEntityManager()
    const author = new SampleAuthor()
    author.name = 'Sepp'
    await em.persistAndFlush(author)
} finally {
    await MongoDatabase.close()
}
MongoDatabase provides an in-memory database for testing. Check out the example MongoDatabase.test.ts for more details on how to use the in-memory database to run independent tests.
CLI & Migrations
MikroORM comes with a CLI. To use the CLI, add this config to your package.json:
"mikro-orm": {
    "useTsNode": true,
    "configPaths": [
        "./src/mikro-orm.config.ts",
        "./dist/mikro-orm.config.js"
    ]
},
- Use npx mikro-orm migration:create to create a new migration.
- Use npx mikro-orm migration:up to run migrations.
Development
Testing
- Test: npm run test. Check out vscode jest to selectively run tests.
Make tests independent and clean up RabbitMQ:
import { Worker } from "@synonymdev/blocktank-worker2";

const runner = new Worker(worker, {
    namespace: Worker.randomNamespace()
});
try {
    await runner.start()
} finally {
    await runner.stop({cleanupRabbitMq: true})
}
Versioning
- Increase version in package.json.
- Add changes to CHANGELOG.md.
- Commit changes.
- Tag new version: git tag v0.1.0.
- Push tag: git push origin v0.1.0.
- Publish to npm: npm publish.