Latest Threat Research: SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains. Details
Socket
Book a DemoInstallSign in
Socket

bionumpy

Package Overview
Dependencies
Maintainers
1
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bionumpy - PyPI Package Compare versions

Comparing version
1.0.8
to
1.0.10
+21
bionumpy/alignments/msa.py
from bionumpy import SequenceEntry
import numpy as np
class MultipleSequenceAlignment:
    """A multiple sequence alignment: an (n_sequences, alignment_length)
    character matrix plus the corresponding sequence names.

    Bug fix: the original stored the matrix on ``self.matrix`` while also
    defining a ``matrix`` method of the same name, so the method was shadowed
    by the attribute and unreachable. The matrix is now stored privately and
    exposed through a property, keeping ``msa.matrix`` attribute access working
    for existing callers.
    """

    def __init__(self, matrix, sequence_names):
        # Private storage so the `matrix` property does not shadow itself.
        self._matrix = matrix
        self.sequence_names = sequence_names

    @property
    def matrix(self):
        """The (n_sequences, alignment_length) alignment matrix."""
        return self._matrix

    @classmethod
    def from_sequence_entries(cls, entries: 'SequenceEntry'):
        """Build an MSA from sequence entries.

        All sequences must have the same length (asserted), since an alignment
        matrix is rectangular. Annotation is a forward reference so the class
        can be defined without importing SequenceEntry at evaluation time.
        """
        sequences = entries.sequence
        L = len(sequences[0])
        assert np.all(sequences.lengths == L)
        matrix = sequences.ravel().reshape(len(sequences), L)
        return cls(matrix, entries.name)

    def mask(self):
        """Boolean matrix that is True where the alignment has a residue
        (i.e. the cell is not the gap character '-')."""
        return self.matrix != '-'
import functools
from . import bnp_open
from .bnpdataclass.bnpdataclass import BNPDataClass
import inspect
try:
import typer
except ImportError:
typer = None
def check_arguments(function):
    """Compute CLI-friendly annotations for ``function``.

    Parameters annotated with a :class:`BNPDataClass` subclass are mapped to
    ``str`` (they become filename arguments on the command line); every other
    annotation is kept unchanged.

    Fixes over the original: removed a leftover debug ``print``; the original
    used ``isinstance(t, BNPDataClass)`` on annotation *classes* (always
    False) where a subclass check is needed; indexing ``annotations['return']``
    raised KeyError for functions without a return annotation; the computed
    mapping was discarded — it is now returned (backward compatible, callers
    previously received None and ignored it).

    Returns
    -------
    dict
        Annotation mapping with BNPDataClass entries replaced by ``str``.
    """
    argspec = inspect.getfullargspec(function)

    def _is_bnp(annotation):
        # Annotations are classes, so a subclass (not instance) check is needed.
        return inspect.isclass(annotation) and issubclass(annotation, BNPDataClass)

    new_annotations = {name: str if _is_bnp(t) else t
                       for name, t in argspec.annotations.items()}
    # `.get` avoids a KeyError when there is no return annotation.
    if _is_bnp(argspec.annotations.get('return')):
        pass  # NOTE(review): branch was left unimplemented in the original.
    return new_annotations
class CliWrapper:
    '''Convert all arguments that are typed with BNPDataclass into filename options'''

    def __init__(self, *args, **kwargs):
        # Extra args/kwargs are stored but not used by __call__ as written.
        self._args = args
        self._kwargs = kwargs

    def __call__(self, function):
        """Wrap ``function`` so BNPDataClass-typed parameters are accepted as
        filenames (opened and read before the call) and, when the function
        returns a BNPDataClass, the result is written to an ``output`` file.

        NOTE(review): the wrapper does not return the function's result — it
        only writes it when ``do_write`` is set. Confirm that callers never
        need the return value.
        """
        argspec = inspect.getfullargspec(function)
        # Write the result out only if the function declares a BNPDataClass return.
        do_write = 'return' in argspec.annotations and issubclass(argspec.annotations['return'], BNPDataClass)

        def is_bnpdataclass(name: str) -> bool:
            # True if the parameter with this name is annotated as a BNPDataClass.
            return issubclass(argspec.annotations[name], BNPDataClass)

        @functools.wraps(function)
        def new_func(*args, **kwargs):
            # Positional filename arguments are opened and read into data objects.
            new_args = [bnp_open(arg).read() if is_bnpdataclass(argspec.args[i]) else arg for i, arg in enumerate(args)]
            # `output` is consumed by the wrapper itself, not passed through.
            new_kwargs = {k: bnp_open(v).read() if is_bnpdataclass(k) else v for k, v in kwargs.items() if k != 'output'}
            return_val = function(*new_args, **new_kwargs)
            if do_write:
                # NOTE(review): raises KeyError if `output` is omitted even though
                # the advertised signature gives it a default of None — confirm.
                bnp_open(kwargs['output'], "w").write(return_val)

        # Advertise a signature where BNPDataClass parameters appear as `str`
        # (filenames), so CLI frameworks like typer render them correctly.
        sig = inspect.signature(function)
        new_parameters = [val.replace(annotation=str) if issubclass(val.annotation, BNPDataClass) else val for key, val in sig.parameters.items()]
        if do_write:
            new_parameters.append(inspect.Parameter('output', inspect.Parameter.KEYWORD_ONLY, annotation=str, default=None))
        new_sig = sig.replace(parameters=new_parameters, return_annotation=sig.empty)
        new_func.__signature__ = new_sig
        # Add the new annotations to the function
        annotations = {name: str if is_bnpdataclass(name) else t for name, t in argspec.annotations.items() if name != 'return'}
        if do_write:
            annotations['output'] = str
        new_func.__annotations__ = annotations
        return new_func
from bionumpy.datatypes import PairsEntry
from .delimited_buffers import DelimitedBuffer
class PairsBuffer(DelimitedBuffer):
    """Delimited-text file buffer whose rows parse into PairsEntry records."""
    dataclass = PairsEntry
import inspect
import bionumpy as bnp
from bionumpy.cli import CliWrapper
from .util import get_file_name
def mock_function(reads: bnp.datatypes.SequenceEntry) -> bnp.datatypes.SequenceEntry:
    """Keep only the entries whose sequence starts with the letter 'A'."""
    starts_with_a = reads.sequence[:, 0] == 'A'
    return reads[starts_with_a]
def test_cli_wrapper(data_path, tmp_path):
    """The wrapped function should read the input file, filter entries, and
    write strictly fewer than half of them to the output file."""
    wrapped = CliWrapper()(mock_function)
    out_path = tmp_path / 'tmp.fq.gz'
    in_path = data_path / 'big.fq.gz'
    wrapped(in_path, output=out_path)
    assert bnp.count_entries(out_path) < bnp.count_entries(in_path) // 2
def test_cli_wrapper_annotations():
    """CliWrapper should rewrite BNPDataClass parameter annotations to str."""
    wrapped = CliWrapper()(mock_function)
    spec = inspect.getfullargspec(wrapped)
    print(spec)
    assert spec.annotations['reads'] == str
import pytest
from bionumpy import MultiLineFastaBuffer
import bionumpy as bnp
from bionumpy.alignments.msa import MultipleSequenceAlignment
@pytest.fixture
def mfa_obj(data_path):
    # Example multi-line FASTA file read fully into memory.
    return bnp.open(data_path / "example.mfa", buffer_type=MultiLineFastaBuffer).read()
@pytest.fixture()
def msa(mfa_obj):
    # MSA object built from the example multi-FASTA entries.
    return MultipleSequenceAlignment.from_sequence_entries(mfa_obj)
def test_from_sequence_entries(mfa_obj):
    """The example alignment should yield a 3x5 matrix."""
    sequences = mfa_obj
    msa = MultipleSequenceAlignment.from_sequence_entries(sequences)
    # Bug fix: the comparison result was previously discarded (missing assert),
    # so the test could never fail on a wrong shape.
    assert msa.matrix.shape == (3, 5)
@pytest.mark.skip
def test_mask(msa):
    """mask() should be a boolean matrix of the alignment's shape with 20
    non-gap positions in the example data."""
    gap_mask = msa.mask()
    assert gap_mask.shape == msa.matrix.shape
    assert gap_mask.dtype == bool
    assert gap_mask.sum() == 20
import bionumpy as bnp
from tempfile import NamedTemporaryFile
import numpy as np
def test_read_write_pairs_file(data_path):
    """Round-trip a .pairs file: write what was read and compare the reload."""
    source = data_path / "small.pairs"
    original = bnp.open(source).read()
    assert original[0].pos1 == 61
    tmp = NamedTemporaryFile(suffix=".pairs")
    with bnp.open(tmp.name, mode='w') as out_file:
        out_file.write(original)
    round_tripped = bnp.open(tmp.name).read()
    assert np.all(original == round_tripped)
+1
-1
Metadata-Version: 2.1
Name: bionumpy
Version: 1.0.8
Version: 1.0.10
Summary: Library for working with biological sequence data as numpy arrays.

@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy

@@ -8,2 +8,3 @@ HISTORY.rst

bionumpy/__init__.py
bionumpy/cli.py
bionumpy/computation_graph.py

@@ -24,2 +25,3 @@ bionumpy/config.py

bionumpy/alignments/cigar.py
bionumpy/alignments/msa.py
bionumpy/arithmetics/__init__.py

@@ -82,2 +84,3 @@ bionumpy/arithmetics/bedgraph.py

bionumpy/io/one_line_buffer.py
bionumpy/io/pairs.py
bionumpy/io/parser.py

@@ -151,2 +154,3 @@ bionumpy/io/regexp.py

tests/test_chromosome_provider.py
tests/test_cli.py
tests/test_computation_graph.py

@@ -184,5 +188,7 @@ tests/test_coordinate_mapping.py

tests/test_minimizers.py
tests/test_msa.py
tests/test_multistream.py
tests/test_mutation_types.py
tests/test_npdataclassstream.py
tests/test_pairs.py
tests/test_pandas_interface.py

@@ -189,0 +195,0 @@ tests/test_parsers.py

@@ -5,3 +5,3 @@ """Top-level package for bionumpy."""

__email__ = "knutdrand@gmail.com"
__version__ = '1.0.8'
__version__ = '1.0.10'

@@ -8,0 +8,0 @@ import npstructures as nps

@@ -48,3 +48,3 @@ from .intervals import get_boolean_mask

N = (a+b+c+d)
return a*N/((a+b)*(a+c))
return float(a*N/((a+b)*(a+c)))

@@ -76,2 +76,2 @@

N = (a+b+c+d)
return a/(N-d)
return float(a/(N-d))

@@ -5,3 +5,3 @@ import dataclasses

from collections import defaultdict
from typing import List, Type, Dict, Iterable, Union, Optional
from typing import List, Type, Dict, Iterable, Union, Optional, Any, Tuple
from numpy.typing import ArrayLike

@@ -40,3 +40,10 @@ from npstructures.npdataclasses import npdataclass, NpDataClass, shallow_tuple

def todict(self):
def todict(self) -> Dict[str, ArrayLike]:
'''
Convert the data into a dictionary with the field names as keys and the corresponding data as values.
Returns
-------
dict[str, ArrayLike]
'''
field_dict = {}

@@ -52,3 +59,10 @@ for field in dataclasses.fields(self):

def topandas(self):
def topandas(self) -> 'pandas.DataFrame':
'''
Convert the data into a pandas DataFrame with the fields as columns
Returns
-------
pandas.DataFrame
'''
return pandas_adaptor.get_data_frame(self.todict())

@@ -58,3 +72,15 @@ # return pd.DataFrame(self.todict())

@classmethod
def from_data_frame(cls, df):
def from_data_frame(cls, df: 'pandas.DataFrame') -> 'BNPDataClass':
'''
Convert a pandas DataFrame into a BNPDataClass object.
The columns of the dataframe are used as fields in the BNPDataClass object.
Parameters
----------
df: pandas.DataFrame
Returns
-------
BNPDataClass
'''
d = df.to_dict('series')

@@ -64,3 +90,13 @@ return cls.from_dict(d)

@classmethod
def from_dict(cls, dict_object: Dict) -> 'BNPDataClass':
def from_dict(cls, dict_object: Dict[str, Any]) -> 'BNPDataClass':
'''
Convert a dictionary into a BNPDataClass object. The keys of the dictionary are used as field names in the BNPDataClass object.
Parameters
----------
dict_object: dict
Returns
-------
BNPDataClass
'''
dict_names = [name.split('.')[0] for name in dict_object.keys()]

@@ -90,2 +126,3 @@ field_names = {field.name for field in dataclasses.fields(cls)}

the data, use `toiter` instead.
Returns

@@ -99,3 +136,12 @@ -------

def toiter(self):
def toiter(self) -> Iterable['dataclass']:
"""
Convert the data into an iterator of entries from the
corresponding dataclass with normal python types.
Returns
-------
Iterable[cls.dataclass]
"""
iters = tuple(get_vanilla_generator(f)

@@ -181,9 +227,32 @@ for f in shallow_tuple(self))

@classmethod
def from_entry_tuples(cls, tuples):
def from_entry_tuples(cls, tuples: Iterable[tuple]) -> 'BNPDataClass':
return cls(*(list(c) for c in zip(*tuples)))
def sort_by(self, field_name: str) -> 'BNPDataClass':
"""
Sort the data by the given field
Parameters
----------
field_name: str
Returns
-------
BNPDataClass
"""
return self[np.argsort(getattr(self, field_name))]
def set_context(self, name, value):
def set_context(self, name: str, value: Any):
"""
Set a context value for the object, typically used for storing auxiliary information like header information
Parameters
----------
name: str
value: Any
Returns
-------
"""
if not hasattr(self, '_context'):

@@ -193,3 +262,14 @@ self._context = dict()

def get_context(self, name):
def get_context(self, name: str)->Any:
"""
Get a context value for the object, typically used for storing auxiliary information like header information
Parameters
----------
name: str
Returns
-------
"""
logger.warning(f'Deprecated method set_context in BNPDataClass')
if not hasattr(self, '_context'):

@@ -222,3 +302,3 @@ self._context = dict()

-------
npdataclass
bnpdataclass
`bnpdataclass` object that supports numpy like indexing

@@ -329,7 +409,7 @@

NewClass.__qualname__ = base_class.__qualname__
NewClass.__doc__ = dataclasses.dataclass(base_class).__doc__
return NewClass
def make_dataclass(fields: list, name: str = "DynamicDC", bases=()) -> Type[BNPDataClass]:
def make_dataclass(fields: List[Tuple], name: str = "DynamicDC", bases=()) -> Type[BNPDataClass]:
"""

@@ -353,3 +433,18 @@ Constructs a dynamic dataclass from a list of attributes

def narrow_type(bnp_dc, field_name, field_type):
def narrow_type(bnp_dc: Type[BNPDataClass], field_name: str, field_type: type):
"""
Restricts the type of a field in a BNPDataClass
Parameters
----------
bnp_dc: Type[BNPDataClass]
field_name: str
field_type: type
Returns
-------
Type[BNPDataClass]
"""
new_fields = [(f.name, field_type) if f.name==field_name else (f.name, f.type, f) for f in dataclasses.fields(bnp_dc)]

@@ -356,0 +451,0 @@ return make_dataclass(new_fields, name=bnp_dc.__name__, bases=(bnp_dc,))

@@ -14,3 +14,3 @@ from .bnpdataclass import BNPDataClass

def replace(obj, **kwargs):
def replace(obj: BNPDataClass, **kwargs) -> BNPDataClass:
'''Replace the values of a dataclass with new values

@@ -17,0 +17,0 @@

import dataclasses
import types
from functools import lru_cache
from functools import lru_cache, wraps
from numbers import Number
from typing import Type, Optional, Any
import numpy as np
from bionumpy.io.dump_csv import get_column, join_columns
from bionumpy.io.exceptions import FormatException, ParsingException
from .bnpdataclass import BNPDataClass
from ..encoded_array import EncodedRaggedArray
from ..io.dump_csv import get_column
from ..io.exceptions import FormatException, ParsingException
# from bionumpy import EncodedRaggedArray
def translate_types(input_type):
    """Map a vanilla Python field type to its array-backed representation:
    str -> EncodedRaggedArray, int -> np.ndarray.

    NOTE(review): any other type falls through and implicitly returns None —
    confirm whether further types (e.g. float, bool) should be handled here.
    """
    if input_type == str:
        return EncodedRaggedArray
    elif input_type == int:
        return np.ndarray
def buffer_backed_bnp(old_cls):
    """Create a copy of ``old_cls`` whose annotated fields are parsed lazily
    from a text buffer via BufferBackedDescriptor attributes.

    NOTE(review): verify that the two arguments passed to
    BufferBackedDescriptor here match its constructor's parameter list.
    """
    cls = types.new_class(old_cls.__name__, old_cls.__bases__, {})
    # One descriptor per annotated field, keyed by field position.
    for i, (var_name, var_type) in enumerate(old_cls.__annotations__.items()):
        setattr(cls, var_name, BufferBackedDescriptor(i, var_type))
    # __init__ only stores the raw buffer; descriptors parse fields on access.
    setattr(cls, '__init__', lambda self, buffer: setattr(self, '_buffer', buffer))
    return cls
class BufferBackedDescriptor:
    '''
    Descriptor that lazily parses one field of a text buffer on attribute
    access. The owning instance must expose a ``_buffer`` attribute with a
    ``get_field_by_number(index, dtype)`` method.

    Bug fix: the constructor previously took an unused leading ``buffer``
    parameter, so the two-argument call in ``buffer_backed_bnp`` raised a
    TypeError; ``__get__`` always read the buffer from the instance, never
    from the descriptor.
    '''

    def __init__(self, index, dtype):
        self._index = index
        self._dtype = dtype

    def __get__(self, obj, objtype):
        # Extract field number `_index` from the instance's buffer and coerce
        # the parsed value to `_dtype`.
        return self._dtype(obj._buffer.get_field_by_number(self._index, self._dtype))
class LazyBNPDataClass:

@@ -48,12 +19,2 @@ pass

class BaseClass:
    """Minimal lazy wrapper: missing attributes fall back to buffer lookup.

    ``buffer`` must support ``in`` and item access (e.g. a dict of fields).
    """

    def __init__(self, buffer):
        self._buffer = buffer

    def __getattr__(self, var_name):
        # Invoked only when normal attribute lookup fails.
        if var_name in self._buffer:
            return self._buffer[var_name]
        # Bug fix: `super().__getattr__` does not exist on `object`, so the
        # original raised a confusing AttributeError about `super`; raise a
        # conventional AttributeError for the missing name instead.
        raise AttributeError(var_name)
class ItemGetter:

@@ -92,6 +53,22 @@ def __init__(self, buffer: 'FileBuffer', dataclass: dataclasses.dataclass, start_line=0):

def create_lazy_class(dataclass, header=None):
def create_lazy_class(dataclass: Type[BNPDataClass], header: Optional[Any] = None) -> Type[BNPDataClass]:
'''
Creates a dataclass that emulates the given BNPDataClass but with lazy loading of fields
Parameters
----------
dataclass
header
Returns
-------
'''
field_names = [field.name for field in dataclasses.fields(dataclass)]
class NewClass(dataclass, LazyBNPDataClass):
"""
A class that lazily loads fields from a buffer
"""
def __init__(self, item_getter, set_values=None, computed_values=None):

@@ -108,18 +85,24 @@ self._itemgetter = item_getter

@classmethod
def from_data_frame(cls, df):
@wraps(dataclass.from_data_frame)
def from_data_frame(cls, df: 'pd.DataFrame') -> 'NewClass':
return dataclass.from_data_frame(df)
@classmethod
@wraps(dataclass.from_dict)
def from_dict(cls, d):
return dataclass.from_dict(d)
@wraps(dataclass.toiter)
def toiter(self):
return self.get_data_object().toiter()
@wraps(dataclass.tolist)
def tolist(self):
return self.get_data_object().tolist()
@wraps(dataclass.todict)
def todict(self):
return self.get_data_object().todict()
@wraps(dataclass.topandas)
def topandas(self):

@@ -179,3 +162,11 @@ return self.get_data_object().topandas()

def get_data_object(self):
def get_data_object(self) -> BNPDataClass:
"""
Returns the BNPDataClass with all fields loaded
Returns
-------
BNPDataClass
"""
if not self._computed:

@@ -196,3 +187,3 @@ fields = [getattr(self, field.name) for field in dataclasses.fields(dataclass)]

if hasattr(values[0]._itemgetter.buffer, 'concatenate'):
#if all(not a._set_values for a in values):
# if all(not a._set_values for a in values):
set_values = {name: np.concatenate([a._set_values[name] for a in values])

@@ -210,3 +201,3 @@ for name in self._set_values}

def get_buffer(self, buffer_class=None):
def get_buffer(self, buffer_class=None) -> EncodedRaggedArray:
if buffer_class is None:

@@ -216,3 +207,3 @@ buffer_class = self._itemgetter.buffer.__class__

'SKIP_LAZY') or hasattr(
buffer_class, 'SKIP_LAZY'):
buffer_class, 'SKIP_LAZY'):
return self._itemgetter.buffer.from_data(self.get_data_object())

@@ -219,0 +210,0 @@ columns = []

@@ -158,22 +158,2 @@ from ..typing import SequenceID

class SortedIntervals:
    """Sorted, non-overlapping half-open intervals with fast point lookup.

    ``data`` is an (n, 2) array-like of ``[start, end)`` rows sorted by start.
    """

    def __init__(self, data):
        self.data = np.asanyarray(data)
        # Bug fix: validate the converted array, not the raw input — plain
        # lists have no `.shape`, so the original asserts raised AttributeError.
        assert self.data.shape[-1] == 2
        assert len(self.data.shape) == 2
        self.starts = self.data[..., 0]
        self.ends = self.data[..., 1]

    def in_intervals(self, position):
        """Return whether `position` (scalar or array) lies in any interval.

        Bug fix: the original used ``side="left"`` with no ``-1`` offset, so
        positions strictly inside an interval were matched against the *next*
        interval and reported as outside. ``side="right"`` minus one selects
        the last interval whose start is <= position; the max with 0 guards
        positions before the first start.
        """
        idx = np.maximum(
            np.searchsorted(self.starts, position, side="right") - 1, 0
        )
        return (position >= self.starts[idx]) & (position < self.ends[idx])

    @classmethod
    def concatenate(cls, elements):
        """Stack the rows of several SortedIntervals into one (no re-sorting;
        inputs are assumed to already be in order)."""
        return cls(np.vstack([element.data for element in elements]))
@bnpdataclass

@@ -218,1 +198,14 @@ class SAMEntry:

directions: List[int]
@bnpdataclass
class PairsEntry:
    """https://pairtools.readthedocs.io/en/latest/formats.html"""
    # One row per read pair: the pair identifier plus the two mapped sides
    # (chromosome, position, strand for each mate).
    read_id: str
    chrom1: SequenceID
    pos1: int
    chrom2: SequenceID
    pos2: int
    strand1: StrandEncoding
    strand2: StrandEncoding

@@ -20,3 +20,2 @@ from ..bnpdataclass import bnpdataclass, BNPDataClass

atributes: str
#attributes: BNPDataClass

@@ -27,3 +26,2 @@ def _get_attributes(self, attribute_names):

for name in attribute_names}
# ends_in_sep = gtf_entries.atributes[:, -1] ==
self.atributes[:, -1] = " "

@@ -30,0 +28,0 @@ all_features = split(self.atributes.ravel(), " ")

@@ -11,3 +11,3 @@ """

from npstructures.mixin import NPSArray
from typing import Tuple, List
from typing import Tuple, List, Union
import numpy as np

@@ -40,2 +40,5 @@ from abc import abstractmethod

class OneToOneEncoding(Encoding):
"""Represents encodings that are one-to-one, i.e. where each element is
encoded to one element and vice versa. This class is meant to be subclassed
when implementing specific encodings."""

@@ -233,2 +236,3 @@ def encode(self, data):

def get_NPSArray(array):

@@ -253,2 +257,4 @@ return array.view(NPSArray)

Parameters

@@ -261,3 +267,10 @@ ----------

Examples
--------
>>> import bionumpy as bnp
>>> import numpy as np
>>> print(EncodedArray(np.array([0, 1, 2, 3]), bnp.DNAEncoding))
ACGT
"""
if isinstance(data, EncodedArray):

@@ -274,6 +287,6 @@ assert data.encoding == encoding

@property
def T(self):
def T(self) -> "EncodedArray":
return self.__class__(self.data.T, self.encoding)
def copy(self):
def copy(self) -> 'EncodedArray':
return self.__class__(self.data.copy(), self.encoding)

@@ -287,6 +300,9 @@

def tolist(self):
def tolist(self) -> str:
"""Converts the data to a string by decoding the data.
This behaviour is compatible with NumPy's scalar behaviour (only a single element)"""
return self.to_string()
def to_string(self) -> str:
"""Converts the data to a string by decoding the data"""
if not self.encoding.is_one_to_one_encoding():

@@ -381,3 +397,3 @@ return self.encoding.to_string(self.data)

def __hash__(self):
def __hash__(self) -> int:
if len(self.shape) <= 1:

@@ -480,3 +496,3 @@ return hash(self.to_string())

def ravel(self):
def ravel(self) -> "EncodedArray":
return self.__class__(self.data.ravel(), self.encoding)

@@ -633,4 +649,10 @@

--------
5
>>> import bionumpy as bnp
>>> encoded_array = bnp.DNAEncoding.encode("ACGT")
>>> print(from_encoded_array(encoded_array))
ACGT
>>> encoded_array = bnp.DNAEncoding.encode(["ACGT", "ACGT"])
>>> print(from_encoded_array(encoded_array))
['ACGT', 'ACGT']
"""

@@ -643,3 +665,32 @@ if isinstance(encoded_array, EncodedRaggedArray):

def change_encoding(encoded_array, new_encoding):
def change_encoding(encoded_array: Union[EncodedArray, EncodedRaggedArray], new_encoding: Encoding) \
-> Union[EncodedArray, EncodedRaggedArray]:
"""
Changes the encoding of an `EncodedArray` or `EncodedRaggedArray` by decoding the data and
encoding it again with the new encoding.
Parameters:
-----------
encoded_array : EncodedArray/EncodedRaggedArray
The data to change encoding on
new_encoding : Encoding
The new encoding to use
Returns:
--------
EncodedArray/EncodedRaggedArray
The data with the new encoding
Examples
--------
>>> import bionumpy as bnp
>>> encoded_array = bnp.as_encoded_array("ACGT", bnp.DNAEncoding)
>>> encoded_array.raw()
array([0, 1, 2, 3], dtype=uint8)
>>> new_encoding = bnp.BaseEncoding
>>> new_encoded_array = change_encoding(encoded_array, new_encoding)
>>> new_encoded_array.raw()
array([65, 67, 71, 84], dtype=uint8)
"""
assert isinstance(encoded_array, (EncodedArray, EncodedRaggedArray)), \

@@ -646,0 +697,0 @@ "Can only change encoding of EncodedArray or EncodedRaggedArray"

@@ -11,14 +11,2 @@ from ..encoded_array import BaseEncoding, Encoding, NumericEncoding

# class StrandEncoding(Encoding):
# MIN_CODE = ord("+")
#
# @classmethod
# def encode(cls, bytes_array):
# return (bytes_array & np.uint8(2)) >> np.uint8(1)
#
# @classmethod
# def decode(cls, strands):
# return 2 * strands + cls.MIN_CODE
class DigitEncodingFactory(NumericEncoding):

@@ -44,9 +32,2 @@ def __init__(self, min_code):

def set_backend(lib):
#from ..cupy_compatible.encodings.alphabet_encoding import CPAlphabetEncoding
#from ..cupy_compatible.encodings.alphabet_encoding import CPACTGEncoding
#from ..cupy_compatible.encodings.alphabet_encoding import CPAminoAcidEncoding
#sys.modules[__name__].AlphabetEncoding = CPAlphabetEncoding
#sys.modules[__name__].ACTGEncoding = CPACTGEncoding
#sys.modules[__name__].AminoAcidEncoding = CPAminoAcidEncoding

@@ -53,0 +34,0 @@ from . import base_encoding

@@ -52,61 +52,61 @@ import numpy as np

class ACTGTwoBitEncoding:
letters = ["A", "C", "T", "G"]
bitcodes = ["00", "01", "10", "11"]
reverse = np.array([1, 3, 20, 7], dtype=np.uint8)
_lookup_2bytes_to_4bits = np.zeros(256 * 256, dtype=np.uint8)
_lookup_2bytes_to_4bits[
256 * reverse[np.arange(4)[:, None]] + reverse[np.arange(4)]
] = np.arange(4)[:, None] * 4 + np.arange(4)
_shift_4bits = 4 * np.arange(2, dtype=np.uint8)
_shift_2bits = 2 * np.arange(4, dtype=np.uint8)
# class ACTGTwoBitEncoding:
# letters = ["A", "C", "T", "G"]
# bitcodes = ["00", "01", "10", "11"]
# reverse = np.array([1, 3, 20, 7], dtype=np.uint8)
# _lookup_2bytes_to_4bits = np.zeros(256 * 256, dtype=np.uint8)
# _lookup_2bytes_to_4bits[
# 256 * reverse[np.arange(4)[:, None]] + reverse[np.arange(4)]
# ] = np.arange(4)[:, None] * 4 + np.arange(4)
# _shift_4bits = 4 * np.arange(2, dtype=np.uint8)
# _shift_2bits = 2 * np.arange(4, dtype=np.uint8)
#
# @classmethod
# def convert_2bytes_to_4bits(cls, two_bytes):
# assert two_bytes.dtype == np.uint16, two_bytes.dtype
# return cls._lookup_2bytes_to_4bits[two_bytes]
#
# @classmethod
# def join_4bits_to_byte(cls, four_bits):
# return np.sum(four_bits << cls._shift_4bits, axis=1, dtype=np.uint8)
#
# @classmethod
# def complement(cls, char):
# complements = np.packbits([1, 0, 1, 0, 1, 0, 1, 0])
# dtype = char.dtype
# return (char.view(np.uint8) ^ complements).view(dtype)
#
# @classmethod
# def encode(cls, sequence):
# if sequence.size % 16 != 0:
# sequence = np.append(
# sequence, np.empty(16 - (sequence.size % 16), dtype=np.uint8)
# )
#
# assert sequence.dtype == np.uint8
# assert sequence.size % 4 == 0, sequence.size
# sequence = sequence & 31
# four_bits = cls.convert_2bytes_to_4bits(sequence.view(np.uint16))
# codes = cls.join_4bits_to_byte(four_bits.reshape(-1, 2))
# assert codes.dtype == np.uint8, codes.dtype
# return codes.flatten().view(np.uint8)
#
# @classmethod
# def from_string(cls, string):
# byte_repr = np.array([ord(c) for c in string], dtype=np.uint8)
# return cls.encode(byte_repr)
#
# @classmethod
# def to_string(cls, bits):
# byte_repr = cls.decode(bits)
# return "".join(chr(b) for b in byte_repr)
#
# @classmethod
# def decode(cls, sequence):
# assert sequence.dtype == np.uint8
# bit_mask = np.uint8(3) # last two bits
# all_bytes = (sequence[:, None] >> cls._shift_2bits) & bit_mask
# return cls.reverse[all_bytes.flatten()] + 96
@classmethod
def convert_2bytes_to_4bits(cls, two_bytes):
assert two_bytes.dtype == np.uint16, two_bytes.dtype
return cls._lookup_2bytes_to_4bits[two_bytes]
@classmethod
def join_4bits_to_byte(cls, four_bits):
return np.sum(four_bits << cls._shift_4bits, axis=1, dtype=np.uint8)
@classmethod
def complement(cls, char):
complements = np.packbits([1, 0, 1, 0, 1, 0, 1, 0])
dtype = char.dtype
return (char.view(np.uint8) ^ complements).view(dtype)
@classmethod
def encode(cls, sequence):
if sequence.size % 16 != 0:
sequence = np.append(
sequence, np.empty(16 - (sequence.size % 16), dtype=np.uint8)
)
assert sequence.dtype == np.uint8
assert sequence.size % 4 == 0, sequence.size
sequence = sequence & 31
four_bits = cls.convert_2bytes_to_4bits(sequence.view(np.uint16))
codes = cls.join_4bits_to_byte(four_bits.reshape(-1, 2))
assert codes.dtype == np.uint8, codes.dtype
return codes.flatten().view(np.uint8)
@classmethod
def from_string(cls, string):
byte_repr = np.array([ord(c) for c in string], dtype=np.uint8)
return cls.encode(byte_repr)
@classmethod
def to_string(cls, bits):
byte_repr = cls.decode(bits)
return "".join(chr(b) for b in byte_repr)
@classmethod
def decode(cls, sequence):
assert sequence.dtype == np.uint8
bit_mask = np.uint8(3) # last two bits
all_bytes = (sequence[:, None] >> cls._shift_2bits) & bit_mask
return cls.reverse[all_bytes.flatten()] + 96
class ACTGEncoding:

@@ -129,29 +129,29 @@ _lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8)

class SimpleEncoding(ACTGTwoBitEncoding):
_lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8)
_lookup_byte_to_2bits[[97, 65]] = 0
_lookup_byte_to_2bits[[99, 67]] = 1
_lookup_byte_to_2bits[[116, 84]] = 2
_lookup_byte_to_2bits[[103, 71]] = 3
# class SimpleEncoding(ACTGTwoBitEncoding):
# _lookup_byte_to_2bits = np.zeros(256, dtype=np.uint8)
# _lookup_byte_to_2bits[[97, 65]] = 0
# _lookup_byte_to_2bits[[99, 67]] = 1
# _lookup_byte_to_2bits[[116, 84]] = 2
# _lookup_byte_to_2bits[[103, 71]] = 3
#
# _shift_2bits = 2 * np.arange(4, dtype=np.uint8)
#
# @classmethod
# def convert_byte_to_2bits(cls, one_byte):
# assert one_byte.dtype == np.uint8, one_byte.dtype
# return cls._lookup_byte_to_2bits[one_byte]
#
# @classmethod
# def join_2bits_to_byte(cls, two_bits_vector):
# return np.bitwise_or.reduce(two_bits_vector << cls._shift_2bits, axis=-1)
#
# @classmethod
# def encode(cls, sequence):
# assert sequence.dtype == np.uint8
# assert sequence.size % 4 == 0, sequence.size
# two_bits = cls.convert_byte_to_2bits(sequence)
# codes = cls.join_2bits_to_byte(two_bits.reshape(-1, 4))
# return codes.flatten()
_shift_2bits = 2 * np.arange(4, dtype=np.uint8)
@classmethod
def convert_byte_to_2bits(cls, one_byte):
assert one_byte.dtype == np.uint8, one_byte.dtype
return cls._lookup_byte_to_2bits[one_byte]
@classmethod
def join_2bits_to_byte(cls, two_bits_vector):
return np.bitwise_or.reduce(two_bits_vector << cls._shift_2bits, axis=-1)
@classmethod
def encode(cls, sequence):
assert sequence.dtype == np.uint8
assert sequence.size % 4 == 0, sequence.size
two_bits = cls.convert_byte_to_2bits(sequence)
codes = cls.join_2bits_to_byte(two_bits.reshape(-1, 4))
return codes.flatten()
def twobit_swap(number):

@@ -158,0 +158,0 @@ dtype = number.dtype

@@ -0,1 +1,3 @@

from typing import List
import numpy as np

@@ -7,2 +9,7 @@ from ..encoded_array import OneToOneEncoding

class AlphabetEncoding(OneToOneEncoding):
"""
Encoding for an alphabet. The encoding is one-to-one and the alphabet is
defined by the input string. The encoding is case-insensitive.
"""
def __init__(self, alphabet: str):

@@ -45,15 +52,32 @@ self._raw_alphabet = [c.upper() for c in alphabet]

array = np.asarray(encoded)
# assert np.issubdtype(array.dtype, int), (encoded, array)
return self._alphabet[array]
@property
def alphabet_size(self):
def alphabet_size(self)->int:
"""
Get the size of the alphabet
Returns
-------
int
The size of the alphabet
"""
self._initialize()
return self._alphabet.size
def get_alphabet(self):
def get_alphabet(self)-> List[str]:
"""
Get the alphabet
Returns
-------
list[str]
The alphabet
"""
self._initialize()
return [chr(c) for c in self._alphabet]
def get_labels(self):
def get_labels(self) -> List[str]:
return self.get_alphabet()

@@ -60,0 +84,0 @@

@@ -0,1 +1,3 @@

from typing import Union, List
from . import AlphabetEncoding

@@ -10,2 +12,6 @@ from ..encoded_array import Encoding

class KmerEncoding(Encoding):
"""
Encodes kmers of a given length using an alphabet encoding.
"""
def __init__(self, alphabet_encoding: AlphabetEncoding, k: int):

@@ -17,6 +23,19 @@ assert is_subclass_or_instance(alphabet_encoding, AlphabetEncoding), alphabet_encoding

@property
def k(self):
def k(self) -> int:
return self._k
def encode(self, data):
def encode(self, data: Union[str, list, EncodedRaggedArray]) -> Union[EncodedArray, EncodedRaggedArray]:
"""
Encodes a string, list of strings or EncodedRaggedArray into an EncodedArray or
EncodedRaggedArray of hashed kmers.
Parameters
----------
data: Union[str, list, EncodedRaggedArray]
Returns
-------
Union[EncodedArray, EncodedRaggedArray]
"""
if isinstance(data, str):

@@ -26,3 +45,3 @@ assert len(data) == self.k

return EncodedArray(
letters.dot(self._alphabet_encoding.alphabet_size**np.arange(self._k)),
letters.dot(self._alphabet_encoding.alphabet_size ** np.arange(self._k)),
self)

@@ -35,8 +54,7 @@ if isinstance(data, (list, EncodedRaggedArray)):

return EncodedArray(
letters.dot(self._alphabet_encoding.alphabet_size**np.arange(self._k)),
letters.dot(self._alphabet_encoding.alphabet_size ** np.arange(self._k)),
self)
print(data, type(data))
raise NotImplementedError
def to_string(self, kmer):
def to_string(self, kmer: int) -> str:
"""

@@ -52,3 +70,3 @@ Returns a human-readable string representation

n = self._alphabet_encoding.alphabet_size
tmp = (kmer//n**np.arange(self._k)) % n
tmp = (kmer // n ** np.arange(self._k)) % n

@@ -59,5 +77,5 @@ chars = EncodedArray(tmp, self._alphabet_encoding)

def get_labels(self):
def get_labels(self) -> List[str]:
assert self._k <= 8, "Only supported for k <= 5"
return [self.to_string(kmer) for kmer in range(self._alphabet_encoding.alphabet_size**self._k)]
return [self.to_string(kmer) for kmer in range(self._alphabet_encoding.alphabet_size ** self._k)]

@@ -64,0 +82,0 @@ def __str__(self):

@@ -0,1 +1,3 @@

from typing import Optional, List, Union
import numpy as np

@@ -9,3 +11,6 @@ from ..encoded_array import Encoding, as_encoded_array, EncodedArray, EncodedRaggedArray, encoded_array_from_nparray

class StringEncoding(Encoding):
def __init__(self, sequences, modulo=None):
"""
Encodes strings into a numeric value corresponding to the index of the string in the input list.
"""
def __init__(self, sequences: List[str], modulo: Optional[int] = None):
self._seqeunces = as_encoded_array(sequences)

@@ -15,9 +20,36 @@ self._modulo = modulo

def get_labels(self):
def get_labels(self) -> List[str]:
return self._seqeunces.tolist()
def to_string(self, n):
def to_string(self, n: int) -> str:
"""
Get the string corresponding to the index n.
Parameters
----------
n: int
Returns
-------
str
The string corresponding to the index n.
"""
return self._seqeunces[int(n)].to_string()
def encode(self, encoded_ragged_array):
def encode(self, encoded_ragged_array: Union[EncodedRaggedArray, StringArray, List[str]]) -> Union[EncodedArray, EncodedRaggedArray]:
"""
Encode a string, list of strings or EncodedRaggedArray into an EncodedArray or EncodedRaggedArray of hashed strings.
Parameters
----------
encoded_ragged_array: Union[EncodedRaggedArray, StringArray, list[str]]
The input data to encode.
Returns
-------
Union[EncodedArray, EncodedRaggedArray]
The encoded data.
"""
if isinstance(encoded_ragged_array, StringArray):

@@ -38,3 +70,15 @@ pass # encoded_ragged_array = encoded_array_from_nparray(encoded_ragged_array)

def decode(self, encoded_array):
def decode(self, encoded_array: Union[EncodedArray, np.ndarray]) -> Union[str, List[str]]:
"""
Decode an EncodedArray or np.ndarray into a string or list of strings.
Parameters
----------
encoded_array
Returns
-------
"""
if isinstance(encoded_array, EncodedArray):

@@ -41,0 +85,0 @@ data = encoded_array.raw()

@@ -62,3 +62,2 @@ import itertools

genotype_rows = EncodedArray(np.zeros((0, 3)), BaseEncoding)
# genotype_rows = as_encoded_array(genotype_rows)
data = genotype_rows.ravel()

@@ -70,4 +69,2 @@ # hack because the row sometime ends with \n and sometimes with \t

return data[indices[:-1, np.newaxis] + np.array([1, 2, 3])]
#data = split(data.ravel(), "\t")[:-1, 0:3] # don't include last element which is empty
#return data

@@ -81,3 +78,3 @@ def decode(self, genotype):

genotype = genotype.raw()
decoded = self.decode_lookup()[genotype].reshape(new_shape)# genotype.shape[0], genotype.shape[1]*4)
decoded = self.decode_lookup()[genotype].reshape(new_shape)
# remove last tab

@@ -84,0 +81,0 @@ return decoded[..., :-1]

@@ -42,2 +42,5 @@ from ..datatypes import GTFEntry

def __repr__(self):
    # Debug-friendly summary showing the genome context and wrapped data.
    return f'GenomicAnnotation(genome_context={self._genome_context}, data={self._data})'
@property

@@ -44,0 +47,0 @@ def genes(self) -> Genes:

@@ -40,3 +40,16 @@ import logging

def with_ignored_added(self, ignored):
def with_ignored_added(self, ignored: Iterable[str]) -> 'GenomeContext':
'''
Make a new GenomeContext with additional ignored chromosomes. This is useful for allowing but ignoring
chromosome names that are not in the origin genome
Parameters
----------
ignored: Iterable[str]
Returns
-------
GenomeContext
'''
c = self._original_chrom_sizes.copy()

@@ -43,0 +56,0 @@ c.update({name: 0 for name in ignored})

import os
import numpy as np
from typing import Dict
from typing import Dict, List, Optional
from pathlib import PurePath

@@ -39,7 +39,39 @@ from ..io import bnp_open, Bed6Buffer, BedBuffer, open_indexed

def with_ignored_added(self, ignored):
def with_ignored_added(self, ignored: List[str]) -> 'Genome':
'''
Make a new GenomeContext with additional ignored chromosomes. This is useful for allowing but ignoring
chromosome names that are not in the origin genome.
Parameters
----------
ignored: Iterable[str]
Returns
-------
Genome
'''
return self.__class__(self._genome_context.with_ignored_added(ignored), self._fasta_filename)
@classmethod
def from_dict(cls, chrom_sizes: Dict[str, int], *args, **kwargs):
def from_dict(cls, chrom_sizes: Dict[str, int], *args, **kwargs) -> 'Genome':
'''
Create a Genome object from a dictionary of chromosome sizes
Parameters
----------
chrom_sizes: Dict[str, int]
args: Additional args to be passed to the Genome constructor
kwargs: Additional kwargs to be passed to the Genome constructor
Returns
-------
Genome
Examples
--------
>>> import bionumpy as bnp
>>> bnp.Genome.from_dict({'chr1': 1000, 'chr2': 2000})
Genome(['chr1', 'chr2'])
'''
return cls(chrom_sizes, *args, **kwargs)

@@ -67,2 +99,8 @@

Examples
--------
>>> import bionumpy as bnp
>>> bnp.Genome.from_file('example_data/hg38.chrom.sizes')
Genome(['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...'])
"""

@@ -101,2 +139,12 @@ path = PurePath(filename)

GenomicArray
Examples
--------
>>> import bionumpy as bnp
>>> bedgraph = bnp.datatypes.BedGraph(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5], value=[1, 2, 3])
>>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10})
>>> genome.get_track(bedgraph)
chr1: [1 1 1 1 1 0 0 0 0 0 2 2 2 2 2 0 0 0 0 0]
chr2: [3 3 3 3 3 0 0 0 0 0]
"""

@@ -119,2 +167,15 @@ bedgraph = self._mask_data_on_extra_chromosomes(bedgraph)

Returns
-------
GenomicArray
Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_dict({'chr1': 30000, 'chr2': 31000, 'chr3': 32000})
>>> genome.read_track('example_data/small_treat_pileup.bdg')
chr1: [ 0.0 0.0 1.0 ... 0.0 0.0 0.0]
chr2: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0]
chr3: [ 0.0 0.0 0.0 ... 0.0 0.0 0.0]
"""

@@ -137,2 +198,16 @@ content = self._open(filename, stream)

GenomicIntervals
Examples
--------
>>> import bionumpy as bnp
>>> intervals = bnp.Interval(chromosome=['chr1', 'chr1', 'chr2'], start=[0, 10, 0], stop=[5, 15, 5])
>>> genome = bnp.Genome.from_dict({'chr1': 20, 'chr2': 10})
>>> genome.get_intervals(intervals)
Genomic Intervals on ['chr1', 'chr2']:
Interval with 3 entries
chromosome start stop
chr1 0 5
chr1 10 15
chr2 0 5
"""

@@ -159,2 +234,22 @@ return GenomicIntervals.from_intervals(intervals, self._genome_context, is_stranded=stranded)

GenomicIntervals
Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_file('example_data/small_sequence.fa')
>>> genome.read_intervals('example_data/small_summits.bed')
Genomic Intervals on ['chr1', 'chr2', 'chr3']:
Interval with 13 entries
chromosome start stop
chr1 639 640
chr1 6023 6024
chr1 7124 7125
chr2 849 850
chr2 6320 6321
chr2 8483 8484
chr2 11342 11343
chr2 12527 12528
chr2 13092 13093
chr2 18943 18944
"""

@@ -173,3 +268,2 @@ path = PurePath(filename)

return self.get_intervals(content, stranded)
# return GenomicIntervals.from_intervals(content, self._chrom_sizes)

@@ -199,4 +293,25 @@ def read_locations(self, filename: str, stranded: bool = False, stream: bool = False, has_numeric_chromosomes=False, buffer_type=None) -> GenomicLocation:

Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_file('example_data/hg38.chrom.sizes')
>>> genome.read_locations('example_data/thousand_genomes.vcf', has_numeric_chromosomes=True)
Genomic Locations on ['chr1', 'chr2', 'chr3', 'chr4', 'chr5', 'chr6', 'chr7', 'chr8', 'chr9', 'chr10', '...']:
LocationEntry with 74 entries
chromosome position
chr21 5033883
chr21 5035657
chr21 5038297
chr21 5038312
chr21 5052250
chr21 5053935
chr21 5053961
chr21 5063903
chr21 5063916
chr21 5064678
"""
assert not (stream and has_numeric_chromosomes)
assert not stranded, "Stranded locations are not supported yet"
f = bnp_open(filename, buffer_type=buffer_type)

@@ -233,2 +348,15 @@ data = f.read_chunks()

Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_file('example_data/small_sequence.fa')
>>> genome.get_locations(bnp.datatypes.LocationEntry(chromosome=['chr1', 'chr1', 'chr2'], position=[0, 10, 0]))
Genomic Locations on ['chr1', 'chr2', 'chr3']:
LocationEntry with 3 entries
chromosome position
chr1 0
chr1 10
chr2 0
"""

@@ -240,3 +368,3 @@ if has_numeric_chromosomes:

def read_sequence(self, filename: str = None) -> GenomicSequence:
def read_sequence(self, filename: Optional[str] = None) -> GenomicSequence:
"""Read the genomic sequence from file.

@@ -255,2 +383,11 @@

Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_file('example_data/small_sequence.fa')
>>> genome.read_sequence()
GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3']
>>> genome = bnp.Genome.from_file('example_data/small.chrom.sizes')
>>> genome.read_sequence('example_data/small_sequence.fa')
GenomicSequence over chromosomes: ['chr1', 'chr2', 'chr3']
"""

@@ -279,2 +416,16 @@

Examples
--------
>>> import bionumpy as bnp
>>> genome = bnp.Genome.from_file('example_data/small_sequence.fa')
>>> genome.read_annotation('example_data/small.gtf')
GenomicAnnotation(genome_context=['chr1', 'chr2', 'chr3'], data=GTFEntry with 5 entries
chromosome source feature_type start stop score strand phase atributes
chr1 knownGene transcript 17369 17436 . - . gene_id "ENST0000061921
chr1 knownGene exon 17369 17436 . - . gene_id "ENST0000061921
chr1 knownGene transcript 29554 31097 . + . gene_id "ENST0000047335
chr1 knownGene exon 29554 30039 . + . gene_id "ENST0000047335
chr1 knownGene exon 30564 30667 . + . gene_id "ENST0000047335)
"""

@@ -293,8 +444,15 @@

def get_genome_context(self):
def get_genome_context(self) -> GenomeContext:
'''
Get the genome context of the Genome
Returns
-------
GenomeContext
'''
return self._genome_context
@property
def size(self):
def size(self) -> int:
'''The size of the genome'''
return self._genome_context.size

@@ -5,5 +5,6 @@ from abc import ABC, abstractmethod, abstractproperty, abstractclassmethod

import numpy as np
from typing import List, Iterable, Tuple, Dict
from typing import List, Iterable, Tuple, Dict, Any, Optional
from .coordinate_mapping import map_locations, find_indices
from .. import EncodedArray
from ..bnpdataclass import BNPDataClass, replace, bnpdataclass

@@ -17,2 +18,3 @@ from .genomic_track import GenomicArray, GenomicArrayNode

from ..string_array import StringArray
from ..util.typing import EncodedArrayLike

@@ -25,10 +27,11 @@

@abstractproperty
def get_location(self, where='start'):
@property
@abstractmethod
def get_location(self, where='start') -> 'GenomicLocation':
return NotImplemented
def get_data_field(self, field_name: str):
def get_data_field(self, field_name: str) -> Any:
return NotImplemented
def set_strand(self, strand):
def set_strand(self, strand: str):
self._is_stranded = True

@@ -41,14 +44,18 @@ self._strand = strand

@abstractproperty
@property
@abstractmethod
def chromosome(self):
return NotImplemented
@abstractproperty
@property
@abstractmethod
def position(self):
return NotImplemented
@abstractproperty
@property
@abstractmethod
def strand(self):
return NotImplemented
@property
@abstractmethod

@@ -59,3 +66,4 @@ def is_stranded(self):

@classmethod
def from_fields(cls, genome_context: GenomeContextBase, chromosome: List[str], position: List[int], strand: List[str] = None) -> 'GenomicLocation':
def from_fields(cls, genome_context: GenomeContextBase, chromosome: List[str], position: List[int],
strand: List[str] = None) -> 'GenomicLocation':
"""Create genomic location from a genome context and the needed fields (chromosome and position)

@@ -117,3 +125,3 @@

"""
assert all(hasattr(data, name) for name in (chromosome_name, position_name))

@@ -128,6 +136,7 @@ if is_stranded:

class GenomicLocationGlobal(GenomicLocation):
class GenomicLocationGlobal(GenomicLocation, ABC):
''' Class for genomic locations that are kept entirely in memory'''
def __init__(self, locations: BNPDataClass, genome_context: GenomeContextBase, is_stranded: bool, field_dict: Dict[str, str]):
def __init__(self, locations: BNPDataClass, genome_context: GenomeContextBase, is_stranded: bool,
field_dict: Dict[str, str]):
self._locations = locations

@@ -138,20 +147,59 @@ self._genome_context = genome_context

def __repr__(self):
return f'Genomic Locations on {self._genome_context}:\n{self._locations.astype(LocationEntry)}'
@property
def data(self):
def data(self) -> BNPDataClass:
'''
Return the underlying data as a bnpdataclass
Returns
-------
BNPDataClass
'''
return self._locations
def __replace__(self, **kwargs):
'''
Replace fields in the locations, used internally by bnp.replace
'''
kwargs = {self._field_dict[kw]: value for kw, value in kwargs.items()}
return self.__class__(replace(self._locations, **kwargs), self._genome_context, self._is_stranded, self._field_dict)
return self.__class__(replace(self._locations, **kwargs), self._genome_context, self._is_stranded,
self._field_dict)
@property
def chromosome(self):
def chromosome(self) -> StringArray:
'''
Return the chromosome of the locations
Returns
-------
StringArray
'''
return getattr(self._locations, self._field_dict['chromosome'])
@property
def position(self):
def position(self) -> np.ndarray:
"""
Return the position of the locations
Returns
-------
np.ndarray
"""
return getattr(self._locations, self._field_dict['position'])
@property
def strand(self):
def strand(self) -> EncodedArrayLike:
"""
The strand of the locations
Returns
-------
EncodedArrayLike
"""
if not self.is_stranded():

@@ -161,6 +209,14 @@ raise ValueError('Unstranded position has not strand')

def is_stranded(self):
def is_stranded(self) -> bool:
'''
Return whether the locations are stranded
Returns
-------
bool
'''
return self._is_stranded
def get_windows(self, flank: int = None, window_size: int = None) -> 'GenomicIntervals':
def get_windows(self, flank: Optional[int] = None, window_size: Optional[int] = None) -> 'GenomicIntervals':
"""Create windows around the locations.

@@ -188,10 +244,10 @@

assert window_size is not None
l_flank = window_size//2
r_flank = window_size//2 + window_size % 2
l_flank = window_size // 2
r_flank = window_size // 2 + window_size % 2
if self.is_stranded():
intervals = StrandedInterval(self.chromosome, self.position-l_flank,
self.position+r_flank, self.strand)
intervals = StrandedInterval(self.chromosome, self.position - l_flank,
self.position + r_flank, self.strand)
else:
intervals = Interval(self.chromosome, self.position-l_flank,
self.position+r_flank)
intervals = Interval(self.chromosome, self.position - l_flank,
self.position + r_flank)
return GenomicIntervalsFull(intervals, self._genome_context,

@@ -213,6 +269,31 @@ is_stranded=self.is_stranded()).clip()

def __getitem__(self, idx):
def __getitem__(self, idx: Any)-> 'GenomicLocation':
'''
Get a subset of the locations
Parameters
----------
idx: numpy index like
Returns
-------
GenomicLocation
'''
return self.__class__(self._locations[idx], self._genome_context, self._is_stranded, self._field_dict)
def get_data_field(self, field_name: str):
def get_data_field(self, field_name: str) -> Any:
'''
Get a field from the undelying data
Parameters
----------
field_name: str
Returns
-------
Any
The field from the underlying data
'''
return getattr(self._locations, field_name)

@@ -238,3 +319,4 @@

def __init__(self, data_node: Node, genome_context: GenomeContextBase, is_stranded=False, field_dict: Dict[str, str]=None):
def __init__(self, data_node: Node, genome_context: GenomeContextBase, is_stranded=False,
field_dict: Dict[str, str] = None):
if field_dict is None:

@@ -252,21 +334,61 @@ field_dict = {name: name for name in ['chromosome', 'positions', 'strand']}

def is_stranded(self):
def is_stranded(self) -> bool:
return self._is_stranded
def sorted(self):
def sorted(self) -> 'GenomicLocationStreamed':
return NotImplemented
@property
def position(self):
def position(self) -> ComputationNode:
'''
Return a computation node for the position of the locations.
Can be realized by calling bnp.compute on them, or can be aggregated across the stream with
numpy aggregations
-------
ComputationNode
'''
return self._position
@property
def chromosome(self):
def chromosome(self) -> ComputationNode:
'''
Return a computation node for the chromosome of the locations.
Can be realized by calling bnp.compute on them, or can be aggregated across the stream with
numpy aggregations
Returns
-------
ComputationNode
'''
return self._chromosome
def get_data_field(self, field_name: str):
def get_data_field(self, field_name: str) -> ComputationNode:
'''
Get a field from the undelying data returned as a computation node
Parameters
----------
field_name: str
Returns
-------
ComputationNode
'''
return ComputationNode(getattr, [self._data_node, field_name])
@property
def strand(self):
def strand(self) -> ComputationNode:
'''
Return a computation node for the strand of the locations.
Can be realized by calling bnp.compute on them, or can be aggregated across the stream with
numpy aggregations
Returns
-------
ComputationNode
'''
if not self.is_stranded():

@@ -277,5 +399,16 @@ raise ValueError('Strand not supported on unstranded intervals')

def __getitem__(self, item):
'''
Get a subset of the locations, returned as a CompuationNode
Parameters
----------
item
Returns
-------
'''
return self.__class__(ComputationNode(lambda x, i: x[i], [self._data_node, item]), self._genome_context)
def get_windows(self, flank: int = None, window_size: int = None) -> 'GenomicIntervals':
def get_windows(self, flank: int = None, window_size: Optional[int] = None) -> 'GenomicIntervals':
"""Create windows around the locations.

@@ -288,2 +421,3 @@

----------
window_size
flank : int

@@ -304,13 +438,13 @@ Flank on either side of the location

assert window_size is not None
l_flank = window_size//2
r_flank = window_size//2 + window_size % 2
l_flank = window_size // 2
r_flank = window_size // 2 + window_size % 2
if self.is_stranded():
intervals = ComputationNode(StrandedInterval,
intervals = ComputationNode(StrandedInterval,
[self.chromosome,
self.position-l_flank,
self.position+r_flank, self.strand])
self.position - l_flank,
self.position + r_flank, self.strand])
else:
intervals = ComputationNode(
Interval, [self.chromosome, self.position-l_flank,
self.position+r_flank])
Interval, [self.chromosome, self.position - l_flank,
self.position + r_flank])
return GenomicIntervalsStreamed(intervals, self._genome_context,

@@ -329,15 +463,19 @@ is_stranded=self.is_stranded()).clip()

@abstractproperty
@property
@abstractmethod
def start(self):
return NotImplemented
@abstractproperty
@property
@abstractmethod
def stop(self):
return NotImplemented
@abstractproperty
@property
@abstractmethod
def chromosome(self):
return NotImplemented
@abstractproperty
@property
@abstractmethod
def strand(self):

@@ -424,6 +562,22 @@ return NotImplemented

@classmethod
def from_fields(cls, genome_context: GenomeContextBase, chromosome, start, stop, strand=None):
def from_fields(cls, genome_context: GenomeContextBase, chromosome: StringArray, start: np.ndarray, stop: np.ndarray, strand: Optional[EncodedArray]=None) -> 'GenomicIntervals':
'''
Create genomic intervals from fields
Parameters
----------
genome_context: GenomeContextBase
chromosome: StringArray
start: np.ndarray
stop: np.ndarray
strand: EncodedArray
Returns
-------
GenomicIntervals
'''
is_stranded = strand is not None
if is_stranded:
intervals = Bed6(chromosome, start, stop, ['.']*len(start),
intervals = Bed6(chromosome, start, stop, ['.'] * len(start),
np.zeros_like(start), strand)

@@ -435,3 +589,3 @@ else:

@classmethod
def from_intervals(cls, intervals: Interval, genome_context: GenomeContextBase, is_stranded=False):
def from_intervals(cls, intervals: Interval, genome_context: GenomeContextBase, is_stranded: Optional[bool]=False) -> 'GenomicIntervals':
"""Create genomic intervals from interval entries and genome info

@@ -442,5 +596,11 @@

intervals : Interval
chrom_sizes : Dict[str, int]
genome_context : GenomeContextBase
is_stranded : bool
Returns
-------
'GenomicIntervals'
"""
if isinstance(intervals, Interval): #TODO check is node
if isinstance(intervals, Interval): # TODO check is node
return GenomicIntervalsFull(genome_context.mask_data(intervals), genome_context, is_stranded)

@@ -451,3 +611,4 @@ else:

@classmethod
def from_interval_stream(cls, interval_stream: Iterable[Interval], genome_context: GenomeContextBase, is_stranded=False):
def from_interval_stream(cls, interval_stream: Iterable[Interval], genome_context: GenomeContextBase,
is_stranded=False) -> 'GenomicIntervals':
"""Create streamed genomic intervals from a stream of intervals and genome info

@@ -458,5 +619,10 @@

interval_stream : Iterable[Interval]
chrom_sizes : Dict[str, int]
genome_context : GenomeContextBase
is_stranded : bool
Returns
-------
'GenomicIntervals'
"""
interval_stream = genome_context.iter_chromosomes(

@@ -481,6 +647,5 @@ interval_stream, StrandedInterval if is_stranded else Interval)

class GenomicIntervalsFull(GenomicIntervals):
''' Class for holding a set of intervals in memory'''
class GenomicIntervalsFull(GenomicIntervals):
''' Class for holding a set of intervals in memory'''
is_stream = False

@@ -494,3 +659,4 @@

@property
def data(self):
def data(self) -> BNPDataClass:
'''Return the underlying data as a bnpdataclass'''
return self._intervals

@@ -500,7 +666,7 @@

if func == np.concatenate:
return self.__class__(np.concatenate([obj._intervals for obj in args[0]]), self._genome_context, self._is_stranded)
return self.__class__(np.concatenate([obj._intervals for obj in args[0]]), self._genome_context,
self._is_stranded)
return NotImplemented
def __repr__(self):

@@ -524,17 +690,28 @@ return f'Genomic Intervals on {self._genome_context}:\n{self._intervals.astype(Interval)}'

def map_locations(self, locations: LocationEntry):
'''
Map locations to the intervals. The locations should be in the same coordinate system as the intervals
The new locations will be in the coordinate system of the intervals
Parameters
----------
locations: LocationEntry
Returns
-------
LocationEntry
'''
go = self._genome_context.global_offset.from_local_interval(self._intervals)
global_positions = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, locations.position)
global_positions = self._genome_context.global_offset.from_local_coordinates(locations.chromosome,
locations.position)
location_indices, interval_indices = find_indices(global_positions, go)
new_entries = locations[location_indices]
names = self._intervals.name if hasattr(self._intervals, 'name') else StringArray(np.arange(len(self._intervals)).astype('S'))
names = self._intervals.name if hasattr(self._intervals, 'name') else StringArray(
np.arange(len(self._intervals)).astype('S'))
return replace(new_entries, chromosome=names[interval_indices],
position=new_entries.position - self.start[interval_indices])
return map_locations(replace(locations, position=global_positions), go)
def sorted(self) -> 'GenomicIntervals':

@@ -571,9 +748,9 @@ """Return the intervals sorted according to `genome_context`

else:
location = np.where(self.strand==('+' if where=='start' else '-'),
location = np.where(self.strand == ('+' if where == 'start' else '-'),
self.start,
self.stop-1)
self.stop - 1)
data = replace(self._intervals, start=location)
else:
assert where == 'center'
location = (self.start+self.stop)//2
location = (self.start + self.stop) // 2
data = replace(self._intervals, start=location)

@@ -617,3 +794,3 @@ return GenomicLocationGlobal.from_data(

chrom_sizes = self._genome_context.global_offset.get_size(self._intervals.chromosome)
return self.from_intervals(extend_to_size(self._intervals, size, chrom_sizes),
return self.from_intervals(extend_to_size(self._intervals, size, chrom_sizes),
self._genome_context)

@@ -806,3 +983,2 @@

def get_mask(self) -> GenomicArray:

@@ -813,3 +989,4 @@ return GenomicArrayNode(ComputationNode(get_boolean_mask, [self._intervals_node, self._chrom_size_node]),

def clip(self) -> 'GenomicIntervals':
return self.__class__(ComputationNode(clip, [self._intervals_node, self._chrom_size_node]), self._genome_context)
return self.__class__(ComputationNode(clip, [self._intervals_node, self._chrom_size_node]),
self._genome_context)

@@ -816,0 +993,0 @@ def __replace__(self, **kwargs):

@@ -9,3 +9,3 @@ import numpy as np

from npstructures import RunLengthRaggedArray
from typing import List, Union, Iterable, Tuple, Dict, Any
from typing import List, Union, Iterable, Tuple, Dict, Any, Callable

@@ -19,11 +19,11 @@

@property
def genome_context(self):
def genome_context(self) -> GenomeContextBase:
return self._genome_context
@abstractmethod
def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs) -> 'GenomicArray':
def __array_ufunc__(self, ufunc: Callable, method: str, *inputs, **kwargs) -> 'GenomicArray':
return NotImplemented
@abstractmethod
def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict) -> Any:
def __array_function__(self, func: Callable, types: List, args: List, kwargs: Dict) -> Any:
return NotImplemented

@@ -39,4 +39,5 @@

@classmethod
def from_global_data(cls, global_pileup: GenomicRunLengthArray, genome_context: GenomeContextBase) -> 'GenomicArray':
"""Create the genomic array from data represened on a flat/concatenated genome.
def from_global_data(cls, global_pileup: GenomicRunLengthArray,
genome_context: GenomeContextBase) -> 'GenomicArray':
"""Create the genomic array from data represented on a flat/concatenated genome.

@@ -54,13 +55,5 @@ Parameters

"""
return GenomicArrayGlobal(global_pileup, genome_context)
def _get_intervals_from_data(self, name, data):
if data.dtype == bool:
return Interval([name]*len(data.starts),
data.starts, data.ends)[data.values]
else:
return BedGraph([name]*len(data.starts),
data.starts, data.ends, data.values)
@classmethod

@@ -81,3 +74,3 @@ def from_bedgraph(cls, bedgraph: BedGraph, genome_context: GenomeContextBase) -> 'GenomicData':

"""
if isinstance(bedgraph, BedGraph):

@@ -92,6 +85,15 @@ go = genome_context.global_offset

return GenomicArrayNode(ComputationNode(GenomicRunLengthArray.from_bedgraph,
[interval_stream, StreamNode(iter(genome_context.chrom_sizes.values()))]),
[interval_stream,
StreamNode(iter(genome_context.chrom_sizes.values()))]),
genome_context)
def _get_intervals_from_data(self, name, data):
if data.dtype == bool:
return Interval([name] * len(data.starts),
data.starts, data.ends)[data.values]
else:
return BedGraph([name] * len(data.starts),
data.starts, data.ends, data.values)
class GenomicArrayGlobal(GenomicArray, np.lib.mixins.NDArrayOperatorsMixin):

@@ -101,2 +103,3 @@ '''

'''
def __init__(self, global_track: GenomicRunLengthArray, genome_context: GenomeContextBase):

@@ -108,3 +111,3 @@ assert isinstance(global_track, GenomicRunLengthArray), global_track

@property
def dtype(self):
def dtype(self) -> np.dtype:
return self._global_track.dtype

@@ -117,6 +120,9 @@

def sum(self) -> float:
def sum(self, axis=None) -> float:
'''Sum the data in the array'''
assert axis is None
return self._global_track.sum(axis=None)
def extract_chromsome(self, chromosome: Union[str, List[str]]) -> Union[GenomicRunLengthArray, RunLengthRaggedArray]:
def extract_chromsome(self, chromosome: Union[str, List[str]]) -> Union[
GenomicRunLengthArray, RunLengthRaggedArray]:
"""Get the data on one or more chromosomes

@@ -147,2 +153,10 @@ Parameters

def to_dict(self) -> Dict[str, GenomicRunLengthArray]:
"""
Convert the genomic array to a dict of arrays with chromosomes as keys
Returns
-------
Dict[str, GenomicRunLengthArray]
"""
go = self._genome_context.global_offset

@@ -152,6 +166,24 @@ names = go.names()

sizes = go.get_size(names)
return {name: self._global_track[offset:offset+size].to_array()
return {name: self._global_track[offset:offset + size].to_array()
for name, offset, size in zip(names, offsets, sizes)}
def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs):
def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs):
'''
Handle numpy ufuncs on the genomic array
Parameters
----------
ufunc: np.ufunc
method: str
How to call the ufunc
inputs: List
Args for the ufunc
kwargs: Dict
Additional arguments
Returns
-------
GenomicArrayGlobal
'''
inputs = [(i._global_track if isinstance(i, GenomicArrayGlobal) else i) for i in inputs]

@@ -161,3 +193,3 @@ r = self._global_track.__array_ufunc__(ufunc, method, *inputs, **kwargs)

def __array_function__(self, func: callable, types: List, args: List, kwargs: Dict):
def __array_function__(self, func: Callable, types: List, args: List, kwargs: Dict):
"""Handles any numpy array functions called on a raggedarray

@@ -187,7 +219,7 @@

"""
go = self._genome_context._global_offset
go = self._genome_context.global_offset
names = go.names()
starts = go.get_offset(names)
stops = starts+go.get_size(names)
stops = starts + go.get_size(names)
intervals_list = []

@@ -201,3 +233,4 @@ for name, start, stop in zip(names, starts, stops):

def extract_intervals(self, intervals: Union[Interval, 'GenomicIntervals'], stranded: bool = False) -> RunLengthRaggedArray:
def extract_intervals(self, intervals: Union[Interval, 'GenomicIntervals'],
stranded: bool = False) -> RunLengthRaggedArray:
"""Extract the data contained in a set of intervals

@@ -234,3 +267,5 @@

"""
global_intervals = self._genome_context.global_offset.from_local_coordinates(locations.chromosome, locations.position)
assert stranded is None, 'Stranded not implemented for locations'
global_intervals = self._genome_context.global_offset.from_local_coordinates(locations.chromosome,
locations.position)
return self._global_track[global_intervals]

@@ -243,3 +278,2 @@ # if not stranded:

@classmethod

@@ -264,2 +298,15 @@ def from_dict(cls, d: Dict[str, GenomicRunLengthArray], genome_context: GenomeContextBase) -> 'GenomicData':

genome_context: GenomeContextBase) -> 'GenomicData':
'''
Create a genomic array from a stream of data
Parameters
----------
stream: Iterable[Tuple[str, GenomicRunLengthArray]]
genome_context: GenomeContextBase
Returns
-------
GenomicData
'''
return cls.from_dict(dict(stream))

@@ -284,3 +331,3 @@

def __array_ufunc__(self, ufunc: callable, method: str, *inputs, **kwargs):
def __array_ufunc__(self, ufunc: np.ufunc, method: str, *inputs, **kwargs):
args = [gtn._run_length_node if isinstance(gtn, GenomicArrayNode) else gtn for gtn in inputs]

@@ -296,3 +343,3 @@ return self.__class__(ufunc(*args, **kwargs), self._genome_context)

"""
return ComputationNode(self._get_intervals_from_data, [self._chrom_name_node, self._run_length_node])

@@ -321,4 +368,5 @@

"""
def stranded_func(ra: GenomicRunLengthArray, start: int, stop: int, strand: str):
assert np.all(stop <= ra.size), (np.max(stop), ra.size)

@@ -334,3 +382,4 @@ rle = ra[start:stop]

assert self.genome_context.is_compatible(intervals.genome_context), (self.genome_context, intervals.genome_context)
assert self.genome_context.is_compatible(intervals.genome_context), (
self.genome_context, intervals.genome_context)
intervals = intervals.as_stream()

@@ -345,5 +394,23 @@ if stranded:

def extract_chromsome(self, chromosome: Union[str, List[str]]) -> 'GenomicData':
'''
Extract the data for a chromosome
Unimplemented
'''
assert False
def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]], genome_context: GenomeContextBase) -> 'GenomicData':
def from_stream(cls, stream: Iterable[Tuple[str, GenomicRunLengthArray]],
genome_context: GenomeContextBase) -> 'GenomicData':
'''
Create a genomic array from a stream of data
Parameters
----------
stream: Iterable[Tuple[str, GenomicRunLengthArray]]
genome_context: GenomeContextBase
Returns
-------
GenomicData
'''
stream_node = StreamNode((array for _, array in stream))

@@ -369,5 +436,7 @@ return cls(stream_node, genome_context)

def to_dict(self):
'''Unimplemented'''
assert False
def sum(self, axis=None) -> float:
'''Sum the data in the array'''
return np.sum(self._run_length_node)

@@ -374,0 +443,0 @@

import dataclasses
from typing import Tuple
import numpy as np

@@ -55,2 +57,7 @@ from ..encodings.string_encodings import StringEncoding

def to_local_coordinates(self, global_offset) -> Tuple[EncodedArray, np.ndarray]:
chromosome_idxs = np.searchsorted(self._offset, global_offset, side="right") - 1
local_offset = global_offset - self._offset[chromosome_idxs]
return EncodedArray(chromosome_idxs, self._old_encoding), local_offset
def from_local_interval(self, interval, do_clip=False):

@@ -57,0 +64,0 @@ start_offsets, stop_offsets = self.start_ends_from_intervals(interval, do_clip)

from functools import lru_cache
from itertools import accumulate, repeat, takewhile, chain
from typing import Union, Tuple, List, Any
import numpy as np

@@ -17,3 +19,8 @@ from npstructures.raggedshape import RaggedView, RaggedView2

class BamBufferExtractor:
def __init__(self, data, starts, ends, header_data, is_contigous=True):
'''
Class to handle the extraction of data from a buffer from a BAM file.
'''
def __init__(self, data: np.ndarray, starts: np.ndarray, ends: np.ndarray, header_data: Any,
is_contigous: bool = True):
self._data = data

@@ -49,3 +56,3 @@ self._new_lines = starts

@property
def data(self):
def data(self) -> np.ndarray:
if not self._is_contigous:

@@ -56,6 +63,7 @@ self._make_contigous()

def __getitem__(self, item):
return self.__class__(self._data, self._new_lines[item], self._ends[item], self._header_data, is_contigous=False)
return self.__class__(self._data, self._new_lines[item], self._ends[item], self._header_data,
is_contigous=False)
def _get_ints(self, offsets, n_bytes, dtype):
tmp = self._data[(self._new_lines+offsets)[:, None] + np.arange(n_bytes)].ravel()
tmp = self._data[(self._new_lines + offsets)[:, None] + np.arange(n_bytes)].ravel()
ints = (tmp).view(dtype).ravel()

@@ -114,3 +122,3 @@ assert len(ints) == len(self._new_lines), (len(ints), offsets, len(self._new_lines), n_bytes, dtype)

def _get_read_name(self):
read_names = ragged_slice(self._data, self._read_name_start, self._cigar_start-1)
read_names = ragged_slice(self._data, self._read_name_start, self._cigar_start - 1)
read_names = EncodedRaggedArray(

@@ -141,13 +149,30 @@ EncodedArray(read_names.ravel(), BaseEncoding), read_names.shape)

def get_field_by_number(self, i):
def get_field_by_number(self, i: int) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]:
'''
Get the data from the field with the given number.
Parameters
----------
i: int
The field number.
Returns
-------
Union[np.ndarray, EncodedArray, EncodedRaggedArray]
The data from the field.
'''
return self._functions[i]()
@property
def size(self):
def size(self) -> int:
if self._is_contigous:
return self._data.size
else:
return (self._ends-self._new_lines).sum()
return (self._ends - self._new_lines).sum()
class BamHeader:
"""
Class to handle the header of a BAM file.
"""
def __init__(self, file_object):

@@ -158,3 +183,3 @@ self._file_object = file_object

def read(self, n_bytes):
def read(self, n_bytes: int) -> bytes:
bytes = self._file_object.read(n_bytes)

@@ -172,3 +197,12 @@ self._header_data.append(bytes)

def read_header(self):
def read_header(self) -> List[Tuple[str, int]]:
"""
Read the header of the BAM file. Returns a list of tuples with the reference names and lengths.
Returns
-------
List[Tuple[str, int]]
The reference names and lengths.
"""
magic = self.read(4)

@@ -193,7 +227,15 @@ assert magic == b"BAM\1", magic

def bytes(self):
def bytes(self) -> bytes:
"""
Get the header as bytes.
Returns
-------
bytes
The header as bytes.
"""
return b''.join(self._header_data)
class BamBuffer(FileBuffer):

@@ -205,2 +247,3 @@ '''

supports_modified_write = False
def __init__(self, buffer_extractor, header_data=None):

@@ -214,15 +257,15 @@ self._buffer_extractor = buffer_extractor

def get_field_range_as_text(self):
def get_field_range_as_text(self, *args):
raise Exception('Cannot write BAM file with set values')
@property
def size(self):
def size(self) -> int:
return self._buffer_extractor.size
@property
def data(self):
def data(self) -> np.ndarray:
return self._buffer_extractor.data
@property
def n_lines(self):
def n_lines(self) -> int:
return len(self._buffer_extractor)

@@ -244,3 +287,3 @@

@classmethod
def make_header(cls, data):
def make_header(cls, data: BamEntry) -> bytes:
header = data.get_context("header")

@@ -250,10 +293,4 @@ return header.bytes()

@classmethod
def read_header(cls, file_object):
def read_header(cls, file_object) -> BamHeader:
return BamHeader(file_object)
magic = file_object.read(4)
assert magic == b"BAM\1", magic
header_length = cls._read_int(file_object)
file_object.read(header_length)
n_ref = cls._read_int(file_object)
return cls._handle_refs(n_ref, file_object)

@@ -273,4 +310,4 @@ @classmethod

chunk = bytes(chunk)
new_start = lambda start, _: start + int.from_bytes(chunk[start:start+4], byteorder="little") + 4
_starts = accumulate(repeat(0), new_start)# chain([0], accumulate(repeat(0), new_start))
new_start = lambda start, _: start + int.from_bytes(chunk[start:start + 4], byteorder="little") + 4
_starts = accumulate(repeat(0), new_start) # chain([0], accumulate(repeat(0), new_start))
starts = list(takewhile(lambda start: start <= len(chunk), _starts))

@@ -284,15 +321,24 @@ return starts

@classmethod
def from_raw_buffer(cls, chunk, header_data):
def from_raw_buffer(cls, chunk: np.ndarray, header_data: BamHeader) -> "BamBuffer":
chunk = np.asarray(chunk)
starts = np.asanyarray(cls._find_starts(chunk))
buffer_extractor = BamBufferExtractor(chunk[:starts[-1]], starts[:-1],starts[1:], header_data.info)
buffer_extractor = BamBufferExtractor(chunk[:starts[-1]], starts[:-1], starts[1:], header_data.info)
return cls(buffer_extractor, header_data)
def get_data(self):
def get_data(self)-> BamEntry:
"""
Get the data from the buffer.
Returns
-------
BamEntry
The data from the buffer.
"""
return BamEntry(*(self.get_field_by_number(i) for i in range(9)))
def get_field_by_number(self, i, dtype=None):
def get_field_by_number(self, i, dtype=None)-> Union[np.ndarray, EncodedArray, EncodedRaggedArray]:
return self._buffer_extractor.get_field_by_number(i)
def count_entries(self):
def count_entries(self) -> int:
return len(self._buffer_extractor)

@@ -308,7 +354,10 @@

lambda: self._buffer_extractor.get_field_by_number(3),
lambda: self._buffer_extractor.get_field_by_number(3)+count_reference_length(*(self._buffer_extractor.get_field_by_number(i) for i in (5, 6))),
lambda: self._buffer_extractor.get_field_by_number(3) + count_reference_length(
*(self._buffer_extractor.get_field_by_number(i) for i in (5, 6))),
lambda: self._buffer_extractor.get_field_by_number(1),
lambda: self._buffer_extractor.get_field_by_number(4),
lambda: EncodedArray(np.where(self._buffer_extractor.get_field_by_number(2) & np.uint16(16), ord("-"), ord("+"))[:, None], BaseEncoding)
]
lambda: EncodedArray(
np.where(self._buffer_extractor.get_field_by_number(2) & np.uint16(16), ord("-"), ord("+"))[:, None],
BaseEncoding)
]
return funcs[i]()

@@ -328,5 +377,5 @@

start,
start+length,
start + length,
read_names,
mapq,
strand)

@@ -51,14 +51,2 @@ import io

def __init___(self, data: EncodedArray, new_lines: np.ndarray = None, delimiters: np.ndarray = None,
header_data=None, buffer_extractor=None):
super().__init__(data, new_lines)
if delimiters is None:
delimiters = np.concatenate(
([-1], np.flatnonzero(self._data == self.DELIMITER), self._new_lines)
)
delimiters.sort(kind="mergesort")
self._delimiters = delimiters
self._header_data = header_data
self.__buffer_extractor = buffer_extractor
@classmethod

@@ -92,4 +80,4 @@ def from_raw_buffer(cls, chunk: np.ndarray, header_data=None) -> "DelimitedBuffer":

n_fields = cls._get_n_fields(entry_ends)
size = delimiters[entry_ends[-1]]+1
delimiters = np.insert(delimiters[:entry_ends[-1]+1], 0, -1)
size = delimiters[entry_ends[-1]] + 1
delimiters = np.insert(delimiters[:entry_ends[-1] + 1], 0, -1)
buffer_extractor = cls._get_buffer_extractor(

@@ -100,30 +88,2 @@ chunk[:size], delimiters, n_fields)

@classmethod
def _get_n_fields(cls, entry_ends):
return entry_ends[0] + 1
@property
def __buffer_extractor(self):
if self.__buffer_extractor is None:
self.__buffer_extractor = self._get_buffer_extractor()
return self.__buffer_extractor
@classmethod
def _get_buffer_extractor(cls, data, delimiters, n_cols) -> TextThroughputExtractor:
starts = delimiters[:-1].reshape(-1, n_cols) + 1
ends = delimiters[1:].reshape(-1, n_cols)
ends = cls._modify_for_carriage_return(ends, data)
entry_starts = starts[:, 0]
entry_ends = ends[:, -1] + 1
return TextThroughputExtractor(data, starts, field_ends=ends, entry_starts=entry_starts, entry_ends=entry_ends)
@classmethod
def _modify_for_carriage_return(cls, ends, data):
if data.size==0 or ends[0, -1]==0:
return ends
if data[ends[0, -1]-1] == '\r':
ends = ends.copy()
ends[:, -1] -= data[ends[:, -1]-1] == '\r'
return ends
def __getitem__(self, idx):

@@ -165,9 +125,9 @@ return self.__class__(self._buffer_extractor[idx], self._header_data)

@classmethod
def join_fields(cls, fields_list: List[EncodedRaggedArray]):
def join_fields(cls, fields_list: List[EncodedRaggedArray]) -> EncodedRaggedArray:
return join_columns(fields_list, cls.DELIMITER).ravel()
def get_field_range_as_text(self, *args, **kwargs):
def get_field_range_as_text(self, *args, **kwargs) -> EncodedRaggedArray:
return self.get_column_range_as_text(*args, **kwargs)
def get_column_range_as_text(self, col_start, col_end, keep_sep=False):
def get_column_range_as_text(self, col_start, col_end, keep_sep=False) -> EncodedRaggedArray:
"""Get multiple columns as text

@@ -183,2 +143,7 @@

keep seperator at end
Returns
-------
EncodedRaggedArray
EncodedRaggedArray of the columns
"""

@@ -189,24 +154,2 @@ self.validate_if_not()

@staticmethod
def _move_ints_to_digit_array(ints, n_digits):
powers = np.uint8(10) ** np.arange(n_digits)[::-1]
ret = (ints[..., None] // powers) % 10
return EncodedArray(ret, DigitEncoding)
def _validate(self):
chunk = self._data
delimiters = self._delimiters[1:]
n_delimiters_per_line = (
next(i for i, d in enumerate(delimiters) if chunk[d] == NEWLINE) + 1
)
self._n_cols = n_delimiters_per_line
should_be_new_lines = chunk[delimiters[n_delimiters_per_line - 1::n_delimiters_per_line]]
if delimiters.size % n_delimiters_per_line != 0 or np.any(should_be_new_lines != "\n"):
offending_line = np.flatnonzero(should_be_new_lines != "\n")[0]
lines = split(self._data, '\n')
raise FormatException(
f"Irregular number of delimiters per line ({delimiters.size}, {n_delimiters_per_line}): {lines}",
line_number=offending_line)
self._validated = True
@classmethod

@@ -226,3 +169,3 @@ def from_data(cls, data: BNPDataClass) -> "DelimitedBuffer":

@classmethod
def make_header(cls, data: bnpdataclass):
def make_header(cls, data: BNPDataClass):
header = ""

@@ -255,2 +198,14 @@ if data.has_context("header"):

@property
def actual_dataclass(self):
return self.dataclass
def get_field_by_number(self, field_nr: int, field_type: type = object):
"""Get a field by number"""
self.validate_if_not()
if field_type is None:
field_type = dataclasses.fields(self.actual_dataclass)[field_nr]
return self._get_field_by_number(
field_nr, field_type)
def _get_field_by_number(self, col_number, field_type):

@@ -270,3 +225,3 @@

col_number,
keep_sep=(field_type == List[int] or field_type==List[float]))
keep_sep=(field_type == List[int] or field_type == List[float]))
text = subresult

@@ -286,18 +241,12 @@ assert isinstance(text, (EncodedRaggedArray, EncodedArray)), text

raise FormatException(e.args[0], line_number=row_number)
# if is_subclass_or_instance(field_type, Encoding):
# parsed = as_encoded_array(subresult, field_type)
return parsed
def count_entries(self) -> int:
"""Count the number of entries in the buffer"""
return len(self._buffer_extractor)
@property
def actual_dataclass(self):
return self.dataclass
def n_lines(self) -> int:
return len(self._buffer_extractor)
def get_field_by_number(self, field_nr: int, field_type: type = object):
self.validate_if_not()
if field_type is None:
field_type = dataclasses.fields(self.actual_dataclass)[field_nr]
return self._get_field_by_number(
field_nr, field_type)
def _parse_split_floats(self, text, sep=','):

@@ -322,3 +271,4 @@ function = str_to_float

mask = int_strings.lengths != 0
return RaggedArray(function(int_strings[mask]), (text == sep).sum(axis=-1), safe_mode=False) # TODO: is it necessary with unsafe mode here
return RaggedArray(function(int_strings[mask]), (text == sep).sum(axis=-1),
safe_mode=False) # TODO: is it necessary with unsafe mode here
return RaggedArray(function(int_strings), (text == sep).sum(axis=-1))

@@ -329,16 +279,55 @@ else:

def count_entries(self) -> int:
"""Count the number of entries in the buffer"""
return len(self._buffer_extractor)
@classmethod
def _get_n_fields(cls, entry_ends):
return entry_ends[0] + 1
@property
def n_lines(self):
return len(self._buffer_extractor)
def __buffer_extractor(self):
if self.__buffer_extractor is None:
self.__buffer_extractor = self._get_buffer_extractor()
return self.__buffer_extractor
@classmethod
def _get_buffer_extractor(cls, data, delimiters, n_cols) -> TextThroughputExtractor:
starts = delimiters[:-1].reshape(-1, n_cols) + 1
ends = delimiters[1:].reshape(-1, n_cols)
ends = cls._modify_for_carriage_return(ends, data)
entry_starts = starts[:, 0]
entry_ends = ends[:, -1] + 1
return TextThroughputExtractor(data, starts, field_ends=ends, entry_starts=entry_starts, entry_ends=entry_ends)
@classmethod
def _modify_for_carriage_return(cls, ends, data):
if data.size == 0 or ends[0, -1] == 0:
return ends
if data[ends[0, -1] - 1] == '\r':
ends = ends.copy()
ends[:, -1] -= data[ends[:, -1] - 1] == '\r'
return ends
@staticmethod
def _move_ints_to_digit_array(ints, n_digits):
powers = np.uint8(10) ** np.arange(n_digits)[::-1]
ret = (ints[..., None] // powers) % 10
return EncodedArray(ret, DigitEncoding)
def _validate(self):
chunk = self._data
delimiters = self._delimiters[1:]
n_delimiters_per_line = (
next(i for i, d in enumerate(delimiters) if chunk[d] == NEWLINE) + 1
)
self._n_cols = n_delimiters_per_line
should_be_new_lines = chunk[delimiters[n_delimiters_per_line - 1::n_delimiters_per_line]]
if delimiters.size % n_delimiters_per_line != 0 or np.any(should_be_new_lines != "\n"):
offending_line = np.flatnonzero(should_be_new_lines != "\n")[0]
lines = split(self._data, '\n')
raise FormatException(
f"Irregular number of delimiters per line ({delimiters.size}, {n_delimiters_per_line}): {lines}",
line_number=offending_line)
self._validated = True
class GfaSequenceBuffer(DelimitedBuffer):
dataclass = SequenceEntry
# SKIP_LAZY = True
def get_data(self):

@@ -420,3 +409,3 @@ ids = self.get_text(1, fixed_length=False)

assert [f.name for f in dataclasses.fields(tmp)] == columns, (
columns, [f.name for f in dataclasses.fields(tmp)])
columns, [f.name for f in dataclasses.fields(tmp)])

@@ -459,38 +448,2 @@ class NewClass(cls):

# def set_fields_from_header(self, columns: List[str]):
# if not has_header:
# return None
# fields = dataclasses.fields(self.dataclass)
# ordered_fields = [next(field for field in fields if field.name == col) for col in columns]
# # self._permuted_data_class = dataclasses.make_dataclass('TmpDataclass', ordered_fields)
# self.fields = ordered_fields
# assert np.array_equal(columns, [field.name for field in self.fields])
#
# def get_field_by_number(self, field_nr: int, field_type: type=object):
# # if self.fields is None:
# return super().get_field_by_number(field_nr, field_type)
# # col_id, t = next((i, field.type) for i, field in enumerate(dataclasses.fields(self.dataclass)) if field.name == self.fields[field_nr].name)
# #return super().get_field_by_number(col_id, t)
# # fields = self.fields if self.fields is not None else dataclasses.fields(self.dataclass)
#
# def get_data(self) -> _dataclass:
# """Parse the data in the buffer according to the fields in _dataclass
#
# Returns
# -------
# _dataclass
# Dataclass with parsed data
#
# """
# self.validate_if_not()
# columns = {}
# fields = self.fields if self.fields is not None else dataclasses.fields(self.dataclass)
# for col_number, field in enumerate(fields):
# col = self._get_field_by_number(col_number, field.type)
# columns[field.name] = col
# n_entries = len(next(col for col in columns if col is not None))
# columns = {c: value if c is not None else np.empty((n_entries, 0))
# for c, value in columns.items()}
# return self.dataclass(**columns)
DatatypeBuffer.__name__ = _dataclass.__name__ + "Buffer"

@@ -497,0 +450,0 @@ DatatypeBuffer.__qualname__ = _dataclass.__qualname__ + "Buffer"

@@ -23,6 +23,7 @@ import numpy as np

def str_matrix_func(column):
n_rows, n_cols = column.shape
a = column.as_bytes().reshape(n_rows*n_cols, -1)
tabs = np.full((n_rows*n_cols, 1), ord("\t"))
a = column.as_bytes().reshape(n_rows * n_cols, -1)
tabs = np.full((n_rows * n_cols, 1), ord("\t"))
b = np.hstack([a, tabs])

@@ -39,9 +40,9 @@ b = b.reshape((n_rows, -1))[:, :-1]

def optional_ints_to_strings(number: np.ndarray, missing_string='.')->EncodedRaggedArray:
if np.all(number)==np.nan:
return as_encoded_array([missing_string]*len(number))
def optional_ints_to_strings(number: np.ndarray, missing_string='.') -> EncodedRaggedArray:
if np.all(number) == np.nan:
return as_encoded_array([missing_string] * len(number))
return ints_to_strings(number)
def get_column(values, field_type) -> EncodedRaggedArray:

@@ -78,4 +79,12 @@ def get_func_for_datatype(datatype):

Parameters
data : bnpdataclass
Data
data_dict: List[Tuple]
A list of tuples where each tuple contains the field name and the field value.
sep: str
The separator to use between fields.
Returns
-------
EncodedArray
A buffer containing the data in CSV format.
"""

@@ -89,2 +98,16 @@

def join_columns(columns: List[EncodedRaggedArray], sep: str) -> EncodedRaggedArray:
"""
Join columns into a single buffer.
Parameters
----------
columns: List[EncodedRaggedArray]
sep: str
Returns
-------
EncodedRaggedArray
The lines of the buffer
"""
lengths = np.concatenate([((column.lengths if

@@ -91,0 +114,0 @@ isinstance(column, RaggedArray)

from functools import lru_cache
from typing import Optional, List
from typing import Optional, List, Union, Any, Type, Tuple

@@ -11,3 +11,3 @@ import numpy as np

from .strops import str_to_int, str_to_int_with_missing, str_to_float, str_to_float_with_missing
from ..bnpdataclass import bnpdataclass
from ..bnpdataclass import bnpdataclass, BNPDataClass
from ..encoded_array import EncodedArray, EncodedRaggedArray, Encoding, as_encoded_array

@@ -23,3 +23,3 @@ from ..encodings import BaseEncoding

def move_intervals_to_digit_array(data, starts, ends, fill_value):
if len(starts)==0:
if len(starts) == 0:
return np.zeros_like(data, shape=((0, 0)))

@@ -37,6 +37,5 @@ max_chars = np.max(ends - starts)

def move_intervals_to_right_padded_array(data, starts, ends, fill_value, stop_at=None):
lens = ends - starts
max_chars = np.max(lens)
indices = np.minimum(starts[..., None] + np.arange(max_chars), data.size-1)
indices = np.minimum(starts[..., None] + np.arange(max_chars), data.size - 1)
array = data[indices]

@@ -46,3 +45,3 @@ del indices

new_lens = np.argmax(array == stop_at, axis=-1)
lens = np.where(new_lens>0, np.minimum(lens, new_lens), lens)
lens = np.where(new_lens > 0, np.minimum(lens, new_lens), lens)
max_chars = np.max(lens)

@@ -56,3 +55,3 @@ array = array[:, :max_chars].ravel()

cm = np.cumsum(z_lens[row_idxs])
diffs = np.diff(row_idxs)*max_chars-z_lens[row_idxs[1:]]
diffs = np.diff(row_idxs) * max_chars - z_lens[row_idxs[1:]]
del row_idxs

@@ -62,3 +61,3 @@ index_builder = np.ones(cm[-1], dtype=np.int32)

del cm
index_builder[0] = first_row*max_chars+lens[first_row]
index_builder[0] = first_row * max_chars + lens[first_row]
np.cumsum(index_builder, out=index_builder)

@@ -113,11 +112,10 @@ zeroed = index_builder

@lru_cache()
def size(self):
def size(self) -> int:
return self.data.size
@property
def data(self):
def data(self) -> EncodedArray:
return self._buffer_extractor.data
def __getitem__(self, idx):
def __getitem__(self, idx: Union[int, slice, List[int]]):
return NotImplemented

@@ -130,3 +128,3 @@

@property
def header_data(self):
def header_data(self) -> Any:
if hasattr(self, "_header_data"):

@@ -137,15 +135,11 @@ return self._header_data

@property
def n_lines(self):
def n_lines(self) -> int:
return NotImplemented
return len(self._new_lines)
@classmethod
def modify_class_with_header_data(cls, header_data):
def modify_class_with_header_data(cls, header_data: Any) -> Type["FileBuffer"]:
return cls
# class NewClass(cls):
# _header_data = header_data
@classmethod
def read_header(cls, file_object: FileIO):
def read_header(cls, file_object: FileIO) -> str:
"""Read the header data from the file

@@ -226,2 +220,3 @@

def validate_if_not(self):
"""Validate the buffer if it has not been validated yet"""
if not self._is_validated:

@@ -231,3 +226,3 @@ self._validate()

def get_data(self) -> bnpdataclass:
def get_data(self) -> BNPDataClass:
"""Extract the data from the buffer

@@ -239,3 +234,3 @@

-------
npdataclass
BNPDataClass
dataset containing the data from the buffer

@@ -258,3 +253,3 @@ """

max_chars = array.shape[-1]
to_indices = ends[::-1, None]-max_chars+np.arange(max_chars)
to_indices = ends[::-1, None] - max_chars + np.arange(max_chars)
self._data[to_indices] = array[::-1]

@@ -282,5 +277,4 @@

@classmethod
def contains_complete_entry(cls, chunks):
def contains_complete_entry(cls, chunks: List[np.ndarray]) -> bool:
n_new_lines = sum(np.count_nonzero(EncodedArray(chunk, BaseEncoding) == NEWLINE) for chunk in chunks)

@@ -293,2 +287,3 @@ return n_new_lines >= cls.n_lines_per_entry

class IncompleteEntryException(Exception):

@@ -299,3 +294,7 @@ pass

class TextBufferExtractor:
def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray=None, field_lens: np.ndarray=None):
"""
Base class for extracting data from a text buffer.
"""
def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray = None,
field_lens: np.ndarray = None):
'''

@@ -310,3 +309,3 @@ field_starts: n_entries x n_fields

assert field_ends is not None
self._field_lens = field_ends-field_starts
self._field_lens = field_ends - field_starts
else:

@@ -318,7 +317,7 @@ assert field_ends is None

@property
def data(self):
def data(self) -> EncodedArray:
return self._data
@property
def n_fields(self):
def n_fields(self) -> int:
return self._n_fields

@@ -329,3 +328,3 @@

def __getitem__(self, idx):
def __getitem__(self, idx: Union[int, slice, List[int]]) -> 'TextBufferExtractor':
return self.__class__(self._data,

@@ -335,3 +334,15 @@ field_starts=self._field_starts[idx],

def get_field_by_number(self, field_nr: int, keep_sep=False):
def get_field_by_number(self, field_nr: int, keep_sep: bool=False) -> EncodedRaggedArray:
"""
Extract the data for a single field.
Parameters
----------
field_nr: int
keep_sep: bool
Returns
-------
EncodedRaggedArray
"""
assert field_nr < self._n_fields, (field_nr, self._n_fields)

@@ -344,5 +355,2 @@ lens = self._field_lens.ravel()[field_nr::self._n_fields]

def _extract_data(self, lens, starts):

@@ -353,17 +361,30 @@ values = EncodedRaggedArray(self._data, RaggedView2(starts, lens))

def get_fixed_length_field(self, field_nr: int, field_length: int):
def get_fixed_length_field(self, field_nr: int, field_length: int)-> EncodedArray:
indices = self._field_starts[:, field_nr, None] + np.arange(field_length)
return self._data[indices]
def get_padded_field(self, field_nr, stop_at=None):
def get_padded_field(self, field_nr, stop_at=None) -> EncodedArray:
starts = self._field_starts[:, field_nr]
if starts.size == 0:
return np.zeros_like(self._data, shape = (len(starts), 0))
return np.zeros_like(self._data, shape=(len(starts), 0))
lens = self._field_lens[:, field_nr]
ends = lens+starts
ends = lens + starts
array = move_intervals_to_right_padded_array(self._data, starts.ravel(), ends.ravel(), fill_value='\x00', stop_at=stop_at)
return array.reshape(starts.shape+(array.shape[-1],))
array = move_intervals_to_right_padded_array(self._data, starts.ravel(), ends.ravel(), fill_value='\x00',
stop_at=stop_at)
return array.reshape(starts.shape + (array.shape[-1],))
def get_digit_array(self, field_nr: int):
def get_digit_array(self, field_nr: int) -> Tuple[EncodedArray, Optional[np.ndarray], Optional[np.ndarray]]:
"""
Extract the digits of the field as a 2D array of encoded integres.
Parameters
----------
field_nr: int
Returns
-------
EncodedArray
"""
starts = self._field_starts[:, field_nr]

@@ -375,3 +396,4 @@ possible_signs = self._data[starts]

return self.get_field_by_number(field_nr), is_negative, is_positive
digit_array = move_intervals_to_digit_array(self._data, starts, starts+self._field_lens[:, field_nr], fill_value='0')
digit_array = move_intervals_to_digit_array(self._data, starts, starts + self._field_lens[:, field_nr],
fill_value='0')
return digit_array, None, None

@@ -381,2 +403,14 @@

def concatenate(cls, buffers: List['TextBufferExtractor']):
"""
Concatenate multiple buffers into a single buffer.
Parameters
----------
buffers: List[TextBufferExtractor]
Returns
-------
TextBufferExtractor
"""
sizes = np.array([b._data.size for b in buffers])

@@ -391,5 +425,10 @@ offsets = np.insert(np.cumsum(sizes), 0, 0)

class TextThroughputExtractor(TextBufferExtractor):
def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray=None, field_lens=None, entry_starts: np.ndarray=None, entry_ends:np.ndarray=None, is_contiguous=True):
"""
TextBufferExtractor made especially for making it fast to write a modified or filtered buffer to file again.
"""
def __init__(self, data: EncodedArray, field_starts: np.ndarray, field_ends: np.ndarray = None, field_lens=None,
entry_starts: np.ndarray = None, entry_ends: np.ndarray = None, is_contiguous=True):
if field_lens is None:
field_lens = field_ends-field_starts
field_lens = field_ends - field_starts
super().__init__(data, field_starts, field_lens=field_lens)

@@ -409,3 +448,4 @@ self._entry_starts = entry_starts

entry_ends = np.concatenate([b._entry_ends + offset for b, offset in zip(buffers, offsets)])
return cls(data, starts, field_lens=lens, entry_starts=entry_starts, entry_ends=entry_ends, is_contiguous=all(b._is_contiguous for b in buffers))
return cls(data, starts, field_lens=lens, entry_starts=entry_starts, entry_ends=entry_ends,
is_contiguous=all(b._is_contiguous for b in buffers))

@@ -427,9 +467,7 @@ def __getitem__(self, idx):

self._entry_ends = new_starts[1:]
self._field_starts = self._field_starts-offsets[:, None]
# self._field_ends = self._field_ends-offsets[:, None]
self._field_starts = self._field_starts - offsets[:, None]
self._is_contiguous = True
@property
def data(self):
def data(self) -> EncodedArray:
if not self._is_contiguous:

@@ -443,5 +481,5 @@ self._make_contigous()

starts = self._field_starts[:, from_nr]
lens = self._entry_ends-starts
lens = self._entry_ends - starts
if not keep_sep:
lens-=1
lens -= 1
return self._extract_data(lens, starts)
from pathlib import PurePath
from typing import Union
from typing import Union, Optional

@@ -16,2 +16,3 @@ from .gzip_reading import gzip

from .vcf_buffers import VCFBuffer
from .pairs import PairsBuffer
from .wig import WigBuffer

@@ -41,6 +42,8 @@ from .parser import NumpyFileReader, NpBufferedWriter, NumpyBamWriter

".gff3": GFFBuffer,
".sam": SAMBuffer, #, comment="@"),
".sam": SAMBuffer,
".bam": BamBuffer,
".sizes": ChromosomeSizeBuffer,
'.wig': WigBuffer
'.wig': WigBuffer,
'.pairs': PairsBuffer,
'.pa5': PairsBuffer,
}

@@ -226,6 +229,18 @@

def read(filename: str, mode: str = None, buffer_type=None) -> NpDataclassReader:
'openes a file, reads it and closes it '
def read(filename: str, mode: str = None, buffer_type: Optional[FileBuffer]=None) -> BNPDataClass:
"""
Read the content of a file
Parameters
----------
filename: str
mode: str
buffer_type:
Returns
-------
BNPDataClass
"""
with bnp_open(filename, mode, buffer_type) as f:
content = f.read()
return content
from pathlib import Path
from typing import Union
from typing import Union, Dict, Iterable, Tuple

@@ -77,3 +77,3 @@ import numpy as np

def get_contig_lengths(self) -> dict:
def get_contig_lengths(self) -> Dict[str, int]:
"""Return a dict of chromosome names to seqeunce lengths

@@ -88,9 +88,9 @@

def keys(self):
def keys(self) -> Iterable[str]:
return self._index.keys()
def values(self):
def values(self) -> Iterable[EncodedArray]:
return (self[key] for key in self.keys())
def items(self):
def items(self) -> Iterable[Tuple[str, EncodedArray]]:
return ((key, self[key]) for key in self.keys())

@@ -107,3 +107,3 @@

chromosome : str
chromsome name
chromosome name

@@ -113,3 +113,3 @@ Returns

EncodedArray
The sequence for that chromoeme
The sequence for that chromosome
"""

@@ -164,4 +164,2 @@ idx = self._index[chromosome]

for j in range(n_row)])
# assert not np.any(sequence == 10), (np.flatnonzero(r_sequence==10), [lenb*(j+1)-1-start_mod
# for j in range(n_row)], read_start, read_length, n_row, start_mod, lenb, a_offset, intervals[i:i+1], indices[i:i+1])
pre_alloc[a_offset:a_offset+sequence.size] = sequence

@@ -172,3 +170,3 @@ a = EncodedArray(pre_alloc, BaseEncoding)

def get_interval_sequences(self, intervals: Interval) -> EncodedRaggedArray:
"""Get the seqeunces for a set of genomic intervals
"""Get the sequences for a set of genomic intervals

@@ -213,5 +211,3 @@ Parameters

assert np.all(pre_alloc> 0), np.sum(pre_alloc==0)
# s = np.delete(np.array(sequences, dtype=np.uint8), delete_indices)
#s = np.delete(pre_alloc[:alloc_offset], delete_indices)
a = EncodedArray(pre_alloc, BaseEncoding)
return EncodedRaggedArray(a, lengths)
from itertools import takewhile, repeat
from typing import Optional

@@ -33,2 +34,3 @@ from .delimited_buffers import DelimitedBufferWithInernalComments

def close(self):
"""Close the file reader"""
self._reader.close()

@@ -44,3 +46,3 @@

bnpdataclass
A dataclass holdin all the entries in the class
A dataclass holding all the entries in the class

@@ -60,16 +62,2 @@ Examples

def _get_lazy_class(self, dataclass, header=None):
if self.__lazy_class is None:
self.__lazy_class = create_lazy_class(dataclass, header=header)
return self.__lazy_class
def _should_be_lazy(self, chunk):
if ((not config.LAZY) and self._lazy is None) or (self._lazy is False):
return False
should_be_lazy = False
if hasattr(chunk, 'get_field_by_number') and hasattr(chunk, 'dataclass'):
if not issubclass(chunk.dataclass, (GTFEntry)):
should_be_lazy = True
return should_be_lazy
def read_chunk(self, min_chunk_size: int = 5000000, max_chunk_size: int = None) -> BNPDataClass:

@@ -84,15 +72,12 @@ """Read a single chunk into memory

----------
chunk_size: int
How many bytes to read from file
min_chunk_size: int
How many bytes to minimally read from file
max_chunk_size: int
How many bytes to maximally read from file in order to get a full entry
Returns
-------
bnpdataclass
A dataclass holdin all the entries in the next chunk
BNPDataClass
A dataclass holding all the entries in the next chunk
Examples
--------
5
"""

@@ -113,3 +98,3 @@ n_lines_read = self._reader.n_lines_read

def read_chunks(self, min_chunk_size: int = 5000000, max_chunk_size: int = None) -> NpDataclassStream:
def read_chunks(self, min_chunk_size: int = 5000000, max_chunk_size: Optional[int] = None) -> NpDataclassStream:
"""Read the whole file in chunks

@@ -123,4 +108,6 @@

----------
chunk_size : int
Number of bytes to read per chunk
min_chunk_size : int
Minimum size of each chunk
max_chunk_size : int
Maximum size of each chunk

@@ -130,8 +117,5 @@ Returns

NpDataclassStream
4
Examples
--------
5
"""

@@ -142,5 +126,4 @@ data_stream = takewhile(len, (self.read_chunk(min_chunk_size, max_chunk_size) for _ in repeat(None)))

def __iter__(self) -> NpDataclassStream:
"""Iteratate over chunks in the file
"""Iteratate over chunks in the file see `read_chunks`

@@ -150,5 +133,20 @@ Returns

NpDataclassStream
3
"""
return self.read_chunks()
def _get_lazy_class(self, dataclass, header=None):
if self.__lazy_class is None:
self.__lazy_class = create_lazy_class(dataclass, header=header)
return self.__lazy_class
def _should_be_lazy(self, chunk):
if ((not config.LAZY) and self._lazy is None) or (self._lazy is False):
return False
should_be_lazy = False
if hasattr(chunk, 'get_field_by_number') and hasattr(chunk, 'dataclass'):
if not issubclass(chunk.dataclass, (GTFEntry)):
should_be_lazy = True
return should_be_lazy
import dataclasses
from typing import List
from typing import List, Tuple, Optional, Union

@@ -8,3 +8,3 @@ import numpy as np

from ..encoded_array import EncodedArray, BaseEncoding, change_encoding, EncodedRaggedArray
from ..bnpdataclass import bnpdataclass
from ..bnpdataclass import bnpdataclass, BNPDataClass
from ..datatypes import SequenceEntry

@@ -28,26 +28,12 @@ from .exceptions import FormatException

@property
def n_lines(self):
def n_lines(self) -> int:
return len(self._buffer_extractor)*self.n_lines_per_entry
@property
def data(self):
def data(self) -> EncodedArray:
return self._buffer_extractor.data
@classmethod
def _get_buffer_extractor(cls, data, new_lines):
tmp = np.insert(new_lines, 0, -1)+1
field_ends = new_lines.reshape(-1, cls.n_lines_per_entry)
field_ends = cls._modify_for_carriage_return(field_ends, data)
field_starts = tmp[:-1].reshape(-1, cls.n_lines_per_entry)+(np.array(cls._line_offsets))
entry_starts = tmp[:-1:cls.n_lines_per_entry]
entry_ends = tmp[::cls.n_lines_per_entry][1:]
return TextThroughputExtractor(data,
field_starts,
field_ends=field_ends,
entry_starts=entry_starts,
entry_ends=entry_ends)
@classmethod
def contains_complete_entry(cls, chunks):
def contains_complete_entry(cls, chunks: List[EncodedArray]) -> Tuple[bool, EncodedArray]:
if len(chunks) == 1:

@@ -61,3 +47,3 @@ try:

@classmethod
def from_raw_buffer(cls, chunk, header_data=None) -> "OneLineBuffer":
def from_raw_buffer(cls, chunk: EncodedArray, header_data=None) -> "OneLineBuffer":
"""Create a buffer with full entries

@@ -77,6 +63,2 @@

Examples
--------
8
"""

@@ -95,3 +77,3 @@ assert header_data is None

def get_data(self) -> bnpdataclass:
def get_data(self) -> BNPDataClass:
"""Get and parse fields from each line"""

@@ -101,3 +83,3 @@ headers, sequences = [self._buffer_extractor.get_field_by_number(i) for i in (0, 1)]

def get_field_by_number(self, i: int, t: type=None):
def get_field_by_number(self, i: int, t: Optional[type]= None) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]:
""" Get a field indexed by number"""

@@ -117,3 +99,3 @@

def get_field_range_as_text(self, start, end):
def get_field_range_as_text(self, start: int, end: int) -> EncodedRaggedArray:
"""Get a range of fields as text"""

@@ -124,3 +106,3 @@ assert end == start+1

@classmethod
def from_data(cls, entries: bnpdataclass) -> "OneLineBuffer":
def from_data(cls, entries: BNPDataClass) -> "OneLineBuffer":
"""Convert the data from the entries into a buffer that can be written to file

@@ -132,3 +114,3 @@

----------
entries : bnpdataclass
entries : BNPDataClass
The entries to be written to the buffer

@@ -142,14 +124,8 @@

"""
# names = entries.name
# sequences = entries.sequence
# if entries.sequence.encoding != BaseEncoding:
# sequences = change_encoding(sequences, BaseEncoding)
data_dict = [(field.type, getattr(entries, field.name)) for field in dataclasses.fields(entries)]
columns= [get_column(value, key) for key, value in data_dict]
return cls.join_fields(columns)
# return cls.join_fields([names, sequences])
@classmethod
def join_fields(cls, fields: List[EncodedRaggedArray]):
def join_fields(cls, fields: List[EncodedRaggedArray]) -> EncodedArray:
field_lengths = np.hstack([field.shape[1][:, None] for field in fields])

@@ -170,3 +146,22 @@ line_lengths = field_lengths+1

def get_text_field_by_number(self, i: int) -> Union[np.ndarray, EncodedArray, EncodedRaggedArray]:
    """Return field ``i`` as text. Delegates directly to ``get_field_by_number``."""
    return self.get_field_by_number(i)
@classmethod
def _get_buffer_extractor(cls, data, new_lines):
    """Build a TextThroughputExtractor over ``data`` from newline positions.

    Parameters
    ----------
    data :
        The raw buffer text (presumably an EncodedArray of bytes — confirm).
    new_lines :
        Indices of newline characters in ``data``. Assumed to cover a whole
        number of entries, ``cls.n_lines_per_entry`` lines per entry.
    """
    # Prepend -1 and add 1 so tmp[i] is the start offset of line i
    # (the character just after the previous newline).
    tmp = np.insert(new_lines, 0, -1)+1
    # One row per entry: the newline ending each of the entry's lines.
    field_ends = new_lines.reshape(-1, cls.n_lines_per_entry)
    # NOTE(review): name suggests this trims a trailing '\r' for CRLF
    # line endings — confirm against _modify_for_carriage_return.
    field_ends = cls._modify_for_carriage_return(field_ends, data)
    # Line starts shifted past each line's fixed prefix (cls._line_offsets,
    # e.g. the '>'/'@' marker on header lines).
    field_starts = tmp[:-1].reshape(-1, cls.n_lines_per_entry)+(np.array(cls._line_offsets))
    # Entry boundaries: start of each entry's first line / start of the next entry.
    entry_starts = tmp[:-1:cls.n_lines_per_entry]
    entry_ends = tmp[::cls.n_lines_per_entry][1:]
    return TextThroughputExtractor(data,
                                   field_starts,
                                   field_ends=field_ends,
                                   entry_starts=entry_starts,
                                   entry_ends=entry_ends)
@classmethod
def _validate(cls, data, new_lines):

@@ -191,5 +186,2 @@ header = cls.HEADER

def get_text_field_by_number(self, i):
    """Return field ``i`` as text. Delegates directly to ``get_field_by_number``."""
    return self.get_field_by_number(i)
@classmethod

@@ -206,4 +198,8 @@ def _modify_for_carriage_return(cls, field_ends, data):

class TwoLineFastaBuffer(OneLineBuffer):
    """
    Buffer for fasta files where each entry is contained in two lines (one for header and one for sequence)
    For multi-line fasta files, use MultiLineFastaBuffer
    """
    HEADER = ">"  # header-line marker character (ASCII 62)
    n_lines_per_entry = 2  # one header line + one sequence line per entry
    dataclass = SequenceEntry  # entries are parsed into SequenceEntry records
import codecs
import io
import logging
import numpy as np
from ..bnpdataclass import BNPDataClass
try:
from typing import IO
from typing import IO, Union, Iterable
except ImportError:

@@ -197,9 +201,2 @@ from typing.io import IO

a, bytes_read = self.__add_newline_to_end(a, bytes_read)
#
# if a[bytes_read - 1] != ord("\n"):
# a = np.append(a, ord("\n"))
# bytes_read += 1
# if hasattr(self._buffer_type, "_new_entry_marker"):
# a = np.append(a, self._buffer_type._new_entry_marker)
# bytes_read += 1
return a[:bytes_read]

@@ -215,7 +212,6 @@

"""
File writer that can write @npdataclass objects
to file
File writer that can write BNPDataClass objects to file
"""
def __init__(self, file_obj, buffer_type):
def __init__(self, file_obj: io.FileIO, buffer_type: FileBuffer):
self._file_obj = file_obj

@@ -240,3 +236,3 @@ self._buffer_type = buffer_type

def write(self, data: npdataclass):
def write(self, data: Union[BNPDataClass, BnpStream, grouped_stream]):
"""Write the provided data to file

@@ -246,4 +242,4 @@

----------
data : npdataclass
dataset containing entries
data : Union[BNPDataClass, BnpStream, grouped_stream]
dataset containing entries, or stream of datasets

@@ -264,3 +260,2 @@ """

(not hasattr(self._file_obj, "mode") or self._file_obj.mode != 'ab'): # and \
#getattr(self._buffer_type, 'HAS_UNCOMMENTED_HEADER_LINE', False):
if not self._header_written:

@@ -275,7 +270,2 @@ header_array = self._buffer_type.make_header(data)

bytes_array = data.get_buffer(buffer_class=self._buffer_type)
# if not hasattr(self._buffer_type, 'get_column_range_as_text'):
# data = data.get_data_object()
# bytes_array = self._buffer_type.from_data(data)
# else:
#
else:

@@ -286,4 +276,3 @@ bytes_array = self._buffer_type.from_data(data)

bytes_array = bytes_array.raw()
self._file_obj.write(bytes(bytes_array)) # .tofile(self._file_obj)
# self._file_obj.flush()
self._file_obj.write(bytes(bytes_array))
logger.debug(

@@ -295,2 +284,6 @@ f"Wrote chunk of size {repr_bytes(bytes_array.size)} to {self._f_name}"

class NumpyBamWriter(NpBufferedWriter):
    """
    Class for handling writing of BAM files
    """
    # NOTE(review): this byte string matches the standard 28-byte BGZF
    # end-of-file marker (an empty gzip block) from the SAM/BAM spec —
    # presumably appended when the file is finalized; confirm against the
    # close/flush implementation (not visible here).
    EOF_MARKER = b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00'

@@ -304,3 +297,18 @@

def chunk_lines(stream, n_lines):
def chunk_lines(stream: Iterable[FileBuffer], n_lines: int) -> Iterable[FileBuffer]:
"""
Chunk the content of the stream so that each chunk contains exactly `n_lines` lines (except the last)
Parameters
----------
stream : Iterable[FileBuffer]
Stream of FileBuffers
n_lines : int
Number of lines in each chunk
Returns
-------
Iterable[FileBuffer]
Stream of FileBuffers, each containing `n_lines` lines
"""
cur_buffers = []

@@ -307,0 +315,0 @@ remaining_lines = n_lines

@@ -28,3 +28,3 @@ import numpy as np

import matplotlib.pyplot as _plt
plt.style.use('Solarize_Light2')
_plt.style.use('Solarize_Light2')
self._plt = _plt

@@ -31,0 +31,0 @@ except:

@@ -0,1 +1,3 @@

from typing import List, Dict, Optional
import numpy as np

@@ -10,2 +12,6 @@ from numpy.typing import ArrayLike

class EncodedCounts:
"""
Class for storing counts of encoded data.
"""
alphabet: list

@@ -33,3 +39,3 @@ counts: np.ndarray

def __getitem__(self, idx):
def __getitem__(self, idx: str):
return self.counts[..., self.alphabet.index(idx)]

@@ -55,4 +61,24 @@

def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
    """NumPy ufunc protocol hook: apply ``ufunc`` to the underlying counts.

    Only the plain ``__call__`` method is supported (not ``reduce``,
    ``accumulate``, etc.). All EncodedCounts operands must share this
    object's alphabet; their ``counts`` arrays are unwrapped, the ufunc is
    applied, and the result is rewrapped with the common alphabet.
    """
    if method == "__call__":
        # Counts over different alphabets would silently misalign columns,
        # so require identical alphabets on every EncodedCounts operand.
        assert all(i.alphabet == self.alphabet for i in inputs if isinstance(i, EncodedCounts))
        assert all(i.alphabet == self.alphabet for i in kwargs.values() if isinstance(i, EncodedCounts))
        # Unwrap EncodedCounts operands to their raw arrays; pass through others.
        arrays = [i.counts if isinstance(i, EncodedCounts) else i for i in inputs]
        kwargs = {k: i.counts if isinstance(i, EncodedCounts) else i for k, i in kwargs.items()}
        return self.__class__(self.alphabet, getattr(ufunc, method)(*arrays, **kwargs))
    else:
        return NotImplemented
@property
def proportions(self):
def proportions(self) -> np.ndarray:
"""
Calculate the proportions of each label in the counts.
Returns
-------
np.ndarray
The proportions of each label in the counts.
"""
s = self.counts.sum(axis=-1, keepdims=True)

@@ -66,7 +92,19 @@ return np.where(s > 0, self.counts / s, 0)

def get_count_for_label(self, label: str) -> int:
    """
    Get the total count for one or more labels.

    Parameters
    ----------
    label: str
        A single label, or a string of labels whose counts are summed.

    Returns
    -------
    int
        Sum of counts over the given labels (an array for multi-dim counts).
    """
    # Fancy-index all requested labels at once and sum over them.
    # (Calling np.sum on a generator expression, as before, is deprecated
    # in NumPy and silently falls back to the builtin sum.)
    indices = [self.alphabet.index(l) for l in label]
    return self.counts[..., indices].sum(axis=-1)
@property
def labels(self) -> List[str]:
    """The alphabet, i.e. the label of each column in ``counts``."""
    return self.alphabet

@@ -84,3 +122,15 @@

def most_common(self, n=None):
def most_common(self, n: Optional[int]=None) -> 'EncodedCounts':
"""
Extract counts for the n most common labels.
Parameters
----------
n
Returns
-------
EncodedCounts
"""
args = np.argsort(self.counts)[::-1]

@@ -93,3 +143,11 @@ if n is not None:

def as_dict(self) -> Dict[str, np.ndarray]:
    """
    Convert the counts to a dictionary.

    Returns
    -------
    Dict[str, np.ndarray]
        Mapping from each alphabet label to its counts (column of ``counts``).
    """
    return dict(zip(self.alphabet, self.counts.T))

@@ -99,3 +157,3 @@

def count_encoded(values: EncodedArrayLike, weights: ArrayLike = None, axis: int = -1) -> EncodedCounts:
"""Count the occurances of encoded entries. Works on any encoding with finite alphabet
"""Count the occurances of encoded entries. Works on any encoding with finite alphabet.

@@ -106,3 +164,5 @@ Parameters

weights : ArrayLike
Weights for each entry
axis : int
0 for column counts, -1 or 1 for row counts None for flattened counts

@@ -109,0 +169,0 @@ Returns

@@ -69,3 +69,17 @@ from ..datatypes import Interval

@streamable()
def get_strand_specific_sequences(encoded_array: EncodedArray, stranded_intervals: Interval):
def get_strand_specific_sequences(encoded_array: EncodedArray, stranded_intervals: Interval) -> EncodedRaggedArray:
"""Extract the sequences within the intervals, and reverse complement if the strand is negative
Parameters
----------
encoded_array : EncodedArray
The encoded sequences
stranded_intervals : Interval
The intervals with strands
Returns
-------
EncodedRaggedArray
The (possibly reverse complemented) sequences
"""
relevant_sequences = encoded_array[stranded_intervals.start:stranded_intervals.stop]

@@ -78,3 +92,17 @@ rev_complimnet_seqs = get_reverse_complement(relevant_sequences)

@streamable()
def get_sequences(sequence: EncodedArray, intervals: Interval) -> EncodedRaggedArray:
    """
    Get the sequences within the intervals, without caring about strands.
    For stranded intervals use get_strand_specific_sequences.

    Parameters
    ----------
    sequence: EncodedArray
        The full reference sequence to slice from.
    intervals: Interval
        Intervals with ``start``/``stop`` coordinates into ``sequence``.

    Returns
    -------
    EncodedRaggedArray
        One sliced sequence per interval.
    """
    return sequence[intervals.start:intervals.stop]

@@ -29,4 +29,4 @@ import numpy as np

def _pwm_from_counts(count_matrix):
with_pseudo = count_matrix+1
return np.log(with_pseudo/with_pseudo.sum(axis=0, keepdims=True))
with_pseudo = count_matrix + 1
return np.log(with_pseudo / with_pseudo.sum(axis=0, keepdims=True))

@@ -39,2 +39,3 @@

"""
def __init__(self, matrix, alphabet):

@@ -48,3 +49,3 @@ self._matrix = matrix

if isinstance(sequence, (EncodedArray, EncodedRaggedArray)):
if isinstance(sequence.encoding, AlphabetEncoding):

@@ -59,6 +60,4 @@ alphabet = list((sequence.encoding.get_alphabet()))

@property
def alphabet(self) -> str:
    """Alphabet string; one letter per row of the PWM matrix."""
    return self._alphabet

@@ -70,3 +69,3 @@

@property
def window_size(self) -> int:
    """Number of positions (last-axis length) in the PWM matrix."""
    return self._matrix.shape[-1]

@@ -102,12 +101,10 @@

sequence = self.as_valid_encoded_array(sequence)
# sequence = as_encoded_array(sequence, self._encoding)
# assert sequence.encoding == self._encoding
scores = np.zeros(sequence.size, dtype=float)
m = self._matrix.T.copy()
for offset, row in enumerate(m):
scores[:scores.size-offset] += row[sequence[offset:].raw()]
scores[:scores.size - offset] += row[sequence[offset:].raw()]
return scores
@classmethod
def from_dict(cls, dictionary: Dict[str, ArrayLike], background: Dict[str, float]=None) -> "PWM":
def from_dict(cls, dictionary: Dict[str, ArrayLike], background: Dict[str, float] = None) -> "PWM":
"""Create a PWM object from a dict of letters to position probabilities

@@ -133,18 +130,13 @@

if background is None:
background = {key: 1/len(dictionary) for key in dictionary}
background = {key: 1 / len(dictionary) for key in dictionary}
alphabet = "".join(dictionary.keys())
with np.errstate(divide="ignore"):
matrix = np.log(np.array(list(dictionary.values())))-np.log([background[key] for key in dictionary])[:, np.newaxis]
matrix = np.log(np.array(list(dictionary.values()))) - np.log([background[key] for key in dictionary])[:,
np.newaxis]
return cls(matrix, alphabet)
# @classmethod
# def from_motif(cls, motif: Motif):
# return cls(motif.matrix, motif.alphabet)
@classmethod
def from_counts(cls, counts: Dict[str, typing.List[int]]) -> "PWM":
    """Create a PWM object from a dict of letters to position counts.

    Parameters
    ----------
    counts : Dict[str, typing.List[int]]
        Mapping from each letter to its per-position counts.

    Returns
    -------
    PWM
        A PWM built from the pseudocount-smoothed log-probabilities.
    """
    return cls(_pwm_from_counts(np.array(list(counts.values()))),
               "".join(counts.keys()))

@@ -155,6 +147,5 @@

return "PWM with alphabet " + self._alphabet + "\n" + \
'\n'.join([' '.join([str(round(c, 2)) for c in row]) for row in matrix])
'\n'.join([' '.join([str(round(c, 2)) for c in row]) for row in matrix])
def get_motif_scores_old(sequence: EncodedRaggedArray, pwm: PWM) -> RaggedArray:

@@ -213,2 +204,2 @@ """Computes motif scores for a motif on a sequence.

scores = RaggedArray(scores, shape[-1])
return scores[..., :(-pwm.window_size+1)]
return scores[..., :(-pwm.window_size + 1)]

@@ -17,2 +17,24 @@ import logging

def match_string(sequence: EncodedArrayLike, matching_sequence: SingleEncodedArrayLike) -> ArrayLike:
"""
Matches a sequence against sequences and returns a boolean RaggedArray representing positions
where the sequence matches.
Parameters
----------
sequence :
matching_sequence :
Returns
-------
ArrayLike
A boolean RaggedArray representing positions where the sequence matches.
Examples
--------
>>> import bionumpy as bnp
>>> sequence = bnp.as_encoded_array(['ACGT', 'TACTAC'])
>>> matching_sequence = bnp.as_encoded_array('AC', sequence.encoding)
>>> bnp.match_string(sequence, matching_sequence)
ragged_array([ True False False]
[False True False False True])
"""
sequence = as_encoded_array(sequence)

@@ -19,0 +41,0 @@ enforced_encoding = sequence.encoding

import numpy as np
from ..bnpdataclass import BNPDataClass
from ..streams import streamable

@@ -41,3 +43,3 @@ from ..encodings import BaseEncoding, AlphabetEncoding

def __call__(self, sequence: EncodedArrayLike):
def __call__(self, sequence: EncodedArrayLike)-> EncodedArrayLike:
e = sequence.encoding

@@ -52,3 +54,27 @@ sequence = sequence[..., ::-1]

@apply_to_npdataclass("sequence")
def translate_dna_to_protein(sequence: BNPDataClass) -> BNPDataClass:
    """
    Translate a DNA sequence to a protein sequence.

    Parameters
    ----------
    sequence : BNPDataClass
        The data that should be translated, should have a sequence attribute

    Returns
    -------
    BNPDataClass
        The translated data

    Examples
    --------
    >>> import bionumpy as bnp
    >>> dna = bnp.SequenceEntry.from_entry_tuples([("seq1", "ACGTAT")])
    >>> protein = bnp.sequence.translate_dna_to_protein(dna)
    >>> protein
    SequenceEntry with 1 entries
    name sequence
    seq1 TY
    """
    # Codon-wise translation is delegated to the Translate helper.
    return Translate().windowed(sequence)

@@ -6,14 +6,3 @@ from .stream import BnpStream

def _chunk_entries(stream: BnpStream, n_entries: int) -> Generator:
"""Chunkk a stream into fixed number of entries
Parameters
----------
stream : BnpStream
n_entries : int
Returns
-------
BnpStream
"""
b = []

@@ -34,2 +23,13 @@ buffer_size = 0

def chunk_entries(stream: BnpStream, n_entries: int) -> BnpStream:
    """Re-chunk a stream so each chunk holds a fixed number of entries.

    Parameters
    ----------
    stream : BnpStream
        The stream to re-chunk.
    n_entries : int
        Number of entries per chunk.

    Returns
    -------
    BnpStream
        A stream of the same type as the input, re-chunked.
    """
    rechunked = _chunk_entries(stream, n_entries)
    return type(stream)(rechunked)

@@ -7,4 +7,6 @@ import numpy as np

class StringArray(np.lib.mixins.NDArrayOperatorsMixin):
"""Wrapper around NumPy arrays of strings. Can be used as datatype in BNPDataClass fields."""
wrapped_functions = ['size', 'shape', 'ndim', '__len__']
wrapped_properies = ['T']
def __init__(self, data):

@@ -11,0 +13,0 @@ self._data = np.asanyarray(data, dtype='S')

import typing
SequenceID = typing.NewType("SequenceID", str)

@@ -70,3 +70,2 @@ import numpy as np

ascii_hashes = get_ascii_hash(encoded_ragged_array, cls.big_mod)
idx = Counter(ascii_hashes).most_common(1)[0][0]
assert len(set(ascii_hashes)) == len(ascii_hashes), (len(set(ascii_hashes)), len(ascii_hashes))

@@ -73,0 +72,0 @@ hash_table = HashTable(ascii_hashes, np.arange(len(encoded_ragged_array)), mod=modulo)

@@ -6,3 +6,32 @@ import npstructures as nps

def ragged_slice(array: EncodedRaggedArray, starts=None, ends=None) -> EncodedRaggedArray:
    """
    Slice a ragged array column-wise.

    Parameters
    ----------
    array : EncodedRaggedArray
        The array to slice
    starts : np.ndarray
        The start indices of the slices
    ends : np.ndarray, optional
        The end indices of the slices. If not provided, the slices will be taken to the end of the array.

    Returns
    -------
    EncodedRaggedArray
        The sliced array

    Examples
    --------
    >>> import numpy as np
    >>> import bionumpy as bnp
    >>> seqs = bnp.as_encoded_array(["ACGT", "ACGTT"])
    >>> starts = np.array([0, 1])
    >>> ends = np.array([2, 3])
    >>> sliced = bnp.ragged_slice(seqs, starts, ends)
    >>> print(sliced)
    AC
    CG
    """
    # Slice the flat underlying data, then rewrap with the original encoding.
    sliced_data = nps.ragged_slice(array.ravel(), starts, ends)
    return EncodedRaggedArray(EncodedArray(sliced_data.ravel(), array.encoding), sliced_data.shape)
import numpy as np
from bionumpy import replace
from bionumpy import replace, EncodedArray, VCFEntry
def apply_variants_to_sequence(sequence, variants):
def apply_variants_to_sequence(sequence: EncodedArray, variants: VCFEntry) -> EncodedArray:
"""
Apply variants to a sequence by replacing the reference sequence with the alternative sequence.
Works only for variants where ref sequence and alt sequence have the same length.
Parameters
----------
sequence : EncodedArray
The sequence to apply the variants to
variants : VCFEntry
The variants to apply
Returns
-------
EncodedArray
The sequence with the variants applied
"""
seq = sequence.copy()

@@ -13,5 +30,13 @@ assert np.all(seq[variants.position] == variants.ref_seq.ravel()), (seq[variants.position], seq[variants.position+1], seq[variants.position-1],

def apply_variants(sequence_entries, variants):
    """
    Wrapper around `apply_variants_to_sequence` that applies variants to multiple sequences.

    Each entry's variants are selected by matching ``variants.chromosome``
    against the entry's name.
    """
    # Only single-base alternative alleles are supported (no indels).
    assert np.all(variants.alt_seq.lengths == 1)
    return replace(sequence_entries, sequence=[
        apply_variants_to_sequence(
            entry.sequence,
            variants[variants.chromosome == entry.name]
        ) for entry in sequence_entries])
Metadata-Version: 2.1
Name: bionumpy
Version: 1.0.8
Version: 1.0.10
Summary: Library for working with biological sequence data as numpy arrays.

@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy

@@ -49,3 +49,3 @@ #!/usr/bin/env python

url='https://github.com/bionumpy/bionumpy',
version='1.0.8',
version='1.0.10',
zip_safe=False,

@@ -52,0 +52,0 @@ extras_require={'full': ['isal']}

@@ -66,2 +66,3 @@ import numpy as np

with pytest.raises(ValueError):
f.write(bam_entries)
f.write(bam_entries)

@@ -97,3 +97,3 @@ import dataclasses

# @pytest.mark.skip("Deprecated")
def test_set_get_context():

@@ -100,0 +100,0 @@ data = MyClass(a=[10, 20], b=[100, 200])

@@ -17,1 +17,9 @@ from bionumpy.sequence.count_encoded import EncodedCounts

['T', 'A'], np.array([5, 4]))
def test_ufuncs(encoded_counts):
sqrt = np.sqrt(encoded_counts)
assert sqrt == EncodedCounts(list('ACGT'), np.sqrt([4, 3, 1, 5]))
assert encoded_counts + 1 == EncodedCounts(list('ACGT'), np.array([5, 4, 2, 6]))
assert encoded_counts + encoded_counts == EncodedCounts(list('ACGT'), np.array([8, 6, 2, 10]))

@@ -16,3 +16,3 @@ import dataclasses

@dataclasses.dataclass
@bnpdataclass
class DummyClass:

@@ -19,0 +19,0 @@ a: int