async-queue-adapter
Advanced tools
Comparing version 0.1.0 to 0.2.1
@@ -0,1 +1,2 @@ | ||
/// <reference types="node" /> | ||
export interface AmqpQueueConfig extends RawAmqpConnectOptions { | ||
@@ -16,1 +17,60 @@ readonly adapter: "amqplib"; | ||
} | ||
export interface RawAmqpConnection { | ||
close(): Promise<void>; | ||
createChannel(): Promise<RawAmqpChannel>; | ||
createConfirmChannel(): Promise<RawAmqpConfirmChannel>; | ||
} | ||
export interface RawAmqpConfirmChannel extends RawAmqpChannel { | ||
publish(exchange: string, routingKey: string, content: Buffer, options?: any, callback?: (err: any, ok: {}) => void): boolean; | ||
sendToQueue(queue: string, content: Buffer, options?: any, callback?: (err: any, ok: {}) => void): boolean; | ||
waitForConfirms(): Promise<void>; | ||
} | ||
export interface RawAmqpChannel { | ||
close(): Promise<void>; | ||
assertQueue(queue: string, options?: any): Promise<RawAmqpRepliesAssertQueue>; | ||
checkQueue(queue: string): Promise<RawAmqpRepliesAssertQueue>; | ||
deleteQueue(queue: string, options?: any): Promise<RawAmqpRepliesDeleteQueue>; | ||
purgeQueue(queue: string): Promise<RawAmqpRepliesPurgeQueue>; | ||
bindQueue(queue: string, source: string, pattern: string, args?: any): Promise<{}>; | ||
unbindQueue(queue: string, source: string, pattern: string, args?: any): Promise<{}>; | ||
assertExchange(exchange: string, type: string, options?: any): Promise<RawAmqpRepliesAssertExchange>; | ||
checkExchange(exchange: string): Promise<{}>; | ||
deleteExchange(exchange: string, options?: any): Promise<{}>; | ||
bindExchange(destination: string, source: string, pattern: string, args?: any): Promise<{}>; | ||
unbindExchange(destination: string, source: string, pattern: string, args?: any): Promise<{}>; | ||
publish(exchange: string, routingKey: string, content: Buffer, options?: any): boolean; | ||
sendToQueue(queue: string, content: Buffer, options?: any): boolean; | ||
consume(queue: string, onMessage: (msg: RawAmqpMessage | null) => any, options?: any): Promise<RawAmqpRepliesConsume>; | ||
cancel(consumerTag: string): Promise<{}>; | ||
get(queue: string, options?: any): Promise<RawAmqpMessage | false>; | ||
reject(message: RawAmqpMessage, requeue?: boolean): void; | ||
ack(message: RawAmqpMessage, allUpTo?: boolean): void; | ||
ackAll(): void; | ||
nack(message: RawAmqpMessage, allUpTo?: boolean, requeue?: boolean): void; | ||
nackAll(requeue?: boolean): void; | ||
reject(message: RawAmqpMessage, requeue?: boolean): void; | ||
prefetch(count: number, global?: boolean): Promise<{}>; | ||
recover(): Promise<{}>; | ||
} | ||
export interface RawAmqpMessage { | ||
content: Buffer; | ||
fields: any; | ||
properties: any; | ||
} | ||
export interface RawAmqpRepliesAssertQueue { | ||
queue: string; | ||
messageCount: number; | ||
consumerCount: number; | ||
} | ||
export interface RawAmqpRepliesPurgeQueue { | ||
messageCount: number; | ||
} | ||
export interface RawAmqpRepliesDeleteQueue { | ||
messageCount: number; | ||
} | ||
export interface RawAmqpRepliesAssertExchange { | ||
exchange: string; | ||
} | ||
export interface RawAmqpRepliesConsume { | ||
consumerTag: string; | ||
} |
@@ -1,11 +0,11 @@ | ||
import { Message } from "amqplib"; | ||
import { Job } from "../../interfaces/queue"; | ||
import { RawAmqpMessage } from "./interfaces"; | ||
import { AmqpQueue } from "./queue"; | ||
export declare class AmqpJob<P> implements Job<P> { | ||
queue: AmqpQueue<P>; | ||
message: Message; | ||
message: RawAmqpMessage; | ||
payload: P; | ||
isDeleted: boolean; | ||
constructor(queue: AmqpQueue<P>, message: Message, payload: P); | ||
constructor(queue: AmqpQueue<P>, message: RawAmqpMessage, payload: P); | ||
done(): Promise<void>; | ||
} |
@@ -1,12 +0,12 @@ | ||
import { Channel, Connection } from "amqplib"; | ||
import { Queue, SendQueueOptions } from "../../interfaces/queue"; | ||
import { RawAmqpChannel, RawAmqpConnection } from "./interfaces"; | ||
import { AmqpJob } from "./job"; | ||
export declare class AmqpQueue<P> implements Queue<P> { | ||
connecting: Promise<Connection>; | ||
connecting: Promise<RawAmqpConnection>; | ||
queue: string; | ||
connection?: Connection; | ||
channel?: Channel; | ||
constructor(connecting: Promise<Connection>, queue?: string); | ||
connection?: RawAmqpConnection; | ||
channel?: RawAmqpChannel; | ||
constructor(connecting: Promise<RawAmqpConnection>, queue?: string); | ||
close(): Promise<void>; | ||
getChannel(): Promise<Channel>; | ||
getChannel(): Promise<RawAmqpChannel>; | ||
flush(): Promise<void>; | ||
@@ -13,0 +13,0 @@ send(payload: P, options?: SendQueueOptions): Promise<void>; |
@@ -38,4 +38,7 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var queue_1 = require("../../interfaces/queue"); | ||
var utils_1 = require("../../utils"); | ||
var job_1 = require("./job"); | ||
var DEFAULT_PRIORITY = 10; | ||
var DEFAULT_PRIORITY = queue_1.Priority.Normal; | ||
var scale = utils_1.priorityScale([0, 255], [0, 255]); | ||
var AmqpQueue = /** @class */ (function () { | ||
@@ -88,3 +91,5 @@ function AmqpQueue(connecting, queue) { | ||
_b.channel = _c.sent(); | ||
return [4 /*yield*/, this.channel.assertQueue(this.queue)]; | ||
return [4 /*yield*/, this.channel.assertQueue(this.queue, { | ||
maxPriority: 255, | ||
})]; | ||
case 3: | ||
@@ -123,3 +128,3 @@ _c.sent(); | ||
return [4 /*yield*/, channel.sendToQueue(this.queue, new Buffer(JSON.stringify(payload)), { | ||
priority: (options && options.priority) || DEFAULT_PRIORITY, | ||
priority: scale((options && options.priority) || DEFAULT_PRIORITY), | ||
})]; | ||
@@ -126,0 +131,0 @@ case 2: |
@@ -38,6 +38,9 @@ "use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var queue_1 = require("../../interfaces/queue"); | ||
var utils_1 = require("../../utils"); | ||
var job_1 = require("./job"); | ||
var DEFAULT_PRIORITY = 1024; | ||
var DEFAULT_PRIORITY = queue_1.Priority.Normal; | ||
var DEFAULT_DELAY = 0; | ||
var DEFAULT_TTR = 1; | ||
var scale = utils_1.priorityScale([0, 255], [255, 0]); | ||
var BeanstalkdQueue = /** @class */ (function () { | ||
@@ -86,3 +89,3 @@ function BeanstalkdQueue(client, tube) { | ||
return __awaiter(this, void 0, void 0, function () { | ||
var res; | ||
var res, e_1; | ||
return __generator(this, function (_a) { | ||
@@ -93,9 +96,23 @@ switch (_a.label) { | ||
_a.sent(); | ||
_a.label = 2; | ||
case 2: | ||
if (!1) return [3 /*break*/, 8]; | ||
_a.label = 3; | ||
case 3: | ||
_a.trys.push([3, 6, , 7]); | ||
return [4 /*yield*/, this.client.peekReady()]; | ||
case 2: | ||
case 4: | ||
res = _a.sent(); | ||
return [4 /*yield*/, this.client.delete(res[0])]; | ||
case 3: | ||
case 5: | ||
_a.sent(); | ||
return [2 /*return*/]; | ||
return [3 /*break*/, 7]; | ||
case 6: | ||
e_1 = _a.sent(); | ||
if (e_1.message === "NOT_FOUND") { | ||
return [3 /*break*/, 8]; | ||
} | ||
throw e_1; | ||
case 7: return [3 /*break*/, 2]; | ||
case 8: return [2 /*return*/]; | ||
} | ||
@@ -112,3 +129,3 @@ }); | ||
_a.sent(); | ||
return [4 /*yield*/, this.client.put((options && options.priority) || DEFAULT_PRIORITY, (options && options.delay) || DEFAULT_DELAY, DEFAULT_TTR, JSON.stringify(payload))]; | ||
return [4 /*yield*/, this.client.put(scale((options && options.priority) || DEFAULT_PRIORITY), (options && options.delay) || DEFAULT_DELAY, DEFAULT_TTR, JSON.stringify(payload))]; | ||
case 2: | ||
@@ -123,3 +140,3 @@ _a.sent(); | ||
return __awaiter(this, void 0, void 0, function () { | ||
var _a, id, buff, e_1; | ||
var _a, id, buff, e_2; | ||
return __generator(this, function (_b) { | ||
@@ -141,7 +158,7 @@ switch (_b.label) { | ||
case 5: | ||
e_1 = _b.sent(); | ||
if (e_1.message === "TIMED_OUT") { | ||
e_2 = _b.sent(); | ||
if (e_2.message === "TIMED_OUT") { | ||
return [2 /*return*/]; | ||
} | ||
throw e_1; | ||
throw e_2; | ||
case 6: return [2 /*return*/]; | ||
@@ -148,0 +165,0 @@ } |
export { create } from "./create"; | ||
export { QueueConfig } from "./interfaces/config"; | ||
export { Job, Queue } from "./interfaces/queue"; | ||
export { Job, Priority, Queue, SendQueueOptions } from "./interfaces/queue"; | ||
export * from "./driver/amqp/interfaces"; | ||
export { AmqpQueue } from "./driver/amqp/queue"; | ||
export { AmqpJob } from "./driver/amqp/job"; | ||
export * from "./driver/beanstalkd/interfaces"; | ||
export { BeanstalkdQueue } from "./driver/beanstalkd/queue"; | ||
export { BeanstalkdJob } from "./driver/beanstalkd/job"; | ||
export * from "./driver/local/interfaces"; | ||
export { LocalQueue } from "./driver/local/queue"; | ||
export { LocalJob } from "./driver/local/job"; | ||
export * from "./driver/sqs/interfaces"; | ||
export { SqsQueue } from "./driver/sqs/queue"; | ||
export { SqsJob } from "./driver/sqs/job"; |
@@ -5,6 +5,20 @@ "use strict"; | ||
exports.create = create_1.create; | ||
var queue_1 = require("./driver/local/queue"); | ||
exports.LocalQueue = queue_1.LocalQueue; | ||
var job_1 = require("./driver/local/job"); | ||
exports.LocalJob = job_1.LocalJob; | ||
var queue_1 = require("./interfaces/queue"); | ||
exports.Priority = queue_1.Priority; | ||
var queue_2 = require("./driver/amqp/queue"); | ||
exports.AmqpQueue = queue_2.AmqpQueue; | ||
var job_1 = require("./driver/amqp/job"); | ||
exports.AmqpJob = job_1.AmqpJob; | ||
var queue_3 = require("./driver/beanstalkd/queue"); | ||
exports.BeanstalkdQueue = queue_3.BeanstalkdQueue; | ||
var job_2 = require("./driver/beanstalkd/job"); | ||
exports.BeanstalkdJob = job_2.BeanstalkdJob; | ||
var queue_4 = require("./driver/local/queue"); | ||
exports.LocalQueue = queue_4.LocalQueue; | ||
var job_3 = require("./driver/local/job"); | ||
exports.LocalJob = job_3.LocalJob; | ||
var queue_5 = require("./driver/sqs/queue"); | ||
exports.SqsQueue = queue_5.SqsQueue; | ||
var job_4 = require("./driver/sqs/job"); | ||
exports.SqsJob = job_4.SqsJob; | ||
//# sourceMappingURL=index.js.map |
@@ -0,1 +1,6 @@ | ||
export declare enum Priority { | ||
Normal = 10, | ||
High = 30, | ||
Highest = 50 | ||
} | ||
export interface Queue<P> { | ||
@@ -2,0 +7,0 @@ close(): Promise<void>; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
var Priority; | ||
(function (Priority) { | ||
Priority[Priority["Normal"] = 10] = "Normal"; | ||
Priority[Priority["High"] = 30] = "High"; | ||
Priority[Priority["Highest"] = 50] = "Highest"; | ||
})(Priority = exports.Priority || (exports.Priority = {})); | ||
//# sourceMappingURL=queue.js.map |
{ | ||
"name": "async-queue-adapter", | ||
"version": "0.1.0", | ||
"version": "0.2.1", | ||
"description": "Async Queue Adapter for Javascript(& Typescript).", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
@@ -21,1 +21,77 @@ | ||
} | ||
export interface RawAmqpConnection { | ||
close(): Promise<void> | ||
createChannel(): Promise<RawAmqpChannel> | ||
createConfirmChannel(): Promise<RawAmqpConfirmChannel> | ||
} | ||
export interface RawAmqpConfirmChannel extends RawAmqpChannel { | ||
publish(exchange: string, routingKey: string, content: Buffer, options?: any, callback?: (err: any, ok: {}) => void): boolean | ||
sendToQueue(queue: string, content: Buffer, options?: any, callback?: (err: any, ok: {}) => void): boolean | ||
waitForConfirms(): Promise<void> | ||
} | ||
export interface RawAmqpChannel { | ||
close(): Promise<void> | ||
assertQueue(queue: string, options?: any): Promise<RawAmqpRepliesAssertQueue> | ||
checkQueue(queue: string): Promise<RawAmqpRepliesAssertQueue> | ||
deleteQueue(queue: string, options?: any): Promise<RawAmqpRepliesDeleteQueue> | ||
purgeQueue(queue: string): Promise<RawAmqpRepliesPurgeQueue> | ||
bindQueue(queue: string, source: string, pattern: string, args?: any): Promise<{}> | ||
unbindQueue(queue: string, source: string, pattern: string, args?: any): Promise<{}> | ||
assertExchange(exchange: string, type: string, options?: any): Promise<RawAmqpRepliesAssertExchange> | ||
checkExchange(exchange: string): Promise<{}> | ||
deleteExchange(exchange: string, options?: any): Promise<{}> | ||
bindExchange(destination: string, source: string, pattern: string, args?: any): Promise<{}> | ||
unbindExchange(destination: string, source: string, pattern: string, args?: any): Promise<{}> | ||
publish(exchange: string, routingKey: string, content: Buffer, options?: any): boolean | ||
sendToQueue(queue: string, content: Buffer, options?: any): boolean | ||
consume(queue: string, onMessage: (msg: RawAmqpMessage | null) => any, options?: any): Promise<RawAmqpRepliesConsume> | ||
cancel(consumerTag: string): Promise<{}> | ||
get(queue: string, options?: any): Promise<RawAmqpMessage | false> | ||
reject(message: RawAmqpMessage, requeue?: boolean): void | ||
ack(message: RawAmqpMessage, allUpTo?: boolean): void | ||
ackAll(): void | ||
nack(message: RawAmqpMessage, allUpTo?: boolean, requeue?: boolean): void | ||
nackAll(requeue?: boolean): void | ||
reject(message: RawAmqpMessage, requeue?: boolean): void | ||
prefetch(count: number, global?: boolean): Promise<{}> | ||
recover(): Promise<{}> | ||
} | ||
export interface RawAmqpMessage { | ||
content: Buffer | ||
fields: any | ||
properties: any | ||
} | ||
export interface RawAmqpRepliesAssertQueue { | ||
queue: string | ||
messageCount: number | ||
consumerCount: number | ||
} | ||
export interface RawAmqpRepliesPurgeQueue { | ||
messageCount: number | ||
} | ||
export interface RawAmqpRepliesDeleteQueue { | ||
messageCount: number | ||
} | ||
export interface RawAmqpRepliesAssertExchange { | ||
exchange: string | ||
} | ||
export interface RawAmqpRepliesConsume { | ||
consumerTag: string | ||
} |
import { Message } from "amqplib" | ||
import { Job } from "../../interfaces/queue" | ||
import { RawAmqpMessage } from "./interfaces" | ||
import { AmqpQueue } from "./queue" | ||
@@ -10,3 +10,3 @@ | ||
constructor(public queue: AmqpQueue<P>, public message: Message, public payload: P) { | ||
constructor(public queue: AmqpQueue<P>, public message: RawAmqpMessage, public payload: P) { | ||
} | ||
@@ -13,0 +13,0 @@ |
import { Channel, Connection } from "amqplib" | ||
import { Queue, SendQueueOptions } from "../../interfaces/queue" | ||
import { Priority, Queue, SendQueueOptions } from "../../interfaces/queue" | ||
import { priorityScale } from "../../utils" | ||
import { RawAmqpChannel, RawAmqpConnection } from "./interfaces" | ||
import { AmqpJob } from "./job" | ||
const DEFAULT_PRIORITY = 10 | ||
const DEFAULT_PRIORITY = Priority.Normal | ||
const scale = priorityScale([0, 255], [0, 255]) | ||
export class AmqpQueue<P> implements Queue<P> { | ||
public connection?: Connection | ||
public connection?: RawAmqpConnection | ||
public channel?: Channel | ||
public channel?: RawAmqpChannel | ||
constructor(public connecting: Promise<Connection>, public queue = "default") { | ||
constructor(public connecting: Promise<RawAmqpConnection>, public queue = "default") { | ||
} | ||
@@ -28,7 +31,9 @@ | ||
public async getChannel(): Promise<Channel> { | ||
public async getChannel(): Promise<RawAmqpChannel> { | ||
if (!this.channel) { | ||
this.connection = await this.connecting | ||
this.channel = await this.connection.createChannel() | ||
await this.channel.assertQueue(this.queue) | ||
await this.channel.assertQueue(this.queue, { | ||
maxPriority: 255, | ||
}) | ||
} | ||
@@ -46,3 +51,3 @@ return this.channel | ||
await channel.sendToQueue(this.queue, new Buffer(JSON.stringify(payload)), { | ||
priority: (options && options.priority) || DEFAULT_PRIORITY, | ||
priority: scale((options && options.priority) || DEFAULT_PRIORITY), | ||
}) | ||
@@ -49,0 +54,0 @@ } |
import { Queue, SendQueueOptions } from "../../interfaces/queue" | ||
import { Priority, Queue, SendQueueOptions } from "../../interfaces/queue" | ||
import { priorityScale } from "../../utils" | ||
import { BeanstalkdJob } from "./job" | ||
const DEFAULT_PRIORITY = 1024 | ||
const DEFAULT_PRIORITY = Priority.Normal | ||
const DEFAULT_DELAY = 0 | ||
const DEFAULT_TTR = 1 | ||
const scale = priorityScale([0, 255], [255, 0]) | ||
export class BeanstalkdQueue<P> implements Queue<P> { | ||
@@ -33,4 +36,13 @@ | ||
await this.connect() | ||
const res = await this.client.peekReady() | ||
await this.client.delete(res[0]) | ||
while (1) { | ||
try { | ||
const res = await this.client.peekReady() | ||
await this.client.delete(res[0]) | ||
} catch (e) { | ||
if (e.message === "NOT_FOUND") { | ||
break | ||
} | ||
throw e | ||
} | ||
} | ||
} | ||
@@ -41,3 +53,3 @@ | ||
await this.client.put( | ||
(options && options.priority) || DEFAULT_PRIORITY, | ||
scale((options && options.priority) || DEFAULT_PRIORITY), | ||
(options && options.delay) || DEFAULT_DELAY, | ||
@@ -44,0 +56,0 @@ DEFAULT_TTR, |
@@ -5,6 +5,18 @@ | ||
export { QueueConfig } from "./interfaces/config" | ||
export { Job, Queue } from "./interfaces/queue" | ||
export { Job, Priority, Queue, SendQueueOptions } from "./interfaces/queue" | ||
export * from "./driver/amqp/interfaces" | ||
export { AmqpQueue } from "./driver/amqp/queue" | ||
export { AmqpJob } from "./driver/amqp/job" | ||
export * from "./driver/beanstalkd/interfaces" | ||
export { BeanstalkdQueue } from "./driver/beanstalkd/queue" | ||
export { BeanstalkdJob } from "./driver/beanstalkd/job" | ||
export * from "./driver/local/interfaces" | ||
export { LocalQueue } from "./driver/local/queue" | ||
export { LocalJob } from "./driver/local/job" | ||
export * from "./driver/sqs/interfaces" | ||
export { SqsQueue } from "./driver/sqs/queue" | ||
export { SqsJob } from "./driver/sqs/job" |
export enum Priority { | ||
Normal = 10, | ||
High = 30, | ||
Highest = 50, | ||
} | ||
export interface Queue<P> { | ||
@@ -3,0 +9,0 @@ close(): Promise<void> |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
82136
75
1606