#Introduction
SQNS is a Simple Queue and Notification Service. It manages the processing of queues and of notifications triggered by
events and subscriptions in a distributed system. The service is divided into the following sections.
- Queue Management
- Notification Management
This project is inspired by AWS SQS and AWS SNS.
It extends AWS SQS and AWS SNS with additional functionality that we felt was lacking.
Prerequisites
- Express app
- MongoDB Server
Installation
Queue Management
Queue Management is a distributed, multi-priority queue processing system.
It is divided into two parts: Manager and Workers.
Manager collects all the events sent via different channels.
Worker requests events from Manager to process them.
There can be only one Manager, but many Workers.
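The Manager/Worker split described above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: `InMemoryManager`, its `add` and `poll` methods, and the rule that a lower number means a higher priority are hypothetical and are not part of the sqns API.

```javascript
// Hypothetical in-memory stand-in for the Manager; not the sqns implementation.
class InMemoryManager {
  constructor() {
    this.events = [];
    this.nextSequence = 0;
  }

  // Producers hand events to the single Manager.
  // Assumption: a lower number means a higher priority.
  add(body, priority = 100) {
    this.events.push({ body, priority, sequence: this.nextSequence++ });
  }

  // A Worker asks for the next event: best priority first, oldest first on ties.
  poll() {
    if (this.events.length === 0) return undefined;
    this.events.sort((a, b) => a.priority - b.priority || a.sequence - b.sequence);
    return this.events.shift();
  }
}

const manager = new InMemoryManager();
manager.add('send-newsletter', 50);
manager.add('reset-password', 1);
manager.add('resize-image', 50);

// Two Workers take turns requesting events from the one Manager.
const processed = [];
const workers = ['worker-A', 'worker-B'];
let turn = 0;
for (let event = manager.poll(); event !== undefined; event = manager.poll()) {
  processed.push(`${workers[turn % workers.length]}:${event.body}`);
  turn += 1;
}
console.log(processed);
```

In the real service the Workers reach the Manager over HTTP through SQNSClient; the loop above only shows the ordering idea.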
Queue Manager
- Initialize queue manager
import { SQNS } from 'sqns';
const dataBaseConnectionConfig = {};
const sqns = new SQNS({
  endpoint: 'http://your.server.url/api',
  adminSecretKeys: [{ accessKey: 'yourKey', secretAccessKey: 'yourSecretKey' }],
  db: { uri: 'DatabaseUri', config: dataBaseConnectionConfig },
  sns: {
    disable: true
  },
});
- Register routes with Express Server.
sqns.registerExpressRoutes(app);
Queue Worker
- Initialize Worker
import { SQNSClient } from 'sqns';
const sqnsClient = new SQNSClient({
endpoint: 'http://your.server.url/api',
accessKeyId: 'yourKey',
secretAccessKey: 'yourSecretKey',
});
- Create Queue
sqnsClient.createQueue({ QueueName: 'queueName' }).then(queue => {});
- Send a Message to the queue
sqnsClient.sendMessage({ QueueUrl: queue.QueueUrl, MessageBody: '123' });
- Receive a Message from the queue
sqnsClient.receiveMessage({ QueueUrl: queue.QueueUrl, MaxNumberOfMessages: 1 })
  .then(response => {
    const message = response.Messages[0];
  });
Manager Scheduler
You can either use SQNSClient to add events to the queue directly, or use ManagerEventScheduler to
fetch events and add them to the queue periodically. The ManagerEventScheduler constructor requires the following parameters.
- SQNSClient options.
- JSON object that maps each queue name to which events will be added to its initial params.
- Listener function that fetches the events that need to be added to the queue.
This function returns an array of two elements.
The first is the pagination params for the next call and the second is the list of items to be added to the queue.
If the list of items is empty, the listener function is called with the initial pagination params the next time.
- Cron Interval (optional).
import { ManagerEventScheduler } from 'sqns';
...
new ManagerEventScheduler(
  {
    endpoint: 'http://xyz.abz/api',
    accessKeyId: 'accessKey',
    secretAccessKey: 'secretKey',
  },
  // Queue name mapped to the initial pagination params.
  { 'queueName': { page: 0 } },
  (params) => {
    // Fetch the events for the current page here.
    const items = [];
    const eventItems = items.map(each => {
      const requestItem = {
        MessageBody: each.message,
        DelaySeconds: 10,
        MessageAttributes: { attribute1: { StringValue: 'value1', DataType: 'String' } },
        MessageSystemAttributes: { attribute1: { StringValue: 'value1', DataType: 'String' } },
        MessageDeduplicationId: each.duplicationId,
      };
      return requestItem;
    });
    // First element: params for the next call. Second element: items to add to the queue.
    return [{ "page": params.page + 1 }, eventItems];
  },
  '*/10 * * * * *');
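The listener contract described above (return the params for the next call plus the items to queue, and start over from the initial params once a call returns no items) can be simulated without a server. In this sketch `pendingEvents`, `PAGE_SIZE` and `runTicks` are hypothetical names; `runTicks` only imitates the reset rule and does not send anything to a queue.

```javascript
// Hypothetical data source standing in for a database query.
const pendingEvents = [
  { message: 'event-1', duplicationId: 'id-1' },
  { message: 'event-2', duplicationId: 'id-2' },
  { message: 'event-3', duplicationId: 'id-3' },
];
const PAGE_SIZE = 2;

// Listener following the documented contract: [paramsForNextCall, itemsToQueue].
function listener(params) {
  const start = params.page * PAGE_SIZE;
  const items = pendingEvents.slice(start, start + PAGE_SIZE).map((each) => ({
    MessageBody: each.message,
    MessageDeduplicationId: each.duplicationId,
  }));
  return [{ page: params.page + 1 }, items];
}

// Simplified driver: one loop iteration stands for one cron tick.
function runTicks(initialParams, tickCount) {
  const log = [];
  let params = initialParams;
  for (let tick = 0; tick < tickCount; tick += 1) {
    const [nextParams, items] = listener(params);
    log.push({ page: params.page, queued: items.map((item) => item.MessageBody) });
    // An empty page sends the next call back to the initial params.
    params = items.length === 0 ? initialParams : nextParams;
  }
  return log;
}

const ticks = runTicks({ page: 0 }, 4);
console.log(ticks);
```

Pages 0 and 1 return items, page 2 is empty, so the fourth tick starts again at page 0.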
Processing Scheduler
You can either use SQNSClient to fetch events from the Manager server directly, or use WorkerEventScheduler
to fetch events and process them periodically.
The WorkerEventScheduler constructor requires the following parameters.
- SQNSClient options.
- Array of queue names from which events will be fetched.
- Listener function that will be called with each EventItem to be processed.
- Cron Interval (optional).
import { WorkerEventScheduler } from 'sqns';
...
new WorkerEventScheduler(
  {
    endpoint: 'http://xyz.abz/api',
    accessKeyId: 'accessKey',
    secretAccessKey: 'secretKey',
  },
  ["queueName"],
  (queueName, item) => {
  },
  '0 * * * *');
Notification Management
Notification Management delivers one published event to many subscribed endpoints.
It uses the Queue Management module to pass published events to their subscribers.
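The one-to-many delivery described above can be pictured as a fan-out into a queue: each publish creates one delivery job per subscriber. The sketch below is an in-memory illustration only; `subscriptionsByTopic`, `deliveryQueue`, the local `subscribe` and `publish` functions, and the example ARNs and URLs are all hypothetical and do not reflect how sqns stores or delivers data.

```javascript
// Hypothetical in-memory model; these names are not part of the sqns API.
const subscriptionsByTopic = new Map();
const deliveryQueue = [];

// Register an endpoint as a subscriber of a topic.
function subscribe(topicArn, endpoint) {
  const endpoints = subscriptionsByTopic.get(topicArn) || [];
  endpoints.push(endpoint);
  subscriptionsByTopic.set(topicArn, endpoints);
}

// Publishing once enqueues one delivery job per subscriber of the topic.
function publish(topicArn, message) {
  const endpoints = subscriptionsByTopic.get(topicArn) || [];
  for (const endpoint of endpoints) {
    deliveryQueue.push({ endpoint, message });
  }
  return endpoints.length;
}

subscribe('arn:topic:orders', 'http://billing.example/hook');
subscribe('arn:topic:orders', 'http://shipping.example/hook');
subscribe('arn:topic:signups', 'http://crm.example/hook');

const fanOut = publish('arn:topic:orders', 'order-created');
console.log(fanOut, deliveryQueue);
```

Workers would then drain `deliveryQueue` like any other queue, which is why the notification side can reuse the queue machinery.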
Notification Manager
- Initialize notification manager
import { SQNS } from 'sqns';
const dataBaseConnectionConfig = {};
const sqns = new SQNS({
endpoint: 'http://your.server.url/api',
adminSecretKeys: [{ accessKey: 'yourKey', secretAccessKey: 'yourSecretKey' }],
db: { uri: 'DatabaseUri', config: dataBaseConnectionConfig },
});
- Register routes with Express Server
sqns.registerExpressRoutes(app);
- Notification Scheduler
import { WorkerEventScheduler } from 'sqns';
...
new WorkerEventScheduler(
  {
    endpoint: 'http://xyz.abz/api',
    accessKeyId: 'accessKey',
    secretAccessKey: 'secretKey',
  },
  ["sqns"],
  (queueName, item) => {
  },
  '0 * * * *');
- Create Topic
client.createTopic({ Name: 'Topic1' })
.then(topic => {})
- Publish Message
client.publish({ Message: 'This is message' })
SQNSClient
createQueue
client.createQueue({
  QueueName: 'queueName',
  Attributes: { attribute1: 'value1' },
  tags: { tag1: 'value2' },
}).then(queue => {});
CreateQueue Attributes
- maxReceiveCount: Maximum number of times any event of the queue will be retried.
client.createQueue({ ..., Attributes: { maxReceiveCount: '2' } });
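To make the effect of a limit like maxReceiveCount concrete, the sketch below models an event that is handed out again after each failure until the limit is reached. It assumes the limit caps the total number of deliveries; the exact counting used by sqns is not stated here, and `deliverWithLimit` is a hypothetical helper, not a library function.

```javascript
// Hypothetical model: an event is handed out until it succeeds
// or its receive count reaches the limit (assumed semantics).
function deliverWithLimit(maxReceiveCount, handler) {
  let receiveCount = 0;
  while (receiveCount < maxReceiveCount) {
    receiveCount += 1;
    if (handler(receiveCount)) {
      return { status: 'success', receiveCount };
    }
  }
  return { status: 'failed', receiveCount };
}

// The attribute is passed as a string in createQueue, so parse it before use.
const limit = Number('2');
const alwaysFails = deliverWithLimit(limit, () => false);
const succeedsOnSecond = deliverWithLimit(limit, (attempt) => attempt === 2);
console.log(alwaysFails, succeedsOnSecond);
```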
sendMessage
client.sendMessage({
  QueueUrl: queue.QueueUrl,
  MessageAttributes: {
    attribute1: {
      StringValue: 'value1',
      DataType: 'String'
    }
  },
  MessageSystemAttributes: {
    attribute1: {
      StringValue: 'attributeValue',
      DataType: 'String'
    }
  },
  MessageDeduplicationId: 'uniqueId1',
  MessageBody: 'This is message body',
});
sendMessageBatch
client.sendMessageBatch({
  QueueUrl: queue.QueueUrl,
  Entries: [
    {
      Id: '123',
      MessageAttributes: {
        attribute1: {
          StringValue: 'value1',
          DataType: 'String'
        }
      },
      MessageSystemAttributes: {
        attribute1: {
          StringValue: 'attributeValue',
          DataType: 'String'
        }
      },
      MessageDeduplicationId: 'uniqueId1',
      MessageBody: 'This is message body',
    },
  ],
});
receiveMessage
client.receiveMessage({
QueueUrl: queue.QueueUrl,
AttributeNames: ['attributeValue'],
MessageSystemAttributes: ['ALL'],
MaxNumberOfMessages: 10,
VisibilityTimeout: 30,
}).then(response => {
const MessageId = response.Messages[0].MessageId;
});
listQueues
client.listQueues({
QueueNamePrefix: 'queuePrefix',
NextToken: 'nextRequestToken',
})
deleteQueue
client.deleteQueue({ QueueUrl: queue.QueueUrl });
getQueueUrl
client.getQueueUrl({ QueueName: 'queueName' });
markEventSuccess
client.markEventSuccess(MessageId, queue.QueueUrl, 'success message');
markEventFailure
client.markEventFailure(MessageId, queue.QueueUrl, 'failure message');
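A worker typically calls one of these two methods after it has tried to process a received message. The sketch below shows one way to wire that up, using a stub client so it runs without a server. Assumptions: `stubClient` mirrors only the two call shapes shown above, `processMessage` is a hypothetical helper, the message is assumed to carry its payload in `Body`, and the queue URL is a placeholder. The real client may return promises, which this synchronous sketch ignores.

```javascript
// Stub client that records calls; it mirrors only the two call shapes shown above.
const calls = [];
const stubClient = {
  markEventSuccess: (messageId, queueUrl, message) => calls.push(['success', messageId, message]),
  markEventFailure: (messageId, queueUrl, message) => calls.push(['failure', messageId, message]),
};

// Hypothetical helper: run a handler and report the outcome for the message.
function processMessage(client, queueUrl, message, handler) {
  try {
    const result = handler(message.Body);
    client.markEventSuccess(message.MessageId, queueUrl, String(result));
  } catch (error) {
    client.markEventFailure(message.MessageId, queueUrl, error.message);
  }
}

const queueUrl = 'http://your.server.url/api/queueName'; // placeholder URL
processMessage(stubClient, queueUrl, { MessageId: 'm-1', Body: '21' }, (body) => Number(body) * 2);
processMessage(stubClient, queueUrl, { MessageId: 'm-2', Body: 'oops' }, () => {
  throw new Error('cannot parse');
});
console.log(calls);
```

Keeping the success and failure reporting in one helper means every handler gets the same bookkeeping, whatever it does with the message body.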
createTopic
client.createTopic({
Name: 'Topic1',
Attributes: { DisplayName: 'Topic One' },
Tags: [{ Key: 'tag1', Value: 'value1' }],
}).then(topic => {});
listTopics
client.listTopics({
NextToken: 'nextToken'
})
getTopicAttributes
client.getTopicAttributes({ TopicArn: topic.TopicArn })
setTopicAttributes
client.setTopicAttributes({
TopicArn: topic.TopicArn,
AttributeName: 'DisplayName',
AttributeValue: 'Updated Topic One',
})
deleteTopic
client.deleteTopic({ TopicArn: topic.TopicArn });
publish
client.publish({
Message: 'This is message',
TopicArn: topic.TopicArn,
TargetArn: topic.TopicArn,
MessageAttributes: { key1: { DataType: 'String', StringValue: 'value' } },
})
subscribe
client.subscribe({
TopicArn: topic.TopicArn,
Attributes: { key: 'value' },
Endpoint: 'http://your.server.subscription/url',
Protocol: 'http',
ReturnSubscriptionArn: true,
}).then(result => {
const SubscriptionArn = result.SubscriptionArn;
})
listSubscriptions
client.listSubscriptions({
NextToken: 'NextToken'
})
listSubscriptionsByTopic
client.listSubscriptionsByTopic({
TopicArn: topic.TopicArn,
NextToken: 'NextToken'
});
confirmSubscription
client.confirmSubscription({
TopicArn: 'topicArn',
Token: 'verificationToken',
});
unsubscribe
client.unsubscribe({ SubscriptionArn: 'subscriptionArn' });
getPublish
client.getPublish({ MessageId: 'MessageId' })
getSubscription
client.getSubscription({ SubscriptionArn: 'subscriptionArn' })
markPublished
client.markPublished({ MessageId: 'MessageId' })