karton-core
Advanced tools
| import sys, types, os;has_mfs = sys.version_info > (3, 5);p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('karton',));importlib = has_mfs and __import__('importlib.util');has_mfs and __import__('importlib.machinery');m = has_mfs and sys.modules.setdefault('karton', importlib.util.module_from_spec(importlib.machinery.PathFinder.find_spec('karton', [os.path.dirname(p)])));m = m or sys.modules.setdefault('karton', types.ModuleType('karton'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p) |
import sys

# karton.core.asyncio relies on Python 3.11+ APIs (e.g. asyncio.timeout),
# so fail fast with a clear message on older interpreters.
if sys.version_info < (3, 11, 0):
    raise ImportError("karton.core.asyncio is only compatible with Python 3.11+")

from karton.core.config import Config
from karton.core.task import Task

from .karton import Consumer, Karton, Producer
from .resource import LocalResource, RemoteResource, Resource

# Public API of the karton.core.asyncio package.
__all__ = [
    "Karton",
    "Producer",
    "Consumer",
    "Task",
    "Config",
    "LocalResource",
    "Resource",
    "RemoteResource",
]
| import json | ||
| import logging | ||
| import time | ||
| from typing import IO, Any, Dict, List, Optional, Tuple, Union | ||
| import aioboto3 | ||
| from aiobotocore.credentials import ContainerProvider, InstanceMetadataProvider | ||
| from aiobotocore.session import ClientCreatorContext, get_session | ||
| from aiobotocore.utils import InstanceMetadataFetcher | ||
| from redis.asyncio import Redis | ||
| from redis.asyncio.client import Pipeline | ||
| from redis.exceptions import AuthenticationError | ||
| from karton.core import Config, Task | ||
| from karton.core.asyncio.resource import RemoteResource | ||
| from karton.core.backend import ( | ||
| KARTON_BINDS_HSET, | ||
| KARTON_TASK_NAMESPACE, | ||
| KARTON_TASKS_QUEUE, | ||
| KartonBackendBase, | ||
| KartonBind, | ||
| KartonMetrics, | ||
| KartonServiceInfo, | ||
| ) | ||
| from karton.core.task import TaskState | ||
| logger = logging.getLogger(__name__) | ||
class KartonAsyncBackend(KartonBackendBase):
    """
    Asyncio flavor of the Karton backend.

    Provides async access to the Redis task store and the S3 object storage.
    Unlike the synchronous backend, connections are established lazily:
    :py:meth:`connect` must be awaited before any other method is used.
    """

    def __init__(
        self,
        config: Config,
        identity: Optional[str] = None,
        service_info: Optional[KartonServiceInfo] = None,
    ) -> None:
        super().__init__(config, identity, service_info)
        # Connections are created by connect(), not here, because they must
        # be set up inside a running event loop.
        self._redis: Optional[Redis] = None
        self._s3_session: Optional[aioboto3.Session] = None
        # True when S3 credentials come from IAM instead of static keys
        self._s3_iam_auth = False

    @property
    def redis(self) -> Redis:
        # Redis connection; valid only after connect() has been awaited.
        if not self._redis:
            raise RuntimeError("Call connect() first before using KartonAsyncBackend")
        return self._redis

    @property
    def s3(self) -> ClientCreatorContext:
        """
        Async context manager that yields an S3 client
        (``async with backend.s3 as client: ...``).
        Valid only after connect() has been awaited.
        """
        if not self._s3_session:
            raise RuntimeError("Call connect() first before using KartonAsyncBackend")
        endpoint = self.config.get("s3", "address")
        if self._s3_iam_auth:
            # Credentials are carried by the underlying botocore session
            return self._s3_session.client(
                "s3",
                endpoint_url=endpoint,
            )
        else:
            access_key = self.config.get("s3", "access_key")
            secret_key = self.config.get("s3", "secret_key")
            return self._s3_session.client(
                "s3",
                endpoint_url=endpoint,
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
            )

    async def connect(self):
        """
        Connect to Redis and prepare the S3 session.

        Idempotent: calling it again after a successful connect is a no-op.
        Raises RuntimeError if the S3 endpoint or credentials are missing.
        """
        if self._redis is not None or self._s3_session is not None:
            # Already connected
            return
        self._redis = await self.make_redis(
            self.config, identity=self.identity, service_info=self.service_info
        )

        endpoint = self.config.get("s3", "address")
        access_key = self.config.get("s3", "access_key")
        secret_key = self.config.get("s3", "secret_key")
        iam_auth = self.config.getboolean("s3", "iam_auth")

        if not endpoint:
            raise RuntimeError("Attempting to get S3 client without an endpoint set")

        if access_key and secret_key and iam_auth:
            logger.warning(
                "Warning: iam is turned on and both S3 access key and secret key are"
                " provided"
            )

        if iam_auth:
            s3_client_creator = await self.iam_auth_s3()
            if s3_client_creator:
                self._s3_iam_auth = True
                self._s3_session = s3_client_creator
                return

        # IAM auth disabled or no IAM credentials found: fall back to keys
        if access_key is None or secret_key is None:
            raise RuntimeError(
                "Attempting to get S3 client without an access_key/secret_key set"
            )

        session = aioboto3.Session()
        self._s3_session = session

    async def iam_auth_s3(self):
        """
        Try to build an aioboto3 session authenticated via IAM
        (container credentials first, then EC2 instance metadata).

        :return: aioboto3.Session bound to the resolved credentials,
            or None if no provider yielded credentials
        """
        boto_session = get_session()
        iam_providers = [
            ContainerProvider(),
            InstanceMetadataProvider(
                iam_role_fetcher=InstanceMetadataFetcher(timeout=1000, num_attempts=2)
            ),
        ]
        for provider in iam_providers:
            creds = await provider.load()
            if creds:
                boto_session._credentials = creds  # type: ignore
                return aioboto3.Session(botocore_session=boto_session)

    @staticmethod
    async def make_redis(
        config,
        identity: Optional[str] = None,
        service_info: Optional[KartonServiceInfo] = None,
    ) -> Redis:
        """
        Create and test a Redis connection.

        :param config: The karton configuration
        :param identity: Karton service identity
        :param service_info: Additional service identity metadata
        :return: Redis connection
        """
        if service_info is not None:
            # Extended client name carries version metadata
            client_name: Optional[str] = service_info.make_client_name()
        else:
            client_name = identity

        redis_args = {
            "host": config["redis"]["host"],
            "port": config.getint("redis", "port", 6379),
            "db": config.getint("redis", "db", 0),
            "username": config.get("redis", "username"),
            "password": config.get("redis", "password"),
            "client_name": client_name,
            # set socket_timeout to None if set to 0
            "socket_timeout": config.getint("redis", "socket_timeout", 30) or None,
            "decode_responses": True,
        }
        try:
            rs = Redis(**redis_args)
            await rs.ping()
        except AuthenticationError:
            # Maybe we've sent a wrong password.
            # Or maybe the server is not (yet) password protected
            # To make smooth transition possible, try to login insecurely
            del redis_args["password"]
            rs = Redis(**redis_args)
            await rs.ping()
        return rs

    def unserialize_resource(self, resource_spec: Dict[str, Any]) -> RemoteResource:
        """
        Unserializes resource into a RemoteResource object bound with current backend

        :param resource_spec: Resource specification
        :return: RemoteResource object
        """
        return RemoteResource.from_dict(resource_spec, backend=self)

    async def register_task(self, task: Task, pipe: Optional[Pipeline] = None) -> None:
        """
        Register or update task in Redis.

        :param task: Task object
        :param pipe: Optional pipeline object if operation is a part of pipeline
        """
        rs = pipe or self.redis
        await rs.set(f"{KARTON_TASK_NAMESPACE}:{task.uid}", task.serialize())

    async def set_task_status(
        self, task: Task, status: TaskState, pipe: Optional[Pipeline] = None
    ) -> None:
        """
        Request task status change to be applied by karton-system

        :param task: Task object
        :param status: New task status (TaskState)
        :param pipe: Optional pipeline object if operation is a part of pipeline
        """
        if task.status == status:
            # No-op if the task is already in the requested state
            return
        task.status = status
        task.last_update = time.time()
        await self.register_task(task, pipe=pipe)

    async def register_bind(self, bind: KartonBind) -> Optional[KartonBind]:
        """
        Register bind for Karton service and return the old one

        :param bind: KartonBind object with bind definition
        :return: Old KartonBind that was registered under this identity
        """
        # Transactional pipeline: read the old bind and replace it atomically
        async with self.redis.pipeline(transaction=True) as pipe:
            await pipe.hget(KARTON_BINDS_HSET, bind.identity)
            await pipe.hset(KARTON_BINDS_HSET, bind.identity, self.serialize_bind(bind))
            old_serialized_bind, _ = await pipe.execute()
        if old_serialized_bind:
            return self.unserialize_bind(bind.identity, old_serialized_bind)
        else:
            return None

    async def get_bind(self, identity: str) -> KartonBind:
        """
        Get bind object for given identity

        :param identity: Karton service identity
        :return: KartonBind object
        """
        return self.unserialize_bind(
            identity, await self.redis.hget(KARTON_BINDS_HSET, identity)
        )

    async def produce_unrouted_task(self, task: Task) -> None:
        """
        Add given task to unrouted task (``karton.tasks``) queue

        Task must be registered before with :py:meth:`register_task`

        :param task: Task object
        """
        await self.redis.rpush(KARTON_TASKS_QUEUE, task.uid)

    async def consume_queues(
        self, queues: Union[str, List[str]], timeout: int = 0
    ) -> Optional[Tuple[str, str]]:
        """
        Get item from queues (ordered from the most to the least prioritized)
        If there are no items, wait until one appear.

        :param queues: Redis queue name or list of names
        :param timeout: Waiting for item timeout (default: 0 = wait forever)
        :return: Tuple of [queue_name, item] objects or None if timeout has been reached
        """
        # BLPOP honors queue list order, which gives us priority ordering
        return await self.redis.blpop(queues, timeout=timeout)

    async def get_task(self, task_uid: str) -> Optional[Task]:
        """
        Get task object with given identifier

        :param task_uid: Task identifier
        :return: Task object or None if task doesn't exist
        """
        task_data = await self.redis.get(f"{KARTON_TASK_NAMESPACE}:{task_uid}")
        if not task_data:
            return None
        return Task.unserialize(
            task_data, resource_unserializer=self.unserialize_resource
        )

    async def consume_routed_task(
        self, identity: str, timeout: int = 5
    ) -> Optional[Task]:
        """
        Get routed task for given consumer identity.

        If there are no tasks, blocks until new one appears or timeout is reached.

        :param identity: Karton service identity
        :param timeout: Waiting for task timeout (default: 5)
        :return: Task object or None on timeout
        """
        item = await self.consume_queues(
            self.get_queue_names(identity),
            timeout=timeout,
        )
        if not item:
            return None
        queue, data = item
        # The queue entry carries only the task uid; fetch the full object
        return await self.get_task(data)

    async def increment_metrics(
        self, metric: KartonMetrics, identity: str, pipe: Optional[Pipeline] = None
    ) -> None:
        """
        Increments metrics for given operation type and identity

        :param metric: Operation metric type
        :param identity: Related Karton service identity
        :param pipe: Optional pipeline object if operation is a part of pipeline
        """
        rs = pipe or self.redis
        await rs.hincrby(metric.value, identity, 1)

    async def upload_object(
        self,
        bucket: str,
        object_uid: str,
        content: Union[bytes, IO[bytes]],
    ) -> None:
        """
        Upload resource object to underlying object storage (S3)

        :param bucket: Bucket name
        :param object_uid: Object identifier
        :param content: Object content as bytes or file-like stream
        """
        async with self.s3 as client:
            await client.put_object(Bucket=bucket, Key=object_uid, Body=content)

    async def upload_object_from_file(
        self, bucket: str, object_uid: str, path: str
    ) -> None:
        """
        Upload resource object file to underlying object storage

        :param bucket: Bucket name
        :param object_uid: Object identifier
        :param path: Path to the object content
        """
        async with self.s3 as client:
            # NOTE: file is read synchronously; acceptable for local files
            with open(path, "rb") as f:
                await client.put_object(Bucket=bucket, Key=object_uid, Body=f)

    async def download_object(self, bucket: str, object_uid: str) -> bytes:
        """
        Download resource object from object storage.

        :param bucket: Bucket name
        :param object_uid: Object identifier
        :return: Content bytes
        """
        async with self.s3 as client:
            obj = await client.get_object(Bucket=bucket, Key=object_uid)
            return await obj["Body"].read()

    async def download_object_to_file(
        self, bucket: str, object_uid: str, path: str
    ) -> None:
        """
        Download resource object from object storage to file

        :param bucket: Bucket name
        :param object_uid: Object identifier
        :param path: Target file path
        """
        async with self.s3 as client:
            await client.download_file(Bucket=bucket, Key=object_uid, Filename=path)

    async def produce_log(
        self,
        log_record: Dict[str, Any],
        logger_name: str,
        level: str,
    ) -> bool:
        """
        Push new log record to the logs channel

        :param log_record: Dict with log record
        :param logger_name: Logger name
        :param level: Log level
        :return: True if any active log consumer received log record
        """
        # PUBLISH returns the number of subscribers that got the message
        return (
            await self.redis.publish(
                self._log_channel(logger_name, level), json.dumps(log_record)
            )
            > 0
        )
| import abc | ||
| import asyncio | ||
| import signal | ||
| from asyncio import CancelledError | ||
| from typing import Optional | ||
| from karton.core import Task | ||
| from karton.core.__version__ import __version__ | ||
| from karton.core.backend import KartonServiceInfo | ||
| from karton.core.base import ConfigMixin, LoggingMixin | ||
| from karton.core.config import Config | ||
| from karton.core.task import get_current_task, set_current_task | ||
| from karton.core.utils import StrictClassMethod | ||
| from .backend import KartonAsyncBackend | ||
| from .logger import KartonAsyncLogHandler | ||
class KartonAsyncBase(abc.ABC, ConfigMixin, LoggingMixin):
    """
    Base class for all Karton services

    You can set an informative version information by setting the ``version`` class
    attribute.
    """

    #: Karton service identity
    identity: str = ""
    #: Karton service version
    version: Optional[str] = None
    #: Include extended service information for non-consumer services
    with_service_info: bool = False

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonAsyncBackend] = None,
    ) -> None:
        ConfigMixin.__init__(self, config, identity)
        # Extended identity metadata is only published when explicitly enabled
        if self.with_service_info and self.identity is not None:
            self.service_info = KartonServiceInfo(
                identity=self.identity,
                karton_version=__version__,
                service_version=self.version,
            )
        else:
            self.service_info = None
        # Use the injected backend if given, otherwise build a default one
        self.backend = backend or KartonAsyncBackend(
            self.config, identity=self.identity, service_info=self.service_info
        )
        handler = KartonAsyncLogHandler(backend=self.backend, channel=self.identity)
        LoggingMixin.__init__(
            self,
            handler,
            log_format="[%(asctime)s][%(levelname)s][%(task_id)s] %(message)s",
        )

    async def connect(self) -> None:
        """Establish backend connections (Redis + S3)."""
        await self.backend.connect()

    @property
    def current_task(self) -> Optional[Task]:
        # Current task is tracked per-async-context, not on the instance
        return get_current_task()

    @current_task.setter
    def current_task(self, task: Optional[Task]):
        set_current_task(task)
class KartonAsyncServiceBase(KartonAsyncBase):
    """
    Karton base class for looping services.

    You can set an informative version information by setting the ``version`` class
    attribute

    :param config: Karton config to use for service configuration
    :param identity: Karton service identity to use
    :param backend: Karton backend to use
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonAsyncBackend] = None,
    ) -> None:
        super().__init__(
            config=config,
            identity=identity,
            backend=backend,
        )
        self.setup_logger()
        # Main service task; spawned by loop(), cancelled on shutdown signal
        self._loop_coro: Optional[asyncio.Task] = None

    def _do_shutdown(self) -> None:
        # Signal handler: cancel the main loop task so loop() can clean up
        self.log.info("Got signal, shutting down...")
        if self._loop_coro is not None:
            self._loop_coro.cancel()

    @abc.abstractmethod
    async def _loop(self) -> None:
        """Service-specific main loop, implemented by subclasses."""
        raise NotImplementedError

    async def loop(self) -> None:
        """
        Run the service: start log publishing, connect the backend,
        install signal handlers and await the service main loop.
        """
        if self.enable_publish_log and hasattr(self.log_handler, "start_consuming"):
            self.log_handler.start_consuming()
        await self.connect()
        # get_running_loop() is the correct API inside a coroutine;
        # get_event_loop() is deprecated in that context since Python 3.10
        # (this package already requires Python 3.11+)
        event_loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            event_loop.add_signal_handler(sig, self._do_shutdown)
        self._loop_coro = asyncio.create_task(self._loop())
        try:
            await self._loop_coro
        finally:
            # Always restore default signal handling and flush pending logs
            for sig in (signal.SIGTERM, signal.SIGINT):
                event_loop.remove_signal_handler(sig)
            if self.enable_publish_log and hasattr(self.log_handler, "stop_consuming"):
                await self.log_handler.stop_consuming()

    @StrictClassMethod
    def main(cls) -> None:
        """Main method invoked from CLI."""
        service = cls.karton_from_args()
        try:
            asyncio.run(service.loop())
        except CancelledError:
            # Swallow cancellation, we're done!
            pass
| import abc | ||
| import argparse | ||
| import asyncio | ||
| import sys | ||
| import time | ||
| import traceback | ||
| from asyncio import CancelledError | ||
| from typing import Any, Dict, List, Optional | ||
| from karton.core import query | ||
| from karton.core.__version__ import __version__ | ||
| from karton.core.backend import KartonBind, KartonMetrics | ||
| from karton.core.config import Config | ||
| from karton.core.exceptions import TaskTimeoutError | ||
| from karton.core.resource import LocalResource as SyncLocalResource | ||
| from karton.core.task import Task, TaskState | ||
| from .backend import KartonAsyncBackend | ||
| from .base import KartonAsyncBase, KartonAsyncServiceBase | ||
| from .resource import LocalResource | ||
class Producer(KartonAsyncBase):
    """
    Producer part of Karton. Used for dispatching initial tasks into karton.

    :param config: Karton configuration object (optional)
    :type config: :class:`karton.Config`
    :param identity: Producer name (optional)
    :type identity: str

    Usage example:

    .. code-block:: python

        from karton.core.asyncio import Producer

        producer = Producer(identity="karton.mwdb")
        await producer.connect()
        task = Task(
            headers={
                "type": "sample",
                "kind": "raw"
            },
            payload={
                "sample": Resource("sample.exe", b"put content here")
            }
        )
        await producer.send_task(task)

    :param config: Karton config to use for service configuration
    :param identity: Karton producer identity
    :param backend: Karton backend to use
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonAsyncBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)

    async def send_task(self, task: Task) -> bool:
        """
        Sends a task to the unrouted task queue. Takes care of logging.
        Given task will be child of task we are currently handling (if such exists).

        :param task: Task object to be sent
        :return: Bool indicating if the task was delivered
        """
        self.log.debug("Dispatched task %s", task.uid)

        # Complete information about task
        if self.current_task is not None:
            # Inherit lineage, persistent payload/headers and priority
            task.set_task_parent(self.current_task)
            task.merge_persistent_payload(self.current_task)
            task.merge_persistent_headers(self.current_task)
            task.priority = self.current_task.priority

        task.last_update = time.time()
        task.headers.update({"origin": self.identity})

        # Ensure all local resources have good buckets
        for resource in task.iterate_resources():
            if isinstance(resource, LocalResource) and not resource.bucket:
                resource.bucket = self.backend.default_bucket_name
            if isinstance(resource, SyncLocalResource):
                # Synchronous resources can't be uploaded by an async backend
                raise RuntimeError(
                    "Synchronous resources are not supported. "
                    "Use karton.core.asyncio.resource module instead."
                )

        # Register new task
        await self.backend.register_task(task)

        # Upload local resources
        for resource in task.iterate_resources():
            if isinstance(resource, LocalResource):
                await resource.upload(self.backend)

        # Add task to karton.tasks
        await self.backend.produce_unrouted_task(task)
        await self.backend.increment_metrics(KartonMetrics.TASK_PRODUCED, self.identity)
        return True
class Consumer(KartonAsyncServiceBase):
    """
    Base consumer class, this is the part of Karton responsible for processing
    incoming tasks

    :param config: Karton config to use for service configuration
    :param identity: Karton service identity
    :param backend: Karton backend to use
    :param task_timeout: The maximum time, in seconds, this consumer will wait for
        a task to finish processing before being CRASHED on timeout.
        Set 0 for unlimited, and None for using global value
    :param concurrency_limit: The maximum number of concurrent tasks that may be
        gathered from queue and processed asynchronously.
    """

    # Header patterns this consumer binds on
    filters: List[Dict[str, Any]] = []
    # Whether the consumer queue should survive service downtime
    persistent: bool = True
    # Service version reported in the bind
    version: Optional[str] = None
    # Per-task timeout in seconds; None means "take it from the config"
    task_timeout = None
    # Max concurrently processed tasks; None means unlimited
    concurrency_limit: Optional[int] = 1

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonAsyncBackend] = None,
    ) -> None:
        super().__init__(config=config, identity=identity, backend=backend)

        if self.filters is None:
            raise ValueError("Cannot bind consumer on Empty binds")

        # Dummy conversion to make sure the filters are well-formed.
        query.convert(self.filters)

        # Config/CLI may override class-level defaults; debug mode forces
        # a non-persistent queue
        self.persistent = (
            self.config.getboolean("karton", "persistent", self.persistent)
            and not self.debug
        )
        if self.task_timeout is None:
            self.task_timeout = self.config.getint("karton", "task_timeout")
        self.concurrency_limit = self.config.getint(
            "karton", "concurrency_limit", self.concurrency_limit
        )
        self.concurrency_semaphore: Optional[asyncio.Semaphore] = None
        if self.concurrency_limit is not None:
            self.concurrency_semaphore = asyncio.BoundedSemaphore(
                self.concurrency_limit
            )

    @abc.abstractmethod
    async def process(self, task: Task) -> None:
        """
        Task processing method.

        :param task: The incoming task object

        self.current_task contains task that triggered invocation of
        :py:meth:`karton.Consumer.process` but you should only focus on the passed
        task object and shouldn't interact with the field directly.
        """
        raise NotImplementedError()

    async def _internal_process(self, task: Task) -> None:
        # Runs self.process() with state bookkeeping, timeout enforcement,
        # metrics and crash reporting.
        exception_str = None
        try:
            self.log.info("Received new task - %s", task.uid)
            await self.backend.set_task_status(task, TaskState.STARTED)
            if self.task_timeout:
                try:
                    # asyncio.timeout is Py3.11+
                    async with asyncio.timeout(self.task_timeout):  # type: ignore
                        await self.process(task)
                except asyncio.TimeoutError as e:
                    raise TaskTimeoutError from e
            else:
                await self.process(task)
            self.log.info("Task done - %s", task.uid)
        except (Exception, TaskTimeoutError, CancelledError):
            # Capture the traceback so it can be attached to the task
            exc_info = sys.exc_info()
            exception_str = traceback.format_exception(*exc_info)

            await self.backend.increment_metrics(
                KartonMetrics.TASK_CRASHED, self.identity
            )
            self.log.exception("Failed to process task - %s", task.uid)
        finally:
            await self.backend.increment_metrics(
                KartonMetrics.TASK_CONSUMED, self.identity
            )

            task_state = TaskState.FINISHED

            # report the task status as crashed
            # if an exception was caught while processing
            if exception_str is not None:
                task_state = TaskState.CRASHED
                task.error = exception_str

            await self.backend.set_task_status(task, task_state)

    async def internal_process(self, task: Task) -> None:
        """
        The internal side of :py:meth:`Consumer.process` function, takes care of
        synchronizing the task state, handling errors and running task hooks.

        :param task: Task object to process

        :meta private:
        """
        try:
            self.current_task = task
            if not task.matches_filters(self.filters):
                self.log.info("Task rejected because binds are no longer valid.")
                await self.backend.set_task_status(task, TaskState.FINISHED)
                # Task rejected: end of processing
                return
            await self._internal_process(task)
        finally:
            # Release the concurrency slot acquired in _loop()
            if self.concurrency_semaphore is not None:
                self.concurrency_semaphore.release()
            self.current_task = None

    @property
    def _bind(self) -> KartonBind:
        # Bind definition advertised to karton-system for task routing
        return KartonBind(
            identity=self.identity,
            info=self.__class__.__doc__,
            version=__version__,
            filters=self.filters,
            persistent=self.persistent,
            service_version=self.__class__.version,
            is_async=True,
        )

    @classmethod
    def args_parser(cls) -> argparse.ArgumentParser:
        # Extend the base parser with consumer-specific options
        parser = super().args_parser()
        # store_false defaults to True, we intentionally want None there
        parser.add_argument(
            "--non-persistent",
            action="store_const",
            const=False,
            dest="persistent",
            help="Run service with non-persistent queue",
        )
        parser.add_argument(
            "--task-timeout",
            type=int,
            help="Limit task execution time",
        )
        parser.add_argument(
            "--concurrency-limit",
            type=int,
            help="Limit number of concurrent tasks",
        )
        return parser

    @classmethod
    def config_from_args(cls, config: Config, args: argparse.Namespace) -> None:
        # Map CLI arguments onto the [karton] config section
        super().config_from_args(config, args)
        config.load_from_dict(
            {
                "karton": {
                    "persistent": args.persistent,
                    "task_timeout": args.task_timeout,
                    "concurrency_limit": args.concurrency_limit,
                }
            }
        )

    async def _loop(self) -> None:
        """
        Blocking loop that consumes tasks and runs
        :py:meth:`karton.Consumer.process` as a handler

        :meta private:
        """
        self.log.info("Service %s started", self.identity)
        if self.task_timeout:
            self.log.info(f"Task timeout is set to {self.task_timeout} seconds")
        if self.concurrency_limit:
            self.log.info(f"Concurrency limit is set to {self.concurrency_limit}")

        # Get the old binds and set the new ones atomically
        old_bind = await self.backend.register_bind(self._bind)

        if not old_bind:
            self.log.info("Service binds created.")
        elif old_bind != self._bind:
            self.log.info("Binds changed, old service instances should exit soon.")

        for task_filter in self.filters:
            self.log.info("Binding on: %s", task_filter)

        concurrent_tasks: List[asyncio.Task] = []
        try:
            while True:
                # Exit gracefully when another instance re-registered the bind
                # with a different definition
                current_bind = await self.backend.get_bind(self.identity)
                if current_bind != self._bind:
                    self.log.info("Binds changed, shutting down.")
                    break
                # Acquire a concurrency slot before fetching a task; it is
                # released by internal_process(), or right below when the
                # fetch timed out without a task
                if self.concurrency_semaphore is not None:
                    await self.concurrency_semaphore.acquire()
                task = await self.backend.consume_routed_task(self.identity)
                if task:
                    coro_task = asyncio.create_task(self.internal_process(task))
                    concurrent_tasks.append(coro_task)
                else:
                    if self.concurrency_semaphore is not None:
                        self.concurrency_semaphore.release()
                # Garbage collection and exception propagation
                # for finished concurrent tasks
                unfinished_tasks: List[asyncio.Task] = []
                for coro_task in concurrent_tasks:
                    if coro_task.done():
                        # Propagate possible unhandled exception
                        coro_task.result()
                    else:
                        unfinished_tasks.append(coro_task)
                concurrent_tasks = unfinished_tasks
        finally:
            # Finally handles shutdown events:
            # - main loop cancellation (SIGINT/SIGTERM)
            # - unhandled exception in internal_process
            # First cancel all pending tasks
            for coro_task in concurrent_tasks:
                if not coro_task.done():
                    coro_task.cancel()
            # Then gather all tasks to finalize them
            await asyncio.gather(*concurrent_tasks)
class Karton(Consumer, Producer):
    """
    This glues together Consumer and Producer - which is the most common use case
    """

    def __init__(
        self,
        config: Optional[Config] = None,
        identity: Optional[str] = None,
        backend: Optional[KartonAsyncBackend] = None,
    ) -> None:
        # MRO walks Consumer first, then Producer, then the shared bases
        super().__init__(
            config=config,
            identity=identity,
            backend=backend,
        )
| """ | ||
| asyncio implementation of KartonLogHandler | ||
| """ | ||
| import asyncio | ||
| import logging | ||
| import platform | ||
| from typing import Any, Dict, Optional, Tuple | ||
| from karton.core.logger import LogLineFormatterMixin | ||
| from .backend import KartonAsyncBackend | ||
| HOSTNAME = platform.node() | ||
| QueuedRecord = Optional[Tuple[Dict[str, Any], str]] | ||
async def async_log_consumer(
    queue: asyncio.Queue[QueuedRecord], backend: KartonAsyncBackend, channel: str
) -> None:
    """
    Drain queued log records and publish them through the backend.

    Runs until a falsy sentinel (None) is dequeued, which signals shutdown.
    """
    while True:
        entry = await queue.get()
        if not entry:
            # Sentinel received: stop consuming
            break
        record, level_name = entry
        await backend.produce_log(record, logger_name=channel, level=level_name)
class KartonAsyncLogHandler(logging.Handler, LogLineFormatterMixin):
    """
    logging.Handler that passes logs to the Karton backend.
    """

    def __init__(self, backend: KartonAsyncBackend, channel: str) -> None:
        logging.Handler.__init__(self)
        # Background task draining the queue; created by start_consuming()
        self._consumer: Optional[asyncio.Task] = None
        # Unbounded queue decoupling synchronous emit() from async publishing
        self._queue: asyncio.Queue[QueuedRecord] = asyncio.Queue()
        self._backend = backend
        self._channel = channel

    def emit(self, record: logging.LogRecord) -> None:
        # Synchronous entry point invoked by the logging machinery;
        # hand the prepared record over to the consumer task.
        prepared = self.prepare_log_line(record)
        self._queue.put_nowait((prepared, record.levelname))

    def start_consuming(self):
        """Spawn the background task that publishes queued records."""
        if self._consumer is not None:
            raise RuntimeError("Consumer already started")
        self._consumer = asyncio.create_task(
            async_log_consumer(self._queue, self._backend, self._channel)
        )

    async def stop_consuming(self):
        """Push the shutdown sentinel and wait until the queue is drained."""
        if self._consumer is None:
            raise RuntimeError("Consumer is not started")
        self._queue.put_nowait(None)  # Signal that queue is finished
        await self._consumer
| import contextlib | ||
| import os | ||
| import shutil | ||
| import tempfile | ||
| import zipfile | ||
| from io import BytesIO | ||
| from typing import IO, TYPE_CHECKING, Any, AsyncIterator, Dict, List, Optional, Union | ||
| from karton.core.resource import LocalResourceBase, ResourceBase | ||
| if TYPE_CHECKING: | ||
| from .backend import KartonAsyncBackend | ||
class LocalResource(LocalResourceBase):
    """
    Represents local resource with arbitrary binary data e.g. file contents.

    Local resources will be uploaded to object hub (S3) during
    task dispatching.

    .. code-block:: python

        # Creating resource from bytes
        sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\
        PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...")

        # Creating resource from path
        sample = Resource("original_name.exe", path="sample/original_name.exe")

    :param name: Name of the resource (e.g. name of file)
    :param content: Resource content
    :param path: Path of file with resource content
    :param bucket: Alternative S3 bucket for resource
    :param metadata: Resource metadata
    :param uid: Alternative S3 resource id
    :param sha256: Resource sha256 hash
    :param fd: Seekable file descriptor
    :param _flags: Resource flags
    :param _close_fd: Close file descriptor after upload (default: False)
    """

    def __init__(
        self,
        name: str,
        content: Optional[Union[str, bytes]] = None,
        path: Optional[str] = None,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        uid: Optional[str] = None,
        sha256: Optional[str] = None,
        fd: Optional[IO[bytes]] = None,
        _flags: Optional[List[str]] = None,
        _close_fd: bool = False,
    ) -> None:
        # All validation and state handling lives in LocalResourceBase;
        # this subclass only contributes the asynchronous upload methods.
        super().__init__(
            name=name,
            content=content,
            path=path,
            bucket=bucket,
            metadata=metadata,
            uid=uid,
            sha256=sha256,
            fd=fd,
            _flags=_flags,
            _close_fd=_close_fd,
        )

    async def _upload(self, backend: "KartonAsyncBackend") -> None:
        """Internal function for uploading resources

        :param backend: KartonBackend to use while uploading the resource

        :meta private:
        """
        # Note: never transform resource into Remote
        # Multiple task dispatching with same local, in that case resource
        # can be deleted between tasks.
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be uploaded because its bucket is not set"
            )
        # Sources are tried in fixed precedence: content, then fd, then path.
        if self._content:
            # Upload in-memory contents
            await backend.upload_object(self.bucket, self.uid, self._content)
            return
        if self.fd:
            if self.fd.tell() != 0:
                raise RuntimeError(
                    f"Resource object can't be uploaded: "
                    f"file descriptor must point at first byte "
                    f"(fd.tell = {self.fd.tell()})"
                )
            # Stream contents straight from the descriptor
            await backend.upload_object(self.bucket, self.uid, self.fd)
            if self._close_fd:
                # Descriptor lifetime is owned by this Resource object
                self.fd.close()
            return
        if self._path:
            # Let the backend read the file from disk by itself
            await backend.upload_object_from_file(self.bucket, self.uid, self._path)

    async def upload(self, backend: "KartonAsyncBackend") -> None:
        """Internal function for uploading resources

        :param backend: KartonBackend to use while uploading the resource

        :meta private:
        """
        if not (self._content or self._path or self.fd):
            raise RuntimeError("Can't upload resource without content")
        await self._upload(backend)


Resource = LocalResource
class RemoteResource(ResourceBase):
    """
    Keeps reference to remote resource object shared between subsystems
    via object storage (S3)

    Should never be instantiated directly by subsystem, but can be directly passed to
    outgoing payload.

    :param name: Name of the resource (e.g. name of file)
    :param bucket: Alternative S3 bucket for resource
    :param metadata: Resource metadata
    :param uid: Alternative S3 resource id
    :param size: Resource size
    :param backend: :py:meth:`KartonBackend` to bind to this resource
    :param sha256: Resource sha256 hash
    :param _flags: Resource flags
    """

    def __init__(
        self,
        name: str,
        bucket: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        uid: Optional[str] = None,
        size: Optional[int] = None,
        backend: Optional["KartonAsyncBackend"] = None,
        sha256: Optional[str] = None,
        _flags: Optional[List[str]] = None,
    ) -> None:
        super().__init__(
            name,
            bucket=bucket,
            metadata=metadata,
            sha256=sha256,
            _uid=uid,
            _size=size,
            _flags=_flags,
        )
        self.backend = backend

    def loaded(self) -> bool:
        """
        Checks whether resource is loaded into memory

        :return: Flag indicating if the resource is loaded or not
        """
        return self._content is not None

    @property
    def content(self) -> bytes:
        """
        Resource content. Must be explicitly downloaded
        (:py:meth:`RemoteResource.download`) before first access.

        :return: Content bytes
        :raises RuntimeError: when the resource was not downloaded yet
        """
        if self._content is None:
            raise RuntimeError(
                "Resource object needs to be explicitly downloaded first"
            )
        return self._content

    @classmethod
    def from_dict(
        cls, dict: Dict[str, Any], backend: Optional["KartonAsyncBackend"]
    ) -> "RemoteResource":
        """
        Internal deserialization method for remote resources

        :param dict: Serialized information about resource
        :param backend: KartonBackend object

        :return: Deserialized :py:meth:`RemoteResource` object

        :meta private:
        """
        # Backwards compatibility: older serializations carried sha256
        # as a top-level field instead of inside metadata
        metadata = dict.get("metadata", {})
        if "sha256" in dict:
            metadata["sha256"] = dict["sha256"]
        return cls(
            name=dict["name"],
            metadata=metadata,
            bucket=dict["bucket"],
            uid=dict["uid"],
            size=dict.get("size"),  # Backwards compatibility (2.x.x)
            backend=backend,
            _flags=dict.get("flags"),  # Backwards compatibility (3.x.x)
        )

    def unload(self) -> None:
        """
        Unloads resource object from memory
        """
        self._content = None

    async def download(self) -> bytes:
        """
        Downloads remote resource content from object hub into memory.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            # Ensure that resource will be downloaded before it will be
            # passed to processing method
            await sample.download()

            self.process_sample(sample)

        :return: Downloaded content bytes
        :raises RuntimeError: when resource is not bound to a backend/bucket
        """
        # Checks kept inline (not factored out) so type checkers can narrow
        # self.backend/self.bucket to non-None below
        if self.backend is None:
            raise RuntimeError(
                (
                    "Resource object can't be downloaded because it's not bound to "
                    "the backend"
                )
            )
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be downloaded because its bucket is not set"
            )
        self._content = await self.backend.download_object(self.bucket, self.uid)
        return self._content

    async def download_to_file(self, path: str) -> None:
        """
        Downloads remote resource into file.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            await sample.download_to_file("sample/sample.exe")

            with open("sample/sample.exe", "rb") as f:
                contents = f.read()

        :param path: Path to download the resource into
        :raises RuntimeError: when resource is not bound to a backend/bucket
        """
        if self.backend is None:
            raise RuntimeError(
                (
                    "Resource object can't be downloaded because it's not bound to "
                    "the backend"
                )
            )
        if self.bucket is None:
            raise RuntimeError(
                "Resource object can't be downloaded because its bucket is not set"
            )
        await self.backend.download_object_to_file(self.bucket, self.uid, path)

    @contextlib.asynccontextmanager
    async def download_temporary_file(
        self, suffix: Optional[str] = None
    ) -> AsyncIterator[IO[bytes]]:
        """
        Downloads remote resource into named temporary file.

        .. code-block:: python

            sample = self.current_task.get_resource("sample")

            async with sample.download_temporary_file() as f:
                contents = f.read()
                path = f.name

            # Temporary file is deleted after exitting the "with" scope

        :param suffix: Optional suffix for the temporary file name
        :return: ContextManager with the temporary file
        """
        # That tempfile-fu is necessary because minio.fget_object removes file
        # under provided path and renames its own part-file with downloaded content
        # under previously deleted path
        # Weird move, but ok...
        tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp.close()
        try:
            await self.download_to_file(tmp.name)
            with open(tmp.name, "rb") as f:
                yield f
        finally:
            os.remove(tmp.name)

    @contextlib.asynccontextmanager
    async def zip_file(self) -> AsyncIterator[zipfile.ZipFile]:
        """
        If resource contains a Zip file, downloads it to the temporary file
        and wraps it with ZipFile object.

        .. code-block:: python

            dumps = self.current_task.get_resource("dumps")

            async with dumps.zip_file() as zipf:
                print("Fetched dumps: ", zipf.namelist())

        By default: method downloads zip into temporary file, which is deleted after
        leaving the context. If you want to load zip into memory,
        call :py:meth:`RemoteResource.download` first.

        If you want to pre-download Zip under specified path and open it using
        zipfile module, you need to do this manually:

        .. code-block:: python

            dumps = self.current_task.get_resource("dumps")

            # Download zip file
            zip_path = "./dumps.zip"
            await dumps.download_to_file(zip_path)

            zipf = zipfile.ZipFile(zip_path)

        :return: ContextManager with zipfile
        """
        if self._content:
            # Already in memory: no temporary file needed
            yield zipfile.ZipFile(BytesIO(self._content))
        else:
            async with self.download_temporary_file() as f:
                yield zipfile.ZipFile(f)

    async def extract_to_directory(self, path: str) -> None:
        """
        If resource contains a Zip file, extracts files contained in Zip into
        provided path.

        By default: method downloads zip into temporary file, which is deleted
        after extraction. If you want to load zip into memory, call
        :py:meth:`RemoteResource.download` first.

        :param path: Directory path where the resource should be unpacked
        """
        async with self.zip_file() as zf:
            zf.extractall(path)

    @contextlib.asynccontextmanager
    async def extract_temporary(self) -> AsyncIterator[str]:
        """
        If resource contains a Zip file, extracts files contained in Zip
        to the temporary directory.

        Returns path of directory with extracted files. Directory is recursively
        deleted after leaving the context.

        .. code-block:: python

            dumps = self.current_task.get_resource("dumps")

            async with dumps.extract_temporary() as dumps_path:
                print("Fetched dumps:", os.listdir(dumps_path))

        By default: method downloads zip into temporary file, which is deleted
        after extraction. If you want to load zip into memory, call
        :py:meth:`RemoteResource.download` first.

        :return: ContextManager with the temporary directory
        """
        tmpdir = tempfile.mkdtemp()
        try:
            await self.extract_to_directory(tmpdir)
            # Fix: yield exactly once. The original yielded tmpdir twice;
            # @asynccontextmanager requires a single yield and raises
            # RuntimeError("generator didn't stop") on normal exit otherwise.
            yield tmpdir
        finally:
            shutil.rmtree(tmpdir)
@@ -1,1 +0,1 @@ | ||
| __version__ = "5.7.0" | ||
| __version__ = "5.8.0" |
+150
-109
@@ -24,2 +24,3 @@ import dataclasses | ||
| from .exceptions import InvalidIdentityError | ||
| from .resource import RemoteResource | ||
| from .task import Task, TaskPriority, TaskState | ||
@@ -37,3 +38,11 @@ from .utils import chunks, chunks_iter | ||
| "KartonBind", | ||
| ["identity", "info", "version", "persistent", "filters", "service_version"], | ||
| [ | ||
| "identity", | ||
| "info", | ||
| "version", | ||
| "persistent", | ||
| "filters", | ||
| "service_version", | ||
| "is_async", | ||
| ], | ||
| ) | ||
@@ -43,3 +52,3 @@ | ||
| KartonOutputs = namedtuple("KartonOutputs", ["identity", "outputs"]) | ||
| logger = logging.getLogger("karton.core.backend") | ||
| logger = logging.getLogger(__name__) | ||
@@ -109,3 +118,3 @@ | ||
| class KartonBackend: | ||
| class KartonBackendBase: | ||
| def __init__( | ||
@@ -116,3 +125,3 @@ self, | ||
| service_info: Optional[KartonServiceInfo] = None, | ||
| ) -> None: | ||
| ): | ||
| self.config = config | ||
@@ -125,56 +134,3 @@ | ||
| self.service_info = service_info | ||
| self.redis = self.make_redis( | ||
| config, identity=identity, service_info=service_info | ||
| ) | ||
| endpoint = config.get("s3", "address") | ||
| access_key = config.get("s3", "access_key") | ||
| secret_key = config.get("s3", "secret_key") | ||
| iam_auth = config.getboolean("s3", "iam_auth") | ||
| if not endpoint: | ||
| raise RuntimeError("Attempting to get S3 client without an endpoint set") | ||
| if access_key and secret_key and iam_auth: | ||
| logger.warning( | ||
| "Warning: iam is turned on and both S3 access key and secret key are" | ||
| " provided" | ||
| ) | ||
| if iam_auth: | ||
| s3_client = self.iam_auth_s3(endpoint) | ||
| if s3_client: | ||
| self.s3 = s3_client | ||
| return | ||
| if access_key is None or secret_key is None: | ||
| raise RuntimeError( | ||
| "Attempting to get S3 client without an access_key/secret_key set" | ||
| ) | ||
| self.s3 = boto3.client( | ||
| "s3", | ||
| endpoint_url=endpoint, | ||
| aws_access_key_id=access_key, | ||
| aws_secret_access_key=secret_key, | ||
| ) | ||
| def iam_auth_s3(self, endpoint: str): | ||
| boto_session = get_session() | ||
| iam_providers = [ | ||
| ContainerProvider(), | ||
| InstanceMetadataProvider( | ||
| iam_role_fetcher=InstanceMetadataFetcher(timeout=1000, num_attempts=2) | ||
| ), | ||
| ] | ||
| for provider in iam_providers: | ||
| creds = provider.load() | ||
| if creds: | ||
| boto_session._credentials = creds # type: ignore | ||
| return boto3.Session(botocore_session=boto_session).client( | ||
| "s3", | ||
| endpoint_url=endpoint, | ||
| ) | ||
| @staticmethod | ||
@@ -188,44 +144,2 @@ def _validate_identity(identity: str): | ||
| @staticmethod | ||
| def make_redis( | ||
| config, | ||
| identity: Optional[str] = None, | ||
| service_info: Optional[KartonServiceInfo] = None, | ||
| ) -> StrictRedis: | ||
| """ | ||
| Create and test a Redis connection. | ||
| :param config: The karton configuration | ||
| :param identity: Karton service identity | ||
| :param service_info: Additional service identity metadata | ||
| :return: Redis connection | ||
| """ | ||
| if service_info is not None: | ||
| client_name: Optional[str] = service_info.make_client_name() | ||
| else: | ||
| client_name = identity | ||
| redis_args = { | ||
| "host": config["redis"]["host"], | ||
| "port": config.getint("redis", "port", 6379), | ||
| "db": config.getint("redis", "db", 0), | ||
| "username": config.get("redis", "username"), | ||
| "password": config.get("redis", "password"), | ||
| "client_name": client_name, | ||
| # set socket_timeout to None if set to 0 | ||
| "socket_timeout": config.getint("redis", "socket_timeout", 30) or None, | ||
| "decode_responses": True, | ||
| } | ||
| try: | ||
| redis = StrictRedis(**redis_args) | ||
| redis.ping() | ||
| except AuthenticationError: | ||
| # Maybe we've sent a wrong password. | ||
| # Or maybe the server is not (yet) password protected | ||
| # To make smooth transition possible, try to login insecurely | ||
| del redis_args["password"] | ||
| redis = StrictRedis(**redis_args) | ||
| redis.ping() | ||
| return redis | ||
| @property | ||
@@ -280,2 +194,3 @@ def default_bucket_name(self) -> str: | ||
| "service_version": bind.service_version, | ||
| "is_async": bind.is_async, | ||
| }, | ||
@@ -305,2 +220,3 @@ sort_keys=True, | ||
| service_version=None, | ||
| is_async=False, | ||
| ) | ||
@@ -314,2 +230,3 @@ return KartonBind( | ||
| service_version=bind.get("service_version"), | ||
| is_async=bind.get("is_async", False), | ||
| ) | ||
@@ -329,2 +246,122 @@ | ||
| @staticmethod | ||
| def _log_channel(logger_name: Optional[str], level: Optional[str]) -> str: | ||
| return ".".join( | ||
| [KARTON_LOG_CHANNEL, (level or "*").lower(), logger_name or "*"] | ||
| ) | ||
| class KartonBackend(KartonBackendBase): | ||
    def __init__(
        self,
        config: Config,
        identity: Optional[str] = None,
        service_info: Optional[KartonServiceInfo] = None,
    ) -> None:
        """
        Synchronous Karton backend: sets up the Redis connection and
        an S3 client (static credentials or IAM role auth).

        :param config: Karton configuration
        :param identity: Karton service identity
        :param service_info: Additional service identity metadata
        :raises RuntimeError: when the S3 endpoint or usable credentials
            are not configured
        """
        super().__init__(config, identity, service_info)
        self.redis = self.make_redis(
            config, identity=identity, service_info=service_info
        )

        endpoint = config.get("s3", "address")
        access_key = config.get("s3", "access_key")
        secret_key = config.get("s3", "secret_key")
        iam_auth = config.getboolean("s3", "iam_auth")

        if not endpoint:
            raise RuntimeError("Attempting to get S3 client without an endpoint set")

        # Both mechanisms configured at once is almost certainly a mistake
        if access_key and secret_key and iam_auth:
            logger.warning(
                "Warning: iam is turned on and both S3 access key and secret key are"
                " provided"
            )

        if iam_auth:
            # IAM auth may fail to produce a client (no provider yields
            # credentials); in that case fall through to static credentials
            s3_client = self.iam_auth_s3(endpoint)
            if s3_client:
                self.s3 = s3_client
                return

        if access_key is None or secret_key is None:
            raise RuntimeError(
                "Attempting to get S3 client without an access_key/secret_key set"
            )

        self.s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
    def iam_auth_s3(self, endpoint: str):
        """
        Try to build an S3 client authenticated via IAM role credentials.

        Providers are tried in order: container credentials
        (``ContainerProvider``), then instance metadata
        (``InstanceMetadataProvider``). Returns ``None`` when no provider
        yields credentials, so the caller can fall back to static keys.

        :param endpoint: S3 endpoint URL
        :return: boto3 S3 client bound to *endpoint*, or None
        """
        boto_session = get_session()
        iam_providers = [
            ContainerProvider(),
            InstanceMetadataProvider(
                iam_role_fetcher=InstanceMetadataFetcher(timeout=1000, num_attempts=2)
            ),
        ]

        for provider in iam_providers:
            creds = provider.load()
            if creds:
                # Inject resolved credentials into the botocore session
                # (no public API for this, hence the private attribute)
                boto_session._credentials = creds  # type: ignore
                return boto3.Session(botocore_session=boto_session).client(
                    "s3",
                    endpoint_url=endpoint,
                )
    @staticmethod
    def make_redis(
        config: Config,
        identity: Optional[str] = None,
        service_info: Optional[KartonServiceInfo] = None,
    ) -> StrictRedis:
        """
        Create and test a Redis connection.

        :param config: The karton configuration
        :param identity: Karton service identity
        :param service_info: Additional service identity metadata
        :return: Redis connection
        :raises redis.exceptions.AuthenticationError: when authentication
            fails both with and without the configured password
        """
        # Service metadata (when available) is encoded into the Redis client
        # name so services are identifiable via CLIENT LIST
        if service_info is not None:
            client_name: Optional[str] = service_info.make_client_name()
        else:
            client_name = identity

        redis_args = {
            "host": config["redis"]["host"],
            "port": config.getint("redis", "port", 6379),
            "db": config.getint("redis", "db", 0),
            "username": config.get("redis", "username"),
            "password": config.get("redis", "password"),
            "client_name": client_name,
            # set socket_timeout to None if set to 0
            "socket_timeout": config.getint("redis", "socket_timeout", 30) or None,
            "decode_responses": True,
        }
        try:
            redis = StrictRedis(**redis_args)
            redis.ping()
        except AuthenticationError:
            # Maybe we've sent a wrong password.
            # Or maybe the server is not (yet) password protected
            # To make smooth transition possible, try to login insecurely
            del redis_args["password"]
            redis = StrictRedis(**redis_args)
            redis.ping()
        return redis
| def unserialize_resource(self, resource_spec: Dict[str, Any]) -> RemoteResource: | ||
| """ | ||
| Unserializes resource into a RemoteResource object bound with current backend | ||
| :param resource_spec: Resource specification | ||
| :return: RemoteResource object | ||
| """ | ||
| return RemoteResource.from_dict(resource_spec, backend=self) | ||
| def get_bind(self, identity: str) -> KartonBind: | ||
@@ -440,3 +477,5 @@ """ | ||
| return None | ||
| return Task.unserialize(task_data, backend=self) | ||
| return Task.unserialize( | ||
| task_data, resource_unserializer=self.unserialize_resource | ||
| ) | ||
@@ -464,3 +503,7 @@ def get_tasks( | ||
| return [ | ||
| Task.unserialize(task_data, backend=self, parse_resources=parse_resources) | ||
| Task.unserialize( | ||
| task_data, | ||
| parse_resources=parse_resources, | ||
| resource_unserializer=self.unserialize_resource, | ||
| ) | ||
| for chunk in keys | ||
@@ -480,3 +523,5 @@ for task_data in self.redis.mget(chunk) | ||
| Task.unserialize( | ||
| task_data, backend=self, parse_resources=parse_resources | ||
| task_data, | ||
| parse_resources=parse_resources, | ||
| resource_unserializer=self.unserialize_resource, | ||
| ) | ||
@@ -567,3 +612,5 @@ for task_data in self.redis.mget(chunk) | ||
| Task.unserialize( | ||
| task_data, backend=self, parse_resources=parse_resources | ||
| task_data, | ||
| parse_resources=parse_resources, | ||
| resource_unserializer=self.unserialize_resource, | ||
| ) | ||
@@ -816,8 +863,2 @@ for task_data in self.redis.mget(chunk) | ||
| @staticmethod | ||
| def _log_channel(logger_name: Optional[str], level: Optional[str]) -> str: | ||
| return ".".join( | ||
| [KARTON_LOG_CHANNEL, (level or "*").lower(), logger_name or "*"] | ||
| ) | ||
| def produce_log( | ||
@@ -824,0 +865,0 @@ self, |
+119
-93
@@ -12,28 +12,12 @@ import abc | ||
| from .config import Config | ||
| from .logger import KartonLogHandler | ||
| from .task import Task | ||
| from .logger import KartonLogHandler, TaskContextFilter | ||
| from .task import Task, get_current_task, set_current_task | ||
| from .utils import HardShutdownInterrupt, StrictClassMethod, graceful_killer | ||
| class KartonBase(abc.ABC): | ||
| """ | ||
| Base class for all Karton services | ||
| class ConfigMixin: | ||
| identity: Optional[str] | ||
| version: Optional[str] | ||
| You can set an informative version information by setting the ``version`` class | ||
| attribute. | ||
| """ | ||
| #: Karton service identity | ||
| identity: str = "" | ||
| #: Karton service version | ||
| version: Optional[str] = None | ||
| #: Include extended service information for non-consumer services | ||
| with_service_info: bool = False | ||
| def __init__( | ||
| self, | ||
| config: Optional[Config] = None, | ||
| identity: Optional[str] = None, | ||
| backend: Optional[KartonBackend] = None, | ||
| ) -> None: | ||
| def __init__(self, config: Optional[Config] = None, identity: Optional[str] = None): | ||
| self.config = config or Config() | ||
@@ -54,22 +38,78 @@ self.enable_publish_log = self.config.getboolean( | ||
| if self.debug: | ||
| if self.debug and self.identity: | ||
| self.identity += "-" + os.urandom(4).hex() + "-dev" | ||
| self.service_info = None | ||
| if self.identity is not None and self.with_service_info: | ||
| self.service_info = KartonServiceInfo( | ||
| identity=self.identity, | ||
| karton_version=__version__, | ||
| service_version=self.version, | ||
| ) | ||
| @classmethod | ||
| def args_description(cls) -> str: | ||
| """Return short description for argument parser.""" | ||
| if not cls.__doc__: | ||
| return "" | ||
| return textwrap.dedent(cls.__doc__).strip().splitlines()[0] | ||
| self.backend = backend or KartonBackend( | ||
| self.config, identity=self.identity, service_info=self.service_info | ||
| @classmethod | ||
| def args_parser(cls) -> argparse.ArgumentParser: | ||
| """ | ||
| Return ArgumentParser for main() class method. | ||
| This method should be overridden and call super methods | ||
| if you want to add more arguments. | ||
| """ | ||
| parser = argparse.ArgumentParser(description=cls.args_description()) | ||
| parser.add_argument( | ||
| "--version", action="version", version=cast(str, cls.version) | ||
| ) | ||
| parser.add_argument("--config-file", help="Alternative configuration path") | ||
| parser.add_argument( | ||
| "--identity", help="Alternative identity for Karton service" | ||
| ) | ||
| parser.add_argument("--log-level", help="Logging level of Karton logger") | ||
| parser.add_argument( | ||
| "--debug", help="Enable debugging mode", action="store_true", default=None | ||
| ) | ||
| return parser | ||
| self._log_handler = KartonLogHandler( | ||
| backend=self.backend, channel=self.identity | ||
| @classmethod | ||
| def config_from_args(cls, config: Config, args: argparse.Namespace) -> None: | ||
| """ | ||
| Updates configuration with settings from arguments | ||
| This method should be overridden and call super methods | ||
| if you want to add more arguments. | ||
| """ | ||
| config.load_from_dict( | ||
| { | ||
| "karton": { | ||
| "identity": args.identity, | ||
| "debug": args.debug, | ||
| }, | ||
| "logging": {"level": args.log_level}, | ||
| } | ||
| ) | ||
| self.current_task: Optional[Task] = None | ||
| @classmethod | ||
| def karton_from_args(cls, args: Optional[argparse.Namespace] = None): | ||
| """ | ||
| Returns Karton instance configured using configuration files | ||
| and provided arguments | ||
| Used by :py:meth:`KartonServiceBase.main` method | ||
| """ | ||
| if args is None: | ||
| parser = cls.args_parser() | ||
| args = parser.parse_args() | ||
| config = Config(path=args.config_file) | ||
| cls.config_from_args(config, args) | ||
| return cls(config=config) | ||
| class LoggingMixin: | ||
| config: Config | ||
| identity: Optional[str] | ||
| debug: bool | ||
| enable_publish_log: bool | ||
| def __init__(self, log_handler: logging.Handler, log_format: str) -> None: | ||
| self._log_handler = log_handler | ||
| self._log_format = log_format | ||
| def setup_logger(self, level: Optional[Union[str, int]] = None) -> None: | ||
@@ -99,2 +139,3 @@ """ | ||
| logger = logging.getLogger(self.identity) | ||
| logger.addFilter(TaskContextFilter()) | ||
@@ -112,5 +153,3 @@ if logger.handlers: | ||
| stream_handler = logging.StreamHandler() | ||
| stream_handler.setFormatter( | ||
| logging.Formatter("[%(asctime)s][%(levelname)s] %(message)s") | ||
| ) | ||
| stream_handler.setFormatter(logging.Formatter(self._log_format)) | ||
| logger.addHandler(stream_handler) | ||
@@ -122,3 +161,3 @@ | ||
| @property | ||
| def log_handler(self) -> KartonLogHandler: | ||
| def log_handler(self) -> logging.Handler: | ||
| """ | ||
@@ -149,65 +188,52 @@ Return KartonLogHandler bound to this Karton service. | ||
| @classmethod | ||
| def args_description(cls) -> str: | ||
| """Return short description for argument parser.""" | ||
| if not cls.__doc__: | ||
| return "" | ||
| return textwrap.dedent(cls.__doc__).strip().splitlines()[0] | ||
| @classmethod | ||
| def args_parser(cls) -> argparse.ArgumentParser: | ||
| """ | ||
| Return ArgumentParser for main() class method. | ||
| class KartonBase(abc.ABC, ConfigMixin, LoggingMixin): | ||
| """ | ||
| Base class for all Karton services | ||
| This method should be overridden and call super methods | ||
| if you want to add more arguments. | ||
| """ | ||
| parser = argparse.ArgumentParser(description=cls.args_description()) | ||
| parser.add_argument( | ||
| "--version", action="version", version=cast(str, cls.version) | ||
| ) | ||
| parser.add_argument("--config-file", help="Alternative configuration path") | ||
| parser.add_argument( | ||
| "--identity", help="Alternative identity for Karton service" | ||
| ) | ||
| parser.add_argument("--log-level", help="Logging level of Karton logger") | ||
| parser.add_argument( | ||
| "--debug", help="Enable debugging mode", action="store_true", default=None | ||
| ) | ||
| return parser | ||
| You can set an informative version information by setting the ``version`` class | ||
| attribute. | ||
| """ | ||
| @classmethod | ||
| def config_from_args(cls, config: Config, args: argparse.Namespace) -> None: | ||
| """ | ||
| Updates configuration with settings from arguments | ||
| #: Karton service identity | ||
| identity: str = "" | ||
| #: Karton service version | ||
| version: Optional[str] = None | ||
| #: Include extended service information for non-consumer services | ||
| with_service_info: bool = False | ||
| This method should be overridden and call super methods | ||
| if you want to add more arguments. | ||
| """ | ||
| config.load_from_dict( | ||
| { | ||
| "karton": { | ||
| "identity": args.identity, | ||
| "debug": args.debug, | ||
| }, | ||
| "logging": {"level": args.log_level}, | ||
| } | ||
| def __init__( | ||
| self, | ||
| config: Optional[Config] = None, | ||
| identity: Optional[str] = None, | ||
| backend: Optional[KartonBackend] = None, | ||
| ) -> None: | ||
| ConfigMixin.__init__(self, config, identity) | ||
| self.service_info = None | ||
| if self.identity is not None and self.with_service_info: | ||
| self.service_info = KartonServiceInfo( | ||
| identity=self.identity, | ||
| karton_version=__version__, | ||
| service_version=self.version, | ||
| ) | ||
| self.backend = backend or KartonBackend( | ||
| self.config, identity=self.identity, service_info=self.service_info | ||
| ) | ||
| @classmethod | ||
| def karton_from_args(cls, args: Optional[argparse.Namespace] = None): | ||
| """ | ||
| Returns Karton instance configured using configuration files | ||
| and provided arguments | ||
| log_handler = KartonLogHandler(backend=self.backend, channel=self.identity) | ||
| LoggingMixin.__init__( | ||
| self, log_handler, log_format="[%(asctime)s][%(levelname)s] %(message)s" | ||
| ) | ||
| Used by :py:meth:`KartonServiceBase.main` method | ||
| """ | ||
| if args is None: | ||
| parser = cls.args_parser() | ||
| args = parser.parse_args() | ||
| config = Config(path=args.config_file) | ||
| cls.config_from_args(config, args) | ||
| return cls(config=config) | ||
| @property | ||
| def current_task(self) -> Optional[Task]: | ||
| return get_current_task() | ||
| @current_task.setter | ||
| def current_task(self, task: Optional[Task]): | ||
| set_current_task(task) | ||
| class KartonServiceBase(KartonBase): | ||
@@ -214,0 +240,0 @@ """ |
@@ -119,3 +119,8 @@ import configparser | ||
| @overload | ||
| def getint( | ||
| self, section_name: str, option_name: str, fallback: Optional[int] | ||
| ) -> Optional[int]: ... | ||
| def getint( | ||
| self, section_name: str, option_name: str, fallback: Optional[int] = None | ||
@@ -122,0 +127,0 @@ ) -> Optional[int]: |
+13
-12
@@ -140,3 +140,3 @@ """ | ||
| self.task_timeout = self.config.getint("karton", "task_timeout") | ||
| self.current_task: Optional[Task] = None | ||
| self._pre_hooks: List[Tuple[Optional[str], Callable[[Task], None]]] = [] | ||
@@ -174,7 +174,6 @@ self._post_hooks: List[ | ||
| self.current_task = task | ||
| self.log_handler.set_task(self.current_task) | ||
| if not self.current_task.matches_filters(self.filters): | ||
| if not task.matches_filters(self.filters): | ||
| self.log.info("Task rejected because binds are no longer valid.") | ||
| self.backend.set_task_status(self.current_task, TaskState.FINISHED) | ||
| self.backend.set_task_status(task, TaskState.FINISHED) | ||
| # Task rejected: end of processing | ||
@@ -186,4 +185,4 @@ return | ||
| try: | ||
| self.log.info("Received new task - %s", self.current_task.uid) | ||
| self.backend.set_task_status(self.current_task, TaskState.STARTED) | ||
| self.log.info("Received new task - %s", task.uid) | ||
| self.backend.set_task_status(task, TaskState.STARTED) | ||
@@ -196,5 +195,5 @@ self._run_pre_hooks() | ||
| with timeout(self.task_timeout): | ||
| self.process(self.current_task) | ||
| self.process(task) | ||
| else: | ||
| self.process(self.current_task) | ||
| self.process(task) | ||
| except (Exception, TaskTimeoutError) as exc: | ||
@@ -206,3 +205,3 @@ saved_exception = exc | ||
| self.log.info("Task done - %s", self.current_task.uid) | ||
| self.log.info("Task done - %s", task.uid) | ||
| except (Exception, TaskTimeoutError): | ||
@@ -213,3 +212,3 @@ exc_info = sys.exc_info() | ||
| self.backend.increment_metrics(KartonMetrics.TASK_CRASHED, self.identity) | ||
| self.log.exception("Failed to process task - %s", self.current_task.uid) | ||
| self.log.exception("Failed to process task - %s", task.uid) | ||
| finally: | ||
@@ -224,5 +223,6 @@ self.backend.increment_metrics(KartonMetrics.TASK_CONSUMED, self.identity) | ||
| task_state = TaskState.CRASHED | ||
| self.current_task.error = exception_str | ||
| task.error = exception_str | ||
| self.backend.set_task_status(self.current_task, task_state) | ||
| self.backend.set_task_status(task, task_state) | ||
| self.current_task = None | ||
@@ -238,2 +238,3 @@ @property | ||
| service_version=self.__class__.version, | ||
| is_async=False, | ||
| ) | ||
@@ -240,0 +241,0 @@ |
+33
-15
@@ -5,6 +5,6 @@ import logging | ||
| import warnings | ||
| from typing import Optional | ||
| from typing import Any, Callable, Dict | ||
| from .backend import KartonBackend | ||
| from .task import Task | ||
| from .task import get_current_task | ||
@@ -14,18 +14,20 @@ HOSTNAME = platform.node() | ||
| class KartonLogHandler(logging.Handler): | ||
| class TaskContextFilter(logging.Filter): | ||
| """ | ||
| logging.Handler that passes logs to the Karton backend. | ||
| This is a filter which injects information about current task ID to the log. | ||
| """ | ||
| def __init__(self, backend: KartonBackend, channel: str) -> None: | ||
| logging.Handler.__init__(self) | ||
| self.backend = backend | ||
| self.task: Optional[Task] = None | ||
| self.is_consumer_active: bool = True | ||
| self.channel: str = channel | ||
| def filter(self, record: logging.LogRecord) -> bool: | ||
| current_task = get_current_task() | ||
| if current_task is not None: | ||
| record.task_id = current_task.task_uid | ||
| else: | ||
| record.task_id = "(no task)" | ||
| return True | ||
| def set_task(self, task: Task) -> None: | ||
| self.task = task | ||
| def emit(self, record: logging.LogRecord) -> None: | ||
| class LogLineFormatterMixin: | ||
| format: Callable[[logging.LogRecord], str] | ||
| def prepare_log_line(self, record: logging.LogRecord) -> Dict[str, Any]: | ||
| ignore_fields = [ | ||
@@ -59,7 +61,23 @@ "args", | ||
| if self.task is not None: | ||
| log_line["task"] = self.task.serialize() | ||
| current_task = get_current_task() | ||
| if current_task is not None: | ||
| log_line["task"] = current_task.serialize() | ||
| log_line["hostname"] = HOSTNAME | ||
| return log_line | ||
| class KartonLogHandler(logging.Handler, LogLineFormatterMixin): | ||
| """ | ||
| logging.Handler that passes logs to the Karton backend. | ||
| """ | ||
| def __init__(self, backend: KartonBackend, channel: str) -> None: | ||
| logging.Handler.__init__(self) | ||
| self.backend = backend | ||
| self.is_consumer_active: bool = True | ||
| self.channel: str = channel | ||
| def emit(self, record: logging.LogRecord) -> None: | ||
| log_line = self.prepare_log_line(record) | ||
| log_consumed = self.backend.produce_log( | ||
@@ -66,0 +84,0 @@ log_line, logger_name=self.channel, level=record.levelname |
+32
-30
@@ -153,30 +153,3 @@ import contextlib | ||
| class LocalResource(ResourceBase): | ||
| """ | ||
| Represents local resource with arbitrary binary data e.g. file contents. | ||
| Local resources will be uploaded to object hub (S3) during | ||
| task dispatching. | ||
| .. code-block:: python | ||
| # Creating resource from bytes | ||
| sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\ | ||
| PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...") | ||
| # Creating resource from path | ||
| sample = Resource("original_name.exe", path="sample/original_name.exe") | ||
| :param name: Name of the resource (e.g. name of file) | ||
| :param content: Resource content | ||
| :param path: Path of file with resource content | ||
| :param bucket: Alternative S3 bucket for resource | ||
| :param metadata: Resource metadata | ||
| :param uid: Alternative S3 resource id | ||
| :param sha256: Resource sha256 hash | ||
| :param fd: Seekable file descriptor | ||
| :param _flags: Resource flags | ||
| :param _close_fd: Close file descriptor after upload (default: False) | ||
| """ | ||
| class LocalResourceBase(ResourceBase): | ||
| def __init__( | ||
@@ -198,3 +171,3 @@ self, | ||
| super(LocalResource, self).__init__( | ||
| super().__init__( | ||
| name, | ||
@@ -252,3 +225,3 @@ content=content, | ||
| uid: Optional[str] = None, | ||
| ) -> "LocalResource": | ||
| ) -> "LocalResourceBase": | ||
| """ | ||
@@ -311,2 +284,31 @@ Resource extension, allowing to pass whole directory as a zipped resource. | ||
| class LocalResource(LocalResourceBase): | ||
| """ | ||
| Represents local resource with arbitrary binary data e.g. file contents. | ||
| Local resources will be uploaded to object hub (S3) during | ||
| task dispatching. | ||
| .. code-block:: python | ||
| # Creating resource from bytes | ||
| sample = Resource("original_name.exe", content=b"X5O!P%@AP[4\\ | ||
| PZX54(P^)7CC)7}$EICAR-STANDARD-ANT...") | ||
| # Creating resource from path | ||
| sample = Resource("original_name.exe", path="sample/original_name.exe") | ||
| :param name: Name of the resource (e.g. name of file) | ||
| :param content: Resource content | ||
| :param path: Path of file with resource content | ||
| :param bucket: Alternative S3 bucket for resource | ||
| :param metadata: Resource metadata | ||
| :param uid: Alternative S3 resource id | ||
| :param sha256: Resource sha256 hash | ||
| :param fd: Seekable file descriptor | ||
| :param _flags: Resource flags | ||
| :param _close_fd: Close file descriptor after upload (default: False) | ||
| """ | ||
| def _upload(self, backend: "KartonBackend") -> None: | ||
@@ -313,0 +315,0 @@ """Internal function for uploading resources |
+24
-2
@@ -6,2 +6,3 @@ import enum | ||
| import warnings | ||
| from contextvars import ContextVar | ||
| from typing import ( | ||
@@ -28,3 +29,13 @@ TYPE_CHECKING, | ||
| current_task: ContextVar[Optional["Task"]] = ContextVar("current_task") | ||
| def get_current_task() -> Optional["Task"]: | ||
| return current_task.get(None) | ||
| def set_current_task(task: Optional["Task"]): | ||
| current_task.set(task) | ||
| class TaskState(enum.Enum): | ||
@@ -380,2 +391,3 @@ DECLARED = "Declared" # Task declared in TASKS_QUEUE | ||
| parse_resources: bool = True, | ||
| resource_unserializer: Optional[Callable[[Dict], Any]] = None, | ||
| ) -> "Task": | ||
@@ -386,3 +398,5 @@ """ | ||
| :param data: JSON-serialized task | ||
| :param backend: Backend instance to be bound to RemoteResource objects | ||
| :param backend: | | ||
| Backend instance to be bound to RemoteResource objects. | ||
| Deprecated: pass resource_unserializer instead. | ||
| :param parse_resources: | | ||
@@ -395,2 +409,5 @@ If set to False (default is True), method doesn't | ||
| to use faster 3rd-party JSON parser (orjson). | ||
| :param resource_unserializer: | | ||
| Resource factory used for deserialization of __karton_resource__ | ||
| dictionary values. | ||
| :return: Unserialized Task object | ||
@@ -407,3 +424,8 @@ | ||
| if isinstance(value, dict) and "__karton_resource__" in value: | ||
| return RemoteResource.from_dict(value["__karton_resource__"], backend) | ||
| if resource_unserializer is None: | ||
| return RemoteResource.from_dict( | ||
| value["__karton_resource__"], backend | ||
| ) | ||
| else: | ||
| return resource_unserializer(value["__karton_resource__"]) | ||
| return value | ||
@@ -410,0 +432,0 @@ |
+3
-2
| Metadata-Version: 2.1 | ||
| Name: karton-core | ||
| Version: 5.7.0 | ||
| Version: 5.8.0 | ||
| Summary: Distributed malware analysis orchestration framework | ||
@@ -12,3 +12,4 @@ Home-page: https://github.com/CERT-Polska/karton | ||
| License-File: LICENSE | ||
| Requires-Dist: boto3 <1.36.0 | ||
| Requires-Dist: aioboto3 ==14.3.0 | ||
| Requires-Dist: boto3 <1.37.4,>=1.37.2 | ||
| Requires-Dist: orjson | ||
@@ -15,0 +16,0 @@ Requires-Dist: redis |
+22
-16
@@ -1,27 +0,33 @@ | ||
| karton_core-5.7.0-nspkg.pth,sha256=vHa-jm6pBTeInFrmnsHMg9AOeD88czzQy-6QCFbpRcM,539 | ||
| karton_core-5.8.0-nspkg.pth,sha256=vHa-jm6pBTeInFrmnsHMg9AOeD88czzQy-6QCFbpRcM,539 | ||
| karton/core/__init__.py,sha256=QuT0BWZyp799eY90tK3H1OD2hwuusqMJq8vQwpB3kG4,337 | ||
| karton/core/__version__.py,sha256=QmHMXVnw5DVPfWzvN7FS1tOhDAesdxpM_aVOh9CMuSk,22 | ||
| karton/core/backend.py,sha256=_IOjN9pWdSBsDnTMYvg-Fpm6Ag-uf2Jb9LWmrtZqVAU,38773 | ||
| karton/core/base.py,sha256=lqVJvCHRMzvIOpS8SaWlOaSSJBEVkNQe0oClZC_GQYM,8225 | ||
| karton/core/config.py,sha256=M3dB0XgnUO5VzUcGyQa7FyKzmdgmDml1MrzG6CxEuvE,8100 | ||
| karton/core/__version__.py,sha256=duDTiv1rL8Ee9_jtzzhpq-j4xkkpVPkUh2Daa_Ou-xA,22 | ||
| karton/core/backend.py,sha256=3tmjAM35jaoZr_5QqarcVH81LvKMEB63vyvMKCYXPCM,39934 | ||
| karton/core/base.py,sha256=6PEQiEWHjMtJKRj0dfgEEpNhgSRoKOBYF1WBlrmyBp0,9064 | ||
| karton/core/config.py,sha256=UTd0hLqYNUttfI7FYUDOJPaz-C3uj1Kw7xxataTG_OM,8234 | ||
| karton/core/exceptions.py,sha256=8i9WVzi4PinNlX10Cb-lQQC35Hl-JB5R_UKXa9AUKoQ,153 | ||
| karton/core/inspect.py,sha256=aIJQEOEkD5q2xLlV8nhxY5qL5zqcnprP-2DdP6ecKlE,6150 | ||
| karton/core/karton.py,sha256=l3joJWw8m23wlOErkcQmNFYhLFA5x2la6L0WopxJ7mk,15435 | ||
| karton/core/logger.py,sha256=J3XAyG88U0cwYC9zR6E3QD1uJenrQh7zS9-HgxhqeAs,2040 | ||
| karton/core/karton.py,sha256=4CISOmUTfaEaCJUtbYxJSBMzydT27o3a-R14VBNpmr0,15269 | ||
| karton/core/logger.py,sha256=UhYCoVARXaatvoJ2lO2mfBHeODOS7z8O-vqdeQhNmV4,2654 | ||
| karton/core/main.py,sha256=ir1-dhn3vbwfh2YHiM6ZYfRBbjwLvJSz0d8tuK1mb_4,8310 | ||
| karton/core/py.typed,sha256=47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU,0 | ||
| karton/core/query.py,sha256=sf24DweVlXfJuBbBD_ns2LXhOV-IBwuPG3jBfTJu77s,12063 | ||
| karton/core/resource.py,sha256=9kWXpMBRfudH0_whJfSSI27K3Gwv2u93CVa7p68Q5UM,20842 | ||
| karton/core/task.py,sha256=gW1szMi5PN2Y06X-Ryo7cmEVluZv1r7W5tvmwIJiD94,18808 | ||
| karton/core/resource.py,sha256=GkU3JGSP1kOsAoblon4BWbQp31eZiDsCjaaDKanEokk,20872 | ||
| karton/core/task.py,sha256=QvAsSUIsyYiRmkgxgugrYkHWH2gczrFDfw2V4izdt1E,19566 | ||
| karton/core/test.py,sha256=cj6W4gNt0BpRjsYiiBt0hPE8dmRfUeIc8sSVkxB50cU,9123 | ||
| karton/core/utils.py,sha256=sEVqGdVPyYswWuVn8wYXBQmln8Az826N_2HgC__pmW8,4090 | ||
| karton/core/asyncio/__init__.py,sha256=ZgndeKzS3Yg2o8hebwFYJWlCRdW3ImdCOShK4EVmZ14,457 | ||
| karton/core/asyncio/backend.py,sha256=GF0z9YxWvUKYkvnBatCDthX0M9Kwt9cRfVSWQq5bc9E,12751 | ||
| karton/core/asyncio/base.py,sha256=YDNGyWzgVvt2TnfKvHYbJbcNJaQl95bdBq45YGEo-3Q,4246 | ||
| karton/core/asyncio/karton.py,sha256=LhzMGuJsmXdTEa323gZted8KgVfHH6l0j0_tTqMh4Z4,12932 | ||
| karton/core/asyncio/logger.py,sha256=BjkbuAeWylTmFjWv8-ckmOGf4nL2Tma96W0nIOc2vwk,1752 | ||
| karton/core/asyncio/resource.py,sha256=86AYm7JeVjEYRNw--h02HIS9xFvgddhktmDUp0qvTO4,12517 | ||
| karton/system/__init__.py,sha256=JF51OqRU_Y4c0unOulvmv1KzSHSq4ZpXU8ZsH4nefRM,63 | ||
| karton/system/__main__.py,sha256=QJkwIlSwaPRdzwKlNmCAL41HtDAa73db9MZKWmOfxGM,56 | ||
| karton/system/system.py,sha256=d_5hhLTthJdr_4gZEGQ6Y-kHvxeBqyQxjjx_wRs3xMA,17285 | ||
| karton_core-5.7.0.dist-info/LICENSE,sha256=o8h7hYhn7BJC_-DmrfqWwLjaR_Gbe0TZOOQJuN2ca3I,1519 | ||
| karton_core-5.7.0.dist-info/METADATA,sha256=MrmtycTaYsNB8v0LRyuLIHL2bV17n1Lt6e-ak4RfrH8,6818 | ||
| karton_core-5.7.0.dist-info/WHEEL,sha256=oiQVh_5PnQM0E3gPdiz09WCNmwiHDMaGer_elqB3coM,92 | ||
| karton_core-5.7.0.dist-info/entry_points.txt,sha256=OgLlsXy61GP6-Yob3oXqeJ2hlRU6LBLj33fr0NufKz0,98 | ||
| karton_core-5.7.0.dist-info/namespace_packages.txt,sha256=X8SslCPsqXDCnGZqrYYolzT3xPzJMq1r-ZQSc0jfAEA,7 | ||
| karton_core-5.7.0.dist-info/top_level.txt,sha256=X8SslCPsqXDCnGZqrYYolzT3xPzJMq1r-ZQSc0jfAEA,7 | ||
| karton_core-5.7.0.dist-info/RECORD,, | ||
| karton_core-5.8.0.dist-info/LICENSE,sha256=o8h7hYhn7BJC_-DmrfqWwLjaR_Gbe0TZOOQJuN2ca3I,1519 | ||
| karton_core-5.8.0.dist-info/METADATA,sha256=26n1VrX-0ESRMG7fug5hqKxYT1URIa_NgypF5N1E2gg,6860 | ||
| karton_core-5.8.0.dist-info/WHEEL,sha256=oiQVh_5PnQM0E3gPdiz09WCNmwiHDMaGer_elqB3coM,92 | ||
| karton_core-5.8.0.dist-info/entry_points.txt,sha256=OgLlsXy61GP6-Yob3oXqeJ2hlRU6LBLj33fr0NufKz0,98 | ||
| karton_core-5.8.0.dist-info/namespace_packages.txt,sha256=X8SslCPsqXDCnGZqrYYolzT3xPzJMq1r-ZQSc0jfAEA,7 | ||
| karton_core-5.8.0.dist-info/top_level.txt,sha256=X8SslCPsqXDCnGZqrYYolzT3xPzJMq1r-ZQSc0jfAEA,7 | ||
| karton_core-5.8.0.dist-info/RECORD,, |
| import sys, types, os;has_mfs = sys.version_info > (3, 5);p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('karton',));importlib = has_mfs and __import__('importlib.util');has_mfs and __import__('importlib.machinery');m = has_mfs and sys.modules.setdefault('karton', importlib.util.module_from_spec(importlib.machinery.PathFinder.find_spec('karton', [os.path.dirname(p)])));m = m or sys.modules.setdefault('karton', types.ModuleType('karton'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.