Latest Threat Research:SANDWORM_MODE: Shai-Hulud-Style npm Worm Hijacks CI Workflows and Poisons AI Toolchains.Details
Socket
Book a DemoInstallSign in
Socket

ms2pip

Package Overview
Dependencies
Maintainers
3
Versions
33
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

ms2pip - PyPI Package Compare versions

Comparing version
4.0.0.dev12
to
4.0.0.dev13
+23
ms2pip/plot.py
from pathlib import Path
from typing import Union
from ms2pip.spectrum import Spectrum
# Plotting is an optional feature: matplotlib and spectrum_utils may not be
# installed. Record their availability so spectrum_to_png can raise a clear
# error instead of failing at import time.
try:
    import matplotlib.pyplot as plt
    import spectrum_utils.plot as sup
    _can_plot = True
except ImportError:
    _can_plot = False  # plotting dependencies missing; spectrum_to_png will raise
def spectrum_to_png(spectrum: Spectrum, filepath: Union[str, Path]):
    """Render a single spectrum and save it as a PNG file at *filepath*."""
    # Fail fast with an actionable message when the optional plotting
    # dependencies were not importable at module load time.
    if not _can_plot:
        raise ImportError("Matplotlib and spectrum_utils are required to plot spectra.")
    axes = plt.gca()
    axes.set_title(f"MS²PIP prediction for {spectrum.peptidoform}")
    sup.spectrum(spectrum.to_spectrum_utils(), ax=axes)
    output_path = Path(filepath).with_suffix(".png")
    plt.savefig(output_path)
    plt.close()
"""
Define and build the search space for in silico spectral library generation.
This module defines the search space for in silico spectral library generation as a
:py:class:`~ProteomeSearchSpace` object. Variable and fixed modifications can be configured
as :py:class:`~ModificationConfig` objects.
The peptide search space can be built from a protein FASTA file and a set of parameters, which can
then be converted to a :py:class:`psm_utils.PSMList` object for use in :py:mod:`ms2pip`. All
parameters are listed below at :py:class:`~ProteomeSearchSpace` and can be passed as a
dictionary, a JSON file, or as a :py:class:`~ProteomeSearchSpace` object. For example:
.. code-block:: json
{
"fasta_file": "test.fasta",
"min_length": 8,
"max_length": 3,
"cleavage_rule": "trypsin",
"missed_cleavages": 2,
"semi_specific": false,
"add_decoys": true,
"modifications": [
{
"label": "UNIMOD:Oxidation",
"amino_acid": "M"
},
{
"label": "UNIMOD:Carbamidomethyl",
"amino_acid": "C",
"fixed": true
}
],
"max_variable_modifications": 3,
"charges": [2, 3]
}
For an unspecific protein digestion, the cleavage rule can be set to ``unspecific``. This will
result in a cleavage rule that allows cleavage after any amino acid with an unlimited number of
allowed missed cleavages.
To disable protein digestion when the FASTA file contains peptides, set the cleavage rule to
``-``. This will treat each line in the FASTA file as a separate peptide sequence, but still
allow for modifications and charges to be added.
Examples
--------
>>> from ms2pip.search_space import ProteomeSearchSpace, ModificationConfig
>>> search_space = ProteomeSearchSpace(
... fasta_file="tests/data/test_proteins.fasta",
... min_length=8,
... max_length=30,
... cleavage_rule="trypsin",
... missed_cleavages=2,
... semi_specific=False,
... modifications=[
... ModificationConfig(label="UNIMOD:Oxidation", amino_acid="M"),
... ModificationConfig(label="UNIMOD:Carbamidomethyl", amino_acid="C", fixed=True),
... ],
... charges=[2, 3],
... )
>>> psm_list = search_space.into_psm_list()
>>> from ms2pip.search_space import ProteomeSearchSpace
>>> search_space = ProteomeSearchSpace.from_any("tests/data/test_search_space.json")
>>> psm_list = search_space.into_psm_list()
"""
from __future__ import annotations
import multiprocessing
import multiprocessing.dummy
from collections import defaultdict
from functools import partial
from itertools import chain, combinations, product
from logging import getLogger
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Union
import numpy as np
import pyteomics.fasta
from psm_utils import PSM, PSMList
from pydantic import BaseModel, field_validator, model_validator
from pyteomics.parser import icleave
from rich.progress import track
logger = getLogger(__name__)
class ModificationConfig(BaseModel):
    """Configuration for a single modification in the search space."""

    # Any valid ProForma 2.0 label, e.g. "UNIMOD:Oxidation"
    label: str
    # Target amino acid, or None when not residue-specific
    amino_acid: Optional[str] = None
    # Terminus flags; at most one terminus is honored downstream
    # (see _restructure_modifications_by_target, which checks them in order)
    peptide_n_term: Optional[bool] = False
    protein_n_term: Optional[bool] = False
    peptide_c_term: Optional[bool] = False
    protein_c_term: Optional[bool] = False
    # Fixed modifications are always applied; variable ones are combinatorial
    fixed: Optional[bool] = False

    def __init__(self, **data: Any):
        """
        Configuration for a single modification in the search space.

        Parameters
        ----------
        label
            Label of the modification. This can be any valid ProForma 2.0 label.
        amino_acid
            Amino acid target of the modification. :py:obj:`None` if the modification is not
            specific to an amino acid. Default is None.
        peptide_n_term
            Whether the modification occurs only on the peptide N-terminus. Default is False.
        protein_n_term
            Whether the modification occurs only on the protein N-terminus. Default is False.
        peptide_c_term
            Whether the modification occurs only on the peptide C-terminus. Default is False.
        protein_c_term
            Whether the modification occurs only on the protein C-terminus. Default is False.
        fixed
            Whether the modification is fixed. Default is False.
        """
        super().__init__(**data)

    @model_validator(mode="after")
    def _modification_must_have_target(self):
        # A modification must apply somewhere: either to a specific amino acid
        # or to at least one of the four termini.
        target_fields = [
            "amino_acid",
            "peptide_n_term",
            "protein_n_term",
            "peptide_c_term",
            "protein_c_term",
        ]
        if not any(getattr(self, t) for t in target_fields):
            raise ValueError("Modifications must have a target (amino acid or N/C-term).")
        return self
# Default modification set used when none are configured: variable oxidation
# of methionine and fixed carbamidomethylation of cysteine.
DEFAULT_MODIFICATIONS = [
    ModificationConfig(
        label="UNIMOD:Oxidation",
        amino_acid="M",
    ),
    ModificationConfig(
        label="UNIMOD:Carbamidomethyl",
        amino_acid="C",
        fixed=True,
    ),
]
class ProteomeSearchSpace(BaseModel):
    """Search space for in silico spectral library generation."""

    fasta_file: Path
    min_length: int = 8
    max_length: int = 30
    min_precursor_mz: Optional[float] = 0
    # Use np.inf, not the np.Inf alias, which was removed in NumPy 2.0
    max_precursor_mz: Optional[float] = np.inf
    cleavage_rule: str = "trypsin"
    missed_cleavages: int = 2
    semi_specific: bool = False
    add_decoys: bool = False
    modifications: List[ModificationConfig] = DEFAULT_MODIFICATIONS
    max_variable_modifications: int = 3
    charges: List[int] = [2, 3]

    def __init__(self, **data: Any):
        """
        Search space for in silico spectral library generation.

        Parameters
        ----------
        fasta_file
            Path to FASTA file with protein sequences.
        min_length
            Minimum peptide length. Default is 8.
        max_length
            Maximum peptide length. Default is 30.
        min_precursor_mz
            Minimum precursor m/z for peptides. Default is 0.
        max_precursor_mz
            Maximum precursor m/z for peptides. Default is infinity.
        cleavage_rule
            Cleavage rule for peptide digestion. Default is "trypsin".
        missed_cleavages
            Maximum number of missed cleavages. Default is 2.
        semi_specific
            Allow semi-specific cleavage. Default is False.
        add_decoys
            Add decoy sequences to search space. Default is False.
        modifications
            List of modifications to consider. Default is oxidation of M and carbamidomethylation
            of C.
        max_variable_modifications
            Maximum number of variable modifications per peptide. Default is 3.
        charges
            List of charges to consider. Default is [2, 3].
        """
        super().__init__(**data)
        # Populated by build()/_digest_fasta(); intentionally not a pydantic field.
        self._peptidoform_spaces: List[_PeptidoformSearchSpace] = []

    @field_validator("modifications")
    @classmethod
    def _validate_modifications(cls, v):
        """Accept ModificationConfig objects or coerce a list of dicts into them."""
        if all(isinstance(m, ModificationConfig) for m in v):
            return v
        elif all(isinstance(m, dict) for m in v):
            return [ModificationConfig(**modification) for modification in v]
        else:
            raise ValueError(
                "Modifications should be a list of dicts or ModificationConfig objects."
            )

    @model_validator(mode="after")
    def _validate_unspecific_cleavage(self):
        """Validate and configure unspecific cleavage settings."""
        # `unspecific` is not an option in pyteomics.parser.icleave, so we configure
        # the settings for unspecific cleavage manually: cleave after any residue,
        # with as many missed cleavages as the longest allowed peptide.
        if self.cleavage_rule == "unspecific":
            self.missed_cleavages = self.max_length
            self.cleavage_rule = r"(?<=[A-Z])"
        return self

    def __len__(self):
        """Return the total number of peptidoforms in the built search space."""
        if not self._peptidoform_spaces:
            raise ValueError("Search space must be built before length can be determined.")
        return sum(len(pep_space) for pep_space in self._peptidoform_spaces)

    @classmethod
    def from_any(cls, _input: Union[dict, str, Path, ProteomeSearchSpace]) -> ProteomeSearchSpace:
        """
        Create ProteomeSearchSpace from various input types.

        Parameters
        ----------
        _input
            Search space parameters as a dictionary, a path to a JSON file, or an existing
            :py:class:`ProteomeSearchSpace` object.
        """
        if isinstance(_input, ProteomeSearchSpace):
            return _input
        elif isinstance(_input, (str, Path)):
            with open(_input, "rt") as f:
                return cls.model_validate_json(f.read())
        elif isinstance(_input, dict):
            return cls.model_validate(_input)
        else:
            raise ValueError("Search space must be a dict, str, Path, or ProteomeSearchSpace.")

    def build(self, processes: int = 1):
        """
        Build peptide search space from FASTA file.

        Parameters
        ----------
        processes : int
            Number of processes to use for parallelization. Falsy values select
            the number of available CPUs.
        """
        processes = processes if processes else multiprocessing.cpu_count()
        self._digest_fasta(processes)
        self._remove_redundancy()
        self._add_modifications(processes)
        self._add_charges()

    def __iter__(self) -> Generator[PSM, None, None]:
        """
        Generate PSMs from the built search space.

        Raises
        ------
        ValueError
            If :py:meth:`build` has not been called first.
        """
        if not self._peptidoform_spaces:
            raise ValueError("Search space must be built before PSMs can be generated.")
        # Assign a sequential spectrum ID across all peptidoforms
        spectrum_id = 0
        for pep_space in self._peptidoform_spaces:
            for pep in pep_space:
                yield PSM(
                    peptidoform=pep,
                    spectrum_id=spectrum_id,
                    protein_list=pep_space.proteins,
                )
                spectrum_id += 1

    def filter_psms_by_mz(self, psms: PSMList) -> PSMList:
        """Filter PSMs by the configured precursor m/z range (inclusive)."""
        return PSMList(
            psm_list=[
                psm
                for psm in psms
                if self.min_precursor_mz <= psm.peptidoform.theoretical_mz <= self.max_precursor_mz
            ]
        )

    def _digest_fasta(self, processes: int = 1):
        """Digest FASTA file to peptides and populate search space."""
        # Use a local str conversion instead of mutating self.fasta_file, which
        # is declared as Path; pyteomics readers accept plain string paths.
        fasta_file = str(self.fasta_file)
        n_proteins = _count_fasta_entries(fasta_file)
        if self.add_decoys:
            fasta_db = pyteomics.fasta.decoy_db(
                fasta_file,
                mode="reverse",
                decoy_only=False,
                keep_nterm=True,
            )
            n_proteins *= 2  # decoy_db yields a decoy for every target entry
        else:
            fasta_db = pyteomics.fasta.FASTA(fasta_file)
        # Read proteins and digest to peptides
        with _get_pool(processes) as pool:
            partial_digest_protein = partial(
                _digest_single_protein,
                min_length=self.min_length,
                max_length=self.max_length,
                cleavage_rule=self.cleavage_rule,
                missed_cleavages=self.missed_cleavages,
                semi_specific=self.semi_specific,
            )
            results = track(
                pool.imap(partial_digest_protein, fasta_db),
                total=n_proteins,
                description="Digesting proteins...",
                transient=True,
            )
            self._peptidoform_spaces = list(chain.from_iterable(results))

    def _remove_redundancy(self):
        """Remove redundancy in peptides and combine protein lists."""
        peptide_dict = dict()
        for peptide in track(
            self._peptidoform_spaces,
            description="Removing peptide redundancy...",
            transient=True,
        ):
            # Same sequence seen before: merge protein provenance instead of
            # keeping a duplicate entry.
            if peptide.sequence in peptide_dict:
                peptide_dict[peptide.sequence].proteins.extend(peptide.proteins)
            else:
                peptide_dict[peptide.sequence] = peptide
        # Overwrite with non-redundant peptides
        self._peptidoform_spaces = list(peptide_dict.values())

    def _add_modifications(self, processes: int = 1):
        """Add modification combinations to peptides in the search space."""
        modifications_by_target = _restructure_modifications_by_target(self.modifications)
        modification_options = []
        with _get_pool(processes) as pool:
            partial_get_modification_versions = partial(
                _get_peptidoform_modification_versions,
                modifications=self.modifications,
                modifications_by_target=modifications_by_target,
                max_variable_modifications=self.max_variable_modifications,
            )
            modification_options = pool.imap(
                partial_get_modification_versions, self._peptidoform_spaces
            )
            # imap preserves input order, so results can be zipped back onto
            # the peptidoform spaces.
            for pep, mod_opt in track(
                zip(self._peptidoform_spaces, modification_options),
                description="Adding modifications...",
                total=len(self._peptidoform_spaces),
                transient=True,
            ):
                pep.modification_options = mod_opt

    def _add_charges(self):
        """Add charge permutations to peptides in the search space."""
        for peptide in track(
            self._peptidoform_spaces,
            description="Adding charge permutations...",
            transient=True,
        ):
            peptide.charge_options = self.charges
class _PeptidoformSearchSpace(BaseModel):
    """Search space for a given amino acid sequence."""

    # Plain amino acid sequence of the peptide
    sequence: str
    # IDs of proteins the peptide maps to
    proteins: List[str]
    # Whether the peptide is at the protein N-/C-terminus (None = unknown)
    is_n_term: Optional[bool] = None
    is_c_term: Optional[bool] = None
    # Each dict maps a site (residue index, "N", or "C") to a modification
    modification_options: List[Dict[int, ModificationConfig]] = []
    charge_options: List[int] = []

    def __init__(self, **data: Any):
        """
        Search space for a given amino acid sequence.

        Parameters
        ----------
        sequence
            Amino acid sequence of the peptidoform.
        proteins
            List of protein IDs containing the peptidoform.
        is_n_term
            Whether the peptidoform is an N-terminal peptide. Default is None.
        is_c_term
            Whether the peptidoform is a C-terminal peptide. Default is None.
        modification_options
            List of dictionaries with modification positions and configurations. Default is [].
        charge_options
            List of charge states to consider. Default is [].
        """
        super().__init__(**data)

    def __len__(self):
        """Number of peptidoforms: modification versions × charge states."""
        return len(self.modification_options) * len(self.charge_options)

    def __iter__(self) -> Generator[str, None, None]:
        """Yield peptidoform strings with given charges and modifications."""
        if not self.charge_options:
            raise ValueError("Peptide charge options not defined.")
        if not self.modification_options:
            raise ValueError("Peptide modification options not defined.")
        for modifications, charge in product(self.modification_options, self.charge_options):
            yield self._construct_peptidoform_string(self.sequence, modifications, charge)

    @staticmethod
    def _construct_peptidoform_string(
        sequence: str, modifications: Dict[int, ModificationConfig], charge: int
    ) -> str:
        """
        Render a ProForma-style peptidoform string for one modification set.

        Residue-position modifications are applied before terminal ones: an
        N-terminal insert changes list indices, so applying it first would
        shift every residue position and corrupt positional modifications.
        """
        if not modifications:
            return f"{sequence}/{charge}"
        modded_sequence = list(sequence)
        positional = [(p, m) for p, m in modifications.items() if isinstance(p, int)]
        terminal = [(p, m) for p, m in modifications.items() if not isinstance(p, int)]
        for position, mod in positional:
            aa = modded_sequence[position]
            if aa != mod.amino_acid:
                raise ValueError(
                    f"Modification {mod.label} at position {position} does not match amino "
                    f"acid {aa}."
                )
            modded_sequence[position] = f"{aa}[{mod.label}]"
        for position, mod in terminal:
            if position == "N":
                modded_sequence.insert(0, f"[{mod.label}]-")
            elif position == "C":
                modded_sequence.append(f"-[{mod.label}]")
            else:
                raise ValueError(f"Invalid position {position} for modification {mod.label}.")
        return f"{''.join(modded_sequence)}/{charge}"
def _digest_single_protein(
    protein: pyteomics.fasta.Protein,
    min_length: int = 8,
    max_length: int = 30,
    cleavage_rule: str = "trypsin",
    missed_cleavages: int = 2,
    semi_specific: bool = False,
) -> List[_PeptidoformSearchSpace]:
    """Digest protein sequence and return a list of validated peptides."""
    # Peptides with ambiguous or non-standard residues are filtered out.
    invalid_residues = {"B", "J", "O", "U", "X", "Z"}
    protein_length = len(protein.sequence)
    # Assumes protein ID is description until first space
    protein_id = protein.description.split(" ")[0]

    peptides: List[_PeptidoformSearchSpace] = []
    for start_position, sequence in icleave(
        protein.sequence,
        cleavage_rule,
        missed_cleavages=missed_cleavages,
        min_length=min_length,
        max_length=max_length,
        semi=semi_specific,
    ):
        if not invalid_residues.isdisjoint(sequence):
            continue
        peptides.append(
            _PeptidoformSearchSpace(
                sequence=sequence,
                proteins=[protein_id],
                is_n_term=start_position == 0,
                is_c_term=start_position + len(sequence) == protein_length,
            )
        )
    return peptides
def _count_fasta_entries(filename: Path) -> int:
"""Count the number of entries in a FASTA file."""
with open(filename, "rt") as f:
count = 0
for line in f:
if line[0] == ">":
count += 1
return count
def _restructure_modifications_by_target(
modifications: List[ModificationConfig],
) -> Dict[str, Dict[str, List[ModificationConfig]]]:
"""Restructure variable modifications to options per side chain or terminus."""
modifications_by_target = {
"sidechain": defaultdict(lambda: []),
"peptide_n_term": defaultdict(lambda: []),
"peptide_c_term": defaultdict(lambda: []),
"protein_n_term": defaultdict(lambda: []),
"protein_c_term": defaultdict(lambda: []),
}
def add_mod(mod, target, amino_acid):
if amino_acid:
modifications_by_target[target][amino_acid].append(mod)
else:
modifications_by_target[target]["any"].append(mod)
for mod in modifications:
if mod.fixed:
continue
if mod.peptide_n_term:
add_mod(mod, "peptide_n_term", mod.amino_acid)
elif mod.peptide_c_term:
add_mod(mod, "peptide_c_term", mod.amino_acid)
elif mod.protein_n_term:
add_mod(mod, "protein_n_term", mod.amino_acid)
elif mod.protein_c_term:
add_mod(mod, "protein_c_term", mod.amino_acid)
else:
add_mod(mod, "sidechain", mod.amino_acid)
return {k: dict(v) for k, v in modifications_by_target.items()}
def _get_modification_possibilities_by_site(
    peptide: _PeptidoformSearchSpace,
    modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]],
    modifications: List[ModificationConfig],
) -> Dict[Union[str, int], List[ModificationConfig]]:
    """
    Get all possible modifications for each site in a peptide sequence.

    Sites are residue indices (int) for side-chain modifications, or the
    labels "N"/"C" for terminal modifications. Variable modifications are
    collected first; fixed modifications then override the options at their
    matching sites.
    """
    possibilities_by_site = defaultdict(list)
    # Generate dictionary of positions per amino acid
    position_dict = defaultdict(list)
    for pos, aa in enumerate(peptide.sequence):
        position_dict[aa].append(pos)
    # Map modifications to positions
    # NOTE(review): positions of the same residue share one list object from
    # modifications_by_target; downstream only reassigns (never mutates) these
    # entries, so the aliasing is presumably harmless — verify if that changes.
    for aa in set(position_dict).intersection(set(modifications_by_target["sidechain"])):
        possibilities_by_site.update(
            {pos: modifications_by_target["sidechain"][aa] for pos in position_dict[aa]}
        )
    # Assign possible modifications per terminus. Protein-terminal entries only
    # apply when the peptide actually sits at that protein terminus.
    for terminus, position, site_name, specificity in [
        ("peptide_n_term", 0, "N", None),
        ("peptide_c_term", -1, "C", None),
        ("protein_n_term", 0, "N", "is_n_term"),
        ("protein_c_term", -1, "C", "is_c_term"),
    ]:
        if specificity is None or getattr(peptide, specificity):
            for site, mods in modifications_by_target[terminus].items():
                # "any" matches every sequence; otherwise the terminal residue
                # must equal the configured amino acid.
                if site == "any" or peptide.sequence[position] == site:
                    possibilities_by_site[site_name].extend(mods)
    # Override with fixed modifications
    for mod in modifications:
        aa = mod.amino_acid
        # Skip variable modifications
        if not mod.fixed:
            continue
        # Assign if specific aa matches or if no aa is specified for each terminus
        for terminus, position, site_name, specificity in [
            ("peptide_n_term", 0, "N", None),
            ("peptide_c_term", -1, "C", None),
            ("protein_n_term", 0, "N", "is_n_term"),
            ("protein_c_term", -1, "C", "is_c_term"),
        ]:
            if getattr(mod, terminus):  # Mod has this terminus
                if specificity is None or getattr(peptide, specificity):  # Specificity matches
                    if not aa or (aa and peptide.sequence[position] == aa):  # AA matches
                        possibilities_by_site[site_name] = [mod]  # Override with fixed mod
                break  # Allow `else: if amino_acid` if no terminus matches
        # Assign if fixed modification is not terminal and specific aa matches
        # (for/else: runs only when the loop above did not break)
        else:
            if aa:
                for pos in position_dict[aa]:
                    possibilities_by_site[pos] = [mod]
    return possibilities_by_site
def _get_peptidoform_modification_versions(
    peptide: _PeptidoformSearchSpace,
    modifications: List[ModificationConfig],
    modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]],
    max_variable_modifications: int = 3,
) -> List[Dict[Union[str, int], List[ModificationConfig]]]:
    """
    Get all potential combinations of modifications for a peptide sequence.

    Parameters
    ----------
    peptide
        Peptidoform search space entry providing the sequence and terminal flags.
    modifications
        All configured modifications (fixed and variable).
    modifications_by_target
        Variable modifications restructured per target, as returned by
        :py:func:`_restructure_modifications_by_target`.
    max_variable_modifications
        Maximum number of simultaneous variable modifications. Default is 3.

    Examples
    --------
    >>> peptide = _PeptidoformSearchSpace(sequence="PEPTIDE", proteins=["PROTEIN"])
    >>> phospho = ModificationConfig(label="Phospho", amino_acid="T", fixed=False)
    >>> acetyl = ModificationConfig(label="Acetyl", peptide_n_term=True, fixed=False)
    >>> modifications = [phospho, acetyl]
    >>> modifications_by_target = {
    ...     "sidechain": {"T": [phospho]},
    ...     "peptide_n_term": {"any": [acetyl]},
    ...     "peptide_c_term": {},
    ...     "protein_n_term": {},
    ...     "protein_c_term": {},
    ... }
    >>> _get_peptidoform_modification_versions(peptide, modifications, modifications_by_target)
    [{}, {3: phospho}, {"N": acetyl}, {3: phospho, "N": acetyl}]
    """
    # Get all possible modifications for each site in the peptide sequence
    possibilities_by_site = _get_modification_possibilities_by_site(
        peptide, modifications_by_target, modifications
    )
    # Separate fixed and variable modification sites
    fixed_modifications = {}
    variable_sites = []
    for site, mods in possibilities_by_site.items():
        for mod in mods:
            if mod.fixed:
                fixed_modifications[site] = mod
            else:
                variable_sites.append((site, mod))
    # Generate all combinations of variable modifications up to the maximum
    # allowed; every combination always includes all fixed modifications.
    modification_versions = []
    for i in range(max_variable_modifications + 1):
        for comb in combinations(variable_sites, i):
            combination_dict = fixed_modifications.copy()
            for site, mod in comb:
                combination_dict[site] = mod
            modification_versions.append(combination_dict)
    return modification_versions
def _get_pool(processes: int) -> Union[multiprocessing.Pool, multiprocessing.dummy.Pool]:
"""Get a multiprocessing pool with the given number of processes."""
# TODO: fix None default value for processes
if processes > 1:
return multiprocessing.Pool(processes)
else:
return multiprocessing.dummy.Pool(processes)
from ms2pip import search_space
# Shared modification fixtures used throughout the tests below.
# Variable side-chain modification on methionine
OXIDATION = search_space.ModificationConfig(
    label="Oxidation",
    amino_acid="M",
)
# Fixed side-chain modification on cysteine
CARBAMIDOMETHYL = search_space.ModificationConfig(
    label="Carbamidomethyl",
    amino_acid="C",
    fixed=True,
)
# Peptide N-terminal modification restricted to glutamate
PYROGLU = search_space.ModificationConfig(
    label="Glu->pyro-Glu",
    amino_acid="E",
    peptide_n_term=True,
)
# Protein N-terminal modification with no amino acid restriction
ACETYL = search_space.ModificationConfig(
    label="Acetyl",
    amino_acid=None,
    protein_n_term=True,
)
# Variable side-chain modification on threonine
PHOSPHO = search_space.ModificationConfig(
    label="Phospho",
    amino_acid="T",
    fixed=False,
)
# Default modification set exercised by the target-restructuring tests
MODIFICATION_CONFIG = [OXIDATION, CARBAMIDOMETHYL, PYROGLU, ACETYL]
def test_restructure_modifications_by_target():
    """Variable mods are grouped per target; fixed mods are excluded."""
    cases = [
        (
            [PHOSPHO, ACETYL],
            {
                "sidechain": {"T": [PHOSPHO]},
                "peptide_n_term": {},
                "peptide_c_term": {},
                "protein_n_term": {"any": [ACETYL]},
                "protein_c_term": {},
            },
        ),
        (
            # CARBAMIDOMETHYL is fixed and must not appear in the output
            [CARBAMIDOMETHYL, ACETYL],
            {
                "sidechain": {},
                "peptide_n_term": {},
                "peptide_c_term": {},
                "protein_n_term": {"any": [ACETYL]},
                "protein_c_term": {},
            },
        ),
    ]
    for modifications, expected in cases:
        assert search_space._restructure_modifications_by_target(modifications) == expected
def test_get_peptidoform_modification_versions():
    """Modification combinations are generated correctly for various configs."""
    # Each case: (sequence, modifications, expected modification versions)
    cases = [
        # No modifications
        ("PEPTIDE", [], [{}]),
        # Single fixed
        ("ACDE", [CARBAMIDOMETHYL], [{1: CARBAMIDOMETHYL}]),
        # Double fixed
        ("ACCDE", [CARBAMIDOMETHYL], [{1: CARBAMIDOMETHYL, 2: CARBAMIDOMETHYL}]),
        # Single variable
        ("ADME", [OXIDATION], [{}, {2: OXIDATION}]),
        # Double variable
        (
            "ADMME",
            [OXIDATION],
            [{}, {2: OXIDATION}, {3: OXIDATION}, {2: OXIDATION, 3: OXIDATION}],
        ),
        # More than maximum simultaneous mods should be ignored
        (
            "ADMMME",
            [OXIDATION],
            [
                {},
                {2: OXIDATION},
                {3: OXIDATION},
                {4: OXIDATION},
                {2: OXIDATION, 3: OXIDATION},
                {2: OXIDATION, 4: OXIDATION},
                {3: OXIDATION, 4: OXIDATION},
            ],
        ),
        # N-term and AA-specific
        ("EDEF", [PYROGLU], [{}, {"N": PYROGLU}]),
        (
            "PEPTIDE",
            [PHOSPHO, ACETYL],
            [{}, {3: PHOSPHO}, {"N": ACETYL}, {"N": ACETYL, 3: PHOSPHO}],
        ),
        (
            "ACDEK",
            [CARBAMIDOMETHYL, ACETYL],
            [
                {1: CARBAMIDOMETHYL},
                {1: CARBAMIDOMETHYL, "N": ACETYL},
            ],
        ),
    ]
    for sequence, modifications, expected in cases:
        peptide = search_space._PeptidoformSearchSpace(
            sequence=sequence, proteins=[], is_n_term=True
        )
        by_target = search_space._restructure_modifications_by_target(modifications)
        versions = search_space._get_peptidoform_modification_versions(
            peptide,
            modifications,
            by_target,
            max_variable_modifications=2,
        )
        assert versions == expected
def test_get_modifications_by_target():
    """The full fixture set restructures to the expected per-target mapping."""
    restructured = search_space._restructure_modifications_by_target(MODIFICATION_CONFIG)
    expected = {
        "sidechain": {"M": [MODIFICATION_CONFIG[0]]},
        "peptide_n_term": {"E": [MODIFICATION_CONFIG[2]]},
        "peptide_c_term": {},
        "protein_n_term": {"any": [MODIFICATION_CONFIG[3]]},
        "protein_c_term": {},
    }
    for target, site_map in expected.items():
        assert restructured[target] == site_map
class TestProteomeSearchSpace:
    def test_digest_fasta(self):
        """Tryptic digestion of the test FASTA yields the expected peptides."""
        params = {
            "fasta_file": "tests/test_data/test.fasta",
            "min_length": 8,
            "max_length": 30,
            "cleavage_rule": "trypsin",
            "missed_cleavages": 2,
            "semi_specific": False,
        }
        # (sequence, is_n_term, is_c_term); all peptides belong to P12345
        expected_peptides = [
            ("MYSSCSLLQR", True, False),
            ("MYSSCSLLQRLVWFPFLALVATQLLFIR", True, False),
            ("LVWFPFLALVATQLLFIR", False, False),
            ("NVSSLNLTNEYLHHK", False, False),
            ("NVSSLNLTNEYLHHKCLVSEGK", False, False),
            ("NVSSLNLTNEYLHHKCLVSEGKYKPGSK", False, False),
            ("CLVSEGKYKPGSK", False, False),
            ("CLVSEGKYKPGSKYEYI", False, True),
            ("YKPGSKYEYI", False, True),
        ]
        expected = [
            search_space._PeptidoformSearchSpace(
                sequence=sequence,
                proteins=["P12345"],
                modification_options=[],
                is_n_term=is_n_term,
                is_c_term=is_c_term,
            )
            for sequence, is_n_term, is_c_term in expected_peptides
        ]
        proteome = search_space.ProteomeSearchSpace(**params)
        proteome._digest_fasta()
        assert expected == proteome._peptidoform_spaces
+2
-1
Metadata-Version: 2.1
Name: ms2pip
Version: 4.0.0.dev12
Version: 4.0.0.dev13
Summary: MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction.

@@ -319,2 +319,3 @@ Author: Ana Sílvia C. Silva

training.
- ``annotate-spectra``: Annotate peaks in observed spectra.

@@ -321,0 +322,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes

@@ -12,3 +12,5 @@ LICENSE

ms2pip/exceptions.py
ms2pip/plot.py
ms2pip/result.py
ms2pip/search_space.py
ms2pip/spectrum.py

@@ -45,5 +47,3 @@ ms2pip/spectrum_input.py

tests/test_encoder.py
tests/test_fasta2speclib.py
tests/test_predictions.py
tests/test_retention_time.py
tests/test_search_space.py
tests/test_spectrum_output.py
# isort: skip_file
"""MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction."""
__version__ = "4.0.0-dev12"
__version__ = "4.0.0-dev13"

@@ -6,0 +6,0 @@ from warnings import filterwarnings

@@ -13,13 +13,8 @@ import logging

import ms2pip.core
from ms2pip import __version__
from ms2pip import __version__, exceptions
from ms2pip._utils.cli import build_credits, build_prediction_table
from ms2pip.constants import MODELS, SUPPORTED_OUTPUT_FORMATS
from ms2pip.exceptions import (
InvalidXGBoostModelError,
UnknownModelError,
UnknownOutputFormatError,
UnresolvableModificationError,
)
from ms2pip.result import correlations_to_csv, results_to_csv
from ms2pip.spectrum_output import write_single_spectrum_csv, write_single_spectrum_png
from ms2pip.constants import MODELS
from ms2pip.plot import spectrum_to_png
from ms2pip.result import write_correlations
from ms2pip.spectrum_output import SUPPORTED_FORMATS, write_spectra

@@ -48,3 +43,4 @@ console = Console()

else:
return Path(input_filename).with_suffix("")
input__filename = Path(input_filename)
return input__filename.with_name(input__filename.stem + "_predictions").with_suffix("")

@@ -70,2 +66,3 @@

@click.option("--output-name", "-o", type=str)
@click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="tsv")
@click.option("--model", type=click.Choice(MODELS), default="HCD")

@@ -77,5 +74,6 @@ @click.option("--model-dir")

output_name = kwargs.pop("output_name")
output_format = kwargs.pop("output_format")
plot = kwargs.pop("plot")
if not output_name:
output_name = "ms2pip_prediction_" + secure_filename(kwargs["peptidoform"]) + ".csv"
output_name = "ms2pip_prediction_" + secure_filename(kwargs["peptidoform"])

@@ -88,5 +86,5 @@ # Predict spectrum

console.print(build_prediction_table(predicted_spectrum))
write_single_spectrum_csv(predicted_spectrum, output_name)
write_spectra(output_name, [result], output_format)
if plot:
write_single_spectrum_png(predicted_spectrum, output_name)
spectrum_to_png(predicted_spectrum, output_name)

@@ -97,3 +95,3 @@

@click.option("--output-name", "-o", type=str)
@click.option("--output-format", "-f", type=click.Choice(SUPPORTED_OUTPUT_FORMATS))
@click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="tsv")
@click.option("--add-retention-time", "-r", is_flag=True)

@@ -105,5 +103,4 @@ @click.option("--model", type=click.Choice(MODELS), default="HCD")

# Parse arguments
output_name = kwargs.pop("output_name")
output_format = kwargs.pop("output_format")
output_name = _infer_output_name(kwargs["psms"], output_name)
output_name = _infer_output_name(kwargs["psms"], kwargs.pop("output_name"))

@@ -114,13 +111,29 @@ # Run

# Write output
output_name_csv = output_name.with_name(output_name.stem + "_predictions").with_suffix(".csv")
logger.info(f"Writing output to {output_name_csv}")
results_to_csv(predictions, output_name_csv)
# TODO: add support for other output formats
write_spectra(output_name, predictions, output_format)
@cli.command(help=ms2pip.core.predict_library.__doc__)
@click.argument("fasta-file", required=False, type=click.Path(exists=True, dir_okay=False))
@click.option("--config", "-c", type=click.Path(exists=True, dir_okay=False))
@click.option("--output-name", "-o", type=str)
@click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="msp")
@click.option("--add-retention-time", "-r", is_flag=True)
@click.option("--model", type=click.Choice(MODELS), default="HCD")
@click.option("--model-dir")
@click.option("--batch-size", type=int, default=100000)
@click.option("--processes", "-n", type=int)
def predict_library(*args, **kwargs):
ms2pip.core.predict_library(*args, **kwargs)
# Parse arguments
if not kwargs["fasta_file"] and not kwargs["config"]:
raise click.UsageError("Either `fasta_file` or `config` must be provided.")
output_format = kwargs.pop("output_format")
output_name = _infer_output_name(
kwargs["fasta_file"] or kwargs["config"], kwargs.pop("output_name")
)
# Run and write output for each batch
for i, result_batch in enumerate(ms2pip.core.predict_library(*args, **kwargs)):
write_spectra(output_name, result_batch, output_format, write_mode="w" if i == 0 else "a")
@cli.command(help=ms2pip.core.correlate.__doc__)

@@ -140,4 +153,3 @@ @click.argument("psms", required=True)

# Parse arguments
output_name = kwargs.pop("output_name")
output_name = _infer_output_name(kwargs["psms"], output_name)
output_name = _infer_output_name(kwargs["psms"], kwargs.pop("output_name"))

@@ -147,14 +159,13 @@ # Run

# Write output
output_name_int = output_name.with_name(output_name.stem + "_predictions").with_suffix(".csv")
logger.info(f"Writing intensities to {output_name_int}")
results_to_csv(results, output_name_int)
# TODO: add support for other output formats
# Write intensities
logger.info(f"Writing intensities to {output_name.with_suffix('.tsv')}")
write_spectra(output_name, results, "tsv")
# Write correlations
if kwargs["compute_correlations"]:
output_name_corr = output_name.with_name(output_name.stem + "_correlations")
output_name_corr = output_name_corr.with_suffix(".csv")
output_name_corr = output_name.with_name(output_name.stem + "_correlations").with_suffix(
".tsv"
)
logger.info(f"Writing correlations to {output_name_corr}")
correlations_to_csv(results, output_name_corr)
write_correlations(results, output_name_corr)

@@ -201,6 +212,6 @@

# Write output
output_name_int = output_name.with_name(output_name.stem + "_observations").with_suffix(".csv")
logger.info(f"Writing intensities to {output_name_int}")
results_to_csv(results, output_name_int)
# Write intensities
output_name_int = output_name.with_name(output_name.stem + "_observations").with_suffix()
logger.info(f"Writing intensities to {output_name_int.with_suffix('.tsv')}")
write_spectra(output_name, results, "tsv")

@@ -211,3 +222,3 @@

cli()
except UnresolvableModificationError as e:
except exceptions.UnresolvableModificationError as e:
logger.critical(

@@ -220,11 +231,9 @@ "Unresolvable modification: `%s`. See "

sys.exit(1)
except UnknownOutputFormatError as o:
logger.critical(
f"Unknown output format: `{o}` (supported formats: `{SUPPORTED_OUTPUT_FORMATS}`)"
)
except exceptions.UnknownOutputFormatError as o:
logger.critical(f"Unknown output format: `{o}` (supported formats: `{SUPPORTED_FORMATS}`)")
sys.exit(1)
except UnknownModelError as f:
except exceptions.UnknownModelError as f:
logger.critical(f"Unknown model: `{f}` (supported models: {set(MODELS.keys())})")
sys.exit(1)
except InvalidXGBoostModelError:
except exceptions.InvalidXGBoostModelError:
logger.critical("Could not correctly download XGBoost model\nTry a manual download.")

@@ -231,0 +240,0 @@ sys.exit(1)

"""Database configuration for EncyclopeDIA DLIB SQLite format."""
import zlib
from pathlib import Path
from typing import Union

@@ -94,5 +96,5 @@ import numpy

def open_sqlite(filename: Union[str, Path]) -> sqlalchemy.engine.Connection:
    """
    Open a connection to an EncyclopeDIA DLIB SQLite file.

    Parameters
    ----------
    filename
        Path to the SQLite database file on disk.

    Returns
    -------
    sqlalchemy.engine.Connection
        Open connection; the module-level ``metadata`` is bound to the engine.
    """
    # The connection URL must interpolate the target file path; the scraped
    # source had lost the f-string placeholder here.
    engine = sqlalchemy.create_engine(f"sqlite:///{filename}")
    metadata.bind = engine
    return engine.connect()

@@ -10,4 +10,2 @@ from __future__ import annotations

from ms2pip import exceptions
logger = logging.getLogger(__name__)

@@ -27,6 +25,2 @@

# Validate runs and collections
if not len(psm_list.collections) == 1 or not len(psm_list.runs) == 1:
raise exceptions.InvalidInputError("PSMs should be for a single run and collection.")
# Apply fixed modifications if any

@@ -33,0 +27,0 @@ psm_list.apply_fixed_modifications()

@@ -56,2 +56,3 @@ """Utilities for handling XGBoost model files within the MS²PIP prediction framework."""

preds = xgb_model.predict(features)
preds = preds.clip(min=np.log2(0.001)) # Clip negative intensities
xgb_model.__del__()

@@ -58,0 +59,0 @@

"""Constants and fixed configurations for MS²PIP."""
# Supported output formats
SUPPORTED_OUTPUT_FORMATS = ["csv", "mgf", "msp", "bibliospec", "spectronaut", "dlib"]
# Models and their properties

@@ -7,0 +4,0 @@ # id is passed to get_predictions to select model

@@ -12,3 +12,3 @@ #!/usr/bin/env python

from pathlib import Path
from typing import Callable, List, Optional, Tuple, Union
from typing import Any, Callable, Generator, Iterable, List, Optional, Tuple, Union

@@ -18,2 +18,3 @@ import numpy as np

from psm_utils import PSM, Peptidoform, PSMList
from rich.progress import track

@@ -28,5 +29,7 @@ import ms2pip.exceptions as exceptions

from ms2pip._utils.xgb_models import get_predictions_xgb, validate_requested_xgb_model
from ms2pip.constants import MODELS, SUPPORTED_OUTPUT_FORMATS
from ms2pip.constants import MODELS
from ms2pip.result import ProcessingResult, calculate_correlations
from ms2pip.search_space import ProteomeSearchSpace
from ms2pip.spectrum_input import read_spectrum_file
from ms2pip.spectrum_output import SUPPORTED_FORMATS

@@ -107,2 +110,4 @@ logger = logging.getLogger(__name__)

"""
if isinstance(psms, list):
psms = PSMList(psm_list=psms)
psm_list = read_psms(psms, filetype=psm_filetype)

@@ -128,7 +133,66 @@

def predict_library():
"""Predict spectral library from protein FASTA file."""
raise NotImplementedError
def predict_library(
    fasta_file: Optional[Union[str, Path]] = None,
    config: Optional[Union[ProteomeSearchSpace, dict, str, Path]] = None,
    add_retention_time: bool = False,
    model: Optional[str] = "HCD",
    model_dir: Optional[Union[str, Path]] = None,
    batch_size: int = 100000,
    processes: Optional[int] = None,
) -> Generator[ProcessingResult, None, None]:
    """
    Predict spectral library from protein FASTA file.\f

    Parameters
    ----------
    fasta_file
        Path to FASTA file with protein sequences. Required if `config` is not provided.
    config
        ProteomeSearchSpace, or a dictionary or path to JSON file with proteome search space
        parameters. Required if `fasta_file` is not provided.
    add_retention_time
        Add retention time predictions with DeepLC (Requires optional DeepLC dependency).
    model
        Model to use for prediction. Default: "HCD".
    model_dir
        Directory where XGBoost model files are stored. Default: `~/.ms2pip`.
    batch_size
        Number of peptides to process in each batch.
    processes
        Number of parallel processes for multiprocessing steps. By default, all available.

    """
    # Resolve the search space from the provided arguments. Each branch yields a
    # ready ProteomeSearchSpace, so no additional normalization is needed below.
    if fasta_file and config:
        # Use provided proteome, but overwrite fasta_file
        search_space = ProteomeSearchSpace.from_any(config)
        search_space.fasta_file = fasta_file
    elif fasta_file and not config:
        # Default proteome search space with provided fasta_file
        search_space = ProteomeSearchSpace(fasta_file=fasta_file)
    elif not fasta_file and config:
        # Use provided proteome
        search_space = ProteomeSearchSpace.from_any(config)
    else:
        raise ValueError("Either `fasta_file` or `config` must be provided.")

    search_space.build()

    for batch in track(
        _into_batches(search_space, batch_size=batch_size),
        description="Predicting spectra...",
        total=ceil(len(search_space) / batch_size),
    ):
        # Silence per-batch logging noise from prediction internals, but
        # re-enable logging *before* yielding: a generator suspends at
        # `yield`, so disabling across the yield would leave logging globally
        # disabled while the consumer processes the batch (and permanently, if
        # the consumer stops iterating early).
        logging.disable(logging.CRITICAL)
        try:
            result_batch = predict_batch(
                search_space.filter_psms_by_mz(PSMList(psm_list=list(batch))),
                add_retention_time=add_retention_time,
                model=model,
                model_dir=model_dir,
                processes=processes,
            )
        finally:
            logging.disable(logging.NOTSET)
        yield result_batch
def correlate(

@@ -432,3 +496,3 @@ psms: Union[PSMList, str, Path],

for output_format in output_formats:
if output_format not in SUPPORTED_OUTPUT_FORMATS:
if output_format not in SUPPORTED_FORMATS:
raise exceptions.UnknownOutputFormatError(output_format)

@@ -553,2 +617,6 @@ self.output_formats = output_formats

"""
# Validate runs and collections
if not len(psm_list.collections) == 1 or not len(psm_list.runs) == 1:
raise exceptions.InvalidInputError("PSMs should be for a single run and collection.")
args = (

@@ -682,3 +750,6 @@ spectrum_file,

)
predictions = {i: np.array(p, dtype=np.float32) for i, p in zip(ion_types, predictions)}
predictions = {
i: np.array(p, dtype=np.float32).clip(min=np.log2(0.001)) # Clip negative intensities
for i, p in zip(ion_types, predictions)
}
feature_vectors = None

@@ -910,1 +981,13 @@

return training_data
def _into_batches(iterable: Iterable[Any], batch_size: int) -> Generator[List[Any], None, None]:
"""Accumulate iterator elements into batches of a given size."""
batch = []
for item in iterable:
batch.append(item)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch
"""Definition and handling of MS²PIP results."""
from __future__ import annotations

@@ -6,6 +7,7 @@

from typing import Any, Dict, List, Optional, Tuple
from logging import getLogger
import numpy as np
from psm_utils import PSM
from pydantic import ConfigDict, BaseModel
from pydantic import BaseModel, ConfigDict

@@ -21,2 +23,3 @@ try:

logger = getLogger(__name__)

@@ -120,42 +123,9 @@ class ProcessingResult(BaseModel):

def results_to_csv(results: List["ProcessingResult"], output_file: str) -> None:
    """
    Write processing results to CSV file.

    One row is written per theoretical fragment ion; results without
    theoretical m/z values are skipped entirely. Missing predicted/observed
    intensities are written as empty fields.
    """
    fieldnames = [
        "psm_index",
        "ion_type",
        "ion_number",
        "mz",
        "predicted",
        "observed",
    ]
    with open(output_file, "wt") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        for result in results:
            if result.theoretical_mz is None:
                continue
            for ion_type, mz_values in result.theoretical_mz.items():
                for i, mz in enumerate(mz_values):
                    predicted = (
                        "{:.6g}".format(result.predicted_intensity[ion_type][i])
                        if result.predicted_intensity
                        else None
                    )
                    observed = (
                        "{:.6g}".format(result.observed_intensity[ion_type][i])
                        if result.observed_intensity
                        else None
                    )
                    writer.writerow(
                        {
                            "psm_index": result.psm_index,
                            "ion_type": ion_type,
                            "ion_number": i + 1,
                            "mz": "{:.6g}".format(mz),
                            "predicted": predicted,
                            "observed": observed,
                        }
                    )
def correlations_to_csv(results: List["ProcessingResult"], output_file: str) -> None:
def write_correlations(results: List["ProcessingResult"], output_file: str) -> None:
    """
    Write per-PSM correlations to a tab-separated file.

    Parameters
    ----------
    results
        Processing results; each must carry ``psm_index`` and ``correlation``.
    output_file
        Path of the file to (over)write.
    """
    with open(output_file, "wt") as f:
        fieldnames = ["psm_index", "correlation"]
        # Tab-delimited output (the file carries a .tsv suffix upstream).
        writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter="\t", lineterminator="\n")
        writer.writeheader()
        for result in results:
            writer.writerow({"psm_index": result.psm_index, "correlation": result.correlation})
"""
Write spectrum files from MS2PIP predictions.
Write spectrum files from MS²PIP prediction results.
Examples
--------
The simplest way to write MS²PIP predictions to a file is to use the :py:func:`write_spectra`
function:
>>> from ms2pip import predict_single, write_spectra
>>> results = [predict_single("ACDE/2")]
>>> write_spectra("/path/to/output/filename", results, "mgf")
Specific writer classes can also be used directly. Writer classes should be used in a context
manager to ensure the file is properly closed after writing. The following example writes MS²PIP
predictions to a TSV file:
>>> from ms2pip import predict_single
>>> results = [predict_single("ACDE/2")]
>>> with TSV("output.tsv") as writer:
... writer.write(results)
Results can be written to the same file sequentially:
>>> results_2 = [predict_single("PEPTIDEK/2")]
>>> with TSV("output.tsv", write_mode="a") as writer:
... writer.write(results)
... writer.write(results_2)
Results can be written to an existing file using the append mode:
>>> with TSV("output.tsv", write_mode="a") as writer:
... writer.write(results_2)
"""
from __future__ import annotations

@@ -9,705 +44,677 @@

import logging
import os
from ast import literal_eval
from functools import wraps
import re
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from io import StringIO
from operator import itemgetter
from pathlib import Path
from time import localtime, strftime
from typing import Any, Dict, List
from typing import Any, Dict, Generator, List, Optional, Union
import numpy as np
from psm_utils import PSM, Peptidoform
from pyteomics import proforma
from sqlalchemy import engine, select
from ms2pip._utils import dlib
from ms2pip.result import ProcessingResult
logger = logging.getLogger(__name__)
LOGGER = logging.getLogger(__name__)
class InvalidWriteModeError(ValueError):
pass
def write_spectra(
filename: Union[str, Path],
processing_results: List[ProcessingResult],
file_format: str = "tsv",
write_mode: str = "w",
):
"""
Write MS2PIP processing results to a supported spectrum file format.
Parameters
----------
filename
Output filename without file extension.
processing_results
List of :py:class:`ms2pip.result.ProcessingResult` objects.
file_format
File format to write. See :py:attr:`FILE_FORMATS` for available formats.
write_mode
Write mode for file. Default is ``w`` (write). Use ``a`` (append) to add to existing file.
# Writer decorator
def writer(**kwargs):
def deco(write_function):
@wraps(write_function)
def wrapper(self):
return self._write_general(write_function, **kwargs)
"""
with SUPPORTED_FORMATS[file_format](filename, write_mode) as writer:
LOGGER.info(f"Writing to {writer.filename}")
writer.write(processing_results)
return wrapper
return deco
class _Writer(ABC):
"""Abstract base class for writing spectrum files."""
suffix = ""
def output_format(output_format):
class OutputFormat:
def __init__(self, fn):
self.fn = fn
self.output_format = output_format
def __init__(self, filename: Union[str, Path], write_mode: str = "w"):
self.filename = Path(filename).with_suffix(self.suffix)
self.write_mode = write_mode
def __set_name__(self, owner, name):
owner.OUTPUT_FORMATS[self.output_format] = self.fn
setattr(owner, name, self.fn)
self._open_file = None
return OutputFormat
def __enter__(self):
"""Open file in context manager."""
self.open()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Close file in context manager."""
self.close()
class SpectrumOutput:
"""Write MS2PIP predictions to various output formats."""
def __repr__(self):
return f"{self.__class__.__name__}({self.filename, self.write_mode})"
OUTPUT_FORMATS = {}
def open(self):
"""Open file."""
if self._open_file:
self.close()
self._open_file = open(self.filename, self.write_mode)
def __init__(
self,
results: List["ProcessingResult"],
output_filename="ms2pip_predictions",
write_mode="wt+",
return_stringbuffer=False,
is_log_space=True,
):
"""
Write MS2PIP predictions to various output formats.
def close(self):
"""Close file."""
if self._open_file:
self._open_file.close()
self._open_file = None
Parameters
----------
results:
List of ProcessingResult objects
output_filename: str, optional
path and name for output files, will be suffexed with `_predictions` and the
relevant file extension (default: ms2pip_predictions)
write_mode: str, optional
write mode to use: "wt+" to append to start a new file, "at" to append to an
existing file (default: "wt+")
return_stringbuffer: bool, optional
If True, files are written to a StringIO object, which the write function
returns. If False, files are written to a file on disk.
is_log_space: bool, optional
Set to true if predicted intensities in `all_preds` are in log-space. In that
case, intensities will first be transformed to "normal"-space.
Example
-------
>>> so = ms2pip.spectrum_tools.spectrum_output.SpectrumOutput(
results
@property
def _file_object(self):
"""Get open file object."""
if self._open_file:
return self._open_file
else:
warnings.warn(
"Opening file outside of context manager. Manually close file after use."
)
>>> so.write_msp()
>>> so.write_spectronaut()
self.open()
return self._open_file
"""
def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in processing_results:
self._write_result(result)
self.results = results
self.output_filename = output_filename
self.write_mode = write_mode
self.return_stringbuffer = return_stringbuffer
self.is_log_space = is_log_space
@abstractmethod
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
...
self.diff_modification_mapping = {}
self.has_rt = "rt" in self.peprec.columns
self.has_protein_list = "protein_list" in self.peprec.columns
class TSV(_Writer):
"""Write TSV files from MS2PIP processing results."""
if self.write_mode not in ["wt+", "wt", "at", "w", "a"]:
raise InvalidWriteModeError(self.write_mode)
suffix = ".tsv"
field_names = [
"psm_index",
"ion_type",
"ion_number",
"mz",
"predicted",
"observed",
"rt",
]
if "a" in self.write_mode and self.return_stringbuffer:
raise InvalidWriteModeError(self.write_mode)
def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
writer = csv.DictWriter(
self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n"
)
if self.write_mode == "w":
writer.writeheader()
for result in processing_results:
self._write_result(result, writer)
# def _generate_peprec_dict(self, rt_to_seconds=True):
# """
# Create easy to access dict from all_preds and peprec dataframes
# """
# peprec_tmp = self.peprec.copy()
def _write_result(self, result: ProcessingResult, writer: csv.DictWriter):
"""Write single processing result to file."""
# Only write results with predictions or observations
if not result.theoretical_mz:
return
# if self.has_rt and rt_to_seconds:
# peprec_tmp["rt"] = peprec_tmp["rt"] * 60
for ion_type in result.theoretical_mz:
for i in range(len(result.theoretical_mz[ion_type])):
writer.writerow(self._write_row(result, ion_type, i))
# peprec_tmp.index = peprec_tmp["spec_id"]
# peprec_tmp.drop("spec_id", axis=1, inplace=True)
@staticmethod
def _write_row(result: ProcessingResult, ion_type: str, ion_index: int):
    """
    Build one TSV row (as a dict) for a single fragment ion.

    Returns a mapping keyed like ``TSV.field_names``. ``predicted`` and
    ``observed`` are ``None`` (written as empty fields) when the result has
    no predicted or observed intensities.
    """
    return {
        "psm_index": result.psm_index,
        "ion_type": ion_type,
        # 1-based ion number for output, from the 0-based index
        "ion_number": ion_index + 1,
        "mz": "{:.8f}".format(result.theoretical_mz[ion_type][ion_index]),
        "predicted": "{:.8f}".format(result.predicted_intensity[ion_type][ion_index])
        if result.predicted_intensity
        else None,
        "observed": "{:.8f}".format(result.observed_intensity[ion_type][ion_index])
        if result.observed_intensity
        else None,
        # NOTE(review): a falsy retention time (0.0) is also written as None — confirm intended
        "rt": result.psm.retention_time if result.psm.retention_time else None,
    }
# self.peprec_dict = peprec_tmp.to_dict(orient="index")
# def _generate_preds_dict(self):
# """
# Create easy to access dict from peprec dataframes
# """
# self.preds_dict = {}
# preds_list = self.all_preds[
# ["spec_id", "charge", "ion", "ionnumber", "mz", "prediction"]
# ].values.tolist()
class MSP(_Writer):
"""Write MSP files from MS2PIP processing results."""
# for row in preds_list:
# spec_id = row[0]
# if spec_id in self.preds_dict.keys():
# if row[2] in self.preds_dict[spec_id]["peaks"]:
# self.preds_dict[spec_id]["peaks"][row[2]].append(tuple(row[3:]))
# else:
# self.preds_dict[spec_id]["peaks"][row[2]] = [tuple(row[3:])]
# else:
# self.preds_dict[spec_id] = {
# "charge": row[1],
# "peaks": {row[2]: [tuple(row[3:])]},
# }
suffix = ".msp"
# def _normalize_spectra(self, method="basepeak_10000"):
# """
# Normalize spectra
# """
# if self.is_log_space:
# self.all_preds["prediction"] = ((2 ** self.all_preds["prediction"]) - 0.001).clip(
# lower=0
# )
# self.is_log_space = False
def write(self, results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in results:
self._write_result(result)
# if method == "basepeak_10000":
# if self.normalization == "basepeak_10000":
# pass
# elif self.normalization == "basepeak_1":
# self.all_preds["prediction"] *= 10000
# else:
# self.all_preds["prediction"] = self.all_preds.groupby(
# ["spec_id"], group_keys=False
# )["prediction"].apply(lambda x: (x / x.max()) * 10000)
# self.normalization = "basepeak_10000"
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
predicted_spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
peaks = zip(predicted_spectrum.mz, intensity_normalized, predicted_spectrum.annotations)
# elif method == "basepeak_1":
# if self.normalization == "basepeak_1":
# pass
# elif self.normalization == "basepeak_10000":
# self.all_preds["prediction"] /= 10000
# else:
# self.all_preds["prediction"] = self.all_preds.groupby(
# ["spec_id"], group_keys=False
# )["prediction"].apply(lambda x: (x / x.max()))
# self.normalization = "basepeak_1"
# Header
lines = [
f"Name: {result.psm.peptidoform.sequence}/{result.psm.get_precursor_charge()}",
f"MW: {result.psm.peptidoform.theoretical_mass}",
self._format_comment_line(result.psm),
f"Num peaks: {len(predicted_spectrum.mz)}",
]
# elif method == "tic":
# if self.normalization != "tic":
# self.all_preds["prediction"] = self.all_preds.groupby(
# ["spec_id"], group_keys=False
# )["prediction"].apply(lambda x: x / x.sum())
# self.normalization = "tic"
# Peaks
lines.extend(
f"{mz:.8f}\t{intensity:.8f}\t{annotation}/0.0" for mz, intensity, annotation in peaks
)
# else:
# raise NotImplementedError
# Write to file
self._file_object.writelines(line + "\n" for line in lines)
self._file_object.write("\n")
def _get_msp_peak_annotation(
self,
peak_dict,
sep="\t",
include_zero=False,
include_annotations=True,
intensity_type=float,
):
"""
Get MGF/MSP-like peaklist string
"""
all_peaks = []
for ion_type, peaks in peak_dict.items():
for peak in peaks:
if not include_zero and peak[2] == 0:
continue
if include_annotations:
all_peaks.append(
(
peak[1],
f'{peak[1]:.6f}{sep}{intensity_type(peak[2])}{sep}"{ion_type.lower()}{peak[0]}/0.0"',
)
)
else:
all_peaks.append((peak[1], f"{peak[1]:.6f}{sep}{peak[2]}"))
@staticmethod
def _format_modifications(peptidoform: Peptidoform):
"""Format modifications in MSP-style string, e.g. ``Mods=1/0,E,Glu->pyro-Glu``."""
all_peaks = sorted(all_peaks, key=itemgetter(0))
peak_string = "\n".join([peak[1] for peak in all_peaks])
def _format_single_modification(
amino_acid: str,
position: int,
modifications: Optional[List[proforma.ModificationBase]],
) -> Union[str, None]:
"""Get modification label from :py:class:`proforma.ModificationBase` list."""
if not modifications:
return None
if len(modifications) > 1:
raise ValueError("Multiple modifications per amino acid not supported.")
modification = modifications[0]
return f"{position},{amino_acid},{modification.name}"
return peak_string
sequence_mods = [
_format_single_modification(aa, pos + 1, mods)
for pos, (aa, mods) in enumerate(peptidoform.parsed_sequence)
]
n_term = _format_single_modification(
peptidoform.sequence[0], 0, peptidoform.properties["n_term"]
)
c_term = _format_single_modification(
peptidoform.sequence[-1], -1, peptidoform.properties["c_term"]
)
def _get_msp_modifications(self, sequence, modifications):
"""
Format modifications in MSP-style, e.g. "1/0,E,Glu->pyro-Glu"
"""
mods = [mod for mod in [n_term] + sequence_mods + [c_term] if mod is not None]
if isinstance(modifications, str):
if not modifications or modifications == "-":
msp_modifications = "0"
else:
mods = modifications.split("|")
mods = [(int(mods[i]), mods[i + 1]) for i in range(0, len(mods), 2)]
mods = [(x, y) if x == 0 else (x - 1, y) for (x, y) in mods]
mods = sorted(mods)
mods = [(str(x), sequence[x], y) for (x, y) in mods]
msp_modifications = "/".join([",".join(list(x)) for x in mods])
msp_modifications = f"{len(mods)}/{msp_modifications}"
if not mods:
return "Mods=0"
else:
msp_modifications = "0"
return f"Mods={len(mods)}/{'/'.join(mods)}"
return msp_modifications
@staticmethod
def _format_parent_mass(peptidoform: Peptidoform) -> str:
"""Format parent mass as string."""
return f"Parent={peptidoform.theoretical_mz}"
def _parse_protein_string(self, protein_list):
"""
Parse protein string from list, list string literal, or string.
"""
if isinstance(protein_list, list):
protein_string = "/".join(protein_list)
elif isinstance(protein_list, str):
try:
protein_string = "/".join(literal_eval(protein_list))
except ValueError:
protein_string = protein_list
@staticmethod
def _format_protein_string(psm: PSM) -> Union[str, None]:
"""Format protein list as string."""
if psm.protein_list:
return f"Protein={','.join(psm.protein_list)}"
else:
protein_string = ""
return protein_string
return None
def _get_last_ssl_scannr(self):
"""
Return scan number of last line in a Bibliospec SSL file.
"""
ssl_filename = "{}_predictions.ssl".format(self.output_filename)
with open(ssl_filename, "rt") as ssl:
for line in ssl:
last_line = line
last_scannr = int(last_line.split("\t")[1])
return last_scannr
@staticmethod
def _format_retention_time(psm: PSM) -> Union[str, None]:
"""Format retention time as string."""
if psm.retention_time:
return f"RetentionTime={psm.retention_time}"
else:
return None
def _generate_diff_modification_mapping(self, precision):
"""
Make modification name -> ssl modification name mapping.
"""
self.diff_modification_mapping[precision] = {
ptm.split(",")[0]: "{0:+.{1}f}".format(float(ptm.split(",")[1]), precision)
for ptm in self.params["ptm"]
}
@staticmethod
def _format_identifier(psm: PSM) -> str:
"""Format MS2PIP ID as string."""
return f"SpectrumIdentifier={psm.spectrum_id}"
def _get_diff_modified_sequence(self, sequence, modifications, precision=1):
"""
Build BiblioSpec SSL modified sequence string.
"""
pep = list(sequence)
mapping = self.diff_modification_mapping[precision]
@staticmethod
def _format_comment_line(psm: PSM) -> str:
"""Format comment line for MSP file."""
comments = " ".join(
filter(
None,
[
MSP._format_modifications(psm.peptidoform),
MSP._format_parent_mass(psm.peptidoform),
MSP._format_protein_string(psm),
MSP._format_retention_time(psm),
MSP._format_identifier(psm),
],
)
)
return f"Comment: {comments}"
for loc, name in zip(modifications.split("|")[::2], modifications.split("|")[1::2]):
# C-term mod
if loc == "-1":
pep[-1] = pep[-1] + "[{}]".format(mapping[name])
# N-term mod
elif loc == "0":
pep[0] = pep[0] + "[{}]".format(mapping[name])
# Normal mod
else:
pep[int(loc) - 1] = pep[int(loc) - 1] + "[{}]".format(mapping[name])
return "".join(pep)
def write_results(self, output_formats: List[str]) -> Dict[str, Any]:
"""
Write MS2PIP predictions in output formats defined by output_formats.
"""
results = {}
for output_format in output_formats:
output_format = output_format.lower()
writer = self.OUTPUT_FORMATS[output_format]
results[output_format] = writer(self)
return results
class MGF(_Writer):
"""Write MGF files from MS2PIP processing results."""
@output_format("msp")
@writer(
file_suffix="_predictions.msp",
normalization_method="basepeak_10000",
requires_dicts=True,
requires_diff_modifications=False,
)
def write_msp(self, file_object):
"""
Construct MSP string and write to file_object.
"""
suffix = ".mgf"
for spec_id in sorted(self.peprec_dict.keys()):
seq = self.peprec_dict[spec_id]["peptide"]
mods = self.peprec_dict[spec_id]["modifications"]
charge = self.peprec_dict[spec_id]["charge"]
prec_mass, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge)
msp_modifications = self._get_msp_modifications(seq, mods)
num_peaks = sum(
[len(peaklist) for _, peaklist in self.preds_dict[spec_id]["peaks"].items()]
)
def write(self, results: List[ProcessingResult]):
"""Write multiple processing results to file."""
for result in results:
self._write_result(result)
comment_line = f" Mods={msp_modifications} Parent={prec_mz}"
def _write_result(self, result: ProcessingResult):
"""Write single processing result to file."""
predicted_spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
peaks = zip(predicted_spectrum.mz, intensity_normalized)
if self.has_protein_list:
protein_list = self.peprec_dict[spec_id]["protein_list"]
protein_string = self._parse_protein_string(protein_list)
comment_line += f' Protein="{protein_string}"'
# Header
lines = [
"BEGIN IONS",
f"TITLE={result.psm.peptidoform}",
f"PEPMASS={result.psm.peptidoform.theoretical_mz}",
f"CHARGE={result.psm.get_precursor_charge()}+",
f"SCANS={result.psm.spectrum_id}",
f"RTINSECONDS={result.psm.retention_time}" if result.psm.retention_time else None,
]
if self.has_rt:
rt = self.peprec_dict[spec_id]["rt"]
comment_line += f" RetentionTime={rt}"
# Peaks
lines.extend(f"{mz:.8f} {intensity:.8f}" for mz, intensity in peaks)
comment_line += f' MS2PIP_ID="{spec_id}"'
# Write to file
self._file_object.writelines(line + "\n" for line in lines if line)
self._file_object.write("END IONS\n\n")
out = [
f"Name: {seq}/{charge}",
f"MW: {prec_mass}",
f"Comment:{comment_line}",
f"Num peaks: {num_peaks}",
self._get_msp_peak_annotation(
self.preds_dict[spec_id]["peaks"],
sep="\t",
include_annotations=True,
intensity_type=int,
),
]
file_object.writelines([line + "\n" for line in out] + ["\n"])
class Spectronaut(_Writer):
"""Write Spectronaut files from MS2PIP processing results."""
@output_format("mgf")
@writer(
file_suffix="_predictions.mgf",
normalization_method="basepeak_10000",
requires_dicts=True,
requires_diff_modifications=False,
)
def write_mgf(self, file_object):
"""
Construct MGF string and write to file_object
"""
for spec_id in sorted(self.peprec_dict.keys()):
seq = self.peprec_dict[spec_id]["peptide"]
mods = self.peprec_dict[spec_id]["modifications"]
charge = self.peprec_dict[spec_id]["charge"]
_, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge)
msp_modifications = self._get_msp_modifications(seq, mods)
suffix = ".spectronaut.tsv"
field_names = [
"ModifiedPeptide",
"StrippedPeptide",
"PrecursorCharge",
"PrecursorMz",
"IonMobility",
"iRT",
"ProteinId",
"RelativeFragmentIntensity",
"FragmentMz",
"FragmentType",
"FragmentNumber",
"FragmentCharge",
"FragmentLossType",
]
if self.has_protein_list:
protein_list = self.peprec_dict[spec_id]["protein_list"]
protein_string = self._parse_protein_string(protein_list)
else:
protein_string = ""
def write(self, processing_results: List[ProcessingResult]):
"""Write multiple processing results to file."""
writer = csv.DictWriter(
self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n"
)
if self.write_mode == "w":
writer.writeheader()
for result in processing_results:
self._write_result(result, writer)
out = [
"BEGIN IONS",
f"TITLE={spec_id} {seq}/{charge} {msp_modifications} {protein_string}",
f"PEPMASS={prec_mz}",
f"CHARGE={charge}+",
]
def _write_result(self, result: ProcessingResult, writer: csv.DictWriter):
"""Write single processing result to file."""
# Only write results with predictions
if result.predicted_intensity is None:
return
psm_info = self._process_psm(result.psm)
for fragment_info in self._yield_fragment_info(result):
writer.writerow({**psm_info, **fragment_info})
if self.has_rt:
rt = self.peprec_dict[spec_id]["rt"]
out.append(f"RTINSECONDS={rt}")
@staticmethod
def _process_psm(psm: PSM) -> Dict[str, Any]:
"""Process PSM to Spectronaut format."""
return {
"ModifiedPeptide": _peptidoform_str_without_charge(psm.peptidoform),
"StrippedPeptide": psm.peptidoform.sequence,
"PrecursorCharge": psm.get_precursor_charge(),
"PrecursorMz": f"{psm.peptidoform.theoretical_mz:.8f}",
"IonMobility": f"{psm.ion_mobility:.8f}" if psm.ion_mobility else None,
"iRT": f"{psm.retention_time:.8f}" if psm.retention_time else None,
"ProteinId": "".join(psm.protein_list) if psm.protein_list else None,
}
out.append(
self._get_msp_peak_annotation(
self.preds_dict[spec_id]["peaks"],
sep=" ",
include_annotations=False,
)
)
out.append("END IONS\n")
file_object.writelines([line + "\n" for line in out])
@staticmethod
def _yield_fragment_info(result: ProcessingResult) -> Generator[Dict[str, Any], None, None]:
    """
    Yield one Spectronaut fragment-row dict per predicted fragment ion.

    Intensities are first converted out of log space and then scaled to the
    most intense peak across all ion types, so ``RelativeFragmentIntensity``
    is relative to the spectrum base peak.
    """
    # Normalize intensities: undo the log transform per ion type first...
    intensities = {
        ion_type: _unlogarithmize(intensities)
        for ion_type, intensities in result.predicted_intensity.items()
    }
    # ...then find the single base peak across all ion types and scale to it.
    max_intensity = max(itertools.chain(*intensities.values()))
    intensities = {
        ion_type: _basepeak_normalize(intensities[ion_type], basepeak=max_intensity)
        for ion_type in intensities
    }
    for ion_type in result.predicted_intensity:
        # Ion type labels look like "b" / "y2": first char is the fragment
        # type letter; any remainder is the fragment charge (default "1").
        # NOTE(review): assumes single-character type prefix — confirm for
        # multi-letter ion types, if any.
        fragment_type = ion_type[0].lower()
        fragment_charge = ion_type[1:] if len(ion_type) > 1 else "1"
        for ion_index, (intensity, mz) in enumerate(
            zip(intensities[ion_type], result.theoretical_mz[ion_type])
        ):
            yield {
                "RelativeFragmentIntensity": f"{intensity:.8f}",
                "FragmentMz": f"{mz:.8f}",
                "FragmentType": fragment_type,
                # 1-based fragment number for output
                "FragmentNumber": ion_index + 1,
                "FragmentCharge": fragment_charge,
                "FragmentLossType": "noloss",
            }
@output_format("spectronaut")
@writer(
file_suffix="_predictions_spectronaut.csv",
normalization_method="tic",
requires_dicts=False,
requires_diff_modifications=True,
)
def write_spectronaut(self, file_obj):
"""
Construct spectronaut DataFrame and write to file_object.
"""
if "w" in self.write_mode:
header = True
elif "a" in self.write_mode:
header = False
else:
raise InvalidWriteModeError(self.write_mode)
spectronaut_peprec = self.peprec.copy()
class Bibliospec(_Writer):
"""
Write Bibliospec SSL and MS2 files from MS2PIP processing results.
# ModifiedPeptide and PrecursorMz columns
spectronaut_peprec["ModifiedPeptide"] = spectronaut_peprec.apply(
lambda row: self._get_diff_modified_sequence(row["peptide"], row["modifications"]),
axis=1,
)
spectronaut_peprec["PrecursorMz"] = spectronaut_peprec.apply(
lambda row: self.mods.calc_precursor_mz(
row["peptide"], row["modifications"], row["charge"]
)[1],
axis=1,
)
spectronaut_peprec["ModifiedPeptide"] = "_" + spectronaut_peprec["ModifiedPeptide"] + "_"
Bibliospec SSL and MS2 files are also compatible with Skyline.
# Additional columns
spectronaut_peprec["FragmentLossType"] = "noloss"
"""
# Retention time
if "rt" in spectronaut_peprec.columns:
rt_cols = ["iRT"]
spectronaut_peprec["iRT"] = spectronaut_peprec["rt"]
else:
rt_cols = []
ssl_suffix = ".ssl"
ms2_suffix = ".ms2"
ssl_field_names = [
"file",
"scan",
"charge",
"sequence",
"score-type",
"score",
"retention-time",
]
# ProteinId
if self.has_protein_list:
spectronaut_peprec["ProteinId"] = spectronaut_peprec["protein_list"].apply(
self._parse_protein_string
)
else:
spectronaut_peprec["ProteinId"] = spectronaut_peprec["spec_id"]
def __init__(self, filename: Union[str, Path], write_mode: str = "w"):
super().__init__(filename, write_mode)
self.ssl_file = self.filename.with_suffix(self.ssl_suffix)
self.ms2_file = self.filename.with_suffix(self.ms2_suffix)
# Rename columns and merge with predictions
spectronaut_peprec = spectronaut_peprec.rename(
columns={"charge": "PrecursorCharge", "peptide": "StrippedPeptide"}
)
peptide_cols = (
[
"ModifiedPeptide",
"StrippedPeptide",
"PrecursorCharge",
"PrecursorMz",
"ProteinId",
]
+ rt_cols
+ ["FragmentLossType"]
)
spectronaut_df = spectronaut_peprec[peptide_cols + ["spec_id"]]
spectronaut_df = self.all_preds.merge(spectronaut_df, on="spec_id")
self._open_ssl_file = None
self._open_ms2_file = None
# Fragment columns
spectronaut_df["FragmentCharge"] = (
spectronaut_df["ion"].str.contains("2").map({True: 2, False: 1})
)
spectronaut_df["FragmentType"] = spectronaut_df["ion"].str[0].str.lower()
def open(self):
    """Open the SSL and MS2 output files for writing."""
    # Handles are stored on the instance so close() and the file-object
    # properties can manage their lifecycle.
    self._open_ssl_file = open(self.ssl_file, self.write_mode)
    self._open_ms2_file = open(self.ms2_file, self.write_mode)
# Rename and sort columns
spectronaut_df = spectronaut_df.rename(
columns={
"mz": "FragmentMz",
"prediction": "RelativeIntensity",
"ionnumber": "FragmentNumber",
}
)
fragment_cols = [
"FragmentCharge",
"FragmentMz",
"RelativeIntensity",
"FragmentType",
"FragmentNumber",
]
spectronaut_df = spectronaut_df[peptide_cols + fragment_cols]
try:
spectronaut_df.to_csv(
file_obj, index=False, header=header, sep=";", lineterminator="\n"
def close(self):
    """Close any open SSL/MS2 output files and reset their handles."""
    for attr_name in ("_open_ssl_file", "_open_ms2_file"):
        handle = getattr(self, attr_name)
        if handle:
            handle.close()
            setattr(self, attr_name, None)
@property
def _ssl_file_object(self):
    """Get open SSL file object, opening the file on demand if needed."""
    if self._open_ssl_file:
        return self._open_ssl_file
    else:
        # Lazy open for use outside a context manager; caller owns cleanup.
        warnings.warn(
            "Opening file outside of context manager. Manually close file after use."
        )
        self.open()
        return self._open_ssl_file
@property
def _ms2_file_object(self):
    """Get open MS2 file object, opening the file on demand if needed."""
    if not self._open_ms2_file:
        # Lazy open for use outside a context manager; caller owns cleanup.
        warnings.warn(
            "Opening file outside of context manager. Manually close file after use."
        )
        self.open()
    return self._open_ms2_file
return file_obj
def write(self, processing_results: List[ProcessingResult]):
    """Write multiple processing results to SSL and MS2 files.

    In write mode, headers are written first and scan numbering starts at 0.
    In append mode, numbering continues from the last scan number already in
    the SSL file (Bibliospec scan numbers must be integers).
    """
    # Create CSV writer for the SSL index file
    ssl_dict_writer = csv.DictWriter(
        self._ssl_file_object,
        fieldnames=self.ssl_field_names,
        delimiter="\t",
        lineterminator="\n",
    )

    # Write headers or determine where scan numbering should continue
    if self.write_mode == "w":
        ssl_dict_writer.writeheader()
        self._write_ms2_header()
        start_scan_number = 0
    elif self.write_mode == "a":
        start_scan_number = self._get_last_ssl_scan_number(self.ssl_file) + 1
    else:
        raise ValueError(f"Unsupported write mode: {self.write_mode}")

    # Write results
    for i, result in enumerate(processing_results):
        scan_number = start_scan_number + i
        modified_sequence = self._format_modified_sequence(result.psm.peptidoform)
        self._write_result(result, modified_sequence, scan_number, ssl_dict_writer)
def _write_ms2_header(self):
    """Write header lines to the MS2 file."""
    creation_date = strftime("%Y-%m-%d %H:%M:%S", localtime())
    self._ms2_file_object.write(f"H\tCreationDate\t{creation_date}\n")
    self._ms2_file_object.write("H\tExtractor\tMS2PIP predictions\n")
if isinstance(mods, str) and mods != "-" and mods != "":
mod_seq = self._get_diff_modified_sequence(seq, mods)
else:
mod_seq = seq
def _write_result(
    self,
    result: ProcessingResult,
    modified_sequence: str,
    scan_number: int,
    writer: csv.DictWriter,
):
    """Write single processing result to files."""
    # One SSL index entry and one MS2 spectrum entry per result.
    self._write_result_to_ssl(result, modified_sequence, scan_number, writer)
    self._write_result_to_ms2(result, modified_sequence, scan_number)
rt = self.peprec_dict[spec_id]["rt"] if self.has_rt else ""
# TODO: implement csv instead of manual writing
file_obj_ssl.write(
"\t".join([ms2_filename, str(scannr), str(charge), mod_seq, "", "", str(rt)])
+ "\n"
)
file_obj_ms2.write(
"\n".join(
[
f"S\t{scannr}\t{prec_mz}",
f"Z\t{charge}\t{prec_mass}",
f"D\tseq\t{seq}",
f"D\tmodified seq\t{mod_seq}",
peaks,
]
)
+ "\n"
)
def _write_general(
def _write_result_to_ssl(
self,
write_function,
file_suffix,
normalization_method,
requires_dicts,
requires_diff_modifications,
diff_modification_precision=1,
result: ProcessingResult,
modified_sequence: str,
scan_number: int,
writer: csv.DictWriter,
):
"""
General write function to call core write functions.
"""Write single processing result to the SSL file."""
writer.writerow(
{
"file": self.ms2_file.name if isinstance(self.ms2_file, Path) else "file.ms2",
"scan": scan_number,
"charge": result.psm.get_precursor_charge(),
"sequence": modified_sequence,
"score-type": None,
"score": None,
"retention-time": result.psm.retention_time if result.psm.retention_time else None,
}
)
Note: Does not work for write_bibliospec and write_dlib functions.
"""
def _write_result_to_ms2(
    self, result: ProcessingResult, modified_sequence: str, scan_number: int
):
    """Write single processing result to the MS2 file."""
    predicted_spectrum = result.as_spectra()[0]
    # Intensities scaled so the base peak equals 10000
    intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4
    peaks = zip(predicted_spectrum.mz, intensity_normalized)

    # Header
    lines = [
        f"S\t{scan_number}\t{result.psm.peptidoform.theoretical_mz}",
        f"Z\t{result.psm.get_precursor_charge()}\t{result.psm.peptidoform.theoretical_mass}",
        f"D\tseq\t{result.psm.peptidoform.sequence}",
        f"D\tmodified seq\t{modified_sequence}",
    ]

    # Peaks
    lines.extend(f"{mz:.8f}\t{intensity:.8f}" for mz, intensity in peaks)

    # Write to file, with a blank line separating spectrum entries
    self._ms2_file_object.writelines(line + "\n" for line in lines)
    self._ms2_file_object.write("\n")
@staticmethod
def _format_modified_sequence(peptidoform: Peptidoform) -> str:
    """Format peptidoform as a sequence string with bracketed mass shifts (one decimal)."""
    # Collect modifications per residue position; terminal modifications are
    # attached to the first / last residue.
    mods_by_position = defaultdict(list)
    if peptidoform.properties["n_term"]:
        mods_by_position[0].extend(peptidoform.properties["n_term"])
    if peptidoform.properties["c_term"]:
        mods_by_position[len(peptidoform) - 1].extend(peptidoform.properties["c_term"])
    for position, (_, residue_mods) in enumerate(peptidoform.parsed_sequence):
        if residue_mods:
            mods_by_position[position].extend(residue_mods)

    formatted = []
    for position, aa in enumerate(peptidoform.sequence):
        labels = "".join(f"[{mod.mass:+.1f}]" for mod in mods_by_position[position])
        formatted.append(f"{aa}{labels}")
    return "".join(formatted)
@staticmethod
def _get_last_ssl_scan_number(ssl_file: Union[str, Path, StringIO]):
"""Read scan number of last line in a Bibliospec SSL file."""
if isinstance(ssl_file, StringIO):
ssl_file.seek(0)
for line in ssl_file:
last_line = line
elif isinstance(ssl_file, (str, Path)):
with open(ssl_file, "rt") as ssl:
for line in ssl:
last_line = line
else:
f_name = self.output_filename + file_suffix
file_object = open(f_name, self.write_mode)
logger.info("Writing results to %s", f_name)
raise TypeError("Unsupported type for `ssl_file`.")
return int(last_line.split("\t")[1])
write_function(self, file_object)
return file_object
class DLIB(_Writer):
"""
Write DLIB files from MS2PIP processing results.
@output_format("bibliospec")
def write_bibliospec(self):
"""Write MS2PIP predictions to BiblioSpec/Skyline SSL and MS2 spectral library files."""
precision = 1
if precision not in self.diff_modification_mapping:
self._generate_diff_modification_mapping(precision)
See `EncyclopeDIA File Formats <https://bitbucket.org/searleb/encyclopedia/wiki/EncyclopeDIA%20File%20Formats>`_
for documentation on the DLIB format.
# Normalize if necessary and make dicts
if not self.normalization == "basepeak_10000":
self._normalize_spectra(method="basepeak_10000")
self._generate_preds_dict()
elif not self.preds_dict:
self._generate_preds_dict()
if not self.peprec_dict:
self._generate_peprec_dict()
"""
if self.return_stringbuffer:
file_obj_ssl = StringIO()
file_obj_ms2 = StringIO()
else:
file_obj_ssl = open("{}_predictions.ssl".format(self.output_filename), self.write_mode)
file_obj_ms2 = open("{}_predictions.ms2".format(self.output_filename), self.write_mode)
suffix = ".dlib"
# If a new file is written, write headers
if "w" in self.write_mode:
start_scannr = 0
ssl_header = [
"file",
"scan",
"charge",
"sequence",
"score-type",
"score",
"retention-time",
"\n",
]
file_obj_ssl.write("\t".join(ssl_header))
file_obj_ms2.write(
"H\tCreationDate\t{}\n".format(strftime("%Y-%m-%d %H:%M:%S", localtime()))
)
file_obj_ms2.write("H\tExtractor\tMS2PIP predictions\n")
else:
# Get last scan number of ssl file, to continue indexing from there
# because Bibliospec speclib scan numbers can only be integers
start_scannr = self._get_last_ssl_scannr() + 1
def open(self):
"""Open file."""
if self.write_mode == "w":
self._open_file = self.filename.unlink(missing_ok=True)
self._open_file = dlib.open_sqlite(self.filename)
self._write_bibliospec_core(file_obj_ssl, file_obj_ms2, start_scannr=start_scannr)
def write(self, processing_results: List[ProcessingResult]):
"""Write MS2PIP predictions to a DLIB SQLite file."""
connection = self._file_object
dlib.metadata.create_all()
self._write_metadata(connection)
self._write_entries(processing_results, connection, self.filename)
self._write_peptide_to_protein(processing_results, connection)
return file_obj_ssl, file_obj_ms2
def _write_result(self, result: ProcessingResult): ...
def _write_dlib_metadata(self, connection):
from sqlalchemy import select
@staticmethod
def _format_modified_sequence(peptidoform: Peptidoform) -> str:
    """Format modified sequence as string for DLIB, e.g. ``AC[+57.021464]DE``."""
    # Sum all sequential mass shifts for each position
    masses = [
        sum(mod.mass for mod in mods) if mods else 0
        for _, mods in peptidoform.parsed_sequence
    ]

    # Add N- and C-terminal modifications to the first / last residue
    for term, position in [("n_term", 0), ("c_term", len(peptidoform) - 1)]:
        if peptidoform.properties[term]:
            masses[position] += sum(mod.mass for mod in peptidoform.properties[term])

    # Format modified sequence; unmodified residues get no bracket
    return "".join(
        f"{aa}[{mass:+.6f}]" if mass else aa
        for aa, mass in zip(peptidoform.sequence, masses)
    )
@staticmethod
def _write_metadata(connection: engine.Connection):
    """Write DLIB format-version metadata to the SQLite file if not yet present."""
    with connection.begin():
        version = connection.execute(
            select([dlib.Metadata.c.Value]).where(dlib.Metadata.c.Key == "version")
        ).scalar()
        if version is None:
            connection.execute(
                dlib.Metadata.insert().values(
                    Key="version",
                    Value=dlib.DLIB_VERSION,
                )
            )
def _write_dlib_entries(self, connection, precision):
from ms2pip._utils.dlib import Entry
peptide_to_proteins = set()
@staticmethod
def _write_entries(
processing_results: List[ProcessingResult],
connection: engine.Connection,
output_filename: str,
):
"""Write spectra to DLIB SQLite file."""
with connection.begin():
for spec_id, peprec in self.peprec_dict.items():
seq = peprec["peptide"]
mods = peprec["modifications"]
charge = peprec["charge"]
for result in processing_results:
if not result.psm.retention_time:
raise ValueError("Retention time required to write DLIB file.")
prec_mass, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge)
mod_seq = self._get_diff_modified_sequence(seq, mods, precision=precision)
spectrum = result.as_spectra()[0]
intensity_normalized = _basepeak_normalize(spectrum.intensity) * 1e4
n_peaks = len(spectrum.mz)
all_peaks = sorted(
itertools.chain.from_iterable(self.preds_dict[spec_id]["peaks"].values()),
key=itemgetter(1),
)
mzs = [peak[1] for peak in all_peaks]
intensities = [peak[2] for peak in all_peaks]
connection.execute(
Entry.insert().values(
PrecursorMz=prec_mz,
PrecursorCharge=charge,
PeptideModSeq=mod_seq,
PeptideSeq=seq,
dlib.Entry.insert().values(
PrecursorMz=result.psm.peptidoform.theoretical_mz,
PrecursorCharge=result.psm.get_precursor_charge(),
PeptideModSeq=DLIB._format_modified_sequence(result.psm.peptidoform),
PeptideSeq=result.psm.peptidoform.sequence,
Copies=1,
RTInSeconds=peprec["rt"],
RTInSeconds=result.psm.retention_time,
Score=0,
MassEncodedLength=len(mzs),
MassArray=mzs,
IntensityEncodedLength=len(intensities),
IntensityArray=intensities,
SourceFile=self.output_filename,
MassEncodedLength=n_peaks,
MassArray=spectrum.mz.tolist(),
IntensityEncodedLength=n_peaks,
IntensityArray=intensity_normalized.tolist(),
SourceFile=str(output_filename),
)
)
if self.has_protein_list:
protein_list = peprec["protein_list"]
if isinstance(protein_list, str):
protein_list = literal_eval(protein_list)
@staticmethod
def _write_peptide_to_protein(results: List[ProcessingResult], connection: engine.Connection):
"""Write peptide-to-protein mappings to DLIB SQLite file."""
peptide_to_proteins = {
(result.psm.peptidoform.sequence, protein)
for result in results
if result.psm.protein_list
for protein in result.psm.protein_list
}
for protein in protein_list:
peptide_to_proteins.add((seq, protein))
return peptide_to_proteins
def _write_dlib_peptide_to_protein(self, connection, peptide_to_proteins):
from ms2pip._utils.dlib import PeptideToProtein
if not self.has_protein_list:
return
with connection.begin():

@@ -717,3 +724,5 @@ sql_peptide_to_proteins = set()

for peptide_to_protein in connection.execute(
PeptideToProtein.select().where(PeptideToProtein.c.ProteinAccession.in_(proteins))
dlib.PeptideToProtein.select().where(
dlib.PeptideToProtein.c.ProteinAccession.in_(proteins)
)
):

@@ -730,3 +739,3 @@ sql_peptide_to_proteins.add(

connection.execute(
PeptideToProtein.insert().values(
dlib.PeptideToProtein.insert().values(
PeptideSeq=seq, isDecoy=False, ProteinAccession=protein

@@ -736,91 +745,27 @@ )

@output_format("dlib")
def write_dlib(self):
"""Write MS2PIP predictions to a DLIB SQLite file."""
from ms2pip._utils.dlib import metadata, open_sqlite
normalization = "basepeak_10000"
precision = 5
if not self.normalization == normalization:
self._normalize_spectra(method=normalization)
self._generate_preds_dict()
if not self.peprec_dict:
self._generate_peprec_dict()
if precision not in self.diff_modification_mapping:
self._generate_diff_modification_mapping(precision)
SUPPORTED_FORMATS = {
"tsv": TSV,
"msp": MSP,
"mgf": MGF,
"spectronaut": Spectronaut,
"bibliospec": Bibliospec,
"dlib": DLIB,
}
filename = "{}.dlib".format(self.output_filename)
logger.info("Writing results to %s", filename)
logger.debug(
"write mode is ignored for DLIB at the file mode, although append or not is respected"
)
if "a" not in self.write_mode and os.path.exists(filename):
os.remove(filename)
def _peptidoform_str_without_charge(peptidoform: Peptidoform) -> str:
    """Get peptidoform string with any trailing charge suffix stripped."""
    full_string = str(peptidoform)
    return re.sub(r"\/\d+$", "", full_string)
if self.return_stringbuffer:
raise NotImplementedError("`return_stringbuffer` not implemented for DLIB output.")
if not self.has_rt:
raise NotImplementedError("Retention times required to write DLIB file.")
def _unlogarithmize(intensities: np.array) -> np.array:
"""Undo logarithmic transformation of intensities."""
return (2**intensities) - 0.001
with open_sqlite(filename) as connection:
metadata.create_all()
self._write_dlib_metadata(connection)
peptide_to_proteins = self._write_dlib_entries(connection, precision)
self._write_dlib_peptide_to_protein(connection, peptide_to_proteins)
def get_normalized_predictions(self, normalization_method="tic"):
    """Return normalized copy of predictions.

    Parameters
    ----------
    normalization_method : str
        Method name passed to ``_normalize_spectra`` (default ``"tic"``).

    """
    # NOTE(review): normalizes the stored predictions in place first, then
    # returns a copy of the resulting table.
    self._normalize_spectra(method=normalization_method)
    return self.all_preds.copy()
@output_format("csv")
def write_csv(self):
    """Write TIC-normalized MS2PIP predictions to CSV."""
    self._normalize_spectra(method="tic")

    # Select output destination: in-memory buffer or an on-disk CSV file
    if self.return_stringbuffer:
        out = StringIO()
        logger.info("Writing results to StringIO using %s", "write_csv")
    else:
        csv_path = "{}_predictions.csv".format(self.output_filename)
        out = open(csv_path, self.write_mode)
        logger.info("Writing results to %s", csv_path)

    write_kwargs = {"float_format": "%.6g", "index": False}
    try:
        self.all_preds.to_csv(out, lineterminator="\n", **write_kwargs)
    except TypeError:
        # Pandas < 1.5 used `line_terminator` (required for Python 3.7 support)
        self.all_preds.to_csv(out, line_terminator="\n", **write_kwargs)
    return out
def write_single_spectrum_csv(spectrum, filepath):
    """Write a single spectrum to a CSV file.

    Parameters
    ----------
    spectrum
        Spectrum with ``mz``, ``intensity``, and ``annotations`` attributes of
        equal length.
    filepath
        Path of the CSV file to write.

    """
    # newline="" lets the csv module control line endings; with plain "wt" on
    # Windows, the "\n" terminator would be translated to "\r\n".
    with open(filepath, "wt", newline="") as f:
        writer = csv.writer(f, delimiter=",", lineterminator="\n")
        writer.writerow(["mz", "intensity", "annotation"])
        writer.writerows(zip(spectrum.mz, spectrum.intensity, spectrum.annotations))
def write_single_spectrum_png(spectrum, filepath):
    """Plot a single spectrum and write to a PNG file."""
    # Local imports: plotting dependencies are optional
    import matplotlib.pyplot as plt
    import spectrum_utils.plot as sup

    axes = plt.gca()
    axes.set_title("MS²PIP prediction for " + str(spectrum.peptidoform))
    sup.spectrum(spectrum.to_spectrum_utils(), ax=axes)
    plt.savefig(Path(filepath).with_suffix(".png"))
    plt.close()
def _basepeak_normalize(intensities: np.array, basepeak: Optional[float] = None) -> np.array:
"""Normalize intensities to most intense peak."""
if not basepeak:
basepeak = intensities.max()
return intensities / basepeak

@@ -75,6 +75,6 @@ """MS2 spectrum handling."""

def check_array_lengths(cls, data: dict):
if len(data["mz"]) != len(data["intensity"]):
if len(data.mz) != len(data.intensity):
raise ValueError("Array lengths do not match.")
if data["annotations"] is not None:
if len(data["annotations"]) != len(data["intensity"]):
if data.annotations is not None:
if len(data.annotations) != len(data.intensity):
raise ValueError("Array lengths do not match.")

@@ -81,0 +81,0 @@ return data

Metadata-Version: 2.1
Name: ms2pip
Version: 4.0.0.dev12
Version: 4.0.0.dev13
Summary: MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction.

@@ -319,2 +319,3 @@ Author: Ana Sílvia C. Silva

training.
- ``annotate-spectra``: Annotate peaks in observed spectra.

@@ -321,0 +322,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes

@@ -62,2 +62,3 @@ .. image:: https://github.com/compomics/ms2pip_c/raw/releases/img/ms2pip_logo_1000px.png

training.
- ``annotate-spectra``: Annotate peaks in observed spectra.

@@ -64,0 +65,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes

@@ -1,100 +0,47 @@

import os
import re
from psm_utils import Peptidoform
import numpy as np
import pandas as pd
from ms2pip.spectrum_output import MSP, Bibliospec, DLIB
from ms2pip.ms2pip_tools.spectrum_output import SpectrumOutput
class TestMSP:
    def test__format_modification_string(self):
        """MSP ``Mods=`` strings should list all (terminal) modifications."""
        test_cases = [
            ("ACDE/2", "Mods=0"),
            ("AC[Carbamidomethyl]DE/2", "Mods=1/2,C,Carbamidomethyl"),
            ("[Glu->pyro-Glu]-EPEPTIDEK/2", "Mods=1/0,E,Glu->pyro-Glu"),
            ("PEPTIDEK-[Amidated]/2", "Mods=1/-1,K,Amidated"),
            ("AM[Oxidation]C[Carbamidomethyl]DE/2", "Mods=2/2,M,Oxidation/3,C,Carbamidomethyl"),
        ]
        for peptidoform_str, expected_output in test_cases:
            peptidoform = Peptidoform(peptidoform_str)
            assert MSP._format_modifications(peptidoform) == expected_output
class TestSpectrumOutput:
def test_integration(self):
def compare_line(test_line, target_line):
"""Assert if two lines in spectrum output are the same."""
# Extract float values from line and use assert_allclose, to allow for
# float imprecisions
float_pattern = re.compile(r"[0-9]*[.][0-9]+")
test_floats = float_pattern.findall(test_line)
target_floats = float_pattern.findall(target_line)
assert len(test_floats) == len(target_floats)
[
np.testing.assert_allclose(float(te), float(ta), rtol=1e-5)
for te, ta in zip(test_floats, target_floats)
]
assert float_pattern.sub(test_line, "") == float_pattern.sub(
target_line, ""
)
peprec = pd.read_pickle(
os.path.join(TEST_DIR, "test_data/spectrum_output/input_peprec.pkl")
)
all_preds = pd.read_pickle(
os.path.join(TEST_DIR, "test_data/spectrum_output/input_preds.pkl")
)
params = {
"ptm": [
"Oxidation,15.994915,opt,M",
"Carbamidomethyl,57.021464,opt,C",
"Glu->pyro-Glu,-18.010565,opt,E",
"Gln->pyro-Glu,-17.026549,opt,Q",
"Acetyl,42.010565,opt,N-term",
],
"sptm": [],
"gptm": [],
"model": "HCD",
"frag_error": "0.02",
"out": "csv",
}
peprec_tmp = peprec.sample(5, random_state=10).copy()
all_preds_tmp = all_preds[
all_preds["spec_id"].isin(peprec_tmp["spec_id"])
].copy()
so = SpectrumOutput(
all_preds_tmp,
peprec_tmp,
params,
output_filename="test",
return_stringbuffer=True,
)
target_filename_base = os.path.join(
TEST_DIR, "test_data/spectrum_output/target"
)
# Test general output
class TestBiblioSpec:
def test__format_modified_sequence(self):
test_cases = [
(so.write_mgf, "_predictions.mgf"),
(so.write_msp, "_predictions.msp"),
(so.write_spectronaut, "_predictions_spectronaut.csv"),
("ACDE/2", "ACDE"),
("AC[Carbamidomethyl]DE/2", "AC[+57.0]DE"),
("[Glu->pyro-Glu]-EPEPTIDEK/2", "E[-18.0]PEPTIDEK"),
("PEPTIDEK-[Amidated]/2", "PEPTIDEK[-1.0]"),
("AM[Oxidation]C[Carbamidomethyl]DE/2", "AM[+16.0]C[+57.0]DE"),
]
for test_function, file_ext in test_cases:
test = test_function()
test.seek(0)
with open(target_filename_base + file_ext) as target:
for test_line, target_line in zip(test.readlines(), target.readlines()):
compare_line(test_line, target_line)
for peptidoform_str, expected_output in test_cases:
peptidoform = Peptidoform(peptidoform_str)
assert Bibliospec._format_modified_sequence(peptidoform) == expected_output
# Test bibliospec output
bibliospec_ssl, bibliospec_ms2 = so.write_bibliospec()
class TestDLIB:
def test__format_modified_sequence(self):
test_cases = [
(bibliospec_ssl, "_predictions.ssl"),
(bibliospec_ms2, "_predictions.ms2"),
("ACDE/2", "ACDE"),
("AC[Carbamidomethyl]DE/2", "AC[+57.021464]DE"),
("[Glu->pyro-Glu]-EPEPTIDEK/2", "E[-18.010565]PEPTIDEK"),
("PEPTIDEK-[Amidated]/2", "PEPTIDEK[-0.984016]"),
("AM[Oxidation]C[Carbamidomethyl]DE/2", "AM[+15.994915]C[+57.021464]DE"),
]
for test, file_ext in test_cases:
test.seek(0)
with open(target_filename_base + file_ext) as target:
for test_line, target_line in zip(test.readlines(), target.readlines()):
test_line = test_line.replace(
"test_predictions.ms2", "target_predictions.ms2"
)
if not "CreationDate" in target_line:
compare_line(test_line, target_line)
for test_in, expected_out in test_cases:
assert DLIB._format_modified_sequence(Peptidoform(test_in)) == expected_out
"""Tests for fasta2speclib."""
from pyteomics.fasta import Protein
from fasta2speclib.fasta2speclib import Fasta2SpecLib, ModificationConfig, Peptide
# Shared fixture covering the four modification target types used below:
# side-chain variable, side-chain fixed, peptide N-terminal, protein N-terminal.
MODIFICATION_CONFIG = [
    {
        "name": "Oxidation",
        "mass_shift": 15.9994,
        "amino_acid": "M",
    },
    {
        "name": "Carbamidomethyl",
        "mass_shift": 57.0513,
        "amino_acid": "C",
        "fixed": True,
    },
    {
        "name": "Glu->pyro-Glu",
        "mass_shift": -18.010565,
        "amino_acid": "E",
        "peptide_n_term": True,
    },
    {
        "name": "Acetyl",
        "mass_shift": 42.010565,
        "amino_acid": None,
        "protein_n_term": True,
    },
]
# Parse the raw dicts into ModificationConfig objects (rebinds the same name).
MODIFICATION_CONFIG = [ModificationConfig(**mod) for mod in MODIFICATION_CONFIG]
def test_get_modifications_by_target():
    """Modifications should be grouped per target site, prefixed with ``None`` (unmodified)."""
    by_target = Fasta2SpecLib._get_modifications_by_target(MODIFICATION_CONFIG)
    assert by_target["sidechain"] == {"M": [None] + MODIFICATION_CONFIG[0:1]}
    assert by_target["peptide_n_term"] == {"E": [None] + MODIFICATION_CONFIG[2:3]}
    assert by_target["peptide_c_term"] == {}
    assert by_target["protein_n_term"] == {"any": [None] + MODIFICATION_CONFIG[3:4]}
    assert by_target["protein_c_term"] == {}
def test_get_modification_versions():
    """Check generated modification-combination strings for various peptides."""
    modification_config = [
        ModificationConfig(name="Oxidation", mass_shift=15.9994, amino_acid="M"),
        ModificationConfig(
            name="Carbamidomethyl", mass_shift=57.0513, amino_acid="C", fixed=True
        ),
        ModificationConfig(
            name="Glu->pyro-Glu",
            mass_shift=-18.010565,
            amino_acid="E",
            protein_n_term=True,
        ),
    ]
    modifications_by_target = Fasta2SpecLib._get_modifications_by_target(modification_config)
    test_cases = [
        ("ADEF", {""}),  # No modifiable residues
        ("ACDE", {"2|Carbamidomethyl"}),  # Single fixed
        ("ACCDE", {"2|Carbamidomethyl|3|Carbamidomethyl"}),  # Double fixed
        ("ADME", {"", "3|Oxidation"}),  # Single variable
        (
            "ADMME",
            {"", "3|Oxidation", "4|Oxidation", "3|Oxidation|4|Oxidation"},
        ),  # Double variable
        (
            "ADMMME",
            {
                "",
                "3|Oxidation",
                "4|Oxidation",
                "5|Oxidation",
                "3|Oxidation|4|Oxidation",
                "4|Oxidation|5|Oxidation",
                "3|Oxidation|5|Oxidation",
            },
        ),  # More than maximum simultaneous mods should be ignored
        ("EDEF", {"", "0|Glu->pyro-Glu"}),  # N-term and AA-specific
    ]
    for sequence, expected_versions in test_cases:
        versions = Fasta2SpecLib._get_modification_versions(
            Peptide(sequence=sequence, is_n_term=True, proteins=[]),
            modification_config,
            modifications_by_target,
            max_variable_modifications=2,
        )
        assert set(versions) == expected_versions
def test_digest_protein():
    """Tryptic digestion should yield the expected peptides with terminal flags."""
    test_input = {
        "protein": Protein(
            description="P12345",
            sequence="MYSSCSLLQRLVWFPFLALVATQLLFIRNVSSLNLTNEYLHHKCLVSEGKYKPGSKYEYI",
        ),
        "min_length": 8,
        "max_length": 30,
        "cleavage_rule": "trypsin",
        "missed_cleavages": 2,
        "semi_specific": False,
    }
    # (sequence, is_n_term, is_c_term) triplets in expected digestion order
    expected = [
        Peptide(
            sequence=sequence,
            proteins=["P12345"],
            modification_options=None,
            is_n_term=is_n_term,
            is_c_term=is_c_term,
        )
        for sequence, is_n_term, is_c_term in [
            ("MYSSCSLLQR", True, False),
            ("MYSSCSLLQRLVWFPFLALVATQLLFIR", True, False),
            ("LVWFPFLALVATQLLFIR", False, False),
            ("NVSSLNLTNEYLHHK", False, False),
            ("NVSSLNLTNEYLHHKCLVSEGK", False, False),
            ("NVSSLNLTNEYLHHKCLVSEGKYKPGSK", False, False),
            ("CLVSEGKYKPGSK", False, False),
            ("CLVSEGKYKPGSKYEYI", False, True),
            ("YKPGSKYEYI", False, True),
        ]
    ]
    assert expected == Fasta2SpecLib._digest_protein(**test_input)
import os
import pandas as pd
from ms2pip.ms2pipC import MS2PIP
TEST_DIR = os.path.dirname(__file__)
def run_ms2pip():
    """Run ms2pipC to predict peak intensities from a PEPREC file (HCD model)."""
    # Minimal MS2PIP configuration: HCD2019 model with common variable PTMs
    params = {
        "ms2pip": {
            "ptm": [
                "Oxidation,15.994915,opt,M",
                "Carbamidomethyl,57.021464,opt,C",
                "Acetyl,42.010565,opt,N-term",
            ],
            "sptm": [],
            "gptm": [],
            "frag_method": "HCD2019",
            "frag_error": 0.02,
            "out": "csv",
        }
    }
    ms2pip = MS2PIP(os.path.join(TEST_DIR, "test_data/test.peprec"), params=params)
    # Writes test_data/test_HCD2019_predictions.csv, read back below
    ms2pip.run()
    # Freshly predicted intensities
    test_data = pd.read_csv(
        os.path.join(TEST_DIR, "test_data/test_HCD2019_predictions.csv")
    )
    # Previously validated target intensities
    target_data = pd.read_csv(
        os.path.join(TEST_DIR, "test_data/target_HCD2019_predictions.csv")
    )
    # Input peptides (space-separated PEPREC format)
    pepfile = pd.read_csv(
        os.path.join(TEST_DIR, "test_data/test.peprec"),
        sep=" ",
        index_col=False,
        dtype={"spec_id": str, "modifications": str},
    )
    return test_data, target_data, pepfile
TEST_DATA, TARGET_DATA, PEPFILE = run_ms2pip()
class TestPredictions:
    def test_all_spec(self):
        """All input spectra should be present in the predictions."""
        assert set(TEST_DATA.spec_id.unique()) == set(PEPFILE.spec_id)

    def test_amount_peaks(self):
        """Each peptide should yield (2 * length) - 2 fragment ions."""
        for pep in ["peptide1", "peptide2", "peptide3"]:
            peplen = len(PEPFILE[PEPFILE.spec_id == pep].peptide.values[0])
            assert len(TEST_DATA[TEST_DATA.spec_id == pep]) == (2 * peplen) - 2

    @staticmethod
    def _assert_ion_predictions_match(ion_type):
        """Compare predicted intensities against targets for one ion type."""
        for pep in TARGET_DATA.spec_id.unique():
            test_subset = TEST_DATA[
                (TEST_DATA.spec_id == pep) & (TEST_DATA.ion == ion_type)
            ]
            target_subset = TARGET_DATA[
                (TARGET_DATA.spec_id == pep) & (TARGET_DATA.ion == ion_type)
            ]
            for no in target_subset.ionnumber:
                assert (
                    test_subset[test_subset.ionnumber == no]["prediction"].values[0]
                    == target_subset[target_subset.ionnumber == no]["prediction"].values[0]
                )

    def test_peak_ints_b(self):
        self._assert_ion_predictions_match("b")

    def test_peak_ints_y(self):
        self._assert_ion_predictions_match("y")
import os
import numpy as np
import pandas as pd
from ms2pip.retention_time import RetentionTime
from ms2pip.config_parser import ConfigParser
TEST_DIR = os.path.dirname(__file__)
class TestRetentionTime:
    def test_prepare_deeplc_peptide_df(self):
        """DeepLC peptide dataframe should contain seq and modifications columns."""
        peprec = pd.read_csv(os.path.join(TEST_DIR, "test_data/test.peprec"), sep=" ")
        config = {
            "deeplc": {
                "calibration_file": False,
                "verbose": False,
                "path_model": False,
                "split_cal": 25,
                "batch_num": 350000,
            }
        }
        rt_predictor = RetentionTime(config=config)
        rt_predictor.peprec = peprec
        rt_predictor._prepare_deeplc_peptide_df()

        expected = pd.DataFrame(
            {
                "seq": {0: "ACDE", 1: "ACDEFGHI", 2: "ACDEFGHIKMNPQ"},
                "modifications": {0: np.nan, 1: np.nan, 2: np.nan},
            }
        )
        assert rt_predictor.deeplc_pep_df.equals(expected)