@highoutput/amqp
Advanced tools
Comparing version 0.0.14 to 0.0.15
@@ -17,10 +17,11 @@ import Client, { ClientOptions } from './lib/client'; | ||
private workers; | ||
private clients; | ||
constructor(options?: Partial<AmqpOptions>); | ||
createClient<TInput extends any[] = any[], TOutput = any>(scope: string, options?: ClientOptions): Promise<{ | ||
createClient<TInput extends any[] = any[], TOutput = any>(queue: string, options?: ClientOptions): Promise<{ | ||
(...args: TInput): Promise<TOutput | null>; | ||
client: Client<TInput, TOutput>; | ||
}>; | ||
createWorker<TInput extends any[] = any[], TOutput = any>(scope: string, handler: (...args: TInput) => Promise<TOutput>, options?: WorkerOptions): Promise<Worker<TInput, TOutput>>; | ||
createWorker<TInput extends any[] = any[], TOutput = any>(queue: string, handler: (...args: TInput) => Promise<TOutput>, options?: WorkerOptions): Promise<Worker<TInput, TOutput>>; | ||
stop(): Promise<void>; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -16,2 +16,3 @@ "use strict"; | ||
this.workers = []; | ||
this.clients = []; | ||
this.options = ramda_1.default.mergeDeepLeft(options || {}, { | ||
@@ -38,11 +39,12 @@ host: 'localhost', | ||
} | ||
async createClient(scope, options) { | ||
const client = new client_1.default(this.connection, `${this.options.prefix || ''}${scope}`, options); | ||
async createClient(queue, options) { | ||
const client = new client_1.default(this.connection, `${this.options.prefix || ''}${queue}`, options); | ||
await client.start(); | ||
const func = (...args) => client.send(...args); | ||
func.client = client; | ||
this.clients.push(client); | ||
return func; | ||
} | ||
async createWorker(scope, handler, options) { | ||
const worker = new worker_1.default(this.connection, `${this.options.prefix || ''}${scope}`, handler, options); | ||
async createWorker(queue, handler, options) { | ||
const worker = new worker_1.default(this.connection, `${this.options.prefix || ''}${queue}`, handler, options); | ||
await worker.start(); | ||
@@ -53,3 +55,6 @@ this.workers.push(worker); | ||
async stop() { | ||
await Promise.all(this.workers.map((worker) => worker.stop())); | ||
await Promise.all([ | ||
Promise.all(this.workers.map((worker) => worker.stop())), | ||
Promise.all(this.clients.map((client) => client.stop())), | ||
]); | ||
this.connection.close(); | ||
@@ -56,0 +61,0 @@ await new Promise((resolve) => { |
@@ -10,10 +10,10 @@ import { Connection } from 'rhea'; | ||
private readonly connection; | ||
private readonly scope; | ||
private readonly queue; | ||
private options; | ||
private sender; | ||
private receiver; | ||
private id; | ||
readonly id: string; | ||
private readonly callbacks; | ||
private asyncGroup; | ||
constructor(connection: Connection, scope: string, options?: Partial<ClientOptions>); | ||
constructor(connection: Connection, queue: string, options?: Partial<ClientOptions>); | ||
send(...args: TInput): Promise<TOutput | null>; | ||
@@ -20,0 +20,0 @@ start(): Promise<void>; |
@@ -14,5 +14,5 @@ "use strict"; | ||
class Client { | ||
constructor(connection, scope, options) { | ||
constructor(connection, queue, options) { | ||
this.connection = connection; | ||
this.scope = scope; | ||
this.queue = queue; | ||
this.sender = null; | ||
@@ -77,3 +77,5 @@ this.receiver = null; | ||
target: { | ||
address: this.scope, | ||
address: `queue://${this.queue}`, | ||
durable: 2, | ||
expiry_policy: 'never', | ||
}, | ||
@@ -83,3 +85,3 @@ }), | ||
source: { | ||
address: `${this.scope}:${this.id}`, | ||
address: `queue://${this.queue}/${this.id}`, | ||
dynamic: true, | ||
@@ -86,0 +88,0 @@ }, |
@@ -0,2 +1,4 @@ | ||
/// <reference types="node" /> | ||
import { Connection } from 'rhea'; | ||
import { EventEmitter } from 'events'; | ||
export declare type WorkerOptions = { | ||
@@ -7,5 +9,5 @@ concurrency: number; | ||
}; | ||
export default class Worker<TInput extends any[] = any[], TOutput = any> { | ||
export default class Worker<TInput extends any[] = any[], TOutput = any> extends EventEmitter { | ||
private readonly connection; | ||
private readonly scope; | ||
private readonly queue; | ||
private readonly handler; | ||
@@ -16,3 +18,3 @@ private options; | ||
private asyncGroup; | ||
constructor(connection: Connection, scope: string, handler: (...args: TInput) => Promise<TOutput>, options?: Partial<WorkerOptions>); | ||
constructor(connection: Connection, queue: string, handler: (...args: TInput) => Promise<TOutput>, options?: Partial<WorkerOptions>); | ||
private getSender; | ||
@@ -19,0 +21,0 @@ private handleMessage; |
@@ -9,8 +9,10 @@ "use strict"; | ||
const serialize_error_1 = require("serialize-error"); | ||
const events_1 = require("events"); | ||
const logger_1 = __importDefault(require("./logger")); | ||
const util_1 = require("./util"); | ||
class Worker { | ||
constructor(connection, scope, handler, options) { | ||
class Worker extends events_1.EventEmitter { | ||
constructor(connection, queue, handler, options) { | ||
super(); | ||
this.connection = connection; | ||
this.scope = scope; | ||
this.queue = queue; | ||
this.handler = handler; | ||
@@ -70,3 +72,3 @@ this.senders = new Map(); | ||
source: { | ||
address: this.scope, | ||
address: `queue://${this.queue}`, | ||
durable: 2, | ||
@@ -78,3 +80,2 @@ expiry_policy: 'never', | ||
}); | ||
this.receiver.add_credit(this.options.concurrency); | ||
this.receiver.on('message', async (context) => { | ||
@@ -85,2 +86,4 @@ await this.asyncGroup.add(this.handleMessage(context)); | ||
}); | ||
this.receiver.add_credit(this.options.concurrency); | ||
this.emit('start'); | ||
} | ||
@@ -99,2 +102,3 @@ async stop() { | ||
this.senders.clear(); | ||
this.emit('stop'); | ||
} | ||
@@ -101,0 +105,0 @@ } |
{ | ||
"name": "@highoutput/amqp", | ||
"version": "0.0.14", | ||
"version": "0.0.15", | ||
"description": "A simplified abstraction of the AMQP 1.0 protocol", | ||
@@ -18,2 +18,3 @@ "keywords": [ | ||
"test": "TS_NODE_PROJECT=tsconfig.json TS_NODE_FILES=true cucumber-js -p default", | ||
"test:only": "TS_NODE_PROJECT=tsconfig.json TS_NODE_FILES=true cucumber-js -p only", | ||
"clean": "rimraf build/", | ||
@@ -52,3 +53,3 @@ "build": "npm run clean && tsc --project tsconfig.json", | ||
}, | ||
"gitHead": "3732cb2574dd1f18903bf623aea7406438f10a76" | ||
"gitHead": "64b16b812d81ac4d26dfdc102d77e8d2bb85eb6c" | ||
} |
@@ -27,2 +27,4 @@ /* eslint-disable @typescript-eslint/no-non-null-assertion, @typescript-eslint/camelcase */ | ||
private clients: Client[] = []; | ||
public constructor(options?: Partial<AmqpOptions>) { | ||
@@ -59,3 +61,3 @@ this.options = R.mergeDeepLeft(options || {}, { | ||
public async createClient<TInput extends any[] = any[], TOutput = any>( | ||
scope: string, | ||
queue: string, | ||
options?: ClientOptions, | ||
@@ -65,5 +67,6 @@ ) { | ||
this.connection, | ||
`${this.options.prefix || ''}${scope}`, | ||
`${this.options.prefix || ''}${queue}`, | ||
options, | ||
); | ||
await client.start(); | ||
@@ -74,2 +77,4 @@ | ||
this.clients.push(client); | ||
return func; | ||
@@ -79,3 +84,3 @@ } | ||
public async createWorker<TInput extends any[] = any[], TOutput = any>( | ||
scope: string, | ||
queue: string, | ||
handler: (...args: TInput) => Promise<TOutput>, | ||
@@ -86,6 +91,7 @@ options?: WorkerOptions, | ||
this.connection, | ||
`${this.options.prefix || ''}${scope}`, | ||
`${this.options.prefix || ''}${queue}`, | ||
handler, | ||
options, | ||
); | ||
await worker.start(); | ||
@@ -99,3 +105,6 @@ | ||
public async stop() { | ||
await Promise.all(this.workers.map((worker) => worker.stop())); | ||
await Promise.all([ | ||
Promise.all(this.workers.map((worker) => worker.stop())), | ||
Promise.all(this.clients.map((client) => client.stop())), | ||
]); | ||
@@ -102,0 +111,0 @@ this.connection.close(); |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
48740
32
702