@highoutput/amqp
Advanced tools
Comparing version 0.0.15 to 0.1.0
import Client, { ClientOptions } from './lib/client'; | ||
import Worker, { WorkerOptions } from './lib/worker'; | ||
export { Client, Worker }; | ||
import Publisher, { PublisherOptions } from './lib/publisher'; | ||
import Subscriber, { SubscriberOptions } from './lib/subscriber'; | ||
export { Client, Worker, Publisher, Subscriber, ClientOptions, WorkerOptions, PublisherOptions, SubscriberOptions, }; | ||
export declare type AmqpOptions = { | ||
@@ -18,2 +20,4 @@ host: string; | ||
private clients; | ||
private publishers; | ||
private subscribers; | ||
constructor(options?: Partial<AmqpOptions>); | ||
@@ -25,4 +29,6 @@ createClient<TInput extends any[] = any[], TOutput = any>(queue: string, options?: ClientOptions): Promise<{ | ||
createWorker<TInput extends any[] = any[], TOutput = any>(queue: string, handler: (...args: TInput) => Promise<TOutput>, options?: WorkerOptions): Promise<Worker<TInput, TOutput>>; | ||
createPublisher<TInput extends any[] = any[]>(topic: string, options?: PublisherOptions): Promise<Publisher<TInput>>; | ||
createSubscriber<TInput extends any[] = any[]>(topic: string, handler: (...args: TInput) => Promise<void>, options?: SubscriberOptions): Promise<Subscriber<TInput>>; | ||
stop(): Promise<void>; | ||
} | ||
//# sourceMappingURL=index.d.ts.map |
@@ -8,2 +8,3 @@ "use strict"; | ||
const ramda_1 = __importDefault(require("ramda")); | ||
const uuid_1 = __importDefault(require("uuid")); | ||
const logger_1 = __importDefault(require("./lib/logger")); | ||
@@ -14,6 +15,12 @@ const client_1 = __importDefault(require("./lib/client")); | ||
exports.Worker = worker_1.default; | ||
const publisher_1 = __importDefault(require("./lib/publisher")); | ||
exports.Publisher = publisher_1.default; | ||
const subscriber_1 = __importDefault(require("./lib/subscriber")); | ||
exports.Subscriber = subscriber_1.default; | ||
class Amqp { | ||
constructor(options) { | ||
this.workers = []; | ||
this.clients = []; | ||
this.workers = new Map(); | ||
this.clients = new Map(); | ||
this.publishers = new Map(); | ||
this.subscribers = new Map(); | ||
this.options = ramda_1.default.mergeDeepLeft(options || {}, { | ||
@@ -45,3 +52,4 @@ host: 'localhost', | ||
func.client = client; | ||
this.clients.push(client); | ||
this.clients.set(client.id, client); | ||
client.once('stop', () => this.clients.delete(client.id)); | ||
return func; | ||
@@ -52,9 +60,29 @@ } | ||
await worker.start(); | ||
this.workers.push(worker); | ||
const id = uuid_1.default(); | ||
this.workers.set(id, worker); | ||
worker.once('stop', () => this.workers.delete(id)); | ||
return worker; | ||
} | ||
async createPublisher(topic, options) { | ||
const publisher = new publisher_1.default(this.connection, `${this.options.prefix || ''}${topic}`, options); | ||
await publisher.start(); | ||
const id = uuid_1.default(); | ||
this.publishers.set(id, publisher); | ||
publisher.once('stop', () => this.publishers.delete(id)); | ||
return publisher; | ||
} | ||
async createSubscriber(topic, handler, options) { | ||
const subscriber = new subscriber_1.default(this.connection, `${this.options.prefix || ''}${topic}`, handler, options); | ||
await subscriber.start(); | ||
const id = uuid_1.default(); | ||
this.subscribers.set(id, subscriber); | ||
subscriber.once('stop', () => this.subscribers.delete(id)); | ||
return subscriber; | ||
} | ||
async stop() { | ||
await Promise.all([ | ||
Promise.all(this.workers.map((worker) => worker.stop())), | ||
Promise.all(this.clients.map((client) => client.stop())), | ||
Promise.all(Array.from(this.workers.values()).map((worker) => worker.stop())), | ||
Promise.all(Array.from(this.clients.values()).map((client) => client.stop())), | ||
Promise.all(Array.from(this.publishers.values()).map((publisher) => publisher.stop())), | ||
Promise.all(Array.from(this.subscribers.values()).map((subscriber) => subscriber.stop())), | ||
]); | ||
@@ -61,0 +89,0 @@ this.connection.close(); |
@@ -0,2 +1,4 @@ | ||
/// <reference types="node" /> | ||
import { Connection } from 'rhea'; | ||
import { EventEmitter } from 'events'; | ||
export declare type ClientOptions = { | ||
@@ -8,3 +10,3 @@ timeout: string; | ||
}; | ||
export default class Client<TInput extends any[] = any[], TOutput = any> { | ||
export default class Client<TInput extends any[] = any[], TOutput = any> extends EventEmitter { | ||
private readonly connection; | ||
@@ -11,0 +13,0 @@ private readonly queue; |
@@ -11,6 +11,8 @@ "use strict"; | ||
const error_1 = __importDefault(require("@highoutput/error")); | ||
const events_1 = require("events"); | ||
const logger_1 = __importDefault(require("./logger")); | ||
const util_1 = require("./util"); | ||
class Client { | ||
class Client extends events_1.EventEmitter { | ||
constructor(connection, queue, options) { | ||
super(); | ||
this.connection = connection; | ||
@@ -84,3 +86,3 @@ this.queue = queue; | ||
source: { | ||
address: `queue://${this.queue}/${this.id}`, | ||
address: `temp-queue://${this.queue}/${this.id}`, | ||
dynamic: true, | ||
@@ -116,2 +118,3 @@ }, | ||
}); | ||
this.emit('start'); | ||
} | ||
@@ -126,2 +129,3 @@ async stop() { | ||
} | ||
this.emit('stop'); | ||
} | ||
@@ -128,0 +132,0 @@ } |
@@ -0,6 +1,8 @@ | ||
/// <reference types="node" /> | ||
import { Connection } from 'rhea'; | ||
import { EventEmitter } from 'events'; | ||
export declare type PublisherOptions = { | ||
serialize: boolean; | ||
}; | ||
export default class Publisher<TInput extends any[]> { | ||
export default class Publisher<TInput extends any[] = any[]> extends EventEmitter { | ||
private readonly connection; | ||
@@ -7,0 +9,0 @@ private readonly topic; |
@@ -7,6 +7,8 @@ "use strict"; | ||
const ramda_1 = __importDefault(require("ramda")); | ||
const events_1 = require("events"); | ||
const logger_1 = __importDefault(require("./logger")); | ||
const util_1 = require("./util"); | ||
class Publisher { | ||
class Publisher extends events_1.EventEmitter { | ||
constructor(connection, topic, options) { | ||
super(); | ||
this.connection = connection; | ||
@@ -39,2 +41,3 @@ this.topic = topic; | ||
}); | ||
this.emit('start'); | ||
} | ||
@@ -45,2 +48,3 @@ async stop() { | ||
} | ||
this.emit('stop'); | ||
} | ||
@@ -47,0 +51,0 @@ } |
@@ -0,2 +1,4 @@ | ||
/// <reference types="node" /> | ||
import { Connection } from 'rhea'; | ||
import { EventEmitter } from 'events'; | ||
export declare type SubscriberOptions = { | ||
@@ -6,3 +8,3 @@ concurrency: number; | ||
}; | ||
export default class Publisher<TInput extends any[]> { | ||
export default class Subscriber<TInput extends any[] = any[]> extends EventEmitter { | ||
private readonly connection; | ||
@@ -9,0 +11,0 @@ private readonly topic; |
@@ -8,6 +8,8 @@ "use strict"; | ||
const async_group_1 = __importDefault(require("@highoutput/async-group")); | ||
const events_1 = require("events"); | ||
const logger_1 = __importDefault(require("./logger")); | ||
const util_1 = require("./util"); | ||
class Publisher { | ||
class Subscriber extends events_1.EventEmitter { | ||
constructor(connection, topic, handler, options) { | ||
super(); | ||
this.connection = connection; | ||
@@ -42,4 +44,5 @@ this.topic = topic; | ||
address: `topic://${this.topic}`, | ||
dynamic: true, | ||
}, | ||
credit_window: 0, | ||
autoaccept: false, | ||
}); | ||
@@ -52,2 +55,3 @@ this.receiver.on('message', async (context) => { | ||
this.receiver.add_credit(this.options.concurrency); | ||
this.emit('start'); | ||
} | ||
@@ -59,5 +63,6 @@ async stop() { | ||
await this.asyncGroup.wait(); | ||
this.emit('stop'); | ||
} | ||
} | ||
exports.default = Publisher; | ||
exports.default = Subscriber; | ||
//# sourceMappingURL=subscriber.js.map |
{ | ||
"name": "@highoutput/amqp", | ||
"version": "0.0.15", | ||
"version": "0.1.0", | ||
"description": "A simplified abstraction of the AMQP 1.0 protocol", | ||
@@ -52,3 +52,3 @@ "keywords": [ | ||
}, | ||
"gitHead": "64b16b812d81ac4d26dfdc102d77e8d2bb85eb6c" | ||
"gitHead": "76f29011b812ebcd2db08b81c00608b4402c2513" | ||
} |
# `amqp` | ||
> TODO: description | ||
> A simplified abstraction of the AMQP 1.0 protocol | ||
## Usage | ||
### RPC | ||
```typescript | ||
import Amqp from '@highoutput/amqp'; | ||
const amqp = new Amqp(); | ||
async main() { | ||
await amqp.createWorker( | ||
'queue', | ||
async message => message | ||
); | ||
const client = await rabbit.createClient('queue'); | ||
const result = await client('Hello World!'); | ||
assert.equal(result, 'Hello World!'); | ||
} | ||
main(); | ||
``` | ||
const amqp = require('amqp'); | ||
// TODO: DEMONSTRATE API | ||
### PubSub | ||
```typescript | ||
import Amqp from '@highoutput/amqp'; | ||
const amqp = new Amqp(); | ||
async main() { | ||
await amqp.createSubscriber( | ||
'topic.*', | ||
async message => assert.equal('Hello World!') | ||
); | ||
const publish = await rabbit.createPublisher('topic.hello'); | ||
publish('Hello World!'); | ||
} | ||
main(); | ||
``` |
/* eslint-disable @typescript-eslint/no-non-null-assertion, @typescript-eslint/camelcase */ | ||
import container, { Connection, EventContext } from 'rhea'; | ||
import R from 'ramda'; | ||
import uuid from 'uuid'; | ||
import logger from './lib/logger'; | ||
import Client, { ClientOptions } from './lib/client'; | ||
import Worker, { WorkerOptions } from './lib/worker'; | ||
import Publisher, { PublisherOptions } from './lib/publisher'; | ||
import Subscriber, { SubscriberOptions } from './lib/subscriber'; | ||
export { Client, Worker }; | ||
export { | ||
Client, Worker, Publisher, Subscriber, | ||
ClientOptions, WorkerOptions, PublisherOptions, SubscriberOptions, | ||
}; | ||
@@ -25,6 +31,10 @@ export type AmqpOptions = { | ||
private workers: Worker[] = []; | ||
private workers: Map<string, Worker> = new Map(); | ||
private clients: Client[] = []; | ||
private clients: Map<string, Client> = new Map(); | ||
private publishers: Map<string, Publisher> = new Map(); | ||
private subscribers: Map<string, Subscriber> = new Map(); | ||
public constructor(options?: Partial<AmqpOptions>) { | ||
@@ -75,3 +85,4 @@ this.options = R.mergeDeepLeft(options || {}, { | ||
this.clients.push(client); | ||
this.clients.set(client.id, client); | ||
client.once('stop', () => this.clients.delete(client.id)); | ||
@@ -86,3 +97,3 @@ return func; | ||
) { | ||
const worker = new Worker( | ||
const worker = new Worker<TInput, TOutput>( | ||
this.connection, | ||
@@ -96,3 +107,5 @@ `${this.options.prefix || ''}${queue}`, | ||
this.workers.push(worker as Worker); | ||
const id = uuid(); | ||
this.workers.set(id, worker as Worker); | ||
worker.once('stop', () => this.workers.delete(id)); | ||
@@ -102,6 +115,48 @@ return worker; | ||
public async createPublisher<TInput extends any[] = any[]>( | ||
topic: string, | ||
options?: PublisherOptions, | ||
) { | ||
const publisher = new Publisher<TInput>( | ||
this.connection, | ||
`${this.options.prefix || ''}${topic}`, | ||
options, | ||
); | ||
await publisher.start(); | ||
const id = uuid(); | ||
this.publishers.set(id, publisher); | ||
publisher.once('stop', () => this.publishers.delete(id)); | ||
return publisher; | ||
} | ||
public async createSubscriber<TInput extends any[] = any[]>( | ||
topic: string, | ||
handler: (...args: TInput) => Promise<void>, | ||
options?: SubscriberOptions, | ||
) { | ||
const subscriber = new Subscriber<TInput>( | ||
this.connection, | ||
`${this.options.prefix || ''}${topic}`, | ||
handler, | ||
options, | ||
); | ||
await subscriber.start(); | ||
const id = uuid(); | ||
this.subscribers.set(id, subscriber as Subscriber); | ||
subscriber.once('stop', () => this.subscribers.delete(id)); | ||
return subscriber; | ||
} | ||
public async stop() { | ||
await Promise.all([ | ||
Promise.all(this.workers.map((worker) => worker.stop())), | ||
Promise.all(this.clients.map((client) => client.stop())), | ||
Promise.all(Array.from(this.workers.values()).map((worker) => worker.stop())), | ||
Promise.all(Array.from(this.clients.values()).map((client) => client.stop())), | ||
Promise.all(Array.from(this.publishers.values()).map((publisher) => publisher.stop())), | ||
Promise.all(Array.from(this.subscribers.values()).map((subscriber) => subscriber.stop())), | ||
]); | ||
@@ -108,0 +163,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
56833
800
47