Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

bionumpy

Package Overview
Dependencies
Maintainers
1
Versions
48
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bionumpy - npm Package Compare versions

Comparing version
1.0.5
to
1.0.6
+8
tests/conftest.py
from pathlib import Path
import pytest
@pytest.fixture()
def data_path():
return Path(__file__).parent.parent / 'example_data'
+1
-1
Metadata-Version: 2.1
Name: bionumpy
Version: 1.0.5
Version: 1.0.6
Summary: Library for working with biological sequence data as numpy arrays.

@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy

@@ -134,2 +134,3 @@ HISTORY.rst

tests/buffers.py
tests/conftest.py
tests/fixtures.py

@@ -136,0 +137,0 @@ tests/genomic_fixtures.py

@@ -5,3 +5,3 @@ """Top-level package for bionumpy."""

__email__ = "knutdrand@gmail.com"
__version__ = '1.0.5'
__version__ = '1.0.6'

@@ -8,0 +8,0 @@ import npstructures as nps

@@ -0,1 +1,4 @@

from pathlib import Path
from typing import Union
import numpy as np

@@ -64,5 +67,7 @@ from ..encoded_array import EncodedArray, as_encoded_array, EncodedRaggedArray

def __init__(self, filename: str):
def __init__(self, filename: Union[str, Path]):
if isinstance(filename, str):
filename = Path(filename)
self._filename = filename
self._index = read_index(filename+".fai")
self._index = read_index(filename.with_suffix(filename.suffix + ".fai"))
self._f_obj = open(filename, "rb")

@@ -69,0 +74,0 @@ self._index_table = FastaIdx.from_entry_tuples(

import codecs
import logging
import numpy as np
from typing.io import IO
try:
from typing import IO
except ImportError:
from typing.io import IO
from npstructures import npdataclass

@@ -6,0 +9,0 @@

Metadata-Version: 2.1
Name: bionumpy
Version: 1.0.5
Version: 1.0.6
Summary: Library for working with biological sequence data as numpy arrays.

@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy

@@ -49,3 +49,3 @@ #!/usr/bin/env python

url='https://github.com/bionumpy/bionumpy',
version='1.0.5',
version='1.0.6',
zip_safe=False,

@@ -52,0 +52,0 @@ extras_require={'full': ['isal']}

@@ -6,12 +6,12 @@ from bionumpy import Bed6

"bed": Bed6.from_entry_tuples([
("chr1", 1, 3, ".", 0, "-"),
("chr1", 40, 60, ".", 1, "+"),
("chr20", 400, 600, ".", 2, "+")]),
("chr1", 1, 3, ".", 0, "-"),
("chr1", 40, 60, ".", 1, "+"),
("chr20", 400, 600, ".", 2, "+")]),
"vcf2": VCFEntry.from_entry_tuples([
("chr1", 88361, "rs4970378", "A", "G", ".", ".", "."),
("chr1", 887559, "rs3748595", "A", "CAA", ".", ".", "."),
("chr2", 8877, "rs3828047", "AGG", "C", ".", ".", ".")]),
("chr1", 88361, "rs4970378", "A", "G", ".", ".", "."),
("chr1", 887559, "rs3748595", "A", "CAA", ".", ".", "."),
("chr2", 8877, "rs3828047", "AGG", "C", ".", ".", ".")]),
"fastq": SequenceEntryWithQuality.from_entry_tuples([
("headerishere", "CTTGTTGA", "".join("!" for _ in "CTTGTTGA")),
("anotherheader", "CGG", "".join("~" for _ in "CGG"))]),}
("anotherheader", "CGG", "".join("~" for _ in "CGG"))]), }
'''

@@ -42,2 +42,4 @@ "vcf": [

('chr1', 9871, 9872, 0.17042)])
}'''
}'''

@@ -80,8 +80,2 @@ import pytest

@pytest.fixture
def tmp_path():
from pathlib import Path
path = Path('tmp_folder')
path.mkdir(exist_ok=True)
return path

@@ -103,4 +97,2 @@ @pytest.fixture

d = f.read()
print(d)
print(d.flag.dtype)
assert_encoded_array_equal(d.extra[-1], 'NM:i:1')

@@ -107,0 +99,0 @@

@@ -11,4 +11,5 @@ import numpy as np

def test_read_acceptance():
filename = "example_data/test.bam"
def test_read_acceptance(data_path):
filename = data_path / "test.bam"
f = bnp.open(filename)

@@ -20,4 +21,4 @@ d = f.read()

def test_read_intervals_acceptance():
filename = "example_data/test.bam"
def test_read_intervals_acceptance(data_path):
filename = data_path / "test.bam"
f = bnp.open(filename, buffer_type=BamIntervalBuffer)

@@ -30,4 +31,4 @@ d = f.read()

@pytest.fixture()
def bam_entries():
filename = get_file_name('example_data/small_alignments.bam')
def bam_entries(data_path):
filename = data_path / 'small_alignments.bam'
entries = bnp.open(filename).read()

@@ -52,8 +53,10 @@ return entries

def test_write_bam(bam_entries):
def test_write_bam(bam_entries, tmp_path):
subset = bam_entries[bam_entries.mapq == 60]
with bnp.open('tmp.bam', mode='w') as f:
output_file = tmp_path / 'tmp.bam'
with bnp.open(output_file, mode='w') as f:
f.write(subset)
assert open('tmp.bam', 'rb').read()[-28:] == b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00'
new_entries = bnp.open('tmp.bam').read()
assert open(output_file, 'rb').read()[-28:] == b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00'
new_entries = bnp.open(output_file).read()
assert_array_equal(new_entries.position, subset.position)

@@ -60,0 +63,0 @@

@@ -106,6 +106,7 @@ import dataclasses

@pytest.mark.parametrize("file", [
"example_data/variants.vcf",
"example_data/variants_with_header.vcf"
"variants.vcf",
"variants_with_header.vcf"
])
def test_read_header(file):
def test_read_header(file,data_path):
file = data_path/file
chunks = list(bnp.open(file).read_chunks())

@@ -112,0 +113,0 @@ true_header = "".join(line for line in open(file) if line.startswith("#"))

@@ -30,8 +30,8 @@ import pytest

@pytest.fixture
def intervals():
return bnp.open("example_data/ctcf.bed.gz").read()
def intervals(data_path):
return bnp.open(data_path / "ctcf.bed.gz").read()
@pytest.fixture
def chrom_sizes():
return bnp.open("example_data/hg38.chrom.sizes").read()
def chrom_sizes(data_path):
return bnp.open(data_path / "hg38.chrom.sizes").read()

@@ -38,0 +38,0 @@

from bionumpy.genomic_data import Genome
def test_genomic_annotation():
g = Genome.from_file('example_data/hg38.chrom.sizes')
a = g.read_annotation('example_data/small_gff.gff3')
def test_genomic_annotation(data_path):
g = Genome.from_file(data_path / 'hg38.chrom.sizes')
a = g.read_annotation(data_path / 'small_gff.gff3')
print(a.genes.gene_id)
print(a.transcripts.transcript_id)
print(a.exons.exon_id)

@@ -68,4 +68,4 @@ import pytest

#@pytest.mark.xfail
def test_groupby_many_chunks():
file = "example_data/variants_with_header.vcf"
def test_groupby_many_chunks(data_path):
file = data_path / "variants_with_header.vcf"
chunks = bnp.open(file).read_chunks(100)

@@ -72,0 +72,0 @@ for chromosome, variants in bnp.groupby(chunks, "chromosome"):

@@ -7,7 +7,6 @@ import pytest

from bionumpy.util.testing import assert_encoded_array_equal
from .util import get_file_name
@pytest.fixture
def gtf_entries():
return bnp.open(get_file_name("example_data/small.gtf")).read()
def gtf_entries(data_path ):
return bnp.open(data_path / "small.gtf").read()

@@ -40,4 +39,4 @@ @pytest.fixture

# @pytest.mark.skip('waiting')
def test_read_gff():
annotation = bnp.open('example_data/small_gff.gff3').read()
def test_read_gff(data_path):
annotation = bnp.open(data_path / 'small_gff.gff3').read()
genes = annotation.get_genes()

@@ -49,5 +48,5 @@ assert genes[0].gene_id == 'ENSG00000290825.1'

def test_read_sarcer_gtf():
annotation = bnp.open('example_data/sacCer3.ensGene.gtf.gz').read()
def test_read_sarcer_gtf(data_path):
annotation = bnp.open(data_path / 'sacCer3.ensGene.gtf.gz').read()
transcripts = annotation.get_transcripts()
assert len(transcripts) > 0

@@ -30,4 +30,4 @@ import dataclasses

@pytest.fixture
def file_name():
name = 'tmp1234.tsv'
def file_name(tmp_path):
name = tmp_path / 'tmp1234.tsv'
with open(name, 'w') as f:

@@ -38,4 +38,4 @@ f.write(text)

@pytest.fixture
def empty_file_name(header):
name = 'empty.csv'
def empty_file_name(header, tmp_path):
name = tmp_path / 'empty.csv'
with open(name, 'w') as f:

@@ -46,4 +46,4 @@ f.write(header)

@pytest.fixture
def full_file_name(full_text):
name = 'tmp1234full.tsv'
def full_file_name(full_text, tmp_path):
name = tmp_path / 'tmp1234full.tsv'
with open(name, 'w') as f:

@@ -50,0 +50,0 @@ f.write(full_text)

@@ -18,9 +18,9 @@ from numpy.testing import assert_equal

def test_fasta_index():
index = create_index("example_data/small_genome.fa")
def test_fasta_index(data_path):
index = create_index(data_path / "small_genome.fa")
assert_equal(index.length, [300, 600, 900, 1200])
def test_dictlike():
idx_fasta = bnp.open_indexed("example_data/small_genome.fa")
def test_dictlike(data_path):
idx_fasta = bnp.open_indexed(data_path / "small_genome.fa")
assert list(idx_fasta.keys()) == ["0", "1", "2", "3"]

@@ -36,4 +36,4 @@ assert "Indexed Fasta" in repr(idx_fasta)

def test_get_sequences():
idx_fasta = bnp.open_indexed("example_data/small_genome.fa")
def test_get_sequences(data_path):
idx_fasta = bnp.open_indexed(data_path / "small_genome.fa")
_intervals = Interval.from_entry_tuples([("1", 10, 20),

@@ -40,0 +40,0 @@ ("2", 11, 50),

@@ -122,5 +122,4 @@ import os

@pytest.mark.parametrize("buffer_name", ["bed", "vcf", "fastq", "fasta"])
def test_ctx_manager_read(buffer_name):
file_path = Path(f"./{buffer_name}_example.{buffer_name}")
def test_ctx_manager_read(buffer_name, tmp_path):
file_path = tmp_path / f"./{buffer_name}_example.{buffer_name}"
with open(file_path, "w") as file:

@@ -132,5 +131,3 @@ file.write(buffer_texts[buffer_name])

os.remove(file_path)
@pytest.mark.parametrize("buffer_name", ["bed", "vcf", "fastq", "fasta"])

@@ -170,6 +167,7 @@ def test_append_to_file(buffer_name):

def test_write_empty():
def test_write_empty(tmp_path):
entry = VCFEntry([], [], [], [],
[], [], [], [])
with bnp.open('tmp.vcf', 'w') as f:
filename = tmp_path / 'tmp.vcf'
with bnp.open(filename, 'w') as f:
f.write(entry)

@@ -198,3 +196,3 @@

@pytest.fixture
def fastq_with_carriage_return_filename():
def fastq_with_carriage_return_filename(tmp_path):
text = '''\

@@ -206,3 +204,3 @@ @test_sequence_id_here\r

'''
filename = 'carriage_return.fq'
filename = tmp_path/'carriage_return.fq'
with open(filename, 'w') as file:

@@ -214,3 +212,3 @@ file.write(text)

@pytest.fixture
def bed_with_carriage_return_filename():
def bed_with_carriage_return_filename(tmp_path):
text = '''\

@@ -220,3 +218,3 @@ chr1\t1\t2\r

'''
filename = 'carriage_return.bed'
filename = tmp_path / 'carriage_return.bed'
with open(filename, 'w') as file:

@@ -228,3 +226,3 @@ file.write(text)

@pytest.fixture
def fasta_with_carriage_return_filename():
def fasta_with_carriage_return_filename(tmp_path):
text = '''\

@@ -237,3 +235,3 @@ >test_sequence_id_here\r

'''
filename = 'carriage_return.fa'
filename = tmp_path/'carriage_return.fa'
with open(filename, 'w') as file:

@@ -264,13 +262,16 @@ file.write(text)

# @pytest.mark.xfail
def test_carriage_return_fai(fasta_with_carriage_return_filename):
def test_carriage_return_fai(fasta_with_carriage_return_filename: Path):
# remove file if it exists
if os.path.exists(fasta_with_carriage_return_filename + '.fai'):
os.remove(fasta_with_carriage_return_filename + '.fai')
fai = bnp.open_indexed(fasta_with_carriage_return_filename)
# add .fai to the end of the file
filename = fasta_with_carriage_return_filename
fai_filename = filename.with_suffix(filename.suffix + '.fai')
if os.path.exists(fai_filename):
os.remove(fai_filename)
fai = bnp.open_indexed(filename)
assert_encoded_array_equal(fai['test_sequence_id_here'].raw(), 'GACTG')
assert_encoded_array_equal(fai['test_sequence_id_here2'].raw(), 'GACTCGAG')
def test_rwr_bed_with_change():
tmp_path = 'tmp_rwr.bed'
filename = get_file_name('example_data/alignments.bed')
def test_rwr_bed_with_change(tmp_path, data_path):
file_path = tmp_path / 'tmp_rwr.bed'
filename = data_path / 'alignments.bed'
data = bnp.open(filename, buffer_type=bnp.io.Bed6Buffer).read()

@@ -281,10 +282,10 @@ data.start = data.start + 1

data == data[::2]
if os.path.exists(tmp_path):
os.remove(tmp_path)
bnp.open(tmp_path, 'w', buffer_type=bnp.io.Bed6Buffer).write(data)
text = open(tmp_path).read()
if os.path.exists(file_path):
os.remove(file_path)
bnp.open(file_path, 'w', buffer_type=bnp.io.Bed6Buffer).write(data)
text = open(file_path).read()
assert text.startswith('chr1'), text[:10]
print(text)
data2 = bnp.open(tmp_path).read()
data2 = bnp.open(file_path).read()
assert_equal(data.start, data2.start)
assert np.all(data.chromosome == data2.chromosome)

@@ -35,4 +35,4 @@ import pytest

def test_cosmic_read():
matrix = bnp.io.read_matrix('example_data/COSMIC_v3.3.1_SBS_GRCh38.txt')
def test_cosmic_read(data_path):
matrix = bnp.io.read_matrix(data_path / 'COSMIC_v3.3.1_SBS_GRCh38.txt')
encoded = bnp.as_encoded_array(matrix.row_names.to_numpy_array(),

@@ -39,0 +39,0 @@ MutationTypeEncoding(1))

@@ -58,13 +58,13 @@ import os

@pytest.mark.parametrize("file", ["example_data/reads.fq", "example_data/big.fq.gz"])
@pytest.mark.parametrize("file", ["reads.fq", "big.fq.gz"])
@pytest.mark.parametrize("chunk_size", [100, 5000000])
def test_buffered_writer_ctx_manager(file, chunk_size):
def test_buffered_writer_ctx_manager(file, chunk_size, tmp_path, data_path):
file = data_path / file
file_path = tmp_path / "tmp.fq"
true_stream = bnp_open(data_path /'reads.fq').read_chunks()
file_path = "./tmp.fq"
true_stream = bnp_open('example_data/reads.fq').read_chunks()
with bnp_open(file_path, mode='w') as f:
f.write(true_stream)
true_stream = bnp_open('example_data/reads.fq').read_chunks()
true_stream = bnp_open(data_path / 'reads.fq').read_chunks()
fq_stream = bnp_open(file_path)

@@ -74,3 +74,3 @@ for fq_item, true_item in zip(fq_stream, true_stream):

os.remove(file_path)
# os.remove(file_path)

@@ -118,3 +118,2 @@

@pytest.mark.skip("makingtrouble")
@pytest.mark.parametrize("file_name", glob.glob("example_data/*"))
def test_read_example_data(file_name):

@@ -189,4 +188,4 @@ if "broken" in file_name:

def test_read_chunk_after_read_chunks_returns_empty_dataclass():
file = bnp.open("example_data/reads.fq")
def test_read_chunk_after_read_chunks_returns_empty_dataclass(data_path):
file = bnp.open(data_path / 'reads.fq')
chunks = list(file.read_chunks())

@@ -198,4 +197,4 @@ new_chunk = file.read_chunk()

def test_read_gtf():
file = bnp.open("example_data/small.gtf")
def test_read_gtf(data_path):
file = bnp.open(data_path / 'small.gtf')
chunk = file.read_chunk()

@@ -205,5 +204,5 @@ assert True

def test_read_bam():
data = bnp.open("example_data/alignments.bam").read()
data2 = bnp.open("example_data/alignments.sam").read()
def test_read_bam(data_path):
data = bnp.open(data_path / 'alignments.bam').read()
data2 = bnp.open(data_path / 'alignments.sam').read()
print(data)

@@ -216,4 +215,4 @@ print(data2)

print(data)
n_lines = len([line for line in open("example_data/alignments.sam") if not line.startswith("@")])
n_lines = len([line for line in open(data_path / 'alignments.sam') if not line.startswith("@")])
assert n_lines == len(data)

@@ -7,7 +7,7 @@ import pytest

from bionumpy.io.jaspar import read_jaspar_matrix
from bionumpy.sequence.position_weight_matrix import PositionWeightMatrix, _pwm_from_counts, PWM, get_motif_scores, get_motif_scores_old
from bionumpy.encodings.alphabet_encoding import AlphabetEncoding
from bionumpy.sequence.position_weight_matrix import PositionWeightMatrix, PWM, get_motif_scores, get_motif_scores_old
from bionumpy import EncodedArray
from bionumpy.io.motifs import read_motif
@pytest.fixture

@@ -20,2 +20,3 @@ def neutral_ppm_dict():

@pytest.fixture

@@ -58,3 +59,3 @@ def a_ppm_dict():

log_prob = PositionWeightMatrix(pwm)(window)
np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25)
np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25)

@@ -65,22 +66,22 @@

log_prob = PositionWeightMatrix(pwm).rolling_window(sequence)
np.testing.assert_allclose(np.exp(log_prob), [0.4*0.25, 0.025, 0.4*0.25])
np.testing.assert_allclose(np.exp(log_prob), [0.4 * 0.25, 0.025, 0.4 * 0.25])
def test_integration():
def test_integration(data_path):
# Read the alphabet and counts from jaspar file
pwm = read_jaspar_matrix("example_data/MA0080.1.jaspar")
pwm = read_jaspar_matrix(data_path /"MA0080.1.jaspar")
# Convert counts to position weight matrix
# pwm = PWM.from_dict(pwm)
# Make an array-class for the alphabet
# encoding = AlphabetEncoding(alphabet)
# Get the motif score function
# pwm = PWM(pwm, alphabet)
motif_score = PositionWeightMatrix(pwm)
#Get reads
entries = bnp.open("example_data/reads.fq").read()
# Get reads
entries = bnp.open(data_path / "reads.fq").read()
# Calculate the motif score for each valid window

@@ -90,5 +91,5 @@ scores = motif_score.rolling_window(entries.sequence)

def test_read_csv_motif():
pwm = read_motif("example_data/pwm.csv")
pwm_jaspar = read_motif("example_data/pwm.jaspar")
def test_read_csv_motif(data_path):
pwm = read_motif(data_path / "pwm.csv")
pwm_jaspar = read_motif(data_path / "pwm.jaspar")
assert str(pwm) == str(pwm_jaspar)

@@ -99,5 +100,5 @@

pwm = PWM(matrix, "ACGT")
#window = EncodedArray(window, AlphabetEncoding("ACGT"))
# window = EncodedArray(window, AlphabetEncoding("ACGT"))
log_prob = pwm.calculate_score(window)
np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25)
np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25)

@@ -126,3 +127,3 @@

scores = pwm.calculate_scores("AAC")
assert_array_equal(scores, [np.log(4**2), -np.inf, -np.inf])
assert_array_equal(scores, [np.log(4 ** 2), -np.inf, -np.inf])

@@ -136,3 +137,2 @@

log_prob = pwm.calculate_score(window)
np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25)
np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25)

@@ -11,4 +11,4 @@ # Various tests for reading and parsing real data files

def test_read_polaris_vcf():
data = bnp.open("example_data/polaris.vcf")
def test_read_polaris_vcf(data_path):
data = bnp.open(data_path / "polaris.vcf")

@@ -20,10 +20,10 @@ for chunk in data:

def test_read_syndip_vcf():
data = bnp.open("example_data/syndip.vcf").read()
def test_read_syndip_vcf(data_path):
data = bnp.open(data_path / "syndip.vcf").read()
print(data.info)
def test_read_vcf_info_field_with_missing_header():
data = bnp.open("example_data/vcf_with_broken_header.vcf").read()
def test_read_vcf_info_field_with_missing_header(data_path):
data = bnp.open(data_path / "vcf_with_broken_header.vcf").read()
assert isinstance(data.info, EncodedRaggedArray) and data.info.encoding == bnp.BaseEncoding, \
"Should parse as string when info tags missing"

@@ -17,3 +17,2 @@ import pytest

from numpy.random import default_rng
from .util import get_file_name

@@ -110,4 +109,4 @@ rng = default_rng()

def test_simulate_from_genome():
ref = get_file_name("example_data/small_genome.fa")
def test_simulate_from_genome(data_path):
ref = data_path / "small_genome.fa"
genome = bnp.Genome.from_file(ref)

@@ -125,4 +124,4 @@ genome = genome.read_sequence(ref)

def test_simulate_variants():
ref = "example_data/small_genome.fa"
def test_simulate_variants(data_path):
ref = data_path / "small_genome.fa"
genome = bnp.Genome.from_file(ref)

@@ -129,0 +128,0 @@ genome = genome.read_sequence(ref)

@@ -36,4 +36,4 @@ import numpy as np

@pytest.fixture
def file_name():
name = 'string_array_test.txt'
def file_name(tmp_path):
name = tmp_path / 'string_array_test.txt'
open(name, 'w').write(

@@ -40,0 +40,0 @@ '''\

@@ -27,4 +27,4 @@ import pytest

@pytest.fixture
def chrom_names():
return bnp.open("example_data/hg38.chrom.sizes").read().name
def chrom_names(data_path):
return bnp.open(data_path / "hg38.chrom.sizes").read().name

@@ -31,0 +31,0 @@

@@ -114,5 +114,5 @@ import pytest

def test_chromosome_str_equal():
bam = bnp.open("example_data/test.bam").read()
bam2 = bnp.open("example_data/test.bam").read()
def test_chromosome_str_equal(data_path):
bam = bnp.open(data_path / "test.bam").read()
bam2 = bnp.open(data_path / "test.bam").read()
assert np.all(str_equal(bam.chromosome, bam2.chromosome))

@@ -119,0 +119,0 @@

import dataclasses
import bionumpy as bnp
import numpy as np
from npstructures.testing import assert_raggedarray_equal
from numpy.testing import assert_array_equal
from bionumpy.bnpdataclass import BNPDataClass
import bionumpy as bnp

@@ -14,13 +12,13 @@ import bionumpy.encoded_array

from bionumpy.datatypes import VCFEntry, VCFEntryWithGenotypes
from bionumpy.datatypes import VCFEntryWithGenotypes
from bionumpy.encodings.vcf_encoding import PhasedGenotypeRowEncoding, GenotypeRowEncoding, PhasedHaplotypeRowEncoding
from bionumpy.util.testing import assert_bnpdataclass_equal
from tests.util import get_file_name
def test_vcf_matrix_buffer():
f = bnp.open("example_data/variants_with_header.vcf",
def test_vcf_matrix_buffer(tmp_path, data_path):
f = bnp.open(data_path / "variants_with_header.vcf",
buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)
out = bnp.open("test1.vcf", mode="w")
out_path = tmp_path / "test1.vcf"
out = bnp.open(out_path, mode="w")

@@ -32,23 +30,24 @@ for chunk in f:

filestart = open('test1.vcf').read(100)
filestart = open(out_path).read(100)
assert filestart.startswith('#'), filestart
# check that header was written
chunk = bnp.open("test1.vcf").read_chunk()
chunk = bnp.open(out_path).read_chunk()
assert chunk.get_context("header") != "" and chunk.get_context("header") == header
def test_vcf_matrix_buffer_stream():
f = bnp.open("example_data/variants_with_header.vcf",
def test_vcf_matrix_buffer_stream(tmp_path, data_path):
f = bnp.open(data_path / "variants_with_header.vcf",
buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)
out = bnp.open("test1.vcf", mode="w")
outpath = tmp_path / "test1.vcf"
out = bnp.open(outpath, mode="w")
out.write(f.read_chunks())
# check that header was written
chunk = bnp.open("test1.vcf").read_chunk()
chunk = bnp.open(outpath).read_chunk()
assert chunk.get_context("header") != ""
def test_context_state():
f = bnp.open("example_data/variants_with_header.vcf").read()
def test_context_state(data_path):
f = bnp.open(data_path / "variants_with_header.vcf").read()
assert f.get_context("header")
f2 = bnp.open("example_data/variants.vcf").read()
f2 = bnp.open(data_path / "variants.vcf").read()
assert not f2.get_context("header")

@@ -101,5 +100,5 @@

def test_parse_unphased_vcf():
# example_data/variants.vcf has messy unphased and missing genotypes
filename = "example_data/variants.vcf"
def test_parse_unphased_vcf(data_path):
# variants.vcf has messy unphased and missing genotypes
filename = data_path / "variants.vcf"
print(open(filename).read())

@@ -125,4 +124,4 @@ f = bnp.open(filename, buffer_type=bionumpy.io.vcf_buffers.VCFMatrixBuffer)

def test_parse_phased_vcf():
f = bnp.open("example_data/variants_phased.vcf", buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)
def test_parse_phased_vcf(data_path):
f = bnp.open(data_path / "variants_phased.vcf", buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)
data = f.read()

@@ -139,4 +138,4 @@ data = data.genotypes.raw()

def test_read_info_field():
vcf_filename = "example_data/variants_with_header.vcf"
def test_read_info_field(data_path):
vcf_filename = data_path / "variants_with_header.vcf"
f = bnp.open(vcf_filename,

@@ -152,4 +151,4 @@ buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)

@pytest.mark.skip('missing data')
def test_read_info_field2():
vcf_filename = "example_data/info_flag.vcf"
def test_read_info_field2(data_path):
vcf_filename = data_path / "info_flag.vcf"
f = bnp.open(vcf_filename,

@@ -163,4 +162,4 @@ buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer)

# @pytest.mark.xfail
def test_read_biallelic_vcf():
file_name = "example_data/small_phased_biallelic.vcf"
def test_read_biallelic_vcf(data_path):
file_name = data_path / "small_phased_biallelic.vcf"
vcf = bnp.open(file_name, buffer_type=bnp.io.vcf_buffers.PhasedHaplotypeVCFMatrixBuffer)

@@ -172,4 +171,4 @@ for chunk in vcf.read_chunks():

@pytest.mark.xfail
def test_read_info_from_vcf():
file = "example_data/variants_with_single_individual_genotypes_and_info.vcf"
def test_read_info_from_vcf(data_path):
file = data_path / "variants_with_single_individual_genotypes_and_info.vcf"
variants = bnp.open(file).read()

@@ -183,4 +182,4 @@

@pytest.mark.skip
def test_concatenate_variants():
file = "example_data/variants_with_single_individual_genotypes_and_info.vcf"
def test_concatenate_variants(data_path):
file = data_path / "variants_with_single_individual_genotypes_and_info.vcf"
f = bnp.open(file)

@@ -210,4 +209,4 @@ chunk1 = f.read_chunk(min_chunk_size=200)

@pytest.fixture
def data_with_info():
file = "example_data/vcf_symbolic_sequences.vcf"
def data_with_info(data_path):
file = data_path / "vcf_symbolic_sequences.vcf"
data = bnp.open(file).read()

@@ -246,4 +245,4 @@ return data

# @pytest.mark.skip # .genotype not implemented
def test_read_genotype_data_from_messy_vcf():
file_name = "example_data/polaris_small.vcf"
def test_read_genotype_data_from_messy_vcf(data_path):
file_name = data_path / "polaris_small.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -254,4 +253,4 @@ genotype = data.genotype[0]

def test_read_genotype_with_more_data():
file_name = get_file_name("example_data/syndip.vcf")
def test_read_genotype_with_more_data(data_path):
file_name = data_path / "syndip.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -261,3 +260,3 @@ genotypes = data.genotype[:4]

def test_write_genotype():
def test_write_genotype(tmp_path):
data = narrow_type(VCFEntryWithGenotypes, 'info', str)(

@@ -274,9 +273,10 @@ ['chr1', 'chr2'],

)
with bnp.open("tmp.vcf", "w", buffer_type=VCFBuffer2) as f:
file_path = tmp_path / "tmp.vcf"
with bnp.open(file_path, "w", buffer_type=VCFBuffer2) as f:
f.write(data)
new_data = bnp.open("tmp.vcf", buffer_type=VCFBuffer2).read().get_data_object()
new_data = bnp.open(file_path, buffer_type=VCFBuffer2).read().get_data_object()
assert_bnpdataclass_equal(data, new_data)
def test_read_genotype_with_no_data():
file_name = "example_data/variants_without_genotypes.vcf"
def test_read_genotype_with_no_data(data_path):
file_name = data_path / "variants_without_genotypes.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -286,4 +286,4 @@ genotypes = data.genotype[:4]

def test_read_empty_vcf():
file_name = get_file_name("example_data/empty_variants.vcf")
def test_read_empty_vcf(data_path):
file_name = data_path / "empty_variants.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -294,4 +294,4 @@ assert len(data) == 0

@pytest.mark.skip # genotype fields not implemented
def test_read_genotype_ad_field():
file_name = "example_data/syndip.vcf"
def test_read_genotype_ad_field(data_path):
file_name = data_path / "syndip.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -305,4 +305,4 @@ assert_array_equal(data[0].genotype_data.AD == [1, 1])

@pytest.mark.skip # genotype fields not implemented
def test_read_genotype_ad_field():
file_name = "example_data/syndip.vcf"
def test_read_genotype_ad_field(data_path):
file_name = data_path / "syndip.vcf"
data = bnp.open(file_name, buffer_type=VCFBuffer2).read()

@@ -315,4 +315,4 @@ assert_array_equal(data[0].genotype_data.AD == [1, 1])

def test_read_thousand_genomes_info_field():
data = bnp.open("example_data/thousand_genomes.vcf").read()
def test_read_thousand_genomes_info_field(data_path):
data = bnp.open(data_path / "thousand_genomes.vcf").read()

@@ -327,4 +327,4 @@ assert_raggedarray_equal(

def test_read_hprc_multiallelic():
data = bnp.open(get_file_name("example_data/hprc_multiallelic.vcf")).read()
def test_read_hprc_multiallelic(data_path):
data = bnp.open(data_path / "hprc_multiallelic.vcf").read()
result = data.info.AF[0:2]

@@ -337,4 +337,4 @@ assert_raggedarray_equal(result, [

def test_read_write_vcf_gives_identical_file():
file = "example_data/variants_with_single_individual_genotypes_and_info.vcf"
def test_read_write_vcf_gives_identical_file(data_path):
file = data_path /"variants_with_single_individual_genotypes_and_info.vcf"
variants = bnp.open(file).read()

@@ -353,3 +353,3 @@

def test_read_vcf_replace_field():
file = "example_data/variants_with_single_individual_genotypes_and_info.vcf"
file = data_path / "variants_with_single_individual_genotypes_and_info.vcf"
variants = bnp.open(file).read()

@@ -370,15 +370,15 @@ variants = bnp.replace(variants, position=np.ones_like(variants.position))

#@pytest.mark.xfail
def test_parse_vcf_that_fails():
vcf = bnp.open(get_file_name("example_data/variants_with_af.vcf")).read()
def test_parse_vcf_that_fails(data_path):
vcf = bnp.open(data_path /"variants_with_af.vcf").read()
print(vcf)
def test_genotype_print():
i = bnp.open(get_file_name("example_data/thousand_genomes.vcf"),
def test_genotype_print(data_path):
i = bnp.open(data_path / "thousand_genomes.vcf",
buffer_type=VCFBuffer2).read()
print(i.genotype)
def test_ioi():
out_filename = "tmp_ioi.vcf"
i = bnp.open(get_file_name("example_data/thousand_genomes.vcf"),
def test_ioi(tmp_path, data_path):
out_filename = tmp_path / "tmp_ioi.vcf"
i = bnp.open(data_path / "thousand_genomes.vcf",
buffer_type=VCFBuffer2).read()

@@ -393,4 +393,4 @@ print(i.genotype)

def test_vcf_haplotyped():
vcf = bnp.open(get_file_name("example_data/haplotypes.vcf"), buffer_type=VCFHaplotypeBuffer).read()
def test_vcf_haplotyped(data_path ):
vcf = bnp.open(data_path / "haplotypes.vcf", buffer_type=VCFHaplotypeBuffer).read()
print(vcf.genotype)

@@ -397,0 +397,0 @@ genotype_ = vcf.genotype[1][:3]

@@ -0,5 +1,7 @@

import os
from bionumpy.io.vcf_header import parse_header
import bionumpy as bnp
import pytest
from .conftest import data_path
from bionumpy.util.testing import assert_encoded_array_equal

@@ -95,4 +97,4 @@

def test_vcf_lof():
variants = bnp.open("example_data/lof_file.vcf").read()
def test_vcf_lof(data_path):
variants = bnp.open(data_path / "lof_file.vcf").read()
lof = variants.info.LoF

@@ -107,4 +109,4 @@ n_variants = len(variants)

def test_vcf_info_data_object():
variants = bnp.open("example_data/lof_file.vcf").read()
def test_vcf_info_data_object(data_path):
variants = bnp.open(data_path / "lof_file.vcf").read()
info = variants.info.get_data_object()

@@ -114,14 +116,18 @@ print(str(info))

def test_vcf_filtering_chunk():
with bnp.open('tmp.vcf', 'w') as f:
for chunk in bnp.open("example_data/lof_file.vcf").read_chunks():
def test_vcf_filtering_chunk(tmp_path, data_path):
in_filepath = data_path / "lof_file.vcf"
out_filepath = tmp_path / 'tmp.vcf'
with bnp.open(out_filepath, 'w') as f:
for chunk in bnp.open(in_filepath).read_chunks():
f.write(chunk[(chunk.info.LoF.lengths > 0) & chunk.info.ONCOGENE])
assert bnp.count_entries('tmp.vcf') == 2
assert bnp.count_entries(out_filepath) == 2
def test_locations():
def test_locations(data_path):
k = 5
# Read genome and variants
genome = bnp.Genome.from_file("example_data/sacCer3.fa", filter_function=None)
variants_file = "example_data/sacCer3_sample_variants.vcf.gz"
genome_file_name = data_path / "sacCer3.fa"
genome = bnp.Genome.from_file(genome_file_name, filter_function=None)
variants_file = data_path / "sacCer3_sample_variants.vcf.gz"
print(bnp.open(variants_file).read())

@@ -137,3 +143,3 @@

print(variants)
windows = variants.get_windows(flank=k-1)
windows = variants.get_windows(flank=k - 1)
print(windows)

@@ -146,3 +152,3 @@ # Use the windows to extract sequences (kmers)

assert_encoded_array_equal(sequences[:, k - 1], variants.get_data_field('ref_seq').ravel())
sequences[:, k-1] = variants.get_data_field('alt_seq').ravel()
sequences[:, k - 1] = variants.get_data_field('alt_seq').ravel()
assert_encoded_array_equal(sequences[:, k - 1], variants.get_data_field('alt_seq').ravel())

@@ -152,2 +158,6 @@ print(sequences)

alt_kmers = bnp.get_kmers(sequences, k)
print(alt_kmers[0:3])
print(alt_kmers[0:3])
fai_filename = genome_file_name.with_suffix(genome_file_name.suffix + '.fai')
# remove file
if os.path.exists(fai_filename):
os.remove(fai_filename)