Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@highoutput/amqp

Package Overview
Dependencies
Maintainers
1
Versions
80
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@highoutput/amqp - npm Package Compare versions

Comparing version 0.0.15 to 0.1.0

8

build/index.d.ts
import Client, { ClientOptions } from './lib/client';
import Worker, { WorkerOptions } from './lib/worker';
export { Client, Worker };
import Publisher, { PublisherOptions } from './lib/publisher';
import Subscriber, { SubscriberOptions } from './lib/subscriber';
export { Client, Worker, Publisher, Subscriber, ClientOptions, WorkerOptions, PublisherOptions, SubscriberOptions, };
export declare type AmqpOptions = {

@@ -18,2 +20,4 @@ host: string;

private clients;
private publishers;
private subscribers;
constructor(options?: Partial<AmqpOptions>);

@@ -25,4 +29,6 @@ createClient<TInput extends any[] = any[], TOutput = any>(queue: string, options?: ClientOptions): Promise<{

createWorker<TInput extends any[] = any[], TOutput = any>(queue: string, handler: (...args: TInput) => Promise<TOutput>, options?: WorkerOptions): Promise<Worker<TInput, TOutput>>;
createPublisher<TInput extends any[] = any[]>(topic: string, options?: PublisherOptions): Promise<Publisher<TInput>>;
createSubscriber<TInput extends any[] = any[]>(topic: string, handler: (...args: TInput) => Promise<void>, options?: SubscriberOptions): Promise<Subscriber<TInput>>;
stop(): Promise<void>;
}
//# sourceMappingURL=index.d.ts.map

@@ -8,2 +8,3 @@ "use strict";

const ramda_1 = __importDefault(require("ramda"));
const uuid_1 = __importDefault(require("uuid"));
const logger_1 = __importDefault(require("./lib/logger"));

@@ -14,6 +15,12 @@ const client_1 = __importDefault(require("./lib/client"));

exports.Worker = worker_1.default;
const publisher_1 = __importDefault(require("./lib/publisher"));
exports.Publisher = publisher_1.default;
const subscriber_1 = __importDefault(require("./lib/subscriber"));
exports.Subscriber = subscriber_1.default;
class Amqp {
constructor(options) {
this.workers = [];
this.clients = [];
this.workers = new Map();
this.clients = new Map();
this.publishers = new Map();
this.subscribers = new Map();
this.options = ramda_1.default.mergeDeepLeft(options || {}, {

@@ -45,3 +52,4 @@ host: 'localhost',

func.client = client;
this.clients.push(client);
this.clients.set(client.id, client);
client.once('stop', () => this.clients.delete(client.id));
return func;

@@ -52,9 +60,29 @@ }

await worker.start();
this.workers.push(worker);
const id = uuid_1.default();
this.workers.set(id, worker);
worker.once('stop', () => this.workers.delete(id));
return worker;
}
async createPublisher(topic, options) {
const publisher = new publisher_1.default(this.connection, `${this.options.prefix || ''}${topic}`, options);
await publisher.start();
const id = uuid_1.default();
this.publishers.set(id, publisher);
publisher.once('stop', () => this.publishers.delete(id));
return publisher;
}
async createSubscriber(topic, handler, options) {
const subscriber = new subscriber_1.default(this.connection, `${this.options.prefix || ''}${topic}`, handler, options);
await subscriber.start();
const id = uuid_1.default();
this.subscribers.set(id, subscriber);
subscriber.once('stop', () => this.subscribers.delete(id));
return subscriber;
}
async stop() {
await Promise.all([
Promise.all(this.workers.map((worker) => worker.stop())),
Promise.all(this.clients.map((client) => client.stop())),
Promise.all(Array.from(this.workers.values()).map((worker) => worker.stop())),
Promise.all(Array.from(this.clients.values()).map((client) => client.stop())),
Promise.all(Array.from(this.publishers.values()).map((publisher) => publisher.stop())),
Promise.all(Array.from(this.subscribers.values()).map((subscriber) => subscriber.stop())),
]);

@@ -61,0 +89,0 @@ this.connection.close();

4

build/lib/client.d.ts

@@ -0,2 +1,4 @@

/// <reference types="node" />
import { Connection } from 'rhea';
import { EventEmitter } from 'events';
export declare type ClientOptions = {

@@ -8,3 +10,3 @@ timeout: string;

};
export default class Client<TInput extends any[] = any[], TOutput = any> {
export default class Client<TInput extends any[] = any[], TOutput = any> extends EventEmitter {
private readonly connection;

@@ -11,0 +13,0 @@ private readonly queue;

@@ -11,6 +11,8 @@ "use strict";

const error_1 = __importDefault(require("@highoutput/error"));
const events_1 = require("events");
const logger_1 = __importDefault(require("./logger"));
const util_1 = require("./util");
class Client {
class Client extends events_1.EventEmitter {
constructor(connection, queue, options) {
super();
this.connection = connection;

@@ -84,3 +86,3 @@ this.queue = queue;

source: {
address: `queue://${this.queue}/${this.id}`,
address: `temp-queue://${this.queue}/${this.id}`,
dynamic: true,

@@ -116,2 +118,3 @@ },

});
this.emit('start');
}

@@ -126,2 +129,3 @@ async stop() {

}
this.emit('stop');
}

@@ -128,0 +132,0 @@ }

@@ -0,6 +1,8 @@

/// <reference types="node" />
import { Connection } from 'rhea';
import { EventEmitter } from 'events';
export declare type PublisherOptions = {
serialize: boolean;
};
export default class Publisher<TInput extends any[]> {
export default class Publisher<TInput extends any[] = any[]> extends EventEmitter {
private readonly connection;

@@ -7,0 +9,0 @@ private readonly topic;

@@ -7,6 +7,8 @@ "use strict";

const ramda_1 = __importDefault(require("ramda"));
const events_1 = require("events");
const logger_1 = __importDefault(require("./logger"));
const util_1 = require("./util");
class Publisher {
class Publisher extends events_1.EventEmitter {
constructor(connection, topic, options) {
super();
this.connection = connection;

@@ -39,2 +41,3 @@ this.topic = topic;

});
this.emit('start');
}

@@ -45,2 +48,3 @@ async stop() {

}
this.emit('stop');
}

@@ -47,0 +51,0 @@ }

@@ -0,2 +1,4 @@

/// <reference types="node" />
import { Connection } from 'rhea';
import { EventEmitter } from 'events';
export declare type SubscriberOptions = {

@@ -6,3 +8,3 @@ concurrency: number;

};
export default class Publisher<TInput extends any[]> {
export default class Subscriber<TInput extends any[] = any[]> extends EventEmitter {
private readonly connection;

@@ -9,0 +11,0 @@ private readonly topic;

@@ -8,6 +8,8 @@ "use strict";

const async_group_1 = __importDefault(require("@highoutput/async-group"));
const events_1 = require("events");
const logger_1 = __importDefault(require("./logger"));
const util_1 = require("./util");
class Publisher {
class Subscriber extends events_1.EventEmitter {
constructor(connection, topic, handler, options) {
super();
this.connection = connection;

@@ -42,4 +44,5 @@ this.topic = topic;

address: `topic://${this.topic}`,
dynamic: true,
},
credit_window: 0,
autoaccept: false,
});

@@ -52,2 +55,3 @@ this.receiver.on('message', async (context) => {

this.receiver.add_credit(this.options.concurrency);
this.emit('start');
}

@@ -59,5 +63,6 @@ async stop() {

await this.asyncGroup.wait();
this.emit('stop');
}
}
exports.default = Publisher;
exports.default = Subscriber;
//# sourceMappingURL=subscriber.js.map
{
"name": "@highoutput/amqp",
"version": "0.0.15",
"version": "0.1.0",
"description": "A simplified abstraction of the AMQP 1.0 protocol",

@@ -52,3 +52,3 @@ "keywords": [

},
"gitHead": "64b16b812d81ac4d26dfdc102d77e8d2bb85eb6c"
"gitHead": "76f29011b812ebcd2db08b81c00608b4402c2513"
}
# `amqp`
> TODO: description
> A simplified abstraction of the AMQP 1.0 protocol
## Usage
### RPC
```typescript
import Amqp from '@highoutput/amqp';
const amqp = new Amqp();
async main() {
await amqp.createWorker(
'queue',
async message => message
);
const client = await rabbit.createClient('queue');
const result = await client('Hello World!');
assert.equal(result, 'Hello World!');
}
main();
```
const amqp = require('amqp');
// TODO: DEMONSTRATE API
### PubSub
```typescript
import Amqp from '@highoutput/amqp';
const amqp = new Amqp();
async main() {
await amqp.createSubscriber(
'topic.*',
async message => assert.equal('Hello World!')
);
const publish = await rabbit.createPublisher('topic.hello');
publish('Hello World!');
}
main();
```
/* eslint-disable @typescript-eslint/no-non-null-assertion, @typescript-eslint/camelcase */
import container, { Connection, EventContext } from 'rhea';
import R from 'ramda';
import uuid from 'uuid';
import logger from './lib/logger';
import Client, { ClientOptions } from './lib/client';
import Worker, { WorkerOptions } from './lib/worker';
import Publisher, { PublisherOptions } from './lib/publisher';
import Subscriber, { SubscriberOptions } from './lib/subscriber';
export { Client, Worker };
export {
Client, Worker, Publisher, Subscriber,
ClientOptions, WorkerOptions, PublisherOptions, SubscriberOptions,
};

@@ -25,6 +31,10 @@ export type AmqpOptions = {

private workers: Worker[] = [];
private workers: Map<string, Worker> = new Map();
private clients: Client[] = [];
private clients: Map<string, Client> = new Map();
private publishers: Map<string, Publisher> = new Map();
private subscribers: Map<string, Subscriber> = new Map();
public constructor(options?: Partial<AmqpOptions>) {

@@ -75,3 +85,4 @@ this.options = R.mergeDeepLeft(options || {}, {

this.clients.push(client);
this.clients.set(client.id, client);
client.once('stop', () => this.clients.delete(client.id));

@@ -86,3 +97,3 @@ return func;

) {
const worker = new Worker(
const worker = new Worker<TInput, TOutput>(
this.connection,

@@ -96,3 +107,5 @@ `${this.options.prefix || ''}${queue}`,

this.workers.push(worker as Worker);
const id = uuid();
this.workers.set(id, worker as Worker);
worker.once('stop', () => this.workers.delete(id));

@@ -102,6 +115,48 @@ return worker;

public async createPublisher<TInput extends any[] = any[]>(
topic: string,
options?: PublisherOptions,
) {
const publisher = new Publisher<TInput>(
this.connection,
`${this.options.prefix || ''}${topic}`,
options,
);
await publisher.start();
const id = uuid();
this.publishers.set(id, publisher);
publisher.once('stop', () => this.publishers.delete(id));
return publisher;
}
public async createSubscriber<TInput extends any[] = any[]>(
topic: string,
handler: (...args: TInput) => Promise<void>,
options?: SubscriberOptions,
) {
const subscriber = new Subscriber<TInput>(
this.connection,
`${this.options.prefix || ''}${topic}`,
handler,
options,
);
await subscriber.start();
const id = uuid();
this.subscribers.set(id, subscriber as Subscriber);
subscriber.once('stop', () => this.subscribers.delete(id));
return subscriber;
}
public async stop() {
await Promise.all([
Promise.all(this.workers.map((worker) => worker.stop())),
Promise.all(this.clients.map((client) => client.stop())),
Promise.all(Array.from(this.workers.values()).map((worker) => worker.stop())),
Promise.all(Array.from(this.clients.values()).map((client) => client.stop())),
Promise.all(Array.from(this.publishers.values()).map((publisher) => publisher.stop())),
Promise.all(Array.from(this.subscribers.values()).map((subscriber) => subscriber.stop())),
]);

@@ -108,0 +163,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc