task-queue
Advanced tools
+2
-2
| Metadata-Version: 2.1 | ||
| Name: task-queue | ||
| Version: 2.9.0 | ||
| Version: 2.10.0 | ||
| Summary: Multithreaded cloud queue client. | ||
@@ -13,3 +13,3 @@ Home-page: https://github.com/seung-lab/python-task-queue/ | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem. | ||
@@ -16,0 +16,0 @@ ## Installation |
+1
-1
@@ -5,3 +5,3 @@ [](https://travis-ci.org/seung-lab/python-task-queue) [](https://badge.fury.io/py/task-queue) | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem. | ||
@@ -8,0 +8,0 @@ ## Installation |
@@ -1,1 +0,1 @@ | ||
| {"git_version": "266ff0d", "is_release": true} | ||
| {"git_version": "5694f77", "is_release": true} |
| Metadata-Version: 2.1 | ||
| Name: task-queue | ||
| Version: 2.9.0 | ||
| Version: 2.10.0 | ||
| Summary: Multithreaded cloud queue client. | ||
@@ -13,3 +13,3 @@ Home-page: https://github.com/seung-lab/python-task-queue/ | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. | ||
| This package provides a client and system for generating, uploading, leasing, and executing dependency free tasks both locally and in the cloud using AWS SQS or on a single machine or cluster with a common file system using file based queues. Of note, file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem. | ||
@@ -16,0 +16,0 @@ ## Installation |
| import os | ||
| import click | ||
| from tqdm import tqdm | ||
| from taskqueue import TaskQueue, __version__ | ||
| from taskqueue import TaskQueue, __version__, QueueEmptyError | ||
| from taskqueue.lib import toabs | ||
@@ -75,3 +76,4 @@ from taskqueue.paths import get_protocol | ||
| Currently sqs queues are not copiable, | ||
| but you can copy an fq to sqs. | ||
| but you can copy an fq to sqs. The mv | ||
| command supports sqs queues. | ||
| """ | ||
@@ -81,2 +83,6 @@ src = normalize_path(src) | ||
| if get_protocol(src) == "sqs": | ||
| print("ptq: cp does not support sqs:// as a source.") | ||
| return | ||
| tqd = TaskQueue(dest) | ||
@@ -87,3 +93,37 @@ tqs = TaskQueue(src) | ||
| @main.command() | ||
| @click.argument("src") | ||
| @click.argument("dest") | ||
| def mv(src, dest): | ||
| """ | ||
| Moves the contents of a queue to another | ||
| service or location. Do not run this | ||
| process while a queue is being worked. | ||
| Moving an sqs queue to a file queue | ||
| may result in duplicated tasks. | ||
| """ | ||
| src = normalize_path(src) | ||
| dest = normalize_path(dest) | ||
| tqd = TaskQueue(dest, progress=False) | ||
| tqs = TaskQueue(src, progress=False) | ||
| total = tqs.enqueued | ||
| with tqdm(total=total, desc="Moving") as pbar: | ||
| while True: | ||
| try: | ||
| tasks = tqs.lease(num_tasks=10, seconds=10) | ||
| except QueueEmptyError: | ||
| break | ||
| tqd.insert(tasks) | ||
| tqs.delete(tasks) | ||
| pbar.update(len(tasks)) | ||
@@ -8,2 +8,2 @@ from .registered_task import RegisteredTask, MockTask, PrintTask | ||
| __version__ = '2.9.0' | ||
| __version__ = '2.10.0' |
@@ -119,3 +119,6 @@ import json | ||
| def _request(self, num_tasks, visibility_timeout): | ||
| def lease(self, seconds, num_tasks=1, wait_sec=20): | ||
| if wait_sec is None: | ||
| wait_sec = 20 | ||
| resp = self.sqs.receive_message( | ||
@@ -130,4 +133,4 @@ QueueUrl=self.qurl, | ||
| ], | ||
| VisibilityTimeout=visibility_timeout, | ||
| WaitTimeSeconds=20, | ||
| VisibilityTimeout=seconds, | ||
| WaitTimeSeconds=wait_sec, | ||
| ) | ||
@@ -145,7 +148,2 @@ | ||
| def lease(self, seconds, num_tasks=1): | ||
| if num_tasks > 1: | ||
| raise ValueError("This library (not boto/SQS) only supports fetching one task at a time. Requested: {}.".format(num_tasks)) | ||
| return self._request(num_tasks, seconds) | ||
| def delete(self, task): | ||
@@ -181,3 +179,3 @@ if type(task) == str: | ||
| # visibility_timeout must be > 0 for delete to work | ||
| tasks = self._request(num_tasks=10, visibility_timeout=10) | ||
| tasks = self.lease(num_tasks=10, seconds=10) | ||
| for task in tasks: | ||
@@ -188,3 +186,3 @@ self.delete(task) | ||
| def __iter__(self): | ||
| return iter(self._request(num_tasks=10, visibility_timeout=0)) | ||
| return iter(self.lease(num_tasks=10, seconds=0)) | ||
@@ -191,0 +189,0 @@ |
@@ -5,2 +5,3 @@ import fcntl | ||
| import json | ||
| import math | ||
| import operator | ||
@@ -324,3 +325,6 @@ import os.path | ||
| def lease(self, seconds, num_tasks): | ||
| def lease(self, seconds, num_tasks, wait_sec=None): | ||
| if wait_sec is None: | ||
| wait_sec = 0 | ||
| def fmt(direntry): | ||
@@ -359,4 +363,21 @@ filename = direntry.name | ||
| return leases | ||
| wait_leases = [] | ||
| if wait_sec > 0 and len(leases) < num_tasks: | ||
| # Add a constant b/c this will cascade into shorter and | ||
| # shorter checks as wait_sec shrinks and we don't | ||
| # want hundreds of workers to accidently synchronize | ||
| sleep_amt = random.random() * (wait_sec + 1) | ||
| # but we still want to guarantee that wait_sec is not | ||
| # exceeded. | ||
| sleep_amt = min(sleep_amt, wait_sec) | ||
| time.sleep(sleep_amt) | ||
| wait_leases = self.lease( | ||
| seconds, | ||
| num_tasks - len(leases), | ||
| wait_sec - sleep_amt | ||
| ) | ||
| return leases + wait_leases | ||
| @retry | ||
@@ -363,0 +384,0 @@ def delete(self, task): |
@@ -186,3 +186,3 @@ import copy | ||
| def insertfn(batch): | ||
| return self.api.insert(batch, delay_seconds) | ||
| return self.api.insert(batch, delay_seconds) | ||
@@ -223,3 +223,3 @@ cts = schedule_jobs( | ||
| def lease(self, seconds=600, num_tasks=1): | ||
| def lease(self, seconds=600, num_tasks=1, wait_sec=None): | ||
| """ | ||
@@ -234,3 +234,3 @@ Acquires a lease on the topmost N unowned tasks in the specified queue. | ||
| tasks = self.api.lease(seconds, num_tasks) | ||
| tasks = self.api.lease(seconds, num_tasks, wait_sec) | ||
@@ -251,5 +251,6 @@ if not len(tasks): | ||
| def deltask(tid): | ||
| self.api.delete(totaskid(tid)) | ||
| num_deleted = self.api.delete(totaskid(tid)) | ||
| if tally: | ||
| self.api.tally() | ||
| return num_deleted | ||
@@ -569,3 +570,3 @@ schedule_jobs( | ||
| with pathos.pools.ProcessPool(parallel) as pool: | ||
| for num_inserted in pool.imap(uploadfn, sip(tasks, 2000)): | ||
| for num_inserted in pool.imap(uploadfn, sip(tasks, block_size)): | ||
| pbar.update(num_inserted) | ||
@@ -572,0 +573,0 @@ ct += num_inserted |
@@ -12,3 +12,7 @@ from functools import partial | ||
| import taskqueue | ||
| from taskqueue import queueable, FunctionTask, RegisteredTask, TaskQueue, MockTask, PrintTask, LocalTaskQueue | ||
| from taskqueue import ( | ||
| queueable, FunctionTask, RegisteredTask, | ||
| TaskQueue, MockTask, PrintTask, LocalTaskQueue, | ||
| QueueEmptyError | ||
| ) | ||
| from taskqueue.paths import ExtractedPath, mkpath | ||
@@ -158,2 +162,24 @@ from taskqueue.queueables import totask | ||
| def test_lease(sqs): | ||
| path = getpath("sqs") | ||
| tq = TaskQueue(path, n_threads=0) | ||
| n_inserts = 20 | ||
| tq.purge() | ||
| tq.insert(( PrintTask(str(x)) for x in range(n_inserts) )) | ||
| tasks = tq.lease(num_tasks=10, wait_sec=0) | ||
| assert len(tasks) == 10 | ||
| tq.delete(tasks) | ||
| tasks = tq.lease(num_tasks=10, wait_sec=0) | ||
| assert len(tasks) == 10 | ||
| tq.delete(tasks) | ||
| try: | ||
| tasks = tq.lease(num_tasks=10, wait_sec=0) | ||
| assert False | ||
| except QueueEmptyError: | ||
| pass | ||
| @pytest.mark.parametrize('protocol', PROTOCOL) | ||
@@ -160,0 +186,0 @@ def test_single_threaded_insertion(sqs, protocol): |
Sorry, the diff of this file is not supported yet
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
158737
1.59%2236
3.14%