ms2pip
Advanced tools
| from pathlib import Path | ||
| from typing import Union | ||
| from ms2pip.spectrum import Spectrum | ||
| try: | ||
| import matplotlib.pyplot as plt | ||
| import spectrum_utils.plot as sup | ||
| _can_plot = True | ||
| except ImportError: | ||
| _can_plot = False | ||
| def spectrum_to_png(spectrum: Spectrum, filepath: Union[str, Path]): | ||
| """Plot a single spectrum and write to a PNG file.""" | ||
| if not _can_plot: | ||
| raise ImportError("Matplotlib and spectrum_utils are required to plot spectra.") | ||
| ax = plt.gca() | ||
| ax.set_title("MS²PIP prediction for " + str(spectrum.peptidoform)) | ||
| sup.spectrum(spectrum.to_spectrum_utils(), ax=ax) | ||
| plt.savefig(Path(filepath).with_suffix(".png")) | ||
| plt.close() |
| """ | ||
| Define and build the search space for in silico spectral library generation. | ||
| This module defines the search space for in silico spectral library generation as a | ||
| :py:class:`~ProteomeSearchSpace` object. Variable and fixed modifications can be configured | ||
| as :py:class:`~ModificationConfig` objects. | ||
| The peptide search space can be built from a protein FASTA file and a set of parameters, which can | ||
| then be converted to a :py:class:`psm_utils.PSMList` object for use in :py:mod:`ms2pip`. All | ||
| parameters are listed below at :py:class:`~ProteomeSearchSpace` and can be passed as a | ||
| dictionary, a JSON file, or as a :py:class:`~ProteomeSearchSpace` object. For example: | ||
| .. code-block:: json | ||
| { | ||
| "fasta_file": "test.fasta", | ||
| "min_length": 8, | ||
| "max_length": 3, | ||
| "cleavage_rule": "trypsin", | ||
| "missed_cleavages": 2, | ||
| "semi_specific": false, | ||
| "add_decoys": true, | ||
| "modifications": [ | ||
| { | ||
| "label": "UNIMOD:Oxidation", | ||
| "amino_acid": "M" | ||
| }, | ||
| { | ||
| "label": "UNIMOD:Carbamidomethyl", | ||
| "amino_acid": "C", | ||
| "fixed": true | ||
| } | ||
| ], | ||
| "max_variable_modifications": 3, | ||
| "charges": [2, 3] | ||
| } | ||
| For an unspecific protein digestion, the cleavage rule can be set to ``unspecific``. This will | ||
| result in a cleavage rule that allows cleavage after any amino acid with an unlimited number of | ||
| allowed missed cleavages. | ||
| To disable protein digestion when the FASTA file contains peptides, set the cleavage rule to | ||
| ``-``. This will treat each line in the FASTA file as a separate peptide sequence, but still | ||
| allow for modifications and charges to be added. | ||
| Examples | ||
| -------- | ||
| >>> from ms2pip.search_space import ProteomeSearchSpace, ModificationConfig | ||
| >>> search_space = ProteomeSearchSpace( | ||
| ... fasta_file="tests/data/test_proteins.fasta", | ||
| ... min_length=8, | ||
| ... max_length=30, | ||
| ... cleavage_rule="trypsin", | ||
| ... missed_cleavages=2, | ||
| ... semi_specific=False, | ||
| ... modifications=[ | ||
| ... ModificationConfig(label="UNIMOD:Oxidation", amino_acid="M"), | ||
| ... ModificationConfig(label="UNIMOD:Carbamidomethyl", amino_acid="C", fixed=True), | ||
| ... ], | ||
| ... charges=[2, 3], | ||
| ... ) | ||
| >>> psm_list = search_space.into_psm_list() | ||
| >>> from ms2pip.search_space import ProteomeSearchSpace | ||
| >>> search_space = ProteomeSearchSpace.from_any("tests/data/test_search_space.json") | ||
| >>> psm_list = search_space.into_psm_list() | ||
| """ | ||
| from __future__ import annotations | ||
| import multiprocessing | ||
| import multiprocessing.dummy | ||
| from collections import defaultdict | ||
| from functools import partial | ||
| from itertools import chain, combinations, product | ||
| from logging import getLogger | ||
| from pathlib import Path | ||
| from typing import Any, Dict, Generator, List, Optional, Union | ||
| import numpy as np | ||
| import pyteomics.fasta | ||
| from psm_utils import PSM, PSMList | ||
| from pydantic import BaseModel, field_validator, model_validator | ||
| from pyteomics.parser import icleave | ||
| from rich.progress import track | ||
| logger = getLogger(__name__) | ||
| class ModificationConfig(BaseModel): | ||
| """Configuration for a single modification in the search space.""" | ||
| label: str | ||
| amino_acid: Optional[str] = None | ||
| peptide_n_term: Optional[bool] = False | ||
| protein_n_term: Optional[bool] = False | ||
| peptide_c_term: Optional[bool] = False | ||
| protein_c_term: Optional[bool] = False | ||
| fixed: Optional[bool] = False | ||
| def __init__(self, **data: Any): | ||
| """ | ||
| Configuration for a single modification in the search space. | ||
| Parameters | ||
| ---------- | ||
| label | ||
| Label of the modification. This can be any valid ProForma 2.0 label. | ||
| amino_acid | ||
| Amino acid target of the modification. :py:obj:`None` if the modification is not | ||
| specific to an amino acid. Default is None. | ||
| peptide_n_term | ||
| Whether the modification occurs only on the peptide N-terminus. Default is False. | ||
| protein_n_term | ||
| Whether the modification occurs only on the protein N-terminus. Default is False. | ||
| peptide_c_term | ||
| Whether the modification occurs only on the peptide C-terminus. Default is False. | ||
| protein_c_term | ||
| Whether the modification occurs only on the protein C-terminus. Default is False. | ||
| fixed | ||
| Whether the modification is fixed. Default is False. | ||
| """ | ||
| super().__init__(**data) | ||
| @model_validator(mode="after") | ||
| def _modification_must_have_target(self): | ||
| target_fields = [ | ||
| "amino_acid", | ||
| "peptide_n_term", | ||
| "protein_n_term", | ||
| "peptide_c_term", | ||
| "protein_c_term", | ||
| ] | ||
| if not any(getattr(self, t) for t in target_fields): | ||
| raise ValueError("Modifications must have a target (amino acid or N/C-term).") | ||
| return self | ||
| DEFAULT_MODIFICATIONS = [ | ||
| ModificationConfig( | ||
| label="UNIMOD:Oxidation", | ||
| amino_acid="M", | ||
| ), | ||
| ModificationConfig( | ||
| label="UNIMOD:Carbamidomethyl", | ||
| amino_acid="C", | ||
| fixed=True, | ||
| ), | ||
| ] | ||
| class ProteomeSearchSpace(BaseModel): | ||
| """Search space for in silico spectral library generation.""" | ||
| fasta_file: Path | ||
| min_length: int = 8 | ||
| max_length: int = 30 | ||
| min_precursor_mz: Optional[float] = 0 | ||
| max_precursor_mz: Optional[float] = np.Inf | ||
| cleavage_rule: str = "trypsin" | ||
| missed_cleavages: int = 2 | ||
| semi_specific: bool = False | ||
| add_decoys: bool = False | ||
| modifications: List[ModificationConfig] = DEFAULT_MODIFICATIONS | ||
| max_variable_modifications: int = 3 | ||
| charges: List[int] = [2, 3] | ||
| def __init__(self, **data: Any): | ||
| """ | ||
| Search space for in silico spectral library generation. | ||
| Parameters | ||
| ---------- | ||
| fasta_file | ||
| Path to FASTA file with protein sequences. | ||
| min_length | ||
| Minimum peptide length. Default is 8. | ||
| max_length | ||
| Maximum peptide length. Default is 30. | ||
| min_precursor_mz | ||
| Minimum precursor m/z for peptides. Default is 0. | ||
| max_precursor_mz | ||
| Maximum precursor m/z for peptides. Default is np.Inf. | ||
| cleavage_rule | ||
| Cleavage rule for peptide digestion. Default is "trypsin". | ||
| missed_cleavages | ||
| Maximum number of missed cleavages. Default is 2. | ||
| semi_specific | ||
| Allow semi-specific cleavage. Default is False. | ||
| add_decoys | ||
| Add decoy sequences to search space. Default is False. | ||
| modifications | ||
| List of modifications to consider. Default is oxidation of M and carbamidomethylation | ||
| of C. | ||
| max_variable_modifications | ||
| Maximum number of variable modifications per peptide. Default is 3. | ||
| charges | ||
| List of charges to consider. Default is [2, 3]. | ||
| """ | ||
| super().__init__(**data) | ||
| self._peptidoform_spaces: List[_PeptidoformSearchSpace] = [] | ||
| @field_validator("modifications") | ||
| @classmethod | ||
| def _validate_modifications(cls, v): | ||
| if all(isinstance(m, ModificationConfig) for m in v): | ||
| return v | ||
| elif all(isinstance(m, dict) for m in v): | ||
| return [ModificationConfig(**modification) for modification in v] | ||
| else: | ||
| raise ValueError( | ||
| "Modifications should be a list of dicts or ModificationConfig objects." | ||
| ) | ||
| @model_validator(mode="after") | ||
| def _validate_unspecific_cleavage(self): | ||
| """Validate and configure unspecific cleavage settings.""" | ||
| # `unspecific` is not an option in pyteomics.parser.icleave, so we configure | ||
| # the settings for unspecific cleavage manually. | ||
| if self.cleavage_rule == "unspecific": | ||
| self.missed_cleavages = self.max_length | ||
| self.cleavage_rule = r"(?<=[A-Z])" | ||
| return self | ||
| def __len__(self): | ||
| if not self._peptidoform_spaces: | ||
| raise ValueError("Search space must be built before length can be determined.") | ||
| return sum(len(pep_space) for pep_space in self._peptidoform_spaces) | ||
| @classmethod | ||
| def from_any(cls, _input: Union[dict, str, Path, ProteomeSearchSpace]) -> ProteomeSearchSpace: | ||
| """ | ||
| Create ProteomeSearchSpace from various input types. | ||
| Parameters | ||
| ---------- | ||
| _input | ||
| Search space parameters as a dictionary, a path to a JSON file, an existing | ||
| :py:class:`ProteomeSearchSpace` object. | ||
| """ | ||
| if isinstance(_input, ProteomeSearchSpace): | ||
| return _input | ||
| elif isinstance(_input, (str, Path)): | ||
| with open(_input, "rt") as f: | ||
| return cls.model_validate_json(f.read()) | ||
| elif isinstance(_input, dict): | ||
| return cls.model_validate(_input) | ||
| else: | ||
| raise ValueError("Search space must be a dict, str, Path, or ProteomeSearchSpace.") | ||
| def build(self, processes: int = 1): | ||
| """ | ||
| Build peptide search space from FASTA file. | ||
| Parameters | ||
| ---------- | ||
| processes : int | ||
| Number of processes to use for parallelization. | ||
| """ | ||
| processes = processes if processes else multiprocessing.cpu_count() | ||
| self._digest_fasta(processes) | ||
| self._remove_redundancy() | ||
| self._add_modifications(processes) | ||
| self._add_charges() | ||
| def __iter__(self) -> Generator[PSM, None, None]: | ||
| """ | ||
| Generate PSMs from search space. | ||
| If :py:meth:`build` has not been called, the search space will first be built with the | ||
| given parameters. | ||
| Parameters | ||
| ---------- | ||
| processes : int | ||
| Number of processes to use for parallelization. | ||
| """ | ||
| # Build search space if not already built | ||
| if not self._peptidoform_spaces: | ||
| raise ValueError("Search space must be built before PSMs can be generated.") | ||
| spectrum_id = 0 | ||
| for pep_space in self._peptidoform_spaces: | ||
| for pep in pep_space: | ||
| yield PSM( | ||
| peptidoform=pep, | ||
| spectrum_id=spectrum_id, | ||
| protein_list=pep_space.proteins, | ||
| ) | ||
| spectrum_id += 1 | ||
| def filter_psms_by_mz(self, psms: PSMList) -> PSMList: | ||
| """Filter PSMs by precursor m/z range.""" | ||
| return PSMList( | ||
| psm_list=[ | ||
| psm | ||
| for psm in psms | ||
| if self.min_precursor_mz <= psm.peptidoform.theoretical_mz <= self.max_precursor_mz | ||
| ] | ||
| ) | ||
| def _digest_fasta(self, processes: int = 1): | ||
| """Digest FASTA file to peptides and populate search space.""" | ||
| # Convert to string to avoid issues with Path objects | ||
| self.fasta_file = str(self.fasta_file) | ||
| n_proteins = _count_fasta_entries(self.fasta_file) | ||
| if self.add_decoys: | ||
| fasta_db = pyteomics.fasta.decoy_db( | ||
| self.fasta_file, | ||
| mode="reverse", | ||
| decoy_only=False, | ||
| keep_nterm=True, | ||
| ) | ||
| n_proteins *= 2 | ||
| else: | ||
| fasta_db = pyteomics.fasta.FASTA(self.fasta_file) | ||
| # Read proteins and digest to peptides | ||
| with _get_pool(processes) as pool: | ||
| partial_digest_protein = partial( | ||
| _digest_single_protein, | ||
| min_length=self.min_length, | ||
| max_length=self.max_length, | ||
| cleavage_rule=self.cleavage_rule, | ||
| missed_cleavages=self.missed_cleavages, | ||
| semi_specific=self.semi_specific, | ||
| ) | ||
| results = track( | ||
| pool.imap(partial_digest_protein, fasta_db), | ||
| total=n_proteins, | ||
| description="Digesting proteins...", | ||
| transient=True, | ||
| ) | ||
| self._peptidoform_spaces = list(chain.from_iterable(results)) | ||
| def _remove_redundancy(self): | ||
| """Remove redundancy in peptides and combine protein lists.""" | ||
| peptide_dict = dict() | ||
| for peptide in track( | ||
| self._peptidoform_spaces, | ||
| description="Removing peptide redundancy...", | ||
| transient=True, | ||
| ): | ||
| if peptide.sequence in peptide_dict: | ||
| peptide_dict[peptide.sequence].proteins.extend(peptide.proteins) | ||
| else: | ||
| peptide_dict[peptide.sequence] = peptide | ||
| # Overwrite with non-redundant peptides | ||
| self._peptidoform_spaces = list(peptide_dict.values()) | ||
| def _add_modifications(self, processes: int = 1): | ||
| """Add modifications to peptides in search space.""" | ||
| modifications_by_target = _restructure_modifications_by_target(self.modifications) | ||
| modification_options = [] | ||
| with _get_pool(processes) as pool: | ||
| partial_get_modification_versions = partial( | ||
| _get_peptidoform_modification_versions, | ||
| modifications=self.modifications, | ||
| modifications_by_target=modifications_by_target, | ||
| max_variable_modifications=self.max_variable_modifications, | ||
| ) | ||
| modification_options = pool.imap( | ||
| partial_get_modification_versions, self._peptidoform_spaces | ||
| ) | ||
| for pep, mod_opt in track( | ||
| zip(self._peptidoform_spaces, modification_options), | ||
| description="Adding modifications...", | ||
| total=len(self._peptidoform_spaces), | ||
| transient=True, | ||
| ): | ||
| pep.modification_options = mod_opt | ||
| def _add_charges(self): | ||
| """Add charge permutations to peptides in search space.""" | ||
| for peptide in track( | ||
| self._peptidoform_spaces, | ||
| description="Adding charge permutations...", | ||
| transient=True, | ||
| ): | ||
| peptide.charge_options = self.charges | ||
| class _PeptidoformSearchSpace(BaseModel): | ||
| """Search space for a given amino acid sequence.""" | ||
| sequence: str | ||
| proteins: List[str] | ||
| is_n_term: Optional[bool] = None | ||
| is_c_term: Optional[bool] = None | ||
| modification_options: List[Dict[int, ModificationConfig]] = [] | ||
| charge_options: List[int] = [] | ||
| def __init__(self, **data: Any): | ||
| """ | ||
| Search space for a given amino acid sequence. | ||
| Parameters | ||
| ---------- | ||
| sequence | ||
| Amino acid sequence of the peptidoform. | ||
| proteins | ||
| List of protein IDs containing the peptidoform. | ||
| is_n_term | ||
| Whether the peptidoform is an N-terminal peptide. Default is None. | ||
| is_c_term | ||
| Whether the peptidoform is a C-terminal peptide. Default is None. | ||
| modification_options | ||
| List of dictionaries with modification positions and configurations. Default is []. | ||
| charge_options | ||
| List of charge states to consider. Default is []. | ||
| """ | ||
| super().__init__(**data) | ||
| def __len__(self): | ||
| return len(self.modification_options) * len(self.charge_options) | ||
| def __iter__(self) -> Generator[str, None, None]: | ||
| """Yield peptidoform strings with given charges and modifications.""" | ||
| if not self.charge_options: | ||
| raise ValueError("Peptide charge options not defined.") | ||
| if not self.modification_options: | ||
| raise ValueError("Peptide modification options not defined.") | ||
| for modifications, charge in product(self.modification_options, self.charge_options): | ||
| yield self._construct_peptidoform_string(self.sequence, modifications, charge) | ||
| @staticmethod | ||
| def _construct_peptidoform_string( | ||
| sequence: str, modifications: Dict[int, ModificationConfig], charge: int | ||
| ) -> str: | ||
| if not modifications: | ||
| return f"{sequence}/{charge}" | ||
| modded_sequence = list(sequence) | ||
| for position, mod in modifications.items(): | ||
| if isinstance(position, int): | ||
| aa = modded_sequence[position] | ||
| if aa != mod.amino_acid: | ||
| raise ValueError( | ||
| f"Modification {mod.label} at position {position} does not match amino " | ||
| f"acid {aa}." | ||
| ) | ||
| modded_sequence[position] = f"{aa}[{mod.label}]" | ||
| elif position == "N": | ||
| modded_sequence.insert(0, f"[{mod.label}]-") | ||
| elif position == "C": | ||
| modded_sequence.append(f"-[{mod.label}]") | ||
| else: | ||
| raise ValueError(f"Invalid position {position} for modification {mod.label}.") | ||
| return f"{''.join(modded_sequence)}/{charge}" | ||
| def _digest_single_protein( | ||
| protein: pyteomics.fasta.Protein, | ||
| min_length: int = 8, | ||
| max_length: int = 30, | ||
| cleavage_rule: str = "trypsin", | ||
| missed_cleavages: int = 2, | ||
| semi_specific: bool = False, | ||
| ) -> List[_PeptidoformSearchSpace]: | ||
| """Digest protein sequence and return a list of validated peptides.""" | ||
| def valid_residues(sequence: str) -> bool: | ||
| return not any(aa in sequence for aa in ["B", "J", "O", "U", "X", "Z"]) | ||
| def parse_peptide( | ||
| start_position: int, | ||
| sequence: str, | ||
| protein: pyteomics.fasta.Protein, | ||
| ) -> _PeptidoformSearchSpace: | ||
| """Parse result from parser.icleave into Peptide.""" | ||
| return _PeptidoformSearchSpace( | ||
| sequence=sequence, | ||
| # Assumes protein ID is description until first space | ||
| proteins=[protein.description.split(" ")[0]], | ||
| is_n_term=start_position == 0, | ||
| is_c_term=start_position + len(sequence) == len(protein.sequence), | ||
| ) | ||
| peptides = [ | ||
| parse_peptide(start, seq, protein) | ||
| for start, seq in icleave( | ||
| protein.sequence, | ||
| cleavage_rule, | ||
| missed_cleavages=missed_cleavages, | ||
| min_length=min_length, | ||
| max_length=max_length, | ||
| semi=semi_specific, | ||
| ) | ||
| if valid_residues(seq) | ||
| ] | ||
| return peptides | ||
| def _count_fasta_entries(filename: Path) -> int: | ||
| """Count the number of entries in a FASTA file.""" | ||
| with open(filename, "rt") as f: | ||
| count = 0 | ||
| for line in f: | ||
| if line[0] == ">": | ||
| count += 1 | ||
| return count | ||
| def _restructure_modifications_by_target( | ||
| modifications: List[ModificationConfig], | ||
| ) -> Dict[str, Dict[str, List[ModificationConfig]]]: | ||
| """Restructure variable modifications to options per side chain or terminus.""" | ||
| modifications_by_target = { | ||
| "sidechain": defaultdict(lambda: []), | ||
| "peptide_n_term": defaultdict(lambda: []), | ||
| "peptide_c_term": defaultdict(lambda: []), | ||
| "protein_n_term": defaultdict(lambda: []), | ||
| "protein_c_term": defaultdict(lambda: []), | ||
| } | ||
| def add_mod(mod, target, amino_acid): | ||
| if amino_acid: | ||
| modifications_by_target[target][amino_acid].append(mod) | ||
| else: | ||
| modifications_by_target[target]["any"].append(mod) | ||
| for mod in modifications: | ||
| if mod.fixed: | ||
| continue | ||
| if mod.peptide_n_term: | ||
| add_mod(mod, "peptide_n_term", mod.amino_acid) | ||
| elif mod.peptide_c_term: | ||
| add_mod(mod, "peptide_c_term", mod.amino_acid) | ||
| elif mod.protein_n_term: | ||
| add_mod(mod, "protein_n_term", mod.amino_acid) | ||
| elif mod.protein_c_term: | ||
| add_mod(mod, "protein_c_term", mod.amino_acid) | ||
| else: | ||
| add_mod(mod, "sidechain", mod.amino_acid) | ||
| return {k: dict(v) for k, v in modifications_by_target.items()} | ||
| def _get_modification_possibilities_by_site( | ||
| peptide: _PeptidoformSearchSpace, | ||
| modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]], | ||
| modifications: List[ModificationConfig], | ||
| ) -> Dict[Union[str, int], List[ModificationConfig]]: | ||
| """Get all possible modifications for each site in a peptide sequence.""" | ||
| possibilities_by_site = defaultdict(list) | ||
| # Generate dictionary of positions per amino acid | ||
| position_dict = defaultdict(list) | ||
| for pos, aa in enumerate(peptide.sequence): | ||
| position_dict[aa].append(pos) | ||
| # Map modifications to positions | ||
| for aa in set(position_dict).intersection(set(modifications_by_target["sidechain"])): | ||
| possibilities_by_site.update( | ||
| {pos: modifications_by_target["sidechain"][aa] for pos in position_dict[aa]} | ||
| ) | ||
| # Assign possible modifications per terminus | ||
| for terminus, position, site_name, specificity in [ | ||
| ("peptide_n_term", 0, "N", None), | ||
| ("peptide_c_term", -1, "C", None), | ||
| ("protein_n_term", 0, "N", "is_n_term"), | ||
| ("protein_c_term", -1, "C", "is_c_term"), | ||
| ]: | ||
| if specificity is None or getattr(peptide, specificity): | ||
| for site, mods in modifications_by_target[terminus].items(): | ||
| if site == "any" or peptide.sequence[position] == site: | ||
| possibilities_by_site[site_name].extend(mods) | ||
| # Override with fixed modifications | ||
| for mod in modifications: | ||
| aa = mod.amino_acid | ||
| # Skip variable modifications | ||
| if not mod.fixed: | ||
| continue | ||
| # Assign if specific aa matches or if no aa is specified for each terminus | ||
| for terminus, position, site_name, specificity in [ | ||
| ("peptide_n_term", 0, "N", None), | ||
| ("peptide_c_term", -1, "C", None), | ||
| ("protein_n_term", 0, "N", "is_n_term"), | ||
| ("protein_c_term", -1, "C", "is_c_term"), | ||
| ]: | ||
| if getattr(mod, terminus): # Mod has this terminus | ||
| if specificity is None or getattr(peptide, specificity): # Specificity matches | ||
| if not aa or (aa and peptide.sequence[position] == aa): # AA matches | ||
| possibilities_by_site[site_name] = [mod] # Override with fixed mod | ||
| break # Allow `else: if amino_acid` if no terminus matches | ||
| # Assign if fixed modification is not terminal and specific aa matches | ||
| else: | ||
| if aa: | ||
| for pos in position_dict[aa]: | ||
| possibilities_by_site[pos] = [mod] | ||
| return possibilities_by_site | ||
| def _get_peptidoform_modification_versions( | ||
| peptide: _PeptidoformSearchSpace, | ||
| modifications: List[ModificationConfig], | ||
| modifications_by_target: Dict[str, Dict[str, List[ModificationConfig]]], | ||
| max_variable_modifications: int = 3, | ||
| ) -> List[Dict[Union[str, int], List[ModificationConfig]]]: | ||
| """ | ||
| Get all potential combinations of modifications for a peptide sequence. | ||
| Examples | ||
| -------- | ||
| >>> peptide = PeptidoformSpace(sequence="PEPTIDE", proteins=["PROTEIN"]) | ||
| >>> phospho = ModificationConfig(label="Phospho", amino_acid="T", fixed=False) | ||
| >>> acetyl = ModificationConfig(label="Acetyl", peptide_n_term=True, fixed=False) | ||
| >>> modifications = [phospho, acetyl] | ||
| >>> modifications_by_target = { | ||
| ... "sidechain": {"S": [modifications[0]]}, | ||
| ... "peptide_n_term": {"any": [modifications[1]]}, | ||
| ... "peptide_c_term": {"any": []}, | ||
| ... "protein_n_term": {"any": []}, | ||
| ... "protein_c_term": {"any": []}, | ||
| ... } | ||
| >>> _get_modification_versions(peptide, modifications, modifications_by_target) | ||
| [{}, {3: phospho}, {0: acetyl}, {0: acetyl, 3: phospho}] | ||
| """ | ||
| # Get all possible modifications for each site in the peptide sequence | ||
| possibilities_by_site = _get_modification_possibilities_by_site( | ||
| peptide, modifications_by_target, modifications | ||
| ) | ||
| # Separate fixed and variable modification sites | ||
| fixed_modifications = {} | ||
| variable_sites = [] | ||
| for site, mods in possibilities_by_site.items(): | ||
| for mod in mods: | ||
| if mod.fixed: | ||
| fixed_modifications[site] = mod | ||
| else: | ||
| variable_sites.append((site, mod)) | ||
| # Generate all combinations of variable modifications up to the maximum allowed | ||
| modification_versions = [] | ||
| for i in range(max_variable_modifications + 1): | ||
| for comb in combinations(variable_sites, i): | ||
| combination_dict = fixed_modifications.copy() | ||
| for site, mod in comb: | ||
| combination_dict[site] = mod | ||
| modification_versions.append(combination_dict) | ||
| return modification_versions | ||
| def _get_pool(processes: int) -> Union[multiprocessing.Pool, multiprocessing.dummy.Pool]: | ||
| """Get a multiprocessing pool with the given number of processes.""" | ||
| # TODO: fix None default value for processes | ||
| if processes > 1: | ||
| return multiprocessing.Pool(processes) | ||
| else: | ||
| return multiprocessing.dummy.Pool(processes) |
| from ms2pip import search_space | ||
| OXIDATION = search_space.ModificationConfig( | ||
| label="Oxidation", | ||
| amino_acid="M", | ||
| ) | ||
| CARBAMIDOMETHYL = search_space.ModificationConfig( | ||
| label="Carbamidomethyl", | ||
| amino_acid="C", | ||
| fixed=True, | ||
| ) | ||
| PYROGLU = search_space.ModificationConfig( | ||
| label="Glu->pyro-Glu", | ||
| amino_acid="E", | ||
| peptide_n_term=True, | ||
| ) | ||
| ACETYL = search_space.ModificationConfig( | ||
| label="Acetyl", | ||
| amino_acid=None, | ||
| protein_n_term=True, | ||
| ) | ||
| PHOSPHO = search_space.ModificationConfig( | ||
| label="Phospho", | ||
| amino_acid="T", | ||
| fixed=False, | ||
| ) | ||
| MODIFICATION_CONFIG = [OXIDATION, CARBAMIDOMETHYL, PYROGLU, ACETYL] | ||
| def test_restructure_modifications_by_target(): | ||
| test_cases = [ | ||
| { | ||
| "modifications": [PHOSPHO, ACETYL], | ||
| "expected": { | ||
| "sidechain": {"T": [PHOSPHO]}, | ||
| "peptide_n_term": {}, | ||
| "peptide_c_term": {}, | ||
| "protein_n_term": {"any": [ACETYL]}, | ||
| "protein_c_term": {}, | ||
| }, | ||
| }, | ||
| { | ||
| "modifications": [CARBAMIDOMETHYL, ACETYL], | ||
| "expected": { | ||
| "sidechain": {}, | ||
| "peptide_n_term": {}, | ||
| "peptide_c_term": {}, | ||
| "protein_n_term": {"any": [ACETYL]}, | ||
| "protein_c_term": {}, | ||
| }, | ||
| }, | ||
| ] | ||
| for case in test_cases: | ||
| test_out = search_space._restructure_modifications_by_target(case["modifications"]) | ||
| assert test_out == case["expected"] | ||
| def test_get_peptidoform_modification_versions(): | ||
| test_cases = [ | ||
| # None | ||
| { | ||
| "sequence": "PEPTIDE", | ||
| "modifications": [], | ||
| "expected": [{}], | ||
| }, | ||
| # Single fixed | ||
| { | ||
| "sequence": "ACDE", | ||
| "modifications": [CARBAMIDOMETHYL], | ||
| "expected": [{1: CARBAMIDOMETHYL}], | ||
| }, | ||
| # Double fixed | ||
| { | ||
| "sequence": "ACCDE", | ||
| "modifications": [CARBAMIDOMETHYL], | ||
| "expected": [{1: CARBAMIDOMETHYL, 2: CARBAMIDOMETHYL}], | ||
| }, | ||
| # Single variable | ||
| { | ||
| "sequence": "ADME", | ||
| "modifications": [OXIDATION], | ||
| "expected": [{}, {2: OXIDATION}], | ||
| }, | ||
| # Double variable | ||
| { | ||
| "sequence": "ADMME", | ||
| "modifications": [OXIDATION], | ||
| "expected": [{}, {2: OXIDATION}, {3: OXIDATION}, {2: OXIDATION, 3: OXIDATION}], | ||
| }, | ||
| # More than maximum simultaneous mods should be ignored | ||
| { | ||
| "sequence": "ADMMME", | ||
| "modifications": [OXIDATION], | ||
| "expected": [ | ||
| {}, | ||
| {2: OXIDATION}, | ||
| {3: OXIDATION}, | ||
| {4: OXIDATION}, | ||
| {2: OXIDATION, 3: OXIDATION}, | ||
| {2: OXIDATION, 4: OXIDATION}, | ||
| {3: OXIDATION, 4: OXIDATION}, | ||
| ], | ||
| }, | ||
| # N-term and AA-specific | ||
| { | ||
| "sequence": "EDEF", | ||
| "modifications": [PYROGLU], | ||
| "expected": [{}, {"N": PYROGLU}], | ||
| }, | ||
| { | ||
| "sequence": "PEPTIDE", | ||
| "modifications": [PHOSPHO, ACETYL], | ||
| "expected": [{}, {3: PHOSPHO}, {"N": ACETYL}, {"N": ACETYL, 3: PHOSPHO}], | ||
| }, | ||
| { | ||
| "sequence": "ACDEK", | ||
| "modifications": [CARBAMIDOMETHYL, ACETYL], | ||
| "expected": [ | ||
| {1: CARBAMIDOMETHYL}, | ||
| {1: CARBAMIDOMETHYL, "N": ACETYL}, | ||
| ], | ||
| }, | ||
| ] | ||
| for case in test_cases: | ||
| peptide = search_space._PeptidoformSearchSpace( | ||
| sequence=case["sequence"], proteins=[], is_n_term=True | ||
| ) | ||
| modifications_by_target = search_space._restructure_modifications_by_target( | ||
| case["modifications"] | ||
| ) | ||
| test_out = search_space._get_peptidoform_modification_versions( | ||
| peptide, | ||
| case["modifications"], | ||
| modifications_by_target, | ||
| max_variable_modifications=2, | ||
| ) | ||
| assert test_out == case["expected"] | ||
| def test_get_modifications_by_target(): | ||
| modifications_by_target = search_space._restructure_modifications_by_target( | ||
| MODIFICATION_CONFIG | ||
| ) | ||
| assert modifications_by_target["sidechain"] == {"M": MODIFICATION_CONFIG[0:1]} | ||
| assert modifications_by_target["peptide_n_term"] == {"E": MODIFICATION_CONFIG[2:3]} | ||
| assert modifications_by_target["peptide_c_term"] == {} | ||
| assert modifications_by_target["protein_n_term"] == {"any": MODIFICATION_CONFIG[3:4]} | ||
| assert modifications_by_target["protein_c_term"] == {} | ||
| class TestProteomeSearchSpace: | ||
| def test_digest_fasta(self): | ||
| test_input = { | ||
| "fasta_file": "tests/test_data/test.fasta", | ||
| "min_length": 8, | ||
| "max_length": 30, | ||
| "cleavage_rule": "trypsin", | ||
| "missed_cleavages": 2, | ||
| "semi_specific": False, | ||
| } | ||
| test_output = [ | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="MYSSCSLLQR", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=True, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="MYSSCSLLQRLVWFPFLALVATQLLFIR", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=True, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="LVWFPFLALVATQLLFIR", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="NVSSLNLTNEYLHHK", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="NVSSLNLTNEYLHHKCLVSEGK", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="NVSSLNLTNEYLHHKCLVSEGKYKPGSK", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="CLVSEGKYKPGSK", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="CLVSEGKYKPGSKYEYI", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=True, | ||
| ), | ||
| search_space._PeptidoformSearchSpace( | ||
| sequence="YKPGSKYEYI", | ||
| proteins=["P12345"], | ||
| modification_options=[], | ||
| is_n_term=False, | ||
| is_c_term=True, | ||
| ), | ||
| ] | ||
| sp = search_space.ProteomeSearchSpace(**test_input) | ||
| sp._digest_fasta() | ||
| assert test_output == sp._peptidoform_spaces |
| Metadata-Version: 2.1 | ||
| Name: ms2pip | ||
| Version: 4.0.0.dev12 | ||
| Version: 4.0.0.dev13 | ||
| Summary: MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction. | ||
@@ -319,2 +319,3 @@ Author: Ana Sílvia C. Silva | ||
| training. | ||
| - ``annotate-spectra``: Annotate peaks in observed spectra. | ||
@@ -321,0 +322,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes |
@@ -12,3 +12,5 @@ LICENSE | ||
| ms2pip/exceptions.py | ||
| ms2pip/plot.py | ||
| ms2pip/result.py | ||
| ms2pip/search_space.py | ||
| ms2pip/spectrum.py | ||
@@ -45,5 +47,3 @@ ms2pip/spectrum_input.py | ||
| tests/test_encoder.py | ||
| tests/test_fasta2speclib.py | ||
| tests/test_predictions.py | ||
| tests/test_retention_time.py | ||
| tests/test_search_space.py | ||
| tests/test_spectrum_output.py |
| # isort: skip_file | ||
| """MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction.""" | ||
| __version__ = "4.0.0-dev12" | ||
| __version__ = "4.0.0-dev13" | ||
@@ -6,0 +6,0 @@ from warnings import filterwarnings |
+52
-43
@@ -13,13 +13,8 @@ import logging | ||
| import ms2pip.core | ||
| from ms2pip import __version__ | ||
| from ms2pip import __version__, exceptions | ||
| from ms2pip._utils.cli import build_credits, build_prediction_table | ||
| from ms2pip.constants import MODELS, SUPPORTED_OUTPUT_FORMATS | ||
| from ms2pip.exceptions import ( | ||
| InvalidXGBoostModelError, | ||
| UnknownModelError, | ||
| UnknownOutputFormatError, | ||
| UnresolvableModificationError, | ||
| ) | ||
| from ms2pip.result import correlations_to_csv, results_to_csv | ||
| from ms2pip.spectrum_output import write_single_spectrum_csv, write_single_spectrum_png | ||
| from ms2pip.constants import MODELS | ||
| from ms2pip.plot import spectrum_to_png | ||
| from ms2pip.result import write_correlations | ||
| from ms2pip.spectrum_output import SUPPORTED_FORMATS, write_spectra | ||
@@ -48,3 +43,4 @@ console = Console() | ||
| else: | ||
| return Path(input_filename).with_suffix("") | ||
| input__filename = Path(input_filename) | ||
| return input__filename.with_name(input__filename.stem + "_predictions").with_suffix("") | ||
@@ -70,2 +66,3 @@ | ||
| @click.option("--output-name", "-o", type=str) | ||
| @click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="tsv") | ||
| @click.option("--model", type=click.Choice(MODELS), default="HCD") | ||
@@ -77,5 +74,6 @@ @click.option("--model-dir") | ||
| output_name = kwargs.pop("output_name") | ||
| output_format = kwargs.pop("output_format") | ||
| plot = kwargs.pop("plot") | ||
| if not output_name: | ||
| output_name = "ms2pip_prediction_" + secure_filename(kwargs["peptidoform"]) + ".csv" | ||
| output_name = "ms2pip_prediction_" + secure_filename(kwargs["peptidoform"]) | ||
@@ -88,5 +86,5 @@ # Predict spectrum | ||
| console.print(build_prediction_table(predicted_spectrum)) | ||
| write_single_spectrum_csv(predicted_spectrum, output_name) | ||
| write_spectra(output_name, [result], output_format) | ||
| if plot: | ||
| write_single_spectrum_png(predicted_spectrum, output_name) | ||
| spectrum_to_png(predicted_spectrum, output_name) | ||
@@ -97,3 +95,3 @@ | ||
| @click.option("--output-name", "-o", type=str) | ||
| @click.option("--output-format", "-f", type=click.Choice(SUPPORTED_OUTPUT_FORMATS)) | ||
| @click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="tsv") | ||
| @click.option("--add-retention-time", "-r", is_flag=True) | ||
@@ -105,5 +103,4 @@ @click.option("--model", type=click.Choice(MODELS), default="HCD") | ||
| # Parse arguments | ||
| output_name = kwargs.pop("output_name") | ||
| output_format = kwargs.pop("output_format") | ||
| output_name = _infer_output_name(kwargs["psms"], output_name) | ||
| output_name = _infer_output_name(kwargs["psms"], kwargs.pop("output_name")) | ||
@@ -114,13 +111,29 @@ # Run | ||
| # Write output | ||
| output_name_csv = output_name.with_name(output_name.stem + "_predictions").with_suffix(".csv") | ||
| logger.info(f"Writing output to {output_name_csv}") | ||
| results_to_csv(predictions, output_name_csv) | ||
| # TODO: add support for other output formats | ||
| write_spectra(output_name, predictions, output_format) | ||
| @cli.command(help=ms2pip.core.predict_library.__doc__) | ||
| @click.argument("fasta-file", required=False, type=click.Path(exists=True, dir_okay=False)) | ||
| @click.option("--config", "-c", type=click.Path(exists=True, dir_okay=False)) | ||
| @click.option("--output-name", "-o", type=str) | ||
| @click.option("--output-format", "-f", type=click.Choice(SUPPORTED_FORMATS), default="msp") | ||
| @click.option("--add-retention-time", "-r", is_flag=True) | ||
| @click.option("--model", type=click.Choice(MODELS), default="HCD") | ||
| @click.option("--model-dir") | ||
| @click.option("--batch-size", type=int, default=100000) | ||
| @click.option("--processes", "-n", type=int) | ||
| def predict_library(*args, **kwargs): | ||
| ms2pip.core.predict_library(*args, **kwargs) | ||
| # Parse arguments | ||
| if not kwargs["fasta_file"] and not kwargs["config"]: | ||
| raise click.UsageError("Either `fasta_file` or `config` must be provided.") | ||
| output_format = kwargs.pop("output_format") | ||
| output_name = _infer_output_name( | ||
| kwargs["fasta_file"] or kwargs["config"], kwargs.pop("output_name") | ||
| ) | ||
| # Run and write output for each batch | ||
| for i, result_batch in enumerate(ms2pip.core.predict_library(*args, **kwargs)): | ||
| write_spectra(output_name, result_batch, output_format, write_mode="w" if i == 0 else "a") | ||
| @cli.command(help=ms2pip.core.correlate.__doc__) | ||
@@ -140,4 +153,3 @@ @click.argument("psms", required=True) | ||
| # Parse arguments | ||
| output_name = kwargs.pop("output_name") | ||
| output_name = _infer_output_name(kwargs["psms"], output_name) | ||
| output_name = _infer_output_name(kwargs["psms"], kwargs.pop("output_name")) | ||
@@ -147,14 +159,13 @@ # Run | ||
| # Write output | ||
| output_name_int = output_name.with_name(output_name.stem + "_predictions").with_suffix(".csv") | ||
| logger.info(f"Writing intensities to {output_name_int}") | ||
| results_to_csv(results, output_name_int) | ||
| # TODO: add support for other output formats | ||
| # Write intensities | ||
| logger.info(f"Writing intensities to {output_name.with_suffix('.tsv')}") | ||
| write_spectra(output_name, results, "tsv") | ||
| # Write correlations | ||
| if kwargs["compute_correlations"]: | ||
| output_name_corr = output_name.with_name(output_name.stem + "_correlations") | ||
| output_name_corr = output_name_corr.with_suffix(".csv") | ||
| output_name_corr = output_name.with_name(output_name.stem + "_correlations").with_suffix( | ||
| ".tsv" | ||
| ) | ||
| logger.info(f"Writing correlations to {output_name_corr}") | ||
| correlations_to_csv(results, output_name_corr) | ||
| write_correlations(results, output_name_corr) | ||
@@ -201,6 +212,6 @@ | ||
| # Write output | ||
| output_name_int = output_name.with_name(output_name.stem + "_observations").with_suffix(".csv") | ||
| logger.info(f"Writing intensities to {output_name_int}") | ||
| results_to_csv(results, output_name_int) | ||
| # Write intensities | ||
| output_name_int = output_name.with_name(output_name.stem + "_observations").with_suffix() | ||
| logger.info(f"Writing intensities to {output_name_int.with_suffix('.tsv')}") | ||
| write_spectra(output_name, results, "tsv") | ||
@@ -211,3 +222,3 @@ | ||
| cli() | ||
| except UnresolvableModificationError as e: | ||
| except exceptions.UnresolvableModificationError as e: | ||
| logger.critical( | ||
@@ -220,11 +231,9 @@ "Unresolvable modification: `%s`. See " | ||
| sys.exit(1) | ||
| except UnknownOutputFormatError as o: | ||
| logger.critical( | ||
| f"Unknown output format: `{o}` (supported formats: `{SUPPORTED_OUTPUT_FORMATS}`)" | ||
| ) | ||
| except exceptions.UnknownOutputFormatError as o: | ||
| logger.critical(f"Unknown output format: `{o}` (supported formats: `{SUPPORTED_FORMATS}`)") | ||
| sys.exit(1) | ||
| except UnknownModelError as f: | ||
| except exceptions.UnknownModelError as f: | ||
| logger.critical(f"Unknown model: `{f}` (supported models: {set(MODELS.keys())})") | ||
| sys.exit(1) | ||
| except InvalidXGBoostModelError: | ||
| except exceptions.InvalidXGBoostModelError: | ||
| logger.critical("Could not correctly download XGBoost model\nTry a manual download.") | ||
@@ -231,0 +240,0 @@ sys.exit(1) |
| """Database configuration for EncyclopeDIA DLIB SQLite format.""" | ||
| import zlib | ||
| from pathlib import Path | ||
| from typing import Union | ||
@@ -94,5 +96,5 @@ import numpy | ||
| def open_sqlite(filename): | ||
| def open_sqlite(filename: Union[str, Path]) -> sqlalchemy.engine.Connection: | ||
| engine = sqlalchemy.create_engine(f"sqlite:///{filename}") | ||
| metadata.bind = engine | ||
| return engine.connect() |
@@ -10,4 +10,2 @@ from __future__ import annotations | ||
| from ms2pip import exceptions | ||
| logger = logging.getLogger(__name__) | ||
@@ -27,6 +25,2 @@ | ||
| # Validate runs and collections | ||
| if not len(psm_list.collections) == 1 or not len(psm_list.runs) == 1: | ||
| raise exceptions.InvalidInputError("PSMs should be for a single run and collection.") | ||
| # Apply fixed modifications if any | ||
@@ -33,0 +27,0 @@ psm_list.apply_fixed_modifications() |
@@ -56,2 +56,3 @@ """Utilities for handling XGBoost model files within the MS²PIP prediction framework.""" | ||
| preds = xgb_model.predict(features) | ||
| preds = preds.clip(min=np.log2(0.001)) # Clip negative intensities | ||
| xgb_model.__del__() | ||
@@ -58,0 +59,0 @@ |
| """Constants and fixed configurations for MS²PIP.""" | ||
| # Supported output formats | ||
| SUPPORTED_OUTPUT_FORMATS = ["csv", "mgf", "msp", "bibliospec", "spectronaut", "dlib"] | ||
| # Models and their properties | ||
@@ -7,0 +4,0 @@ # id is passed to get_predictions to select model |
+90
-7
@@ -12,3 +12,3 @@ #!/usr/bin/env python | ||
| from pathlib import Path | ||
| from typing import Callable, List, Optional, Tuple, Union | ||
| from typing import Any, Callable, Generator, Iterable, List, Optional, Tuple, Union | ||
@@ -18,2 +18,3 @@ import numpy as np | ||
| from psm_utils import PSM, Peptidoform, PSMList | ||
| from rich.progress import track | ||
@@ -28,5 +29,7 @@ import ms2pip.exceptions as exceptions | ||
| from ms2pip._utils.xgb_models import get_predictions_xgb, validate_requested_xgb_model | ||
| from ms2pip.constants import MODELS, SUPPORTED_OUTPUT_FORMATS | ||
| from ms2pip.constants import MODELS | ||
| from ms2pip.result import ProcessingResult, calculate_correlations | ||
| from ms2pip.search_space import ProteomeSearchSpace | ||
| from ms2pip.spectrum_input import read_spectrum_file | ||
| from ms2pip.spectrum_output import SUPPORTED_FORMATS | ||
@@ -107,2 +110,4 @@ logger = logging.getLogger(__name__) | ||
| """ | ||
| if isinstance(psms, list): | ||
| psms = PSMList(psm_list=psms) | ||
| psm_list = read_psms(psms, filetype=psm_filetype) | ||
@@ -128,7 +133,66 @@ | ||
| def predict_library(): | ||
| """Predict spectral library from protein FASTA file.""" | ||
| raise NotImplementedError | ||
| def predict_library( | ||
| fasta_file: Optional[Union[str, Path]] = None, | ||
| config: Optional[Union[ProteomeSearchSpace, dict, str, Path]] = None, | ||
| add_retention_time: bool = False, | ||
| model: Optional[str] = "HCD", | ||
| model_dir: Optional[Union[str, Path]] = None, | ||
| batch_size: int = 100000, | ||
| processes: Optional[int] = None, | ||
| ) -> Generator[ProcessingResult, None, None]: | ||
| """ | ||
| Predict spectral library from protein FASTA file.\f | ||
| Parameters | ||
| ---------- | ||
| fasta_file | ||
| Path to FASTA file with protein sequences. Required if `search-space-config` is not | ||
| provided. | ||
| config | ||
| ProteomeSearchSpace, or a dictionary or path to JSON file with proteome search space | ||
| parameters. Required if `fasta_file` is not provided. | ||
| add_retention_time | ||
| Add retention time predictions with DeepLC (Requires optional DeepLC dependency). | ||
| model | ||
| Model to use for prediction. Default: "HCD". | ||
| model_dir | ||
| Directory where XGBoost model files are stored. Default: `~/.ms2pip`. | ||
| batch_size | ||
| Number of peptides to process in each batch. | ||
| processes | ||
| Number of parallel processes for multiprocessing steps. By default, all available. | ||
| """ | ||
| if fasta_file and config: | ||
| # Use provided proteome, but overwrite fasta_file | ||
| config = ProteomeSearchSpace.from_any(config) | ||
| config.fasta_file = fasta_file | ||
| elif fasta_file and not config: | ||
| # Default proteome search space with provided fasta_file | ||
| config = ProteomeSearchSpace(fasta_file=fasta_file) | ||
| elif not fasta_file and config: | ||
| # Use provided proteome | ||
| config = ProteomeSearchSpace.from_any(config) | ||
| else: | ||
| raise ValueError("Either `fasta_file` or `config` must be provided.") | ||
| search_space = ProteomeSearchSpace.from_any(config) | ||
| search_space.build() | ||
| for batch in track( | ||
| _into_batches(search_space, batch_size=batch_size), | ||
| description="Predicting spectra...", | ||
| total=ceil(len(search_space) / batch_size), | ||
| ): | ||
| logging.disable(logging.CRITICAL) | ||
| yield predict_batch( | ||
| search_space.filter_psms_by_mz(PSMList(psm_list=list(batch))), | ||
| add_retention_time=add_retention_time, | ||
| model=model, | ||
| model_dir=model_dir, | ||
| processes=processes, | ||
| ) | ||
| logging.disable(logging.NOTSET) | ||
| def correlate( | ||
@@ -432,3 +496,3 @@ psms: Union[PSMList, str, Path], | ||
| for output_format in output_formats: | ||
| if output_format not in SUPPORTED_OUTPUT_FORMATS: | ||
| if output_format not in SUPPORTED_FORMATS: | ||
| raise exceptions.UnknownOutputFormatError(output_format) | ||
@@ -553,2 +617,6 @@ self.output_formats = output_formats | ||
| """ | ||
| # Validate runs and collections | ||
| if not len(psm_list.collections) == 1 or not len(psm_list.runs) == 1: | ||
| raise exceptions.InvalidInputError("PSMs should be for a single run and collection.") | ||
| args = ( | ||
@@ -682,3 +750,6 @@ spectrum_file, | ||
| ) | ||
| predictions = {i: np.array(p, dtype=np.float32) for i, p in zip(ion_types, predictions)} | ||
| predictions = { | ||
| i: np.array(p, dtype=np.float32).clip(min=np.log2(0.001)) # Clip negative intensities | ||
| for i, p in zip(ion_types, predictions) | ||
| } | ||
| feature_vectors = None | ||
@@ -910,1 +981,13 @@ | ||
| return training_data | ||
| def _into_batches(iterable: Iterable[Any], batch_size: int) -> Generator[List[Any], None, None]: | ||
| """Accumulate iterator elements into batches of a given size.""" | ||
| batch = [] | ||
| for item in iterable: | ||
| batch.append(item) | ||
| if len(batch) == batch_size: | ||
| yield batch | ||
| batch = [] | ||
| if batch: | ||
| yield batch |
+6
-36
| """Definition and handling of MS²PIP results.""" | ||
| from __future__ import annotations | ||
@@ -6,6 +7,7 @@ | ||
| from typing import Any, Dict, List, Optional, Tuple | ||
| from logging import getLogger | ||
| import numpy as np | ||
| from psm_utils import PSM | ||
| from pydantic import ConfigDict, BaseModel | ||
| from pydantic import BaseModel, ConfigDict | ||
@@ -21,2 +23,3 @@ try: | ||
| logger = getLogger(__name__) | ||
@@ -120,42 +123,9 @@ class ProcessingResult(BaseModel): | ||
| def results_to_csv(results: List["ProcessingResult"], output_file: str) -> None: | ||
| """Write processing results to CSV file.""" | ||
| with open(output_file, "wt") as f: | ||
| fieldnames = [ | ||
| "psm_index", | ||
| "ion_type", | ||
| "ion_number", | ||
| "mz", | ||
| "predicted", | ||
| "observed", | ||
| ] | ||
| writer = csv.DictWriter(f, fieldnames=fieldnames, lineterminator="\n") | ||
| writer.writeheader() | ||
| for result in results: | ||
| if result.theoretical_mz is not None: | ||
| for ion_type in result.theoretical_mz: | ||
| for i in range(len(result.theoretical_mz[ion_type])): | ||
| writer.writerow( | ||
| { | ||
| "psm_index": result.psm_index, | ||
| "ion_type": ion_type, | ||
| "ion_number": i + 1, | ||
| "mz": "{:.6g}".format(result.theoretical_mz[ion_type][i]), | ||
| "predicted": "{:.6g}".format( | ||
| result.predicted_intensity[ion_type][i] | ||
| ) if result.predicted_intensity else None, | ||
| "observed": "{:.6g}".format(result.observed_intensity[ion_type][i]) | ||
| if result.observed_intensity | ||
| else None, | ||
| } | ||
| ) | ||
| def correlations_to_csv(results: List["ProcessingResult"], output_file: str) -> None: | ||
| def write_correlations(results: List["ProcessingResult"], output_file: str) -> None: | ||
| """Write correlations to CSV file.""" | ||
| with open(output_file, "wt") as f: | ||
| fieldnames = ["psm_index", "correlation"] | ||
| writer = csv.DictWriter(f, fieldnames=fieldnames, lineterminator="\n") | ||
| writer = csv.DictWriter(f, fieldnames=fieldnames, delimiter="\t", lineterminator="\n") | ||
| writer.writeheader() | ||
| for result in results: | ||
| writer.writerow({"psm_index": result.psm_index, "correlation": result.correlation}) |
+613
-668
| """ | ||
| Write spectrum files from MS2PIP predictions. | ||
| Write spectrum files from MS²PIP prediction results. | ||
| Examples | ||
| -------- | ||
| The simplest way to write MS²PIP predictions to a file is to use the :py:func:`write_spectra` | ||
| function: | ||
| >>> from ms2pip import predict_single, write_spectra | ||
| >>> results = [predict_single("ACDE/2")] | ||
| >>> write_spectra("/path/to/output/filename", results, "mgf") | ||
| Specific writer classes can also be used directly. Writer classes should be used in a context | ||
| manager to ensure the file is properly closed after writing. The following example writes MS²PIP | ||
| predictions to a TSV file: | ||
| >>> from ms2pip import predict_single | ||
| >>> results = [predict_single("ACDE/2")] | ||
| >>> with TSV("output.tsv") as writer: | ||
| ... writer.write(results) | ||
| Results can be written to the same file sequentially: | ||
| >>> results_2 = [predict_single("PEPTIDEK/2")] | ||
| >>> with TSV("output.tsv", write_mode="a") as writer: | ||
| ... writer.write(results) | ||
| ... writer.write(results_2) | ||
| Results can be written to an existing file using the append mode: | ||
| >>> with TSV("output.tsv", write_mode="a") as writer: | ||
| ... writer.write(results_2) | ||
| """ | ||
| from __future__ import annotations | ||
@@ -9,705 +44,677 @@ | ||
| import logging | ||
| import os | ||
| from ast import literal_eval | ||
| from functools import wraps | ||
| import re | ||
| import warnings | ||
| from abc import ABC, abstractmethod | ||
| from collections import defaultdict | ||
| from io import StringIO | ||
| from operator import itemgetter | ||
| from pathlib import Path | ||
| from time import localtime, strftime | ||
| from typing import Any, Dict, List | ||
| from typing import Any, Dict, Generator, List, Optional, Union | ||
| import numpy as np | ||
| from psm_utils import PSM, Peptidoform | ||
| from pyteomics import proforma | ||
| from sqlalchemy import engine, select | ||
| from ms2pip._utils import dlib | ||
| from ms2pip.result import ProcessingResult | ||
| logger = logging.getLogger(__name__) | ||
| LOGGER = logging.getLogger(__name__) | ||
| class InvalidWriteModeError(ValueError): | ||
| pass | ||
| def write_spectra( | ||
| filename: Union[str, Path], | ||
| processing_results: List[ProcessingResult], | ||
| file_format: str = "tsv", | ||
| write_mode: str = "w", | ||
| ): | ||
| """ | ||
| Write MS2PIP processing results to a supported spectrum file format. | ||
| Parameters | ||
| ---------- | ||
| filename | ||
| Output filename without file extension. | ||
| processing_results | ||
| List of :py:class:`ms2pip.result.ProcessingResult` objects. | ||
| file_format | ||
| File format to write. See :py:attr:`FILE_FORMATS` for available formats. | ||
| write_mode | ||
| Write mode for file. Default is ``w`` (write). Use ``a`` (append) to add to existing file. | ||
| # Writer decorator | ||
| def writer(**kwargs): | ||
| def deco(write_function): | ||
| @wraps(write_function) | ||
| def wrapper(self): | ||
| return self._write_general(write_function, **kwargs) | ||
| """ | ||
| with SUPPORTED_FORMATS[file_format](filename, write_mode) as writer: | ||
| LOGGER.info(f"Writing to {writer.filename}") | ||
| writer.write(processing_results) | ||
| return wrapper | ||
| return deco | ||
| class _Writer(ABC): | ||
| """Abstract base class for writing spectrum files.""" | ||
| suffix = "" | ||
| def output_format(output_format): | ||
| class OutputFormat: | ||
| def __init__(self, fn): | ||
| self.fn = fn | ||
| self.output_format = output_format | ||
| def __init__(self, filename: Union[str, Path], write_mode: str = "w"): | ||
| self.filename = Path(filename).with_suffix(self.suffix) | ||
| self.write_mode = write_mode | ||
| def __set_name__(self, owner, name): | ||
| owner.OUTPUT_FORMATS[self.output_format] = self.fn | ||
| setattr(owner, name, self.fn) | ||
| self._open_file = None | ||
| return OutputFormat | ||
| def __enter__(self): | ||
| """Open file in context manager.""" | ||
| self.open() | ||
| return self | ||
| def __exit__(self, exc_type, exc_value, traceback): | ||
| """Close file in context manager.""" | ||
| self.close() | ||
| class SpectrumOutput: | ||
| """Write MS2PIP predictions to various output formats.""" | ||
| def __repr__(self): | ||
| return f"{self.__class__.__name__}({self.filename, self.write_mode})" | ||
| OUTPUT_FORMATS = {} | ||
| def open(self): | ||
| """Open file.""" | ||
| if self._open_file: | ||
| self.close() | ||
| self._open_file = open(self.filename, self.write_mode) | ||
| def __init__( | ||
| self, | ||
| results: List["ProcessingResult"], | ||
| output_filename="ms2pip_predictions", | ||
| write_mode="wt+", | ||
| return_stringbuffer=False, | ||
| is_log_space=True, | ||
| ): | ||
| """ | ||
| Write MS2PIP predictions to various output formats. | ||
| def close(self): | ||
| """Close file.""" | ||
| if self._open_file: | ||
| self._open_file.close() | ||
| self._open_file = None | ||
| Parameters | ||
| ---------- | ||
| results: | ||
| List of ProcessingResult objects | ||
| output_filename: str, optional | ||
| path and name for output files, will be suffexed with `_predictions` and the | ||
| relevant file extension (default: ms2pip_predictions) | ||
| write_mode: str, optional | ||
| write mode to use: "wt+" to append to start a new file, "at" to append to an | ||
| existing file (default: "wt+") | ||
| return_stringbuffer: bool, optional | ||
| If True, files are written to a StringIO object, which the write function | ||
| returns. If False, files are written to a file on disk. | ||
| is_log_space: bool, optional | ||
| Set to true if predicted intensities in `all_preds` are in log-space. In that | ||
| case, intensities will first be transformed to "normal"-space. | ||
| Example | ||
| ------- | ||
| >>> so = ms2pip.spectrum_tools.spectrum_output.SpectrumOutput( | ||
| results | ||
| @property | ||
| def _file_object(self): | ||
| """Get open file object.""" | ||
| if self._open_file: | ||
| return self._open_file | ||
| else: | ||
| warnings.warn( | ||
| "Opening file outside of context manager. Manually close file after use." | ||
| ) | ||
| >>> so.write_msp() | ||
| >>> so.write_spectronaut() | ||
| self.open() | ||
| return self._open_file | ||
| """ | ||
| def write(self, processing_results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| for result in processing_results: | ||
| self._write_result(result) | ||
| self.results = results | ||
| self.output_filename = output_filename | ||
| self.write_mode = write_mode | ||
| self.return_stringbuffer = return_stringbuffer | ||
| self.is_log_space = is_log_space | ||
| @abstractmethod | ||
| def _write_result(self, result: ProcessingResult): | ||
| """Write single processing result to file.""" | ||
| ... | ||
| self.diff_modification_mapping = {} | ||
| self.has_rt = "rt" in self.peprec.columns | ||
| self.has_protein_list = "protein_list" in self.peprec.columns | ||
| class TSV(_Writer): | ||
| """Write TSV files from MS2PIP processing results.""" | ||
| if self.write_mode not in ["wt+", "wt", "at", "w", "a"]: | ||
| raise InvalidWriteModeError(self.write_mode) | ||
| suffix = ".tsv" | ||
| field_names = [ | ||
| "psm_index", | ||
| "ion_type", | ||
| "ion_number", | ||
| "mz", | ||
| "predicted", | ||
| "observed", | ||
| "rt", | ||
| ] | ||
| if "a" in self.write_mode and self.return_stringbuffer: | ||
| raise InvalidWriteModeError(self.write_mode) | ||
| def write(self, processing_results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| writer = csv.DictWriter( | ||
| self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n" | ||
| ) | ||
| if self.write_mode == "w": | ||
| writer.writeheader() | ||
| for result in processing_results: | ||
| self._write_result(result, writer) | ||
| # def _generate_peprec_dict(self, rt_to_seconds=True): | ||
| # """ | ||
| # Create easy to access dict from all_preds and peprec dataframes | ||
| # """ | ||
| # peprec_tmp = self.peprec.copy() | ||
| def _write_result(self, result: ProcessingResult, writer: csv.DictWriter): | ||
| """Write single processing result to file.""" | ||
| # Only write results with predictions or observations | ||
| if not result.theoretical_mz: | ||
| return | ||
| # if self.has_rt and rt_to_seconds: | ||
| # peprec_tmp["rt"] = peprec_tmp["rt"] * 60 | ||
| for ion_type in result.theoretical_mz: | ||
| for i in range(len(result.theoretical_mz[ion_type])): | ||
| writer.writerow(self._write_row(result, ion_type, i)) | ||
| # peprec_tmp.index = peprec_tmp["spec_id"] | ||
| # peprec_tmp.drop("spec_id", axis=1, inplace=True) | ||
| @staticmethod | ||
| def _write_row(result: ProcessingResult, ion_type: str, ion_index: int): | ||
| """Write single row for TSV file.""" | ||
| return { | ||
| "psm_index": result.psm_index, | ||
| "ion_type": ion_type, | ||
| "ion_number": ion_index + 1, | ||
| "mz": "{:.8f}".format(result.theoretical_mz[ion_type][ion_index]), | ||
| "predicted": "{:.8f}".format(result.predicted_intensity[ion_type][ion_index]) | ||
| if result.predicted_intensity | ||
| else None, | ||
| "observed": "{:.8f}".format(result.observed_intensity[ion_type][ion_index]) | ||
| if result.observed_intensity | ||
| else None, | ||
| "rt": result.psm.retention_time if result.psm.retention_time else None, | ||
| } | ||
| # self.peprec_dict = peprec_tmp.to_dict(orient="index") | ||
| # def _generate_preds_dict(self): | ||
| # """ | ||
| # Create easy to access dict from peprec dataframes | ||
| # """ | ||
| # self.preds_dict = {} | ||
| # preds_list = self.all_preds[ | ||
| # ["spec_id", "charge", "ion", "ionnumber", "mz", "prediction"] | ||
| # ].values.tolist() | ||
| class MSP(_Writer): | ||
| """Write MSP files from MS2PIP processing results.""" | ||
| # for row in preds_list: | ||
| # spec_id = row[0] | ||
| # if spec_id in self.preds_dict.keys(): | ||
| # if row[2] in self.preds_dict[spec_id]["peaks"]: | ||
| # self.preds_dict[spec_id]["peaks"][row[2]].append(tuple(row[3:])) | ||
| # else: | ||
| # self.preds_dict[spec_id]["peaks"][row[2]] = [tuple(row[3:])] | ||
| # else: | ||
| # self.preds_dict[spec_id] = { | ||
| # "charge": row[1], | ||
| # "peaks": {row[2]: [tuple(row[3:])]}, | ||
| # } | ||
| suffix = ".msp" | ||
| # def _normalize_spectra(self, method="basepeak_10000"): | ||
| # """ | ||
| # Normalize spectra | ||
| # """ | ||
| # if self.is_log_space: | ||
| # self.all_preds["prediction"] = ((2 ** self.all_preds["prediction"]) - 0.001).clip( | ||
| # lower=0 | ||
| # ) | ||
| # self.is_log_space = False | ||
| def write(self, results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| for result in results: | ||
| self._write_result(result) | ||
| # if method == "basepeak_10000": | ||
| # if self.normalization == "basepeak_10000": | ||
| # pass | ||
| # elif self.normalization == "basepeak_1": | ||
| # self.all_preds["prediction"] *= 10000 | ||
| # else: | ||
| # self.all_preds["prediction"] = self.all_preds.groupby( | ||
| # ["spec_id"], group_keys=False | ||
| # )["prediction"].apply(lambda x: (x / x.max()) * 10000) | ||
| # self.normalization = "basepeak_10000" | ||
| def _write_result(self, result: ProcessingResult): | ||
| """Write single processing result to file.""" | ||
| predicted_spectrum = result.as_spectra()[0] | ||
| intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4 | ||
| peaks = zip(predicted_spectrum.mz, intensity_normalized, predicted_spectrum.annotations) | ||
| # elif method == "basepeak_1": | ||
| # if self.normalization == "basepeak_1": | ||
| # pass | ||
| # elif self.normalization == "basepeak_10000": | ||
| # self.all_preds["prediction"] /= 10000 | ||
| # else: | ||
| # self.all_preds["prediction"] = self.all_preds.groupby( | ||
| # ["spec_id"], group_keys=False | ||
| # )["prediction"].apply(lambda x: (x / x.max())) | ||
| # self.normalization = "basepeak_1" | ||
| # Header | ||
| lines = [ | ||
| f"Name: {result.psm.peptidoform.sequence}/{result.psm.get_precursor_charge()}", | ||
| f"MW: {result.psm.peptidoform.theoretical_mass}", | ||
| self._format_comment_line(result.psm), | ||
| f"Num peaks: {len(predicted_spectrum.mz)}", | ||
| ] | ||
| # elif method == "tic": | ||
| # if self.normalization != "tic": | ||
| # self.all_preds["prediction"] = self.all_preds.groupby( | ||
| # ["spec_id"], group_keys=False | ||
| # )["prediction"].apply(lambda x: x / x.sum()) | ||
| # self.normalization = "tic" | ||
| # Peaks | ||
| lines.extend( | ||
| f"{mz:.8f}\t{intensity:.8f}\t{annotation}/0.0" for mz, intensity, annotation in peaks | ||
| ) | ||
| # else: | ||
| # raise NotImplementedError | ||
| # Write to file | ||
| self._file_object.writelines(line + "\n" for line in lines) | ||
| self._file_object.write("\n") | ||
| def _get_msp_peak_annotation( | ||
| self, | ||
| peak_dict, | ||
| sep="\t", | ||
| include_zero=False, | ||
| include_annotations=True, | ||
| intensity_type=float, | ||
| ): | ||
| """ | ||
| Get MGF/MSP-like peaklist string | ||
| """ | ||
| all_peaks = [] | ||
| for ion_type, peaks in peak_dict.items(): | ||
| for peak in peaks: | ||
| if not include_zero and peak[2] == 0: | ||
| continue | ||
| if include_annotations: | ||
| all_peaks.append( | ||
| ( | ||
| peak[1], | ||
| f'{peak[1]:.6f}{sep}{intensity_type(peak[2])}{sep}"{ion_type.lower()}{peak[0]}/0.0"', | ||
| ) | ||
| ) | ||
| else: | ||
| all_peaks.append((peak[1], f"{peak[1]:.6f}{sep}{peak[2]}")) | ||
| @staticmethod | ||
| def _format_modifications(peptidoform: Peptidoform): | ||
| """Format modifications in MSP-style string, e.g. ``Mods=1/0,E,Glu->pyro-Glu``.""" | ||
| all_peaks = sorted(all_peaks, key=itemgetter(0)) | ||
| peak_string = "\n".join([peak[1] for peak in all_peaks]) | ||
| def _format_single_modification( | ||
| amino_acid: str, | ||
| position: int, | ||
| modifications: Optional[List[proforma.ModificationBase]], | ||
| ) -> Union[str, None]: | ||
| """Get modification label from :py:class:`proforma.ModificationBase` list.""" | ||
| if not modifications: | ||
| return None | ||
| if len(modifications) > 1: | ||
| raise ValueError("Multiple modifications per amino acid not supported.") | ||
| modification = modifications[0] | ||
| return f"{position},{amino_acid},{modification.name}" | ||
| return peak_string | ||
| sequence_mods = [ | ||
| _format_single_modification(aa, pos + 1, mods) | ||
| for pos, (aa, mods) in enumerate(peptidoform.parsed_sequence) | ||
| ] | ||
| n_term = _format_single_modification( | ||
| peptidoform.sequence[0], 0, peptidoform.properties["n_term"] | ||
| ) | ||
| c_term = _format_single_modification( | ||
| peptidoform.sequence[-1], -1, peptidoform.properties["c_term"] | ||
| ) | ||
| def _get_msp_modifications(self, sequence, modifications): | ||
| """ | ||
| Format modifications in MSP-style, e.g. "1/0,E,Glu->pyro-Glu" | ||
| """ | ||
| mods = [mod for mod in [n_term] + sequence_mods + [c_term] if mod is not None] | ||
| if isinstance(modifications, str): | ||
| if not modifications or modifications == "-": | ||
| msp_modifications = "0" | ||
| else: | ||
| mods = modifications.split("|") | ||
| mods = [(int(mods[i]), mods[i + 1]) for i in range(0, len(mods), 2)] | ||
| mods = [(x, y) if x == 0 else (x - 1, y) for (x, y) in mods] | ||
| mods = sorted(mods) | ||
| mods = [(str(x), sequence[x], y) for (x, y) in mods] | ||
| msp_modifications = "/".join([",".join(list(x)) for x in mods]) | ||
| msp_modifications = f"{len(mods)}/{msp_modifications}" | ||
| if not mods: | ||
| return "Mods=0" | ||
| else: | ||
| msp_modifications = "0" | ||
| return f"Mods={len(mods)}/{'/'.join(mods)}" | ||
| return msp_modifications | ||
| @staticmethod | ||
| def _format_parent_mass(peptidoform: Peptidoform) -> str: | ||
| """Format parent mass as string.""" | ||
| return f"Parent={peptidoform.theoretical_mz}" | ||
| def _parse_protein_string(self, protein_list): | ||
| """ | ||
| Parse protein string from list, list string literal, or string. | ||
| """ | ||
| if isinstance(protein_list, list): | ||
| protein_string = "/".join(protein_list) | ||
| elif isinstance(protein_list, str): | ||
| try: | ||
| protein_string = "/".join(literal_eval(protein_list)) | ||
| except ValueError: | ||
| protein_string = protein_list | ||
| @staticmethod | ||
| def _format_protein_string(psm: PSM) -> Union[str, None]: | ||
| """Format protein list as string.""" | ||
| if psm.protein_list: | ||
| return f"Protein={','.join(psm.protein_list)}" | ||
| else: | ||
| protein_string = "" | ||
| return protein_string | ||
| return None | ||
| def _get_last_ssl_scannr(self): | ||
| """ | ||
| Return scan number of last line in a Bibliospec SSL file. | ||
| """ | ||
| ssl_filename = "{}_predictions.ssl".format(self.output_filename) | ||
| with open(ssl_filename, "rt") as ssl: | ||
| for line in ssl: | ||
| last_line = line | ||
| last_scannr = int(last_line.split("\t")[1]) | ||
| return last_scannr | ||
| @staticmethod | ||
| def _format_retention_time(psm: PSM) -> Union[str, None]: | ||
| """Format retention time as string.""" | ||
| if psm.retention_time: | ||
| return f"RetentionTime={psm.retention_time}" | ||
| else: | ||
| return None | ||
| def _generate_diff_modification_mapping(self, precision): | ||
| """ | ||
| Make modification name -> ssl modification name mapping. | ||
| """ | ||
| self.diff_modification_mapping[precision] = { | ||
| ptm.split(",")[0]: "{0:+.{1}f}".format(float(ptm.split(",")[1]), precision) | ||
| for ptm in self.params["ptm"] | ||
| } | ||
| @staticmethod | ||
| def _format_identifier(psm: PSM) -> str: | ||
| """Format MS2PIP ID as string.""" | ||
| return f"SpectrumIdentifier={psm.spectrum_id}" | ||
| def _get_diff_modified_sequence(self, sequence, modifications, precision=1): | ||
| """ | ||
| Build BiblioSpec SSL modified sequence string. | ||
| """ | ||
| pep = list(sequence) | ||
| mapping = self.diff_modification_mapping[precision] | ||
| @staticmethod | ||
| def _format_comment_line(psm: PSM) -> str: | ||
| """Format comment line for MSP file.""" | ||
| comments = " ".join( | ||
| filter( | ||
| None, | ||
| [ | ||
| MSP._format_modifications(psm.peptidoform), | ||
| MSP._format_parent_mass(psm.peptidoform), | ||
| MSP._format_protein_string(psm), | ||
| MSP._format_retention_time(psm), | ||
| MSP._format_identifier(psm), | ||
| ], | ||
| ) | ||
| ) | ||
| return f"Comment: {comments}" | ||
| for loc, name in zip(modifications.split("|")[::2], modifications.split("|")[1::2]): | ||
| # C-term mod | ||
| if loc == "-1": | ||
| pep[-1] = pep[-1] + "[{}]".format(mapping[name]) | ||
| # N-term mod | ||
| elif loc == "0": | ||
| pep[0] = pep[0] + "[{}]".format(mapping[name]) | ||
| # Normal mod | ||
| else: | ||
| pep[int(loc) - 1] = pep[int(loc) - 1] + "[{}]".format(mapping[name]) | ||
| return "".join(pep) | ||
| def write_results(self, output_formats: List[str]) -> Dict[str, Any]: | ||
| """ | ||
| Write MS2PIP predictions in output formats defined by output_formats. | ||
| """ | ||
| results = {} | ||
| for output_format in output_formats: | ||
| output_format = output_format.lower() | ||
| writer = self.OUTPUT_FORMATS[output_format] | ||
| results[output_format] = writer(self) | ||
| return results | ||
| class MGF(_Writer): | ||
| """Write MGF files from MS2PIP processing results.""" | ||
| @output_format("msp") | ||
| @writer( | ||
| file_suffix="_predictions.msp", | ||
| normalization_method="basepeak_10000", | ||
| requires_dicts=True, | ||
| requires_diff_modifications=False, | ||
| ) | ||
| def write_msp(self, file_object): | ||
| """ | ||
| Construct MSP string and write to file_object. | ||
| """ | ||
| suffix = ".mgf" | ||
| for spec_id in sorted(self.peprec_dict.keys()): | ||
| seq = self.peprec_dict[spec_id]["peptide"] | ||
| mods = self.peprec_dict[spec_id]["modifications"] | ||
| charge = self.peprec_dict[spec_id]["charge"] | ||
| prec_mass, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge) | ||
| msp_modifications = self._get_msp_modifications(seq, mods) | ||
| num_peaks = sum( | ||
| [len(peaklist) for _, peaklist in self.preds_dict[spec_id]["peaks"].items()] | ||
| ) | ||
| def write(self, results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| for result in results: | ||
| self._write_result(result) | ||
| comment_line = f" Mods={msp_modifications} Parent={prec_mz}" | ||
| def _write_result(self, result: ProcessingResult): | ||
| """Write single processing result to file.""" | ||
| predicted_spectrum = result.as_spectra()[0] | ||
| intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4 | ||
| peaks = zip(predicted_spectrum.mz, intensity_normalized) | ||
| if self.has_protein_list: | ||
| protein_list = self.peprec_dict[spec_id]["protein_list"] | ||
| protein_string = self._parse_protein_string(protein_list) | ||
| comment_line += f' Protein="{protein_string}"' | ||
| # Header | ||
| lines = [ | ||
| "BEGIN IONS", | ||
| f"TITLE={result.psm.peptidoform}", | ||
| f"PEPMASS={result.psm.peptidoform.theoretical_mz}", | ||
| f"CHARGE={result.psm.get_precursor_charge()}+", | ||
| f"SCANS={result.psm.spectrum_id}", | ||
| f"RTINSECONDS={result.psm.retention_time}" if result.psm.retention_time else None, | ||
| ] | ||
| if self.has_rt: | ||
| rt = self.peprec_dict[spec_id]["rt"] | ||
| comment_line += f" RetentionTime={rt}" | ||
| # Peaks | ||
| lines.extend(f"{mz:.8f} {intensity:.8f}" for mz, intensity in peaks) | ||
| comment_line += f' MS2PIP_ID="{spec_id}"' | ||
| # Write to file | ||
| self._file_object.writelines(line + "\n" for line in lines if line) | ||
| self._file_object.write("END IONS\n\n") | ||
| out = [ | ||
| f"Name: {seq}/{charge}", | ||
| f"MW: {prec_mass}", | ||
| f"Comment:{comment_line}", | ||
| f"Num peaks: {num_peaks}", | ||
| self._get_msp_peak_annotation( | ||
| self.preds_dict[spec_id]["peaks"], | ||
| sep="\t", | ||
| include_annotations=True, | ||
| intensity_type=int, | ||
| ), | ||
| ] | ||
| file_object.writelines([line + "\n" for line in out] + ["\n"]) | ||
| class Spectronaut(_Writer): | ||
| """Write Spectronaut files from MS2PIP processing results.""" | ||
| @output_format("mgf") | ||
| @writer( | ||
| file_suffix="_predictions.mgf", | ||
| normalization_method="basepeak_10000", | ||
| requires_dicts=True, | ||
| requires_diff_modifications=False, | ||
| ) | ||
| def write_mgf(self, file_object): | ||
| """ | ||
| Construct MGF string and write to file_object | ||
| """ | ||
| for spec_id in sorted(self.peprec_dict.keys()): | ||
| seq = self.peprec_dict[spec_id]["peptide"] | ||
| mods = self.peprec_dict[spec_id]["modifications"] | ||
| charge = self.peprec_dict[spec_id]["charge"] | ||
| _, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge) | ||
| msp_modifications = self._get_msp_modifications(seq, mods) | ||
| suffix = ".spectronaut.tsv" | ||
| field_names = [ | ||
| "ModifiedPeptide", | ||
| "StrippedPeptide", | ||
| "PrecursorCharge", | ||
| "PrecursorMz", | ||
| "IonMobility", | ||
| "iRT", | ||
| "ProteinId", | ||
| "RelativeFragmentIntensity", | ||
| "FragmentMz", | ||
| "FragmentType", | ||
| "FragmentNumber", | ||
| "FragmentCharge", | ||
| "FragmentLossType", | ||
| ] | ||
| if self.has_protein_list: | ||
| protein_list = self.peprec_dict[spec_id]["protein_list"] | ||
| protein_string = self._parse_protein_string(protein_list) | ||
| else: | ||
| protein_string = "" | ||
| def write(self, processing_results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| writer = csv.DictWriter( | ||
| self._file_object, fieldnames=self.field_names, delimiter="\t", lineterminator="\n" | ||
| ) | ||
| if self.write_mode == "w": | ||
| writer.writeheader() | ||
| for result in processing_results: | ||
| self._write_result(result, writer) | ||
| out = [ | ||
| "BEGIN IONS", | ||
| f"TITLE={spec_id} {seq}/{charge} {msp_modifications} {protein_string}", | ||
| f"PEPMASS={prec_mz}", | ||
| f"CHARGE={charge}+", | ||
| ] | ||
| def _write_result(self, result: ProcessingResult, writer: csv.DictWriter): | ||
| """Write single processing result to file.""" | ||
| # Only write results with predictions | ||
| if result.predicted_intensity is None: | ||
| return | ||
| psm_info = self._process_psm(result.psm) | ||
| for fragment_info in self._yield_fragment_info(result): | ||
| writer.writerow({**psm_info, **fragment_info}) | ||
| if self.has_rt: | ||
| rt = self.peprec_dict[spec_id]["rt"] | ||
| out.append(f"RTINSECONDS={rt}") | ||
| @staticmethod | ||
| def _process_psm(psm: PSM) -> Dict[str, Any]: | ||
| """Process PSM to Spectronaut format.""" | ||
| return { | ||
| "ModifiedPeptide": _peptidoform_str_without_charge(psm.peptidoform), | ||
| "StrippedPeptide": psm.peptidoform.sequence, | ||
| "PrecursorCharge": psm.get_precursor_charge(), | ||
| "PrecursorMz": f"{psm.peptidoform.theoretical_mz:.8f}", | ||
| "IonMobility": f"{psm.ion_mobility:.8f}" if psm.ion_mobility else None, | ||
| "iRT": f"{psm.retention_time:.8f}" if psm.retention_time else None, | ||
| "ProteinId": "".join(psm.protein_list) if psm.protein_list else None, | ||
| } | ||
| out.append( | ||
| self._get_msp_peak_annotation( | ||
| self.preds_dict[spec_id]["peaks"], | ||
| sep=" ", | ||
| include_annotations=False, | ||
| ) | ||
| ) | ||
| out.append("END IONS\n") | ||
| file_object.writelines([line + "\n" for line in out]) | ||
| @staticmethod | ||
| def _yield_fragment_info(result: ProcessingResult) -> Generator[Dict[str, Any], None, None]: | ||
| """Yield fragment information for a processing result.""" | ||
| # Normalize intensities | ||
| intensities = { | ||
| ion_type: _unlogarithmize(intensities) | ||
| for ion_type, intensities in result.predicted_intensity.items() | ||
| } | ||
| max_intensity = max(itertools.chain(*intensities.values())) | ||
| intensities = { | ||
| ion_type: _basepeak_normalize(intensities[ion_type], basepeak=max_intensity) | ||
| for ion_type in intensities | ||
| } | ||
| for ion_type in result.predicted_intensity: | ||
| fragment_type = ion_type[0].lower() | ||
| fragment_charge = ion_type[1:] if len(ion_type) > 1 else "1" | ||
| for ion_index, (intensity, mz) in enumerate( | ||
| zip(intensities[ion_type], result.theoretical_mz[ion_type]) | ||
| ): | ||
| yield { | ||
| "RelativeFragmentIntensity": f"{intensity:.8f}", | ||
| "FragmentMz": f"{mz:.8f}", | ||
| "FragmentType": fragment_type, | ||
| "FragmentNumber": ion_index + 1, | ||
| "FragmentCharge": fragment_charge, | ||
| "FragmentLossType": "noloss", | ||
| } | ||
| @output_format("spectronaut") | ||
| @writer( | ||
| file_suffix="_predictions_spectronaut.csv", | ||
| normalization_method="tic", | ||
| requires_dicts=False, | ||
| requires_diff_modifications=True, | ||
| ) | ||
| def write_spectronaut(self, file_obj): | ||
| """ | ||
| Construct spectronaut DataFrame and write to file_object. | ||
| """ | ||
| if "w" in self.write_mode: | ||
| header = True | ||
| elif "a" in self.write_mode: | ||
| header = False | ||
| else: | ||
| raise InvalidWriteModeError(self.write_mode) | ||
| spectronaut_peprec = self.peprec.copy() | ||
| class Bibliospec(_Writer): | ||
| """ | ||
| Write Bibliospec SSL and MS2 files from MS2PIP processing results. | ||
| # ModifiedPeptide and PrecursorMz columns | ||
| spectronaut_peprec["ModifiedPeptide"] = spectronaut_peprec.apply( | ||
| lambda row: self._get_diff_modified_sequence(row["peptide"], row["modifications"]), | ||
| axis=1, | ||
| ) | ||
| spectronaut_peprec["PrecursorMz"] = spectronaut_peprec.apply( | ||
| lambda row: self.mods.calc_precursor_mz( | ||
| row["peptide"], row["modifications"], row["charge"] | ||
| )[1], | ||
| axis=1, | ||
| ) | ||
| spectronaut_peprec["ModifiedPeptide"] = "_" + spectronaut_peprec["ModifiedPeptide"] + "_" | ||
| Bibliospec SSL and MS2 files are also compatible with Skyline. | ||
| # Additional columns | ||
| spectronaut_peprec["FragmentLossType"] = "noloss" | ||
| """ | ||
| # Retention time | ||
| if "rt" in spectronaut_peprec.columns: | ||
| rt_cols = ["iRT"] | ||
| spectronaut_peprec["iRT"] = spectronaut_peprec["rt"] | ||
| else: | ||
| rt_cols = [] | ||
| ssl_suffix = ".ssl" | ||
| ms2_suffix = ".ms2" | ||
| ssl_field_names = [ | ||
| "file", | ||
| "scan", | ||
| "charge", | ||
| "sequence", | ||
| "score-type", | ||
| "score", | ||
| "retention-time", | ||
| ] | ||
| # ProteinId | ||
| if self.has_protein_list: | ||
| spectronaut_peprec["ProteinId"] = spectronaut_peprec["protein_list"].apply( | ||
| self._parse_protein_string | ||
| ) | ||
| else: | ||
| spectronaut_peprec["ProteinId"] = spectronaut_peprec["spec_id"] | ||
| def __init__(self, filename: Union[str, Path], write_mode: str = "w"): | ||
| super().__init__(filename, write_mode) | ||
| self.ssl_file = self.filename.with_suffix(self.ssl_suffix) | ||
| self.ms2_file = self.filename.with_suffix(self.ms2_suffix) | ||
| # Rename columns and merge with predictions | ||
| spectronaut_peprec = spectronaut_peprec.rename( | ||
| columns={"charge": "PrecursorCharge", "peptide": "StrippedPeptide"} | ||
| ) | ||
| peptide_cols = ( | ||
| [ | ||
| "ModifiedPeptide", | ||
| "StrippedPeptide", | ||
| "PrecursorCharge", | ||
| "PrecursorMz", | ||
| "ProteinId", | ||
| ] | ||
| + rt_cols | ||
| + ["FragmentLossType"] | ||
| ) | ||
| spectronaut_df = spectronaut_peprec[peptide_cols + ["spec_id"]] | ||
| spectronaut_df = self.all_preds.merge(spectronaut_df, on="spec_id") | ||
| self._open_ssl_file = None | ||
| self._open_ms2_file = None | ||
| # Fragment columns | ||
| spectronaut_df["FragmentCharge"] = ( | ||
| spectronaut_df["ion"].str.contains("2").map({True: 2, False: 1}) | ||
| ) | ||
| spectronaut_df["FragmentType"] = spectronaut_df["ion"].str[0].str.lower() | ||
| def open(self): | ||
| """Open files.""" | ||
| self._open_ssl_file = open(self.ssl_file, self.write_mode) | ||
| self._open_ms2_file = open(self.ms2_file, self.write_mode) | ||
| # Rename and sort columns | ||
| spectronaut_df = spectronaut_df.rename( | ||
| columns={ | ||
| "mz": "FragmentMz", | ||
| "prediction": "RelativeIntensity", | ||
| "ionnumber": "FragmentNumber", | ||
| } | ||
| ) | ||
| fragment_cols = [ | ||
| "FragmentCharge", | ||
| "FragmentMz", | ||
| "RelativeIntensity", | ||
| "FragmentType", | ||
| "FragmentNumber", | ||
| ] | ||
| spectronaut_df = spectronaut_df[peptide_cols + fragment_cols] | ||
| try: | ||
| spectronaut_df.to_csv( | ||
| file_obj, index=False, header=header, sep=";", lineterminator="\n" | ||
| def close(self): | ||
| """Close files.""" | ||
| if self._open_ssl_file: | ||
| self._open_ssl_file.close() | ||
| self._open_ssl_file = None | ||
| if self._open_ms2_file: | ||
| self._open_ms2_file.close() | ||
| self._open_ms2_file = None | ||
| @property | ||
| def _ssl_file_object(self): | ||
| """Get open SSL file object.""" | ||
| if self._open_ssl_file: | ||
| return self._open_ssl_file | ||
| else: | ||
| warnings.warn( | ||
| "Opening file outside of context manager. Manually close file after use." | ||
| ) | ||
| except TypeError: # Pandas < 1.5 (Required for Python 3.7 support) | ||
| spectronaut_df.to_csv( | ||
| file_obj, index=False, header=header, sep=";", line_terminator="\n" | ||
| self.open() | ||
| return self._open_ssl_file | ||
| @property | ||
| def _ms2_file_object(self): | ||
| """Get open MS2 file object.""" | ||
| if self._open_ms2_file: | ||
| return self._open_ms2_file | ||
| else: | ||
| warnings.warn( | ||
| "Opening file outside of context manager. Manually close file after use." | ||
| ) | ||
| self.open() | ||
| return self._open_ms2_file | ||
| return file_obj | ||
| def write(self, processing_results: List[ProcessingResult]): | ||
| """Write multiple processing results to file.""" | ||
| # Create CSV writer | ||
| ssl_dict_writer = csv.DictWriter( | ||
| self._ssl_file_object, | ||
| fieldnames=self.ssl_field_names, | ||
| delimiter="\t", | ||
| lineterminator="\n", | ||
| ) | ||
| def _write_bibliospec_core(self, file_obj_ssl, file_obj_ms2, start_scannr=0): | ||
| """Construct Bibliospec SSL/MS2 strings and write to file_objects.""" | ||
| # Write headers | ||
| if self.write_mode == "w": | ||
| ssl_dict_writer.writeheader() | ||
| self._write_ms2_header() | ||
| start_scan_number = 0 | ||
| elif self.write_mode == "a": | ||
| start_scan_number = self._get_last_ssl_scan_number(self.ssl_file) + 1 | ||
| else: | ||
| raise ValueError(f"Unsupported write mode: {self.write_mode}") | ||
| for i, spec_id in enumerate(sorted(self.preds_dict.keys())): | ||
| scannr = i + start_scannr | ||
| seq = self.peprec_dict[spec_id]["peptide"] | ||
| mods = self.peprec_dict[spec_id]["modifications"] | ||
| charge = self.peprec_dict[spec_id]["charge"] | ||
| prec_mass, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge) | ||
| ms2_filename = os.path.basename(self.output_filename) + "_predictions.ms2" | ||
| # Write results | ||
| for i, result in enumerate(processing_results): | ||
| scan_number = start_scan_number + i | ||
| modified_sequence = self._format_modified_sequence(result.psm.peptidoform) | ||
| self._write_result(result, modified_sequence, scan_number, ssl_dict_writer) | ||
| peaks = self._get_msp_peak_annotation( | ||
| self.preds_dict[spec_id]["peaks"], | ||
| sep="\t", | ||
| include_annotations=False, | ||
| ) | ||
| def _write_ms2_header(self): | ||
| """Write header to MS2 file.""" | ||
| self._ms2_file_object.write( | ||
| f"H\tCreationDate\t{strftime('%Y-%m-%d %H:%M:%S', localtime())}\n" | ||
| ) | ||
| self._ms2_file_object.write("H\tExtractor\tMS2PIP predictions\n") | ||
| if isinstance(mods, str) and mods != "-" and mods != "": | ||
| mod_seq = self._get_diff_modified_sequence(seq, mods) | ||
| else: | ||
| mod_seq = seq | ||
| def _write_result( | ||
| self, | ||
| result: ProcessingResult, | ||
| modified_sequence: str, | ||
| scan_number: int, | ||
| writer: csv.DictWriter, | ||
| ): | ||
| """Write single processing result to files.""" | ||
| self._write_result_to_ssl(result, modified_sequence, scan_number, writer) | ||
| self._write_result_to_ms2(result, modified_sequence, scan_number) | ||
| rt = self.peprec_dict[spec_id]["rt"] if self.has_rt else "" | ||
| # TODO: implement csv instead of manual writing | ||
| file_obj_ssl.write( | ||
| "\t".join([ms2_filename, str(scannr), str(charge), mod_seq, "", "", str(rt)]) | ||
| + "\n" | ||
| ) | ||
| file_obj_ms2.write( | ||
| "\n".join( | ||
| [ | ||
| f"S\t{scannr}\t{prec_mz}", | ||
| f"Z\t{charge}\t{prec_mass}", | ||
| f"D\tseq\t{seq}", | ||
| f"D\tmodified seq\t{mod_seq}", | ||
| peaks, | ||
| ] | ||
| ) | ||
| + "\n" | ||
| ) | ||
| def _write_general( | ||
| def _write_result_to_ssl( | ||
| self, | ||
| write_function, | ||
| file_suffix, | ||
| normalization_method, | ||
| requires_dicts, | ||
| requires_diff_modifications, | ||
| diff_modification_precision=1, | ||
| result: ProcessingResult, | ||
| modified_sequence: str, | ||
| scan_number: int, | ||
| writer: csv.DictWriter, | ||
| ): | ||
| """ | ||
| General write function to call core write functions. | ||
| """Write single processing result to the SSL file.""" | ||
| writer.writerow( | ||
| { | ||
| "file": self.ms2_file.name if isinstance(self.ms2_file, Path) else "file.ms2", | ||
| "scan": scan_number, | ||
| "charge": result.psm.get_precursor_charge(), | ||
| "sequence": modified_sequence, | ||
| "score-type": None, | ||
| "score": None, | ||
| "retention-time": result.psm.retention_time if result.psm.retention_time else None, | ||
| } | ||
| ) | ||
| Note: Does not work for write_bibliospec and write_dlib functions. | ||
| """ | ||
| def _write_result_to_ms2( | ||
| self, result: ProcessingResult, modified_sequence: str, scan_number: int | ||
| ): | ||
| """Write single processing result to the MS2 file.""" | ||
| predicted_spectrum = result.as_spectra()[0] | ||
| intensity_normalized = _basepeak_normalize(predicted_spectrum.intensity) * 1e4 | ||
| peaks = zip(predicted_spectrum.mz, intensity_normalized) | ||
| # Normalize if necessary and make dicts | ||
| if not self.normalization == normalization_method: | ||
| self._normalize_spectra(method=normalization_method) | ||
| if requires_dicts: | ||
| self._generate_preds_dict() | ||
| elif requires_dicts and not self.preds_dict: | ||
| self._generate_preds_dict() | ||
| if requires_dicts and not self.peprec_dict: | ||
| self._generate_peprec_dict() | ||
| # Header | ||
| lines = [ | ||
| f"S\t{scan_number}\t{result.psm.peptidoform.theoretical_mz}", | ||
| f"Z\t{result.psm.get_precursor_charge()}\t{result.psm.peptidoform.theoretical_mass}", | ||
| f"D\tseq\t{result.psm.peptidoform.sequence}", | ||
| f"D\tmodified seq\t{modified_sequence}", | ||
| ] | ||
| if ( | ||
| requires_diff_modifications | ||
| and diff_modification_precision not in self.diff_modification_mapping | ||
| ): | ||
| self._generate_diff_modification_mapping(diff_modification_precision) | ||
| # Peaks | ||
| lines.extend(f"{mz:.8f}\t{intensity:.8f}" for mz, intensity in peaks) | ||
| # Write to file or stringbuffer | ||
| if self.return_stringbuffer: | ||
| file_object = StringIO() | ||
| logger.info("Writing results to StringIO using %s", write_function.__name__) | ||
| # Write to file | ||
| self._ms2_file_object.writelines(line + "\n" for line in lines) | ||
| self._ms2_file_object.write("\n") | ||
| @staticmethod | ||
| def _format_modified_sequence(peptidoform: Peptidoform) -> str: | ||
| """Format modified sequence as string for Spectronaut.""" | ||
| modification_dict = defaultdict(list) | ||
| for term, position in [("n_term", 0), ("c_term", len(peptidoform) - 1)]: | ||
| if peptidoform.properties[term]: | ||
| modification_dict[position].extend(peptidoform.properties[term]) | ||
| for position, (_, mods) in enumerate(peptidoform.parsed_sequence): | ||
| if mods: | ||
| modification_dict[position].extend(mods) | ||
| return "".join( | ||
| [ | ||
| f"{aa}{''.join([f'[{mod.mass:+.1f}]' for mod in modification_dict[position]])}" | ||
| for position, aa in enumerate(peptidoform.sequence) | ||
| ] | ||
| ) | ||
| @staticmethod | ||
| def _get_last_ssl_scan_number(ssl_file: Union[str, Path, StringIO]): | ||
| """Read scan number of last line in a Bibliospec SSL file.""" | ||
| if isinstance(ssl_file, StringIO): | ||
| ssl_file.seek(0) | ||
| for line in ssl_file: | ||
| last_line = line | ||
| elif isinstance(ssl_file, (str, Path)): | ||
| with open(ssl_file, "rt") as ssl: | ||
| for line in ssl: | ||
| last_line = line | ||
| else: | ||
| f_name = self.output_filename + file_suffix | ||
| file_object = open(f_name, self.write_mode) | ||
| logger.info("Writing results to %s", f_name) | ||
| raise TypeError("Unsupported type for `ssl_file`.") | ||
| return int(last_line.split("\t")[1]) | ||
| write_function(self, file_object) | ||
| return file_object | ||
| class DLIB(_Writer): | ||
| """ | ||
| Write DLIB files from MS2PIP processing results. | ||
| @output_format("bibliospec") | ||
| def write_bibliospec(self): | ||
| """Write MS2PIP predictions to BiblioSpec/Skyline SSL and MS2 spectral library files.""" | ||
| precision = 1 | ||
| if precision not in self.diff_modification_mapping: | ||
| self._generate_diff_modification_mapping(precision) | ||
| See `EncyclopeDIA File Formats <https://bitbucket.org/searleb/encyclopedia/wiki/EncyclopeDIA%20File%20Formats>`_ | ||
| for documentation on the DLIB format. | ||
| # Normalize if necessary and make dicts | ||
| if not self.normalization == "basepeak_10000": | ||
| self._normalize_spectra(method="basepeak_10000") | ||
| self._generate_preds_dict() | ||
| elif not self.preds_dict: | ||
| self._generate_preds_dict() | ||
| if not self.peprec_dict: | ||
| self._generate_peprec_dict() | ||
| """ | ||
| if self.return_stringbuffer: | ||
| file_obj_ssl = StringIO() | ||
| file_obj_ms2 = StringIO() | ||
| else: | ||
| file_obj_ssl = open("{}_predictions.ssl".format(self.output_filename), self.write_mode) | ||
| file_obj_ms2 = open("{}_predictions.ms2".format(self.output_filename), self.write_mode) | ||
| suffix = ".dlib" | ||
| # If a new file is written, write headers | ||
| if "w" in self.write_mode: | ||
| start_scannr = 0 | ||
| ssl_header = [ | ||
| "file", | ||
| "scan", | ||
| "charge", | ||
| "sequence", | ||
| "score-type", | ||
| "score", | ||
| "retention-time", | ||
| "\n", | ||
| ] | ||
| file_obj_ssl.write("\t".join(ssl_header)) | ||
| file_obj_ms2.write( | ||
| "H\tCreationDate\t{}\n".format(strftime("%Y-%m-%d %H:%M:%S", localtime())) | ||
| ) | ||
| file_obj_ms2.write("H\tExtractor\tMS2PIP predictions\n") | ||
| else: | ||
| # Get last scan number of ssl file, to continue indexing from there | ||
| # because Bibliospec speclib scan numbers can only be integers | ||
| start_scannr = self._get_last_ssl_scannr() + 1 | ||
| def open(self): | ||
| """Open file.""" | ||
| if self.write_mode == "w": | ||
| self._open_file = self.filename.unlink(missing_ok=True) | ||
| self._open_file = dlib.open_sqlite(self.filename) | ||
| self._write_bibliospec_core(file_obj_ssl, file_obj_ms2, start_scannr=start_scannr) | ||
| def write(self, processing_results: List[ProcessingResult]): | ||
| """Write MS2PIP predictions to a DLIB SQLite file.""" | ||
| connection = self._file_object | ||
| dlib.metadata.create_all() | ||
| self._write_metadata(connection) | ||
| self._write_entries(processing_results, connection, self.filename) | ||
| self._write_peptide_to_protein(processing_results, connection) | ||
| return file_obj_ssl, file_obj_ms2 | ||
| def _write_result(self, result: ProcessingResult): ... | ||
| def _write_dlib_metadata(self, connection): | ||
| from sqlalchemy import select | ||
| @staticmethod | ||
| def _format_modified_sequence(peptidoform: Peptidoform) -> str: | ||
| """Format modified sequence as string for DLIB.""" | ||
| # Sum all sequential mass shifts for each position | ||
| masses = [ | ||
| sum(mod.mass for mod in mods) if mods else 0 for _, mods in peptidoform.parsed_sequence | ||
| ] | ||
| from ms2pip._utils.dlib import DLIB_VERSION, Metadata | ||
| # Add N- and C-terminal modifications | ||
| for term, position in [("n_term", 0), ("c_term", len(peptidoform) - 1)]: | ||
| if peptidoform.properties[term]: | ||
| masses[position] += sum(mod.mass for mod in peptidoform.properties[term]) | ||
| # Format modified sequence | ||
| return "".join( | ||
| [ | ||
| f"{aa}[{mass:+.6f}]" if mass else aa | ||
| for aa, mass in zip(peptidoform.sequence, masses) | ||
| ] | ||
| ) | ||
| @staticmethod | ||
| def _write_metadata(connection: engine.Connection): | ||
| """Write metadata to DLIB SQLite file.""" | ||
| with connection.begin(): | ||
| version = connection.execute( | ||
| select([Metadata.c.Value]).where(Metadata.c.Key == "version") | ||
| select([dlib.Metadata.c.Value]).where(dlib.Metadata.c.Key == "version") | ||
| ).scalar() | ||
| if version is None: | ||
| connection.execute( | ||
| Metadata.insert().values( | ||
| dlib.Metadata.insert().values( | ||
| Key="version", | ||
| Value=DLIB_VERSION, | ||
| Value=dlib.DLIB_VERSION, | ||
| ) | ||
| ) | ||
| def _write_dlib_entries(self, connection, precision): | ||
| from ms2pip._utils.dlib import Entry | ||
| peptide_to_proteins = set() | ||
| @staticmethod | ||
| def _write_entries( | ||
| processing_results: List[ProcessingResult], | ||
| connection: engine.Connection, | ||
| output_filename: str, | ||
| ): | ||
| """Write spectra to DLIB SQLite file.""" | ||
| with connection.begin(): | ||
| for spec_id, peprec in self.peprec_dict.items(): | ||
| seq = peprec["peptide"] | ||
| mods = peprec["modifications"] | ||
| charge = peprec["charge"] | ||
| for result in processing_results: | ||
| if not result.psm.retention_time: | ||
| raise ValueError("Retention time required to write DLIB file.") | ||
| prec_mass, prec_mz = self.mods.calc_precursor_mz(seq, mods, charge) | ||
| mod_seq = self._get_diff_modified_sequence(seq, mods, precision=precision) | ||
| spectrum = result.as_spectra()[0] | ||
| intensity_normalized = _basepeak_normalize(spectrum.intensity) * 1e4 | ||
| n_peaks = len(spectrum.mz) | ||
| all_peaks = sorted( | ||
| itertools.chain.from_iterable(self.preds_dict[spec_id]["peaks"].values()), | ||
| key=itemgetter(1), | ||
| ) | ||
| mzs = [peak[1] for peak in all_peaks] | ||
| intensities = [peak[2] for peak in all_peaks] | ||
| connection.execute( | ||
| Entry.insert().values( | ||
| PrecursorMz=prec_mz, | ||
| PrecursorCharge=charge, | ||
| PeptideModSeq=mod_seq, | ||
| PeptideSeq=seq, | ||
| dlib.Entry.insert().values( | ||
| PrecursorMz=result.psm.peptidoform.theoretical_mz, | ||
| PrecursorCharge=result.psm.get_precursor_charge(), | ||
| PeptideModSeq=DLIB._format_modified_sequence(result.psm.peptidoform), | ||
| PeptideSeq=result.psm.peptidoform.sequence, | ||
| Copies=1, | ||
| RTInSeconds=peprec["rt"], | ||
| RTInSeconds=result.psm.retention_time, | ||
| Score=0, | ||
| MassEncodedLength=len(mzs), | ||
| MassArray=mzs, | ||
| IntensityEncodedLength=len(intensities), | ||
| IntensityArray=intensities, | ||
| SourceFile=self.output_filename, | ||
| MassEncodedLength=n_peaks, | ||
| MassArray=spectrum.mz.tolist(), | ||
| IntensityEncodedLength=n_peaks, | ||
| IntensityArray=intensity_normalized.tolist(), | ||
| SourceFile=str(output_filename), | ||
| ) | ||
| ) | ||
| if self.has_protein_list: | ||
| protein_list = peprec["protein_list"] | ||
| if isinstance(protein_list, str): | ||
| protein_list = literal_eval(protein_list) | ||
| @staticmethod | ||
| def _write_peptide_to_protein(results: List[ProcessingResult], connection: engine.Connection): | ||
| """Write peptide-to-protein mappings to DLIB SQLite file.""" | ||
| peptide_to_proteins = { | ||
| (result.psm.peptidoform.sequence, protein) | ||
| for result in results | ||
| if result.psm.protein_list | ||
| for protein in result.psm.protein_list | ||
| } | ||
| for protein in protein_list: | ||
| peptide_to_proteins.add((seq, protein)) | ||
| return peptide_to_proteins | ||
| def _write_dlib_peptide_to_protein(self, connection, peptide_to_proteins): | ||
| from ms2pip._utils.dlib import PeptideToProtein | ||
| if not self.has_protein_list: | ||
| return | ||
| with connection.begin(): | ||
@@ -717,3 +724,5 @@ sql_peptide_to_proteins = set() | ||
| for peptide_to_protein in connection.execute( | ||
| PeptideToProtein.select().where(PeptideToProtein.c.ProteinAccession.in_(proteins)) | ||
| dlib.PeptideToProtein.select().where( | ||
| dlib.PeptideToProtein.c.ProteinAccession.in_(proteins) | ||
| ) | ||
| ): | ||
@@ -730,3 +739,3 @@ sql_peptide_to_proteins.add( | ||
| connection.execute( | ||
| PeptideToProtein.insert().values( | ||
| dlib.PeptideToProtein.insert().values( | ||
| PeptideSeq=seq, isDecoy=False, ProteinAccession=protein | ||
@@ -736,91 +745,27 @@ ) | ||
| @output_format("dlib") | ||
| def write_dlib(self): | ||
| """Write MS2PIP predictions to a DLIB SQLite file.""" | ||
| from ms2pip._utils.dlib import metadata, open_sqlite | ||
| normalization = "basepeak_10000" | ||
| precision = 5 | ||
| if not self.normalization == normalization: | ||
| self._normalize_spectra(method=normalization) | ||
| self._generate_preds_dict() | ||
| if not self.peprec_dict: | ||
| self._generate_peprec_dict() | ||
| if precision not in self.diff_modification_mapping: | ||
| self._generate_diff_modification_mapping(precision) | ||
| SUPPORTED_FORMATS = { | ||
| "tsv": TSV, | ||
| "msp": MSP, | ||
| "mgf": MGF, | ||
| "spectronaut": Spectronaut, | ||
| "bibliospec": Bibliospec, | ||
| "dlib": DLIB, | ||
| } | ||
| filename = "{}.dlib".format(self.output_filename) | ||
| logger.info("Writing results to %s", filename) | ||
| logger.debug( | ||
| "write mode is ignored for DLIB at the file mode, although append or not is respected" | ||
| ) | ||
| if "a" not in self.write_mode and os.path.exists(filename): | ||
| os.remove(filename) | ||
| def _peptidoform_str_without_charge(peptidoform: Peptidoform) -> str: | ||
| """Get peptidoform string without charge.""" | ||
| return re.sub(r"\/\d+$", "", str(peptidoform)) | ||
| if self.return_stringbuffer: | ||
| raise NotImplementedError("`return_stringbuffer` not implemented for DLIB output.") | ||
| if not self.has_rt: | ||
| raise NotImplementedError("Retention times required to write DLIB file.") | ||
| def _unlogarithmize(intensities: np.array) -> np.array: | ||
| """Undo logarithmic transformation of intensities.""" | ||
| return (2**intensities) - 0.001 | ||
| with open_sqlite(filename) as connection: | ||
| metadata.create_all() | ||
| self._write_dlib_metadata(connection) | ||
| peptide_to_proteins = self._write_dlib_entries(connection, precision) | ||
| self._write_dlib_peptide_to_protein(connection, peptide_to_proteins) | ||
| def get_normalized_predictions(self, normalization_method="tic"): | ||
| """Return normalized copy of predictions.""" | ||
| self._normalize_spectra(method=normalization_method) | ||
| return self.all_preds.copy() | ||
| @output_format("csv") | ||
| def write_csv(self): | ||
| """Write MS2PIP predictions to CSV.""" | ||
| self._normalize_spectra(method="tic") | ||
| # Write to file or stringbuffer | ||
| if self.return_stringbuffer: | ||
| file_object = StringIO() | ||
| logger.info("Writing results to StringIO using %s", "write_csv") | ||
| else: | ||
| f_name = "{}_predictions.csv".format(self.output_filename) | ||
| file_object = open(f_name, self.write_mode) | ||
| logger.info("Writing results to %s", f_name) | ||
| try: | ||
| self.all_preds.to_csv( | ||
| file_object, float_format="%.6g", index=False, lineterminator="\n" | ||
| ) | ||
| except TypeError: # Pandas < 1.5 (Required for Python 3.7 support) | ||
| self.all_preds.to_csv( | ||
| file_object, float_format="%.6g", index=False, line_terminator="\n" | ||
| ) | ||
| return file_object | ||
| def write_single_spectrum_csv(spectrum, filepath): | ||
| """Write a single spectrum to a CSV file.""" | ||
| with open(filepath, "wt") as f: | ||
| writer = csv.writer(f, delimiter=",", lineterminator="\n") | ||
| writer.writerow(["mz", "intensity", "annotation"]) | ||
| for mz, intensity, annotation in zip( | ||
| spectrum.mz, | ||
| spectrum.intensity, | ||
| spectrum.annotations, | ||
| ): | ||
| writer.writerow([mz, intensity, annotation]) | ||
| def write_single_spectrum_png(spectrum, filepath): | ||
| """Plot a single spectrum and write to a PNG file.""" | ||
| import matplotlib.pyplot as plt | ||
| import spectrum_utils.plot as sup | ||
| ax = plt.gca() | ||
| ax.set_title("MS²PIP prediction for " + str(spectrum.peptidoform)) | ||
| sup.spectrum(spectrum.to_spectrum_utils(), ax=ax) | ||
| plt.savefig(Path(filepath).with_suffix(".png")) | ||
| plt.close() | ||
| def _basepeak_normalize(intensities: np.array, basepeak: Optional[float] = None) -> np.array: | ||
| """Normalize intensities to most intense peak.""" | ||
| if not basepeak: | ||
| basepeak = intensities.max() | ||
| return intensities / basepeak |
@@ -75,6 +75,6 @@ """MS2 spectrum handling.""" | ||
| def check_array_lengths(cls, data: dict): | ||
| if len(data["mz"]) != len(data["intensity"]): | ||
| if len(data.mz) != len(data.intensity): | ||
| raise ValueError("Array lengths do not match.") | ||
| if data["annotations"] is not None: | ||
| if len(data["annotations"]) != len(data["intensity"]): | ||
| if data.annotations is not None: | ||
| if len(data.annotations) != len(data.intensity): | ||
| raise ValueError("Array lengths do not match.") | ||
@@ -81,0 +81,0 @@ return data |
+2
-1
| Metadata-Version: 2.1 | ||
| Name: ms2pip | ||
| Version: 4.0.0.dev12 | ||
| Version: 4.0.0.dev13 | ||
| Summary: MS2PIP: Accurate and versatile peptide fragmentation spectrum prediction. | ||
@@ -319,2 +319,3 @@ Author: Ana Sílvia C. Silva | ||
| training. | ||
| - ``annotate-spectra``: Annotate peaks in observed spectra. | ||
@@ -321,0 +322,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes |
+1
-0
@@ -62,2 +62,3 @@ .. image:: https://github.com/compomics/ms2pip_c/raw/releases/img/ms2pip_logo_1000px.png | ||
| training. | ||
| - ``annotate-spectra``: Annotate peaks in observed spectra. | ||
@@ -64,0 +65,0 @@ MS²PIP supports a wide range of PSM input formats and spectrum output formats, and includes |
@@ -1,100 +0,47 @@ | ||
| import os | ||
| import re | ||
| from psm_utils import Peptidoform | ||
| import numpy as np | ||
| import pandas as pd | ||
| from ms2pip.spectrum_output import MSP, Bibliospec, DLIB | ||
| from ms2pip.ms2pip_tools.spectrum_output import SpectrumOutput | ||
| class TestMSP: | ||
| def test__format_modification_string(self): | ||
| test_cases = [ | ||
| ("ACDE/2", "Mods=0"), | ||
| ("AC[Carbamidomethyl]DE/2", "Mods=1/2,C,Carbamidomethyl"), | ||
| ("[Glu->pyro-Glu]-EPEPTIDEK/2", "Mods=1/0,E,Glu->pyro-Glu"), | ||
| ("PEPTIDEK-[Amidated]/2", "Mods=1/-1,K,Amidated"), | ||
| ("AM[Oxidation]C[Carbamidomethyl]DE/2", "Mods=2/2,M,Oxidation/3,C,Carbamidomethyl"), | ||
| ] | ||
| TEST_DIR = os.path.dirname(__file__) | ||
| for peptidoform_str, expected_output in test_cases: | ||
| peptidoform = Peptidoform(peptidoform_str) | ||
| assert MSP._format_modifications(peptidoform) == expected_output | ||
| class TestSpectrumOutput: | ||
| def test_integration(self): | ||
| def compare_line(test_line, target_line): | ||
| """Assert if two lines in spectrum output are the same.""" | ||
| # Extract float values from line and use assert_allclose, to allow for | ||
| # float imprecisions | ||
| float_pattern = re.compile(r"[0-9]*[.][0-9]+") | ||
| test_floats = float_pattern.findall(test_line) | ||
| target_floats = float_pattern.findall(target_line) | ||
| assert len(test_floats) == len(target_floats) | ||
| [ | ||
| np.testing.assert_allclose(float(te), float(ta), rtol=1e-5) | ||
| for te, ta in zip(test_floats, target_floats) | ||
| ] | ||
| assert float_pattern.sub(test_line, "") == float_pattern.sub( | ||
| target_line, "" | ||
| ) | ||
| peprec = pd.read_pickle( | ||
| os.path.join(TEST_DIR, "test_data/spectrum_output/input_peprec.pkl") | ||
| ) | ||
| all_preds = pd.read_pickle( | ||
| os.path.join(TEST_DIR, "test_data/spectrum_output/input_preds.pkl") | ||
| ) | ||
| params = { | ||
| "ptm": [ | ||
| "Oxidation,15.994915,opt,M", | ||
| "Carbamidomethyl,57.021464,opt,C", | ||
| "Glu->pyro-Glu,-18.010565,opt,E", | ||
| "Gln->pyro-Glu,-17.026549,opt,Q", | ||
| "Acetyl,42.010565,opt,N-term", | ||
| ], | ||
| "sptm": [], | ||
| "gptm": [], | ||
| "model": "HCD", | ||
| "frag_error": "0.02", | ||
| "out": "csv", | ||
| } | ||
| peprec_tmp = peprec.sample(5, random_state=10).copy() | ||
| all_preds_tmp = all_preds[ | ||
| all_preds["spec_id"].isin(peprec_tmp["spec_id"]) | ||
| ].copy() | ||
| so = SpectrumOutput( | ||
| all_preds_tmp, | ||
| peprec_tmp, | ||
| params, | ||
| output_filename="test", | ||
| return_stringbuffer=True, | ||
| ) | ||
| target_filename_base = os.path.join( | ||
| TEST_DIR, "test_data/spectrum_output/target" | ||
| ) | ||
| # Test general output | ||
| class TestBiblioSpec: | ||
| def test__format_modified_sequence(self): | ||
| test_cases = [ | ||
| (so.write_mgf, "_predictions.mgf"), | ||
| (so.write_msp, "_predictions.msp"), | ||
| (so.write_spectronaut, "_predictions_spectronaut.csv"), | ||
| ("ACDE/2", "ACDE"), | ||
| ("AC[Carbamidomethyl]DE/2", "AC[+57.0]DE"), | ||
| ("[Glu->pyro-Glu]-EPEPTIDEK/2", "E[-18.0]PEPTIDEK"), | ||
| ("PEPTIDEK-[Amidated]/2", "PEPTIDEK[-1.0]"), | ||
| ("AM[Oxidation]C[Carbamidomethyl]DE/2", "AM[+16.0]C[+57.0]DE"), | ||
| ] | ||
| for test_function, file_ext in test_cases: | ||
| test = test_function() | ||
| test.seek(0) | ||
| with open(target_filename_base + file_ext) as target: | ||
| for test_line, target_line in zip(test.readlines(), target.readlines()): | ||
| compare_line(test_line, target_line) | ||
| for peptidoform_str, expected_output in test_cases: | ||
| peptidoform = Peptidoform(peptidoform_str) | ||
| assert Bibliospec._format_modified_sequence(peptidoform) == expected_output | ||
| # Test bibliospec output | ||
| bibliospec_ssl, bibliospec_ms2 = so.write_bibliospec() | ||
| class TestDLIB: | ||
| def test__format_modified_sequence(self): | ||
| test_cases = [ | ||
| (bibliospec_ssl, "_predictions.ssl"), | ||
| (bibliospec_ms2, "_predictions.ms2"), | ||
| ("ACDE/2", "ACDE"), | ||
| ("AC[Carbamidomethyl]DE/2", "AC[+57.021464]DE"), | ||
| ("[Glu->pyro-Glu]-EPEPTIDEK/2", "E[-18.010565]PEPTIDEK"), | ||
| ("PEPTIDEK-[Amidated]/2", "PEPTIDEK[-0.984016]"), | ||
| ("AM[Oxidation]C[Carbamidomethyl]DE/2", "AM[+15.994915]C[+57.021464]DE"), | ||
| ] | ||
| for test, file_ext in test_cases: | ||
| test.seek(0) | ||
| with open(target_filename_base + file_ext) as target: | ||
| for test_line, target_line in zip(test.readlines(), target.readlines()): | ||
| test_line = test_line.replace( | ||
| "test_predictions.ms2", "target_predictions.ms2" | ||
| ) | ||
| if not "CreationDate" in target_line: | ||
| compare_line(test_line, target_line) | ||
| for test_in, expected_out in test_cases: | ||
| assert DLIB._format_modified_sequence(Peptidoform(test_in)) == expected_out |
| """Tests for fasta2speclib.""" | ||
| from pyteomics.fasta import Protein | ||
| from fasta2speclib.fasta2speclib import Fasta2SpecLib, ModificationConfig, Peptide | ||
| MODIFICATION_CONFIG = [ | ||
| { | ||
| "name": "Oxidation", | ||
| "mass_shift": 15.9994, | ||
| "amino_acid": "M", | ||
| }, | ||
| { | ||
| "name": "Carbamidomethyl", | ||
| "mass_shift": 57.0513, | ||
| "amino_acid": "C", | ||
| "fixed": True, | ||
| }, | ||
| { | ||
| "name": "Glu->pyro-Glu", | ||
| "mass_shift": -18.010565, | ||
| "amino_acid": "E", | ||
| "peptide_n_term": True, | ||
| }, | ||
| { | ||
| "name": "Acetyl", | ||
| "mass_shift": 42.010565, | ||
| "amino_acid": None, | ||
| "protein_n_term": True, | ||
| }, | ||
| ] | ||
| MODIFICATION_CONFIG = [ModificationConfig(**mod) for mod in MODIFICATION_CONFIG] | ||
| def test_get_modifications_by_target(): | ||
| modifications_by_target = Fasta2SpecLib._get_modifications_by_target(MODIFICATION_CONFIG) | ||
| assert modifications_by_target["sidechain"] == {"M": [None] + MODIFICATION_CONFIG[0:1]} | ||
| assert modifications_by_target["peptide_n_term"] == {"E": [None] + MODIFICATION_CONFIG[2:3]} | ||
| assert modifications_by_target["peptide_c_term"] == {} | ||
| assert modifications_by_target["protein_n_term"] == {"any": [None] + MODIFICATION_CONFIG[3:4]} | ||
| assert modifications_by_target["protein_c_term"] == {} | ||
| def test_get_modification_versions(): | ||
| modification_config = [ | ||
| ModificationConfig( | ||
| **{ | ||
| "name": "Oxidation", | ||
| "mass_shift": 15.9994, | ||
| "amino_acid": "M", | ||
| } | ||
| ), | ||
| ModificationConfig( | ||
| **{ | ||
| "name": "Carbamidomethyl", | ||
| "mass_shift": 57.0513, | ||
| "amino_acid": "C", | ||
| "fixed": True, | ||
| } | ||
| ), | ||
| ModificationConfig( | ||
| **{ | ||
| "name": "Glu->pyro-Glu", | ||
| "mass_shift": -18.010565, | ||
| "amino_acid": "E", | ||
| "protein_n_term": True, | ||
| } | ||
| ), | ||
| ] | ||
| modifications_by_target = Fasta2SpecLib._get_modifications_by_target(modification_config) | ||
| test_cases = [ | ||
| ("ADEF", {""}), # None | ||
| ("ACDE", {"2|Carbamidomethyl"}), # Single fixed | ||
| ("ACCDE", {"2|Carbamidomethyl|3|Carbamidomethyl"}), # Double fixed | ||
| ("ADME", {"", "3|Oxidation"}), # Single variable | ||
| ( | ||
| "ADMME", | ||
| {"", "3|Oxidation", "4|Oxidation", "3|Oxidation|4|Oxidation"}, | ||
| ), # Double variable | ||
| ( | ||
| "ADMMME", | ||
| { | ||
| "", | ||
| "3|Oxidation", | ||
| "4|Oxidation", | ||
| "5|Oxidation", | ||
| "3|Oxidation|4|Oxidation", | ||
| "4|Oxidation|5|Oxidation", | ||
| "3|Oxidation|5|Oxidation", | ||
| }, | ||
| ), # More than maximum simultaneous mods should be ignored | ||
| ("EDEF", {"", "0|Glu->pyro-Glu"}), # N-term and AA-specific | ||
| ] | ||
| for peptide, expected_output in test_cases: | ||
| output = Fasta2SpecLib._get_modification_versions( | ||
| Peptide(sequence=peptide, is_n_term=True, proteins=[]), | ||
| modification_config, | ||
| modifications_by_target, | ||
| max_variable_modifications=2, | ||
| ) | ||
| assert set(output) == expected_output | ||
| def test_digest_protein(): | ||
| test_input = { | ||
| "protein": Protein( | ||
| description="P12345", | ||
| sequence="MYSSCSLLQRLVWFPFLALVATQLLFIRNVSSLNLTNEYLHHKCLVSEGKYKPGSKYEYI", | ||
| ), | ||
| "min_length": 8, | ||
| "max_length": 30, | ||
| "cleavage_rule": "trypsin", | ||
| "missed_cleavages": 2, | ||
| "semi_specific": False, | ||
| } | ||
| test_output = [ | ||
| Peptide( | ||
| sequence="MYSSCSLLQR", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=True, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="MYSSCSLLQRLVWFPFLALVATQLLFIR", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=True, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="LVWFPFLALVATQLLFIR", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="NVSSLNLTNEYLHHK", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="NVSSLNLTNEYLHHKCLVSEGK", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="NVSSLNLTNEYLHHKCLVSEGKYKPGSK", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="CLVSEGKYKPGSK", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=False, | ||
| ), | ||
| Peptide( | ||
| sequence="CLVSEGKYKPGSKYEYI", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=True, | ||
| ), | ||
| Peptide( | ||
| sequence="YKPGSKYEYI", | ||
| proteins=["P12345"], | ||
| modification_options=None, | ||
| is_n_term=False, | ||
| is_c_term=True, | ||
| ), | ||
| ] | ||
| assert test_output == Fasta2SpecLib._digest_protein(**test_input) |
| import os | ||
| import pandas as pd | ||
| from ms2pip.ms2pipC import MS2PIP | ||
| TEST_DIR = os.path.dirname(__file__) | ||
| def run_ms2pip(): | ||
| """Run ms2pipC to predict peak intensities from a PEPREC file (HCD model). """ | ||
| params = { | ||
| "ms2pip": { | ||
| "ptm": [ | ||
| "Oxidation,15.994915,opt,M", | ||
| "Carbamidomethyl,57.021464,opt,C", | ||
| "Acetyl,42.010565,opt,N-term", | ||
| ], | ||
| "sptm": [], | ||
| "gptm": [], | ||
| "frag_method": "HCD2019", | ||
| "frag_error": 0.02, | ||
| "out": "csv", | ||
| } | ||
| } | ||
| ms2pip = MS2PIP(os.path.join(TEST_DIR, "test_data/test.peprec"), params=params) | ||
| ms2pip.run() | ||
| test_data = pd.read_csv( | ||
| os.path.join(TEST_DIR, "test_data/test_HCD2019_predictions.csv") | ||
| ) | ||
| target_data = pd.read_csv( | ||
| os.path.join(TEST_DIR, "test_data/target_HCD2019_predictions.csv") | ||
| ) | ||
| pepfile = pd.read_csv( | ||
| os.path.join(TEST_DIR, "test_data/test.peprec"), | ||
| sep=" ", | ||
| index_col=False, | ||
| dtype={"spec_id": str, "modifications": str}, | ||
| ) | ||
| return test_data, target_data, pepfile | ||
| TEST_DATA, TARGET_DATA, PEPFILE = run_ms2pip() | ||
| class TestPredictions: | ||
| def test_all_spec(self): | ||
| assert set(TEST_DATA.spec_id.unique()) == set(PEPFILE.spec_id) | ||
| def test_amount_peaks(self): | ||
| for pep in ["peptide1", "peptide2", "peptide3"]: | ||
| peplen = len(PEPFILE[PEPFILE.spec_id == pep].peptide.values[0]) | ||
| assert len(TEST_DATA[TEST_DATA.spec_id == pep]) == (2 * peplen) - 2 | ||
| def test_peak_ints_b(self): | ||
| for pep in TARGET_DATA.spec_id.unique(): | ||
| tmp_test = TEST_DATA[TEST_DATA.spec_id == pep] | ||
| tmp_test = tmp_test[tmp_test.ion == "b"] | ||
| tmp_target = TARGET_DATA[TARGET_DATA.spec_id == pep] | ||
| tmp_target = tmp_target[tmp_target.ion == "b"] | ||
| for no in tmp_target.ionnumber: | ||
| assert ( | ||
| tmp_test[tmp_test.ionnumber == no]["prediction"].values[0] | ||
| == tmp_target[tmp_target.ionnumber == no]["prediction"].values[0] | ||
| ) | ||
| def test_peak_ints_y(self): | ||
| for pep in TARGET_DATA.spec_id.unique(): | ||
| tmp_test = TEST_DATA[TEST_DATA.spec_id == pep] | ||
| tmp_test = tmp_test[tmp_test.ion == "y"] | ||
| tmp_target = TARGET_DATA[TARGET_DATA.spec_id == pep] | ||
| tmp_target = tmp_target[tmp_target.ion == "y"] | ||
| for no in tmp_target.ionnumber: | ||
| assert ( | ||
| tmp_test[tmp_test.ionnumber == no]["prediction"].values[0] | ||
| == tmp_target[tmp_target.ionnumber == no]["prediction"].values[0] | ||
| ) |
| import os | ||
| import numpy as np | ||
| import pandas as pd | ||
| from ms2pip.retention_time import RetentionTime | ||
| from ms2pip.config_parser import ConfigParser | ||
| TEST_DIR = os.path.dirname(__file__) | ||
| class TestRetentionTime: | ||
| def test_prepare_deeplc_peptide_df(self): | ||
| peprec = pd.read_csv(os.path.join(TEST_DIR, "test_data/test.peprec"), sep=" ") | ||
| config = { | ||
| "deeplc": { | ||
| "calibration_file": False, | ||
| "verbose": False, | ||
| "path_model": False, | ||
| "split_cal": 25, | ||
| "batch_num": 350000, | ||
| } | ||
| } | ||
| rt_predictor = RetentionTime(config=config) | ||
| rt_predictor.peprec = peprec | ||
| rt_predictor._prepare_deeplc_peptide_df() | ||
| dlc_df = rt_predictor.deeplc_pep_df | ||
| assert dlc_df.equals( | ||
| pd.DataFrame({ | ||
| "seq": {0: "ACDE", 1: "ACDEFGHI", 2: "ACDEFGHIKMNPQ"}, | ||
| "modifications": {0: np.nan, 1: np.nan, 2: np.nan}, | ||
| }) | ||
| ) |
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
35499008
0.06%3880
14.79%