bionumpy
Advanced tools
| from pathlib import Path | ||
| import pytest | ||
| @pytest.fixture() | ||
| def data_path(): | ||
| return Path(__file__).parent.parent / 'example_data' |
| Metadata-Version: 2.1 | ||
| Name: bionumpy | ||
| Version: 1.0.5 | ||
| Version: 1.0.6 | ||
| Summary: Library for working with biological sequence data as numpy arrays. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy |
@@ -134,2 +134,3 @@ HISTORY.rst | ||
| tests/buffers.py | ||
| tests/conftest.py | ||
| tests/fixtures.py | ||
@@ -136,0 +137,0 @@ tests/genomic_fixtures.py |
@@ -5,3 +5,3 @@ """Top-level package for bionumpy.""" | ||
| __email__ = "knutdrand@gmail.com" | ||
| __version__ = '1.0.5' | ||
| __version__ = '1.0.6' | ||
@@ -8,0 +8,0 @@ import npstructures as nps |
@@ -0,1 +1,4 @@ | ||
| from pathlib import Path | ||
| from typing import Union | ||
| import numpy as np | ||
@@ -64,5 +67,7 @@ from ..encoded_array import EncodedArray, as_encoded_array, EncodedRaggedArray | ||
| def __init__(self, filename: str): | ||
| def __init__(self, filename: Union[str, Path]): | ||
| if isinstance(filename, str): | ||
| filename = Path(filename) | ||
| self._filename = filename | ||
| self._index = read_index(filename+".fai") | ||
| self._index = read_index(filename.with_suffix(filename.suffix + ".fai")) | ||
| self._f_obj = open(filename, "rb") | ||
@@ -69,0 +74,0 @@ self._index_table = FastaIdx.from_entry_tuples( |
| import codecs | ||
| import logging | ||
| import numpy as np | ||
| from typing.io import IO | ||
| try: | ||
| from typing import IO | ||
| except ImportError: | ||
| from typing.io import IO | ||
| from npstructures import npdataclass | ||
@@ -6,0 +9,0 @@ |
+1
-1
| Metadata-Version: 2.1 | ||
| Name: bionumpy | ||
| Version: 1.0.5 | ||
| Version: 1.0.6 | ||
| Summary: Library for working with biological sequence data as numpy arrays. | ||
@@ -5,0 +5,0 @@ Home-page: https://github.com/bionumpy/bionumpy |
+1
-1
@@ -49,3 +49,3 @@ #!/usr/bin/env python | ||
| url='https://github.com/bionumpy/bionumpy', | ||
| version='1.0.5', | ||
| version='1.0.6', | ||
| zip_safe=False, | ||
@@ -52,0 +52,0 @@ extras_require={'full': ['isal']} |
+10
-8
@@ -6,12 +6,12 @@ from bionumpy import Bed6 | ||
| "bed": Bed6.from_entry_tuples([ | ||
| ("chr1", 1, 3, ".", 0, "-"), | ||
| ("chr1", 40, 60, ".", 1, "+"), | ||
| ("chr20", 400, 600, ".", 2, "+")]), | ||
| ("chr1", 1, 3, ".", 0, "-"), | ||
| ("chr1", 40, 60, ".", 1, "+"), | ||
| ("chr20", 400, 600, ".", 2, "+")]), | ||
| "vcf2": VCFEntry.from_entry_tuples([ | ||
| ("chr1", 88361, "rs4970378", "A", "G", ".", ".", "."), | ||
| ("chr1", 887559, "rs3748595", "A", "CAA", ".", ".", "."), | ||
| ("chr2", 8877, "rs3828047", "AGG", "C", ".", ".", ".")]), | ||
| ("chr1", 88361, "rs4970378", "A", "G", ".", ".", "."), | ||
| ("chr1", 887559, "rs3748595", "A", "CAA", ".", ".", "."), | ||
| ("chr2", 8877, "rs3828047", "AGG", "C", ".", ".", ".")]), | ||
| "fastq": SequenceEntryWithQuality.from_entry_tuples([ | ||
| ("headerishere", "CTTGTTGA", "".join("!" for _ in "CTTGTTGA")), | ||
| ("anotherheader", "CGG", "".join("~" for _ in "CGG"))]),} | ||
| ("anotherheader", "CGG", "".join("~" for _ in "CGG"))]), } | ||
| ''' | ||
@@ -42,2 +42,4 @@ "vcf": [ | ||
| ('chr1', 9871, 9872, 0.17042)]) | ||
| }''' | ||
| }''' | ||
@@ -80,8 +80,2 @@ import pytest | ||
| @pytest.fixture | ||
| def tmp_path(): | ||
| from pathlib import Path | ||
| path = Path('tmp_folder') | ||
| path.mkdir(exist_ok=True) | ||
| return path | ||
@@ -103,4 +97,2 @@ @pytest.fixture | ||
| d = f.read() | ||
| print(d) | ||
| print(d.flag.dtype) | ||
| assert_encoded_array_equal(d.extra[-1], 'NM:i:1') | ||
@@ -107,0 +99,0 @@ |
+13
-10
@@ -11,4 +11,5 @@ import numpy as np | ||
| def test_read_acceptance(): | ||
| filename = "example_data/test.bam" | ||
| def test_read_acceptance(data_path): | ||
| filename = data_path / "test.bam" | ||
| f = bnp.open(filename) | ||
@@ -20,4 +21,4 @@ d = f.read() | ||
| def test_read_intervals_acceptance(): | ||
| filename = "example_data/test.bam" | ||
| def test_read_intervals_acceptance(data_path): | ||
| filename = data_path / "test.bam" | ||
| f = bnp.open(filename, buffer_type=BamIntervalBuffer) | ||
@@ -30,4 +31,4 @@ d = f.read() | ||
| @pytest.fixture() | ||
| def bam_entries(): | ||
| filename = get_file_name('example_data/small_alignments.bam') | ||
| def bam_entries(data_path): | ||
| filename = data_path / 'small_alignments.bam' | ||
| entries = bnp.open(filename).read() | ||
@@ -52,8 +53,10 @@ return entries | ||
| def test_write_bam(bam_entries): | ||
| def test_write_bam(bam_entries, tmp_path): | ||
| subset = bam_entries[bam_entries.mapq == 60] | ||
| with bnp.open('tmp.bam', mode='w') as f: | ||
| output_file = tmp_path / 'tmp.bam' | ||
| with bnp.open(output_file, mode='w') as f: | ||
| f.write(subset) | ||
| assert open('tmp.bam', 'rb').read()[-28:] == b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00' | ||
| new_entries = bnp.open('tmp.bam').read() | ||
| assert open(output_file, 'rb').read()[-28:] == b'\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00' | ||
| new_entries = bnp.open(output_file).read() | ||
| assert_array_equal(new_entries.position, subset.position) | ||
@@ -60,0 +63,0 @@ |
@@ -106,6 +106,7 @@ import dataclasses | ||
| @pytest.mark.parametrize("file", [ | ||
| "example_data/variants.vcf", | ||
| "example_data/variants_with_header.vcf" | ||
| "variants.vcf", | ||
| "variants_with_header.vcf" | ||
| ]) | ||
| def test_read_header(file): | ||
| def test_read_header(file,data_path): | ||
| file = data_path/file | ||
| chunks = list(bnp.open(file).read_chunks()) | ||
@@ -112,0 +113,0 @@ true_header = "".join(line for line in open(file) if line.startswith("#")) |
@@ -30,8 +30,8 @@ import pytest | ||
| @pytest.fixture | ||
| def intervals(): | ||
| return bnp.open("example_data/ctcf.bed.gz").read() | ||
| def intervals(data_path): | ||
| return bnp.open(data_path / "ctcf.bed.gz").read() | ||
| @pytest.fixture | ||
| def chrom_sizes(): | ||
| return bnp.open("example_data/hg38.chrom.sizes").read() | ||
| def chrom_sizes(data_path): | ||
| return bnp.open(data_path / "hg38.chrom.sizes").read() | ||
@@ -38,0 +38,0 @@ |
| from bionumpy.genomic_data import Genome | ||
| def test_genomic_annotation(): | ||
| g = Genome.from_file('example_data/hg38.chrom.sizes') | ||
| a = g.read_annotation('example_data/small_gff.gff3') | ||
| def test_genomic_annotation(data_path): | ||
| g = Genome.from_file(data_path / 'hg38.chrom.sizes') | ||
| a = g.read_annotation(data_path / 'small_gff.gff3') | ||
| print(a.genes.gene_id) | ||
| print(a.transcripts.transcript_id) | ||
| print(a.exons.exon_id) |
@@ -68,4 +68,4 @@ import pytest | ||
| #@pytest.mark.xfail | ||
| def test_groupby_many_chunks(): | ||
| file = "example_data/variants_with_header.vcf" | ||
| def test_groupby_many_chunks(data_path): | ||
| file = data_path / "variants_with_header.vcf" | ||
| chunks = bnp.open(file).read_chunks(100) | ||
@@ -72,0 +72,0 @@ for chromosome, variants in bnp.groupby(chunks, "chromosome"): |
@@ -7,7 +7,6 @@ import pytest | ||
| from bionumpy.util.testing import assert_encoded_array_equal | ||
| from .util import get_file_name | ||
| @pytest.fixture | ||
| def gtf_entries(): | ||
| return bnp.open(get_file_name("example_data/small.gtf")).read() | ||
| def gtf_entries(data_path ): | ||
| return bnp.open(data_path / "small.gtf").read() | ||
@@ -40,4 +39,4 @@ @pytest.fixture | ||
| # @pytest.mark.skip('waiting') | ||
| def test_read_gff(): | ||
| annotation = bnp.open('example_data/small_gff.gff3').read() | ||
| def test_read_gff(data_path): | ||
| annotation = bnp.open(data_path / 'small_gff.gff3').read() | ||
| genes = annotation.get_genes() | ||
@@ -49,5 +48,5 @@ assert genes[0].gene_id == 'ENSG00000290825.1' | ||
| def test_read_sarcer_gtf(): | ||
| annotation = bnp.open('example_data/sacCer3.ensGene.gtf.gz').read() | ||
| def test_read_sarcer_gtf(data_path): | ||
| annotation = bnp.open(data_path / 'sacCer3.ensGene.gtf.gz').read() | ||
| transcripts = annotation.get_transcripts() | ||
| assert len(transcripts) > 0 |
@@ -30,4 +30,4 @@ import dataclasses | ||
| @pytest.fixture | ||
| def file_name(): | ||
| name = 'tmp1234.tsv' | ||
| def file_name(tmp_path): | ||
| name = tmp_path / 'tmp1234.tsv' | ||
| with open(name, 'w') as f: | ||
@@ -38,4 +38,4 @@ f.write(text) | ||
| @pytest.fixture | ||
| def empty_file_name(header): | ||
| name = 'empty.csv' | ||
| def empty_file_name(header, tmp_path): | ||
| name = tmp_path / 'empty.csv' | ||
| with open(name, 'w') as f: | ||
@@ -46,4 +46,4 @@ f.write(header) | ||
| @pytest.fixture | ||
| def full_file_name(full_text): | ||
| name = 'tmp1234full.tsv' | ||
| def full_file_name(full_text, tmp_path): | ||
| name = tmp_path / 'tmp1234full.tsv' | ||
| with open(name, 'w') as f: | ||
@@ -50,0 +50,0 @@ f.write(full_text) |
@@ -18,9 +18,9 @@ from numpy.testing import assert_equal | ||
| def test_fasta_index(): | ||
| index = create_index("example_data/small_genome.fa") | ||
| def test_fasta_index(data_path): | ||
| index = create_index(data_path / "small_genome.fa") | ||
| assert_equal(index.length, [300, 600, 900, 1200]) | ||
| def test_dictlike(): | ||
| idx_fasta = bnp.open_indexed("example_data/small_genome.fa") | ||
| def test_dictlike(data_path): | ||
| idx_fasta = bnp.open_indexed(data_path / "small_genome.fa") | ||
| assert list(idx_fasta.keys()) == ["0", "1", "2", "3"] | ||
@@ -36,4 +36,4 @@ assert "Indexed Fasta" in repr(idx_fasta) | ||
| def test_get_sequences(): | ||
| idx_fasta = bnp.open_indexed("example_data/small_genome.fa") | ||
| def test_get_sequences(data_path): | ||
| idx_fasta = bnp.open_indexed(data_path / "small_genome.fa") | ||
| _intervals = Interval.from_entry_tuples([("1", 10, 20), | ||
@@ -40,0 +40,0 @@ ("2", 11, 50), |
+26
-25
@@ -122,5 +122,4 @@ import os | ||
| @pytest.mark.parametrize("buffer_name", ["bed", "vcf", "fastq", "fasta"]) | ||
| def test_ctx_manager_read(buffer_name): | ||
| file_path = Path(f"./{buffer_name}_example.{buffer_name}") | ||
| def test_ctx_manager_read(buffer_name, tmp_path): | ||
| file_path = tmp_path / f"./{buffer_name}_example.{buffer_name}" | ||
| with open(file_path, "w") as file: | ||
@@ -132,5 +131,3 @@ file.write(buffer_texts[buffer_name]) | ||
| os.remove(file_path) | ||
| @pytest.mark.parametrize("buffer_name", ["bed", "vcf", "fastq", "fasta"]) | ||
@@ -170,6 +167,7 @@ def test_append_to_file(buffer_name): | ||
| def test_write_empty(): | ||
| def test_write_empty(tmp_path): | ||
| entry = VCFEntry([], [], [], [], | ||
| [], [], [], []) | ||
| with bnp.open('tmp.vcf', 'w') as f: | ||
| filename = tmp_path / 'tmp.vcf' | ||
| with bnp.open(filename, 'w') as f: | ||
| f.write(entry) | ||
@@ -198,3 +196,3 @@ | ||
| @pytest.fixture | ||
| def fastq_with_carriage_return_filename(): | ||
| def fastq_with_carriage_return_filename(tmp_path): | ||
| text = '''\ | ||
@@ -206,3 +204,3 @@ @test_sequence_id_here\r | ||
| ''' | ||
| filename = 'carriage_return.fq' | ||
| filename = tmp_path/'carriage_return.fq' | ||
| with open(filename, 'w') as file: | ||
@@ -214,3 +212,3 @@ file.write(text) | ||
| @pytest.fixture | ||
| def bed_with_carriage_return_filename(): | ||
| def bed_with_carriage_return_filename(tmp_path): | ||
| text = '''\ | ||
@@ -220,3 +218,3 @@ chr1\t1\t2\r | ||
| ''' | ||
| filename = 'carriage_return.bed' | ||
| filename = tmp_path / 'carriage_return.bed' | ||
| with open(filename, 'w') as file: | ||
@@ -228,3 +226,3 @@ file.write(text) | ||
| @pytest.fixture | ||
| def fasta_with_carriage_return_filename(): | ||
| def fasta_with_carriage_return_filename(tmp_path): | ||
| text = '''\ | ||
@@ -237,3 +235,3 @@ >test_sequence_id_here\r | ||
| ''' | ||
| filename = 'carriage_return.fa' | ||
| filename = tmp_path/'carriage_return.fa' | ||
| with open(filename, 'w') as file: | ||
@@ -264,13 +262,16 @@ file.write(text) | ||
| # @pytest.mark.xfail | ||
| def test_carriage_return_fai(fasta_with_carriage_return_filename): | ||
| def test_carriage_return_fai(fasta_with_carriage_return_filename: Path): | ||
| # remove file if it exists | ||
| if os.path.exists(fasta_with_carriage_return_filename + '.fai'): | ||
| os.remove(fasta_with_carriage_return_filename + '.fai') | ||
| fai = bnp.open_indexed(fasta_with_carriage_return_filename) | ||
| # add .fai to the end of the file | ||
| filename = fasta_with_carriage_return_filename | ||
| fai_filename = filename.with_suffix(filename.suffix + '.fai') | ||
| if os.path.exists(fai_filename): | ||
| os.remove(fai_filename) | ||
| fai = bnp.open_indexed(filename) | ||
| assert_encoded_array_equal(fai['test_sequence_id_here'].raw(), 'GACTG') | ||
| assert_encoded_array_equal(fai['test_sequence_id_here2'].raw(), 'GACTCGAG') | ||
| def test_rwr_bed_with_change(): | ||
| tmp_path = 'tmp_rwr.bed' | ||
| filename = get_file_name('example_data/alignments.bed') | ||
| def test_rwr_bed_with_change(tmp_path, data_path): | ||
| file_path = tmp_path / 'tmp_rwr.bed' | ||
| filename = data_path / 'alignments.bed' | ||
| data = bnp.open(filename, buffer_type=bnp.io.Bed6Buffer).read() | ||
@@ -281,10 +282,10 @@ data.start = data.start + 1 | ||
| data == data[::2] | ||
| if os.path.exists(tmp_path): | ||
| os.remove(tmp_path) | ||
| bnp.open(tmp_path, 'w', buffer_type=bnp.io.Bed6Buffer).write(data) | ||
| text = open(tmp_path).read() | ||
| if os.path.exists(file_path): | ||
| os.remove(file_path) | ||
| bnp.open(file_path, 'w', buffer_type=bnp.io.Bed6Buffer).write(data) | ||
| text = open(file_path).read() | ||
| assert text.startswith('chr1'), text[:10] | ||
| print(text) | ||
| data2 = bnp.open(tmp_path).read() | ||
| data2 = bnp.open(file_path).read() | ||
| assert_equal(data.start, data2.start) | ||
| assert np.all(data.chromosome == data2.chromosome) |
@@ -35,4 +35,4 @@ import pytest | ||
| def test_cosmic_read(): | ||
| matrix = bnp.io.read_matrix('example_data/COSMIC_v3.3.1_SBS_GRCh38.txt') | ||
| def test_cosmic_read(data_path): | ||
| matrix = bnp.io.read_matrix(data_path / 'COSMIC_v3.3.1_SBS_GRCh38.txt') | ||
| encoded = bnp.as_encoded_array(matrix.row_names.to_numpy_array(), | ||
@@ -39,0 +39,0 @@ MutationTypeEncoding(1)) |
+15
-16
@@ -58,13 +58,13 @@ import os | ||
| @pytest.mark.parametrize("file", ["example_data/reads.fq", "example_data/big.fq.gz"]) | ||
| @pytest.mark.parametrize("file", ["reads.fq", "big.fq.gz"]) | ||
| @pytest.mark.parametrize("chunk_size", [100, 5000000]) | ||
| def test_buffered_writer_ctx_manager(file, chunk_size): | ||
| def test_buffered_writer_ctx_manager(file, chunk_size, tmp_path, data_path): | ||
| file = data_path / file | ||
| file_path = tmp_path / "tmp.fq" | ||
| true_stream = bnp_open(data_path /'reads.fq').read_chunks() | ||
| file_path = "./tmp.fq" | ||
| true_stream = bnp_open('example_data/reads.fq').read_chunks() | ||
| with bnp_open(file_path, mode='w') as f: | ||
| f.write(true_stream) | ||
| true_stream = bnp_open('example_data/reads.fq').read_chunks() | ||
| true_stream = bnp_open(data_path / 'reads.fq').read_chunks() | ||
| fq_stream = bnp_open(file_path) | ||
@@ -74,3 +74,3 @@ for fq_item, true_item in zip(fq_stream, true_stream): | ||
| os.remove(file_path) | ||
| # os.remove(file_path) | ||
@@ -118,3 +118,2 @@ | ||
| @pytest.mark.skip("makingtrouble") | ||
| @pytest.mark.parametrize("file_name", glob.glob("example_data/*")) | ||
| def test_read_example_data(file_name): | ||
@@ -189,4 +188,4 @@ if "broken" in file_name: | ||
| def test_read_chunk_after_read_chunks_returns_empty_dataclass(): | ||
| file = bnp.open("example_data/reads.fq") | ||
| def test_read_chunk_after_read_chunks_returns_empty_dataclass(data_path): | ||
| file = bnp.open(data_path / 'reads.fq') | ||
| chunks = list(file.read_chunks()) | ||
@@ -198,4 +197,4 @@ new_chunk = file.read_chunk() | ||
| def test_read_gtf(): | ||
| file = bnp.open("example_data/small.gtf") | ||
| def test_read_gtf(data_path): | ||
| file = bnp.open(data_path / 'small.gtf') | ||
| chunk = file.read_chunk() | ||
@@ -205,5 +204,5 @@ assert True | ||
| def test_read_bam(): | ||
| data = bnp.open("example_data/alignments.bam").read() | ||
| data2 = bnp.open("example_data/alignments.sam").read() | ||
| def test_read_bam(data_path): | ||
| data = bnp.open(data_path / 'alignments.bam').read() | ||
| data2 = bnp.open(data_path / 'alignments.sam').read() | ||
| print(data) | ||
@@ -216,4 +215,4 @@ print(data2) | ||
| print(data) | ||
| n_lines = len([line for line in open("example_data/alignments.sam") if not line.startswith("@")]) | ||
| n_lines = len([line for line in open(data_path / 'alignments.sam') if not line.startswith("@")]) | ||
| assert n_lines == len(data) | ||
@@ -7,7 +7,7 @@ import pytest | ||
| from bionumpy.io.jaspar import read_jaspar_matrix | ||
| from bionumpy.sequence.position_weight_matrix import PositionWeightMatrix, _pwm_from_counts, PWM, get_motif_scores, get_motif_scores_old | ||
| from bionumpy.encodings.alphabet_encoding import AlphabetEncoding | ||
| from bionumpy.sequence.position_weight_matrix import PositionWeightMatrix, PWM, get_motif_scores, get_motif_scores_old | ||
| from bionumpy import EncodedArray | ||
| from bionumpy.io.motifs import read_motif | ||
| @pytest.fixture | ||
@@ -20,2 +20,3 @@ def neutral_ppm_dict(): | ||
| @pytest.fixture | ||
@@ -58,3 +59,3 @@ def a_ppm_dict(): | ||
| log_prob = PositionWeightMatrix(pwm)(window) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25) | ||
@@ -65,22 +66,22 @@ | ||
| log_prob = PositionWeightMatrix(pwm).rolling_window(sequence) | ||
| np.testing.assert_allclose(np.exp(log_prob), [0.4*0.25, 0.025, 0.4*0.25]) | ||
| np.testing.assert_allclose(np.exp(log_prob), [0.4 * 0.25, 0.025, 0.4 * 0.25]) | ||
| def test_integration(): | ||
| def test_integration(data_path): | ||
| # Read the alphabet and counts from jaspar file | ||
| pwm = read_jaspar_matrix("example_data/MA0080.1.jaspar") | ||
| pwm = read_jaspar_matrix(data_path /"MA0080.1.jaspar") | ||
| # Convert counts to position weight matrix | ||
| # pwm = PWM.from_dict(pwm) | ||
| # Make an array-class for the alphabet | ||
| # encoding = AlphabetEncoding(alphabet) | ||
| # Get the motif score function | ||
| # pwm = PWM(pwm, alphabet) | ||
| motif_score = PositionWeightMatrix(pwm) | ||
| #Get reads | ||
| entries = bnp.open("example_data/reads.fq").read() | ||
| # Get reads | ||
| entries = bnp.open(data_path / "reads.fq").read() | ||
| # Calculate the motif score for each valid window | ||
@@ -90,5 +91,5 @@ scores = motif_score.rolling_window(entries.sequence) | ||
| def test_read_csv_motif(): | ||
| pwm = read_motif("example_data/pwm.csv") | ||
| pwm_jaspar = read_motif("example_data/pwm.jaspar") | ||
| def test_read_csv_motif(data_path): | ||
| pwm = read_motif(data_path / "pwm.csv") | ||
| pwm_jaspar = read_motif(data_path / "pwm.jaspar") | ||
| assert str(pwm) == str(pwm_jaspar) | ||
@@ -99,5 +100,5 @@ | ||
| pwm = PWM(matrix, "ACGT") | ||
| #window = EncodedArray(window, AlphabetEncoding("ACGT")) | ||
| # window = EncodedArray(window, AlphabetEncoding("ACGT")) | ||
| log_prob = pwm.calculate_score(window) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25) | ||
@@ -126,3 +127,3 @@ | ||
| scores = pwm.calculate_scores("AAC") | ||
| assert_array_equal(scores, [np.log(4**2), -np.inf, -np.inf]) | ||
| assert_array_equal(scores, [np.log(4 ** 2), -np.inf, -np.inf]) | ||
@@ -136,3 +137,2 @@ | ||
| log_prob = pwm.calculate_score(window) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4*0.25) | ||
| np.testing.assert_allclose(np.exp(log_prob), 0.4 * 0.25) |
@@ -11,4 +11,4 @@ # Various tests for reading and parsing real data files | ||
| def test_read_polaris_vcf(): | ||
| data = bnp.open("example_data/polaris.vcf") | ||
| def test_read_polaris_vcf(data_path): | ||
| data = bnp.open(data_path / "polaris.vcf") | ||
@@ -20,10 +20,10 @@ for chunk in data: | ||
| def test_read_syndip_vcf(): | ||
| data = bnp.open("example_data/syndip.vcf").read() | ||
| def test_read_syndip_vcf(data_path): | ||
| data = bnp.open(data_path / "syndip.vcf").read() | ||
| print(data.info) | ||
| def test_read_vcf_info_field_with_missing_header(): | ||
| data = bnp.open("example_data/vcf_with_broken_header.vcf").read() | ||
| def test_read_vcf_info_field_with_missing_header(data_path): | ||
| data = bnp.open(data_path / "vcf_with_broken_header.vcf").read() | ||
| assert isinstance(data.info, EncodedRaggedArray) and data.info.encoding == bnp.BaseEncoding, \ | ||
| "Should parse as string when info tags missing" |
@@ -17,3 +17,2 @@ import pytest | ||
| from numpy.random import default_rng | ||
| from .util import get_file_name | ||
@@ -110,4 +109,4 @@ rng = default_rng() | ||
| def test_simulate_from_genome(): | ||
| ref = get_file_name("example_data/small_genome.fa") | ||
| def test_simulate_from_genome(data_path): | ||
| ref = data_path / "small_genome.fa" | ||
| genome = bnp.Genome.from_file(ref) | ||
@@ -125,4 +124,4 @@ genome = genome.read_sequence(ref) | ||
| def test_simulate_variants(): | ||
| ref = "example_data/small_genome.fa" | ||
| def test_simulate_variants(data_path): | ||
| ref = data_path / "small_genome.fa" | ||
| genome = bnp.Genome.from_file(ref) | ||
@@ -129,0 +128,0 @@ genome = genome.read_sequence(ref) |
@@ -36,4 +36,4 @@ import numpy as np | ||
| @pytest.fixture | ||
| def file_name(): | ||
| name = 'string_array_test.txt' | ||
| def file_name(tmp_path): | ||
| name = tmp_path / 'string_array_test.txt' | ||
| open(name, 'w').write( | ||
@@ -40,0 +40,0 @@ '''\ |
@@ -27,4 +27,4 @@ import pytest | ||
| @pytest.fixture | ||
| def chrom_names(): | ||
| return bnp.open("example_data/hg38.chrom.sizes").read().name | ||
| def chrom_names(data_path): | ||
| return bnp.open(data_path / "hg38.chrom.sizes").read().name | ||
@@ -31,0 +31,0 @@ |
@@ -114,5 +114,5 @@ import pytest | ||
| def test_chromosome_str_equal(): | ||
| bam = bnp.open("example_data/test.bam").read() | ||
| bam2 = bnp.open("example_data/test.bam").read() | ||
| def test_chromosome_str_equal(data_path): | ||
| bam = bnp.open(data_path / "test.bam").read() | ||
| bam2 = bnp.open(data_path / "test.bam").read() | ||
| assert np.all(str_equal(bam.chromosome, bam2.chromosome)) | ||
@@ -119,0 +119,0 @@ |
| import dataclasses | ||
| import bionumpy as bnp | ||
| import numpy as np | ||
| from npstructures.testing import assert_raggedarray_equal | ||
| from numpy.testing import assert_array_equal | ||
| from bionumpy.bnpdataclass import BNPDataClass | ||
| import bionumpy as bnp | ||
@@ -14,13 +12,13 @@ import bionumpy.encoded_array | ||
| from bionumpy.datatypes import VCFEntry, VCFEntryWithGenotypes | ||
| from bionumpy.datatypes import VCFEntryWithGenotypes | ||
| from bionumpy.encodings.vcf_encoding import PhasedGenotypeRowEncoding, GenotypeRowEncoding, PhasedHaplotypeRowEncoding | ||
| from bionumpy.util.testing import assert_bnpdataclass_equal | ||
| from tests.util import get_file_name | ||
| def test_vcf_matrix_buffer(): | ||
| f = bnp.open("example_data/variants_with_header.vcf", | ||
| def test_vcf_matrix_buffer(tmp_path, data_path): | ||
| f = bnp.open(data_path / "variants_with_header.vcf", | ||
| buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| out = bnp.open("test1.vcf", mode="w") | ||
| out_path = tmp_path / "test1.vcf" | ||
| out = bnp.open(out_path, mode="w") | ||
@@ -32,23 +30,24 @@ for chunk in f: | ||
| filestart = open('test1.vcf').read(100) | ||
| filestart = open(out_path).read(100) | ||
| assert filestart.startswith('#'), filestart | ||
| # check that header was written | ||
| chunk = bnp.open("test1.vcf").read_chunk() | ||
| chunk = bnp.open(out_path).read_chunk() | ||
| assert chunk.get_context("header") != "" and chunk.get_context("header") == header | ||
| def test_vcf_matrix_buffer_stream(): | ||
| f = bnp.open("example_data/variants_with_header.vcf", | ||
| def test_vcf_matrix_buffer_stream(tmp_path, data_path): | ||
| f = bnp.open(data_path / "variants_with_header.vcf", | ||
| buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| out = bnp.open("test1.vcf", mode="w") | ||
| outpath = tmp_path / "test1.vcf" | ||
| out = bnp.open(outpath, mode="w") | ||
| out.write(f.read_chunks()) | ||
| # check that header was written | ||
| chunk = bnp.open("test1.vcf").read_chunk() | ||
| chunk = bnp.open(outpath).read_chunk() | ||
| assert chunk.get_context("header") != "" | ||
| def test_context_state(): | ||
| f = bnp.open("example_data/variants_with_header.vcf").read() | ||
| def test_context_state(data_path): | ||
| f = bnp.open(data_path / "variants_with_header.vcf").read() | ||
| assert f.get_context("header") | ||
| f2 = bnp.open("example_data/variants.vcf").read() | ||
| f2 = bnp.open(data_path / "variants.vcf").read() | ||
| assert not f2.get_context("header") | ||
@@ -101,5 +100,5 @@ | ||
| def test_parse_unphased_vcf(): | ||
| # example_data/variants.vcf has messy unphased and missing genotypes | ||
| filename = "example_data/variants.vcf" | ||
| def test_parse_unphased_vcf(data_path): | ||
| # variants.vcf has messy unphased and missing genotypes | ||
| filename = data_path / "variants.vcf" | ||
| print(open(filename).read()) | ||
@@ -125,4 +124,4 @@ f = bnp.open(filename, buffer_type=bionumpy.io.vcf_buffers.VCFMatrixBuffer) | ||
| def test_parse_phased_vcf(): | ||
| f = bnp.open("example_data/variants_phased.vcf", buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| def test_parse_phased_vcf(data_path): | ||
| f = bnp.open(data_path / "variants_phased.vcf", buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| data = f.read() | ||
@@ -139,4 +138,4 @@ data = data.genotypes.raw() | ||
| def test_read_info_field(): | ||
| vcf_filename = "example_data/variants_with_header.vcf" | ||
| def test_read_info_field(data_path): | ||
| vcf_filename = data_path / "variants_with_header.vcf" | ||
| f = bnp.open(vcf_filename, | ||
@@ -152,4 +151,4 @@ buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| @pytest.mark.skip('missing data') | ||
| def test_read_info_field2(): | ||
| vcf_filename = "example_data/info_flag.vcf" | ||
| def test_read_info_field2(data_path): | ||
| vcf_filename = data_path / "info_flag.vcf" | ||
| f = bnp.open(vcf_filename, | ||
@@ -163,4 +162,4 @@ buffer_type=bionumpy.io.vcf_buffers.PhasedVCFMatrixBuffer) | ||
| # @pytest.mark.xfail | ||
| def test_read_biallelic_vcf(): | ||
| file_name = "example_data/small_phased_biallelic.vcf" | ||
| def test_read_biallelic_vcf(data_path): | ||
| file_name = data_path / "small_phased_biallelic.vcf" | ||
| vcf = bnp.open(file_name, buffer_type=bnp.io.vcf_buffers.PhasedHaplotypeVCFMatrixBuffer) | ||
@@ -172,4 +171,4 @@ for chunk in vcf.read_chunks(): | ||
| @pytest.mark.xfail | ||
| def test_read_info_from_vcf(): | ||
| file = "example_data/variants_with_single_individual_genotypes_and_info.vcf" | ||
| def test_read_info_from_vcf(data_path): | ||
| file = data_path / "variants_with_single_individual_genotypes_and_info.vcf" | ||
| variants = bnp.open(file).read() | ||
@@ -183,4 +182,4 @@ | ||
| @pytest.mark.skip | ||
| def test_concatenate_variants(): | ||
| file = "example_data/variants_with_single_individual_genotypes_and_info.vcf" | ||
| def test_concatenate_variants(data_path): | ||
| file = data_path / "variants_with_single_individual_genotypes_and_info.vcf" | ||
| f = bnp.open(file) | ||
@@ -210,4 +209,4 @@ chunk1 = f.read_chunk(min_chunk_size=200) | ||
| @pytest.fixture | ||
| def data_with_info(): | ||
| file = "example_data/vcf_symbolic_sequences.vcf" | ||
| def data_with_info(data_path): | ||
| file = data_path / "vcf_symbolic_sequences.vcf" | ||
| data = bnp.open(file).read() | ||
@@ -246,4 +245,4 @@ return data | ||
| # @pytest.mark.skip # .genotype not implemented | ||
| def test_read_genotype_data_from_messy_vcf(): | ||
| file_name = "example_data/polaris_small.vcf" | ||
| def test_read_genotype_data_from_messy_vcf(data_path): | ||
| file_name = data_path / "polaris_small.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -254,4 +253,4 @@ genotype = data.genotype[0] | ||
| def test_read_genotype_with_more_data(): | ||
| file_name = get_file_name("example_data/syndip.vcf") | ||
| def test_read_genotype_with_more_data(data_path): | ||
| file_name = data_path / "syndip.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -261,3 +260,3 @@ genotypes = data.genotype[:4] | ||
| def test_write_genotype(): | ||
| def test_write_genotype(tmp_path): | ||
| data = narrow_type(VCFEntryWithGenotypes, 'info', str)( | ||
@@ -274,9 +273,10 @@ ['chr1', 'chr2'], | ||
| ) | ||
| with bnp.open("tmp.vcf", "w", buffer_type=VCFBuffer2) as f: | ||
| file_path = tmp_path / "tmp.vcf" | ||
| with bnp.open(file_path, "w", buffer_type=VCFBuffer2) as f: | ||
| f.write(data) | ||
| new_data = bnp.open("tmp.vcf", buffer_type=VCFBuffer2).read().get_data_object() | ||
| new_data = bnp.open(file_path, buffer_type=VCFBuffer2).read().get_data_object() | ||
| assert_bnpdataclass_equal(data, new_data) | ||
| def test_read_genotype_with_no_data(): | ||
| file_name = "example_data/variants_without_genotypes.vcf" | ||
| def test_read_genotype_with_no_data(data_path): | ||
| file_name = data_path / "variants_without_genotypes.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -286,4 +286,4 @@ genotypes = data.genotype[:4] | ||
| def test_read_empty_vcf(): | ||
| file_name = get_file_name("example_data/empty_variants.vcf") | ||
| def test_read_empty_vcf(data_path): | ||
| file_name = data_path / "empty_variants.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -294,4 +294,4 @@ assert len(data) == 0 | ||
| @pytest.mark.skip # genotype fields not implemented | ||
| def test_read_genotype_ad_field(): | ||
| file_name = "example_data/syndip.vcf" | ||
| def test_read_genotype_ad_field(data_path): | ||
| file_name = data_path / "syndip.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -305,4 +305,4 @@ assert_array_equal(data[0].genotype_data.AD == [1, 1]) | ||
| @pytest.mark.skip # genotype fields not implemented | ||
| def test_read_genotype_ad_field(): | ||
| file_name = "example_data/syndip.vcf" | ||
| def test_read_genotype_ad_field(data_path): | ||
| file_name = data_path / "syndip.vcf" | ||
| data = bnp.open(file_name, buffer_type=VCFBuffer2).read() | ||
@@ -315,4 +315,4 @@ assert_array_equal(data[0].genotype_data.AD == [1, 1]) | ||
| def test_read_thousand_genomes_info_field(): | ||
| data = bnp.open("example_data/thousand_genomes.vcf").read() | ||
| def test_read_thousand_genomes_info_field(data_path): | ||
| data = bnp.open(data_path / "thousand_genomes.vcf").read() | ||
@@ -327,4 +327,4 @@ assert_raggedarray_equal( | ||
| def test_read_hprc_multiallelic(): | ||
| data = bnp.open(get_file_name("example_data/hprc_multiallelic.vcf")).read() | ||
| def test_read_hprc_multiallelic(data_path): | ||
| data = bnp.open(data_path / "hprc_multiallelic.vcf").read() | ||
| result = data.info.AF[0:2] | ||
@@ -337,4 +337,4 @@ assert_raggedarray_equal(result, [ | ||
| def test_read_write_vcf_gives_identical_file(): | ||
| file = "example_data/variants_with_single_individual_genotypes_and_info.vcf" | ||
| def test_read_write_vcf_gives_identical_file(data_path): | ||
| file = data_path /"variants_with_single_individual_genotypes_and_info.vcf" | ||
| variants = bnp.open(file).read() | ||
@@ -353,3 +353,3 @@ | ||
| def test_read_vcf_replace_field(): | ||
| file = "example_data/variants_with_single_individual_genotypes_and_info.vcf" | ||
| file = data_path / "variants_with_single_individual_genotypes_and_info.vcf" | ||
| variants = bnp.open(file).read() | ||
@@ -370,15 +370,15 @@ variants = bnp.replace(variants, position=np.ones_like(variants.position)) | ||
| #@pytest.mark.xfail | ||
| def test_parse_vcf_that_fails(): | ||
| vcf = bnp.open(get_file_name("example_data/variants_with_af.vcf")).read() | ||
| def test_parse_vcf_that_fails(data_path): | ||
| vcf = bnp.open(data_path / "variants_with_af.vcf").read() | ||
| print(vcf) | ||
| def test_genotype_print(): | ||
| i = bnp.open(get_file_name("example_data/thousand_genomes.vcf"), | ||
| def test_genotype_print(data_path): | ||
| i = bnp.open(data_path / "thousand_genomes.vcf", | ||
| buffer_type=VCFBuffer2).read() | ||
| print(i.genotype) | ||
| def test_ioi(): | ||
| out_filename = "tmp_ioi.vcf" | ||
| i = bnp.open(get_file_name("example_data/thousand_genomes.vcf"), | ||
| def test_ioi(tmp_path, data_path): | ||
| out_filename = tmp_path / "tmp_ioi.vcf" | ||
| i = bnp.open(data_path / "thousand_genomes.vcf", | ||
| buffer_type=VCFBuffer2).read() | ||
@@ -393,4 +393,4 @@ print(i.genotype) | ||
| def test_vcf_haplotyped(): | ||
| vcf = bnp.open(get_file_name("example_data/haplotypes.vcf"), buffer_type=VCFHaplotypeBuffer).read() | ||
| def test_vcf_haplotyped(data_path): | ||
| vcf = bnp.open(data_path / "haplotypes.vcf", buffer_type=VCFHaplotypeBuffer).read() | ||
| print(vcf.genotype) | ||
@@ -397,0 +397,0 @@ genotype_ = vcf.genotype[1][:3] |
+25
-15
@@ -0,5 +1,7 @@ | ||
| import os | ||
| from bionumpy.io.vcf_header import parse_header | ||
| import bionumpy as bnp | ||
| import pytest | ||
| from .conftest import data_path | ||
| from bionumpy.util.testing import assert_encoded_array_equal | ||
@@ -95,4 +97,4 @@ | ||
| def test_vcf_lof(): | ||
| variants = bnp.open("example_data/lof_file.vcf").read() | ||
| def test_vcf_lof(data_path): | ||
| variants = bnp.open(data_path / "lof_file.vcf").read() | ||
| lof = variants.info.LoF | ||
@@ -107,4 +109,4 @@ n_variants = len(variants) | ||
| def test_vcf_info_data_object(): | ||
| variants = bnp.open("example_data/lof_file.vcf").read() | ||
| def test_vcf_info_data_object(data_path): | ||
| variants = bnp.open(data_path / "lof_file.vcf").read() | ||
| info = variants.info.get_data_object() | ||
@@ -114,14 +116,18 @@ print(str(info)) | ||
| def test_vcf_filtering_chunk(): | ||
| with bnp.open('tmp.vcf', 'w') as f: | ||
| for chunk in bnp.open("example_data/lof_file.vcf").read_chunks(): | ||
| def test_vcf_filtering_chunk(tmp_path, data_path): | ||
| in_filepath = data_path / "lof_file.vcf" | ||
| out_filepath = tmp_path / 'tmp.vcf' | ||
| with bnp.open(out_filepath, 'w') as f: | ||
| for chunk in bnp.open(in_filepath).read_chunks(): | ||
| f.write(chunk[(chunk.info.LoF.lengths > 0) & chunk.info.ONCOGENE]) | ||
| assert bnp.count_entries('tmp.vcf') == 2 | ||
| assert bnp.count_entries(out_filepath) == 2 | ||
| def test_locations(): | ||
| def test_locations(data_path): | ||
| k = 5 | ||
| # Read genome and variants | ||
| genome = bnp.Genome.from_file("example_data/sacCer3.fa", filter_function=None) | ||
| variants_file = "example_data/sacCer3_sample_variants.vcf.gz" | ||
| genome_file_name = data_path / "sacCer3.fa" | ||
| genome = bnp.Genome.from_file(genome_file_name, filter_function=None) | ||
| variants_file = data_path / "sacCer3_sample_variants.vcf.gz" | ||
| print(bnp.open(variants_file).read()) | ||
@@ -137,3 +143,3 @@ | ||
| print(variants) | ||
| windows = variants.get_windows(flank=k-1) | ||
| windows = variants.get_windows(flank=k - 1) | ||
| print(windows) | ||
@@ -146,3 +152,3 @@ # Use the windows to extract sequences (kmers) | ||
| assert_encoded_array_equal(sequences[:, k - 1], variants.get_data_field('ref_seq').ravel()) | ||
| sequences[:, k-1] = variants.get_data_field('alt_seq').ravel() | ||
| sequences[:, k - 1] = variants.get_data_field('alt_seq').ravel() | ||
| assert_encoded_array_equal(sequences[:, k - 1], variants.get_data_field('alt_seq').ravel()) | ||
@@ -152,2 +158,6 @@ print(sequences) | ||
| alt_kmers = bnp.get_kmers(sequences, k) | ||
| print(alt_kmers[0:3]) | ||
| print(alt_kmers[0:3]) | ||
| fai_filename = genome_file_name.with_suffix(genome_file_name.suffix + '.fai') | ||
| # remove file | ||
| if os.path.exists(fai_filename): | ||
| os.remove(fai_filename) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
669607
0.14%218
0.46%15035
0.11%