bionumpy
Advanced tools
| from bionumpy import SequenceEntry | ||
| import numpy as np | ||
class MultipleSequenceAlignment:
    """Rectangular character matrix of aligned sequences (one row per sequence)."""

    def __init__(self, matrix, sequence_names):
        # matrix: one row per sequence, one column per alignment position.
        # NOTE: the previous `matrix()` method was dead code — it was always
        # shadowed by this instance attribute of the same name.
        self.matrix = matrix
        self.sequence_names = sequence_names

    @classmethod
    def from_sequence_entries(cls, entries: 'SequenceEntry') -> 'MultipleSequenceAlignment':
        """Build an MSA from sequence entries.

        Parameters
        ----------
        entries: SequenceEntry
            Entries whose sequences must all have the same length.

        Raises
        ------
        ValueError
            If the sequences do not all share the same length
            (explicit exception instead of a bare assert, which
            disappears under ``python -O``).
        """
        sequences = entries.sequence
        alignment_length = len(sequences[0])
        if not np.all(sequences.lengths == alignment_length):
            raise ValueError('All sequences in an MSA must have the same length')
        matrix = sequences.ravel().reshape(len(sequences), alignment_length)
        return cls(matrix, entries.name)

    def mask(self):
        """Boolean matrix that is True where the alignment has a residue (not a gap '-')."""
        return self.matrix != '-'
| import functools | ||
| from . import bnp_open | ||
| from .bnpdataclass.bnpdataclass import BNPDataClass | ||
| import inspect | ||
| try: | ||
| import typer | ||
| except ImportError: | ||
| typer = None | ||
def check_arguments(function):
    """Map *function*'s BNPDataClass-typed annotations to ``str`` (filenames).

    Parameters annotated with a BNPDataClass subclass represent files on the
    command line, so their CLI-facing type is ``str``.  Returns the rewritten
    annotation mapping.
    """
    arguments = inspect.getfullargspec(function)

    def _is_bnp(annotation) -> bool:
        # issubclass, not isinstance: annotations are the classes themselves.
        # The isinstance(type) guard avoids TypeError on typing constructs
        # and on a missing (None) annotation.
        return isinstance(annotation, type) and issubclass(annotation, BNPDataClass)

    new_annotations = {name: str if _is_bnp(t) else t
                       for name, t in arguments.annotations.items()}
    if _is_bnp(arguments.annotations.get('return')):
        # A BNPDataClass return value means the result should be written to a file.
        pass
    return new_annotations
class CliWrapper:
    '''Convert all arguments that are typed with BNPDataclass into filename options.

    The wrapped function receives the parsed file contents in place of the
    filenames, and a BNPDataClass return value is written to an ``output`` file.
    '''

    def __init__(self, *args, **kwargs):
        # args/kwargs reserved for CLI-framework (typer) configuration
        self._args = args
        self._kwargs = kwargs

    def __call__(self, function):
        argspec = inspect.getfullargspec(function)

        def is_bnpdataclass(name: str) -> bool:
            # .get: parameters without an annotation pass through unchanged
            # (the old code raised KeyError for them).  The isinstance guard
            # avoids TypeError when the annotation is not a class.
            annotation = argspec.annotations.get(name)
            return isinstance(annotation, type) and issubclass(annotation, BNPDataClass)

        do_write = is_bnpdataclass('return')

        @functools.wraps(function)
        def new_func(*args, **kwargs):
            # Replace filename arguments with the parsed file content
            new_args = [bnp_open(arg).read() if is_bnpdataclass(argspec.args[i]) else arg
                        for i, arg in enumerate(args)]
            new_kwargs = {k: bnp_open(v).read() if is_bnpdataclass(k) else v
                          for k, v in kwargs.items() if k != 'output'}
            return_val = function(*new_args, **new_kwargs)
            if do_write:
                bnp_open(kwargs['output'], "w").write(return_val)
            # Also return the result so the wrapper stays usable from python
            # (the old code implicitly returned None).
            return return_val

        sig = inspect.signature(function)
        new_parameters = [val.replace(annotation=str) if is_bnpdataclass(key) else val
                          for key, val in sig.parameters.items()]
        if do_write:
            new_parameters.append(inspect.Parameter('output', inspect.Parameter.KEYWORD_ONLY,
                                                    annotation=str, default=None))
        new_func.__signature__ = sig.replace(parameters=new_parameters,
                                             return_annotation=sig.empty)
        # Expose the rewritten annotations (BNPDataClass -> str filename)
        annotations = {name: str if is_bnpdataclass(name) else t
                       for name, t in argspec.annotations.items() if name != 'return'}
        if do_write:
            annotations['output'] = str
        new_func.__annotations__ = annotations
        return new_func
| from bionumpy.datatypes import PairsEntry | ||
| from .delimited_buffers import DelimitedBuffer | ||
class PairsBuffer(DelimitedBuffer):
    # Buffer for the pairtools ".pairs" tab-delimited text format;
    # columns are mapped onto the fields of PairsEntry.
    dataclass = PairsEntry
| import inspect | ||
| import bionumpy as bnp | ||
| from bionumpy.cli import CliWrapper | ||
| from .util import get_file_name | ||
def mock_function(reads: bnp.datatypes.SequenceEntry) -> bnp.datatypes.SequenceEntry:
    """Keep only the entries whose sequence starts with an 'A'."""
    starts_with_a = reads.sequence[:, 0] == 'A'
    return reads[starts_with_a]
def test_cli_wrapper(data_path, tmp_path):
    """The wrapped function reads the input file and writes a filtered output file."""
    wrapped = CliWrapper()(mock_function)
    in_file = data_path / 'big.fq.gz'
    out_file = tmp_path / 'tmp.fq.gz'
    wrapped(in_file, output=out_file)
    # filtering on the first base should drop well over half of the reads
    assert bnp.count_entries(out_file) < bnp.count_entries(in_file) // 2
def test_cli_wrapper_annotations():
    """BNPDataClass-typed parameters are exposed as ``str`` (filename) arguments."""
    cli_function = CliWrapper()(mock_function)
    argspec = inspect.getfullargspec(cli_function)
    # leftover debug print removed
    assert argspec.annotations['reads'] == str
| import pytest | ||
| from bionumpy import MultiLineFastaBuffer | ||
| import bionumpy as bnp | ||
| from bionumpy.alignments.msa import MultipleSequenceAlignment | ||
@pytest.fixture
def mfa_obj(data_path):
    """Example multi-line fasta file parsed into sequence entries."""
    mfa_path = data_path / "example.mfa"
    return bnp.open(mfa_path, buffer_type=MultiLineFastaBuffer).read()
@pytest.fixture()
def msa(mfa_obj):
    """MSA built from the example multi-fasta entries."""
    alignment = MultipleSequenceAlignment.from_sequence_entries(mfa_obj)
    return alignment
def test_from_sequence_entries(mfa_obj):
    """The MSA matrix has one row per sequence and one column per position."""
    sequences = mfa_obj
    msa = MultipleSequenceAlignment.from_sequence_entries(sequences)
    # the original comparison was missing its `assert`, so the test could never fail
    assert msa.matrix.shape == (3, 5)
@pytest.mark.skip
def test_mask(msa):
    """The gap mask is a boolean array of the same shape as the matrix."""
    gap_mask = msa.mask()
    assert gap_mask.shape == msa.matrix.shape
    assert gap_mask.dtype == bool
    # 20 non-gap characters in the example alignment
    assert gap_mask.sum() == 20
| import bionumpy as bnp | ||
| from tempfile import NamedTemporaryFile | ||
| import numpy as np | ||
def test_read_write_pairs_file(data_path):
    """Round-trip a .pairs file: read, write, re-read, and compare."""
    original = bnp.open(data_path / "small.pairs").read()
    assert original[0].pos1 == 61
    tmp = NamedTemporaryFile(suffix=".pairs")
    with bnp.open(tmp.name, mode='w') as out_file:
        out_file.write(original)
    round_tripped = bnp.open(tmp.name).read()
    assert np.all(original == round_tripped)
| Metadata-Version: 2.1 | ||
| Name: bionumpy | ||
| Version: 1.0.8 | ||
| Version: 1.0.10 | ||
| Summary: Library for working with biological sequence data as numpy arrays. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy |
@@ -8,2 +8,3 @@ HISTORY.rst | ||
| bionumpy/__init__.py | ||
| bionumpy/cli.py | ||
| bionumpy/computation_graph.py | ||
@@ -24,2 +25,3 @@ bionumpy/config.py | ||
| bionumpy/alignments/cigar.py | ||
| bionumpy/alignments/msa.py | ||
| bionumpy/arithmetics/__init__.py | ||
@@ -82,2 +84,3 @@ bionumpy/arithmetics/bedgraph.py | ||
| bionumpy/io/one_line_buffer.py | ||
| bionumpy/io/pairs.py | ||
| bionumpy/io/parser.py | ||
@@ -151,2 +154,3 @@ bionumpy/io/regexp.py | ||
| tests/test_chromosome_provider.py | ||
| tests/test_cli.py | ||
| tests/test_computation_graph.py | ||
@@ -184,5 +188,7 @@ tests/test_coordinate_mapping.py | ||
| tests/test_minimizers.py | ||
| tests/test_msa.py | ||
| tests/test_multistream.py | ||
| tests/test_mutation_types.py | ||
| tests/test_npdataclassstream.py | ||
| tests/test_pairs.py | ||
| tests/test_pandas_interface.py | ||
@@ -189,0 +195,0 @@ tests/test_parsers.py |
@@ -5,3 +5,3 @@ """Top-level package for bionumpy.""" | ||
| __email__ = "knutdrand@gmail.com" | ||
| __version__ = '1.0.8' | ||
| __version__ = '1.0.10' | ||
@@ -8,0 +8,0 @@ import npstructures as nps |
@@ -48,3 +48,3 @@ from .intervals import get_boolean_mask | ||
| N = (a+b+c+d) | ||
| return a*N/((a+b)*(a+c)) | ||
| return float(a*N/((a+b)*(a+c))) | ||
@@ -76,2 +76,2 @@ | ||
| N = (a+b+c+d) | ||
| return a/(N-d) | ||
| return float(a/(N-d)) |
@@ -5,3 +5,3 @@ import dataclasses | ||
| from collections import defaultdict | ||
| from typing import List, Type, Dict, Iterable, Union, Optional | ||
| from typing import List, Type, Dict, Iterable, Union, Optional, Any, Tuple | ||
| from numpy.typing import ArrayLike | ||
@@ -40,3 +40,10 @@ from npstructures.npdataclasses import npdataclass, NpDataClass, shallow_tuple | ||
| def todict(self): | ||
| def todict(self) -> Dict[str, ArrayLike]: | ||
| ''' | ||
| Convert the data into a dictionary with the field names as keys and the corresponding data as values. | ||
| Returns | ||
| ------- | ||
| dict[str, ArrayLike] | ||
| ''' | ||
| field_dict = {} | ||
@@ -52,3 +59,10 @@ for field in dataclasses.fields(self): | ||
| def topandas(self): | ||
| def topandas(self) -> 'pandas.DataFrame': | ||
| ''' | ||
| Convert the data into a pandas DataFrame with the fields as columns | ||
| Returns | ||
| ------- | ||
| pandas.DataFrame | ||
| ''' | ||
| return pandas_adaptor.get_data_frame(self.todict()) | ||
@@ -58,3 +72,15 @@ # return pd.DataFrame(self.todict()) | ||
| @classmethod | ||
| def from_data_frame(cls, df): | ||
| def from_data_frame(cls, df: 'pandas.DataFrame') -> 'BNPDataClass': | ||
| ''' | ||
| Convert a pandas DataFrame into a BNPDataClass object. | ||
| The columns of the dataframe are used as fields in the BNPDataClass object. | ||
| Parameters | ||
| ---------- | ||
| df: pandas.DataFrame | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| ''' | ||
| d = df.to_dict('series') | ||
@@ -64,3 +90,13 @@ return cls.from_dict(d) | ||
| @classmethod | ||
| def from_dict(cls, dict_object: Dict) -> 'BNPDataClass': | ||
| def from_dict(cls, dict_object: Dict[str, Any]) -> 'BNPDataClass': | ||
| ''' | ||
| Convert a dictionary into a BNPDataClass object. The keys of the dictionary are used as field names in the BNPDataClass object. | ||
| Parameters | ||
| ---------- | ||
| dict_object: dict | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| ''' | ||
| dict_names = [name.split('.')[0] for name in dict_object.keys()] | ||
@@ -90,2 +126,3 @@ field_names = {field.name for field in dataclasses.fields(cls)} | ||
| the data, use `toiter` instead. | ||
| Returns | ||
@@ -99,3 +136,12 @@ ------- | ||
| def toiter(self): | ||
| def toiter(self) -> Iterable['dataclass']: | ||
| """ | ||
| Convert the data into an iterator of entries from the | ||
| corrsponding dataclass with normal python types. | ||
| Returns | ||
| ------- | ||
| Iterable[cls.dataclass] | ||
| """ | ||
| iters = tuple(get_vanilla_generator(f) | ||
@@ -181,9 +227,32 @@ for f in shallow_tuple(self)) | ||
| @classmethod | ||
| def from_entry_tuples(cls, tuples): | ||
| def from_entry_tuples(cls, tuples: Iterable[tuple]) -> 'BNPDataClass': | ||
| return cls(*(list(c) for c in zip(*tuples))) | ||
| def sort_by(self, field_name: str) -> 'BNPDataClass': | ||
| """ | ||
| Sort the data by the given field | ||
| Parameters | ||
| ---------- | ||
| field_name: str | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| """ | ||
| return self[np.argsort(getattr(self, field_name))] | ||
| def set_context(self, name, value): | ||
| def set_context(self, name: str, value: Any): | ||
| """ | ||
| Set a context value for the object, typycally used for storing auxillary information like header information | ||
| Parameters | ||
| ---------- | ||
| name: str | ||
| value: Any | ||
| Returns | ||
| ------- | ||
| """ | ||
| if not hasattr(self, '_context'): | ||
@@ -193,3 +262,14 @@ self._context = dict() | ||
| def get_context(self, name): | ||
| def get_context(self, name: str)->Any: | ||
| """ | ||
| Get a context value for the object, typycally used for storing auxillary information like header information | ||
| Parameters | ||
| ---------- | ||
| name: str | ||
| Returns | ||
| ------- | ||
| """ | ||
| logger.warning(f'Deprecated method set_context in BNPDataClass') | ||
| if not hasattr(self, '_context'): | ||
@@ -222,3 +302,3 @@ self._context = dict() | ||
| ------- | ||
| npdataclass | ||
| bnpdataclass | ||
| `bnpdataclass` object that supports numpy like indexing | ||
@@ -329,7 +409,7 @@ | ||
| NewClass.__qualname__ = base_class.__qualname__ | ||
| NewClass.__doc__ = dataclasses.dataclass(base_class).__doc__ | ||
| return NewClass | ||
| def make_dataclass(fields: list, name: str = "DynamicDC", bases=()) -> Type[BNPDataClass]: | ||
| def make_dataclass(fields: List[Tuple], name: str = "DynamicDC", bases=()) -> Type[BNPDataClass]: | ||
| """ | ||
@@ -353,3 +433,18 @@ Constructs a dynamic dataclass from a list of attributes | ||
| def narrow_type(bnp_dc, field_name, field_type): | ||
| def narrow_type(bnp_dc: Type[BNPDataClass], field_name: str, field_type: type): | ||
| """ | ||
| Resticts the type of a field in a BNPDataClass | ||
| Parameters | ||
| ---------- | ||
| bnp_dc: Type[BNPDataClass] | ||
| field_name: str | ||
| field_type: type | ||
| Returns | ||
| ------- | ||
| Type[BNPDataClass] | ||
| """ | ||
| new_fields = [(f.name, field_type) if f.name==field_name else (f.name, f.type, f) for f in dataclasses.fields(bnp_dc)] | ||
@@ -356,0 +451,0 @@ return make_dataclass(new_fields, name=bnp_dc.__name__, bases=(bnp_dc,)) |
@@ -14,3 +14,3 @@ from .bnpdataclass import BNPDataClass | ||
| def replace(obj, **kwargs): | ||
| def replace(obj: BNPDataClass, **kwargs) -> BNPDataClass: | ||
| '''Replace the values of a dataclass with new values | ||
@@ -17,0 +17,0 @@ |
| import dataclasses | ||
| import types | ||
| from functools import lru_cache | ||
| from functools import lru_cache, wraps | ||
| from numbers import Number | ||
| from typing import Type, Optional, Any | ||
| import numpy as np | ||
| from bionumpy.io.dump_csv import get_column, join_columns | ||
| from bionumpy.io.exceptions import FormatException, ParsingException | ||
| from .bnpdataclass import BNPDataClass | ||
| from ..encoded_array import EncodedRaggedArray | ||
| from ..io.dump_csv import get_column | ||
| from ..io.exceptions import FormatException, ParsingException | ||
| # from bionumpy import EncodedRaggedArray | ||
def translate_types(input_type):
    """Map a python scalar type to the array type that holds a column of it."""
    if input_type == int:
        return np.ndarray
    if input_type == str:
        return EncodedRaggedArray
    # any other type falls through to None (no known translation)
def buffer_backed_bnp(old_cls):
    """Create a copy of *old_cls* whose annotated fields are read lazily from a buffer."""
    new_cls = types.new_class(old_cls.__name__, old_cls.__bases__, {})
    for field_index, (field_name, field_type) in enumerate(old_cls.__annotations__.items()):
        setattr(new_cls, field_name, BufferBackedDescriptor(field_index, field_type))
    # the generated __init__ just stores the raw buffer the descriptors read from
    setattr(new_cls, '__init__', lambda self, buffer: setattr(self, '_buffer', buffer))
    return new_cls
class BufferBackedDescriptor:
    '''
    Descriptor that lazily extracts and parses one field from a text buffer.

    The owning instance is expected to hold the raw buffer in ``_buffer``;
    the descriptor itself only remembers which field it is and its type.
    '''

    def __init__(self, index, dtype):
        # NOTE(review): the previous signature was (buffer, index, dtype), but
        # the buffer is always read from the instance in __get__ and the only
        # visible caller (buffer_backed_bnp) passes just (index, dtype) — the
        # old three-argument form raised TypeError there.
        self._index = index
        self._dtype = dtype

    def __get__(self, obj, objtype):
        # Parse the field out of the instance's buffer on every access.
        return self._dtype(obj._buffer.get_field_by_number(self._index, self._dtype))
| class LazyBNPDataClass: | ||
@@ -48,12 +19,2 @@ pass | ||
class BaseClass:
    """Minimal buffer-backed object: attribute access falls back to buffer lookup."""

    def __init__(self, buffer):
        self._buffer = buffer

    def __getattr__(self, var_name):
        # __getattr__ is only invoked when normal attribute lookup fails,
        # so _buffer itself (set in __init__) never recurses here.
        if var_name in self._buffer:
            return self._buffer[var_name]
        # object defines no __getattr__, so the old `super().__getattr__(...)`
        # raised a confusing AttributeError about 'super'; raise a proper one.
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {var_name!r}")
| class ItemGetter: | ||
@@ -92,6 +53,22 @@ def __init__(self, buffer: 'FileBuffer', dataclass: dataclasses.dataclass, start_line=0): | ||
| def create_lazy_class(dataclass, header=None): | ||
| def create_lazy_class(dataclass: Type[BNPDataClass], header: Optional[Any] = None) -> Type[BNPDataClass]: | ||
| ''' | ||
| Creates a dataclass that emulates the given BNPDataClass but with lazy loading of fields | ||
| Parameters | ||
| ---------- | ||
| dataclass | ||
| header | ||
| Returns | ||
| ------- | ||
| ''' | ||
| field_names = [field.name for field in dataclasses.fields(dataclass)] | ||
| class NewClass(dataclass, LazyBNPDataClass): | ||
| """ | ||
| A class that lazily loads fields from a buffer | ||
| """ | ||
| def __init__(self, item_getter, set_values=None, computed_values=None): | ||
@@ -108,18 +85,24 @@ self._itemgetter = item_getter | ||
| @classmethod | ||
| def from_data_frame(cls, df): | ||
| @wraps(dataclass.from_data_frame) | ||
| def from_data_frame(cls, df: 'pd.DataFrame') -> 'NewClass': | ||
| return dataclass.from_data_frame(df) | ||
| @classmethod | ||
| @wraps(dataclass.from_dict) | ||
| def from_dict(cls, d): | ||
| return dataclass.from_dict(d) | ||
| @wraps(dataclass.toiter) | ||
| def toiter(self): | ||
| return self.get_data_object().toiter() | ||
| @wraps(dataclass.tolist) | ||
| def tolist(self): | ||
| return self.get_data_object().tolist() | ||
| @wraps(dataclass.todict) | ||
| def todict(self): | ||
| return self.get_data_object().todict() | ||
| @wraps(dataclass.topandas) | ||
| def topandas(self): | ||
@@ -179,3 +162,11 @@ return self.get_data_object().topandas() | ||
| def get_data_object(self): | ||
| def get_data_object(self) -> BNPDataClass: | ||
| """ | ||
| Returns the BNPDataClass with all fields loaded | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| """ | ||
| if not self._computed: | ||
@@ -196,3 +187,3 @@ fields = [getattr(self, field.name) for field in dataclasses.fields(dataclass)] | ||
| if hasattr(values[0]._itemgetter.buffer, 'concatenate'): | ||
| #if all(not a._set_values for a in values): | ||
| # if all(not a._set_values for a in values): | ||
| set_values = {name: np.concatenate([a._set_values[name] for a in values]) | ||
@@ -210,3 +201,3 @@ for name in self._set_values} | ||
| def get_buffer(self, buffer_class=None): | ||
| def get_buffer(self, buffer_class=None) -> EncodedRaggedArray: | ||
| if buffer_class is None: | ||
@@ -216,3 +207,3 @@ buffer_class = self._itemgetter.buffer.__class__ | ||
| 'SKIP_LAZY') or hasattr( | ||
| buffer_class, 'SKIP_LAZY'): | ||
| buffer_class, 'SKIP_LAZY'): | ||
| return self._itemgetter.buffer.from_data(self.get_data_object()) | ||
@@ -219,0 +210,0 @@ columns = [] |
@@ -158,22 +158,2 @@ from ..typing import SequenceID | ||
class SortedIntervals:
    """Non-overlapping, start-sorted ``[start, end)`` intervals with point lookup."""

    def __init__(self, data):
        self.data = np.asanyarray(data)
        # validate the converted array — the old code asserted on the raw
        # argument, which fails for plain python lists (no .shape)
        assert len(self.data.shape) == 2
        assert self.data.shape[-1] == 2
        self.starts = self.data[..., 0]
        self.ends = self.data[..., 1]

    def in_intervals(self, position):
        """Return whether position(s) fall inside any interval.

        For each position we take the last interval starting at or before it
        (``side="right"`` then step back one).  The previous ``side="left"``
        search selected the *next* interval instead, so any position strictly
        inside an interval (not equal to its start) was reported as outside.
        """
        idx = np.maximum(
            np.searchsorted(self.starts, position, side="right") - 1, 0
        )
        return (position >= self.starts[idx]) & (position < self.ends[idx])

    @classmethod
    def concatenate(cls, elements):
        """Stack several SortedIntervals into one (result assumed still sorted)."""
        return cls(np.vstack([element.data for element in elements]))
| @bnpdataclass | ||
@@ -218,1 +198,14 @@ class SAMEntry: | ||
| directions: List[int] | ||
@bnpdataclass
class PairsEntry:
    """One contact record in the pairtools ".pairs" format.

    See https://pairtools.readthedocs.io/en/latest/formats.html
    """
    # The read id plus the two genomic sides of the pair
    # (chromosome, position, strand for each side).
    read_id: str
    chrom1: SequenceID
    pos1: int
    chrom2: SequenceID
    pos2: int
    strand1: StrandEncoding
    strand2: StrandEncoding
@@ -20,3 +20,2 @@ from ..bnpdataclass import bnpdataclass, BNPDataClass | ||
| atributes: str | ||
| #attributes: BNPDataClass | ||
@@ -27,3 +26,2 @@ def _get_attributes(self, attribute_names): | ||
| for name in attribute_names} | ||
| # ends_in_sep = gtf_entries.atributes[:, -1] == | ||
| self.atributes[:, -1] = " " | ||
@@ -30,0 +28,0 @@ all_features = split(self.atributes.ravel(), " ") |
@@ -11,3 +11,3 @@ """ | ||
| from npstructures.mixin import NPSArray | ||
| from typing import Tuple, List | ||
| from typing import Tuple, List, Union | ||
| import numpy as np | ||
@@ -40,2 +40,5 @@ from abc import abstractmethod | ||
| class OneToOneEncoding(Encoding): | ||
| """Represents encodings that are one-to-one, i.e. where each element is | ||
| encoded to one element and vice versa. This class is meant to be subclassed | ||
| when implementing specific encodings.""" | ||
@@ -233,2 +236,3 @@ def encode(self, data): | ||
| def get_NPSArray(array): | ||
@@ -253,2 +257,4 @@ return array.view(NPSArray) | ||
| Parameters | ||
@@ -261,3 +267,10 @@ ---------- | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> import numpy as np | ||
| >>> print(EncodedArray(np.array([0, 1, 2, 3]), bnp.DNAEncoding)) | ||
| ACGT | ||
| """ | ||
| if isinstance(data, EncodedArray): | ||
@@ -274,6 +287,6 @@ assert data.encoding == encoding | ||
| @property | ||
| def T(self): | ||
| def T(self) -> "EncodedArray": | ||
| return self.__class__(self.data.T, self.encoding) | ||
| def copy(self): | ||
| def copy(self) -> 'EncodedArray': | ||
| return self.__class__(self.data.copy(), self.encoding) | ||
@@ -287,6 +300,9 @@ | ||
| def tolist(self): | ||
| def tolist(self) -> str: | ||
| """Converts the data to a string by decoding the data. | ||
| This behaviour is compatible with NumPy's scalar behaviour (only a single element)""" | ||
| return self.to_string() | ||
| def to_string(self) -> str: | ||
| """Converts the data to a string by decoding the data""" | ||
| if not self.encoding.is_one_to_one_encoding(): | ||
@@ -381,3 +397,3 @@ return self.encoding.to_string(self.data) | ||
| def __hash__(self): | ||
| def __hash__(self) -> int: | ||
| if len(self.shape) <= 1: | ||
@@ -480,3 +496,3 @@ return hash(self.to_string()) | ||
| def ravel(self): | ||
| def ravel(self) -> "EncodedArray": | ||
| return self.__class__(self.data.ravel(), self.encoding) | ||
@@ -633,4 +649,10 @@ | ||
| -------- | ||
| 5 | ||
| >>> import bionumpy as bnp | ||
| >>> encoded_array = bnp.DNAEncoding.encode("ACGT") | ||
| >>> print(from_encoded_array(encoded_array)) | ||
| ACGT | ||
| >>> encoded_array = bnp.DNAEncoding.encode(["ACGT", "ACGT"]) | ||
| >>> print(from_encoded_array(encoded_array)) | ||
| ['ACGT', 'ACGT'] | ||
| """ | ||
@@ -643,3 +665,32 @@ if isinstance(encoded_array, EncodedRaggedArray): | ||
| def change_encoding(encoded_array, new_encoding): | ||
| def change_encoding(encoded_array: Union[EncodedArray, EncodedRaggedArray], new_encoding: Encoding) \ | ||
| -> Union[EncodedArray, EncodedRaggedArray]: | ||
| """ | ||
| Changes the encoding of an `EncodedArray` or `EncodedRaggedArray` by decoding the data and | ||
| encoding it again with the new encoding. | ||
| Parameters: | ||
| ----------- | ||
| encoded_array : EncodedArray/EncodedRaggedArray | ||
| The data to change encoding on | ||
| new_encoding : Encoding | ||
| The new encoding to use | ||
| Returns: | ||
| -------- | ||
| EncodedArray/EncodedRaggedArray | ||
| The data with the new encoding | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> encoded_array = bnp.as_encoded_array("ACGT", bnp.DNAEncoding) | ||
| >>> encoded_array.raw() | ||
| array([0, 1, 2, 3], dtype=uint8) | ||
| >>> new_encoding = bnp.BaseEncoding | ||
| >>> new_encoded_array = change_encoding(encoded_array, new_encoding) | ||
| >>> new_encoded_array.raw() | ||
| array([65, 67, 71, 84], dtype=uint8) | ||
| """ | ||
| assert isinstance(encoded_array, (EncodedArray, EncodedRaggedArray)), \ | ||
@@ -646,0 +697,0 @@ "Can only change encoding of EncodedArray or EncodedRaggedArray" |
@@ -11,14 +11,2 @@ from ..encoded_array import BaseEncoding, Encoding, NumericEncoding | ||
| # class StrandEncoding(Encoding): | ||
| # MIN_CODE = ord("+") | ||
| # | ||
| # @classmethod | ||
| # def encode(cls, bytes_array): | ||
| # return (bytes_array & np.uint8(2)) >> np.uint8(1) | ||
| # | ||
| # @classmethod | ||
| # def decode(cls, strands): | ||
| # return 2 * strands + cls.MIN_CODE | ||
| class DigitEncodingFactory(NumericEncoding): | ||
@@ -44,9 +32,2 @@ def __init__(self, min_code): | ||
| def set_backend(lib): | ||
| #from ..cupy_compatible.encodings.alphabet_encoding import CPAlphabetEncoding | ||
| #from ..cupy_compatible.encodings.alphabet_encoding import CPACTGEncoding | ||
| #from ..cupy_compatible.encodings.alphabet_encoding import CPAminoAcidEncoding | ||
| #sys.modules[__name__].AlphabetEncoding = CPAlphabetEncoding | ||
| #sys.modules[__name__].ACTGEncoding = CPACTGEncoding | ||
| #sys.modules[__name__].AminoAcidEncoding = CPAminoAcidEncoding | ||
@@ -53,0 +34,0 @@ from . import base_encoding |
@@ -52,61 +52,61 @@ import numpy as np | ||
| class ACTGTwoBitEncoding: | ||
| letters = ["A", "C", "T", "G"] | ||
| bitcodes = ["00", "01", "10", "11"] | ||
| reverse = np.array([1, 3, 20, 7], dtype=np.uint8) | ||
| _lookup_2bytes_to_4bits = np.zeros(256 * 256, dtype=np.uint8) | ||
| _lookup_2bytes_to_4bits[ | ||
| 256 * reverse[np.arange(4)[:, None]] + reverse[np.arange(4)] | ||
| ] = np.arange(4)[:, None] * 4 + np.arange(4) | ||
| _shift_4bits = 4 * np.arange(2, dtype=np.uint8) | ||
| _shift_2bits = 2 * np.arange(4, dtype=np.uint8) | ||
| # class ACTGTwoBitEncoding: | ||
| # letters = ["A", "C", "T", "G"] | ||
| # bitcodes = ["00", "01", "10", "11"] | ||
| # reverse = np.array([1, 3, 20, 7], dtype=np.uint8) | ||
| # _lookup_2bytes_to_4bits = np.zeros(256 * 256, dtype=np.uint8) | ||
| # _lookup_2bytes_to_4bits[ | ||
| # 256 * reverse[np.arange(4)[:, None]] + reverse[np.arange(4)] | ||
| # ] = np.arange(4)[:, None] * 4 + np.arange(4) | ||
| # _shift_4bits = 4 * np.arange(2, dtype=np.uint8) | ||
| # _shift_2bits = 2 * np.arange(4, dtype=np.uint8) | ||
| # | ||
| # @classmethod | ||
| # def convert_2bytes_to_4bits(cls, two_bytes): | ||
| # assert two_bytes.dtype == np.uint16, two_bytes.dtype | ||
| # return cls._lookup_2bytes_to_4bits[two_bytes] | ||
| # | ||
| # @classmethod | ||
| # def join_4bits_to_byte(cls, four_bits): | ||
| # return np.sum(four_bits << cls._shift_4bits, axis=1, dtype=np.uint8) | ||
| # | ||
| # @classmethod | ||
| # def complement(cls, char): | ||
| # complements = np.packbits([1, 0, 1, 0, 1, 0, 1, 0]) | ||
| # dtype = char.dtype | ||
| # return (char.view(np.uint8) ^ complements).view(dtype) | ||
| # | ||
| # @classmethod | ||
| # def encode(cls, sequence): | ||
| # if sequence.size % 16 != 0: | ||
| # sequence = np.append( | ||
| # sequence, np.empty(16 - (sequence.size % 16), dtype=np.uint8) | ||
| # ) | ||
| # | ||
| # assert sequence.dtype == np.uint8 | ||
| # assert sequence.size % 4 == 0, sequence.size | ||
| # sequence = sequence & 31 | ||
| # four_bits = cls.convert_2bytes_to_4bits(sequence.view(np.uint16)) | ||
| # codes = cls.join_4bits_to_byte(four_bits.reshape(-1, 2)) | ||
| # assert codes.dtype == np.uint8, codes.dtype | ||
| # return codes.flatten().view(np.uint8) | ||
| # | ||
| # @classmethod | ||
| # def from_string(cls, string): | ||
| # byte_repr = np.array([ord(c) for c in string], dtype=np.uint8) | ||
| # return cls.encode(byte_repr) | ||
| # | ||
| # @classmethod | ||
| # def to_string(cls, bits): | ||
| # byte_repr = cls.decode(bits) | ||
| # return "".join(chr(b) for b in byte_repr) | ||
| # | ||
| # @classmethod | ||
| # def decode(cls, sequence): | ||
| # assert sequence.dtype == np.uint8 | ||
| # bit_mask = np.uint8(3) # last two bits | ||
| # all_bytes = (sequence[:, None] >> cls._shift_2bits) & bit_mask | ||
| # return cls.reverse[all_bytes.flatten()] + 96 | ||
| @classmethod | ||
| def convert_2bytes_to_4bits(cls, two_bytes): | ||
| assert two_bytes.dtype == np.uint16, two_bytes.dtype | ||
| return cls._lookup_2bytes_to_4bits[two_bytes] | ||
| @classmethod | ||
| def join_4bits_to_byte(cls, four_bits): | ||
| return np.sum(four_bits << cls._shift_4bits, axis=1, dtype=np.uint8) | ||
| @classmethod | ||
| def complement(cls, char): | ||
| complements = np.packbits([1, 0, 1, 0, 1, 0, 1, 0]) | ||
| dtype = char.dtype | ||
| return (char.view(np.uint8) ^ complements).view(dtype) | ||
| @classmethod | ||
| def encode(cls, sequence): | ||
| if sequence.size % 16 != 0: | ||
| sequence = np.append( | ||
| sequence, np.empty(16 - (sequence.size % 16), dtype=np.uint8) | ||
| ) | ||
| assert sequence.dtype == np.uint8 | ||
| assert sequence.size % 4 == 0, sequence.size | ||
| sequence = sequence & 31 | ||
| four_bits = cls.convert_2bytes_to_4bits(sequence.view(np.uint16)) | ||
| codes = cls.join_4bits_to_byte(four_bits.reshape(-1, 2)) | ||
| assert codes.dtype == np.uint8, codes.dtype | ||
| return codes.flatten().view(np.uint8) | ||
| @classmethod | ||
| def from_string(cls, string): | ||
| byte_repr = np.array([ord(c) for c in string], dtype=np.uint8) | ||
| return cls.encode(byte_repr) | ||
| @classmethod | ||
| def to_string(cls, bits): | ||
| byte_repr = cls.decode(bits) | ||
| return "".join(chr(b) for b in byte_repr) | ||
| @classmethod | ||
| def decode(cls, sequence): | ||
| assert sequence.dtype == np.uint8 | ||
| bit_mask = np.uint8(3) # last two bits | ||
| all_bytes = (sequence[:, None] >> cls._shift_2bits) & bit_mask | ||
| return cls.reverse[all_bytes.flatten()] + 96 | ||
| class ACTGEncoding: | ||
@@ -129,29 +129,29 @@ _lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8) | ||
| class SimpleEncoding(ACTGTwoBitEncoding): | ||
| _lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8) | ||
| _lookup_byte_to_2bits[[97, 65]] = 0 | ||
| _lookup_byte_to_2bits[[99, 67]] = 1 | ||
| _lookup_byte_to_2bits[[116, 84]] = 2 | ||
| _lookup_byte_to_2bits[[103, 71]] = 3 | ||
| # class SimpleEncoding(ACTGTwoBitEncoding): | ||
| # _lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8) | ||
| # _lookup_byte_to_2bits[[97, 65]] = 0 | ||
| # _lookup_byte_to_2bits[[99, 67]] = 1 | ||
| # _lookup_byte_to_2bits[[116, 84]] = 2 | ||
| # _lookup_byte_to_2bits[[103, 71]] = 3 | ||
| # | ||
| # _shift_2bits = 2 * np.arange(4, dtype=np.uint8) | ||
| # | ||
| # @classmethod | ||
| # def convert_byte_to_2bits(cls, one_byte): | ||
| # assert one_byte.dtype == np.uint8, one_byte.dtype | ||
| # return cls._lookup_byte_to_2bits[one_byte] | ||
| # | ||
| # @classmethod | ||
| # def join_2bits_to_byte(cls, two_bits_vector): | ||
| # return np.bitwise_or.reduce(two_bits_vector << cls._shift_2bits, axis=-1) | ||
| # | ||
| # @classmethod | ||
| # def encode(cls, sequence): | ||
| # assert sequence.dtype == np.uint8 | ||
| # assert sequence.size % 4 == 0, sequence.size | ||
| # two_bits = cls.convert_byte_to_2bits(sequence) | ||
| # codes = cls.join_2bits_to_byte(two_bits.reshape(-1, 4)) | ||
| # return codes.flatten() | ||
| _shift_2bits = 2 * np.arange(4, dtype=np.uint8) | ||
| @classmethod | ||
| def convert_byte_to_2bits(cls, one_byte): | ||
| assert one_byte.dtype == np.uint8, one_byte.dtype | ||
| return cls._lookup_byte_to_2bits[one_byte] | ||
| @classmethod | ||
| def join_2bits_to_byte(cls, two_bits_vector): | ||
| return np.bitwise_or.reduce(two_bits_vector << cls._shift_2bits, axis=-1) | ||
| @classmethod | ||
| def encode(cls, sequence): | ||
| assert sequence.dtype == np.uint8 | ||
| assert sequence.size % 4 == 0, sequence.size | ||
| two_bits = cls.convert_byte_to_2bits(sequence) | ||
| codes = cls.join_2bits_to_byte(two_bits.reshape(-1, 4)) | ||
| return codes.flatten() | ||
| def twobit_swap(number): | ||
@@ -158,0 +158,0 @@ dtype = number.dtype |
@@ -0,1 +1,3 @@ | ||
| from typing import List | ||
| import numpy as np | ||
@@ -7,2 +9,7 @@ from ..encoded_array import OneToOneEncoding | ||
| class AlphabetEncoding(OneToOneEncoding): | ||
| """ | ||
| Encoding for an alphabet. The encoding is one-to-one and the alphabet is | ||
| defined by the input string. The encoding is case-insensitive. | ||
| """ | ||
| def __init__(self, alphabet: str): | ||
@@ -45,15 +52,32 @@ self._raw_alphabet = [c.upper() for c in alphabet] | ||
| array = np.asarray(encoded) | ||
| # assert np.issubdtype(array.dtype, int), (encoded, array) | ||
| return self._alphabet[array] | ||
| @property | ||
| def alphabet_size(self): | ||
| def alphabet_size(self)->int: | ||
| """ | ||
| Get the size of the alphabet | ||
| Returns | ||
| ------- | ||
| int | ||
| The size of the alphabet | ||
| """ | ||
| self._initialize() | ||
| return self._alphabet.size | ||
| def get_alphabet(self): | ||
| def get_alphabet(self)-> List[str]: | ||
| """ | ||
| Get the alphabet | ||
| Returns | ||
| ------- | ||
| list[str] | ||
| The alphabet | ||
| """ | ||
| self._initialize() | ||
| return [chr(c) for c in self._alphabet] | ||
| def get_labels(self): | ||
| def get_labels(self) -> List[str]: | ||
| return self.get_alphabet() | ||
@@ -60,0 +84,0 @@ |
@@ -0,1 +1,3 @@ | ||
| from typing import Union, List | ||
| from . import AlphabetEncoding | ||
@@ -10,2 +12,6 @@ from ..encoded_array import Encoding | ||
| class KmerEncoding(Encoding): | ||
| """ | ||
| Encodes kmers of a given length using an alphabet encoding. | ||
| """ | ||
| def __init__(self, alphabet_encoding: AlphabetEncoding, k: int): | ||
@@ -17,6 +23,19 @@ assert is_subclass_or_instance(alphabet_encoding, AlphabetEncoding), alphabet_encoding | ||
| @property | ||
| def k(self): | ||
| def k(self) -> int: | ||
| return self._k | ||
| def encode(self, data): | ||
| def encode(self, data: Union[str, list, EncodedRaggedArray]) -> Union[EncodedArray, EncodedRaggedArray]: | ||
| """ | ||
| Encodes a string, list of strings or EncodedRaggedArray into an EncodedArray or | ||
| EncodedRaggedArray of hashed kmers. | ||
| Parameters | ||
| ---------- | ||
| data: Union[str, list, EncodedRaggedArray] | ||
| Returns | ||
| ------- | ||
| Union[EncodedArray, EncodedRaggedArray] | ||
| """ | ||
| if isinstance(data, str): | ||
@@ -26,3 +45,3 @@ assert len(data) == self.k | ||
| return EncodedArray( | ||
| letters.dot(self._alphabet_encoding.alphabet_size**np.arange(self._k)), | ||
| letters.dot(self._alphabet_encoding.alphabet_size ** np.arange(self._k)), | ||
| self) | ||
@@ -35,8 +54,7 @@ if isinstance(data, (list, EncodedRaggedArray)): | ||
| return EncodedArray( | ||
| letters.dot(self._alphabet_encoding.alphabet_size**np.arange(self._k)), | ||
| letters.dot(self._alphabet_encoding.alphabet_size ** np.arange(self._k)), | ||
| self) | ||
| print(data, type(data)) | ||
| raise NotImplementedError | ||
| def to_string(self, kmer): | ||
| def to_string(self, kmer: int) -> str: | ||
| """ | ||
@@ -52,3 +70,3 @@ Returns a human-readable string representation | ||
| n = self._alphabet_encoding.alphabet_size | ||
| tmp = (kmer//n**np.arange(self._k)) % n | ||
| tmp = (kmer // n ** np.arange(self._k)) % n | ||
@@ -59,5 +77,5 @@ chars = EncodedArray(tmp, self._alphabet_encoding) | ||
| def get_labels(self): | ||
| def get_labels(self) -> List[str]: | ||
| assert self._k <= 8, "Only supported for k <= 5" | ||
| return [self.to_string(kmer) for kmer in range(self._alphabet_encoding.alphabet_size**self._k)] | ||
| return [self.to_string(kmer) for kmer in range(self._alphabet_encoding.alphabet_size ** self._k)] | ||
@@ -64,0 +82,0 @@ def __str__(self): |
@@ -0,1 +1,3 @@ | ||
| from typing import Optional, List, Union | ||
| import numpy as np | ||
@@ -9,3 +11,6 @@ from ..encoded_array import Encoding, as_encoded_array, EncodedArray, EncodedRaggedArray, encoded_array_from_nparray | ||
| class StringEncoding(Encoding): | ||
| def __init__(self, sequences, modulo=None): | ||
| """ | ||
| Encodes strings into a numeric value corresponding to the index of the string in the input list. | ||
| """ | ||
| def __init__(self, sequences: List[str], modulo: Optional[int] = None): | ||
| self._seqeunces = as_encoded_array(sequences) | ||
@@ -15,9 +20,36 @@ self._modulo = modulo | ||
| def get_labels(self): | ||
| def get_labels(self) -> List[str]: | ||
| return self._seqeunces.tolist() | ||
| def to_string(self, n): | ||
| def to_string(self, n: int) -> str: | ||
| """ | ||
| Get the string corresponding to the index n. | ||
| Parameters | ||
| ---------- | ||
| n: int | ||
| Returns | ||
| ------- | ||
| str | ||
| The string corresponding to the index n. | ||
| """ | ||
| return self._seqeunces[int(n)].to_string() | ||
| def encode(self, encoded_ragged_array): | ||
| def encode(self, encoded_ragged_array: Union[EncodedRaggedArray, StringArray, List[str]]) -> Union[EncodedArray, EncodedRaggedArray]: | ||
| """ | ||
| Encode a string, list of strings or EncodedRaggedArray into an EncodedArray or EncodedRaggedArray of hashed strings. | ||
| Parameters | ||
| ---------- | ||
| encoded_ragged_array: Union[EncodedRaggedArray, StringArray, list[str]] | ||
| The input data to encode. | ||
| Returns | ||
| ------- | ||
| Union[EncodedArray, EncodedRaggedArray] | ||
| The encoded data. | ||
| """ | ||
| if isinstance(encoded_ragged_array, StringArray): | ||
@@ -38,3 +70,15 @@ pass # encoded_ragged_array = encoded_array_from_nparray(encoded_ragged_array) | ||
| def decode(self, encoded_array): | ||
| def decode(self, encoded_array: Union[EncodedArray, np.ndarray]) -> Union[str, List[str]]: | ||
| """ | ||
| Decode an EncodedArray or np.ndarray into a string or list of strings. | ||
| Parameters | ||
| ---------- | ||
| encoded_array | ||
| Returns | ||
| ------- | ||
| """ | ||
| if isinstance(encoded_array, EncodedArray): | ||
@@ -41,0 +85,0 @@ data = encoded_array.raw() |
@@ -62,3 +62,2 @@ import itertools | ||
| genotype_rows = EncodedArray(np.zeros((0, 3)), BaseEncoding) | ||
| # genotype_rows = as_encoded_array(genotype_rows) | ||
| data = genotype_rows.ravel() | ||
@@ -70,4 +69,2 @@ # hack because the row sometime ends with \n and sometimes with \t | ||
| return data[indices[:-1, np.newaxis] + np.array([1, 2, 3])] | ||
| #data = split(data.ravel(), "\t")[:-1, 0:3] # don't include last element which is empty | ||
| #return data | ||
@@ -81,3 +78,3 @@ def decode(self, genotype): | ||
| genotype = genotype.raw() | ||
| decoded = self.decode_lookup()[genotype].reshape(new_shape)# genotype.shape[0], genotype.shape[1]*4) | ||
| decoded = self.decode_lookup()[genotype].reshape(new_shape) | ||
| # remove last tab | ||
@@ -84,0 +81,0 @@ return decoded[..., :-1] |
@@ -42,2 +42,5 @@ from ..datatypes import GTFEntry | ||
| def __repr__(self): | ||
| return f'GenomicAnnotation(genome_context={self._genome_context}, data={self._data})' | ||
| @property | ||
@@ -44,0 +47,0 @@ def genes(self) -> Genes: |
@@ -40,3 +40,16 @@ import logging | ||
| def with_ignored_added(self, ignored): | ||
| def with_ignored_added(self, ignored: Iterable[str]) -> 'GenomeContext': | ||
| ''' | ||
| Make a new GenomeContext with additional ignored chromosomes. This is useful for allowing but ignoring | ||
| chromosome names that are not in the origin genome | ||
| Parameters | ||
| ---------- | ||
| ignored: Iterable[str] | ||
| Returns | ||
| ------- | ||
| GenomeContext | ||
| ''' | ||
| c = self._original_chrom_sizes.copy() | ||
@@ -43,0 +56,0 @@ c.update({name: 0 for name in ignored}) |
| import os | ||
| import numpy as np | ||
| from typing import Dict | ||
| from typing import Dict, List, Optional | ||
| from pathlib import PurePath | ||
@@ -39,7 +39,39 @@ from ..io import bnp_open, Bed6Buffer, BedBuffer, open_indexed | ||
| def with_ignored_added(self, ignored): | ||
| def with_ignored_added(self, ignored: List[str]) -> 'Genome': | ||
| ''' | ||
| Make a new GenomeContext with additional ignored chromosomes. This is useful for allowing but ignoring | ||
| chromosome names that are not in the origin genome. | ||
| Parameters | ||
| ---------- | ||
| ignored: Iterable[str] | ||
| Returns | ||
| ------- | ||
| Genome | ||
| ''' | ||
| return self.__class__(self._genome_context.with_ignored_added(ignored), self._fasta_filename) | ||
| @classmethod | ||
| def from_dict(cls, chrom_sizes: Dict[str, int], *args, **kwargs): | ||
| def from_dict(cls, chrom_sizes: Dict[str, int], *args, **kwargs) -> 'Genome': | ||
| ''' | ||
| Create a Genome object from a dictionary of chromosome sizes | ||
| Parameters | ||
| ---------- | ||
| chrom_sizes: Dict[str, int] | ||
| args: Additional args to be passed to the Genome constructor | ||
| kwargs: Additional kwargs to be passed to the Genome constructor | ||
| Returns | ||
| ------- | ||
| Genome | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> bnp.Genome.from_dict({'chr1': 1000, 'chr2': 2000}) | ||
| Genome(['chr1', 'chr2']) | ||
| ''' | ||
| return cls(chrom_sizes, *args, **kwargs) | ||
@@ -67,2 +99,8 @@ | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> bnp.Genome.from_file('example_data/hg38.chrom.sizes') | ||
| Genome(['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...']) | ||
| """ | ||
@@ -101,2 +139,12 @@ path = PurePath(filename) | ||
| GenomicArray | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> bedgraph = bnp.datatypes.BedGraph(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5], value=[1, 2, 3]) | ||
| >>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10}) | ||
| >>> genome.get_track(bedgraph) | ||
| chr1: [1 1 1 1 1 0 0 0 0 0 2 2 2 2 2 0 0 0 0 0] | ||
| chr2: [3 3 3 3 3 0 0 0 0 0] | ||
| """ | ||
@@ -119,2 +167,15 @@ bedgraph = self._mask_data_on_extra_chromosomes(bedgraph) | ||
| Returns | ||
| ------- | ||
| GenomicArray | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_dict({'chr1': 30000, 'chr2': 31000, 'chr3': 32000}) | ||
| >>> genome.read_track('example_data/small_treat_pileup.bdg') | ||
| chr1: [ 0.0 0.0 1.0 ... 0.0 0.0 0.0] | ||
| chr2: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0] | ||
| chr3: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0] | ||
| """ | ||
@@ -137,2 +198,16 @@ content = self._open(filename, stream) | ||
| GenomicIntervals | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> intervals = bnp.Interval(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5]) | ||
| >>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10}) | ||
| >>> genome.get_intervals(intervals) | ||
| Genomic Intervals on ['chr1', 'chr2']: | ||
| Interval with 3 entries | ||
| chromosome start stop | ||
| chr1 0 5 | ||
| chr1 10 15 | ||
| chr2 0 5 | ||
| """ | ||
@@ -159,2 +234,22 @@ return GenomicIntervals.from_intervals(intervals, self._genome_context, is_stranded=stranded) | ||
| GenomicIntervals | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') | ||
| >>> genome.read_intervals('example_data/small_summits.bed') | ||
| Genomic Intervals on ['chr1', 'chr2', 'chr3']: | ||
| Interval with 13 entries | ||
| chromosome start stop | ||
| chr1 639 640 | ||
| chr1 6023 6024 | ||
| chr1 7124 7125 | ||
| chr2 849 850 | ||
| chr2 6320 6321 | ||
| chr2 8483 8484 | ||
| chr2 11342 11343 | ||
| chr2 12527 12528 | ||
| chr2 13092 13093 | ||
| chr2 18943 18944 | ||
| """ | ||
@@ -173,3 +268,2 @@ path = PurePath(filename) | ||
| return self.get_intervals(content, stranded) | ||
| # return GenomicIntervals.from_intervals(content, self._chrom_sizes) | ||
@@ -199,4 +293,25 @@ def read_locations(self, filename: str, stranded: bool = False, stream: bool = False, has_numeric_chromosomes=False, buffer_type=None) -> GenomicLocation: | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_file('example_data/hg38.chrom.sizes') | ||
| >>> genome.read_locations('example_data/thousand_genomes.vcf', has_numeric_chromosomes=True) | ||
| Genomic Locations on ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...']: | ||
| LocationEntry with 74 entries | ||
| chromosome position | ||
| chr21 5033883 | ||
| chr21 5035657 | ||
| chr21 5038297 | ||
| chr21 5038312 | ||
| chr21 5052250 | ||
| chr21 5053935 | ||
| chr21 5053961 | ||
| chr21 5063903 | ||
| chr21 5063916 | ||
| chr21 5064678 | ||
| """ | ||
| assert not (stream and has_numeric_chromosomes) | ||
| assert not stranded, "Stranded locations are not supported yet" | ||
| f = bnp_open(filename, buffer_type=buffer_type) | ||
@@ -233,2 +348,15 @@ data = f.read_chunks() | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') | ||
| >>> genome.get_locations(bnp.datatypes.LocationEntry(chromosome=['chr1', 'chr1', 'chr2'], position=[0, 10, 0])) | ||
| Genomic Locations on ['chr1', 'chr2', 'chr3']: | ||
| LocationEntry with 3 entries | ||
| chromosome position | ||
| chr1 0 | ||
| chr1 10 | ||
| chr2 0 | ||
| """ | ||
@@ -240,3 +368,3 @@ if has_numeric_chromosomes: | ||
| def read_sequence(self, filename: str = None) -> GenomicSequence: | ||
| def read_sequence(self, filename: Optional[str] = None) -> GenomicSequence: | ||
| """Read the genomic sequence from file. | ||
@@ -255,2 +383,11 @@ | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') | ||
| >>> genome.read_sequence() | ||
| GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3'] | ||
| >>> genome = bnp.Genome.from_file('example_data/small.chrom.sizes') | ||
| >>> genome.read_sequence('example_data/small_sequence.fa') | ||
| GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3'] | ||
| """ | ||
@@ -279,2 +416,16 @@ | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> genome = bnp.Genome.from_file('example_data/small_sequence.fa') | ||
| >>> genome.read_annotation('example_data/small.gtf') | ||
| GenomicAnnotation(genome_context=['chr1', 'chr2', 'chr3'], data=GTFEntry with 5 entries | ||
| chromosome source feature_type start stop score strand phase atributes | ||
| chr1 knownGene transcript 17369 17436 . - . gene_id "ENST0000061921 | ||
| chr1 knownGene exon 17369 17436 . - . gene_id "ENST0000061921 | ||
| chr1 knownGene transcript 29554 31097 . + . gene_id "ENST0000047335 | ||
| chr1 knownGene exon 29554 30039 . + . gene_id "ENST0000047335 | ||
| chr1 knownGene exon 30564 30667 . + . gene_id "ENST0000047335) | ||
| """ | ||
@@ -293,8 +444,15 @@ | ||
| def get_genome_context(self): | ||
| def get_genome_context(self) -> GenomeContext: | ||
| ''' | ||
| Get the genome context of the Genome | ||
| Returns | ||
| ------- | ||
| GenomeContext | ||
| ''' | ||
| return self._genome_context | ||
| @property | ||
| def size(self): | ||
| def size(self) -> int: | ||
| '''The size of the genome''' | ||
| return self._genome_context.size |
@@ -5,5 +5,6 @@ from abc import ABC, abstractmethod, abstractproperty, abstractclassmethod | ||
| import numpy as np | ||
| from typing import List, Iterable, Tuple, Dict | ||
| from typing import List, Iterable, Tuple, Dict, Any, Optional | ||
| from .coordinate_mapping import map_locations, find_indices | ||
| from .. import EncodedArray | ||
| from ..bnpdataclass import BNPDataClass, replace, bnpdataclass | ||
@@ -17,2 +18,3 @@ from .genomic_track import GenomicArray, GenomicArrayNode | ||
| from ..string_array import StringArray | ||
| from ..util.typing import EncodedArrayLike | ||
@@ -25,10 +27,11 @@ | ||
| @abstractproperty | ||
| def get_location(self, where='start'): | ||
| @property | ||
| @abstractmethod | ||
| def get_location(self, where='start') -> 'GenomicLocation': | ||
| return NotImplemented | ||
| def get_data_field(self, field_name: str): | ||
| def get_data_field(self, field_name: str) -> Any: | ||
| return NotImplemented | ||
| def set_strand(self, strand): | ||
| def set_strand(self, strand: str): | ||
| self._is_stranded = True | ||
@@ -41,14 +44,18 @@ self._strand = strand | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def chromosome(self): | ||
| return NotImplemented | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def position(self): | ||
| return NotImplemented | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def strand(self): | ||
| return NotImplemented | ||
| @property | ||
| @abstractmethod | ||
@@ -59,3 +66,4 @@ def is_stranded(self): | ||
| @classmethod | ||
| def from_fields(cls, genome_context: GenomeContextBase, chromosome: List[str], position: List[int], strand: List[str] = None) -> 'GenomicLocation': | ||
| def from_fields(cls, genome_context: GenomeContextBase, chromosome: List[str], position: List[int], | ||
| strand: List[str] = None) -> 'GenomicLocation': | ||
| """Create genomic location from a genome context and the needed fields (chromosome and position) | ||
@@ -117,3 +125,3 @@ | ||
| """ | ||
| assert all(hasattr(data, name) for name in (chromosome_name, position_name)) | ||
@@ -128,6 +136,7 @@ if is_stranded: | ||
| class GenomicLocationGlobal(GenomicLocation): | ||
| class GenomicLocationGlobal(GenomicLocation, ABC): | ||
| ''' Class for genomic locations that are kept entirely in memory''' | ||
| def __init__(self, locations: BNPDataClass, genome_context: GenomeContextBase, is_stranded: bool, field_dict: Dict[str, str]): | ||
| def __init__(self, locations: BNPDataClass, genome_context: GenomeContextBase, is_stranded: bool, | ||
| field_dict: Dict[str, str]): | ||
| self._locations = locations | ||
@@ -138,20 +147,59 @@ self._genome_context = genome_context | ||
| def __repr__(self): | ||
| return f'Genomic Locations on {self._genome_context}:\n{self._locations.astype(LocationEntry)}' | ||
| @property | ||
| def data(self): | ||
| def data(self) -> BNPDataClass: | ||
| ''' | ||
| Return the underlying data as a bnpdataclass | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| ''' | ||
| return self._locations | ||
| def __replace__(self, **kwargs): | ||
| ''' | ||
| Replace fields in the locations, used internally by bnp.replace | ||
| ''' | ||
| kwargs = {self._field_dict[kw]: value for kw, value in kwargs.items()} | ||
| return self.__class__(replace(self._locations, **kwargs), self._genome_context, self._is_stranded, self._field_dict) | ||
| return self.__class__(replace(self._locations, **kwargs), self._genome_context, self._is_stranded, | ||
| self._field_dict) | ||
| @property | ||
| def chromosome(self): | ||
| def chromosome(self) -> StringArray: | ||
| ''' | ||
| Return the chromosome of the locations | ||
| Returns | ||
| ------- | ||
| StringArray | ||
| ''' | ||
| return getattr(self._locations, self._field_dict['chromosome']) | ||
| @property | ||
| def position(self): | ||
| def position(self) -> np.ndarray: | ||
| """ | ||
| Return the position of the locations | ||
| Returns | ||
| ------- | ||
| np.ndarray | ||
| """ | ||
| return getattr(self._locations, self._field_dict['position']) | ||
| @property | ||
| def strand(self): | ||
| def strand(self) -> EncodedArrayLike: | ||
| """ | ||
| The strand of the locations | ||
| Returns | ||
| ------- | ||
| EncodedArrayLike | ||
| """ | ||
| if not self.is_stranded(): | ||
@@ -161,6 +209,14 @@ raise ValueError('Unstranded position has not strand') | ||
| def is_stranded(self): | ||
| def is_stranded(self) -> bool: | ||
| ''' | ||
| Return whether the locations are stranded | ||
| Returns | ||
| ------- | ||
| bool | ||
| ''' | ||
| return self._is_stranded | ||
| def get_windows(self, flank: int = None, window_size: int = None) -> 'GenomicIntervals': | ||
| def get_windows(self, flank: Optional[int] = None, window_size: Optional[int] = None) -> 'GenomicIntervals': | ||
| """Create windows around the locations. | ||
@@ -188,10 +244,10 @@ | ||
| assert window_size is not None | ||
| l_flank = window_size//2 | ||
| r_flank = window_size//2 + window_size % 2 | ||
| l_flank = window_size // 2 | ||
| r_flank = window_size // 2 + window_size % 2 | ||
| if self.is_stranded(): | ||
| intervals = StrandedInterval(self.chromosome, self.position-l_flank, | ||
| self.position+r_flank, self.strand) | ||
| intervals = StrandedInterval(self.chromosome, self.position - l_flank, | ||
| self.position + r_flank, self.strand) | ||
| else: | ||
| intervals = Interval(self.chromosome, self.position-l_flank, | ||
| self.position+r_flank) | ||
| intervals = Interval(self.chromosome, self.position - l_flank, | ||
| self.position + r_flank) | ||
| return GenomicIntervalsFull(intervals, self._genome_context, | ||
@@ -213,6 +269,31 @@ is_stranded=self.is_stranded()).clip() | ||
| def __getitem__(self, idx): | ||
| def __getitem__(self, idx: Any)-> 'GenomicLocation': | ||
| ''' | ||
| Get a subset of the locations | ||
| Parameters | ||
| ---------- | ||
| idx: numpy index like | ||
| Returns | ||
| ------- | ||
| GenomicLocation | ||
| ''' | ||
| return self.__class__(self._locations[idx], self._genome_context, self._is_stranded, self._field_dict) | ||
| def get_data_field(self, field_name: str): | ||
| def get_data_field(self, field_name: str) -> Any: | ||
| ''' | ||
| Get a field from the undelying data | ||
| Parameters | ||
| ---------- | ||
| field_name: str | ||
| Returns | ||
| ------- | ||
| Any | ||
| The field from the underlying data | ||
| ''' | ||
| return getattr(self._locations, field_name) | ||
@@ -238,3 +319,4 @@ | ||
| def __init__(self, data_node: Node, genome_context: GenomeContextBase, is_stranded=False, field_dict: Dict[str, str]=None): | ||
| def __init__(self, data_node: Node, genome_context: GenomeContextBase, is_stranded=False, | ||
| field_dict: Dict[str, str] = None): | ||
| if field_dict is None: | ||
@@ -252,21 +334,61 @@ field_dict = {name: name for name in ['chromosome', 'positions', 'strand']} | ||
| def is_stranded(self): | ||
| def is_stranded(self) -> bool: | ||
| return self._is_stranded | ||
| def sorted(self): | ||
| def sorted(self) -> 'GenomicLocationStreamed': | ||
| return NotImplemented | ||
| @property | ||
| def position(self): | ||
| def position(self) -> ComputationNode: | ||
| ''' | ||
| Return a computation node for the position of the locations. | ||
| Can be realized by calling bnp.compute on them, or can be aggregated across the stream with | ||
| numpy aggregations | ||
| ------- | ||
| ComputationNode | ||
| ''' | ||
| return self._position | ||
| @property | ||
| def chromosome(self): | ||
| def chromosome(self) -> ComputationNode: | ||
| ''' | ||
| Return a computation node for the chromosome of the locations. | ||
| Can be realized by calling bnp.compute on them, or can be aggregated across the stream with | ||
| numpy aggregations | ||
| Returns | ||
| ------- | ||
| ComputationNode | ||
| ''' | ||
| return self._chromosome | ||
| def get_data_field(self, field_name: str): | ||
| def get_data_field(self, field_name: str) -> ComputationNode: | ||
| ''' | ||
| Get a field from the undelying data returned as a computation node | ||
| Parameters | ||
| ---------- | ||
| field_name: str | ||
| Returns | ||
| ------- | ||
| ComputationNode | ||
| ''' | ||
| return ComputationNode(getattr, [self._data_node, field_name]) | ||
| @property | ||
| def strand(self): | ||
| def strand(self) -> ComputationNode: | ||
| ''' | ||
| Return a computation node for the strand of the locations. | ||
| Can be realized by calling bnp.compute on them, or can be aggregated across the stream with | ||
| numpy aggregations | ||
| Returns | ||
| ------- | ||
| ComputationNode | ||
| ''' | ||
| if not self.is_stranded(): | ||
@@ -277,5 +399,16 @@ raise ValueError('Strand not supported on unstranded intervals') | ||
| def __getitem__(self, item): | ||
| ''' | ||
| Get a subset of the locations, returned as a CompuationNode | ||
| Parameters | ||
| ---------- | ||
| item | ||
| Returns | ||
| ------- | ||
| ''' | ||
| return self.__class__(ComputationNode(lambda x, i: x[i], [self._data_node, item]), self._genome_context) | ||
| def get_windows(self, flank: int = None, window_size: int = None) -> 'GenomicIntervals': | ||
| def get_windows(self, flank: int = None, window_size: Optional[int] = None) -> 'GenomicIntervals': | ||
| """Create windows around the locations. | ||
@@ -288,2 +421,3 @@ | ||
| ---------- | ||
| window_size | ||
| flank : int | ||
@@ -304,13 +438,13 @@ Flank on either side of the location | ||
| assert window_size is not None | ||
| l_flank = window_size//2 | ||
| r_flank = window_size//2 + window_size % 2 | ||
| l_flank = window_size // 2 | ||
| r_flank = window_size // 2 + window_size % 2 | ||
| if self.is_stranded(): | ||
| intervals = ComputationNode(StrandedInterval, | ||
| intervals = ComputationNode(StrandedInterval, | ||
| [self.chromosome, | ||
| self.position-l_flank, | ||
| self.position+r_flank, self.strand]) | ||
| self.position - l_flank, | ||
| self.position + r_flank, self.strand]) | ||
| else: | ||
| intervals = ComputationNode( | ||
| Interval, [self.chromosome, self.position-l_flank, | ||
| self.position+r_flank]) | ||
| Interval, [self.chromosome, self.position - l_flank, | ||
| self.position + r_flank]) | ||
| return GenomicIntervalsStreamed(intervals, self._genome_context, | ||
@@ -329,15 +463,19 @@ is_stranded=self.is_stranded()).clip() | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def start(self): | ||
| return NotImplemented | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def stop(self): | ||
| return NotImplemented | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def chromosome(self): | ||
| return NotImplemented | ||
| @abstractproperty | ||
| @property | ||
| @abstractmethod | ||
| def strand(self): | ||
@@ -424,6 +562,22 @@ return NotImplemented | ||
| @classmethod | ||
| def from_fields(cls, genome_context: GenomeContextBase, chromosome, start, stop, strand=None): | ||
| def from_fields(cls, genome_context: GenomeContextBase, chromosome: StringArray, start: np.ndarray, stop: np.ndarray, strand: Optional[EncodedArray]=None) -> 'GenomicIntervals': | ||
| ''' | ||
| Create genomic intervals from fields | ||
| Parameters | ||
| ---------- | ||
| genome_context: GenomeContextBase | ||
| chromosome: StringArray | ||
| start: np.ndarray | ||
| stop: np.ndarray | ||
| strand: EncodedArray | ||
| Returns | ||
| ------- | ||
| GenomicIntervals | ||
| ''' | ||
| is_stranded = strand is not None | ||
| if is_stranded: | ||
| intervals = Bed6(chromosome, start, stop, ['.']*len(start), | ||
| intervals = Bed6(chromosome, start, stop, ['.'] * len(start), | ||
| np.zeros_like(start), strand) | ||
@@ -435,3 +589,3 @@ else: | ||
| @classmethod | ||
| def from_intervals(cls, intervals: Interval, genome_context: GenomeContextBase, is_stranded=False): | ||
| def from_intervals(cls, intervals: Interval, genome_context: GenomeContextBase, is_stranded: Optional[bool]=False) -> 'GenomicIntervals': | ||
| """Create genomic intervals from interval entries and genome info | ||
@@ -442,5 +596,11 @@ | ||
| intervals : Interval | ||
| chrom_sizes : Dict[str, int] | ||
| genome_context : GenomeContextBase | ||
| is_stranded : bool | ||
| Returns | ||
| ------- | ||
| 'GenomicIntervals' | ||
| """ | ||
| if isinstance(intervals, Interval): #TODO check is node | ||
| if isinstance(intervals, Interval): # TODO check is node | ||
| return GenomicIntervalsFull(genome_context.mask_data(intervals), genome_context, is_stranded) | ||
@@ -451,3 +611,4 @@ else: | ||
| @classmethod | ||
| def from_interval_stream(cls, interval_stream: Iterable[Interval], genome_context: GenomeContextBase, is_stranded=False): | ||
| def from_interval_stream(cls, interval_stream: Iterable[Interval], genome_context: GenomeContextBase, | ||
| is_stranded=False) -> 'GenomicIntervals': | ||
| """Create streamed genomic intervals from a stream of intervals and genome info | ||
@@ -458,5 +619,10 @@ | ||
| interval_stream : Iterable[Interval] | ||
| chrom_sizes : Dict[str, int] | ||
| genome_context : GenomeContextBase | ||
| is_stranded : bool | ||
| Returns | ||
| ------- | ||
| 'GenomicIntervals' | ||
| """ | ||
| interval_stream = genome_context.iter_chromosomes( | ||
@@ -481,6 +647,5 @@ interval_stream, StrandedInterval if is_stranded else Interval) | ||
| class GenomicIntervalsFull(GenomicIntervals): | ||
| ''' Class for holding a set of intervals in memory''' | ||
| class GenomicIntervalsFull(GenomicIntervals): | ||
| ''' Class for holding a set of intervals in memory''' | ||
| is_stream = False | ||
@@ -494,3 +659,4 @@ | ||
| @property | ||
| def data(self): | ||
| def data(self) -> BNPDataClass: | ||
| '''Return the underlying data as a bnpdataclass''' | ||
| return self._intervals | ||
@@ -500,7 +666,7 @@ | ||
| if func == np.concatenate: | ||
| return self.__class__(np.concatenate([obj._intervals for obj in args[0]]), self._genome_context, self._is_stranded) | ||
| return self.__class__(np.concatenate([obj._intervals for obj in args[0]]), self._genome_context, | ||
| self._is_stranded) | ||
| return NotImplemented | ||
| def __repr__(self): | ||
@@ -524,17 +690,28 @@ return f'Genomic Intervals on {self._genome_context}:\n{self._intervals.astype(Interval)}' | ||
| def map_locations(self, locations: LocationEntry): | ||
| ''' | ||
| Map locations to the intervals. The locations should be in the same coordinate system as the intervals | ||
| The new locations will be in the coordinate system of the intervals | ||
| Parameters | ||
| ---------- | ||
| locations: LocationEntry | ||
| Returns | ||
| ------- | ||
| LocationEntry | ||
| ''' | ||
| go = self._genome_context.global_offset.from_local_interval(self._intervals) | ||
| global_positions = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, locations.position) | ||
| global_positions = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, | ||
| locations.position) | ||
| location_indices, interval_indices = find_indices(global_positions, go) | ||
| new_entries = locations[location_indices] | ||
| names = self._intervals.name if hasattr(self._intervals, 'name') else StringArray(np.arange(len(self._intervals)).astype('S')) | ||
| names = self._intervals.name if hasattr(self._intervals, 'name') else StringArray( | ||
| np.arange(len(self._intervals)).astype('S')) | ||
| return replace(new_entries, chromosome=names[interval_indices], | ||
| position=new_entries.position - self.start[interval_indices]) | ||
| return map_locations(replace(locations, position=global_positions), go) | ||
| def sorted(self) -> 'GenomicIntervals': | ||
@@ -571,9 +748,9 @@ """Return the intervals sorted according to `genome_context` | ||
| else: | ||
| location = np.where(self.strand==('+' if where=='start' else '-'), | ||
| location = np.where(self.strand == ('+' if where == 'start' else '-'), | ||
| self.start, | ||
| self.stop-1) | ||
| self.stop - 1) | ||
| data = replace(self._intervals, start=location) | ||
| else: | ||
| assert where == 'center' | ||
| location = (self.start+self.stop)//2 | ||
| location = (self.start + self.stop) // 2 | ||
| data = replace(self._intervals, start=location) | ||
@@ -617,3 +794,3 @@ return GenomicLocationGlobal.from_data( | ||
| chrom_sizes = self._genome_context.global_offset.get_size(self._intervals.chromosome) | ||
| return self.from_intervals(extend_to_size(self._intervals, size, chrom_sizes), | ||
| return self.from_intervals(extend_to_size(self._intervals, size, chrom_sizes), | ||
| self._genome_context) | ||
@@ -806,3 +983,2 @@ | ||
| def get_mask(self) -> GenomicArray: | ||
@@ -813,3 +989,4 @@ return GenomicArrayNode(ComputationNode(get_boolean_mask, [self._intervals_node, self._chrom_size_node]), | ||
| def clip(self) -> 'GenomicIntervals': | ||
| return self.__class__(ComputationNode(clip, [self._intervals_node, self._chrom_size_node]), self._genome_context) | ||
| return self.__class__(ComputationNode(clip, [self._intervals_node, self._chrom_size_node]), | ||
| self._genome_context) | ||
@@ -816,0 +993,0 @@ def __replace__(self, **kwargs): |
@@ -9,3 +9,3 @@ import numpy as np | ||
| from npstructures import RunLengthRaggedArray | ||
| from typing import List, Union, Iterable, Tuple, Dict, Any | ||
| from typing import List, Union, Iterable, Tuple, Dict, Any, Callable | ||
@@ -19,11 +19,11 @@ | ||
| @property | ||
| def genome_context(self): | ||
| def genome_context(self) -> GenomeContextBase: | ||
| return self._genome_context | ||
| @abstractmethod | ||
| def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs) -> 'GenomicArray': | ||
| def __array_ufunc__(self, ufunc: Callable, method: str, *inputs, **kwargs) -> 'GenomicArray': | ||
| return NotImplemented | ||
| @abstractmethod | ||
| def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict) -> Any: | ||
| def __array_function__(self, func: Callable, types: List, args: List, kwargs: Dict) -> Any: | ||
| return NotImplemented | ||
@@ -39,4 +39,5 @@ | ||
| @classmethod | ||
| def from_global_data(cls, global_pileup: GenomicRunLengthArray, genome_context: GenomeContextBase) -> 'GenomicArray': | ||
| """Create the genomic array from data represened on a flat/concatenated genome. | ||
| def from_global_data(cls, global_pileup: GenomicRunLengthArray, | ||
| genome_context: GenomeContextBase) -> 'GenomicArray': | ||
| """Create the genomic array from data represented on a flat/concatenated genome. | ||
@@ -54,13 +55,5 @@ Parameters | ||
| """ | ||
| return GenomicArrayGlobal(global_pileup, genome_context) | ||
| def _get_intervals_from_data(self, name, data): | ||
| if data.dtype == bool: | ||
| return Interval([name]*len(data.starts), | ||
| data.starts, data.ends)[data.values] | ||
| else: | ||
| return BedGraph([name]*len(data.starts), | ||
| data.starts, data.ends, data.values) | ||
| @classmethod | ||
@@ -81,3 +74,3 @@ def from_bedgraph(cls, bedgraph: BedGraph, genome_context: GenomeContextBase) -> 'GenomicData': | ||
| """ | ||
| if isinstance(bedgraph, BedGraph): | ||
@@ -92,6 +85,15 @@ go = genome_context.global_offset | ||
| return GenomicArrayNode(ComputationNode(GenomicRunLengthArray.from_bedgraph, | ||
| [interval_stream, StreamNode(iter(genome_context.chrom_sizes.values()))]), | ||
| [interval_stream, | ||
| StreamNode(iter(genome_context.chrom_sizes.values()))]), | ||
| genome_context) | ||
| def _get_intervals_from_data(self, name, data): | ||
| if data.dtype == bool: | ||
| return Interval([name] * len(data.starts), | ||
| data.starts, data.ends)[data.values] | ||
| else: | ||
| return BedGraph([name] * len(data.starts), | ||
| data.starts, data.ends, data.values) | ||
| class GenomicArrayGlobal(GenomicArray, np.lib.mixins.NDArrayOperatorsMixin): | ||
@@ -101,2 +103,3 @@ ''' | ||
| ''' | ||
| def __init__(self, global_track: GenomicRunLengthArray, genome_context: GenomeContextBase): | ||
@@ -108,3 +111,3 @@ assert isinstance(global_track, GenomicRunLengthArray), global_track | ||
| @property | ||
| def dtype(self): | ||
| def dtype(self) -> np.dtype: | ||
| return self._global_track.dtype | ||
@@ -117,6 +120,9 @@ | ||
| def sum(self) -> float: | ||
| def sum(self, axis=None) -> float: | ||
| '''Sum the data in the array''' | ||
| assert axis is None | ||
| return self._global_track.sum(axis=None) | ||
| def extract_chromsome(self, chromosome: Union[str, List[str]]) -> Union[GenomicRunLengthArray, RunLengthRaggedArray]: | ||
| def extract_chromsome(self, chromosome: Union[str, List[str]]) -> Union[ | ||
| GenomicRunLengthArray, RunLengthRaggedArray]: | ||
| """Get the data on one or more chromosomes | ||
@@ -147,2 +153,10 @@ Parameters | ||
| def to_dict(self) -> Dict[str, GenomicRunLengthArray]: | ||
| """ | ||
| Convert the genomic array to a dict of arrays with chromosomes as keys | ||
| Returns | ||
| ------- | ||
| Dict[str, GenomicRunLengthArray] | ||
| """ | ||
| go = self._genome_context.global_offset | ||
@@ -152,6 +166,24 @@ names = go.names() | ||
| sizes = go.get_size(names) | ||
| return {name: self._global_track[offset:offset+size].to_array() | ||
| return {name: self._global_track[offset:offset + size].to_array() | ||
| for name, offset, size in zip(names, offsets, sizes)} | ||
| def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs): | ||
| def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs): | ||
| ''' | ||
| Handle numpy ufuncs on the genomic array | ||
| Parameters | ||
| ---------- | ||
| ufunc: np.ufunc | ||
| method: str | ||
| How to call the ufunc | ||
| inputs: List | ||
| Args for the ufunc | ||
| kwargs: Dict | ||
| Additional arguments | ||
| Returns | ||
| ------- | ||
| GenomicArrayGlobal | ||
| ''' | ||
| inputs = [(i._global_track if isinstance(i, GenomicArrayGlobal) else i) for i in inputs] | ||
@@ -161,3 +193,3 @@ r = self._global_track.__array_ufunc__(ufunc, method, *inputs, **kwargs) | ||
| def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict): | ||
| def __array_function__(self, func: Callable, types: List, args: List, kwargs: Dict): | ||
| """Handles any numpy array functions called on a raggedarray | ||
@@ -187,7 +219,7 @@ | ||
| """ | ||
| go = self._genome_context._global_offset | ||
| go = self._genome_context.global_offset | ||
| names = go.names() | ||
| starts = go.get_offset(names) | ||
| stops = starts+go.get_size(names) | ||
| stops = starts + go.get_size(names) | ||
| intervals_list = [] | ||
@@ -201,3 +233,4 @@ for name, start, stop in zip(names, starts, stops): | ||
| def extract_intervals(self, intervals: Union[Interval, 'GenomicIntervals'], stranded: bool = False) -> RunLengthRaggedArray: | ||
| def extract_intervals(self, intervals: Union[Interval, 'GenomicIntervals'], | ||
| stranded: bool = False) -> RunLengthRaggedArray: | ||
| """Extract the data contained in a set of intervals | ||
@@ -234,3 +267,5 @@ | ||
| """ | ||
| global_intervals = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, locations.position) | ||
| assert stranded is None, 'Stranded not implemented for locations' | ||
| global_intervals = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, | ||
| locations.position) | ||
| return self._global_track[global_intervals] | ||
@@ -243,3 +278,2 @@ # if not stranded: | ||
| @classmethod | ||
@@ -264,2 +298,15 @@ def from_dict(cls, d: Dict[str, GenomicRunLengthArray], genome_context: GenomeContextBase) -> 'GenomicData': | ||
| genome_context: GenomeContextBase) -> 'GenomicData': | ||
| ''' | ||
| Create a genomic array from a stream of data | ||
| Parameters | ||
| ---------- | ||
| stream: Iterable[Tuple[str, GenomicRunLengthArray]] | ||
| genome_context: GenomeContextBase | ||
| Returns | ||
| ------- | ||
| GenomicData | ||
| ''' | ||
| return cls.from_dict(dict(stream)) | ||
@@ -284,3 +331,3 @@ | ||
| def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs): | ||
| def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs): | ||
| args = [gtn._run_length_node if isinstance(gtn, GenomicArrayNode) else gtn for gtn in inputs] | ||
@@ -296,3 +343,3 @@ return self.__class__(ufunc(*args, **kwargs), self._genome_context) | ||
| """ | ||
| return ComputationNode(self._get_intervals_from_data, [self._chrom_name_node, self._run_length_node]) | ||
@@ -321,4 +368,5 @@ | ||
| """ | ||
| def stranded_func(ra: GenomicRunLengthArray, start: int, stop: int, strand: str): | ||
| assert np.all(stop <= ra.size), (np.max(stop), ra.size) | ||
@@ -334,3 +382,4 @@ rle = ra[start:stop] | ||
| assert self.genome_context.is_compatible(intervals.genome_context), (self.genome_context, intervals.genome_context) | ||
| assert self.genome_context.is_compatible(intervals.genome_context), ( | ||
| self.genome_context, intervals.genome_context) | ||
| intervals = intervals.as_stream() | ||
@@ -345,5 +394,23 @@ if stranded: | ||
| def extract_chromsome(self, chromosome: Union[str, List[str]]) -> 'GenomicData': | ||
| ''' | ||
| Extract the data for a chromosome | ||
| Unimplemented | ||
| ''' | ||
| assert False | ||
| def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]], genome_context: GenomeContextBase) -> 'GenomicData': | ||
| def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]], | ||
| genome_context: GenomeContextBase) -> 'GenomicData': | ||
| ''' | ||
| Create a genomic array from a stream of data | ||
| Parameters | ||
| ---------- | ||
| stream: Iterable[Tuple[str, GenomicRunLengthArray]] | ||
| genome_context: GenomeContextBase | ||
| Returns | ||
| ------- | ||
| GenomicData | ||
| ''' | ||
| stream_node = StreamNode((array for _, array in stream)) | ||
@@ -369,5 +436,7 @@ return cls(stream_node, genome_context) | ||
| def to_dict(self): | ||
| '''Unimplemented''' | ||
| assert False | ||
| def sum(self, axis=None) -> float: | ||
| '''Sum the data in the array''' | ||
| return np.sum(self._run_length_node) | ||
@@ -374,0 +443,0 @@ |
| import dataclasses | ||
| from typing import Tuple | ||
| import numpy as np | ||
@@ -55,2 +57,7 @@ from ..encodings.string_encodings import StringEncoding | ||
| def to_local_coordinates(self, global_offset) -> Tuple[EncodedArray, np.ndarray]: | ||
| chromosome_idxs = np.searchsorted(self._offset, global_offset, side="right") - 1 | ||
| local_offset = global_offset - self._offset[chromosome_idxs] | ||
| return EncodedArray(chromosome_idxs, self._old_encoding), local_offset | ||
| def from_local_interval(self, interval, do_clip=False): | ||
@@ -57,0 +64,0 @@ start_offsets, stop_offsets = self.start_ends_from_intervals(interval, do_clip) |
+84
-35
| from functools import lru_cache | ||
| from itertools import accumulate, repeat, takewhile, chain | ||
| from typing import Union, Tuple, List, Any | ||
| import numpy as np | ||
@@ -17,3 +19,8 @@ from npstructures.raggedshape import RaggedView, RaggedView2 | ||
| class BamBufferExtractor: | ||
| def __init__(self, data, starts, ends, header_data, is_contigous=True): | ||
| ''' | ||
| Class to handle the extraction of data from a buffer from a BAM file. | ||
| ''' | ||
| def __init__(self, data: np.ndarray, starts: np.ndarray, ends: np.ndarray, header_data: Any, | ||
| is_contigous: bool = True): | ||
| self._data = data | ||
@@ -49,3 +56,3 @@ self._new_lines = starts | ||
| @property | ||
| def data(self): | ||
| def data(self) -> np.ndarray: | ||
| if not self._is_contigous: | ||
@@ -56,6 +63,7 @@ self._make_contigous() | ||
| def __getitem__(self, item): | ||
| return self.__class__(self._data, self._new_lines[item], self._ends[item], self._header_data, is_contigous=False) | ||
| return self.__class__(self._data, self._new_lines[item], self._ends[item], self._header_data, | ||
| is_contigous=False) | ||
| def _get_ints(self, offsets, n_bytes, dtype): | ||
| tmp = self._data[(self._new_lines+offsets)[:, None] + np.arange(n_bytes)].ravel() | ||
| tmp = self._data[(self._new_lines + offsets)[:, None] + np.arange(n_bytes)].ravel() | ||
| ints = (tmp).view(dtype).ravel() | ||
@@ -114,3 +122,3 @@ assert len(ints) == len(self._new_lines), (len(ints), offsets, len(self._new_lines), n_bytes, dtype) | ||
| def _get_read_name(self): | ||
| read_names = ragged_slice(self._data, self._read_name_start, self._cigar_start-1) | ||
| read_names = ragged_slice(self._data, self._read_name_start, self._cigar_start - 1) | ||
| read_names = EncodedRaggedArray( | ||
@@ -141,13 +149,30 @@ EncodedArray(read_names.ravel(), BaseEncoding), read_names.shape) | ||
| def get_field_by_number(self, i): | ||
| def get_field_by_number(self, i: int) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]: | ||
| ''' | ||
| Get the data from the field with the given number. | ||
| Parameters | ||
| ---------- | ||
| i: int | ||
| The field number. | ||
| Returns | ||
| ------- | ||
| Union[np.ndarray, EncodedArray, EncodedRaggedArray] | ||
| The data from the field. | ||
| ''' | ||
| return self._functions[i]() | ||
| @property | ||
| def size(self): | ||
| def size(self) -> int: | ||
| if self._is_contigous: | ||
| return self._data.size | ||
| else: | ||
| return (self._ends-self._new_lines).sum() | ||
| return (self._ends - self._new_lines).sum() | ||
| class BamHeader: | ||
| """ | ||
| Class to handle the header of a BAM file. | ||
| """ | ||
| def __init__(self, file_object): | ||
@@ -158,3 +183,3 @@ self._file_object = file_object | ||
| def read(self, n_bytes): | ||
| def read(self, n_bytes: int) -> bytes: | ||
| bytes = self._file_object.read(n_bytes) | ||
@@ -172,3 +197,12 @@ self._header_data.append(bytes) | ||
| def read_header(self): | ||
| def read_header(self) -> List[Tuple[str, int]]: | ||
| """ | ||
| Read the header of the BAM file. Returns a list of tuples with the reference names and lengths. | ||
| Returns | ||
| ------- | ||
| List[Tuple[str, int]] | ||
| The reference names and lengths. | ||
| """ | ||
| magic = self.read(4) | ||
@@ -193,7 +227,15 @@ assert magic == b"BAM\1", magic | ||
| def bytes(self): | ||
| def bytes(self) -> bytes: | ||
| """ | ||
| Get the header as bytes. | ||
| Returns | ||
| ------- | ||
| bytes | ||
| The header as bytes. | ||
| """ | ||
| return b''.join(self._header_data) | ||
| class BamBuffer(FileBuffer): | ||
@@ -205,2 +247,3 @@ ''' | ||
| supports_modified_write = False | ||
| def __init__(self, buffer_extractor, header_data=None): | ||
@@ -214,15 +257,15 @@ self._buffer_extractor = buffer_extractor | ||
| def get_field_range_as_text(self): | ||
| def get_field_range_as_text(self, *args): | ||
| raise Exception('Cannot write BAM file with set values') | ||
| @property | ||
| def size(self): | ||
| def size(self) -> int: | ||
| return self._buffer_extractor.size | ||
| @property | ||
| def data(self): | ||
| def data(self) -> np.ndarray: | ||
| return self._buffer_extractor.data | ||
| @property | ||
| def n_lines(self): | ||
| def n_lines(self) -> int: | ||
| return len(self._buffer_extractor) | ||
@@ -244,3 +287,3 @@ | ||
| @classmethod | ||
| def make_header(cls, data): | ||
| def make_header(cls, data: BamEntry) -> bytes: | ||
| header = data.get_context("header") | ||
@@ -250,10 +293,4 @@ return header.bytes() | ||
| @classmethod | ||
| def read_header(cls, file_object): | ||
| def read_header(cls, file_object) -> BamHeader: | ||
| return BamHeader(file_object) | ||
| magic = file_object.read(4) | ||
| assert magic == b"BAM\1", magic | ||
| header_length = cls._read_int(file_object) | ||
| file_object.read(header_length) | ||
| n_ref = cls._read_int(file_object) | ||
| return cls._handle_refs(n_ref, file_object) | ||
@@ -273,4 +310,4 @@ @classmethod | ||
| chunk = bytes(chunk) | ||
| new_start = lambda start, _: start + int.from_bytes(chunk[start:start+4], byteorder="little") + 4 | ||
| _starts = accumulate(repeat(0), new_start)# chain([0], accumulate(repeat(0), new_start)) | ||
| new_start = lambda start, _: start + int.from_bytes(chunk[start:start + 4], byteorder="little") + 4 | ||
| _starts = accumulate(repeat(0), new_start) # chain([0], accumulate(repeat(0), new_start)) | ||
| starts = list(takewhile(lambda start: start <= len(chunk), _starts)) | ||
@@ -284,15 +321,24 @@ return starts | ||
| @classmethod | ||
| def from_raw_buffer(cls, chunk, header_data): | ||
| def from_raw_buffer(cls, chunk: np.ndarray, header_data: BamHeader) -> "BamBuffer": | ||
| chunk = np.asarray(chunk) | ||
| starts = np.asanyarray(cls._find_starts(chunk)) | ||
| buffer_extractor = BamBufferExtractor(chunk[:starts[-1]], starts[:-1],starts[1:], header_data.info) | ||
| buffer_extractor = BamBufferExtractor(chunk[:starts[-1]], starts[:-1], starts[1:], header_data.info) | ||
| return cls(buffer_extractor, header_data) | ||
| def get_data(self): | ||
| def get_data(self)-> BamEntry: | ||
| """ | ||
| Get the data from the buffer. | ||
| Returns | ||
| ------- | ||
| BamEntry | ||
| The data from the buffer. | ||
| """ | ||
| return BamEntry(*(self.get_field_by_number(i) for i in range(9))) | ||
| def get_field_by_number(self, i, dtype=None): | ||
| def get_field_by_number(self, i, dtype=None)-> Union[np.ndarray, EncodedArray, EncodedRaggedArray]: | ||
| return self._buffer_extractor.get_field_by_number(i) | ||
| def count_entries(self): | ||
| def count_entries(self) -> int: | ||
| return len(self._buffer_extractor) | ||
@@ -308,7 +354,10 @@ | ||
| lambda: self._buffer_extractor.get_field_by_number(3), | ||
| lambda: self._buffer_extractor.get_field_by_number(3)+count_reference_length(*(self._buffer_extractor.get_field_by_number(i) for i in (5, 6))), | ||
| lambda: self._buffer_extractor.get_field_by_number(3) + count_reference_length( | ||
| *(self._buffer_extractor.get_field_by_number(i) for i in (5, 6))), | ||
| lambda: self._buffer_extractor.get_field_by_number(1), | ||
| lambda: self._buffer_extractor.get_field_by_number(4), | ||
| lambda: EncodedArray(np.where(self._buffer_extractor.get_field_by_number(2) & np.uint16(16), ord("-"), ord("+"))[:, None], BaseEncoding) | ||
| ] | ||
| lambda: EncodedArray( | ||
| np.where(self._buffer_extractor.get_field_by_number(2) & np.uint16(16), ord("-"), ord("+"))[:, None], | ||
| BaseEncoding) | ||
| ] | ||
| return funcs[i]() | ||
@@ -328,5 +377,5 @@ | ||
| start, | ||
| start+length, | ||
| start + length, | ||
| read_names, | ||
| mapq, | ||
| strand) |
@@ -51,14 +51,2 @@ import io | ||
| def __init___(self, data: EncodedArray, new_lines: np.ndarray = None, delimiters: np.ndarray = None, | ||
| header_data=None, buffer_extractor=None): | ||
| super().__init__(data, new_lines) | ||
| if delimiters is None: | ||
| delimiters = np.concatenate( | ||
| ([-1], np.flatnonzero(self._data == self.DELIMITER), self._new_lines) | ||
| ) | ||
| delimiters.sort(kind="mergesort") | ||
| self._delimiters = delimiters | ||
| self._header_data = header_data | ||
| self.__buffer_extractor = buffer_extractor | ||
| @classmethod | ||
@@ -92,4 +80,4 @@ def from_raw_buffer(cls, chunk: np.ndarray, header_data=None) -> "DelimitedBuffer": | ||
| n_fields = cls._get_n_fields(entry_ends) | ||
| size = delimiters[entry_ends[-1]]+1 | ||
| delimiters = np.insert(delimiters[:entry_ends[-1]+1], 0, -1) | ||
| size = delimiters[entry_ends[-1]] + 1 | ||
| delimiters = np.insert(delimiters[:entry_ends[-1] + 1], 0, -1) | ||
| buffer_extractor = cls._get_buffer_extractor( | ||
@@ -100,30 +88,2 @@ chunk[:size], delimiters, n_fields) | ||
| @classmethod | ||
| def _get_n_fields(cls, entry_ends): | ||
| return entry_ends[0] + 1 | ||
| @property | ||
| def __buffer_extractor(self): | ||
| if self.__buffer_extractor is None: | ||
| self.__buffer_extractor = self._get_buffer_extractor() | ||
| return self.__buffer_extractor | ||
| @classmethod | ||
| def _get_buffer_extractor(cls, data, delimiters, n_cols) -> TextThroughputExtractor: | ||
| starts = delimiters[:-1].reshape(-1, n_cols) + 1 | ||
| ends = delimiters[1:].reshape(-1, n_cols) | ||
| ends = cls._modify_for_carriage_return(ends, data) | ||
| entry_starts = starts[:, 0] | ||
| entry_ends = ends[:, -1] + 1 | ||
| return TextThroughputExtractor(data, starts, field_ends=ends, entry_starts=entry_starts, entry_ends=entry_ends) | ||
| @classmethod | ||
| def _modify_for_carriage_return(cls, ends, data): | ||
| if data.size==0 or ends[0, -1]==0: | ||
| return ends | ||
| if data[ends[0, -1]-1] == '\r': | ||
| ends = ends.copy() | ||
| ends[:, -1] -= data[ends[:, -1]-1] == '\r' | ||
| return ends | ||
| def __getitem__(self, idx): | ||
@@ -165,9 +125,9 @@ return self.__class__(self._buffer_extractor[idx], self._header_data) | ||
| @classmethod | ||
| def join_fields(cls, fields_list: List[EncodedRaggedArray]): | ||
| def join_fields(cls, fields_list: List[EncodedRaggedArray]) -> EncodedRaggedArray: | ||
| return join_columns(fields_list, cls.DELIMITER).ravel() | ||
| def get_field_range_as_text(self, *args, **kwargs): | ||
| def get_field_range_as_text(self, *args, **kwargs) -> EncodedRaggedArray: | ||
| return self.get_column_range_as_text(*args, **kwargs) | ||
| def get_column_range_as_text(self, col_start, col_end, keep_sep=False): | ||
| def get_column_range_as_text(self, col_start, col_end, keep_sep=False) -> EncodedRaggedArray: | ||
| """Get multiple columns as text | ||
@@ -183,2 +143,7 @@ | ||
| keep seperator at end | ||
| Returns | ||
| ------- | ||
| EncodedRaggedArray | ||
| EncodedRaggedArray of the columns | ||
| """ | ||
@@ -189,24 +154,2 @@ self.validate_if_not() | ||
| @staticmethod | ||
| def _move_ints_to_digit_array(ints, n_digits): | ||
| powers = np.uint8(10) ** np.arange(n_digits)[::-1] | ||
| ret = (ints[..., None] // powers) % 10 | ||
| return EncodedArray(ret, DigitEncoding) | ||
| def _validate(self): | ||
| chunk = self._data | ||
| delimiters = self._delimiters[1:] | ||
| n_delimiters_per_line = ( | ||
| next(i for i, d in enumerate(delimiters) if chunk[d] == NEWLINE) + 1 | ||
| ) | ||
| self._n_cols = n_delimiters_per_line | ||
| should_be_new_lines = chunk[delimiters[n_delimiters_per_line - 1::n_delimiters_per_line]] | ||
| if delimiters.size % n_delimiters_per_line != 0 or np.any(should_be_new_lines != "\n"): | ||
| offending_line = np.flatnonzero(should_be_new_lines != "\n")[0] | ||
| lines = split(self._data, '\n') | ||
| raise FormatException( | ||
| f"Irregular number of delimiters per line ({delimiters.size}, {n_delimiters_per_line}): {lines}", | ||
| line_number=offending_line) | ||
| self._validated = True | ||
| @classmethod | ||
@@ -226,3 +169,3 @@ def from_data(cls, data: BNPDataClass) -> "DelimitedBuffer": | ||
| @classmethod | ||
| def make_header(cls, data: bnpdataclass): | ||
| def make_header(cls, data: BNPDataClass): | ||
| header = "" | ||
@@ -255,2 +198,14 @@ if data.has_context("header"): | ||
| @property | ||
| def actual_dataclass(self): | ||
| return self.dataclass | ||
| def get_field_by_number(self, field_nr: int, field_type: type = object): | ||
| """Get a field by number""" | ||
| self.validate_if_not() | ||
| if field_type is None: | ||
| field_type = dataclasses.fields(self.actual_dataclass)[field_nr] | ||
| return self._get_field_by_number( | ||
| field_nr, field_type) | ||
| def _get_field_by_number(self, col_number, field_type): | ||
@@ -270,3 +225,3 @@ | ||
| col_number, | ||
| keep_sep=(field_type == List[int] or field_type==List[float])) | ||
| keep_sep=(field_type == List[int] or field_type == List[float])) | ||
| text = subresult | ||
@@ -286,18 +241,12 @@ assert isinstance(text, (EncodedRaggedArray, EncodedArray)), text | ||
| raise FormatException(e.args[0], line_number=row_number) | ||
| # if is_subclass_or_instance(field_type, Encoding): | ||
| # parsed = as_encoded_array(subresult, field_type) | ||
| return parsed | ||
| def count_entries(self) -> int: | ||
| """Count the number of entries in the buffer""" | ||
| return len(self._buffer_extractor) | ||
| @property | ||
| def actual_dataclass(self): | ||
| return self.dataclass | ||
| def n_lines(self) -> int: | ||
| return len(self._buffer_extractor) | ||
| def get_field_by_number(self, field_nr: int, field_type: type = object): | ||
| self.validate_if_not() | ||
| if field_type is None: | ||
| field_type = dataclasses.fields(self.actual_dataclass)[field_nr] | ||
| return self._get_field_by_number( | ||
| field_nr, field_type) | ||
| def _parse_split_floats(self, text, sep=','): | ||
@@ -322,3 +271,4 @@ function = str_to_float | ||
| mask = int_strings.lengths != 0 | ||
| return RaggedArray(function(int_strings[mask]), (text == sep).sum(axis=-1), safe_mode=False) # TODO: is it necessary with unsafe mode here | ||
| return RaggedArray(function(int_strings[mask]), (text == sep).sum(axis=-1), | ||
| safe_mode=False) # TODO: is it necessary with unsafe mode here | ||
| return RaggedArray(function(int_strings), (text == sep).sum(axis=-1)) | ||
@@ -329,16 +279,55 @@ else: | ||
| def count_entries(self) -> int: | ||
| """Count the number of entries in the buffer""" | ||
| return len(self._buffer_extractor) | ||
| @classmethod | ||
| def _get_n_fields(cls, entry_ends): | ||
| return entry_ends[0] + 1 | ||
| @property | ||
| def n_lines(self): | ||
| return len(self._buffer_extractor) | ||
| def __buffer_extractor(self): | ||
| if self.__buffer_extractor is None: | ||
| self.__buffer_extractor = self._get_buffer_extractor() | ||
| return self.__buffer_extractor | ||
| @classmethod | ||
| def _get_buffer_extractor(cls, data, delimiters, n_cols) -> TextThroughputExtractor: | ||
| starts = delimiters[:-1].reshape(-1, n_cols) + 1 | ||
| ends = delimiters[1:].reshape(-1, n_cols) | ||
| ends = cls._modify_for_carriage_return(ends, data) | ||
| entry_starts = starts[:, 0] | ||
| entry_ends = ends[:, -1] + 1 | ||
| return TextThroughputExtractor(data, starts, field_ends=ends, entry_starts=entry_starts, entry_ends=entry_ends) | ||
| @classmethod | ||
| def _modify_for_carriage_return(cls, ends, data): | ||
| if data.size == 0 or ends[0, -1] == 0: | ||
| return ends | ||
| if data[ends[0, -1] - 1] == '\r': | ||
| ends = ends.copy() | ||
| ends[:, -1] -= data[ends[:, -1] - 1] == '\r' | ||
| return ends | ||
| @staticmethod | ||
| def _move_ints_to_digit_array(ints, n_digits): | ||
| powers = np.uint8(10) ** np.arange(n_digits)[::-1] | ||
| ret = (ints[..., None] // powers) % 10 | ||
| return EncodedArray(ret, DigitEncoding) | ||
| def _validate(self): | ||
| chunk = self._data | ||
| delimiters = self._delimiters[1:] | ||
| n_delimiters_per_line = ( | ||
| next(i for i, d in enumerate(delimiters) if chunk[d] == NEWLINE) + 1 | ||
| ) | ||
| self._n_cols = n_delimiters_per_line | ||
| should_be_new_lines = chunk[delimiters[n_delimiters_per_line - 1::n_delimiters_per_line]] | ||
| if delimiters.size % n_delimiters_per_line != 0 or np.any(should_be_new_lines != "\n"): | ||
| offending_line = np.flatnonzero(should_be_new_lines != "\n")[0] | ||
| lines = split(self._data, '\n') | ||
| raise FormatException( | ||
| f"Irregular number of delimiters per line ({delimiters.size}, {n_delimiters_per_line}): {lines}", | ||
| line_number=offending_line) | ||
| self._validated = True | ||
| class GfaSequenceBuffer(DelimitedBuffer): | ||
| dataclass = SequenceEntry | ||
| # SKIP_LAZY = True | ||
| def get_data(self): | ||
@@ -420,3 +409,3 @@ ids = self.get_text(1, fixed_length=False) | ||
| assert [f.name for f in dataclasses.fields(tmp)] == columns, ( | ||
| columns, [f.name for f in dataclasses.fields(tmp)]) | ||
| columns, [f.name for f in dataclasses.fields(tmp)]) | ||
@@ -459,38 +448,2 @@ class NewClass(cls): | ||
| # def set_fields_from_header(self, columns: List[str]): | ||
| # if not has_header: | ||
| # return None | ||
| # fields = dataclasses.fields(self.dataclass) | ||
| # ordered_fields = [next(field for field in fields if field.name == col) for col in columns] | ||
| # # self._permuted_data_class = dataclasses.make_dataclass('TmpDataclass', ordered_fields) | ||
| # self.fields = ordered_fields | ||
| # assert np.array_equal(columns, [field.name for field in self.fields]) | ||
| # | ||
| # def get_field_by_number(self, field_nr: int, field_type: type=object): | ||
| # # if self.fields is None: | ||
| # return super().get_field_by_number(field_nr, field_type) | ||
| # # col_id, t = next((i, field.type) for i, field in enumerate(dataclasses.fields(self.dataclass)) if field.name == self.fields[field_nr].name) | ||
| # #return super().get_field_by_number(col_id, t) | ||
| # # fields = self.fields if self.fields is not None else dataclasses.fields(self.dataclass) | ||
| # | ||
| # def get_data(self) -> _dataclass: | ||
| # """Parse the data in the buffer according to the fields in _dataclass | ||
| # | ||
| # Returns | ||
| # ------- | ||
| # _dataclass | ||
| # Dataclass with parsed data | ||
| # | ||
| # """ | ||
| # self.validate_if_not() | ||
| # columns = {} | ||
| # fields = self.fields if self.fields is not None else dataclasses.fields(self.dataclass) | ||
| # for col_number, field in enumerate(fields): | ||
| # col = self._get_field_by_number(col_number, field.type) | ||
| # columns[field.name] = col | ||
| # n_entries = len(next(col for col in columns if col is not None)) | ||
| # columns = {c: value if c is not None else np.empty((n_entries, 0)) | ||
| # for c, value in columns.items()} | ||
| # return self.dataclass(**columns) | ||
| DatatypeBuffer.__name__ = _dataclass.__name__ + "Buffer" | ||
@@ -497,0 +450,0 @@ DatatypeBuffer.__qualname__ = _dataclass.__qualname__ + "Buffer" |
@@ -23,6 +23,7 @@ import numpy as np | ||
| def str_matrix_func(column): | ||
| n_rows, n_cols = column.shape | ||
| a = column.as_bytes().reshape(n_rows*n_cols, -1) | ||
| tabs = np.full((n_rows*n_cols, 1), ord("\t")) | ||
| a = column.as_bytes().reshape(n_rows * n_cols, -1) | ||
| tabs = np.full((n_rows * n_cols, 1), ord("\t")) | ||
| b = np.hstack([a, tabs]) | ||
@@ -39,9 +40,9 @@ b = b.reshape((n_rows, -1))[:, :-1] | ||
| def optional_ints_to_strings(number: np.ndarray, missing_string='.')->EncodedRaggedArray: | ||
| if np.all(number)==np.nan: | ||
| return as_encoded_array([missing_string]*len(number)) | ||
| def optional_ints_to_strings(number: np.ndarray, missing_string='.') -> EncodedRaggedArray: | ||
| if np.all(number) == np.nan: | ||
| return as_encoded_array([missing_string] * len(number)) | ||
| return ints_to_strings(number) | ||
| def get_column(values, field_type) -> EncodedRaggedArray: | ||
@@ -78,4 +79,12 @@ def get_func_for_datatype(datatype): | ||
| Parameters | ||
| data : bnpdataclass | ||
| Data | ||
| data_dict: List[Tuple] | ||
| A list of tuples where each tuple contains the field name and the field value. | ||
| sep: str | ||
| The separator to use between fields. | ||
| Returns | ||
| ------- | ||
| EncodedArray | ||
| A buffer containing the data in CSV format. | ||
| """ | ||
@@ -89,2 +98,16 @@ | ||
| def join_columns(columns: List[EncodedRaggedArray], sep: str) -> EncodedRaggedArray: | ||
| """ | ||
| Join columns into a single buffer. | ||
| Parameters | ||
| ---------- | ||
| columns: List[EncodedRaggedArray] | ||
| sep: str | ||
| Returns | ||
| ------- | ||
| EncodedRaggedArray | ||
| The lines of the buffer | ||
| """ | ||
| lengths = np.concatenate([((column.lengths if | ||
@@ -91,0 +114,0 @@ isinstance(column, RaggedArray) |
| from functools import lru_cache | ||
| from typing import Optional, List | ||
| from typing import Optional, List, Union, Any, Type, Tuple | ||
@@ -11,3 +11,3 @@ import numpy as np | ||
| from .strops import str_to_int, str_to_int_with_missing, str_to_float, str_to_float_with_missing | ||
| from ..bnpdataclass import bnpdataclass | ||
| from ..bnpdataclass import bnpdataclass, BNPDataClass | ||
| from ..encoded_array import EncodedArray, EncodedRaggedArray, Encoding, as_encoded_array | ||
@@ -23,3 +23,3 @@ from ..encodings import BaseEncoding | ||
| def move_intervals_to_digit_array(data, starts, ends, fill_value): | ||
| if len(starts)==0: | ||
| if len(starts) == 0: | ||
| return np.zeros_like(data, shape=((0, 0))) | ||
@@ -37,6 +37,5 @@ max_chars = np.max(ends - starts) | ||
| def move_intervals_to_right_padded_array(data, starts, ends, fill_value, stop_at=None): | ||
| lens = ends - starts | ||
| max_chars = np.max(lens) | ||
| indices = np.minimum(starts[..., None] + np.arange(max_chars), data.size-1) | ||
| indices = np.minimum(starts[..., None] + np.arange(max_chars), data.size - 1) | ||
| array = data[indices] | ||
@@ -46,3 +45,3 @@ del indices | ||
| new_lens = np.argmax(array == stop_at, axis=-1) | ||
| lens = np.where(new_lens>0, np.minimum(lens, new_lens), lens) | ||
| lens = np.where(new_lens > 0, np.minimum(lens, new_lens), lens) | ||
| max_chars = np.max(lens) | ||
@@ -56,3 +55,3 @@ array = array[:, :max_chars].ravel() | ||
| cm = np.cumsum(z_lens[row_idxs]) | ||
| diffs = np.diff(row_idxs)*max_chars-z_lens[row_idxs[1:]] | ||
| diffs = np.diff(row_idxs) * max_chars - z_lens[row_idxs[1:]] | ||
| del row_idxs | ||
@@ -62,3 +61,3 @@ index_builder = np.ones(cm[-1], dtype=np.int32) | ||
| del cm | ||
| index_builder[0] = first_row*max_chars+lens[first_row] | ||
| index_builder[0] = first_row * max_chars + lens[first_row] | ||
| np.cumsum(index_builder, out=index_builder) | ||
@@ -113,11 +112,10 @@ zeroed = index_builder | ||
| @lru_cache() | ||
| def size(self): | ||
| def size(self) -> int: | ||
| return self.data.size | ||
| @property | ||
| def data(self): | ||
| def data(self) -> EncodedArray: | ||
| return self._buffer_extractor.data | ||
| def __getitem__(self, idx): | ||
| def __getitem__(self, idx: Union[int, slice, List[int]]): | ||
| return NotImplemented | ||
@@ -130,3 +128,3 @@ | ||
| @property | ||
| def header_data(self): | ||
| def header_data(self) -> Any: | ||
| if hasattr(self, "_header_data"): | ||
@@ -137,15 +135,11 @@ return self._header_data | ||
| @property | ||
| def n_lines(self): | ||
| def n_lines(self) -> int: | ||
| return NotImplemented | ||
| return len(self._new_lines) | ||
| @classmethod | ||
| def modify_class_with_header_data(cls, header_data): | ||
| def modify_class_with_header_data(cls, header_data: Any) -> Type["FileBuffer"]: | ||
| return cls | ||
| # class NewClass(cls): | ||
| # _header_data = header_data | ||
| @classmethod | ||
| def read_header(cls, file_object: FileIO): | ||
| def read_header(cls, file_object: FileIO) -> str: | ||
| """Read the header data from the file | ||
@@ -226,2 +220,3 @@ | ||
| def validate_if_not(self): | ||
| """Validate the buffer if it has not been validated yet""" | ||
| if not self._is_validated: | ||
@@ -231,3 +226,3 @@ self._validate() | ||
| def get_data(self) -> bnpdataclass: | ||
| def get_data(self) -> BNPDataClass: | ||
| """Extract the data from the buffer | ||
@@ -239,3 +234,3 @@ | ||
| ------- | ||
| npdataclass | ||
| BNPDataClass | ||
| dataset containing the data from the buffer | ||
@@ -258,3 +253,3 @@ """ | ||
| max_chars = array.shape[-1] | ||
| to_indices = ends[::-1, None]-max_chars+np.arange(max_chars) | ||
| to_indices = ends[::-1, None] - max_chars + np.arange(max_chars) | ||
| self._data[to_indices] = array[::-1] | ||
@@ -282,5 +277,4 @@ | ||
| @classmethod | ||
| def contains_complete_entry(cls, chunks): | ||
| def contains_complete_entry(cls, chunks: List[np.ndarray]) -> bool: | ||
| n_new_lines = sum(np.count_nonzero(EncodedArray(chunk, BaseEncoding) == NEWLINE) for chunk in chunks) | ||
@@ -293,2 +287,3 @@ return n_new_lines >= cls.n_lines_per_entry | ||
| class IncompleteEntryException(Exception): | ||
@@ -299,3 +294,7 @@ pass | ||
| class TextBufferExtractor: | ||
| def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray=None, field_lens: np.ndarray=None): | ||
| """ | ||
| Base class for extracting data from a text buffer. | ||
| """ | ||
| def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray = None, | ||
| field_lens: np.ndarray = None): | ||
| ''' | ||
@@ -310,3 +309,3 @@ field_starts: n_entries x n_fields | ||
| assert field_ends is not None | ||
| self._field_lens = field_ends-field_starts | ||
| self._field_lens = field_ends - field_starts | ||
| else: | ||
@@ -318,7 +317,7 @@ assert field_ends is None | ||
| @property | ||
| def data(self): | ||
| def data(self) -> EncodedArray: | ||
| return self._data | ||
| @property | ||
| def n_fields(self): | ||
| def n_fields(self) -> int: | ||
| return self._n_fields | ||
@@ -329,3 +328,3 @@ | ||
| def __getitem__(self, idx): | ||
| def __getitem__(self, idx: Union[int, slice, List[int]]) -> 'TextBufferExtractor': | ||
| return self.__class__(self._data, | ||
@@ -335,3 +334,15 @@ field_starts=self._field_starts[idx], | ||
| def get_field_by_number(self, field_nr: int, keep_sep=False): | ||
| def get_field_by_number(self, field_nr: int, keep_sep: bool=False) -> EncodedRaggedArray: | ||
| """ | ||
| Extract the data for a single field. | ||
| Parameters | ||
| ---------- | ||
| field_nr: int | ||
| keep_sep: bool | ||
| Returns | ||
| ------- | ||
| EncodedRaggedArray | ||
| """ | ||
| assert field_nr < self._n_fields, (field_nr, self._n_fields) | ||
@@ -344,5 +355,2 @@ lens = self._field_lens.ravel()[field_nr::self._n_fields] | ||
| def _extract_data(self, lens, starts): | ||
@@ -353,17 +361,30 @@ values = EncodedRaggedArray(self._data, RaggedView2(starts, lens)) | ||
| def get_fixed_length_field(self, field_nr: int, field_length: int): | ||
| def get_fixed_length_field(self, field_nr: int, field_length: int)-> EncodedArray: | ||
| indices = self._field_starts[:, field_nr, None] + np.arange(field_length) | ||
| return self._data[indices] | ||
| def get_padded_field(self, field_nr, stop_at=None): | ||
| def get_padded_field(self, field_nr, stop_at=None) -> EncodedArray: | ||
| starts = self._field_starts[:, field_nr] | ||
| if starts.size == 0: | ||
| return np.zeros_like(self._data, shape = (len(starts), 0)) | ||
| return np.zeros_like(self._data, shape=(len(starts), 0)) | ||
| lens = self._field_lens[:, field_nr] | ||
| ends = lens+starts | ||
| ends = lens + starts | ||
| array = move_intervals_to_right_padded_array(self._data, starts.ravel(), ends.ravel(), fill_value='\x00', stop_at=stop_at) | ||
| return array.reshape(starts.shape+(array.shape[-1],)) | ||
| array = move_intervals_to_right_padded_array(self._data, starts.ravel(), ends.ravel(), fill_value='\x00', | ||
| stop_at=stop_at) | ||
| return array.reshape(starts.shape + (array.shape[-1],)) | ||
| def get_digit_array(self, field_nr: int): | ||
| def get_digit_array(self, field_nr: int) -> Tuple[EncodedArray, Optional[np.ndarray], Optional[np.ndarray]]: | ||
| """ | ||
| Extract the digits of the field as a 2D array of encoded integers. | ||
| Parameters | ||
| ---------- | ||
| field_nr: int | ||
| Returns | ||
| ------- | ||
| EncodedArray | ||
| """ | ||
| starts = self._field_starts[:, field_nr] | ||
@@ -375,3 +396,4 @@ possible_signs = self._data[starts] | ||
| return self.get_field_by_number(field_nr), is_negative, is_positive | ||
| digit_array = move_intervals_to_digit_array(self._data, starts, starts+self._field_lens[:, field_nr], fill_value='0') | ||
| digit_array = move_intervals_to_digit_array(self._data, starts, starts + self._field_lens[:, field_nr], | ||
| fill_value='0') | ||
| return digit_array, None, None | ||
@@ -381,2 +403,14 @@ | ||
| def concatenate(cls, buffers: List['TextBufferExtractor']): | ||
| """ | ||
| Concatenate multiple buffers into a single buffer. | ||
| Parameters | ||
| ---------- | ||
| buffers: List[TextBufferExtractor] | ||
| Returns | ||
| ------- | ||
| TextBufferExtractor | ||
| """ | ||
| sizes = np.array([b._data.size for b in buffers]) | ||
@@ -391,5 +425,10 @@ offsets = np.insert(np.cumsum(sizes), 0, 0) | ||
| class TextThroughputExtractor(TextBufferExtractor): | ||
| def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray=None, field_lens=None, entry_starts: np.ndarray=None, entry_ends:np.ndarray=None, is_contiguous=True): | ||
| """ | ||
| TextBufferExtractor made especially for making it fast to write a modified or filtered buffer to file again. | ||
| """ | ||
| def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray = None, field_lens=None, | ||
| entry_starts: np.ndarray = None, entry_ends: np.ndarray = None, is_contiguous=True): | ||
| if field_lens is None: | ||
| field_lens = field_ends-field_starts | ||
| field_lens = field_ends - field_starts | ||
| super().__init__(data, field_starts, field_lens=field_lens) | ||
@@ -409,3 +448,4 @@ self._entry_starts = entry_starts | ||
| entry_ends = np.concatenate([b._entry_ends + offset for b, offset in zip(buffers, offsets)]) | ||
| return cls(data, starts, field_lens=lens, entry_starts=entry_starts, entry_ends=entry_ends, is_contiguous=all(b._is_contiguous for b in buffers)) | ||
| return cls(data, starts, field_lens=lens, entry_starts=entry_starts, entry_ends=entry_ends, | ||
| is_contiguous=all(b._is_contiguous for b in buffers)) | ||
@@ -427,9 +467,7 @@ def __getitem__(self, idx): | ||
| self._entry_ends = new_starts[1:] | ||
| self._field_starts = self._field_starts-offsets[:, None] | ||
| # self._field_ends = self._field_ends-offsets[:, None] | ||
| self._field_starts = self._field_starts - offsets[:, None] | ||
| self._is_contiguous = True | ||
| @property | ||
| def data(self): | ||
| def data(self) -> EncodedArray: | ||
| if not self._is_contiguous: | ||
@@ -443,5 +481,5 @@ self._make_contigous() | ||
| starts = self._field_starts[:, from_nr] | ||
| lens = self._entry_ends-starts | ||
| lens = self._entry_ends - starts | ||
| if not keep_sep: | ||
| lens-=1 | ||
| lens -= 1 | ||
| return self._extract_data(lens, starts) |
+20
-5
| from pathlib import PurePath | ||
| from typing import Union | ||
| from typing import Union, Optional | ||
@@ -16,2 +16,3 @@ from .gzip_reading import gzip | ||
| from .vcf_buffers import VCFBuffer | ||
| from .pairs import PairsBuffer | ||
| from .wig import WigBuffer | ||
@@ -41,6 +42,8 @@ from .parser import NumpyFileReader, NpBufferedWriter, NumpyBamWriter | ||
| ".gff3": GFFBuffer, | ||
| ".sam": SAMBuffer, #, comment="@"), | ||
| ".sam": SAMBuffer, | ||
| ".bam": BamBuffer, | ||
| ".sizes": ChromosomeSizeBuffer, | ||
| '.wig': WigBuffer | ||
| '.wig': WigBuffer, | ||
| '.pairs': PairsBuffer, | ||
| '.pa5': PairsBuffer, | ||
| } | ||
@@ -226,6 +229,18 @@ | ||
| def read(filename: str, mode: str = None, buffer_type=None) -> NpDataclassReader: | ||
| 'openes a file, reads it and closes it ' | ||
| def read(filename: str, mode: str = None, buffer_type: Optional[FileBuffer]=None) -> BNPDataClass: | ||
| """ | ||
| Read the content of a file | ||
| Parameters | ||
| ---------- | ||
| filename: str | ||
| mode: str | ||
| buffer_type: | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| """ | ||
| with bnp_open(filename, mode, buffer_type) as f: | ||
| content = f.read() | ||
| return content |
| from pathlib import Path | ||
| from typing import Union | ||
| from typing import Union, Dict, Iterable, Tuple | ||
@@ -77,3 +77,3 @@ import numpy as np | ||
| def get_contig_lengths(self) -> dict: | ||
| def get_contig_lengths(self) -> Dict[str, int]: | ||
| """Return a dict of chromosome names to sequence lengths | ||
@@ -88,9 +88,9 @@ | ||
| def keys(self): | ||
| def keys(self) -> Iterable[str]: | ||
| return self._index.keys() | ||
| def values(self): | ||
| def values(self) -> Iterable[EncodedArray]: | ||
| return (self[key] for key in self.keys()) | ||
| def items(self): | ||
| def items(self) -> Iterable[Tuple[str, EncodedArray]]: | ||
| return ((key, self[key]) for key in self.keys()) | ||
@@ -107,3 +107,3 @@ | ||
| chromosome : str | ||
| chromsome name | ||
| chromosome name | ||
@@ -113,3 +113,3 @@ Returns | ||
| EncodedArray | ||
| The sequence for that chromoeme | ||
| The sequence for that chromosome | ||
| """ | ||
@@ -164,4 +164,2 @@ idx = self._index[chromosome] | ||
| for j in range(n_row)]) | ||
| # assert not np.any(sequence == 10), (np.flatnonzero(r_sequence==10), [lenb*(j+1)-1-start_mod | ||
| # for j in range(n_row)], read_start, read_length, n_row, start_mod, lenb, a_offset, intervals[i:i+1], indices[i:i+1]) | ||
| pre_alloc[a_offset:a_offset+sequence.size] = sequence | ||
@@ -172,3 +170,3 @@ a = EncodedArray(pre_alloc, BaseEncoding) | ||
| def get_interval_sequences(self, intervals: Interval) -> EncodedRaggedArray: | ||
| """Get the seqeunces for a set of genomic intervals | ||
| """Get the sequences for a set of genomic intervals | ||
@@ -213,5 +211,3 @@ Parameters | ||
| assert np.all(pre_alloc> 0), np.sum(pre_alloc==0) | ||
| # s = np.delete(np.array(sequences, dtype=np.uint8), delete_indices) | ||
| #s = np.delete(pre_alloc[:alloc_offset], delete_indices) | ||
| a = EncodedArray(pre_alloc, BaseEncoding) | ||
| return EncodedRaggedArray(a, lengths) |
| from itertools import takewhile, repeat | ||
| from typing import Optional | ||
@@ -33,2 +34,3 @@ from .delimited_buffers import DelimitedBufferWithInernalComments | ||
| def close(self): | ||
| """Close the file reader""" | ||
| self._reader.close() | ||
@@ -44,3 +46,3 @@ | ||
| bnpdataclass | ||
| A dataclass holdin all the entries in the class | ||
| A dataclass holding all the entries in the class | ||
@@ -60,16 +62,2 @@ Examples | ||
| def _get_lazy_class(self, dataclass, header=None): | ||
| if self.__lazy_class is None: | ||
| self.__lazy_class = create_lazy_class(dataclass, header=header) | ||
| return self.__lazy_class | ||
| def _should_be_lazy(self, chunk): | ||
| if ((not config.LAZY) and self._lazy is None) or (self._lazy is False): | ||
| return False | ||
| should_be_lazy = False | ||
| if hasattr(chunk, 'get_field_by_number') and hasattr(chunk, 'dataclass'): | ||
| if not issubclass(chunk.dataclass, (GTFEntry)): | ||
| should_be_lazy = True | ||
| return should_be_lazy | ||
| def read_chunk(self, min_chunk_size: int = 5000000, max_chunk_size: int = None) -> BNPDataClass: | ||
@@ -84,15 +72,12 @@ """Read a single chunk into memory | ||
| ---------- | ||
| chunk_size: int | ||
| How many bytes to read from file | ||
| min_chunk_size: int | ||
| How many bytes to minimally read from file | ||
| max_chunk_size: int | ||
| How many bytes to maximally read from file in order to get a full entry | ||
| Returns | ||
| ------- | ||
| bnpdataclass | ||
| A dataclass holdin all the entries in the next chunk | ||
| BNPDataClass | ||
| A dataclass holding all the entries in the next chunk | ||
| Examples | ||
| -------- | ||
| 5 | ||
| """ | ||
@@ -113,3 +98,3 @@ n_lines_read = self._reader.n_lines_read | ||
| def read_chunks(self, min_chunk_size: int = 5000000, max_chunk_size: int = None) -> NpDataclassStream: | ||
| def read_chunks(self, min_chunk_size: int = 5000000, max_chunk_size: Optional[int] = None) -> NpDataclassStream: | ||
| """Read the whole file in chunks | ||
@@ -123,4 +108,6 @@ | ||
| ---------- | ||
| chunk_size : int | ||
| Number of bytes to read per chunk | ||
| min_chunk_size : int | ||
| Minimum size of each chunk | ||
| max_chunk_size : int | ||
| Maximum size of each chunk | ||
@@ -130,8 +117,5 @@ Returns | ||
| NpDataclassStream | ||
| 4 | ||
| Examples | ||
| -------- | ||
| 5 | ||
| """ | ||
@@ -142,5 +126,4 @@ data_stream = takewhile(len, (self.read_chunk(min_chunk_size, max_chunk_size) for _ in repeat(None))) | ||
| def __iter__(self) -> NpDataclassStream: | ||
| """Iteratate over chunks in the file | ||
| """Iterate over chunks in the file; see `read_chunks` | ||
@@ -150,5 +133,20 @@ Returns | ||
| NpDataclassStream | ||
| 3 | ||
| """ | ||
| return self.read_chunks() | ||
| def _get_lazy_class(self, dataclass, header=None): | ||
| if self.__lazy_class is None: | ||
| self.__lazy_class = create_lazy_class(dataclass, header=header) | ||
| return self.__lazy_class | ||
| def _should_be_lazy(self, chunk): | ||
| if ((not config.LAZY) and self._lazy is None) or (self._lazy is False): | ||
| return False | ||
| should_be_lazy = False | ||
| if hasattr(chunk, 'get_field_by_number') and hasattr(chunk, 'dataclass'): | ||
| if not issubclass(chunk.dataclass, (GTFEntry)): | ||
| should_be_lazy = True | ||
| return should_be_lazy | ||
| import dataclasses | ||
| from typing import List | ||
| from typing import List, Tuple, Optional, Union | ||
@@ -8,3 +8,3 @@ import numpy as np | ||
| from ..encoded_array import EncodedArray, BaseEncoding, change_encoding, EncodedRaggedArray | ||
| from ..bnpdataclass import bnpdataclass | ||
| from ..bnpdataclass import bnpdataclass, BNPDataClass | ||
| from ..datatypes import SequenceEntry | ||
@@ -28,26 +28,12 @@ from .exceptions import FormatException | ||
| @property | ||
| def n_lines(self): | ||
| def n_lines(self) -> int: | ||
| return len(self._buffer_extractor)*self.n_lines_per_entry | ||
| @property | ||
| def data(self): | ||
| def data(self) -> EncodedArray: | ||
| return self._buffer_extractor.data | ||
| @classmethod | ||
| def _get_buffer_extractor(cls, data, new_lines): | ||
| tmp = np.insert(new_lines, 0, -1)+1 | ||
| field_ends = new_lines.reshape(-1, cls.n_lines_per_entry) | ||
| field_ends = cls._modify_for_carriage_return(field_ends, data) | ||
| field_starts = tmp[:-1].reshape(-1, cls.n_lines_per_entry)+(np.array(cls._line_offsets)) | ||
| entry_starts = tmp[:-1:cls.n_lines_per_entry] | ||
| entry_ends = tmp[::cls.n_lines_per_entry][1:] | ||
| return TextThroughputExtractor(data, | ||
| field_starts, | ||
| field_ends=field_ends, | ||
| entry_starts=entry_starts, | ||
| entry_ends=entry_ends) | ||
| @classmethod | ||
| def contains_complete_entry(cls, chunks): | ||
| def contains_complete_entry(cls, chunks: List[EncodedArray]) -> Tuple[bool, EncodedArray]: | ||
| if len(chunks) == 1: | ||
@@ -61,3 +47,3 @@ try: | ||
| @classmethod | ||
| def from_raw_buffer(cls, chunk, header_data=None) -> "OneLineBuffer": | ||
| def from_raw_buffer(cls, chunk: EncodedArray, header_data=None) -> "OneLineBuffer": | ||
| """Create a buffer with full entries | ||
@@ -77,6 +63,2 @@ | ||
| Examples | ||
| -------- | ||
| 8 | ||
| """ | ||
@@ -95,3 +77,3 @@ assert header_data is None | ||
| def get_data(self) -> bnpdataclass: | ||
| def get_data(self) -> BNPDataClass: | ||
| """Get and parse fields from each line""" | ||
@@ -101,3 +83,3 @@ headers, sequences = [self._buffer_extractor.get_field_by_number(i) for i in (0, 1)] | ||
| def get_field_by_number(self, i: int, t: type=None): | ||
| def get_field_by_number(self, i: int, t: Optional[type]= None) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]: | ||
| """ Get a field indexed by number""" | ||
@@ -117,3 +99,3 @@ | ||
| def get_field_range_as_text(self, start, end): | ||
| def get_field_range_as_text(self, start: int, end: int) -> EncodedRaggedArray: | ||
| """Get a range of fields as text""" | ||
@@ -124,3 +106,3 @@ assert end == start+1 | ||
| @classmethod | ||
| def from_data(cls, entries: bnpdataclass) -> "OneLineBuffer": | ||
| def from_data(cls, entries: BNPDataClass) -> "OneLineBuffer": | ||
| """Convert the data from the entries into a buffer that can be written to file | ||
@@ -132,3 +114,3 @@ | ||
| ---------- | ||
| entries : bnpdataclass | ||
| entries : BNPDataClass | ||
| The entries to be written to the buffer | ||
@@ -142,14 +124,8 @@ | ||
| """ | ||
| # names = entries.name | ||
| # sequences = entries.sequence | ||
| # if entries.sequence.encoding != BaseEncoding: | ||
| # sequences = change_encoding(sequences, BaseEncoding) | ||
| data_dict = [(field.type, getattr(entries, field.name)) for field in dataclasses.fields(entries)] | ||
| columns= [get_column(value, key) for key, value in data_dict] | ||
| return cls.join_fields(columns) | ||
| # return cls.join_fields([names, sequences]) | ||
| @classmethod | ||
| def join_fields(cls, fields: List[EncodedRaggedArray]): | ||
| def join_fields(cls, fields: List[EncodedRaggedArray]) -> EncodedArray: | ||
| field_lengths = np.hstack([field.shape[1][:, None] for field in fields]) | ||
@@ -170,3 +146,22 @@ line_lengths = field_lengths+1 | ||
| def get_text_field_by_number(self, i: int) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]: | ||
| return self.get_field_by_number(i) | ||
| @classmethod | ||
| def _get_buffer_extractor(cls, data, new_lines): | ||
| tmp = np.insert(new_lines, 0, -1)+1 | ||
| field_ends = new_lines.reshape(-1, cls.n_lines_per_entry) | ||
| field_ends = cls._modify_for_carriage_return(field_ends, data) | ||
| field_starts = tmp[:-1].reshape(-1, cls.n_lines_per_entry)+(np.array(cls._line_offsets)) | ||
| entry_starts = tmp[:-1:cls.n_lines_per_entry] | ||
| entry_ends = tmp[::cls.n_lines_per_entry][1:] | ||
| return TextThroughputExtractor(data, | ||
| field_starts, | ||
| field_ends=field_ends, | ||
| entry_starts=entry_starts, | ||
| entry_ends=entry_ends) | ||
| @classmethod | ||
| def _validate(cls, data, new_lines): | ||
@@ -191,5 +186,2 @@ header = cls.HEADER | ||
| def get_text_field_by_number(self, i): | ||
| return self.get_field_by_number(i) | ||
| @classmethod | ||
@@ -206,4 +198,8 @@ def _modify_for_carriage_return(cls, field_ends, data): | ||
| class TwoLineFastaBuffer(OneLineBuffer): | ||
| """ | ||
| Buffer for fasta files where each entry is contained in two lines (one for header and one for sequence) | ||
| For multi-line fasta files, use MultiLineFastaBuffer | ||
| """ | ||
| HEADER = ">" # 62 | ||
| n_lines_per_entry = 2 | ||
| dataclass = SequenceEntry |
+31
-23
| import codecs | ||
| import io | ||
| import logging | ||
| import numpy as np | ||
| from ..bnpdataclass import BNPDataClass | ||
| try: | ||
| from typing import IO | ||
| from typing import IO, Union, Iterable | ||
| except ImportError: | ||
@@ -197,9 +201,2 @@ from typing.io import IO | ||
| a, bytes_read = self.__add_newline_to_end(a, bytes_read) | ||
| # | ||
| # if a[bytes_read - 1] != ord("\n"): | ||
| # a = np.append(a, ord("\n")) | ||
| # bytes_read += 1 | ||
| # if hasattr(self._buffer_type, "_new_entry_marker"): | ||
| # a = np.append(a, self._buffer_type._new_entry_marker) | ||
| # bytes_read += 1 | ||
| return a[:bytes_read] | ||
@@ -215,7 +212,6 @@ | ||
| """ | ||
| File writer that can write @npdataclass objects | ||
| to file | ||
| File writer that can write BNPDataClass objects to file | ||
| """ | ||
| def __init__(self, file_obj, buffer_type): | ||
| def __init__(self, file_obj: io.FileIO, buffer_type: FileBuffer): | ||
| self._file_obj = file_obj | ||
@@ -240,3 +236,3 @@ self._buffer_type = buffer_type | ||
| def write(self, data: npdataclass): | ||
| def write(self, data: Union[BNPDataClass, BnpStream, grouped_stream]): | ||
| """Write the provided data to file | ||
@@ -246,4 +242,4 @@ | ||
| ---------- | ||
| data : npdataclass | ||
| dataset containing entries | ||
| data : Union[BNPDataClass, BnpStream, grouped_stream] | ||
| dataset containing entries, or stream of datasets | ||
@@ -264,3 +260,2 @@ """ | ||
| (not hasattr(self._file_obj, "mode") or self._file_obj.mode != 'ab'): # and \ | ||
| #getattr(self._buffer_type, 'HAS_UNCOMMENTED_HEADER_LINE', False): | ||
| if not self._header_written: | ||
@@ -275,7 +270,2 @@ header_array = self._buffer_type.make_header(data) | ||
| bytes_array = data.get_buffer(buffer_class=self._buffer_type) | ||
| # if not hasattr(self._buffer_type, 'get_column_range_as_text'): | ||
| # data = data.get_data_object() | ||
| # bytes_array = self._buffer_type.from_data(data) | ||
| # else: | ||
| # | ||
| else: | ||
@@ -286,4 +276,3 @@ bytes_array = self._buffer_type.from_data(data) | ||
| bytes_array = bytes_array.raw() | ||
| self._file_obj.write(bytes(bytes_array)) # .tofile(self._file_obj) | ||
| # self._file_obj.flush() | ||
| self._file_obj.write(bytes(bytes_array)) | ||
| logger.debug( | ||
@@ -295,2 +284,6 @@ f"Wrote chunk of size {repr_bytes(bytes_array.size)} to {self._f_name}" | ||
| class NumpyBamWriter(NpBufferedWriter): | ||
| """ | ||
| Class for handling writing of BAM files | ||
| """ | ||
| EOF_MARKER = b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00' | ||
@@ -304,3 +297,18 @@ | ||
| def chunk_lines(stream, n_lines): | ||
| def chunk_lines(stream: Iterable[FileBuffer], n_lines: int) -> Iterable[FileBuffer]: | ||
| """ | ||
| Chunk the content of the stream so that each chunk contains exactly `n_lines` lines (except the last) | ||
| Parameters | ||
| ---------- | ||
| stream : Iterable[FileBuffer] | ||
| Stream of FileBuffers | ||
| n_lines : int | ||
| Number of lines in each chunk | ||
| Returns | ||
| ------- | ||
| Iterable[FileBuffer] | ||
| Stream of FileBuffers, each containing `n_lines` lines | ||
| """ | ||
| cur_buffers = [] | ||
@@ -307,0 +315,0 @@ remaining_lines = n_lines |
@@ -28,3 +28,3 @@ import numpy as np | ||
| import matplotlib.pyplot as _plt | ||
| plt.style.use('Solarize_Light2') | ||
| _plt.style.use('Solarize_Light2') | ||
| self._plt = _plt | ||
@@ -31,0 +31,0 @@ except: |
@@ -0,1 +1,3 @@ | ||
| from typing import List, Dict, Optional | ||
| import numpy as np | ||
@@ -10,2 +12,6 @@ from numpy.typing import ArrayLike | ||
| class EncodedCounts: | ||
| """ | ||
| Class for storing counts of encoded data. | ||
| """ | ||
| alphabet: list | ||
@@ -33,3 +39,3 @@ counts: np.ndarray | ||
| def __getitem__(self, idx): | ||
| def __getitem__(self, idx: str): | ||
| return self.counts[..., self.alphabet.index(idx)] | ||
@@ -55,4 +61,24 @@ | ||
| def __array_ufunc__(self, ufunc, method, *inputs, **kwargs): | ||
| if method == "__call__": | ||
| assert all(i.alphabet == self.alphabet for i in inputs if isinstance(i, EncodedCounts)) | ||
| assert all(i.alphabet == self.alphabet for i in kwargs.values() if isinstance(i, EncodedCounts)) | ||
| arrays = [i.counts if isinstance(i, EncodedCounts) else i for i in inputs] | ||
| kwargs = {k: i.counts if isinstance(i, EncodedCounts) else i for k, i in kwargs.items()} | ||
| return self.__class__(self.alphabet, getattr(ufunc, method)(*arrays, **kwargs)) | ||
| else: | ||
| return NotImplemented | ||
| @property | ||
| def proportions(self): | ||
| def proportions(self) -> np.ndarray: | ||
| """ | ||
| Calculate the proportions of each label in the counts. | ||
| Returns | ||
| ------- | ||
| np.ndarray | ||
| The proportions of each label in the counts. | ||
| """ | ||
| s = self.counts.sum(axis=-1, keepdims=True) | ||
@@ -66,7 +92,19 @@ return np.where(s > 0, self.counts / s, 0) | ||
| def get_count_for_label(self, label): | ||
| def get_count_for_label(self, label: str) -> int: | ||
| """ | ||
| Get the count for a specific label. | ||
| Parameters | ||
| ---------- | ||
| label: str | ||
| Returns | ||
| ------- | ||
| int | ||
| """ | ||
| return np.sum(self.counts[..., self.alphabet.index(l)] for l in label) | ||
| @property | ||
| def labels(self): | ||
| def labels(self) -> List[str]: | ||
| return self.alphabet | ||
@@ -84,3 +122,15 @@ | ||
| def most_common(self, n=None): | ||
| def most_common(self, n: Optional[int]=None) -> 'EncodedCounts': | ||
| """ | ||
| Extract counts for the n most common labels. | ||
| Parameters | ||
| ---------- | ||
| n | ||
| Returns | ||
| ------- | ||
| EncodedCounts | ||
| """ | ||
| args = np.argsort(self.counts)[::-1] | ||
@@ -93,3 +143,11 @@ if n is not None: | ||
| def as_dict(self): | ||
| def as_dict(self) -> Dict[str, np.ndarray]: | ||
| """ | ||
| Convert the counts to a dictionary. | ||
| Returns | ||
| ------- | ||
| Dict[str, np.ndarray] | ||
| """ | ||
| return dict(zip(self.alphabet, self.counts.T)) | ||
@@ -99,3 +157,3 @@ | ||
| def count_encoded(values: EncodedArrayLike, weights: ArrayLike = None, axis: int = -1) -> EncodedCounts: | ||
| """Count the occurances of encoded entries. Works on any encoding with finite alphabet | ||
| """Count the occurrences of encoded entries. Works on any encoding with finite alphabet. | ||
@@ -106,3 +164,5 @@ Parameters | ||
| weights : ArrayLike | ||
| Weights for each entry | ||
| axis : int | ||
| 0 for column counts, -1 or 1 for row counts None for flattened counts | ||
@@ -109,0 +169,0 @@ Returns |
@@ -69,3 +69,17 @@ from ..datatypes import Interval | ||
| @streamable() | ||
| def get_strand_specific_sequences(encoded_array: EncodedArray, stranded_intervals: Interval): | ||
| def get_strand_specific_sequences(encoded_array: EncodedArray, stranded_intervals: Interval) -> EncodedRaggedArray: | ||
| """Extract the sequences within the intervals, and reverse complement if the strand is negative | ||
| Parameters | ||
| ---------- | ||
| encoded_array : EncodedArray | ||
| The encoded sequences | ||
| stranded_intervals : Interval | ||
| The intervals with strands | ||
| Returns | ||
| ------- | ||
| EncodedArray | ||
| The (possibly reverse complemented) sequences | ||
| """ | ||
| relevant_sequences = encoded_array[stranded_intervals.start:stranded_intervals.stop] | ||
@@ -78,3 +92,17 @@ rev_complimnet_seqs = get_reverse_complement(relevant_sequences) | ||
| @streamable() | ||
| def get_sequences(sequence: EncodedArray, intervals: Interval): | ||
| def get_sequences(sequence: EncodedArray, intervals: Interval) -> EncodedRaggedArray: | ||
| """ | ||
| Get the sequences within the intervals, without caring about strands. | ||
| For stranded intervals use get_strand_specific_sequences | ||
| Parameters | ||
| ---------- | ||
| sequence: EncodedArray | ||
| intervals: Interval | ||
| Returns | ||
| ------- | ||
| EncodedRaggedArray | ||
| """ | ||
| return sequence[intervals.start:intervals.stop] |
@@ -29,4 +29,4 @@ import numpy as np | ||
| def _pwm_from_counts(count_matrix): | ||
| with_pseudo = count_matrix+1 | ||
| return np.log(with_pseudo/with_pseudo.sum(axis=0, keepdims=True)) | ||
| with_pseudo = count_matrix + 1 | ||
| return np.log(with_pseudo / with_pseudo.sum(axis=0, keepdims=True)) | ||
@@ -39,2 +39,3 @@ | ||
| """ | ||
| def __init__(self, matrix, alphabet): | ||
@@ -48,3 +49,3 @@ self._matrix = matrix | ||
| if isinstance(sequence, (EncodedArray, EncodedRaggedArray)): | ||
| if isinstance(sequence.encoding, AlphabetEncoding): | ||
@@ -59,6 +60,4 @@ alphabet = list((sequence.encoding.get_alphabet())) | ||
| @property | ||
| def alphabet(self): | ||
| def alphabet(self) -> str: | ||
| return self._alphabet | ||
@@ -70,3 +69,3 @@ | ||
| @property | ||
| def window_size(self): | ||
| def window_size(self) -> int: | ||
| return self._matrix.shape[-1] | ||
@@ -102,12 +101,10 @@ | ||
| sequence = self.as_valid_encoded_array(sequence) | ||
| # sequence = as_encoded_array(sequence, self._encoding) | ||
| # assert sequence.encoding == self._encoding | ||
| scores = np.zeros(sequence.size, dtype=float) | ||
| m = self._matrix.T.copy() | ||
| for offset, row in enumerate(m): | ||
| scores[:scores.size-offset] += row[sequence[offset:].raw()] | ||
| scores[:scores.size - offset] += row[sequence[offset:].raw()] | ||
| return scores | ||
| @classmethod | ||
| def from_dict(cls, dictionary: Dict[str, ArrayLike], background: Dict[str, float]=None) -> "PWM": | ||
| def from_dict(cls, dictionary: Dict[str, ArrayLike], background: Dict[str, float] = None) -> "PWM": | ||
| """Create a PWM object from a dict of letters to position probabilities | ||
@@ -133,18 +130,13 @@ | ||
| if background is None: | ||
| background = {key: 1/len(dictionary) for key in dictionary} | ||
| background = {key: 1 / len(dictionary) for key in dictionary} | ||
| alphabet = "".join(dictionary.keys()) | ||
| with np.errstate(divide="ignore"): | ||
| matrix = np.log(np.array(list(dictionary.values())))-np.log([background[key] for key in dictionary])[:, np.newaxis] | ||
| matrix = np.log(np.array(list(dictionary.values()))) - np.log([background[key] for key in dictionary])[:, | ||
| np.newaxis] | ||
| return cls(matrix, alphabet) | ||
| # @classmethod | ||
| # def from_motif(cls, motif: Motif): | ||
| # return cls(motif.matrix, motif.alphabet) | ||
| @classmethod | ||
| def from_counts(cls, counts: typing.Union[dict]): | ||
| # if isinstance(counts, Motif): | ||
| # return cls(_pwm_from_counts(counts.matrix), counts.alphabet) | ||
| # else: | ||
| return cls(_pwm_from_counts(np.array(list(counts.values()))), | ||
| def from_counts(cls, counts: Dict[str, typing.List[int]]) -> "PWM": | ||
| """Create a PWM object from a dict of letters to position counts""" | ||
| return cls(_pwm_from_counts(np.array(list(counts.values()))), | ||
| "".join(counts.keys())) | ||
@@ -155,6 +147,5 @@ | ||
| return "PWM with alphabet " + self._alphabet + "\n" + \ | ||
| '\n'.join([' '.join([str(round(c, 2)) for c in row]) for row in matrix]) | ||
| '\n'.join([' '.join([str(round(c, 2)) for c in row]) for row in matrix]) | ||
| def get_motif_scores_old(sequence: EncodedRaggedArray, pwm: PWM) -> RaggedArray: | ||
@@ -213,2 +204,2 @@ """Computes motif scores for a motif on a sequence. | ||
| scores = RaggedArray(scores, shape[-1]) | ||
| return scores[..., :(-pwm.window_size+1)] | ||
| return scores[..., :(-pwm.window_size + 1)] |
@@ -17,2 +17,24 @@ import logging | ||
| def match_string(sequence: EncodedArrayLike, matching_sequence: SingleEncodedArrayLike) -> ArrayLike: | ||
| """ | ||
| Matches a sequence against sequences and returns a boolean RaggedArray representing positions | ||
| where the sequence matches. | ||
| Parameters | ||
| ---------- | ||
| sequence : | ||
| matching_sequence : | ||
| Returns | ||
| ------- | ||
| ArrayLike | ||
| A boolean RaggedArray representing positions where the sequence matches. | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> sequence = bnp.as_encoded_array(['ACGT', 'TACTAC']) | ||
| >>> matching_sequence = bnp.as_encoded_array('AC', sequence.encoding) | ||
| >>> bnp.match_string(sequence, matching_sequence) | ||
| ragged_array([ True False False] | ||
| [False True False False True]) | ||
| """ | ||
| sequence = as_encoded_array(sequence) | ||
@@ -19,0 +41,0 @@ enforced_encoding = sequence.encoding |
| import numpy as np | ||
| from ..bnpdataclass import BNPDataClass | ||
| from ..streams import streamable | ||
@@ -41,3 +43,3 @@ from ..encodings import BaseEncoding, AlphabetEncoding | ||
| def __call__(self, sequence: EncodedArrayLike): | ||
| def __call__(self, sequence: EncodedArrayLike)-> EncodedArrayLike: | ||
| e = sequence.encoding | ||
@@ -52,3 +54,27 @@ sequence = sequence[..., ::-1] | ||
| @apply_to_npdataclass("sequence") | ||
| def translate_dna_to_protein(sequence): | ||
| def translate_dna_to_protein(sequence: BNPDataClass) -> BNPDataClass: | ||
| """ | ||
| Translate a DNA sequence to a protein sequence. | ||
| Parameters | ||
| ---------- | ||
| sequence : BNPDataClass | ||
| The data that should be translated, should have a sequence attribute | ||
| Returns | ||
| ------- | ||
| BNPDataClass | ||
| The translated data | ||
| Examples | ||
| -------- | ||
| >>> import bionumpy as bnp | ||
| >>> dna = bnp.SequenceEntry.from_entry_tuples([("seq1", "ACGTAT")]) | ||
| >>> protein = bnp.sequence.translate_dna_to_protein(dna) | ||
| >>> protein | ||
| SequenceEntry with 1 entries | ||
| name sequence | ||
| seq1 TY | ||
| """ | ||
| return Translate().windowed(sequence) | ||
@@ -6,14 +6,3 @@ from .stream import BnpStream | ||
| def _chunk_entries(stream: BnpStream, n_entries: int) -> Generator: | ||
| """Chunkk a stream into fixed number of entries | ||
| Parameters | ||
| ---------- | ||
| stream : BnpStream | ||
| n_chunks : int | ||
| Returns | ||
| ------- | ||
| BnpStream | ||
| """ | ||
| b = [] | ||
@@ -34,2 +23,13 @@ buffer_size = 0 | ||
| def chunk_entries(stream: BnpStream, n_entries: int) -> BnpStream: | ||
| """Chunk a stream into fixed number of entries | ||
| Parameters | ||
| ---------- | ||
| stream : BnpStream | ||
| n_entries : int | ||
| Returns | ||
| ------- | ||
| BnpStream | ||
| """ | ||
| return stream.__class__(_chunk_entries(stream, n_entries)) |
@@ -7,4 +7,6 @@ import numpy as np | ||
| class StringArray(np.lib.mixins.NDArrayOperatorsMixin): | ||
| """Wrapper around NumPy arrays of strings. Can be used as datatype in BNPDataClass fields.""" | ||
| wrapped_functions = ['size', 'shape', 'ndim', '__len__'] | ||
| wrapped_properies = ['T'] | ||
| def __init__(self, data): | ||
@@ -11,0 +13,0 @@ self._data = np.asanyarray(data, dtype='S') |
| import typing | ||
| SequenceID = typing.NewType("SequenceID", str) | ||
@@ -70,3 +70,2 @@ import numpy as np | ||
| ascii_hashes = get_ascii_hash(encoded_ragged_array, cls.big_mod) | ||
| idx = Counter(ascii_hashes).most_common(1)[0][0] | ||
| assert len(set(ascii_hashes)) == len(ascii_hashes), (len(set(ascii_hashes)), len(ascii_hashes)) | ||
@@ -73,0 +72,0 @@ hash_table = HashTable(ascii_hashes, np.arange(len(encoded_ragged_array)), mod=modulo) |
@@ -6,3 +6,32 @@ import npstructures as nps | ||
| def ragged_slice(array: EncodedRaggedArray, starts=None, ends=None) -> EncodedRaggedArray: | ||
| slice = nps.ragged_slice(array.ravel(), starts, ends) | ||
| return EncodedRaggedArray(EncodedArray(slice.ravel(), array.encoding), slice.shape) | ||
| """ | ||
| Slice a ragged array column-wise. | ||
| Parameters | ||
| ---------- | ||
| array : EncodedRaggedArray | ||
| The array to slice | ||
| starts : np.ndarray | ||
| The start indices of the slices | ||
| ends : np.ndarray, optional | ||
| The end indices of the slices. If not provided, the slices will be taken to the end of the array. | ||
| Returns | ||
| ------- | ||
| EncodedRaggedArray | ||
| The sliced array | ||
| Examples | ||
| -------- | ||
| >>> import numpy as np | ||
| >>> import bionumpy as bnp | ||
| >>> seqs = bnp.as_encoded_array(["ACGT", "ACGTT"]) | ||
| >>> starts = np.array([0, 1]) | ||
| >>> ends = np.array([2, 3]) | ||
| >>> sliced = bnp.ragged_slice(seqs, starts, ends) | ||
| >>> print(sliced) | ||
| AC | ||
| CG | ||
| """ | ||
| sliced_data = nps.ragged_slice(array.ravel(), starts, ends) | ||
| return EncodedRaggedArray(EncodedArray(sliced_data.ravel(), array.encoding), sliced_data.shape) |
| import numpy as np | ||
| from bionumpy import replace | ||
| from bionumpy import replace, EncodedArray, VCFEntry | ||
| def apply_variants_to_sequence(sequence, variants): | ||
| def apply_variants_to_sequence(sequence: EncodedArray, variants: VCFEntry) -> EncodedArray: | ||
| """ | ||
| Apply variants to a sequence by replacing the reference sequence with the alternative sequence. | ||
| Works only for variants where ref sequence and alt sequence have the same length. | ||
| Parameters | ||
| ---------- | ||
| sequence : EncodedArray | ||
| The sequence to apply the variants to | ||
| variants : VCFEntry | ||
| The variants to apply | ||
| Returns | ||
| ------- | ||
| EncodedArray | ||
| The sequence with the variants applied | ||
| """ | ||
| seq = sequence.copy() | ||
@@ -13,5 +30,13 @@ assert np.all(seq[variants.position] == variants.ref_seq.ravel()), (seq[variants.position], seq[variants.position+1], seq[variants.position-1], | ||
| def apply_variants(sequence_entries, variants): | ||
| """ | ||
| Wrapper around `apply_variants_to_sequence` that applies variants to multiple sequences. | ||
| """ | ||
| assert np.all(variants.alt_seq.lengths == 1) | ||
| return replace(sequence_entries, sequence=[apply_variants_to_sequence(entry.sequence, variants[variants.chromosome==entry.name]) for entry in sequence_entries]) | ||
| return replace(sequence_entries, sequence=[ | ||
| apply_variants_to_sequence( | ||
| entry.sequence, | ||
| variants[variants.chromosome==entry.name] | ||
| ) for entry in sequence_entries]) | ||
+1
-1
| Metadata-Version: 2.1 | ||
| Name: bionumpy | ||
| Version: 1.0.8 | ||
| Version: 1.0.10 | ||
| Summary: Library for working with biological sequence data as numpy arrays. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy |
+1
-1
@@ -49,3 +49,3 @@ #!/usr/bin/env python | ||
| url='https://github.com/bionumpy/bionumpy', | ||
| version='1.0.8', | ||
| version='1.0.10', | ||
| zip_safe=False, | ||
@@ -52,0 +52,0 @@ extras_require={'full': ['isal']} |
@@ -66,2 +66,3 @@ import numpy as np | ||
| with pytest.raises(ValueError): | ||
| f.write(bam_entries) | ||
| f.write(bam_entries) | ||
@@ -97,3 +97,3 @@ import dataclasses | ||
| # @pytest.mark.skip("Deprecated") | ||
| def test_set_get_context(): | ||
@@ -100,0 +100,0 @@ data = MyClass(a=[10, 20], b=[100, 200]) |
@@ -17,1 +17,9 @@ from bionumpy.sequence.count_encoded import EncodedCounts | ||
| ['T', 'A'], np.array([5, 4])) | ||
| def test_ufuncs(encoded_counts): | ||
| sqrt = np.sqrt(encoded_counts) | ||
| assert sqrt == EncodedCounts(list('ACGT'), np.sqrt([4, 3, 1, 5])) | ||
| assert encoded_counts + 1 == EncodedCounts(list('ACGT'), np.array([5, 4, 2, 6])) | ||
| assert encoded_counts + encoded_counts == EncodedCounts(list('ACGT'), np.array([8, 6, 2, 10])) | ||
@@ -16,3 +16,3 @@ import dataclasses | ||
| @dataclasses.dataclass | ||
| @bnpdataclass | ||
| class DummyClass: | ||
@@ -19,0 +19,0 @@ a: int |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
704371
4.55%227
2.71%15982
5.6%