New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

task-queue

Package Overview
Dependencies
Maintainers
1
Versions
64
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

task-queue - pypi Package Compare versions

Comparing version
2.9.0
to
2.10.0
+2
-2
PKG-INFO
Metadata-Version: 2.1
Name: task-queue
Version: 2.9.0
Version: 2.10.0
Summary: Multithreaded cloud queue client.

@@ -13,3 +13,3 @@ Home-page: https://github.com/seung-lab/python-task-queue/

This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues.
This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem.

@@ -16,0 +16,0 @@ ## Installation

@@ -5,3 +5,3 @@ [![Build Status](https://travis-ci.org/seung-lab/python-task-queue.svg?branch=master)](https://travis-ci.org/seung-lab/python-task-queue) [![PyPI version](https://badge.fury.io/py/task-queue.svg)](https://badge.fury.io/py/task-queue)

This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues.
This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem.

@@ -8,0 +8,0 @@ ## Installation

@@ -1,1 +0,1 @@

{"git_version": "266ff0d", "is_release": true}
{"git_version": "5694f77", "is_release": true}
Metadata-Version: 2.1
Name: task-queue
Version: 2.9.0
Version: 2.10.0
Summary: Multithreaded cloud queue client.

@@ -13,3 +13,3 @@ Home-page: https://github.com/seung-lab/python-task-queue/

This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues.
This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem.

@@ -16,0 +16,0 @@ ## Installation

import os
import click
from tqdm import tqdm
from taskqueue import TaskQueue, __version__
from taskqueue import TaskQueue, __version__, QueueEmptyError
from taskqueue.lib import toabs

@@ -75,3 +76,4 @@ from taskqueue.paths import get_protocol

Currently sqs queues are not copiable,
but you can copy an fq to sqs.
but you can copy an fq to sqs. The mv
command supports sqs queues.
"""

@@ -81,2 +83,6 @@ src = normalize_path(src)

if get_protocol(src) == "sqs":
print("ptq: cp does not support sqs:// as a source.")
return
tqd = TaskQueue(dest)

@@ -87,3 +93,37 @@ tqs = TaskQueue(src)

@main.command()
@click.argument("src")
@click.argument("dest")
def mv(src, dest):
  """
  Transfer the full contents of one queue into another
  service or location. Never run this while workers are
  processing the queue. Note that moving an sqs queue
  into a file queue can produce duplicated tasks.
  """
  src, dest = normalize_path(src), normalize_path(dest)
  source_q = TaskQueue(src, progress=False)
  dest_q = TaskQueue(dest, progress=False)

  with tqdm(total=source_q.enqueued, desc="Moving") as progress:
    while True:
      # Lease a small batch; an empty source signals completion.
      try:
        batch = source_q.lease(num_tasks=10, seconds=10)
      except QueueEmptyError:
        break
      # Insert into the destination before deleting from the source,
      # so a crash mid-move loses nothing (it may duplicate instead).
      dest_q.insert(batch)
      source_q.delete(batch)
      progress.update(len(batch))

@@ -8,2 +8,2 @@ from .registered_task import RegisteredTask, MockTask, PrintTask

__version__ = '2.9.0'
__version__ = '2.10.0'

@@ -119,3 +119,6 @@ import json

def _request(self, num_tasks, visibility_timeout):
def lease(self, seconds, num_tasks=1, wait_sec=20):
if wait_sec is None:
wait_sec = 20
resp = self.sqs.receive_message(

@@ -130,4 +133,4 @@ QueueUrl=self.qurl,

],
VisibilityTimeout=visibility_timeout,
WaitTimeSeconds=20,
VisibilityTimeout=seconds,
WaitTimeSeconds=wait_sec,
)

@@ -145,7 +148,2 @@

def lease(self, seconds, num_tasks=1):
if num_tasks > 1:
raise ValueError("This library (not boto/SQS) only supports fetching one task at a time. Requested: {}.".format(num_tasks))
return self._request(num_tasks, seconds)
def delete(self, task):

@@ -181,3 +179,3 @@ if type(task) == str:

# visibility_timeout must be > 0 for delete to work
tasks = self._request(num_tasks=10, visibility_timeout=10)
tasks = self.lease(num_tasks=10, seconds=10)
for task in tasks:

@@ -188,3 +186,3 @@ self.delete(task)

def __iter__(self):
return iter(self._request(num_tasks=10, visibility_timeout=0))
return iter(self.lease(num_tasks=10, seconds=0))

@@ -191,0 +189,0 @@

@@ -5,2 +5,3 @@ import fcntl

import json
import math
import operator

@@ -324,3 +325,6 @@ import os.path

def lease(self, seconds, num_tasks):
def lease(self, seconds, num_tasks, wait_sec=None):
if wait_sec is None:
wait_sec = 0
def fmt(direntry):

@@ -359,4 +363,21 @@ filename = direntry.name

return leases
wait_leases = []
if wait_sec > 0 and len(leases) < num_tasks:
# Add a constant b/c this will cascade into shorter and
# shorter checks as wait_sec shrinks and we don't
# want hundreds of workers to accidentally synchronize
sleep_amt = random.random() * (wait_sec + 1)
# but we still want to guarantee that wait_sec is not
# exceeded.
sleep_amt = min(sleep_amt, wait_sec)
time.sleep(sleep_amt)
wait_leases = self.lease(
seconds,
num_tasks - len(leases),
wait_sec - sleep_amt
)
return leases + wait_leases
@retry

@@ -363,0 +384,0 @@ def delete(self, task):

@@ -186,3 +186,3 @@ import copy

def insertfn(batch):
return self.api.insert(batch, delay_seconds)
return self.api.insert(batch, delay_seconds)

@@ -223,3 +223,3 @@ cts = schedule_jobs(

def lease(self, seconds=600, num_tasks=1):
def lease(self, seconds=600, num_tasks=1, wait_sec=None):
"""

@@ -234,3 +234,3 @@ Acquires a lease on the topmost N unowned tasks in the specified queue.

tasks = self.api.lease(seconds, num_tasks)
tasks = self.api.lease(seconds, num_tasks, wait_sec)

@@ -251,5 +251,6 @@ if not len(tasks):

def deltask(tid):
self.api.delete(totaskid(tid))
num_deleted = self.api.delete(totaskid(tid))
if tally:
self.api.tally()
return num_deleted

@@ -569,3 +570,3 @@ schedule_jobs(

with pathos.pools.ProcessPool(parallel) as pool:
for num_inserted in pool.imap(uploadfn, sip(tasks, 2000)):
for num_inserted in pool.imap(uploadfn, sip(tasks, block_size)):
pbar.update(num_inserted)

@@ -572,0 +573,0 @@ ct += num_inserted

@@ -12,3 +12,7 @@ from functools import partial

import taskqueue
from taskqueue import queueable, FunctionTask, RegisteredTask, TaskQueue, MockTask, PrintTask, LocalTaskQueue
from taskqueue import (
queueable, FunctionTask, RegisteredTask,
TaskQueue, MockTask, PrintTask, LocalTaskQueue,
QueueEmptyError
)
from taskqueue.paths import ExtractedPath, mkpath

@@ -158,2 +162,24 @@ from taskqueue.queueables import totask

def test_lease(sqs):
  """Two lease/delete rounds drain 20 tasks; a third lease raises QueueEmptyError."""
  tq = TaskQueue(getpath("sqs"), n_threads=0)
  tq.purge()

  n_inserts = 20
  tq.insert(PrintTask(str(i)) for i in range(n_inserts))

  # Drain the queue in two batches of ten.
  for _ in range(2):
    batch = tq.lease(num_tasks=10, wait_sec=0)
    assert len(batch) == 10
    tq.delete(batch)

  # Queue is now empty: leasing again must raise.
  with pytest.raises(QueueEmptyError):
    tq.lease(num_tasks=10, wait_sec=0)
@pytest.mark.parametrize('protocol', PROTOCOL)

@@ -160,0 +186,0 @@ def test_single_threaded_insertion(sqs, protocol):

Sorry, the diff of this file is not supported yet