STEMuli Queue Factory
Description
This library handles the creation and management of message queues on Stemuli.
Setup
- To install, run:
npm i @stemuli/queue
- You must set the environment variable REDIS_SERVER to the connection string for the Redis server. Ask Wade or Femi for this value.
- All services must initialize the system queue.
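Because a missing REDIS_SERVER value usually only shows up later as a connection error, it can help to check for it when the service starts. The following is a minimal sketch of such a check; `requireRedisServer` is a hypothetical helper and is not part of @stemuli/queue.

```typescript
// Hypothetical startup helper; it is not part of @stemuli/queue.
// The environment is passed in as a parameter so the check can be tested
// without touching process.env.
function requireRedisServer(env: Record<string, string | undefined>): string {
  const value = (env['REDIS_SERVER'] || '').trim();
  if (value === '') {
    throw new Error('REDIS_SERVER is not set; it must hold the Redis connection string.');
  }
  return value;
}

// Typical call at service startup, before any queue is initialized:
// const redisServer = requireRedisServer(process.env);
```

Failing fast here keeps the error message close to its cause instead of surfacing inside the queue code.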
Recommendation
- The folder structure in your repo should be as follows:
  - queues/
    - jobs/
      - <job-folder>/
        - processors/
        - <job-folder>.job.ts
      - <job-folder>/
        - processors/
        - <job-folder>.job.ts
    - index.ts
Working with the SystemQueue
The Stemuli SystemQueue lets you send messages to one or all services at once.
It should be initialized on every service. The developer of each service can choose whether or not to process these messages.
For example, when a new user is added to the database, the auth service publishes a USER_CREAT_UPDATE_ACTION message to
the system queue. Every service that needs to respond to this message is expected to register a processor
for this event. See how to register a processor below.
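The broadcast behaviour described above can be pictured with a toy in-memory model. This is only an illustration of the idea, not how @stemuli/queue is implemented: `ToySystemQueue`, its methods, and the service names are all made up for this sketch, and only the message name comes from the example above.

```typescript
// Toy in-memory model of a broadcast queue. None of these names come from
// @stemuli/queue; the real library is backed by Redis.
type Processor = (data: unknown) => void;

class ToySystemQueue {
  // service name -> (message name -> processor)
  private services = new Map<string, Map<string, Processor>>();

  // Every service joins the queue, whether or not it processes anything.
  init(service: string): void {
    if (!this.services.has(service)) {
      this.services.set(service, new Map<string, Processor>());
    }
  }

  registerProcessor(service: string, messageName: string, processor: Processor): void {
    this.init(service);
    this.services.get(service)!.set(messageName, processor);
  }

  // Broadcast to all services; returns the names of the services that
  // had a processor registered for this message.
  publish(messageName: string, data: unknown): string[] {
    const processedBy: string[] = [];
    this.services.forEach((processors, service) => {
      const processor = processors.get(messageName);
      if (processor !== undefined) {
        processor(data);
        processedBy.push(service);
      }
    });
    return processedBy;
  }
}

const toyQueue = new ToySystemQueue();
const received: unknown[] = [];
toyQueue.init('event-service'); // initialized, but registers no processor
toyQueue.registerProcessor('chat-service', 'USER_CREAT_UPDATE_ACTION', (data) => {
  received.push(data);
});
const processedBy = toyQueue.publish('USER_CREAT_UPDATE_ACTION', { userId: 'u1' });
// Only 'chat-service' processes the message; 'event-service' ignores it.
```

The point of the model is that publishing does not require the sender to know which services are listening; each service opts in by registering a processor.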
To initialize the SystemQueue on your service, add the following code to your queues/index.ts file:
./queues/index.ts
import {SystemQueues, AUTH_SERVICE_QUEUE, COMMUNICATION_SERVICE_QUEUE, CLASS_SERVICE_QUEUE, EVENT_SERVICE_QUEUE} from '@stemuli/queue/dist';
// Replace AUTH_SERVICE_QUEUE with your service's queue name from the import above.
const Q = SystemQueues.init(AUTH_SERVICE_QUEUE);
Creating a custom Queue
You can also create your own queue with QueueFactory:
./queues/index.ts
import {QueueFactory} from '@stemuli/queue/dist';
const customQueue = new QueueFactory('CustomQueueName').init(); //replace `CustomQueueName` with the name of your custom queue.
Registering a processor for a queue
./queues/index.ts
import {QueueFactory} from '@stemuli/queue/dist';
import ChatJob from './jobs/communication'; // default export of ./queues/jobs/communication/index.ts
const customQueue = new QueueFactory('CustomQueueName').init(); //replace `CustomQueueName` with the name of your custom queue.
const customQueueAction = new ChatJob(customQueue); // the constructor registers the processors
./queues/jobs/communication/index.ts
import { IQueues, StemuliJob } from '@stemuli/queue/dist';
import ChatJobProcessor from './processors';
import {
  CHAT_USER_CREATE_UPDATE_JOB,
  CHAT_CHANNEL_CREATE_UPDATE_JOB,
  CHAT_SUBSCRIBE_MEMBERS_TO_CHANNEL_JOB
} from '../../inc/constants';

export default class ChatJob extends StemuliJob {
  queues: IQueues;

  constructor(queues: any, defaultQueue?: string) {
    super(queues, defaultQueue);
    // Register the action processors in the constructor.
    this.registerProcessor(CHAT_USER_CREATE_UPDATE_JOB, ChatJobProcessor.userCreatedUpdated);
  }

  // Add a CHAT_USER_CREATE_UPDATE_JOB job to the queue.
  async addToChatUserCreateUpdateQueue(data: any, options?: any) {
    return this.sendToQueue(CHAT_USER_CREATE_UPDATE_JOB, data, options);
  }
}
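The processors module imported above is not shown in this README. As a rough sketch of what a processor such as `ChatJobProcessor.userCreatedUpdated` might look like, the example below assumes the processor receives a job object whose `data` field holds whatever was passed to `sendToQueue`. The `JobLike` and `ChatUserPayload` types and the return value are assumptions made for illustration; check the library for the actual job type.

```typescript
// Assumed shape of the job handed to a processor: an object carrying the
// data that was passed to sendToQueue. The real type may differ.
interface JobLike<T> {
  data: T;
}

// Hypothetical payload for CHAT_USER_CREATE_UPDATE_JOB.
interface ChatUserPayload {
  userId: string;
  displayName: string;
}

// In ./processors/index.ts this class would be the default export.
class ChatJobProcessor {
  // Static and free of `this`, so the method reference can be passed
  // directly to registerProcessor.
  static async userCreatedUpdated(job: JobLike<ChatUserPayload>): Promise<string> {
    const { userId, displayName } = job.data;
    if (userId === '') {
      throw new Error('userId is required');
    }
    // A real processor would create or update the chat user here.
    return `synced chat user ${userId} (${displayName})`;
  }
}
```

Keeping processors as static methods in their own module matches the recommended folder structure and keeps the job class limited to registration and enqueueing.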