snakemake-interface-storage-plugins
This package provides a stable interface for interactions between Snakemake and its storage plugins.
Plugins should implement the following skeleton to comply with this interface.
It is recommended to use Snakemake's poetry plugin to set up this skeleton (and automated testing) within a python package, see https://github.com/snakemake/poetry-snakemake-plugin.
from dataclasses import dataclass, field
from typing import Any, Iterable, Optional, List
from snakemake_interface_storage_plugins.settings import StorageProviderSettingsBase
from snakemake_interface_storage_plugins.storage_provider import (
StorageProviderBase,
StorageQueryValidationResult,
ExampleQuery,
Operation,
)
from snakemake_interface_storage_plugins.storage_object import (
StorageObjectRead,
StorageObjectWrite,
StorageObjectGlob,
StorageObjectTouch,
retry_decorator,
)
from snakemake_interface_storage_plugins.io import IOCacheStorageInterface
@dataclass
class StorageProviderSettings(StorageProviderSettingsBase):
myparam: Optional[int] = field(
default=None,
metadata={
"help": "Some help text",
"env_var": False,
"parse_func": ...,
"unparse_func": ...,
"required": True,
},
)
class StorageProvider(StorageProviderBase):
def __post_init__(self):
pass
@classmethod
def example_queries(cls) -> List[ExampleQuery]:
"""Return valid example queries (at least one) with description."""
...
def rate_limiter_key(self, query: str, operation: Operation) -> Any:
"""Return a key for identifying a rate limiter given a query and an operation.
This is used to identify a rate limiter for the query.
E.g. for a storage provider like http that would be the host name.
For s3 it might be just the endpoint URL.
"""
...
def default_max_requests_per_second(self) -> float:
"""Return the default maximum number of requests per second for this storage
provider."""
...
def use_rate_limiter(self) -> bool:
"""Return False if no rate limiting is needed for this provider."""
...
@classmethod
def is_valid_query(cls, query: str) -> StorageQueryValidationResult:
"""Return whether the given query is valid for this storage provider."""
...
def postprocess_query(self, query: str) -> str:
return query
class StorageObject(
StorageObjectRead,
StorageObjectWrite,
StorageObjectGlob,
StorageObjectTouch
):
def __post_init__(self):
pass
async def inventory(self, cache: IOCacheStorageInterface):
"""From this file, try to find as much existence and modification date
information as possible. Only retrieve that information that comes for free
given the current object.
"""
pass
def get_inventory_parent(self) -> Optional[str]:
"""Return the parent directory of this object."""
return None
def local_suffix(self) -> str:
"""Return a unique suffix for the local path, determined from self.query."""
...
def cleanup(self):
"""Perform local cleanup of any remainders of the storage object."""
...
@retry_decorator
def exists(self) -> bool:
...
@retry_decorator
def mtime(self) -> float:
...
@retry_decorator
def size(self) -> int:
...
@retry_decorator
def retrieve_object(self):
...
@retry_decorator
def store_object(self):
...
@retry_decorator
def remove(self):
...
@retry_decorator
def list_candidate_matches(self) -> Iterable[str]:
"""Return a list of candidate matches in the storage for the query."""
...
@retry_decorator
def touch(self):
"""Touch the object, updating its modification date."""
...