Blueque
A simple task queue system optimized for very long-running tasks.
Queue
A list of tasks, of a given type, to be run. Multiple queues may be used to execute different types of tasks on different nodes.
Task
A piece of work to be done, including its parameters, current status, result (if done), etc.
Node
A computer running tasks from the queue.
Listener
A process running on a node, listening for new tasks on a single queue. The system does not support multiple listeners listening to the same queue on the same node.
Process
A process, running on a node, created by a listener, executing a task.
The Blueque.Client object stores the connection to the Blueque system and constructs the other objects needed to interact with it.
client = Blueque.Client("redis://hostname:port/db")
The Blueque.queue.Queue object provides the interface to a named queue of tasks.
queue = client.get_queue("some.queue")
Queue.enqueue
task_id = queue.enqueue(parameters)
Returns the task ID (a string) of the newly enqueued task.
The task object provides a basic, read-only view of all the attributes of a task.
task = client.get_task(task_id)
There is a read-only attribute for each of the task attributes stored in Redis (see below).
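For example, assuming the Redis field names listed below map directly onto attribute names, inspecting a task might look like:

print(task.status)      # e.g. "pending", "started", or "complete"
print(task.parameters)  # the string passed to Queue.enqueue
print(task.result)      # only set once the task is complete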
The listener object provides the interface used to listen for new tasks.
listener = client.get_listener("some.queue")
Listener.listen
task = listener.listen(on_new_task)
Blocks until a new task is ready to execute, then returns a Task object describing that task.
A Processor object provides the interface used to update a task while it is being processed.
processor = client.get_processor(task)
Processor.start
parameters = processor.start(pid)
Marks the task as started by the specified pid and returns the task's parameters.
Processor.complete
processor.complete("some result")
Marks a task as being complete, and stores the result.
Processor.fail
processor.fail("some error")
Marks a task as having failed, and stores the error.
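Putting the pieces together, a minimal worker sketch might look like the following. This is only a sketch: run_task is a hypothetical, application-specific handler, the import path follows the usage shown above, and the exact semantics of the on_new_task callback are not specified here, so they are assumptions.

import os
import Blueque

def run_task(parameters):
    # Hypothetical task handler; entirely application-specific.
    return "some result"

def on_new_task(task):
    # Callback passed to listen(); its exact semantics are an assumption.
    pass

client = Blueque.Client("redis://localhost:6379/0")
listener = client.get_listener("some.queue")

while True:
    task = listener.listen(on_new_task)  # blocks until a task is ready
    processor = client.get_processor(task)
    parameters = processor.start(os.getpid())
    try:
        processor.complete(run_task(parameters))
    except Exception as error:
        processor.fail(str(error))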
Currently, the backend is Redis. Keys are prefixed with a namespace (e.g. blueque_foo), so that other systems can use the same Redis DB for other things.
blueque_pending_tasks_[queue name]
Stored in a List, accessed as a queue: new tasks are added via LPUSH, and tasks are removed for execution via RPOP. There is a List for each task queue (channel). All that is stored in the List is a task ID, which is used to retrieve the task data.
blueque_reserved_tasks_[queue name]_[listener ID]
Stored in a List, this is used to keep track of which listeners are running which tasks. Tasks should be moved atomically from the pending task List to the listener's task List via RPOPLPUSH, so that they don't get lost.
Note: this will not be implemented in the first pass.
There is a Pub/Sub Channel for each task queue (channel). This can be used by a listener to wait for new tasks when the task queue it is interested in was empty the last time it checked. Messages should not include the task ID of the newly created task: listeners must still pop the task off the task queue themselves (via RPOP), so that only one worker runs each task.
blueque_tasks_[queue name]
There is a list of all the tasks in a queue, regardless of their state. This is mostly used for introspection/management purposes.
blueque_task_[task id]
The actual data associated with a task will be stored as a Hash, where the key is built from the task ID (e.g. blueque_task_NNNN). Each subsystem will be able to add its own fields to the Hash.
Current fields are:
status
One of: scheduled, pending, reserved, started, complete, or failed.
queue
The queue that the task is in (mostly just for debugging purposes).
parameters
String containing data to be passed to the process; totally task-specific.
result
String containing the result of the process; totally task-specific. Will not be set if the task hasn't completed yet.
error
String containing the error generated by the process. Set only if status is failed.
node
The listener ID of the node running the task. Will not be set if the task has not been started yet.
pid
The PID of the process running the task on the node. Will not be set if the task has not been started yet.
created
A floating-point value containing the Python timestamp (time.time()) of the time when the task was first created (enqueued).
updated
A floating-point value containing the Python timestamp (time.time()) of the last time any value in the task changed.
eta
The timestamp when the task is scheduled to be executed. Will not be set if the task was not scheduled with an ETA.
blueque_listeners_[queue name]
In order for the system to be easily introspected, the currently active listeners will be stored in a Redis Set. Listeners are stored by their listener ID, which must be [hostname]_[pid].
Note that this means that all hosts in the system must have unique names.
blueque_queues
There is a Sorted Set containing the names of all the queues, where the score of each member is the number of nodes listening to that queue.
When a node comes online, it increments the score by 1; when a node goes offline cleanly, it decrements the score by 1. Every time a task is enqueued, the score is incremented by 0, so that a queue with tasks, but no listeners, still shows up in the set.
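A minimal sketch of that bookkeeping using redis-py (the client library is an assumption; the design only specifies the raw commands):

import redis

r = redis.from_url("redis://localhost:6379/0")

r.zincrby("blueque_queues", 1, "some.queue")   # a listener comes online
r.zincrby("blueque_queues", -1, "some.queue")  # a listener goes offline cleanly
r.zincrby("blueque_queues", 0, "some.queue")   # on enqueue: keep the queue visible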
Tasks are executed via the following workflow. Note that any HMSET call which modifies a blueque_task_[TASK ID] hash should be assumed to also set the updated field of the task.
Tasks should be submitted by creating a UUID, [TASK ID], JSON-encoding the parameters, [PARAMS], and then executing:
MULTI
HMSET blueque_task_[TASK ID] status pending queue [QUEUE] parameters [PARAMS]
ZINCRBY blueque_queues 0 [QUEUE]
LPUSH blueque_pending_tasks_[QUEUE] [TASK ID]
EXEC
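The same transaction as a redis-py sketch (the library choice and the enqueue helper name are assumptions; hset with a mapping is the modern equivalent of HMSET):

import json
import uuid
import redis

r = redis.from_url("redis://localhost:6379/0")

def enqueue(queue, parameters):
    task_id = str(uuid.uuid4())
    pipe = r.pipeline()  # MULTI/EXEC transaction
    pipe.hset("blueque_task_%s" % task_id,
              mapping={"status": "pending", "queue": queue,
                       "parameters": json.dumps(parameters)})
    pipe.zincrby("blueque_queues", 0, queue)
    pipe.lpush("blueque_pending_tasks_%s" % queue, task_id)
    pipe.execute()
    return task_id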
Nodes should pop a task off the queue, then set the status of the task to reserved and set the node field of the task.
If no task is popped off the queue, the Node should wait for a new task notification. Ideally, this will be via Pub/Sub, but, at first, we can do it by polling.
Tasks are popped using the following commands.
RPOPLPUSH blueque_pending_tasks_[QUEUE] [NODE TASKS]
HMSET blueque_task_[TASK ID] status reserved node [NODE]
Note that these two commands cannot be executed atomically, because the second depends on the result of the first, and, even with Lua scripting, that cannot be done both atomically and safely. Therefore, there is a chance that a task is popped off the pending queue, but its record is not updated. This can be detected when a task is in a node's List but still has a status of pending.
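A redis-py sketch of the reservation step, with [NODE TASKS] expanded to the reserved-tasks key from the schema above (the helper name is an assumption):

import os
import socket
import redis

r = redis.from_url("redis://localhost:6379/0")

def reserve_task(queue):
    listener_id = "%s_%d" % (socket.gethostname(), os.getpid())
    node_tasks = "blueque_reserved_tasks_%s_%s" % (queue, listener_id)
    task_id = r.rpoplpush("blueque_pending_tasks_%s" % queue, node_tasks)
    if task_id is None:
        return None  # queue was empty; poll again or wait for a notification
    # Deliberately a separate call: the two steps cannot share a transaction (see above).
    r.hset("blueque_task_%s" % task_id.decode(),
           mapping={"status": "reserved", "node": listener_id})
    return task_id.decode()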
Note: We will not implement this in the first pass. Listeners will just poll once every few seconds.
If a listener receives a message via a Pub/Sub channel indicating that a queue has a task in it, it should try to atomically pop a task off that queue (see above). If it does get a task, it should unsubscribe from the channel; if it does not (i.e. another listener got the task), it should remain subscribed.
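This is not planned for the first pass, but a sketch of that behavior with redis-py pub/sub might look like the following (the channel name is hypothetical; the design does not specify it, and reserve_task is the sketch above):

import redis

r = redis.from_url("redis://localhost:6379/0")

def wait_for_task(queue):
    pubsub = r.pubsub()
    pubsub.subscribe("blueque_channel_%s" % queue)  # hypothetical channel name
    try:
        while True:
            message = pubsub.get_message(timeout=5.0)
            if message is None or message["type"] != "message":
                continue
            task_id = reserve_task(queue)
            if task_id is not None:
                return task_id  # got the task; stop listening
            # Another listener got the task; remain subscribed.
    finally:
        pubsub.unsubscribe()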
When a process starts executing a task on a node, it should update the task to indicate that, and also add itself to a set of all active tasks:
MULTI
SADD blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status started pid [PID]
EXEC
Note that this assumes that the process is told what task to execute, rather than pulling it off the node's task list.
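The same step as a redis-py sketch (the helper name is an assumption):

import redis

r = redis.from_url("redis://localhost:6379/0")

def mark_started(queue, listener_id, pid, task_id):
    pipe = r.pipeline()  # MULTI/EXEC transaction
    pipe.sadd("blueque_started_tasks_%s" % queue,
              "%s %s %s" % (listener_id, pid, task_id))
    pipe.hset("blueque_task_%s" % task_id,
              mapping={"status": "started", "pid": pid})
    pipe.execute()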
If a task completes successfully, it should set the status field of the task to complete and set the result field to the JSON-serialized result of the task, as a single atomic transaction.
MULTI
LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
SREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status complete result [RESULT]
LPUSH blueque_complete_tasks_[QUEUE] [TASK ID]
EXEC
If a task fails for any reason (the process function raises an exception, or some monitoring process determines that the process failed more catastrophically), the process that detects the error should set the status field of the task to failed and the error field to a JSON-serialized description of the error (see above).
MULTI
LREM blueque_reserved_tasks_[QUEUE]_[LISTENER ID] 1 [TASK ID]
SREM blueque_started_tasks_[QUEUE] "[LISTENER ID] [PID] [TASK ID]"
HMSET blueque_task_[TASK ID] status failed error [ERROR]
LPUSH blueque_failed_tasks_[QUEUE] [TASK ID]
EXEC
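The success and failure transactions have the same shape, so one redis-py sketch can cover both (the helper name is an assumption; status is "complete" or "failed", and field is "result" or "error" accordingly):

import json
import redis

r = redis.from_url("redis://localhost:6379/0")

def finish_task(queue, listener_id, pid, task_id, status, field, payload):
    # e.g. finish_task(..., "complete", "result", result)
    # or   finish_task(..., "failed", "error", error)
    pipe = r.pipeline()  # MULTI/EXEC transaction
    pipe.lrem("blueque_reserved_tasks_%s_%s" % (queue, listener_id), 1, task_id)
    pipe.srem("blueque_started_tasks_%s" % queue,
              "%s %s %s" % (listener_id, pid, task_id))
    pipe.hset("blueque_task_%s" % task_id,
              mapping={"status": status, field: json.dumps(payload)})
    pipe.lpush("blueque_%s_tasks_%s" % (status, queue), task_id)
    pipe.execute()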
Once everybody interested in a task's result (or error) has been notified, the task data needs to be deleted from Redis, so that it does not leak data.
The transaction is slightly different depending on whether the task is complete or failed:
MULTI
DEL blueque_task_[TASK ID]
LREM blueque_[status]_tasks_[QUEUE] 1 [TASK ID]
EXEC
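As a redis-py sketch, parameterized on the final status (the helper name is an assumption):

import redis

r = redis.from_url("redis://localhost:6379/0")

def delete_task(queue, task_id, status):
    # status is "complete" or "failed", matching the List the task ended up in.
    pipe = r.pipeline()  # MULTI/EXEC transaction
    pipe.delete("blueque_task_%s" % task_id)
    pipe.lrem("blueque_%s_tasks_%s" % (status, queue), 1, task_id)
    pipe.execute()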
A task can be scheduled for execution at a later time by adding it to a sorted set, where the score is the timestamp when the task should be executed.
HMSET blueque_task_[TASK ID] status scheduled queue [QUEUE] parameters [PARAMS] eta [TIMESTAMP]
ZINCRBY blueque_queues 0 [QUEUE]
ZADD blueque_scheduled_tasks_[QUEUE] [TIMESTAMP] [TASK ID]
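A redis-py sketch of scheduling (the helper name is an assumption, and wrapping the three commands in one transaction is also an assumption; the design does not show MULTI here):

import json
import uuid
import redis

r = redis.from_url("redis://localhost:6379/0")

def schedule(queue, parameters, eta):
    # eta is a time.time()-style timestamp.
    task_id = str(uuid.uuid4())
    pipe = r.pipeline()
    pipe.hset("blueque_task_%s" % task_id,
              mapping={"status": "scheduled", "queue": queue,
                       "parameters": json.dumps(parameters), "eta": eta})
    pipe.zincrby("blueque_queues", 0, queue)
    pipe.zadd("blueque_scheduled_tasks_%s" % queue, {task_id: eta})
    pipe.execute()
    return task_id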
A process must be running which periodically checks the scheduled task list for each queue and adds them to the queue to be run at the scheduled time.
WATCH blueque_scheduled_tasks_[QUEUE]
to_run = ZRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
MULTI
ZREMRANGEBYSCORE blueque_scheduled_tasks_[QUEUE] 0 [CURRENT TIME]
LPUSH blueque_pending_tasks_[QUEUE] to_run[0] ... to_run[n]
for task in to_run:
    HMSET blueque_task_[task] status pending
EXEC
Note that [CURRENT TIME] should only be read once, before executing the transaction, so that the tasks which were fetched are the same ones that are removed.
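A redis-py sketch of that check-and-move step, using WATCH so the read-modify-write retries if the scheduled set changes underneath it (the helper name is an assumption):

import time
import redis

r = redis.from_url("redis://localhost:6379/0")

def promote_due_tasks(queue):
    scheduled_key = "blueque_scheduled_tasks_%s" % queue
    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(scheduled_key)
                now = time.time()  # read once, before the transaction
                to_run = pipe.zrangebyscore(scheduled_key, 0, now)
                if not to_run:
                    pipe.unwatch()
                    return
                pipe.multi()
                pipe.zremrangebyscore(scheduled_key, 0, now)
                pipe.lpush("blueque_pending_tasks_%s" % queue, *to_run)
                for task_id in to_run:
                    pipe.hset("blueque_task_%s" % task_id.decode(),
                              "status", "pending")
                pipe.execute()
                return
            except redis.WatchError:
                continue  # the set changed under us; retry with a fresh snapshot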