asimov
Advanced tools
| """ | ||
| This module contains logic to assist with various | ||
| authorisation tasks against the open science grid. | ||
| This will ultimately be a good candidate to spin-out | ||
| into its own package. | ||
| """ | ||
| import subprocess | ||
| import configparser | ||
| import logging | ||
| from asimov import config | ||
| def refresh_scitoken(func): | ||
| """ | ||
| Decorator to refresh an existing scitoken. | ||
| """ | ||
| def wrapper(*args, **kwargs): | ||
| logger = logging.getLogger("asimov").getChild("auth") | ||
| try: | ||
| command = ["kinit"] + [config.get("authentication", "kinit options")] | ||
| pipe = subprocess.Popen( | ||
| command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT | ||
| ) | ||
| logger.info(" ".join(command)) | ||
| out, err = pipe.communicate() | ||
| logger.info(out) | ||
| if err and len(err) > 0: | ||
| logger.error(err) | ||
| command = ["htgettoken"] + [ | ||
| config.get("authentication", "htgettoken options") | ||
| ] | ||
| pipe = subprocess.Popen( | ||
| command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT | ||
| ) | ||
| logger.info(" ".join(command)) | ||
| out, err = pipe.communicate() | ||
| logger.info(out) | ||
| if err and len(err) > 0: | ||
| logger.error(err) | ||
| command = ["condor_vault_storer"] + [config.get("authentication", "scopes")] | ||
| pipe = subprocess.Popen( | ||
| command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT | ||
| ) | ||
| logger.info(" ".join(command)) | ||
| out, err = pipe.communicate() | ||
| logger.info(out) | ||
| if err and len(err) > 0: | ||
| logger.error(err) | ||
| except configparser.NoSectionError: | ||
| # If authentication isn't set up then just ignore this. | ||
| pass | ||
| func(*args, **kwargs) | ||
| return wrapper |
| # This file isn't actually required except by asimov. |
| """Defines the interface with generic analysis pipelines.""" | ||
| import os | ||
| import warnings | ||
| warnings.filterwarnings("ignore", module="htcondor") | ||
| import htcondor # NoQA | ||
| from asimov import utils # NoQA | ||
| from asimov import config, logger, logging, LOGGER_LEVEL # NoQA | ||
| import otter # NoQA | ||
| from ..storage import Store # NoQA | ||
| from ..pipeline import Pipeline, PipelineException, PipelineLogger # NoQA | ||
| class PESummary(Pipeline): | ||
| """ | ||
| A postprocessing pipeline add-in using PESummary. | ||
| """ | ||
| executable = os.path.join( | ||
| config.get("pipelines", "environment"), "bin", "summarypages" | ||
| ) | ||
| name = "PESummary" | ||
| def __init__(self, production, category=None): | ||
| self.production = production | ||
| self.category = category if category else production.category | ||
| self.logger = logger | ||
| self.meta = self.production.meta["postprocessing"][self.name.lower()] | ||
| def results(self): | ||
| """ | ||
| Fetch the results file from this post-processing step. | ||
| A dictionary of results will be returned with the description | ||
| of each results file as the key. These may be nested if it | ||
| makes sense for the output, for example skymaps. | ||
| For example | ||
| {'metafile': '/home/asimov/working/samples/metafile.hd5', | ||
| 'skymaps': {'H1': '/another/file/path', ...} | ||
| } | ||
| Returns | ||
| ------- | ||
| dict | ||
| A dictionary of the results. | ||
| """ | ||
| self.outputs = os.path.join( | ||
| config.get("project", "root"), | ||
| config.get("general", "webroot"), | ||
| self.subject.name, | ||
| ) | ||
| self.outputs = os.path.join(self.outputs, self.production.name) | ||
| self.outputs = os.path.join(self.outputs, "pesummary") | ||
| metafile = os.path.join(self.outputs, "samples", "posterior_samples.h5") | ||
| return dict(metafile=metafile) | ||
| def submit_dag(self, dryrun=False): | ||
| """ | ||
| Run PESummary on the results of this job. | ||
| """ | ||
| configfile = self.production.event.repository.find_prods( | ||
| self.production.name, self.category | ||
| )[0] | ||
| label = str(self.production.name) | ||
| command = [ | ||
| "--webdir", | ||
| os.path.join( | ||
| config.get("project", "root"), | ||
| config.get("general", "webroot"), | ||
| self.production.event.name, | ||
| self.production.name, | ||
| "pesummary", | ||
| ), | ||
| "--labels", | ||
| label, | ||
| ] | ||
| command += ["--gw"] | ||
| command += [ | ||
| "--approximant", | ||
| self.production.meta["waveform"]["approximant"], | ||
| ] | ||
| command += [ | ||
| "--f_low", | ||
| str(min(self.production.meta["quality"]["minimum frequency"].values())), | ||
| "--f_ref", | ||
| str(self.production.meta["waveform"]["reference frequency"]), | ||
| ] | ||
| if "cosmology" in self.meta: | ||
| command += [ | ||
| "--cosmology", | ||
| self.meta["cosmology"], | ||
| ] | ||
| if "redshift" in self.meta: | ||
| command += ["--redshift_method", self.meta["redshift"]] | ||
| if "skymap samples" in self.meta: | ||
| command += [ | ||
| "--nsamples_for_skymap", | ||
| str(self.meta["skymap samples"]), | ||
| ] | ||
| if "evolve spins" in self.meta: | ||
| if "forwards" in self.meta["evolve spins"]: | ||
| command += ["--evolve_spins_fowards", "True"] | ||
| if "backwards" in self.meta["evolve spins"]: | ||
| command += ["--evolve_spins_backwards", "precession_averaged"] | ||
| if "nrsur" in self.production.meta["waveform"]["approximant"].lower(): | ||
| command += ["--NRSur_fits"] | ||
| if "multiprocess" in self.meta: | ||
| command += ["--multi_process", str(self.meta["multiprocess"])] | ||
| if "regenerate" in self.meta: | ||
| command += ["--regenerate", " ".join(self.meta["regenerate posteriors"])] | ||
| if "calculate" in self.meta: | ||
| if "precessing snr" in self.meta["calculate"]: | ||
| command += ["--calculate_precessing_snr"] | ||
| # Config file | ||
| command += [ | ||
| "--config", | ||
| os.path.join( | ||
| self.production.event.repository.directory, self.category, configfile | ||
| ), | ||
| ] | ||
| # Samples | ||
| command += ["--samples"] | ||
| command += [self.production._previous_assets().get("samples", {})] | ||
| # PSDs | ||
| psds = { | ||
| ifo: os.path.abspath(psd) | ||
| for ifo, psd in self.production._previous_assets().get("psds", {}).items() | ||
| } | ||
| if len(psds) > 0: | ||
| command += ["--psds"] | ||
| for key, value in psds.items(): | ||
| command += [f"{key}:{value}"] | ||
| # Calibration envelopes | ||
| cals = { | ||
| ifo: os.path.abspath(psd) | ||
| for ifo, psd in self.production._previous_assets() | ||
| .get("calibration", {}) | ||
| .items() | ||
| } | ||
| if len(cals) > 0: | ||
| command += ["--calibration"] | ||
| for key, value in cals.items(): | ||
| command += [f"{key}:{value}"] | ||
| with utils.set_directory(self.subject.work_dir): | ||
| with open("pesummary.sh", "w") as bash_file: | ||
| bash_file.write(f"{self.executable} " + " ".join(command)) | ||
| self.logger.info( | ||
| f"PE summary command: {self.executable} {' '.join(command)}", | ||
| ) | ||
| if dryrun: | ||
| print("PESUMMARY COMMAND") | ||
| print("-----------------") | ||
| print(" ".join(command)) | ||
| self.subject = self.production.event | ||
| submit_description = { | ||
| "executable": self.executable, | ||
| "arguments": " ".join(command), | ||
| "output": f"{self.subject.work_dir}/pesummary.out", | ||
| "error": f"{self.subject.work_dir}/pesummary.err", | ||
| "log": f"{self.subject.work_dir}/pesummary.log", | ||
| "request_cpus": self.meta["multiprocess"], | ||
| "getenv": "true", | ||
| "batch_name": f"Summary Pages/{self.subject.name}/{self.production.name}", | ||
| "request_memory": "8192MB", | ||
| "should_transfer_files": "YES", | ||
| "request_disk": "8192MB", | ||
| } | ||
| if "accounting group" in self.meta: | ||
| submit_description["accounting_group_user"] = config.get("condor", "user") | ||
| submit_description["accounting_group"] = self.meta["accounting group"] | ||
| if dryrun: | ||
| print("SUBMIT DESCRIPTION") | ||
| print("------------------") | ||
| print(submit_description) | ||
| if not dryrun: | ||
| hostname_job = htcondor.Submit(submit_description) | ||
| try: | ||
| # There should really be a specified submit node, and if there is, use it. | ||
| schedulers = htcondor.Collector().locate( | ||
| htcondor.DaemonTypes.Schedd, config.get("condor", "scheduler") | ||
| ) | ||
| schedd = htcondor.Schedd(schedulers) | ||
| except: # NoQA | ||
| # If you can't find a specified scheduler, use the first one you find | ||
| schedd = htcondor.Schedd() | ||
| with schedd.transaction() as txn: | ||
| cluster_id = hostname_job.queue(txn) | ||
| else: | ||
| cluster_id = 0 | ||
| return cluster_id |
| .. _citation-guide: | ||
| Citation Guide | ||
| ============== | ||
| If you used asimov in part of your analysis, or it contributed towards results in a paper we'd appreciate it if you would cite the original paper which describes asimov, which was published in the Journal of Open Source Software. | ||
| Williams, D. et al., "Asimov: A framework for coordinating parameter estimation workflows", *The Journal of Open Source Software*, vol. 8, no. 84, Art. no. 4170, 2023. doi:10.21105/joss.04170. | ||
| BibTeX Entry | ||
| ------------ | ||
| .. code-block:: bibtex | ||
| @ARTICLE{asimov-paper, | ||
| author = {{Williams}, Daniel and {Veitch}, John and {Chiofalo}, Maria and {Schmidt}, Patricia and {Udall}, Rhiannon and {Vajpeyi}, Avi and {Hoy}, Charlie}, | ||
| title = "{Asimov: A framework for coordinating parameter estimation workflows}", | ||
| journal = {The Journal of Open Source Software}, | ||
| keywords = {Python, astronomy, gravitational waves, General Relativity and Quantum Cosmology, Physics - Data Analysis, Statistics and Probability}, | ||
| year = 2023, | ||
| month = apr, | ||
| volume = {8}, | ||
| number = {84}, | ||
| eid = {4170}, | ||
| pages = {4170}, | ||
| doi = {10.21105/joss.04170}, | ||
| archivePrefix = {arXiv}, | ||
| eprint = {2207.01468}, | ||
| primaryClass = {gr-qc}, | ||
| } | ||
| Citing specific versions of the software | ||
| ---------------------------------------- | ||
| We use Zenodo to publish releases, and each release receives a DOI which can be cited. | ||
| These should be cited, where relevant, in addition to the paper above, in the format: | ||
| Daniel Williams, Duncan Macleod, Avi Vajpeyi, James Clark, & Richard O'Shaughnessy. (2024). transientlunatic/asimov: v0.5.8 (v0.5.8). Zenodo. https://doi.org/10.5281/zenodo.13992299 | ||
| (See `this page<https://doi.org/10.5281/zenodo.4024432>`_ for the latest version, and links to previous versions' DOIs, where links to BibTeX snippets for relevant versions are also available). |
| data: | ||
| channels: | ||
| H1: H1:DCS-CALIB_STRAIN_C02 | ||
| L1: L1:DCS-CALIB_STRAIN_C02 | ||
| frame types: | ||
| H1: H1_HOFT_C02 | ||
| L1: L1_HOFT_C02 | ||
| segment length: 4 | ||
| event time: 1126259462.391 | ||
| interferometers: | ||
| - H1 | ||
| - L1 | ||
| kind: event | ||
| likelihood: | ||
| psd length: 4 | ||
| reference frequency: 20 | ||
| sample rate: 2048 | ||
| segment start: 1126259460.391 | ||
| start frequency: 13.333333333333334 | ||
| window length: 4 | ||
| name: GW150914_095045 | ||
| priors: | ||
| amplitude order: 1 | ||
| chirp mass: | ||
| maximum: 41.97447913941358 | ||
| minimum: 21.418182160215295 | ||
| luminosity distance: | ||
| maximum: 10000 | ||
| minimum: 10 | ||
| mass 1: | ||
| maximum: 1000 | ||
| minimum: 1 | ||
| mass ratio: | ||
| maximum: 1.0 | ||
| minimum: 0.05 | ||
| quality: | ||
| minimum frequency: | ||
| H1: 20 | ||
| L1: 20 | ||
| --- | ||
| data: | ||
| channels: | ||
| H1: H1:DCS-CALIB_STRAIN_C02 | ||
| L1: L1:DCS-CALIB_STRAIN_C02 | ||
| frame types: | ||
| H1: H1_HOFT_C02 | ||
| L1: L1_HOFT_C02 | ||
| segment length: 8 | ||
| event time: 1128678900.444336 | ||
| interferometers: | ||
| - H1 | ||
| - L1 | ||
| kind: event | ||
| likelihood: | ||
| psd length: 8 | ||
| reference frequency: 20 | ||
| sample rate: 2048 | ||
| segment start: 1128678894.444336 | ||
| start frequency: 13.333333333333334 | ||
| window length: 8 | ||
| name: GW151012_095443 | ||
| priors: | ||
| amplitude order: 1 | ||
| chirp mass: | ||
| maximum: 30.55016955177058 | ||
| minimum: 11.331236548800117 | ||
| luminosity distance: | ||
| maximum: 10000 | ||
| minimum: 10 | ||
| mass 1: | ||
| maximum: 1000 | ||
| minimum: 1 | ||
| mass ratio: | ||
| maximum: 1.0 | ||
| minimum: 0.05 | ||
| quality: | ||
| minimum frequency: | ||
| H1: 20 | ||
| L1: 20 |
| kind: ProjectAnalysis | ||
| name: TestPA | ||
| pipeline: TestPipeline | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151012_095443 |
| kind: analysis | ||
| name: Prod0 | ||
| event: GW150914_095045 | ||
| pipeline: bayeswave | ||
| --- | ||
| kind: analysis | ||
| name: Prod1 | ||
| event: GW150914_095045 | ||
| pipeline: bilby | ||
| waveform: | ||
| approximant: IMRPhenomXPHM | ||
| --- | ||
| kind: analysis | ||
| name: Prod0 | ||
| event: GW151226_033853 | ||
| pipeline: bayeswave | ||
| --- | ||
| kind: analysis | ||
| name: Prod1 | ||
| event: GW151226_033853 | ||
| pipeline: bilby | ||
| waveform: | ||
| approximant: IMRPhenomXPHM | ||
| --- | ||
| kind: analysis | ||
| name: Prod2 | ||
| event: GW151226_033853 | ||
| pipeline: lalinference | ||
| waveform: | ||
| approximant: SEOBNRv4PHM | ||
| status: uploaded | ||
| --- | ||
| kind: analysis | ||
| name: Prod3 | ||
| event: GW151226_033853 | ||
| pipeline: lalinference | ||
| waveform: | ||
| approximant: SEOBNRv4PHM | ||
| status: uploaded | ||
| review: | ||
| - status: approved | ||
| message: "blah" | ||
| --- | ||
| kind: analysis | ||
| name: Prod4 | ||
| event: GW151226_033853 | ||
| pipeline: lalinference | ||
| waveform: | ||
| approximant: IMRPhenomXPHM | ||
| status: uploaded | ||
| review: | ||
| - status: approved | ||
| message: "blah" |
| kind: ProjectAnalysis | ||
| # This kind of analysis is used where multiple events need to be | ||
| # analysed jointly. | ||
| name: JointTest | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151226_033853 | ||
| analyses: | ||
| - pipeline:bayeswave | ||
| pipeline: asimov-testing-project | ||
| --- | ||
| kind: ProjectAnalysis | ||
| # This kind of analysis is used where multiple events need to be | ||
| # analysed jointly. | ||
| name: JointTestWaveform | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151226_033853 | ||
| analyses: | ||
| - waveform.approximant:IMRPhenomXPHM | ||
| pipeline: asimov-testing-project | ||
| --- | ||
| kind: ProjectAnalysis | ||
| # This kind of analysis is used where multiple events need to be | ||
| # analysed jointly. | ||
| name: JointTestStatus | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151226_033853 | ||
| analyses: | ||
| - status:uploaded | ||
| pipeline: asimov-testing-project | ||
| --- | ||
| kind: ProjectAnalysis | ||
| # This kind of analysis is used where multiple events need to be | ||
| # analysed jointly. | ||
| name: JointTestReview | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151226_033853 | ||
| analyses: | ||
| - review:approved | ||
| pipeline: asimov-testing-project | ||
| --- | ||
| kind: ProjectAnalysis | ||
| # This kind of analysis is used where multiple events need to be | ||
| # analysed jointly. | ||
| name: JointTestReview2 | ||
| subjects: | ||
| - GW150914_095045 | ||
| - GW151226_033853 | ||
| analyses: | ||
| - review:approved | ||
| - waveform.approximant:IMRPhenomXPHM | ||
| pipeline: asimov-testing-project |
| kind: postprocessing | ||
| name: standard pe postprocessing | ||
| stages: | ||
| - name: simple PE summary | ||
| pipeline: pesummary | ||
| - name: less simple PE summary | ||
| pipeline: pesummary | ||
| needs: | ||
| - simple PE summary | ||
| --- | ||
| kind: analysis | ||
| name: analysis with postprocessing | ||
| event: GW150914_095045 | ||
| postprocessing: | ||
| - standard pe postprocessing | ||
| pipeline: bilby | ||
| --- | ||
| kind: postprocessing | ||
| name: combined summary pages for bilby | ||
| subject: all | ||
| analyses: | ||
| - pipeline:bilby | ||
| stages: | ||
| - name: combined pages | ||
| pipeline: pesummary |
| kind: analysis | ||
| name: Prod0 | ||
| pipeline: bayeswave | ||
| comment: Bayeswave on-source PSD estimation job | ||
| event: GW150914_095045 | ||
| status: uploaded | ||
| --- | ||
| kind: analysis | ||
| name: Prod1 | ||
| event: GW150914_095045 | ||
| pipeline: bilby | ||
| approximant: IMRPhenomXPHM | ||
| comment: Bilby parameter estimation job | ||
| needs: | ||
| - pipeline:Bayeswave |
| data: | ||
| channels: | ||
| H1: H1:DCS-CALIB_STRAIN_C02 | ||
| L1: L1:DCS-CALIB_STRAIN_C02 | ||
| frame types: | ||
| H1: H1_HOFT_C02 | ||
| L1: L1_HOFT_C02 | ||
| segment length: 4 | ||
| event time: 1126259462.391 | ||
| interferometers: | ||
| - H1 | ||
| - L1 | ||
| kind: event | ||
| likelihood: | ||
| psd length: 4 | ||
| reference frequency: 20 | ||
| sample rate: 2048 | ||
| segment start: 1126259460.391 | ||
| start frequency: 13.333333333333334 | ||
| window length: 4 | ||
| name: GW150914_095045 | ||
| priors: | ||
| amplitude order: 1 | ||
| chirp mass: | ||
| maximum: 41.97447913941358 | ||
| minimum: 21.418182160215295 | ||
| luminosity distance: | ||
| maximum: 10000 | ||
| minimum: 10 | ||
| mass 1: | ||
| maximum: 1000 | ||
| minimum: 1 | ||
| mass ratio: | ||
| maximum: 1.0 | ||
| minimum: 0.05 | ||
| quality: | ||
| minimum frequency: | ||
| H1: 20 | ||
| L1: 20 | ||
| --- | ||
| data: | ||
| channels: | ||
| H1: H1:DCS-CALIB_STRAIN_C02 | ||
| L1: L1:DCS-CALIB_STRAIN_C02 | ||
| frame types: | ||
| H1: H1_HOFT_C02 | ||
| L1: L1_HOFT_C02 | ||
| segment length: 16 | ||
| event time: 1135136350.647758 | ||
| interferometers: | ||
| - H1 | ||
| - L1 | ||
| kind: event | ||
| likelihood: | ||
| psd length: 16 | ||
| reference frequency: 20 | ||
| sample rate: 4096 | ||
| segment start: 1135136336.647758 | ||
| start frequency: 13.333333333333334 | ||
| window length: 16 | ||
| name: GW151226_033853 | ||
| priors: | ||
| amplitude order: 1 | ||
| chirp mass: | ||
| maximum: 11.825459113009993 | ||
| minimum: 7.60927765774096 | ||
| luminosity distance: | ||
| maximum: 10000 | ||
| minimum: 100 | ||
| mass 1: | ||
| maximum: 1000 | ||
| minimum: 1 | ||
| mass ratio: | ||
| maximum: 1.0 | ||
| minimum: 0.05 | ||
| quality: | ||
| minimum frequency: | ||
| H1: 20 | ||
| L1: 20 | ||
| --- | ||
| data: | ||
| channels: | ||
| H1: H1:DCH-CLEAN_STRAIN_C02 | ||
| L1: L1:DCH-CLEAN_STRAIN_C02 | ||
| V1: V1:Hrec_hoft_V1O2Repro2A_16384Hz | ||
| frame types: | ||
| H1: H1_CLEANED_HOFT_C02 | ||
| L1: L1_CLEANED_HOFT_C02 | ||
| V1: V1O2Repro2A | ||
| segment length: 4 | ||
| event time: 1186741861.52678 | ||
| interferometers: | ||
| - H1 | ||
| - L1 | ||
| - V1 | ||
| kind: event | ||
| likelihood: | ||
| psd length: 4 | ||
| reference frequency: 20 | ||
| sample rate: 2048 | ||
| segment start: 1186741859.52678 | ||
| start frequency: 13.333333333333334 | ||
| window length: 4 | ||
| name: GW170814_103043 | ||
| priors: | ||
| amplitude order: 1 | ||
| chirp mass: | ||
| maximum: 36.75187106272061 | ||
| minimum: 19.520822504718232 | ||
| luminosity distance: | ||
| maximum: 10000 | ||
| minimum: 100 | ||
| mass 1: | ||
| maximum: 1000 | ||
| minimum: 1 | ||
| mass ratio: | ||
| maximum: 1.0 | ||
| minimum: 0.05 | ||
| quality: | ||
| minimum frequency: | ||
| H1: 20 | ||
| L1: 20 | ||
| V1: 20 |
| # """These tests are designed to be run on all pipelines to ensure that | ||
| # they are generally compliant within asimov's specifications. They can | ||
| # include checking that ini files have the correct information and that | ||
| # the pipelines report information as expected. | ||
| # """ | ||
| # import os | ||
| # import io | ||
| # import unittest | ||
| # from unittest.mock import patch | ||
| # import shutil | ||
| # import git | ||
| # import contextlib | ||
| # from click.testing import CliRunner | ||
| # from importlib import reload | ||
| # import asimov.event | ||
| # from asimov.cli.project import make_project | ||
| # from asimov.cli.application import apply_page | ||
| # from asimov.ledger import YAMLLedger | ||
| # from asimov.pipelines import known_pipelines | ||
| # from asimov.testing import AsimovTestCase | ||
| # from asimov import config | ||
| # from asimov.utils import set_directory | ||
| # from asimov.cli import monitor | ||
| # class TestSimplePostProcessing(AsimovTestCase): | ||
| # def event_setup(self): | ||
| # f = io.StringIO() | ||
| # with contextlib.redirect_stdout(f): | ||
| # apply_page( | ||
| # f"{self.cwd}/tests/test_data/testing_pe.yaml", | ||
| # event=None, | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/testing_events.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_postprocessing.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # def test_analysis_has_postprocesses(self): | ||
| # self.event_setup() | ||
| # post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # self.assertEqual(len(post), 1) | ||
| # def test_analysis_postprocess_stages(self): | ||
| # self.event_setup() | ||
| # post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # self.assertEqual(len(post[0].stages), 2) | ||
| # def test_analysis_postprocess_stages_into_pipelines(self): | ||
| # self.event_setup() | ||
| # post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # self.assertEqual(post[0].stages[0].pipeline.name.lower(), "pesummary") | ||
| # def test_analysis_postprocess_dag(self): | ||
| # """Test that the DAG of analysis stages is created""" | ||
| # self.event_setup() | ||
| # # Mark the analysis as finished | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # reload(asimov) | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # self.assertEqual(len(post[0]._stages_dag), 2) | ||
| # self.assertEqual(len(post[0].next), 1) | ||
| # self.assertEqual(post[0].next[0], "simple PE summary") | ||
| # os.chdir(self.cwd) | ||
| # @patch('asimov.pipelines.bilby.Bilby.samples') | ||
| # def test_analysis_postprocess_run_next(self, mock_samples): | ||
| # mock_samples.return_value = ["/home/test/samples"] | ||
| # self.event_setup() | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # reload(asimov) | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # f = io.StringIO() | ||
| # with contextlib.redirect_stdout(f): | ||
| # string = """'output': 'working/GW150914_095045/pesummary.out', 'error': 'working/GW150914_095045/pesummary.err', 'log': 'working/GW150914_095045/pesummary.log', 'request_cpus': 4, 'getenv': 'true', 'batch_name': 'Summary Pages/GW150914_095045', 'request_memory': '8192MB', 'should_transfer_files': 'YES', 'request_disk': '8192MB'""" | ||
| # post[0].run_next(dryrun=True) | ||
| # self.assertTrue(string in f.getvalue()) | ||
| # os.chdir(self.cwd) | ||
| # @patch('asimov.pipelines.bilby.Bilby.samples') | ||
| # def test_analysis_postprocess_run_mocked_samples(self, mock_samples): | ||
| # mock_samples.return_value = ["/home/test/samples"] | ||
| # self.event_setup() | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # reload(asimov) | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing | ||
| # f = io.StringIO() | ||
| # with contextlib.redirect_stdout(f): | ||
| # post[0].run_next(dryrun=True) | ||
| # self.assertTrue("--samples /home/test/samples" in f.getvalue()) | ||
| # os.chdir(self.cwd) | ||
| # # Next we need to test that asimov monitor will actually start a post-processing job correctly. | ||
| # @patch('asimov.condor.CondorJobList') | ||
| # @patch('asimov.pipelines.bilby.Bilby.detect_completion') | ||
| # @patch('asimov.pipelines.bilby.Bilby.collect_assets') | ||
| # @patch('asimov.pipelines.bilby.Bilby.samples') | ||
| # def test_analysis_postprocess_monitor_mocked_samples(self, mock_samples, mock_assets, mock_complete, mock_condor): | ||
| # """Check that asimov monitor attempts to start a post-processing job""" | ||
| # class MockJobList: | ||
| # def __init__(self): | ||
| # pass | ||
| # def refresh(self): | ||
| # return [] | ||
| # mock_samples.return_value = ["/home/test/samples"] | ||
| # mock_assets.return_value = {"samples": "/home/test/samples"} | ||
| # mock_complete.return_value = True | ||
| # mock_condor.return_value = MockJobList() | ||
| # self.event_setup() | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # runner = CliRunner() | ||
| # reload(asimov) | ||
| # reload(monitor) | ||
| # # Mark the analysis as running | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # # Run the monitor | ||
| # result = runner.invoke(monitor.monitor, "--dry-run") | ||
| # os.chdir(self.cwd) | ||
| # self.assertTrue("--samples /home/test/samples" in result.output) | ||
| # class TestEventPostProcessing(AsimovTestCase): | ||
| # def event_setup(self): | ||
| # # f = io.StringIO() | ||
| # # with contextlib.redirect_stdout(f): | ||
| # apply_page( | ||
| # f"{self.cwd}/tests/test_data/testing_pe.yaml", | ||
| # event=None, | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/testing_events.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_postprocessing.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # def test_subject_has_postprocesses(self): | ||
| # self.event_setup() | ||
| # post = self.ledger.get_event("GW150914_095045")[0].postprocessing | ||
| # self.assertEqual(len(post), 1) | ||
| # def test_correct_analyses(self): | ||
| # """Test that the correct analyses are returned.""" | ||
| # self.event_setup() | ||
| # post = self.ledger.get_event("GW150914_095045")[0].postprocessing | ||
| # self.assertEqual(str(list(post[0].analyses)[0].pipeline), "bilby") | ||
| # def test_fresh_is_true_no_finished_analyses(self): | ||
| # """Test that when no analyses are finished the job is not fresh""" | ||
| # self.event_setup() | ||
| # post_job = self.ledger.get_event("GW150914_095045")[0].postprocessing[0] | ||
| # self.assertEqual(post_job.fresh, True) | ||
| # # Next we need to test that asimov monitor will actually start a post-processing job correctly. | ||
| # @patch('asimov.condor.CondorJobList') | ||
| # @patch('asimov.pipelines.bilby.Bilby.detect_completion') | ||
| # @patch('asimov.pipelines.bilby.Bilby.collect_assets') | ||
| # @patch('asimov.pipelines.bilby.Bilby.samples') | ||
| # def test_analysis_postprocess_monitor_mocked_samples(self, mock_samples, mock_assets, mock_complete, mock_condor): | ||
| # """Check that asimov monitor attempts to start a post-processing job""" | ||
| # class MockJobList: | ||
| # def __init__(self): | ||
| # pass | ||
| # def refresh(self): | ||
| # return [] | ||
| # mock_samples.return_value = ["/home/test/samples"] | ||
| # mock_complete.return_value = True | ||
| # mock_condor.return_value = MockJobList() | ||
| # self.event_setup() | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # runner = CliRunner() | ||
| # reload(asimov) | ||
| # reload(monitor) | ||
| # # Mark the analysis as running | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # reload(monitor) | ||
| # # Run the monitor | ||
| # result = runner.invoke(monitor.monitor, "--dry-run") | ||
| # os.chdir(self.cwd) | ||
| # self.assertTrue("--samples /home/test/samples /home/test/samples" in result.output) | ||
| # # Next we need to test that asimov monitor will actually start a post-processing job correctly. | ||
| # @patch('asimov.condor.CondorJobList') | ||
| # @patch('asimov.pipelines.bilby.Bilby.detect_completion') | ||
| # @patch('asimov.pipelines.bilby.Bilby.collect_assets') | ||
| # @patch('asimov.pipelines.bilby.Bilby.samples') | ||
| # def test_analysis_postprocess_only_ready(self, mock_samples, mock_assets, mock_complete, mock_condor): | ||
| # """Check that asimov monitor attempts to start a post-processing job only if one of the analyses is ready""" | ||
| # class MockJobList: | ||
| # def __init__(self): | ||
| # pass | ||
| # def refresh(self): | ||
| # return [] | ||
| # mock_samples.return_value = ["/home/test/samples"] | ||
| # #mock_assets.return_value = {"samples": "/home/test/samples"} | ||
| # mock_complete.return_value = True | ||
| # mock_condor.return_value = MockJobList() | ||
| # self.event_setup() | ||
| # os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # runner = CliRunner() | ||
| # reload(asimov) | ||
| # reload(monitor) | ||
| # # Mark the analysis as running | ||
| # analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1] | ||
| # analysis.status = "finished" | ||
| # asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].status = "finished" | ||
| # asimov.current_ledger.update_event(analysis.event) | ||
| # reload(asimov) | ||
| # reload(monitor) | ||
| # result = runner.invoke(monitor.monitor, "--dry-run") | ||
| # os.chdir(self.cwd) | ||
| # self.assertTrue("--samples /home/test/samples /home/test/samples" in result.output) |
| Metadata-Version: 2.1 | ||
| Name: asimov | ||
| Version: 0.5.8 | ||
| Version: 0.6.0 | ||
| Summary: A Python package for managing and interacting with data analysis jobs. | ||
@@ -49,6 +49,7 @@ Home-page: https://git.ligo.org/asimov/asimov | ||
| [](https://anaconda.org/conda-forge/ligo-asimov) | ||
| [](https://conda.anaconda.org/conda-forge) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/master) | ||
| [](https://anaconda.org/conda-forge/asimov/) | ||
|  | ||
|  | ||
@@ -59,16 +60,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows. | ||
| ## Branch notes | ||
| These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development. | ||
| More generally useful documentation will move to the main documentation before moving to production. | ||
| ### Starting the logging server | ||
| Run in ``asimov`` directory: | ||
| ``` | ||
| export FLASK_APP=server | ||
| flask run | ||
| ``` | ||
| ## Features | ||
@@ -112,3 +99,3 @@ | ||
| ``` | ||
| $ conda install -c conda-forge ligo-asimov | ||
| $ conda install -c conda-forge asimov | ||
| ``` | ||
@@ -122,2 +109,3 @@ | ||
| ## Get started | ||
@@ -124,0 +112,0 @@ |
@@ -23,2 +23,3 @@ .gitlab-ci.yml | ||
| asimov/asimov.conf | ||
| asimov/auth.py | ||
| asimov/condor.py | ||
@@ -28,3 +29,2 @@ asimov/database.py | ||
| asimov/git.py | ||
| asimov/gitlab.py | ||
| asimov/ini.py | ||
@@ -68,2 +68,3 @@ asimov/ledger.py | ||
| asimov/configs/lalinference.ini | ||
| asimov/configs/pesummary.ini | ||
| asimov/configs/rift.ini | ||
@@ -74,2 +75,3 @@ asimov/pipelines/__init__.py | ||
| asimov/pipelines/lalinference.py | ||
| asimov/pipelines/pesummary.py | ||
| asimov/pipelines/rift.py | ||
@@ -87,2 +89,3 @@ asimov/report-theme/body.html | ||
| docs/source/building-docs.rst | ||
| docs/source/citing.rst | ||
| docs/source/clusters.rst | ||
@@ -106,3 +109,2 @@ docs/source/code-of-conduct.rst | ||
| docs/source/textmark.png | ||
| docs/source/yamlformat.rst | ||
| docs/source/advanced/projects.rst | ||
@@ -136,3 +138,2 @@ docs/source/api/asimov.rst | ||
| scripts/check-ifo.py | ||
| scripts/condor_tests.py | ||
| scripts/find_calibration.py | ||
@@ -154,2 +155,3 @@ scripts/make_events.py | ||
| tests/test_pipelines_generic.py | ||
| tests/test_postprocessing.py | ||
| tests/test_review.py | ||
@@ -170,8 +172,15 @@ tests/test_specific_events.py | ||
| tests/test_data/event_non_standard_settings.yaml | ||
| tests/test_data/events_blueprint.yaml | ||
| tests/test_data/lalinference_settings_nodeps.yaml | ||
| tests/test_data/project_analysis.yaml | ||
| tests/test_data/rift_settings_nodeps.yaml | ||
| tests/test_data/test_analyses.yaml | ||
| tests/test_data/test_complex_dag.yaml | ||
| tests/test_data/test_joint_analysis.yaml | ||
| tests/test_data/test_linear_dag.yaml | ||
| tests/test_data/test_postprocessing.yaml | ||
| tests/test_data/test_query_dag.yaml | ||
| tests/test_data/test_rift.yaml | ||
| tests/test_data/test_simple_dag.yaml | ||
| tests/test_data/testing_events.yaml | ||
| tests/test_data/testing_pe.yaml | ||
@@ -178,0 +187,0 @@ tests/test_data/priors/bbh.prior.template |
@@ -6,3 +6,2 @@ """ | ||
| __author__ = """Daniel Williams""" | ||
@@ -95,10 +94,8 @@ __email__ = "daniel.williams@ligo.org" | ||
| try: | ||
| if config.get("ledger", "engine") == "gitlab": | ||
| from .gitlab import GitlabLedger | ||
| current_ledger = GitlabLedger() | ||
| elif config.get("ledger", "engine") == "yamlfile": | ||
| if config.get("ledger", "engine") == "yamlfile": | ||
| from .ledger import YAMLLedger | ||
| current_ledger = YAMLLedger(config.get("ledger", "location")) | ||
| elif config.get("ledger", "engine") == "gitlab": | ||
| logger.error("The gitlab interface has been removed from v0.6 of asimov") | ||
| else: | ||
@@ -105,0 +102,0 @@ current_ledger = None |
+897
-55
@@ -22,25 +22,57 @@ """ | ||
| on all events, including event and simple analyses. | ||
| This type of analysis is useful for defining a population analysis, for example. | ||
| This type of analysis is useful for defining a population analysis or analysing multiple events together, for example. | ||
| """ | ||
| import os | ||
| import configparser | ||
| from copy import deepcopy | ||
| from warnings import warn | ||
| import pathlib | ||
| from functools import reduce | ||
| import operator | ||
| from liquid import Liquid | ||
| from asimov import config | ||
| from asimov import config, logger, LOGGER_LEVEL | ||
| from asimov.pipelines import known_pipelines | ||
| from asimov.utils import update | ||
| from asimov.utils import update, diff_dict | ||
| from asimov.storage import Store | ||
| from .review import Review | ||
| from .ini import RunConfiguration | ||
| status_map = { | ||
| "cancelled": "light", | ||
| "finished": "success", | ||
| "uploaded": "success", | ||
| "processing": "primary", | ||
| "running": "primary", | ||
| "stuck": "warning", | ||
| "restart": "secondary", | ||
| "ready": "secondary", | ||
| "wait": "light", | ||
| "stop": "danger", | ||
| "manual": "light", | ||
| "stopped": "light", | ||
| } | ||
| review_map = { | ||
| "deprecated": "warning", | ||
| "none": "default", | ||
| "approved": "success", | ||
| "rejected": "danger", | ||
| "checked": "info", | ||
| } | ||
| class Analysis: | ||
| """ | ||
| The base class for all other types of analysis. | ||
| TODO: Add a check to make sure names cannot conflict | ||
| """ | ||
| meta = {} | ||
| meta_defaults = {"scheduler": {}, "sampler": {}, "review": {}, "likelihood": {}} | ||
| _reviews = Review() | ||
| @property | ||
@@ -51,2 +83,7 @@ def review(self): | ||
| """ | ||
| if "review" in self.meta: | ||
| if len(self.meta["review"]) > 0: | ||
| self._reviews = Review.from_dict(self.meta["review"], production=self) | ||
| self.meta.pop("review") | ||
| return self._reviews | ||
@@ -56,13 +93,68 @@ def _process_dependencies(self, needs): | ||
| Process the dependencies list for this production. | ||
| The dependencies can be provided either as the name of a production, | ||
| or a query against the analysis's attributes. | ||
| Parameters | ||
| ---------- | ||
| needs : list | ||
| A list of all the requirements | ||
| Returns | ||
| ------- | ||
| list | ||
| A list of all the requirements processed for evaluation. | ||
| """ | ||
| return needs | ||
| all_requirements = [] | ||
| for need in deepcopy(needs): | ||
| try: | ||
| requirement = need.split(":") | ||
| requirement = [requirement[0].split("."), requirement[1]] | ||
| except IndexError: | ||
| requirement = [["name"], need] | ||
| except AttributeError: | ||
| requirement = need | ||
| all_requirements.append(requirement) | ||
| return all_requirements | ||
| @property | ||
| def job_id(self): | ||
| """ | ||
| Get the ID number of this job as it resides in the scheduler. | ||
| """ | ||
| if "scheduler" in self.meta: | ||
| if "job id" in self.meta["scheduler"]: | ||
| return self.meta["scheduler"]["job id"] | ||
| else: | ||
| return None | ||
| @job_id.setter | ||
| def job_id(self, value): | ||
| if "scheduler" not in self.meta: | ||
| self.meta["scheduler"] = {} | ||
| self.meta["scheduler"]["job id"] = value | ||
| @property | ||
| def dependencies(self): | ||
| if "needs" in self.meta: | ||
| dependencies = self._process_dependencies(self.meta["needs"]) | ||
| """Return a list of analyses which this analysis depends upon.""" | ||
| all_matches = [] | ||
| if len(self._needs) == 0: | ||
| return [] | ||
| else: | ||
| dependencies = None | ||
| return dependencies | ||
| matches = set({}) # set(self.event.analyses) | ||
| # matches.remove(self) | ||
| requirements = self._process_dependencies(deepcopy(self._needs)) | ||
| for attribute, match in requirements: | ||
| filtered_analyses = list( | ||
| filter( | ||
| lambda x: x.matches_filter(attribute, match), | ||
| self.event.analyses, | ||
| ) | ||
| ) | ||
| matches = set.union(matches, set(filtered_analyses)) | ||
| for analysis in matches: | ||
| all_matches.append(analysis.name) | ||
| return all_matches | ||
| @property | ||
@@ -78,3 +170,3 @@ def priors(self): | ||
| def finished(self): | ||
| finished_states = ["uploaded"] | ||
| finished_states = ["finished", "processing", "uploaded"] | ||
| return self.status in finished_states | ||
@@ -89,5 +181,54 @@ | ||
| self.status_str = value.lower() | ||
| if self.event.issue_object is not None: | ||
| self.event.issue_object.update_data() | ||
| def matches_filter(self, attribute, match): | ||
| """ | ||
| Checks to see if this analysis matches a given filtering | ||
| criterion. | ||
| A variety of different attributes can be used for filtering. | ||
| The primary attributes are | ||
| - review status | ||
| - processing status | ||
| - name | ||
| In addition, any quantity contained in the analysis metadata | ||
| may be used by accessing it in the nested structure of this | ||
| data, with levels of the hierarchy separated with period | ||
| characters. For example, to access the waveform approximant | ||
| the correct attribute would be `waveform.approximant`. | ||
| Parameters | ||
| ---------- | ||
| attribute : str | ||
| The name of the attribute to be tested | ||
| match : str | ||
| The string to be matched against the value of the attribute | ||
| Returns | ||
| ------- | ||
| bool | ||
| Returns True if this analysis matches the query, | ||
| otherwise returns False. | ||
| """ | ||
| is_review = False | ||
| is_status = False | ||
| is_name = False | ||
| in_meta = False | ||
| if attribute[0] == "review": | ||
| is_review = match.lower() == str(self.review.status).lower() | ||
| elif attribute[0] == "status": | ||
| is_status = match.lower() == self.status.lower() | ||
| elif attribute[0] == "name": | ||
| is_name = match == self.name | ||
| else: | ||
| try: | ||
| in_meta = reduce(operator.getitem, attribute, self.meta) == match | ||
| except KeyError: | ||
| in_meta = False | ||
| return is_name | in_meta | is_status | is_review | ||
| def results(self, filename=None, handle=False, hash=None): | ||
@@ -111,9 +252,10 @@ store = Store(root=config.get("storage", "results_store")) | ||
| """ | ||
| Return the run directory for this event. | ||
| Return the run directory for this analysis. | ||
| """ | ||
| if "rundir" in self.meta: | ||
| return self.meta["rundir"] | ||
| return os.path.abspath(self.meta["rundir"]) | ||
| elif "working directory" in self.subject.meta: | ||
| value = os.path.join(self.subject.meta["working directory"], self.name) | ||
| self.meta["rundir"] = value | ||
| return os.path.abspath(self.meta["rundir"]) | ||
| # TODO: Make sure this is saved back to the ledger | ||
@@ -123,4 +265,34 @@ else: | ||
| def make_config(self, filename, template_directory=None): | ||
| @rundir.setter | ||
| def rundir(self, value): | ||
| """ | ||
| Set the run directory. | ||
| """ | ||
| if "rundir" not in self.meta: | ||
| self.meta["rundir"] = value | ||
| else: | ||
| self.meta["rundir"] = value | ||
| def get_meta(self, key): | ||
| """ | ||
| Get the value of a metadata attribute, or return None if it doesn't | ||
| exist. | ||
| """ | ||
| if key in self.meta: | ||
| return self.meta[key] | ||
| else: | ||
| return None | ||
| def set_meta(self, key, value): | ||
| """ | ||
| Set a metadata attribute which doesn't currently exist. | ||
| """ | ||
| if key not in self.meta: | ||
| self.meta[key] = value | ||
| self.event.ledger.update_event(self.event) | ||
| else: | ||
| raise ValueError | ||
| def make_config(self, filename, template_directory=None, dryrun=False): | ||
| """ | ||
| Make the configuration file for this production. | ||
@@ -139,7 +311,7 @@ | ||
| template = f"{self.meta['template']}.ini" | ||
| else: | ||
| template = f"{self.pipeline}.ini" | ||
| pipeline = known_pipelines[self.pipeline] | ||
| pipeline = self.pipeline | ||
| try: | ||
@@ -158,8 +330,131 @@ template_directory = config.get("templating", "directory") | ||
| liq = Liquid(template_file) | ||
| rendered = liq.render(production=self, config=config) | ||
| rendered = liq.render(production=self, analysis=self, config=config) | ||
| with open(filename, "w") as output_file: | ||
| output_file.write(rendered) | ||
| def build_report(self): | ||
| if self.pipeline: | ||
| self.pipeline.build_report() | ||
| def html(self): | ||
| """ | ||
| An HTML representation of this production. | ||
| """ | ||
| production = self | ||
| card = "" | ||
| card += f"<div class='asimov-analysis asimov-analysis-{self.status}'>" | ||
| card += f"<h4>{self.name}" | ||
| if self.comment: | ||
| card += ( | ||
| f""" <small class="asimov-comment text-muted">{self.comment}</small>""" | ||
| ) | ||
| card += "</h4>" | ||
| if self.status: | ||
| card += f"""<p class="asimov-status"> | ||
| <span class="badge badge-pill badge-{status_map[self.status]}">{self.status}</span> | ||
| </p>""" | ||
| if self.pipeline: | ||
| card += f"""<p class="asimov-pipeline-name">{self.pipeline.name}</p>""" | ||
| if self.pipeline: | ||
| # self.pipeline.collect_pages() | ||
| card += self.pipeline.html() | ||
| if self.rundir: | ||
| card += f"""<p class="asimov-rundir"><code>{production.rundir}</code></p>""" | ||
| else: | ||
| card += """ """ | ||
| if "approximant" in production.meta: | ||
| card += f"""<p class="asimov-attribute">Waveform approximant: | ||
| <span class="asimov-approximant">{production.meta['approximant']}</span> | ||
| </p>""" | ||
| card += """ """ | ||
| card += """</div>""" | ||
| if len(self.review) > 0: | ||
| for review in self.review: | ||
| card += review.html() | ||
| return card | ||
| def to_dict(self, event=True): | ||
| """ | ||
| Return this production as a dictionary. | ||
| Parameters | ||
| ---------- | ||
| event : bool | ||
| If set to True the output is designed to be included nested within an event. | ||
| The event name is not included in the representation, and the production name is provided as a key. | ||
| """ | ||
| dictionary = deepcopy(self.meta) | ||
| if not event: | ||
| dictionary["event"] = self.event.name | ||
| dictionary["name"] = self.name | ||
| if isinstance(self.pipeline, str): | ||
| dictionary["pipeline"] = self.pipeline | ||
| else: | ||
| dictionary["pipeline"] = self.pipeline.name.lower() | ||
| dictionary["comment"] = self.comment | ||
| if self.review: | ||
| dictionary["review"] = self.review.to_dicts() | ||
| dictionary["needs"] = self._needs # self.dependencies | ||
| if "data" in self.meta: | ||
| dictionary["data"] = self.meta["data"] | ||
| if "likelihood" in self.meta: | ||
| dictionary["likelihood"] = self.meta["likelihood"] | ||
| if "quality" in self.meta: | ||
| dictionary["quality"] = self.meta["quality"] | ||
| if "priors" in self.meta: | ||
| dictionary["priors"] = self.meta["priors"] | ||
| if "waveform" in self.meta: | ||
| dictionary["waveform"] = self.meta["waveform"] | ||
| for key, value in self.meta.items(): | ||
| dictionary[key] = value | ||
| dictionary["status"] = self.status | ||
| dictionary["job id"] = self.job_id | ||
| # Remove duplicates of pipeline defaults | ||
| if self.pipeline.name.lower() in self.event.ledger.data["pipelines"]: | ||
| defaults = deepcopy( | ||
| self.event.ledger.data["pipelines"][self.pipeline.name.lower()] | ||
| ) | ||
| else: | ||
| defaults = {} | ||
| # if "postprocessing" in self.event.ledger.data: | ||
| # defaults["postprocessing"] = deepcopy( | ||
| # self.event.ledger.data["postprocessing"] | ||
| # ) | ||
| defaults = update(defaults, deepcopy(self.event.meta)) | ||
| dictionary = diff_dict(defaults, dictionary) | ||
| if "repository" in self.meta: | ||
| dictionary["repository"] = self.repository.url | ||
| if "ledger" in dictionary: | ||
| dictionary.pop("ledger") | ||
| if "pipelines" in dictionary: | ||
| dictionary.pop("pipelines") | ||
| if "productions" in dictionary: | ||
| dictionary.pop("productions") | ||
| if not event: | ||
| output = dictionary | ||
| else: | ||
| output = {self.name: dictionary} | ||
| return output | ||
| class SimpleAnalysis(Analysis): | ||
@@ -171,5 +466,22 @@ """ | ||
| def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs): | ||
| self.ledger = kwargs.get("ledger", None) | ||
| self.event = self.subject = subject | ||
| self.name = name | ||
| pathlib.Path( | ||
| os.path.join(config.get("logging", "directory"), self.event.name, name) | ||
| ).mkdir(parents=True, exist_ok=True) | ||
| self.logger = logger.getChild("analysis").getChild( | ||
| f"{self.event.name}/{self.name}" | ||
| ) | ||
| self.logger.setLevel(LOGGER_LEVEL) | ||
| # fh = logging.FileHandler(logfile) | ||
| # formatter = logging.Formatter("%(asctime)s - %(message)s", "%Y-%m-%d %H:%M:%S") | ||
| # fh.setFormatter(formatter) | ||
| # self.logger.addHandler(fh) | ||
| if status: | ||
@@ -179,8 +491,70 @@ self.status_str = status.lower() | ||
| self.status_str = "none" | ||
| self.pipeline = pipeline.lower() | ||
| self.comment = comment | ||
| self.meta = deepcopy(self.subject.meta) | ||
| self.meta = update(self.meta, kwargs) | ||
| self.meta = deepcopy(self.meta_defaults) | ||
| # Start by adding pipeline defaults | ||
| if "pipelines" in self.event.ledger.data: | ||
| if pipeline in self.event.ledger.data["pipelines"]: | ||
| self.meta = update( | ||
| self.meta, deepcopy(self.event.ledger.data["pipelines"][pipeline]) | ||
| ) | ||
| if "postprocessing" in self.event.ledger.data: | ||
| self.meta["postprocessing"] = deepcopy( | ||
| self.event.ledger.data["postprocessing"] | ||
| ) | ||
| # self.meta["pipeline"] = pipeline | ||
| # Update with the subject defaults | ||
| self.meta = update(self.meta, deepcopy(self.subject.meta)) | ||
| if "productions" in self.meta: | ||
| self.meta.pop("productions") | ||
| self.meta = update(self.meta, deepcopy(kwargs)) | ||
| self.pipeline = pipeline.lower() | ||
| self.pipeline = known_pipelines[pipeline.lower()](self) | ||
| if "needs" in self.meta: | ||
| self._needs = self.meta.pop("needs") | ||
| else: | ||
| self._needs = [] | ||
| self.comment = kwargs.get("comment", None) | ||
| def _previous_assets(self): | ||
| assets = {} | ||
| if self.dependencies: | ||
| productions = {} | ||
| for production in self.event.productions: | ||
| productions[production.name] = production | ||
| for previous_job in self.dependencies: | ||
| assets.update(productions[previous_job].pipeline.collect_assets()) | ||
| return assets | ||
| @classmethod | ||
| def from_dict(cls, parameters, subject=None, ledger=None): | ||
| parameters = deepcopy(parameters) | ||
| # Check that pars is a dictionary | ||
| if not {"pipeline", "name"} <= parameters.keys(): | ||
| raise ValueError( | ||
| f"Some of the required parameters are missing." | ||
| f"Found {parameters.keys()}" | ||
| ) | ||
| if "status" not in parameters: | ||
| parameters["status"] = "ready" | ||
| if "event" in parameters: | ||
| parameters.pop("event") | ||
| pipeline = parameters.pop("pipeline") | ||
| name = parameters.pop("name") | ||
| if "comment" not in parameters: | ||
| parameters["comment"] = None | ||
| if "ledger" in parameters: | ||
| ledger = parameters.pop("ledger") | ||
| return cls( | ||
| name=name, pipeline=pipeline, ledger=ledger, subject=subject, **parameters | ||
| ) | ||
| class SubjectAnalysis(Analysis): | ||
@@ -191,5 +565,124 @@ """ | ||
| pass | ||
| def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs): | ||
| self.event = self.subject = subject | ||
| self.name = name | ||
| self.category = "subject_analyses" | ||
| self.logger = logger.getChild("event").getChild(f"{self.name}") | ||
| self.logger.setLevel(LOGGER_LEVEL) | ||
| if status: | ||
| self.status_str = status.lower() | ||
| else: | ||
| self.status_str = "none" | ||
| self.meta = deepcopy(self.meta_defaults) | ||
| self.meta = update(self.meta, deepcopy(self.subject.meta)) | ||
| self.meta = update(self.meta, deepcopy(kwargs)) | ||
| self._analysis_spec = self.meta.get("needs") | ||
| if self._analysis_spec: | ||
| requirements = self._process_dependencies(self._analysis_spec) | ||
| self.analyses = [] | ||
| for attribute, match in requirements: | ||
| matches = set(self.subject.analyses) | ||
| filtered_analyses = list( | ||
| filter( | ||
| lambda x: x.matches_filter(attribute, match), subject.analyses | ||
| ) | ||
| ) | ||
| matches = set.intersection(matches, set(filtered_analyses)) | ||
| for analysis in matches: | ||
| self.analyses.append(analysis) | ||
| self.productions = self.analyses | ||
| if "needs" in self.meta: | ||
| self.meta.pop("needs") | ||
| self.pipeline = pipeline.lower() | ||
| self.pipeline = known_pipelines[pipeline.lower()](self) | ||
| if "needs" in self.meta: | ||
| self._needs = self.meta.pop("needs") | ||
| else: | ||
| self._needs = [] | ||
| if "comment" in kwargs: | ||
| self.comment = kwargs["comment"] | ||
| else: | ||
| self.comment = None | ||
| def to_dict(self, event=True): | ||
| """ | ||
| Return this production as a dictionary. | ||
| Parameters | ||
| ---------- | ||
| event : bool | ||
| If set to True the output is designed to be included nested within an event. | ||
| The event name is not included in the representation, and the production name is provided as a key. | ||
| """ | ||
| dictionary = {} | ||
| dictionary = update(dictionary, self.meta) | ||
| if not event: | ||
| dictionary["event"] = self.event.name | ||
| dictionary["name"] = self.name | ||
| dictionary["status"] = self.status | ||
| if isinstance(self.pipeline, str): | ||
| dictionary["pipeline"] = self.pipeline | ||
| else: | ||
| dictionary["pipeline"] = self.pipeline.name.lower() | ||
| dictionary["comment"] = self.comment | ||
| dictionary["analyses"] = self._analysis_spec | ||
| if self.review: | ||
| dictionary["review"] = self.review.to_dicts() | ||
| dictionary["needs"] = deepcopy(self._needs) # self.dependencies | ||
| if "quality" in self.meta: | ||
| dictionary["quality"] = self.meta["quality"] | ||
| if "priors" in self.meta: | ||
| dictionary["priors"] = self.meta["priors"] | ||
| for key, value in self.meta.items(): | ||
| dictionary[key] = value | ||
| if "repository" in self.meta: | ||
| dictionary["repository"] = self.repository.url | ||
| if "ledger" in dictionary: | ||
| dictionary.pop("ledger") | ||
| if "pipelines" in dictionary: | ||
| dictionary.pop("pipelines") | ||
| if not event: | ||
| output = dictionary | ||
| else: | ||
| output = {self.name: dictionary} | ||
| return output | ||
| @classmethod | ||
| def from_dict(cls, parameters, subject): | ||
| parameters = deepcopy(parameters) | ||
| # Check that pars is a dictionary | ||
| if not {"pipeline", "name"} <= parameters.keys(): | ||
| raise ValueError( | ||
| f"Some of the required parameters are missing." | ||
| f"Found {parameters.keys()}" | ||
| ) | ||
| if "status" not in parameters: | ||
| parameters["status"] = "ready" | ||
| if "event" in parameters: | ||
| parameters.pop("event") | ||
| pipeline = parameters.pop("pipeline") | ||
| name = parameters.pop("name") | ||
| if "comment" not in parameters: | ||
| parameters["comment"] = None | ||
| return cls(subject, name, pipeline, **parameters) | ||
| class ProjectAnalysis(Analysis): | ||
@@ -200,12 +693,222 @@ """ | ||
| def __init__( | ||
| self, subjects, analyses, name, pipeline, status=None, comment=None, **kwargs | ||
| ): | ||
| meta_defaults = {"scheduler": {}, "sampler": {}, "review": {}} | ||
| def __init__(self, name, pipeline, ledger=None, **kwargs): | ||
| """ """ | ||
| super().__init__() | ||
| self.name = name | ||
| self.logger = logger.getChild("project analyses").getChild(f"{self.name}") | ||
| self.logger.setLevel(LOGGER_LEVEL) | ||
| self.ledger = ledger | ||
| self.subjects = subjects | ||
| self.analyses = analyses | ||
| self._subjects = kwargs["subjects"] | ||
| self._events = self._subjects | ||
| if "analyses" in kwargs.keys(): | ||
| self._analysis_spec = kwargs["analyses"] | ||
| else: | ||
| self._analysis_spec = {} | ||
| requirements = self._process_dependencies(self._analysis_spec) | ||
| self.analyses = [] | ||
| # set up the working directory | ||
| if "working_directory" in kwargs: | ||
| self.work_dir = kwargs["working_directory"] | ||
| else: | ||
| self.work_dir = os.path.join("working", "project analyses", f"{self.name}") | ||
| if not os.path.exists(self.work_dir): | ||
| os.makedirs(self.work_dir) | ||
| self.repository = None | ||
| self._subject_obs = [] | ||
| for subject in self.subjects: | ||
| if self._analysis_spec: | ||
| matches = set(subject.analyses) | ||
| for attribute, match in requirements: | ||
| filtered_analyses = list( | ||
| filter( | ||
| lambda x: x.matches_filter(attribute, match), | ||
| subject.analyses, | ||
| ) | ||
| ) | ||
| matches = set.intersection(matches, set(filtered_analyses)) | ||
| for analysis in matches: | ||
| self.analyses.append(analysis) | ||
| if "status" in kwargs: | ||
| self.status_str = kwargs["status"].lower() | ||
| else: | ||
| self.status_str = "none" | ||
| self.pipeline = pipeline # .lower() | ||
| if isinstance(pipeline, str): | ||
| # try: | ||
| self.pipeline = known_pipelines[str(pipeline).lower()](self) | ||
| # except KeyError: | ||
| self.logger.warning(f"The pipeline {pipeline} could not be found.") | ||
| if "needs" in self.meta: | ||
| self._needs = self.meta.pop("needs") | ||
| else: | ||
| self._needs = [] | ||
| if "comment" in kwargs: | ||
| self.comment = kwargs["comment"] | ||
| else: | ||
| self.comment = None | ||
| self.meta = deepcopy(self.meta_defaults) | ||
| # Start by adding pipeline defaults | ||
| if "pipelines" in self.ledger.data: | ||
| if pipeline in self.ledger.data["pipelines"]: | ||
| self.meta = update( | ||
| self.meta, deepcopy(self.ledger.data["pipelines"][pipeline]) | ||
| ) | ||
| self.meta = update(self.meta, deepcopy(kwargs)) | ||
| def __repr__(self): | ||
| """ | ||
| A human-friendly representation of this project analysis. | ||
| Parameters | ||
| ---------- | ||
| None | ||
| """ | ||
| return f"<Project analysis for {len(self.events)} events and {len(self.analyses)} analyses>" | ||
| @property | ||
| def subjects(self): | ||
| """Return a list of subjects for this project analysis.""" | ||
| return [self.ledger.get_event(subject)[0] for subject in self._subjects] | ||
| @property | ||
| def events(self): | ||
| return self.subjects() | ||
| @classmethod | ||
| def from_dict(cls, parameters, ledger=None): | ||
| parameters = deepcopy(parameters) | ||
| # Check that pars is a dictionary | ||
| if not {"pipeline", "name"} <= parameters.keys(): | ||
| raise ValueError( | ||
| f"Some of the required parameters are missing. " | ||
| f"Found {parameters.keys()}" | ||
| ) | ||
| if "status" not in parameters: | ||
| parameters["status"] = "ready" | ||
| if "event" in parameters: | ||
| parameters.pop("event") | ||
| pipeline = parameters.pop("pipeline") | ||
| name = parameters.pop("name") | ||
| if "comment" not in parameters: | ||
| parameters["comment"] = None | ||
| if "analyses" not in parameters: | ||
| parameters["analyses"] = [] | ||
| return cls(name=name, pipeline=pipeline, ledger=ledger, **parameters) | ||
| @property | ||
| def dependencies(self): | ||
| """Return a list of analyses which this analysis depends upon.""" | ||
| all_matches = [] | ||
| if len(self._needs) == 0: | ||
| return [] | ||
| else: | ||
| matches = set({}) # set(self.event.analyses) | ||
| # matches.remove(self) | ||
| requirements = self._process_dependencies(deepcopy(self._needs)) | ||
| analyses = [] | ||
| for subject in self.subjects: | ||
| sub = self.ledger.get_event(subject)[0] | ||
| self._subject_obs.append(sub) | ||
| for analysis in sub.analyses: | ||
| analyses.append(analysis) | ||
| for attribute, match in requirements: | ||
| filtered_analyses = list( | ||
| filter( | ||
| lambda x: x.matches_filter(attribute, match), | ||
| analyses, | ||
| ) | ||
| ) | ||
| matches = set.union(matches, set(filtered_analyses)) | ||
| for analysis in matches: | ||
| all_matches.append(analysis.name) | ||
| return all_matches | ||
| def to_dict(self): | ||
| """ | ||
| Return this project production as a dictionary. | ||
| Parameters | ||
| ---------- | ||
| event : bool | ||
| If set to True the output is designed to be included nested within an event. | ||
| The event name is not included in the representation, and the production name is provided as a key. | ||
| """ | ||
| dictionary = {} | ||
| dictionary = update(dictionary, self.meta) | ||
| dictionary["name"] = self.name | ||
| dictionary["status"] = self.status | ||
| if isinstance(self.pipeline, str): | ||
| dictionary["pipeline"] = self.pipeline | ||
| else: | ||
| dictionary["pipeline"] = self.pipeline.name.lower() | ||
| dictionary["comment"] = self.comment | ||
| if self.review: | ||
| dictionary["review"] = self.review.copy() # .to_dicts() | ||
| dictionary["needs"] = self.dependencies | ||
| if "quality" in self.meta: | ||
| dictionary["quality"] = self.meta["quality"] | ||
| if "priors" in self.meta: | ||
| dictionary["priors"] = self.meta["priors"] | ||
| for key, value in self.meta.items(): | ||
| dictionary[key] = value | ||
| if "repository" in self.meta: | ||
| dictionary["repository"] = self.repository.url | ||
| if "ledger" in dictionary: | ||
| dictionary.pop("ledger") | ||
| if "pipelines" in dictionary: | ||
| dictionary.pop("pipelines") | ||
| dictionary["subjects"] = self._subjects | ||
| dictionary["analyses"] = self._analysis_spec | ||
| output = dictionary | ||
| return output | ||
| @property | ||
| def rundir(self): | ||
| """ | ||
| Returns the rundir for this event | ||
| """ | ||
| if "rundir" in self.meta: | ||
| return self.meta["rundir"] | ||
| elif self.work_dir: | ||
| self.meta["rundir"] = self.work_dir | ||
| return self.meta["rundir"] | ||
| else: | ||
| return None | ||
| @rundir.setter | ||
| def rundir(self, value): | ||
| """ | ||
| Set the run directory. | ||
| """ | ||
| if "rundir" not in self.meta: | ||
| self.meta["rundir"] = value | ||
| else: | ||
| self.meta["rundir"] = value | ||
| class GravitationalWaveTransient(SimpleAnalysis): | ||
@@ -216,6 +919,91 @@ """ | ||
| def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs): | ||
| super().init(subject, name, pipeline, status=None, comment=None, **kwargs) | ||
| def __init__(self, subject, name, pipeline, **kwargs): | ||
| """ | ||
| A specific analysis on a GW transient event. | ||
| Parameters | ||
| ---------- | ||
| subject : `asimov.event` | ||
| The event this analysis is running on. | ||
| name : str | ||
| The name of this analysis. | ||
| status : str | ||
| The status of this analysis. | ||
| pipeline : str | ||
| This analysis's pipeline. | ||
| comment : str | ||
| A comment on this analysis. | ||
| """ | ||
| self.category = config.get("general", "calibration_directory") | ||
| super().__init__(subject, name, pipeline, **kwargs) | ||
| self._checks() | ||
| self.psds = self._collect_psds() | ||
| self.xml_psds = self._collect_psds(format="xml") | ||
| if "cip jobs" in self.meta: | ||
| # TODO: Should probably raise a deprecation warning | ||
| self.meta["sampler"]["cip jobs"] = self.meta["cip jobs"] | ||
| if "scheduler" not in self.meta: | ||
| self.meta["scheduler"] = {} | ||
| if "likelihood" not in self.meta: | ||
| self.meta["likelihood"] = {} | ||
| if "marginalization" not in self.meta["likelihood"]: | ||
| self.meta["likelihood"]["marginalization"] = {} | ||
| if "data" not in self.meta: | ||
| self.meta["data"] = {} | ||
| if "data files" not in self.meta["data"]: | ||
| self.meta["data"]["data files"] = {} | ||
| if "lmax" in self.meta: | ||
| # TODO: Should probably raise a deprecation warning | ||
| self.meta["sampler"]["lmax"] = self.meta["lmax"] | ||
| # Check that the upper frequency is included, otherwise calculate it | ||
| if "quality" in self.meta: | ||
| if ("maximum frequency" not in self.meta["quality"]) and ( | ||
| "sample rate" in self.meta["likelihood"] | ||
| ): | ||
| self.meta["quality"]["maximum frequency"] = {} | ||
| # Account for the PSD roll-off with the 0.875 factor | ||
| for ifo in self.meta["interferometers"]: | ||
| self.meta["quality"]["maximum frequency"][ifo] = int( | ||
| 0.875 * self.meta["likelihood"]["sample rate"] / 2 | ||
| ) | ||
| if ("quality" in self.meta) and ("event time" in self.meta): | ||
| if ("segment start" not in self.meta["quality"]) and ( | ||
| "segment length" in self.meta["data"] | ||
| ): | ||
| self.meta["likelihood"]["segment start"] = ( | ||
| self.meta["event time"] - self.meta["data"]["segment length"] + 2 | ||
| ) | ||
| # self.event.meta['likelihood']['segment start'] = self.meta['data']['segment start'] | ||
| # Update waveform data | ||
| if "waveform" not in self.meta: | ||
| self.logger.info("Didn't find waveform information in the metadata") | ||
| self.meta["waveform"] = {} | ||
| if "approximant" in self.meta: | ||
| self.logger.warning( | ||
| "Found deprecated approximant information, " | ||
| "moving to waveform area of ledger" | ||
| ) | ||
| approximant = self.meta.pop("approximant") | ||
| self.meta["waveform"]["approximant"] = approximant | ||
| if "reference frequency" in self.meta["likelihood"]: | ||
| self.logger.warning( | ||
| "Found deprecated ref freq information, " | ||
| "moving to waveform area of ledger" | ||
| ) | ||
| ref_freq = self.meta["likelihood"].pop("reference frequency") | ||
| self.meta["waveform"]["reference frequency"] = ref_freq | ||
| # Gather the PSDs for the job | ||
| self.psds = self._collect_psds() | ||
| def _checks(self): | ||
@@ -238,3 +1026,3 @@ """ | ||
| ): | ||
| warn( | ||
| self.logger.warn( | ||
| "The upper-cutoff frequency is not equal to 0.875 times the Nyquist frequency." | ||
@@ -261,21 +1049,2 @@ ) | ||
| @property | ||
| def psds(self): | ||
| """ | ||
| Return the PSDs stored for this transient event. | ||
| """ | ||
| if "psds" in self.meta and self.quality: | ||
| if self.quality["sample-rate"] in self.meta["psds"]: | ||
| self.psds = self.meta["psds"][self.quality["sample-rate"]] | ||
| else: | ||
| self.psds = {} | ||
| else: | ||
| self.psds = {} | ||
| for ifo, psd in self.psds.items(): | ||
| if self.subject.repository: | ||
| self.psds[ifo] = os.path.join(self.subject.repository.directory, psd) | ||
| else: | ||
| self.psds[ifo] = psd | ||
| def get_timefile(self): | ||
@@ -314,4 +1083,77 @@ """ | ||
| def get_configuration(self): | ||
| """ | ||
| Get the configuration file contents for this event. | ||
| """ | ||
| if "ini" in self.meta: | ||
| ini_loc = self.meta["ini"] | ||
| else: | ||
| # We'll need to search the repository for it. | ||
| try: | ||
| ini_loc = self.subject.repository.find_prods(self.name, self.category)[ | ||
| 0 | ||
| ] | ||
| if not os.path.exists(ini_loc): | ||
| raise ValueError("Could not open the ini file.") | ||
| except IndexError: | ||
| raise ValueError("Could not open the ini file.") | ||
| try: | ||
| ini = RunConfiguration(ini_loc) | ||
| except ValueError: | ||
| raise ValueError("Could not open the ini file") | ||
| except configparser.MissingSectionHeaderError: | ||
| raise ValueError("This isn't a valid ini file") | ||
| class Production(SimpleAnalysis): | ||
| pass | ||
| return ini | ||
| def _collect_psds(self, format="ascii"): | ||
| """ | ||
| Collect the required psds for this production. | ||
| """ | ||
| psds = {} | ||
| # If the PSDs are specifically provided in the ledger, | ||
| # use those. | ||
| if format == "ascii": | ||
| keyword = "psds" | ||
| elif format == "xml": | ||
| keyword = "xml psds" | ||
| else: | ||
| raise ValueError(f"This PSD format ({format}) is not recognised.") | ||
| if keyword in self.meta: | ||
| # if self.meta["likelihood"]["sample rate"] in self.meta[keyword]: | ||
| psds = self.meta[keyword] # [self.meta["likelihood"]["sample rate"]] | ||
| # First look through the list of the job's dependencies | ||
| # to see if they're provided by a job there. | ||
| elif self.dependencies: | ||
| productions = {} | ||
| for production in self.event.productions: | ||
| productions[production.name] = production | ||
| for previous_job in self.dependencies: | ||
| try: | ||
| # Check if the job provides PSDs as an asset and were produced with compatible settings | ||
| if keyword in productions[previous_job].pipeline.collect_assets(): | ||
| if self._check_compatible(productions[previous_job]): | ||
| psds = productions[previous_job].pipeline.collect_assets()[ | ||
| keyword | ||
| ] | ||
| break | ||
| else: | ||
| self.logger.info( | ||
| f"The PSDs from {previous_job} are not compatible with this job." | ||
| ) | ||
| else: | ||
| psds = {} | ||
| except Exception: | ||
| psds = {} | ||
| # Otherwise return no PSDs | ||
| else: | ||
| psds = {} | ||
| for ifo, psd in psds.items(): | ||
| self.logger.debug(f"PSD-{ifo}: {psd}") | ||
| return psds |
@@ -18,2 +18,5 @@ [general] | ||
| [asimov start] | ||
| accounting = ligo.dev.o4.cbc.pe.lalinference | ||
| [pipelines] | ||
@@ -20,0 +23,0 @@ environment = /cvmfs/oasis.opensciencegrid.org/ligo/sw/conda/envs/igwn-py39 |
@@ -12,3 +12,5 @@ """ | ||
| import asimov.event | ||
| from asimov.analysis import ProjectAnalysis | ||
| from asimov import current_ledger as ledger | ||
| from asimov.ledger import Ledger | ||
| from asimov.utils import update | ||
@@ -28,3 +30,3 @@ | ||
| def apply_page(file, event, ledger=ledger): | ||
| def apply_page(file, event=None, ledger=ledger): | ||
| if file[:4] == "http": | ||
@@ -46,3 +48,2 @@ r = requests.get(file) | ||
| for document in quick_parse: | ||
| if document["kind"] == "event": | ||
@@ -77,6 +78,7 @@ logger.info("Found an event") | ||
| logger.exception(e) | ||
| production = asimov.event.Production.from_dict(document, event=event_o) | ||
| production = asimov.event.Production.from_dict( | ||
| parameters=document, subject=event_o, ledger=ledger | ||
| ) | ||
| try: | ||
| event_o.add_production(production) | ||
| ledger.update_event(event_o) | ||
| ledger.add_analysis(production, event=event_o) | ||
| click.echo( | ||
@@ -95,2 +97,74 @@ click.style("●", fg="green") | ||
| elif document["kind"].lower() == "postprocessing": | ||
| # Handle a project analysis | ||
| logger.info("Found a postprocessing description") | ||
| document.pop("kind") | ||
| if event: | ||
| event_s = event | ||
| if event: | ||
| try: | ||
| event_o = ledger.get_event(event_s)[0] | ||
| level = event_o | ||
| except KeyError as e: | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Could not apply postprocessing, couldn't find the event {event}" | ||
| ) | ||
| logger.exception(e) | ||
| else: | ||
| level = ledger | ||
| try: | ||
| if document["name"] in level.data.get("postprocessing stages", {}): | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Could not apply postprocessing, as {document['name']} is already in the ledger." | ||
| ) | ||
| logger.error( | ||
| f" Could not apply postprocessing, as {document['name']} is already in the ledger." | ||
| ) | ||
| else: | ||
| if "postprocessing stages" not in level.data: | ||
| level.data["postprocessing stages"] = {} | ||
| if isinstance(level, asimov.event.Event): | ||
| level.meta["postprocessing stages"][document["name"]] = document | ||
| elif isinstance(level, Ledger): | ||
| level.data["postprocessing stages"][document["name"]] = document | ||
| level.name = "the project" | ||
| ledger.save() | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" Successfully added {document['name']} to {level.name}." | ||
| ) | ||
| logger.info(f"Added {document['name']}") | ||
| except ValueError as e: | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Could not apply {document['name']} to project as " | ||
| + "a post-process already exists with this name" | ||
| ) | ||
| logger.exception(e) | ||
| elif document["kind"].lower() == "projectanalysis": | ||
| # Handle a project analysis | ||
| logger.info("Found a project analysis") | ||
| document.pop("kind") | ||
| analysis = ProjectAnalysis.from_dict(document, ledger=ledger) | ||
| try: | ||
| ledger.add_analysis(analysis) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" Successfully added {analysis.name} to this project." | ||
| ) | ||
| ledger.save() | ||
| logger.info(f"Added {analysis.name}") | ||
| except ValueError as e: | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Could not apply {analysis.name} to project as " | ||
| + "an analysis already exists with this name" | ||
| ) | ||
| logger.exception(e) | ||
| elif document["kind"] == "configuration": | ||
@@ -112,6 +186,3 @@ logger.info("Found configurations") | ||
| hook.load()(ledger).run(event) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f"{event} has been applied." | ||
| ) | ||
| click.echo(click.style("●", fg="green") + f"{event} has been applied.") | ||
@@ -121,4 +192,3 @@ break | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f"No hook found matching {hookname}. " | ||
| click.style("●", fg="red") + f"No hook found matching {hookname}. " | ||
| f"Installed hooks are {', '.join(discovered_hooks.names)}" | ||
@@ -125,0 +195,0 @@ ) |
+11
-59
@@ -15,4 +15,4 @@ import ast | ||
| from asimov import current_ledger as ledger | ||
| from asimov.utils import find_calibrations, update | ||
| from asimov.event import DescriptionException, Event | ||
| from asimov.utils import update | ||
| from asimov.event import Event | ||
@@ -136,3 +136,3 @@ | ||
| @click.argument("delete") | ||
| @click.argument("event", default=None) | ||
| @event.command() | ||
@@ -146,27 +146,2 @@ def delete(event): | ||
| # @click.argument("event") | ||
| # @click.option("--yaml", "yaml", default=None) | ||
| # @click.option("--ini", "ini", default=None) | ||
| # @event.command() | ||
| # def populate(event, yaml, ini): | ||
| # """ | ||
| # Populate an event ledger with data from ini or yaml files. | ||
| # """ | ||
| # event = ledger.get_event(event) | ||
| # # Check the calibration files for this event | ||
| # click.echo("Check the calibration.") | ||
| # click.echo(event.name) | ||
| # calibration(event=event.name) | ||
| # # Check the IFOs for this event | ||
| # click.echo("Check the IFO list") | ||
| # try: | ||
| # checkifo(event.name) | ||
| # except: | ||
| # pass | ||
| # if yaml: | ||
| # add_data(event.name, yaml) | ||
| @click.argument("event", default=None) | ||
@@ -284,33 +259,10 @@ @click.option("--json", "json_data", default=None) | ||
| def calibration(event, calibration): | ||
| """ | ||
| Add calibration files to an event from a filepath. | ||
| """ | ||
| event = ledger.get_event(event)[0] | ||
| try: | ||
| event._check_calibration() | ||
| except DescriptionException: | ||
| print(event.name) | ||
| time = event.meta["event time"] | ||
| if not calibration[0]: | ||
| try: | ||
| calibrations = find_calibrations(time) | ||
| except ValueError: | ||
| calibrations = {} | ||
| else: | ||
| calibrations = {} | ||
| for cal in calibration: | ||
| calibrations[cal.split(":")[0]] = cal.split(":")[1] | ||
| print(calibrations) | ||
| update(event.meta["data"]["calibration"], calibrations) | ||
| ledger.update_event(event) | ||
| # @click.argument("data") | ||
| # @click.argument("event") | ||
| # @event.command() | ||
| # def load(event, data): | ||
| # event = ledger.get_event(event) | ||
| # with open(data, "r") as datafile: | ||
| # data = yaml.safe_load(datafile.read()) | ||
| # event.meta = update(event.meta, data) | ||
| # ledger.update_event(event) | ||
| calibrations = {} | ||
| for cal in calibration: | ||
| calibrations[cal.split(":")[0]] = cal.split(":")[1] | ||
| update(event.meta["data"]["calibration"], calibrations) | ||
| ledger.update_event(event) |
+234
-5
| """ | ||
| Olivaw management commands | ||
| """ | ||
| import os | ||
@@ -15,2 +16,3 @@ import pathlib | ||
| from asimov.pipeline import PipelineException | ||
| from asimov.git import EventRepo | ||
@@ -46,2 +48,28 @@ | ||
| logger.setLevel(LOGGER_LEVEL) | ||
| for analysis in ledger.project_analyses: | ||
| if analysis.status in {"ready"}: | ||
| # Need to ensure a directory exists for these! | ||
| subj_string = "_".join([f"{subject}" for subject in analysis._subjects]) | ||
| project_analysis_dir = os.path.join( | ||
| "checkouts", "project-analyses", subj_string | ||
| ) | ||
| if not os.path.exists(project_analysis_dir): | ||
| os.makedirs(project_analysis_dir) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" Building project analysis {analysis.name}" | ||
| ) | ||
| analysis.pipeline.before_config() | ||
| analysis.make_config( | ||
| filename=os.path.join(project_analysis_dir, f"{analysis.name}.ini"), | ||
| dryrun=dryrun, | ||
| ) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" Created configuration for {analysis.name}" | ||
| ) | ||
| for event in ledger.get_event(event): | ||
@@ -143,2 +171,206 @@ | ||
| logger.setLevel(LOGGER_LEVEL) | ||
| # this should add the required repository field to the project analysis | ||
| # with the correct type | ||
| # FIXME: for now, this only accountd for the case where we have multiple analysis | ||
| # done but not yet several productions for the same event and pipelines. This would | ||
| # require extra checks when adding to the dictionary | ||
| interest_dict = {} | ||
| for analysis in ledger.project_analyses: | ||
| if analysis.pipeline.name not in interest_dict.keys(): | ||
| interest_dict[analysis.pipeline.name] = [] | ||
| if "interest status" in analysis.meta.keys(): | ||
| interest_dict[analysis.pipeline.name].append( | ||
| { | ||
| "interest status": analysis.meta["interest status"], | ||
| "subjects": set(analysis.subjects), | ||
| } | ||
| ) | ||
| for analysis in ledger.project_analyses: | ||
| # need to change the logic of analysis set up as to account for | ||
| # dependencies | ||
| to_analyse = True | ||
| extra_prio = False | ||
| if analysis.status not in {"ready"}: | ||
| to_analyse = False | ||
| elif analysis._needs: | ||
| # check if the parent jobs are said to be interesting | ||
| # this does not account for the old logic in the project analyses | ||
| interested_pipelines = 0 | ||
| for old_analysis in analysis._needs: | ||
| if old_analysis in interest_dict.keys(): | ||
| if len(interest_dict[old_analysis]) > 0: | ||
| for cases in interest_dict[old_analysis]: | ||
| if cases["interest status"] is True and cases[ | ||
| "subjects" | ||
| ] == set(analysis.subjects): | ||
| interested_pipelines += 1 | ||
| if interested_pipelines < int(analysis.meta["needs settings"]["minimum"]): | ||
| to_analyse = False | ||
| # check if we need to account for extra priority comming from atlenstics | ||
| if "extra priority" in analysis.meta["needs settings"].keys(): | ||
| if ( | ||
| analysis.meta["needs settings"]["extra priority"] | ||
| == "atlenstics_compatibility" | ||
| ): | ||
| # we need to verify if extra priority is needed | ||
| if "atlenstics_compatibility" in interest_dict.keys(): | ||
| # need to add a check on the length of the list to avoid some failures | ||
| if len(interest_dict["atlenstics_compatibility"]) > 0: | ||
| if ( | ||
| interest_dict["atlenstics_compatibility"][0][ | ||
| "interest status" | ||
| ] | ||
| is True | ||
| ): | ||
| extra_prio = True | ||
| running_and_requiring_priority_check = False | ||
| if analysis.status in {"running"} and analysis._needs: | ||
| if "needs settings" in analysis.meta.keys(): | ||
| if "logic" in analysis.meta["needs settings"]: | ||
| if analysis.meta["needs settings"]["logic"] == "add_priority": | ||
| running_and_requiring_priority_check = True | ||
| if to_analyse: | ||
| # Need to ensure a directory exists for these! | ||
| subjects = analysis._subjects | ||
| subj_string = "_".join([f"{subjects[i]}" for i in range(len(subjects))]) | ||
| project_analysis_dir = os.path.join( | ||
| "checkouts", | ||
| "project-analyses", | ||
| subj_string, | ||
| ) | ||
| if analysis.repository is None: | ||
| analysis.repository = EventRepo.create(project_analysis_dir) | ||
| else: | ||
| if isinstance(analysis.repository, str): | ||
| if ( | ||
| "git@" in analysis.repository | ||
| or "https://" in analysis.repository | ||
| ): | ||
| analysis.repository = EventRepo.from_url( | ||
| analysis.repository, | ||
| analysis.event.name, | ||
| directory=None, | ||
| update=update, | ||
| ) | ||
| else: | ||
| analysis.repository = EventRepo.create(analysis.repository) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" Submitting project analysis {analysis.name}" | ||
| ) | ||
| pipe = analysis.pipeline | ||
| try: | ||
| pipe.build_dag(dryrun=dryrun) | ||
| except PipelineException as e: | ||
| logger.error( | ||
| "The pipeline failed to build a DAG file.", | ||
| ) | ||
| logger.exception(e) | ||
| click.echo( | ||
| click.style("●", fg="red") + f" Failed to submit {analysis.name}" | ||
| ) | ||
| except ValueError: | ||
| logger.info("Unable to submit an unbuilt project analysis") | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Unable to submit {analysis.name} as it hasn't been built yet." | ||
| ) | ||
| click.echo("Try running `asimov manage build` first.") | ||
| try: | ||
| pipe.submit_dag(dryrun=dryrun) | ||
| if not dryrun: | ||
| click.echo( | ||
| click.style("●", fg="green") + f" Submitted {analysis.name}" | ||
| ) | ||
| analysis.status = "running" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| # directly add the extra priority related if needed | ||
| if extra_prio: | ||
| job_id = analysis.scheduler["job id"] | ||
| extra_prio = 20 | ||
| condor.change_job_priority(job_id, extra_prio, use_old=False) | ||
| except PipelineException as e: | ||
| analysis.status = "stuck" | ||
| click.echo( | ||
| click.style("●", fg="red") + f" Unable to submit {analysis.name}" | ||
| ) | ||
| logger.exception(e) | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| ledger.save() | ||
| logger.error( | ||
| f"The pipeline failed to submit the DAG file to the cluster. {e}", | ||
| ) | ||
| if not dryrun: | ||
| # Refresh the job list | ||
| job_list = condor.CondorJobList() | ||
| job_list.refresh() | ||
| # Update the ledger | ||
| ledger.save() | ||
| else: | ||
| click.echo( | ||
| click.style("●", fg="yellow") | ||
| + f"Project analysis {analysis.name} not ready to submit" | ||
| ) | ||
| # addition to see if we need to adjust the priority of a running job | ||
| if running_and_requiring_priority_check: | ||
| # enquire about the old priority | ||
| try: | ||
| current_prio = int( | ||
| condor.get_job_priority(analysis.meta["scheduler"]["job id"]) | ||
| ) | ||
| except TypeError: | ||
| # can happen when the job has done running | ||
| current_prio = 0 | ||
| # calculate the priority it is expected to have | ||
| interested_pipelines = 0 | ||
| for old_analysis in analysis._needs: | ||
| if old_analysis in interest_dict.keys(): | ||
| if len(interest_dict[old_analysis]) > 0: | ||
| if interest_dict[old_analysis][-1]["interest status"] is True: | ||
| interested_pipelines += 1 | ||
| if interested_pipelines < 2: | ||
| theoretical_prio = 0 | ||
| else: | ||
| theoretical_prio = int((interested_pipelines - 2) * 10) | ||
| extra_prio = False | ||
| if "extra priority" in analysis.meta["needs settings"].keys(): | ||
| if ( | ||
| analysis.meta["needs settings"]["extra priority"] | ||
| == "atlenstics_compatibility" | ||
| ): | ||
| # we need to verify if extra priority is needed | ||
| if "atlenstics_compatibility" in interest_dict.keys(): | ||
| if len(interest_dict["atlenstics_compatibility"]) > 0: | ||
| if ( | ||
| interest_dict["atlenstics_compatibility"][0][ | ||
| "interest status" | ||
| ] | ||
| is True | ||
| ): | ||
| extra_prio = True | ||
| if extra_prio: | ||
| theoretical_prio += 20 | ||
| # check if we currently have the correct priority or if an adaptation is needed | ||
| if theoretical_prio != current_prio: | ||
| logger.info( | ||
| f"Adjusting priority of {analysis.name} from {current_prio} to {theoretical_prio}" | ||
| ) | ||
| condor.change_job_priority( | ||
| analysis.meta["scheduler"]["job id"], | ||
| theoretical_prio, | ||
| use_old=False, | ||
| ) | ||
| for event in ledger.get_event(event): | ||
@@ -184,3 +416,3 @@ ready_productions = event.get_all_latest() | ||
| logger.error( | ||
| "The pipeline failed to build a DAG file.", | ||
| "failed to build a DAG file.", | ||
| ) | ||
@@ -192,4 +424,3 @@ logger.exception(e) | ||
| ) | ||
| except ValueError as e: | ||
| print("ERROR", e) | ||
| except ValueError: | ||
| logger.info("Unable to submit an unbuilt production") | ||
@@ -257,3 +488,2 @@ click.echo( | ||
| click.echo("\t (No results available)") | ||
| # print(production.results()) | ||
@@ -296,2 +526,1 @@ | ||
| pass | ||
| # print(production.results()) |
+281
-28
| import shutil | ||
| import configparser | ||
| import sys | ||
| import traceback | ||
| import os | ||
| import sys | ||
| import click | ||
@@ -34,2 +35,3 @@ from copy import deepcopy | ||
| "arguments": "monitor --chain", | ||
| "accounting_group": config.get("asimov start", "accounting"), | ||
| "output": os.path.join(".asimov", "asimov_cron.out"), | ||
@@ -124,3 +126,249 @@ "on_exit_remove": "false", | ||
| # also check the analyses in the project analyses | ||
| for analysis in ledger.project_analyses: | ||
| click.secho(f"Subjects: {analysis.subjects}", bold=True) | ||
| if analysis.status.lower() in ACTIVE_STATES: | ||
| logger.debug(f"Available analyses: project_analyses/{analysis.name}") | ||
| click.echo( | ||
| "\t- " | ||
| + click.style(f"{analysis.name}", bold=True) | ||
| + click.style(f"[{analysis.pipeline}]", fg="green") | ||
| ) | ||
| # ignore the analysis if it is set to ready as it has not been started yet | ||
| if analysis.status.lower() == "ready": | ||
| click.secho(f" \t ● {analysis.status.lower()}", fg="green") | ||
| logger.debug(f"Ready production: project_analyses/{analysis.name}") | ||
| continue | ||
| # check if there are jobs that need to be stopped | ||
| if analysis.status.lower() == "stop": | ||
| pipe = analysis.pipeline | ||
| logger.debug(f"Stop production project_analyses/{analysis.name}") | ||
| if not dry_run: | ||
| pipe.eject_job() | ||
| analysis.status = "stopped" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| click.secho(" \t Stopped", fg="red") | ||
| else: | ||
| click.echo("\t\t{analysis.name} --> stopped") | ||
| continue | ||
| # deal with the condor jobs | ||
| analysis_scheduler = analysis.meta["scheduler"].copy() | ||
| try: | ||
| if "job id" in analysis_scheduler: | ||
| if not dry_run: | ||
| if analysis_scheduler["job id"] in job_list.jobs: | ||
| job = job_list.jobs[analysis_scheduler["job id"]] | ||
| else: | ||
| job = None | ||
| else: | ||
| logger.debug( | ||
| f"Running analysis: {event}/{analysis.name}, cluster {analysis.job_id}" | ||
| ) | ||
| click.echo("\t\tRunning under condor") | ||
| else: | ||
| raise ValueError | ||
| if not dry_run: | ||
| if job.status.lower() == "idle": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} is in the queue (condor id: {analysis_scheduler['job id']})" | ||
| ) | ||
| elif job.status.lower() == "running": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} is running (condor id: {analysis_scheduler['job id']})" | ||
| ) | ||
| if "profiling" not in analysis.meta: | ||
| analysis.meta["profiling"] = {} | ||
| if hasattr(analysis.pipeline, "while_running"): | ||
| analysis.pipeline.while_running() | ||
| analysis.status = "running" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| elif job.status.lower() == "completed": | ||
| pipe.after_completion() | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} has finished and post-processing has been started" | ||
| ) | ||
| job_list.refresh() | ||
| elif job.status.lower() == "held": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "yellow") | ||
| + f" {analysis.name} is held on the scheduler" | ||
| + f" (condor id: {analysis_scheduler['job id']})" | ||
| ) | ||
| analysis.status = "stuck" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| else: | ||
| continue | ||
| except (ValueError, AttributeError): | ||
| if analysis.pipeline: | ||
| pipe = analysis.pipeline | ||
| if analysis.status.lower() == "stop": | ||
| pipe.eject_job() | ||
| analysis.status = "stopped" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "red") | ||
| + f" {analysis.name} has been stopped" | ||
| ) | ||
| job_list.refresh() | ||
| elif analysis.status.lower() == "finished": | ||
| pipe.after_completion() | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} has finished and post-processing has been started" | ||
| ) | ||
| job_list.refresh() | ||
| elif analysis.status.lower() == "processing": | ||
| if pipe.detect_completion_processing(): | ||
| try: | ||
| pipe.after_processing() | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} has been finalised and stored" | ||
| ) | ||
| except ValueError as e: | ||
| click.echo(e) | ||
| else: | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} has finished and post-processing" | ||
| + f" is stuck ({analysis_scheduler['job id']})" | ||
| ) | ||
| elif ( | ||
| pipe.detect_completion() | ||
| and analysis.status.lower() == "processing" | ||
| ): | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {analysis.name} has finished and post-processing is running" | ||
| ) | ||
| elif ( | ||
| pipe.detect_completion() | ||
| and analysis.status.lower() == "running" | ||
| ): | ||
| if "profiling" not in analysis.meta: | ||
| analysis.meta["profiling"] = {} | ||
| try: | ||
| config.get("condor", "scheduler") | ||
| analysis.meta["profiling"] = condor.collect_history( | ||
| analysis_scheduler["job id"] | ||
| ) | ||
| analysis_scheduler["job id"] = None | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| except ( | ||
| configparser.NoOptionError, | ||
| configparser.NoSectionError, | ||
| ): | ||
| logger.warning( | ||
| "Could not collect condor profiling data as no " | ||
| + "scheduler was specified in the config file." | ||
| ) | ||
| except ValueError as e: | ||
| logger.error("Could not collect condor profiling data.") | ||
| logger.exception(e) | ||
| pass | ||
| analysis.status = "finished" | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| pipe.after_completion() | ||
| click.secho( | ||
| f" \t ● {analysis.name} - Completion detected", | ||
| fg="green", | ||
| ) | ||
| job_list.refresh() | ||
| else: | ||
| # job may have been evicted from the clusters | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "yellow") | ||
| + f" {analysis.name} is stuck; attempting a rescue" | ||
| ) | ||
| try: | ||
| pipe.resurrect() | ||
| except ( | ||
| Exception | ||
| ): # Sorry, but there are many ways the above command can fail | ||
| analysis.status = "stuck" | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "red") | ||
| + f" {analysis.name} is stuck; automatic rescue was not possible" | ||
| ) | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| if analysis.status == "stuck": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "yellow") | ||
| + f" {analysis.name} is stuck" | ||
| ) | ||
| ledger.update_analysis_in_project_analysis(analysis) | ||
| ledger.save() | ||
| if chain: | ||
| ctx.invoke(report.html) | ||
| all_analyses = set(ledger.project_analyses) | ||
| complete = { | ||
| analysis | ||
| for analysis in ledger.project_analyses | ||
| if analysis.status in {"finished", "uploaded"} | ||
| } | ||
| others = all_analyses - complete | ||
| if len(others) > 0: | ||
| click.echo( | ||
| "There are also these analyses waiting for other analyses to complete:" | ||
| ) | ||
| for analysis in others: | ||
| needs = ", ".join(analysis._needs) | ||
| click.echo(f"\t{analysis.name} which needs {needs}") | ||
| # need to check for post monitor hooks for each of the analyses | ||
| for analysis in ledger.project_analyses: | ||
| # check for post monitoring | ||
| if "hooks" in ledger.data: | ||
| if "postmonitor" in ledger.data["hooks"]: | ||
| discovered_hooks = entry_points(group="asimov.hooks.postmonitor") | ||
| for hook in discovered_hooks: | ||
| # do not run cbcflow every time | ||
| if hook.name in list( | ||
| ledger.data["hooks"]["postmonitor"].keys() | ||
| ) and hook.name not in ["cbcflow"]: | ||
| try: | ||
| hook.load()(deepcopy(ledger)).run() | ||
| except Exception: | ||
| pass | ||
| if chain: | ||
| ctx.invoke(report.html) | ||
| for event in sorted(ledger.get_event(event), key=lambda e: e.name): | ||
| stuck = 0 | ||
@@ -135,2 +383,3 @@ running = 0 | ||
| ] | ||
| for production in on_deck: | ||
@@ -166,6 +415,6 @@ | ||
| try: | ||
| if "job id" in production.meta: | ||
| if "job id" in production.meta["scheduler"]: | ||
| if not dry_run: | ||
| if production.meta["job id"] in job_list.jobs: | ||
| job = job_list.jobs[production.meta["job id"]] | ||
| if production.job_id in job_list.jobs: | ||
| job = job_list.jobs[production.job_id] | ||
| else: | ||
@@ -175,3 +424,3 @@ job = None | ||
| logger.debug( | ||
| f"Running analysis: {event}/{production.name}, cluster {production.meta['job id']}" | ||
| f"Running analysis: {event}/{production.name}, cluster {production.job_id}" | ||
| ) | ||
@@ -183,20 +432,6 @@ click.echo("\t\tRunning under condor") | ||
| if not dry_run: | ||
| if ( | ||
| job.status.lower() == "running" | ||
| and production.status == "processing" | ||
| ): | ||
| if job.status.lower() == "idle": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" Postprocessing for {production.name} is running" | ||
| + f" (condor id: {production.job_id})" | ||
| ) | ||
| production.meta["postprocessing"]["status"] = "running" | ||
| elif job.status.lower() == "idle": | ||
| click.echo( | ||
| " \t " | ||
| + click.style("●", "green") | ||
| + f" {production.name} is in the queue (condor id: {production.job_id})" | ||
@@ -277,3 +512,2 @@ ) | ||
| ) | ||
| production.meta["postprocessing"]["status"] = "stuck" | ||
| elif ( | ||
@@ -300,3 +534,3 @@ pipe.detect_completion() | ||
| ) | ||
| production.meta["job id"] = None | ||
| production.job_id = None | ||
| except ( | ||
@@ -333,3 +567,5 @@ configparser.NoOptionError, | ||
| pipe.resurrect() | ||
| except Exception: # Sorry, but there are many ways the above command can fail | ||
| except ( | ||
| Exception | ||
| ): # Sorry, but there are many ways the above command can fail | ||
| production.status = "stuck" | ||
@@ -363,5 +599,4 @@ click.echo( | ||
| for production in others: | ||
| needs = ", ".join(production.meta["needs"]) | ||
| needs = ", ".join(production._needs) | ||
| click.echo(f"\t{production.name} which needs {needs}") | ||
| # Post-monitor hooks | ||
@@ -372,9 +607,27 @@ if "hooks" in ledger.data: | ||
| for hook in discovered_hooks: | ||
| if hook.name in list(ledger.data["hooks"]["postmonitor"].keys()): | ||
| # do not run cbcflow every time | ||
| if hook.name in list( | ||
| ledger.data["hooks"]["postmonitor"].keys() | ||
| ) and hook.name not in ["cbcflow"]: | ||
| try: | ||
| hook.load()(deepcopy(ledger)).run() | ||
| except Exception: | ||
| pass | ||
| except Exception as exc: | ||
| logger.warning("%s experienced %s", hook.name, type(exc)) | ||
| traceback_lines = traceback.format_exc().splitlines() | ||
| traceback_text = "Traceback:\n" + "\n".join(traceback_lines) | ||
| logger.warning(traceback_text) | ||
| if chain: | ||
| ctx.invoke(report.html) | ||
| # run the cbcflow hook once to update all the info if needed | ||
| if "hooks" in ledger.data: | ||
| if "postmonitor" in ledger.data["hooks"]: | ||
| discovered_hooks = entry_points(group="asimov.hooks.postmonitor") | ||
| for hook in discovered_hooks: | ||
| if hook.name == "cbcflow": | ||
| logger.info("Found cbcflow postmonitor hook, trying to run it") | ||
| try: | ||
| hook.load()(deepcopy(ledger)).run() | ||
| except Exception: | ||
| logger.warning("Unable to run the cbcflow hook") |
@@ -99,7 +99,3 @@ from copy import copy | ||
| if config.get("ledger", "engine") == "gitlab": | ||
| raise NotImplementedError( | ||
| "The Gitlab interface has been removed from this version." "" | ||
| ) | ||
| elif config.get("ledger", "engine") == "yamlfile": | ||
| if config.get("ledger", "engine") == "yamlfile": | ||
| ledger.events[event.name] = event.to_dict() | ||
@@ -106,0 +102,0 @@ ledger.save() |
| """ | ||
| Project management tools. | ||
| zProject management tools. | ||
| """ | ||
@@ -197,7 +197,2 @@ | ||
| ) | ||
| elif config.get("ledger", "engine") == "gitlab": | ||
| raise NotImplementedError( | ||
| "The gitlab interface has been removed from this version of asimov." | ||
| ) | ||
| config.set("ledger", "engine", "yamlfile") | ||
@@ -204,0 +199,0 @@ config.set("ledger", "location", os.path.join(".asimov", "ledger.yml")) |
| """ | ||
| Reporting functions | ||
| """ | ||
| from datetime import datetime | ||
@@ -5,0 +6,0 @@ |
+87
-27
@@ -20,2 +20,4 @@ """ | ||
| @click.option("--other_subjects", "-o_e", "other_subjects", default=None) | ||
| @click.option("--pipeline", "-p", default=None) | ||
| @click.option("--message", "-m", "message", default=None) | ||
@@ -26,40 +28,98 @@ @click.argument("status", required=False, default=None) | ||
| @review.command() | ||
| def add(event, production, status, message): | ||
| def add(event, production, status, message, other_subjects=None, pipeline=None): | ||
| """ | ||
| Add a review signoff or rejection to an event. | ||
| Arguments: | ||
| ---------- | ||
| event: str | ||
| The event for which we need to add a review for a given analysis. | ||
| If we are considering a project analysis, this will be used as the first | ||
| subject | ||
| production: str | ||
| The production for which we need to add the review status | ||
| status: str | ||
| The status of the review. Can be one of | ||
| "rejected", "approved", "preferred", "deprecated" | ||
| message: str, optional | ||
| The message to add to the review | ||
| other_subjects: str, optional | ||
| The other subjects to be considered in the project analysis and | ||
| for which we want to add a review status. | ||
| pipeline: str, optional | ||
| The pipeline used in the project analysis and for which we want to add | ||
| the review status. | ||
| """ | ||
| valid = {"REJECTED", "APPROVED", "PREFERRED", "DEPRECATED"} | ||
| events = current_ledger.get_event(event) | ||
| if events is None: | ||
| click.echo( | ||
| click.style("●", fg="red") + f" Could not find an event called {event}" | ||
| ) | ||
| if other_subjects is None: | ||
| valid = {"REJECTED", "APPROVED", "PREFERRED", "DEPRECATED"} | ||
| events = current_ledger.get_event(event) | ||
| if events is None: | ||
| click.echo( | ||
| click.style("●", fg="red") + f" Could not find an event called {event}" | ||
| ) | ||
| else: | ||
| for event in events: | ||
| production = [ | ||
| production_o | ||
| for production_o in event.productions | ||
| if production_o.name == production | ||
| ][0] | ||
| click.secho(event.name, bold=True) | ||
| if status.upper() in valid: | ||
| message = ReviewMessage( | ||
| message=message, status=status, production=production | ||
| ) | ||
| production.review.add(message) | ||
| else: | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Did not understand the review status {status.lower()}." | ||
| + " The review status must be one of " | ||
| + "{APPROVED, REJECTED, PREFERRED, DEPRECATED}" | ||
| ) | ||
| if hasattr(event, "issue_object"): | ||
| production.event.update_data() | ||
| current_ledger.update_event(event) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" {event.name}/{production.name} {status.lower()}" | ||
| ) | ||
| else: | ||
| for event in events: | ||
| production = [ | ||
| production_o | ||
| for production_o in event.productions | ||
| if production_o.name == production | ||
| ][0] | ||
| click.secho(event.name, bold=True) | ||
| found = False | ||
| if status.upper() in valid: | ||
| subjects = list(other_subjects.replace("[", "").replace("]", "").split(",")) | ||
| subjects = [subject.strip() for subject in subjects] | ||
| subjects = [event] + subjects | ||
| for analysis in current_ledger.project_analyses: | ||
| if ( | ||
| (analysis.name == production) | ||
| and (analysis.pipeline.name == pipeline) | ||
| and (set(analysis.subjects) == set(subjects)) | ||
| ): | ||
| found = True | ||
| click.secho(analysis.name, bold=True) | ||
| click.secho(analysis.pipeline) | ||
| click.secho(" ".join(analysis.subjects)) | ||
| message = ReviewMessage( | ||
| message=message, status=status, production=production | ||
| ) | ||
| production.review.add(message) | ||
| else: | ||
| analysis.review.add(message) | ||
| click.echo( | ||
| click.style("●", fg="red") | ||
| + f" Did not understand the review status {status.lower()}." | ||
| + " The review status must be one of " | ||
| + "{APPROVED, REJECTED, PREFERRED, DEPRECATED}" | ||
| click.style("●", fg="green") | ||
| + f" {event.name}/{production.name} {status.lower()}" | ||
| ) | ||
| if hasattr(event, "issue_object"): | ||
| production.event.update_data() | ||
| current_ledger.update_event(event) | ||
| click.echo( | ||
| click.style("●", fg="green") | ||
| + f" {event.name}/{production.name} {status.lower()}" | ||
| if not found: | ||
| click.secho( | ||
| f"Unable to find a project analysis for pipeline {pipeline}, " | ||
| f"production {production} and subjects {set(subjects)}", | ||
| fg="red", | ||
| ) | ||
@@ -66,0 +126,0 @@ |
+70
-9
@@ -9,2 +9,3 @@ """ | ||
| """ | ||
| import os | ||
@@ -102,11 +103,11 @@ import datetime | ||
| HISTORY_CLASSADS = [ | ||
| "CompletionDate", | ||
| "CpusProvisioned", | ||
| "GpusProvisioned", | ||
| "CumulativeSuspensionTime", | ||
| "EnteredCurrentStatus", | ||
| "MaxHosts", | ||
| "RemoteWallClockTime", | ||
| "RequestCpus", | ||
| ] | ||
| "CompletionDate", | ||
| "CpusProvisioned", | ||
| "GpusProvisioned", | ||
| "CumulativeSuspensionTime", | ||
| "EnteredCurrentStatus", | ||
| "MaxHosts", | ||
| "RemoteWallClockTime", | ||
| "RequestCpus", | ||
| ] | ||
| try: | ||
@@ -179,2 +180,5 @@ jobs = schedd.history( | ||
| for key, value in kwargs.items(): | ||
| setattr(self, key, value) | ||
| def __repr__(self): | ||
@@ -359,1 +363,58 @@ out = f"<htcondor job | {self.idno} | {self.status} " | ||
| f.write(yaml.dump(self.jobs)) | ||
| def get_job_priority(job_id): | ||
| """ | ||
| Returns the priority of a job given its id. | ||
| This is useful when some conitioning happens and should lead | ||
| to a change in the analysis priority compared to other events. | ||
| This returns None if the information cannot be found. | ||
| Parameters | ||
| ---------- | ||
| - job_id: the condor job id for which we want to get the priority | ||
| """ | ||
| # make collector to query the info | ||
| schedd = htcondor.Schedd() | ||
| # query job info | ||
| job_info = schedd.query(f"ClusterId == {job_id}.0") | ||
| if job_info: | ||
| priority = job_info[0].get("JobPrio", None) | ||
| return priority | ||
| else: | ||
| return None | ||
| def change_job_priority(job_id, extra_priority, use_old=False): | ||
| """ | ||
| Function to change the job priority for a given job. | ||
| Parameters: | ||
| ----------- | ||
| - job_id: the condor job id for which we want to change the priority | ||
| - extra_priority: the extra priority that we want to add to the job | ||
| - use_old: if True, we add the new priority to the old one. Else, we simply replace it | ||
| """ | ||
| # setup a schedduler to query the priority | ||
| schedd = htcondor.Schedd() | ||
| main_job_info = schedd.query(f"ClusterId == {job_id}") | ||
| all_jobs = schedd.query() | ||
| if main_job_info: | ||
| # look for all the jobs needing to be updated (also child jobs) | ||
| jobs_to_update = [] | ||
| for job in all_jobs: | ||
| if "JobBatchId" in job.keys(): | ||
| if job["JobBatchId"] == main_job_info[0]["JobBatchId"]: | ||
| jobs_to_update.append(job["ClusterId"]) | ||
| for j_id in jobs_to_update: | ||
| if use_old: | ||
| extra_priority += get_job_priority(j_id) | ||
| schedd.edit(f"ClusterId == {j_id}", {"JobPrio": extra_priority}) | ||
| logger.info(f"Changed the priority of job {j_id} to {extra_priority}") | ||
| else: | ||
| logger.warning(f"Unable to adapt the priority for job {job_id}") |
@@ -21,7 +21,7 @@ ; | ||
| seglen={{ data['segment length'] }} | ||
| window={{ likelihood['window length'] }} | ||
| flow={{ quality['lowest minimum frequency'] }} | ||
| window={{ likelihood['window length'] | default: likelihood['segment length'] }} | ||
| flow={{ quality['lowest minimum frequency'] | default: 20}} | ||
| srate={{ likelihood['sample rate'] }} | ||
| PSDlength={{ likelihood['psd length'] }} | ||
| rolloff={{ likelihood['roll off time'] | default: 0.4 }} | ||
| PSDlength={{ likelihood['psd length'] | default: likelihood['segment length'] }} | ||
| rolloff={{ likelihood['roll off time'] | default: 1.0 }} | ||
| ifo-list={{ ifos }} | ||
@@ -28,0 +28,0 @@ segment-start={{ likelihood['segment start'] }} |
@@ -67,3 +67,7 @@ {%- if production.event.repository -%} | ||
| generation-seed=42 | ||
| psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production.psds[ifo]}},{% endfor %} } | ||
| {%- if production._previous_assets() contains "psds" %} | ||
| psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production._previous_assets()['psds'][ifo]}},{% endfor %} } | ||
| {%- elif production.meta contains "psds" %} | ||
| psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production.meta['psds'][ifo]}},{% endfor %} } | ||
| {% endif %} | ||
| psd-fractional-overlap=0.5 | ||
@@ -119,3 +123,2 @@ post-trigger-duration={{ likelihood['post trigger time'] | default: 2.0 }} | ||
| log-directory=None | ||
| online-pe=False | ||
| osg={{ scheduler['osg'] | default: False }} | ||
@@ -122,0 +125,0 @@ desired-sites={{ scheduler['desired sites'] | default: None }} |
@@ -222,2 +222,2 @@ {%- if production.event.repository %} | ||
| # force-eta-range : the usual doesn't awlays work; this uses 20:1 prior range, should be set consistently to above | ||
| # force-eta-range="[0.0453514739,0.24999999999]" | ||
| # force-eta-range="[0.0453514739,0.24999999999]" |
@@ -19,2 +19,3 @@ """ | ||
| """ | ||
| from tinydb import Query, TinyDB | ||
@@ -21,0 +22,0 @@ |
+93
-875
@@ -5,7 +5,3 @@ """ | ||
| import glob | ||
| import os | ||
| import pathlib | ||
| from copy import deepcopy | ||
| import configparser | ||
| import subprocess | ||
@@ -16,12 +12,7 @@ | ||
| from ligo.gracedb.rest import GraceDb, HTTPError | ||
| from liquid import Liquid | ||
| from asimov import config, logger, LOGGER_LEVEL | ||
| from asimov.pipelines import known_pipelines | ||
| from asimov.storage import Store | ||
| from asimov.utils import update, diff_dict | ||
| from asimov.analysis import SubjectAnalysis, GravitationalWaveTransient | ||
| from .git import EventRepo | ||
| from .ini import RunConfiguration | ||
| from .review import Review | ||
@@ -47,6 +38,5 @@ status_map = { | ||
| def __init__(self, message, issue=None, production=None): | ||
| def __init__(self, message, production=None): | ||
| super(DescriptionException, self).__init__(message) | ||
| self.message = message | ||
| self.issue = issue | ||
| self.production = production | ||
@@ -70,14 +60,3 @@ | ||
| def submit_comment(self): | ||
| """ | ||
| Submit this exception as a comment on the gitlab | ||
| issue for the event. | ||
| """ | ||
| if self.issue: | ||
| self.issue.add_label("yaml-error", state=False) | ||
| self.issue.add_note(self.__repr__()) | ||
| else: | ||
| print(self.__repr__()) | ||
| class Event: | ||
@@ -126,2 +105,7 @@ """ | ||
| if "ledger" in kwargs: | ||
| self.ledger = kwargs["ledger"] | ||
| else: | ||
| self.ledger = None | ||
| if repository: | ||
@@ -150,10 +134,2 @@ if "git@" in repository or "https://" in repository: | ||
| self.issue_object = None | ||
| if "issue" in kwargs: | ||
| if kwargs["issue"]: | ||
| self.issue_object = kwargs.pop("issue") | ||
| self.from_notes() | ||
| else: | ||
| self.issue_object = None | ||
| self.productions = [] | ||
@@ -164,25 +140,12 @@ self.graph = nx.DiGraph() | ||
| for production in kwargs["productions"]: | ||
| try: | ||
| if ("analyses" in production) or ("productions" in production): | ||
| self.add_production( | ||
| Production.from_dict( | ||
| production, event=self, issue=self.issue_object | ||
| ) | ||
| SubjectAnalysis.from_dict(production, subject=self) | ||
| ) | ||
| except DescriptionException as error: | ||
| error.submit_comment() | ||
| self.productions = [] | ||
| self.graph = nx.DiGraph() | ||
| if "productions" in kwargs: | ||
| for production in kwargs["productions"]: | ||
| try: | ||
| else: | ||
| self.add_production( | ||
| Production.from_dict( | ||
| production, event=self, issue=self.issue_object | ||
| production, subject=self, ledger=self.ledger | ||
| ) | ||
| ) | ||
| except DescriptionException as error: | ||
| error.submit_comment() | ||
| self._check_required() | ||
@@ -200,2 +163,6 @@ | ||
| @property | ||
| def analyses(self): | ||
| return self.productions | ||
| def __eq__(self, other): | ||
@@ -270,3 +237,2 @@ if isinstance(other, Event): | ||
| """ | ||
| if production.name in [production_o.name for production_o in self.productions]: | ||
@@ -281,10 +247,9 @@ raise ValueError( | ||
| if production.dependencies: | ||
| dependencies = production.dependencies | ||
| dependencies = [ | ||
| production | ||
| for production in self.productions | ||
| if production.name in dependencies | ||
| ] | ||
| for dependency in dependencies: | ||
| self.graph.add_edge(dependency, production) | ||
| for dependency in production.dependencies: | ||
| if dependency == production: | ||
| continue | ||
| analysis_dict = { | ||
| production.name: production for production in self.productions | ||
| } | ||
| self.graph.add_edge(analysis_dict[dependency], production) | ||
@@ -295,7 +260,7 @@ def __repr__(self): | ||
| @classmethod | ||
| def from_dict(cls, data, issue=None, update=False, ledger=None): | ||
| def from_dict(cls, data, update=False, ledger=None): | ||
| """ | ||
| Convert a dictionary representation of the event object to an Event object. | ||
| """ | ||
| event = cls(**data, issue=issue, update=update, ledger=ledger) | ||
| event = cls(**data, update=update, ledger=ledger) | ||
| if ledger: | ||
@@ -306,22 +271,20 @@ ledger.add_event(event) | ||
| @classmethod | ||
| def from_yaml(cls, data, issue=None, update=False, repo=True, ledger=None): | ||
| def from_yaml(cls, data, update=False, repo=True, ledger=None): | ||
| """ | ||
| Parse YAML to generate this event. | ||
| Parse YAML to generate this event. | ||
| Parameters | ||
| ---------- | ||
| data : str | ||
| YAML-formatted event specification. | ||
| issue : int | ||
| The gitlab issue which stores this event. | ||
| update : bool | ||
| Flag to determine if the repository is updated when loaded. | ||
| Defaults to False. | ||
| ledger : `asimov.ledger.Ledger` | ||
| An asimov ledger which the event should be included in. | ||
| | Parameters | ||
| ---------- | ||
| data : str | ||
| YAML-formatted event specification. | ||
| update : bool | ||
| Flag to determine if the repository is updated when loaded. | ||
| Defaults to False. | ||
| ledger : `asimov.ledger.Ledger` | ||
| An asimov ledger which the event should be included in. | ||
| Returns | ||
| ------- | ||
| Event | ||
| An event. | ||
| Returns | ||
| ------- | ||
| Event | ||
| An event. | ||
| """ | ||
@@ -341,7 +304,15 @@ data = yaml.safe_load(data) | ||
| try: | ||
| calibration = data["data"]["calibration"] | ||
| except KeyError: | ||
| calibration = {} | ||
| if "productions" in data: | ||
| if isinstance(data["productions"], type(None)): | ||
| data["productions"] = [] | ||
| if "working directory" not in data: | ||
| data["working directory"] = os.path.join( | ||
| config.get("general", "rundir_default"), data["name"] | ||
| ) | ||
| if not repo and "repository" in data: | ||
| data.pop("repository") | ||
| event = cls.from_dict(data, update=update, ledger=ledger) | ||
| if "productions" in data: | ||
@@ -353,15 +324,2 @@ if isinstance(data["productions"], type(None)): | ||
| if "interferometers" in data and "event time" in data: | ||
| if calibration.keys() != data["interferometers"]: | ||
| # We need to fetch the calibration data | ||
| from asimov.utils import find_calibrations | ||
| try: | ||
| data["data"]["calibration"] = find_calibrations(data["event time"]) | ||
| except ValueError: | ||
| logger.warning( | ||
| f"Could not find calibration files for {data['name']}" | ||
| ) | ||
| if "working directory" not in data: | ||
@@ -374,43 +332,6 @@ data["working directory"] = os.path.join( | ||
| data.pop("repository") | ||
| event = cls.from_dict(data, issue=issue, update=update, ledger=ledger) | ||
| event = cls.from_dict(data, update=update, ledger=ledger) | ||
| if issue: | ||
| event.issue_object = issue | ||
| event.from_notes() | ||
| return event | ||
| @classmethod | ||
| def from_issue(cls, issue, update=False, repo=True): | ||
| """ | ||
| Parse an issue description to generate this event. | ||
| Parameters | ||
| ---------- | ||
| update : bool | ||
| Flag to determine if the repository is updated when loaded. | ||
| Defaults to False. | ||
| """ | ||
| text = issue.text.split("---") | ||
| event = cls.from_yaml(text[1], issue, update=update, repo=repo) | ||
| event.text = text | ||
| # event.from_notes() | ||
| return event | ||
| def from_notes(self): | ||
| """ | ||
| Update the event data from information in the issue comments. | ||
| Uses nested dictionary update code from | ||
| https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth#3233356 | ||
| """ | ||
| notes_data = self.issue_object.parse_notes() | ||
| for note in notes_data: | ||
| update(self.meta, note) | ||
| def get_gracedb(self, gfile, destination): | ||
@@ -478,19 +399,5 @@ """ | ||
| for production in self.productions: | ||
| # Remove duplicate data | ||
| prod_dict = production.to_dict()[production.name] | ||
| dupes = [] | ||
| prod_names = [] | ||
| for key, value in prod_dict.items(): | ||
| if production.name in prod_names: | ||
| continue | ||
| if key in data: | ||
| if data[key] == value: | ||
| dupes.append(key) | ||
| for dupe in dupes: | ||
| prod_dict.pop(dupe) | ||
| prod_names.append(production.name) | ||
| data["productions"].append({production.name: prod_dict}) | ||
| data["productions"].append(production.to_dict(event=False)) | ||
| data["working directory"] = self.work_dir | ||
| if "issue" in data: | ||
| data.pop("issue") | ||
| if "ledger" in data: | ||
@@ -505,9 +412,4 @@ data.pop("ledger") | ||
| data = self.to_dict() | ||
| return yaml.dump(data, default_flow_style=False) | ||
| def to_issue(self): | ||
| self.text[1] = "\n" + self.to_yaml() | ||
| return "---".join(self.text) | ||
| def draw_dag(self): | ||
@@ -533,12 +435,43 @@ """ | ||
| for production in self.productions | ||
| if production.finished is False | ||
| if (production.finished is False and production.status not in {"wait"}) | ||
| ] | ||
| ) | ||
| ends = [ | ||
| x | ||
| for x in unfinished.reverse().nodes() | ||
| if unfinished.reverse().out_degree(x) == 0 | ||
| ] | ||
| return set(ends) # only want to return one version of each production! | ||
| ends = [] | ||
| for production in unfinished.reverse().nodes(): | ||
| if ( | ||
| "needs settings" not in production.meta | ||
| or production.meta["needs settings"] == "default" | ||
| ): | ||
| if ( | ||
| unfinished.reverse().out_degree(production) == 0 | ||
| and production.finished is False | ||
| ): | ||
| ends.append(production) | ||
| elif "needs settings" in production.meta: | ||
| interested_pipelines = 0 | ||
| if ( | ||
| "minimum" in production.meta["needs settings"] | ||
| and production.meta["needs settings"]["condition"] | ||
| == "is_interesting" | ||
| ): | ||
| for prod in unfinished.reverse().nodes(): | ||
| if ( | ||
| prod.pipeline.name != production.pipeline.name | ||
| and prod.pipeline.name in production._needs | ||
| ): | ||
| if prod.meta["interest status"] is True: | ||
| interested_pipelines += 1 | ||
| if ( | ||
| interested_pipelines | ||
| >= production.meta["needs settings"]["minimum"] | ||
| ): | ||
| ends.append(production) | ||
| ready_values = {end for end in ends if end.status.lower() == "ready"} | ||
| return set(ready_values) # only want to return one version of each production! | ||
| def build_report(self): | ||
@@ -570,717 +503,2 @@ for production in self.productions: | ||
| class Production: | ||
| """ | ||
| A specific production run. | ||
| Parameters | ||
| ---------- | ||
| event : `asimov.event` | ||
| The event this production is running on. | ||
| name : str | ||
| The name of this production. | ||
| status : str | ||
| The status of this production. | ||
| pipeline : str | ||
| This production's pipeline. | ||
| comment : str | ||
| A comment on this production. | ||
| """ | ||
| def __init__(self, event, name, status, pipeline, comment=None, **kwargs): | ||
| self.event = event if isinstance(event, Event) else event[0] | ||
| self.subject = self.event | ||
| self.name = name | ||
| pathlib.Path( | ||
| os.path.join(config.get("logging", "directory"), self.event.name, name) | ||
| ).mkdir(parents=True, exist_ok=True) | ||
| self.logger = logger.getChild("analysis").getChild( | ||
| f"{self.event.name}/{self.name}" | ||
| ) | ||
| self.logger.setLevel(LOGGER_LEVEL) | ||
| # fh = logging.FileHandler(logfile) | ||
| # formatter = logging.Formatter("%(asctime)s - %(message)s", "%Y-%m-%d %H:%M:%S") | ||
| # fh.setFormatter(formatter) | ||
| # self.logger.addHandler(fh) | ||
| self.category = config.get("general", "calibration_directory") | ||
| if status: | ||
| self.status_str = status.lower() | ||
| else: | ||
| self.status_str = "none" | ||
| self.comment = comment | ||
| # Start by adding pipeline defaults | ||
| if "pipelines" in self.event.ledger.data: | ||
| if pipeline in self.event.ledger.data["pipelines"]: | ||
| self.meta = deepcopy(self.event.ledger.data["pipelines"][pipeline]) | ||
| else: | ||
| self.meta = {} | ||
| else: | ||
| self.meta = {} | ||
| if "postprocessing" in self.event.ledger.data: | ||
| self.meta["postprocessing"] = deepcopy( | ||
| self.event.ledger.data["postprocessing"] | ||
| ) | ||
| # Update with the event and project defaults | ||
| if "ledger" in self.event.meta: | ||
| self.event.meta.pop("ledger") | ||
| self.meta = update(self.meta, deepcopy(self.event.meta)) | ||
| if "productions" in self.meta: | ||
| self.meta.pop("productions") | ||
| self.meta = update(self.meta, kwargs) | ||
| if "sampler" not in self.meta: | ||
| self.meta["sampler"] = {} | ||
| if "cip jobs" in self.meta: | ||
| # TODO: Should probably raise a deprecation warning | ||
| self.meta["sampler"]["cip jobs"] = self.meta["cip jobs"] | ||
| if "scheduler" not in self.meta: | ||
| self.meta["scheduler"] = {} | ||
| if "likelihood" not in self.meta: | ||
| self.meta["likelihood"] = {} | ||
| if "marginalization" not in self.meta["likelihood"]: | ||
| self.meta["likelihood"]["marginalization"] = {} | ||
| if "data" not in self.meta: | ||
| self.meta["data"] = {} | ||
| if "data files" not in self.meta["data"]: | ||
| self.meta["data"]["data files"] = {} | ||
| if "lmax" in self.meta: | ||
| # TODO: Should probably raise a deprecation warning | ||
| self.meta["sampler"]["lmax"] = self.meta["lmax"] | ||
| self.pipeline = pipeline | ||
| self.pipeline = known_pipelines[pipeline.lower()](self) | ||
| if "review" in self.meta: | ||
| self.review = Review.from_dict(self.meta["review"], production=self) | ||
| self.meta.pop("review") | ||
| else: | ||
| self.review = Review() | ||
| # Check that the upper frequency is included, otherwise calculate it | ||
| if ( | ||
| "quality" in self.meta | ||
| and ("maximum frequency" not in self.meta["quality"]) | ||
| and ("sample rate" in self.meta["likelihood"]) | ||
| and len(self.meta["interferometers"]) > 0 | ||
| ) or ( | ||
| list(self.meta.get("quality", {}).get("maximum frequency", {}).keys()) | ||
| != self.meta.get("interferometers") | ||
| and ("sample rate" in self.meta["likelihood"]) | ||
| ): | ||
| if "maximum frequency" not in self.meta["quality"]: | ||
| self.meta["quality"]["maximum frequency"] = {} | ||
| # Account for the PSD roll-off with the 0.875 factor | ||
| for ifo in self.meta["interferometers"]: | ||
| psd_rolloff = self.meta.get("likelihood", {}).get( | ||
| "roll off factor", 0.875 | ||
| ) | ||
| if ifo not in self.meta["quality"]["maximum frequency"]: | ||
| self.meta["quality"]["maximum frequency"][ifo] = int( | ||
| psd_rolloff * self.meta["likelihood"]["sample rate"] / 2 | ||
| ) | ||
| # Add a warning about roll-offs | ||
| if not ("roll off time" in self.meta["likelihood"]): | ||
| self.logger.warning( | ||
| "Using the default roll off settings (0.4-seconds); note that these may result in spectral leakage." | ||
| ) | ||
| # Get the data quality recommendations | ||
| if "quality" in self.event.meta: | ||
| self.quality = self.event.meta["quality"] | ||
| else: | ||
| self.quality = {} | ||
| if "quality" in self.meta: | ||
| if "quality" in kwargs: | ||
| self.meta["quality"].update(kwargs["quality"]) | ||
| self.quality = self.meta["quality"] | ||
| if ("quality" in self.meta) and ("event time" in self.meta): | ||
| if ("segment start" not in self.meta["quality"]) and ( | ||
| "segment length" in self.meta["data"] | ||
| ): | ||
| self.meta["likelihood"]["segment start"] = ( | ||
| self.meta["event time"] | ||
| - self.meta["data"]["segment length"] | ||
| + self.meta.get("likelihood", {}).get("post trigger time", 2) | ||
| ) | ||
| # self.event.meta['likelihood']['segment start'] = self.meta['data']['segment start'] | ||
| # Update waveform data | ||
| if "waveform" not in self.meta: | ||
| self.logger.info("Didn't find waveform information in the metadata") | ||
| self.meta["waveform"] = {} | ||
| if "approximant" in self.meta: | ||
| self.logger.warn( | ||
| "Found deprecated approximant information, " | ||
| "moving to waveform area of ledger" | ||
| ) | ||
| approximant = self.meta.pop("approximant") | ||
| self.meta["waveform"]["approximant"] = approximant | ||
| if "reference frequency" in self.meta["likelihood"]: | ||
| self.logger.warning( | ||
| "Found deprecated ref freq information, " | ||
| "moving to waveform area of ledger" | ||
| ) | ||
| ref_freq = self.meta["likelihood"].pop("reference frequency") | ||
| self.meta["waveform"]["reference frequency"] = ref_freq | ||
| # Gather the PSDs for the job | ||
| self.psds = self._collect_psds() | ||
| # Gather the appropriate prior data for this production | ||
| if "priors" in self.meta: | ||
| self.priors = self.meta["priors"] | ||
| if ( | ||
| "amplitude order" in self.meta["priors"] | ||
| and "pn amplitude order" not in self.meta["waveform"] | ||
| ): | ||
| self.meta["waveform"]["pn amplitude order"] = self.meta["priors"][ | ||
| "amplitude order" | ||
| ] | ||
| def __hash__(self): | ||
| return int(f"{hash(self.name)}{abs(hash(self.event.name))}") | ||
| def __eq__(self, other): | ||
| return (self.name == other.name) & (self.event == other.event) | ||
| def _process_dependencies(self, needs): | ||
| """ | ||
| Process the dependencies list for this production. | ||
| """ | ||
| return needs | ||
| @property | ||
| def dependencies(self): | ||
| if "needs" in self.meta: | ||
| return self._process_dependencies(self.meta["needs"]) | ||
| else: | ||
| return None | ||
| def results(self, filename=None, handle=False, hash=None): | ||
| store = Store(root=config.get("storage", "results_store")) | ||
| if not filename: | ||
| try: | ||
| items = store.manifest.list_resources(self.event.name, self.name) | ||
| return items | ||
| except KeyError: | ||
| return None | ||
| elif handle: | ||
| return open( | ||
| store.fetch_file(self.event.name, self.name, filename, hash), "r" | ||
| ) | ||
| else: | ||
| return store.fetch_file(self.event.name, self.name, filename, hash=hash) | ||
| @property | ||
| def rel_psds(self): | ||
| """ | ||
| Return the relative path to a PSD for a given event repo. | ||
| """ | ||
| rels = {} | ||
| for ifo, psds in self.psds.items(): | ||
| psd = self.psds[ifo] | ||
| psd = psd.split("/") | ||
| rels[ifo] = "/".join(psd[-3:]) | ||
| return rels | ||
| @property | ||
| def reference_frame(self): | ||
| """ | ||
| Calculate the appropriate reference frame. | ||
| """ | ||
| ifos = self.meta["interferometers"] | ||
| if len(ifos) == 1: | ||
| return ifos[0] | ||
| else: | ||
| return "".join(ifos[:2]) | ||
| def get_meta(self, key): | ||
| """ | ||
| Get the value of a metadata attribute, or return None if it doesn't | ||
| exist. | ||
| """ | ||
| if key in self.meta: | ||
| return self.meta[key] | ||
| else: | ||
| return None | ||
| def set_meta(self, key, value): | ||
| """ | ||
| Set a metadata attribute which doesn't currently exist. | ||
| """ | ||
| if key not in self.meta: | ||
| self.meta[key] = value | ||
| self.event.ledger.update_event(self.event) | ||
| else: | ||
| raise ValueError | ||
| @property | ||
| def finished(self): | ||
| finished_states = ["uploaded"] | ||
| return self.status in finished_states | ||
| @property | ||
| def status(self): | ||
| return self.status_str.lower() | ||
| @status.setter | ||
| def status(self, value): | ||
| self.status_str = value.lower() | ||
| self.event.ledger.update_event(self.event) | ||
| @property | ||
| def job_id(self): | ||
| if "job id" in self.meta: | ||
| return self.meta["job id"] | ||
| else: | ||
| return None | ||
| @job_id.setter | ||
| def job_id(self, value): | ||
| self.meta["job id"] = value | ||
| if self.event.issue_object: | ||
| self.event.issue_object.update_data() | ||
| def to_dict(self, event=True): | ||
| """ | ||
| Return this production as a dictionary. | ||
| Parameters | ||
| ---------- | ||
| event : bool | ||
| If set to True the output is designed to be included nested within an event. | ||
| The event name is not included in the representation, and the production name is provided as a key. | ||
| """ | ||
| dictionary = deepcopy(self.meta) | ||
| if not event: | ||
| dictionary["event"] = self.event.name | ||
| dictionary["name"] = self.name | ||
| dictionary["status"] = self.status | ||
| dictionary["pipeline"] = self.pipeline.name.lower() | ||
| dictionary["comment"] = self.comment | ||
| dictionary["review"] = self.review.to_dicts() | ||
| if "data" in self.meta: | ||
| dictionary["data"] = self.meta["data"] | ||
| if "likelihood" in self.meta: | ||
| dictionary["likelihood"] = self.meta["likelihood"] | ||
| if "quality" in self.meta: | ||
| dictionary["quality"] = self.meta["quality"] | ||
| if "priors" in self.meta: | ||
| dictionary["priors"] = self.meta["priors"] | ||
| if "waveform" in self.meta: | ||
| dictionary["waveform"] = self.meta["waveform"] | ||
| dictionary["needs"] = self.dependencies | ||
| dictionary["job id"] = self.job_id | ||
| # Remove duplicates of pipeline defaults | ||
| if self.pipeline.name.lower() in self.event.ledger.data["pipelines"]: | ||
| defaults = deepcopy( | ||
| self.event.ledger.data["pipelines"][self.pipeline.name.lower()] | ||
| ) | ||
| else: | ||
| defaults = {} | ||
| if "postprocessing" in self.event.ledger.data: | ||
| defaults["postprocessing"] = deepcopy( | ||
| self.event.ledger.data["postprocessing"] | ||
| ) | ||
| if "ledger" in self.event.meta: | ||
| self.event.meta.pop("ledger") | ||
| defaults = update(defaults, deepcopy(self.event.meta)) | ||
| dictionary = diff_dict(defaults, dictionary) | ||
| for key, value in self.meta.items(): | ||
| if key == "operations": | ||
| continue | ||
| if "repository" in self.meta: | ||
| dictionary["repository"] = self.repository.url | ||
| if "ledger" in dictionary: | ||
| dictionary.pop("ledger") | ||
| if "pipelines" in dictionary: | ||
| dictionary.pop("pipelines") | ||
| if "productions" in dictionary: | ||
| dictionary.pop("productions") | ||
| if not event: | ||
| output = dictionary | ||
| else: | ||
| output = {self.name: dictionary} | ||
| return output | ||
| @property | ||
| def rundir(self): | ||
| """ | ||
| Return the run directory for this event. | ||
| """ | ||
| if "rundir" in self.meta: | ||
| return os.path.abspath(self.meta["rundir"]) | ||
| elif "working directory" in self.event.meta: | ||
| value = os.path.join(self.event.meta["working directory"], self.name) | ||
| self.meta["rundir"] = value | ||
| return os.path.abspath(value) | ||
| else: | ||
| return None | ||
| @rundir.setter | ||
| def rundir(self, value): | ||
| """ | ||
| Set the run directory. | ||
| """ | ||
| if "rundir" not in self.meta: | ||
| self.meta["rundir"] = value | ||
| if self.event.issue_object is not None: | ||
| self.event.issue_object.update_data() | ||
| else: | ||
| raise ValueError | ||
| def get_psds(self, format="ascii", sample_rate=None): | ||
| """ | ||
| Get the PSDs for this production. | ||
| Parameters | ||
| ---------- | ||
| format : {ascii, xml} | ||
| The format of the PSD to be returned. | ||
| Defaults to the ascii format. | ||
| sample_rate : int | ||
| The sample rate of the PSD to be returned. | ||
| Defaults to None, in which case the sample rate in the event data is used. | ||
| Returns | ||
| ------- | ||
| list | ||
| A list of PSD files for the production. | ||
| """ | ||
| if sample_rate is None: | ||
| try: | ||
| if ( | ||
| "likelihood" in self.meta | ||
| and "sample rate" in self.meta["likelihood"] | ||
| ): | ||
| sample_rate = self.meta["likelihood"]["sample rate"] | ||
| else: | ||
| raise DescriptionException( | ||
| "The sample rate for this event cannot be found.", | ||
| issue=self.event.issue_object, | ||
| production=self.name, | ||
| ) | ||
| except Exception as e: | ||
| raise DescriptionException( | ||
| "The sample rate for this event cannot be found.", | ||
| issue=self.event.issue_object, | ||
| production=self.name, | ||
| ) from e | ||
| if (len(self.psds) > 0) and (format == "ascii"): | ||
| return self.psds | ||
| elif format == "xml": | ||
| # TODO: This is a hack, and we need a better way to sort this. | ||
| files = glob.glob( | ||
| f"{self.event.repository.directory}/{self.category}/psds/{sample_rate}/*.xml.gz" | ||
| ) | ||
| return files | ||
| def get_timefile(self): | ||
| """ | ||
| Find this event's time file. | ||
| Returns | ||
| ------- | ||
| str | ||
| The location of the time file. | ||
| """ | ||
| try: | ||
| return self.event.repository.find_timefile(self.category) | ||
| except FileNotFoundError: | ||
| new_file = os.path.join("gps.txt") | ||
| with open(new_file, "w") as f: | ||
| f.write(f"{self.event.meta['event time']}") | ||
| self.logger.info( | ||
| f"Created a new time file in {new_file} with time {self.event.meta['event time']}" | ||
| ) | ||
| self.event.repository.add_file( | ||
| new_file, | ||
| os.path.join(self.category, new_file), | ||
| "Added a new GPS timefile.", | ||
| ) | ||
| return new_file | ||
| def get_coincfile(self): | ||
| """ | ||
| Find this event's coinc.xml file. | ||
| Returns | ||
| ------- | ||
| str | ||
| The location of the time file. | ||
| """ | ||
| try: | ||
| coinc = self.event.repository.find_coincfile(self.category) | ||
| return coinc | ||
| except FileNotFoundError: | ||
| self.logger.info("Could not find a coinc.xml file") | ||
| savepath = os.path.abspath( | ||
| os.path.join( | ||
| self.event.repository.directory, self.category, "coinc.xml" | ||
| ), | ||
| ) | ||
| print(savepath) | ||
| self.event.get_gracedb( | ||
| "coinc.xml", | ||
| savepath, | ||
| ) | ||
| coinc = self.event.repository.find_coincfile(self.category) | ||
| return coinc | ||
| def get_configuration(self): | ||
| """ | ||
| Get the configuration file contents for this event. | ||
| """ | ||
| if "ini" in self.meta: | ||
| ini_loc = self.meta["ini"] | ||
| else: | ||
| # We'll need to search the repository for it. | ||
| try: | ||
| ini_loc = self.event.repository.find_prods(self.name, self.category)[0] | ||
| if not os.path.exists(ini_loc): | ||
| raise ValueError("Could not open the ini file.") | ||
| except IndexError: | ||
| raise ValueError("Could not open the ini file.") | ||
| try: | ||
| ini = RunConfiguration(ini_loc) | ||
| except ValueError: | ||
| raise ValueError("Could not open the ini file") | ||
| except configparser.MissingSectionHeaderError: | ||
| raise ValueError("This isn't a valid ini file") | ||
| return ini | ||
| @classmethod | ||
| def from_dict(cls, parameters, event, issue=None): | ||
| name, pars = list(parameters.items())[0] | ||
| # Check that pars is a dictionary | ||
| if not isinstance(pars, dict): | ||
| if "event" in parameters: | ||
| parameters.pop("event") | ||
| if "status" not in parameters: | ||
| parameters["status"] = "ready" | ||
| return cls(event=event, **parameters) | ||
| # Check all of the required parameters are included | ||
| if not {"status", "pipeline"} <= pars.keys(): | ||
| raise DescriptionException( | ||
| f"Some of the required parameters are missing from {name}", issue, name | ||
| ) | ||
| if "comment" not in pars: | ||
| pars["comment"] = None | ||
| if "event" in pars: | ||
| pars.pop(event) | ||
| return cls(event, name, **pars) | ||
| def __repr__(self): | ||
| return f"<Production {self.name} for {self.event} | status: {self.status}>" | ||
| def _collect_psds(self, format="ascii"): | ||
| """ | ||
| Collect the required psds for this production. | ||
| """ | ||
| psds = {} | ||
| # If the PSDs are specifically provided in the ledger, | ||
| # use those. | ||
| if format == "ascii": | ||
| keyword = "psds" | ||
| elif format == "xml": | ||
| keyword = "xml psds" | ||
| else: | ||
| raise ValueError(f"This PSD format ({format}) is not recognised.") | ||
| if keyword in self.meta: | ||
| # if self.meta["likelihood"]["sample rate"] in self.meta[keyword]: | ||
| psds = self.meta[keyword] # [self.meta["likelihood"]["sample rate"]] | ||
| # First look through the list of the job's dependencies | ||
| # to see if they're provided by a job there. | ||
| elif self.dependencies: | ||
| productions = {} | ||
| for production in self.event.productions: | ||
| productions[production.name] = production | ||
| for previous_job in self.dependencies: | ||
| try: | ||
| # Check if the job provides PSDs as an asset and were produced with compatible settings | ||
| if keyword in productions[previous_job].pipeline.collect_assets(): | ||
| if self._check_compatible(productions[previous_job]): | ||
| psds = productions[previous_job].pipeline.collect_assets()[ | ||
| keyword | ||
| ] | ||
| break | ||
| else: | ||
| self.logger.info( | ||
| f"The PSDs from {previous_job} are not compatible with this job." | ||
| ) | ||
| else: | ||
| psds = {} | ||
| except Exception: | ||
| psds = {} | ||
| # Otherwise return no PSDs | ||
| else: | ||
| psds = {} | ||
| for ifo, psd in psds.items(): | ||
| self.logger.debug(f"PSD-{ifo}: {psd}") | ||
| return psds | ||
| def _check_compatible(self, other_production): | ||
| """ | ||
| Check that the data settings in two productions are sufficiently compatible | ||
| that one can be used as a dependency of the other. | ||
| """ | ||
| compatible = True | ||
| # Check that the sample rate of this analysis is equal or less than that of the preceeding analysis | ||
| # to ensure that PSDs have points at the correct frequencies. | ||
| try: | ||
| compatible = self.meta.get("likelihood", {}).get( | ||
| "sample rate", None | ||
| ) <= other_production.meta.get("likelihood", {}).get("sample rate", None) | ||
| except TypeError: | ||
| # One or more sample rates are missing so these can't be deemed compatible. | ||
| return False | ||
| tests = [ | ||
| ("data", "channels"), | ||
| ("data", "frame types"), | ||
| ("data", "segment length"), | ||
| ] | ||
| for test in tests: | ||
| compatible &= self.meta.get(test[0], {}).get( | ||
| test[1], None | ||
| ) == other_production.meta.get(test[0], {}).get(test[1], None) | ||
| return compatible | ||
| def make_config(self, filename, template_directory=None, dryrun=False): | ||
| """ | ||
| Make the configuration file for this production. | ||
| Parameters | ||
| ---------- | ||
| filename : str | ||
| The location at which the config file should be saved. | ||
| template_directory : str, optional | ||
| The path to the directory containing the pipeline config templates. | ||
| Defaults to the directory specified in the asimov configuration file. | ||
| """ | ||
| self.logger.info("Creating config file.") | ||
| self.psds = self._collect_psds() | ||
| self.xml_psds = self._collect_psds(format="xml") | ||
| if "template" in self.meta: | ||
| template = f"{self.meta['template']}.ini" | ||
| else: | ||
| template = f"{self.pipeline.name.lower()}.ini" | ||
| pipeline = self.pipeline | ||
| if hasattr(pipeline, "config_template"): | ||
| template_file = pipeline.config_template | ||
| else: | ||
| try: | ||
| template_directory = config.get("templating", "directory") | ||
| template_file = os.path.join(f"{template_directory}", template) | ||
| if not os.path.exists(template_file): | ||
| raise Exception | ||
| except Exception: | ||
| from pkg_resources import resource_filename | ||
| template_file = resource_filename("asimov", f"configs/{template}") | ||
| self.logger.info(f"Using {template_file}") | ||
| liq = Liquid(template_file) | ||
| rendered = liq.render(production=self, config=config) | ||
| if not dryrun: | ||
| with open(filename, "w") as output_file: | ||
| output_file.write(rendered) | ||
| self.logger.info(f"Saved as {filename}") | ||
| else: | ||
| print(rendered) | ||
| def build_report(self): | ||
| if self.pipeline: | ||
| self.pipeline.build_report() | ||
| def html(self): | ||
| """ | ||
| An HTML representation of this production. | ||
| """ | ||
| production = self | ||
| card = "" | ||
| card += f"<div class='asimov-analysis asimov-analysis-{self.status}'>" | ||
| card += f"<h4>{self.name}" | ||
| if self.comment: | ||
| card += ( | ||
| f""" <small class="asimov-comment text-muted">{self.comment}</small>""" | ||
| ) | ||
| card += "</h4>" | ||
| if self.status: | ||
| card += f"""<p class="asimov-status"> | ||
| <span class="badge badge-pill badge-{status_map[self.status]}">{self.status}</span> | ||
| </p>""" | ||
| if self.pipeline: | ||
| card += f"""<p class="asimov-pipeline-name">{self.pipeline.name}</p>""" | ||
| if self.pipeline: | ||
| # self.pipeline.collect_pages() | ||
| card += self.pipeline.html() | ||
| if self.rundir: | ||
| card += f"""<p class="asimov-rundir"><code>{production.rundir}</code></p>""" | ||
| else: | ||
| card += """ """ | ||
| if "approximant" in production.meta: | ||
| card += f"""<p class="asimov-attribute">Waveform approximant: | ||
| <span class="asimov-approximant">{production.meta['approximant']}</span> | ||
| </p>""" | ||
| card += """ """ | ||
| card += """</div>""" | ||
| if len(self.review) > 0: | ||
| for review in self.review: | ||
| card += review.html() | ||
| return card | ||
| Production = GravitationalWaveTransient |
+14
-4
@@ -61,3 +61,6 @@ import glob | ||
| os.makedirs(location, exist_ok=True) | ||
| repo = git.Repo.init(location) | ||
| try: | ||
| repo = git.Repo.init(location, initial_branch="master") | ||
| except Exception: | ||
| repo = git.Repo.init(location) | ||
| os.makedirs(os.path.join(location, directory), exist_ok=True) | ||
@@ -193,3 +196,5 @@ with open(os.path.join(location, directory, ".gitkeep"), "w") as f: | ||
| """ | ||
| coinc_file = glob.glob(os.path.join(os.getcwd(), self.directory, category, "*coinc*.xml")) | ||
| coinc_file = glob.glob( | ||
| os.path.join(os.getcwd(), self.directory, category, "*coinc*.xml") | ||
| ) | ||
@@ -218,3 +223,8 @@ if len(coinc_file) > 0: | ||
| self.update() | ||
| path = f"{os.path.join(os.getcwd(), self.directory, category)}/{name}.ini" | ||
| if category is not None: | ||
| path = f"{os.path.join(os.getcwd(), self.directory, category)}/{name}.ini" | ||
| else: | ||
| category = "project_analyses" | ||
| path = f"{os.path.join(os.getcwd(), self.directory)}/{name}.ini" | ||
| return [path] | ||
@@ -286,3 +296,3 @@ | ||
| ---------- | ||
| event : `asimov.gitlab.EventIssue` | ||
| event : `asimov.event.Event` | ||
| The event which the preferred upload is being prepared for. | ||
@@ -289,0 +299,0 @@ prods : list |
+1
-0
@@ -9,2 +9,3 @@ """ | ||
| """ | ||
| import ast | ||
@@ -11,0 +12,0 @@ import getpass |
+87
-58
| """ | ||
| Code for the project ledger. | ||
| """ | ||
| import yaml | ||
@@ -8,2 +9,3 @@ | ||
| import shutil | ||
| from functools import reduce | ||
@@ -13,2 +15,3 @@ import asimov | ||
| from asimov import config | ||
| from asimov.analysis import ProjectAnalysis | ||
| from asimov.event import Event, Production | ||
@@ -33,13 +36,4 @@ from asimov.utils import update, set_directory | ||
| DatabaseLedger.create() | ||
| elif engine == "gitlab": | ||
| raise NotImplementedError( | ||
| "This hasn't been ported to the new interface yet. Stay tuned!" | ||
| ) | ||
| elif engine == "gitlab": | ||
| raise NotImplementedError( | ||
| "This hasn't been ported to the new interface yet. Stay tuned!" | ||
| ) | ||
| class YAMLLedger(Ledger): | ||
@@ -57,7 +51,7 @@ def __init__(self, location=None): | ||
| ] | ||
| self.events = {ev["name"]: ev for ev in self.data["events"]} | ||
| self._all_events = [ | ||
| Event(**self.events[event], ledger=self) | ||
| for event in self.events.keys() | ||
| ] | ||
| Event(**self.events[event], ledger=self) for event in self.events.keys() | ||
| ] | ||
| self.data.pop("events") | ||
@@ -73,2 +67,3 @@ | ||
| data["events"] = [] | ||
| data["project analyses"] = [] | ||
| data["project"] = {} | ||
@@ -86,2 +81,13 @@ data["project"]["name"] = name | ||
| def update_analysis_in_project_analysis(self, analysis): | ||
| """ | ||
| Function to update an analysis contained in the project analyses | ||
| """ | ||
| for i in range(len(self.data["project analyses"])): | ||
| if self.data["project analyses"][i]["name"] == analysis.name: | ||
| dict_to_save = analysis.to_dict().copy() | ||
| dict_to_save["status"] = analysis.status | ||
| self.data["project analyses"][i] = dict_to_save | ||
| self.save() | ||
| def delete_event(self, event_name): | ||
@@ -125,14 +131,47 @@ """ | ||
| def add_event(self, event): | ||
| def add_subject(self, subject): | ||
| """Add a new subject to the ledger.""" | ||
| if "events" not in self.data: | ||
| self.data["events"] = [] | ||
| self.events[event.name] = event.to_dict() | ||
| self.events[subject.name] = subject.to_dict() | ||
| self.save() | ||
| def add_production(self, event, production): | ||
| event.add_production(production) | ||
| self.events[event.name] = event.to_dict() | ||
| def add_event(self, event): | ||
| self.add_subject(subject=event) | ||
| def add_analysis(self, analysis, event=None): | ||
| """ | ||
| Add an analysis to the ledger. | ||
| This method can accept any of the forms of analysis supported by asimov, and | ||
| will determine the correct way to add them to the ledger. | ||
| Parameters | ||
| ---------- | ||
| analysis : `asimov.Analysis` | ||
| The analysis to be added to the ledger. | ||
| event : str, optional | ||
| The name of the event which the analysis should be added to. | ||
| This is not required for project analyses. | ||
| Examples | ||
| -------- | ||
| """ | ||
| if isinstance(analysis, ProjectAnalysis): | ||
| names = [ana["name"] for ana in self.data["project analyses"]] | ||
| if analysis.name not in names: | ||
| self.data["project analyses"].append(analysis.to_dict()) | ||
| else: | ||
| raise ValueError( | ||
| "An analysis with that name already exists in the ledger." | ||
| ) | ||
| else: | ||
| event.add_production(analysis) | ||
| self.events[event.name] = event.to_dict() | ||
| self.save() | ||
| def add_production(self, event, production): | ||
| self.add_analysis(analysis=production, event=event) | ||
| def get_defaults(self): | ||
@@ -157,2 +196,9 @@ """ | ||
| @property | ||
| def project_analyses(self): | ||
| return [ | ||
| ProjectAnalysis.from_dict(analysis, ledger=self) | ||
| for analysis in self.data["project analyses"] | ||
| ] | ||
| def get_event(self, event=None): | ||
@@ -193,6 +239,10 @@ if event: | ||
| productions = filter( | ||
| lambda x: x.meta[parameter] == value | ||
| if (parameter in x.meta) | ||
| else ( | ||
| getattr(x, parameter) == value if hasattr(x, parameter) else False | ||
| lambda x: ( | ||
| x.meta[parameter] == value | ||
| if (parameter in x.meta) | ||
| else ( | ||
| getattr(x, parameter) == value | ||
| if hasattr(x, parameter) | ||
| else False | ||
| ) | ||
| ), | ||
@@ -255,43 +305,22 @@ productions, | ||
| def get_productions(self, event=None, filters=None): | ||
| """Get a list of productions either for a single event or for all events. | ||
| def get_productions(self, event, filters=None, query=None): | ||
| """ | ||
| Get all of the productions for a given event. | ||
| """ | ||
| Parameters | ||
| ---------- | ||
| event : str | ||
| The name of the event to pull productions from. | ||
| Optional; if no event is specified then all of the productions are | ||
| returned. | ||
| if not filters and not query: | ||
| productions = self.db.query("production", "event", event) | ||
| filters : dict | ||
| A dictionary of parameters to filter on. | ||
| Examples | ||
| -------- | ||
| FIXME: Add docs. | ||
| """ | ||
| if event: | ||
| productions = self.get_event(event).productions | ||
| else: | ||
| productions = [] | ||
| for event_i in self.get_event(): | ||
| for production in event_i.productions: | ||
| productions.append(production) | ||
| def apply_filter(productions, parameter, value): | ||
| productions = filter( | ||
| lambda x: x.meta[parameter] == value | ||
| if (parameter in x.meta) | ||
| else ( | ||
| getattr(x, parameter) == value if hasattr(x, parameter) else False | ||
| ), | ||
| productions, | ||
| queries_1 = self.db.Q["event"] == event | ||
| queries = [ | ||
| self.db.Q[parameter] == value for parameter, value in filters.items() | ||
| ] | ||
| productions = self.db.tables["production"].search( | ||
| queries_1 & reduce(lambda x, y: x & y, queries) | ||
| ) | ||
| return productions | ||
| if filters: | ||
| for parameter, value in filters.items(): | ||
| productions = apply_filter(productions, parameter, value) | ||
| return list(productions) | ||
| event = self.get_event(event) | ||
| return [ | ||
| Production.from_dict(dict(production), event) for production in productions | ||
| ] |
| """ | ||
| The locutus script. | ||
| """ | ||
| import os | ||
@@ -5,0 +6,0 @@ |
@@ -13,5 +13,3 @@ """ | ||
| # from .gitlab import EventIssue | ||
| logging.getLogger("werkzeug").setLevel(logging.WARNING) | ||
@@ -81,3 +79,3 @@ logging.getLogger("MARKDOWN").setLevel(logging.WARNING) | ||
| The free-form message for the log message. | ||
| channels : list, or str, {"file", "mattermost", "gitlab"}, optional | ||
| channels : list, or str, {"file", "mattermost"}, optional | ||
| The list of places where the log message should be sent. | ||
@@ -111,3 +109,3 @@ Defaults to file only. | ||
| The free-form message for the log message. | ||
| channels : list, or str, {"file", "mattermost", "gitlab"}, optional | ||
| channels : list, or str, {"file", "mattermost"}, optional | ||
| The list of places where the log message should be sent. | ||
@@ -130,3 +128,3 @@ Defaults to file only. | ||
| The free-form message for the log message. | ||
| channels : list, or str, {"file", "mattermost", "gitlab"}, optional | ||
| channels : list, or str, {"file", "mattermost"}, optional | ||
| The list of places where the log message should be sent. | ||
@@ -149,3 +147,3 @@ Defaults to file only. | ||
| The free-form message for the log message. | ||
| channels : list, or str, {"file", "mattermost", "gitlab"}, optional | ||
| channels : list, or str, {"file", "mattermost"}, optional | ||
| The list of places where the log message should be sent. | ||
@@ -218,3 +216,3 @@ Defaults to file only. | ||
| The free-form message for the log message. | ||
| channels : list, or str, {"file", "mattermost", "gitlab"}, optional | ||
| channels : list, or str, {"file", "mattermost"}, optional | ||
| The list of places where the log message should be sent. | ||
@@ -221,0 +219,0 @@ Defaults to file only. |
+19
-16
@@ -9,5 +9,5 @@ """Defines the interface with generic analysis pipelines.""" | ||
| import asimov.analysis | ||
| warnings.filterwarnings("ignore", module="htcondor") | ||
| import htcondor # NoQA | ||
@@ -47,12 +47,3 @@ | ||
| def submit_comment(self): | ||
| """ | ||
| Submit this exception as a comment on the gitlab | ||
| issue for the event. | ||
| """ | ||
| if self.issue: | ||
| self.issue.add_label("pipeline-error", state=False) | ||
| self.issue.add_note(self.__repr__()) | ||
| class PipelineLogger: | ||
@@ -99,7 +90,13 @@ """Log things for pipelines.""" | ||
| self.category = production.category | ||
| try: | ||
| self.category = production.category | ||
| except AttributeError: | ||
| self.category = None | ||
| self.logger = logger.getChild( | ||
| f"analysis.{production.event.name}/{production.name}" | ||
| ) | ||
| if isinstance(production, asimov.analysis.ProjectAnalysis): | ||
| full_name = f"ProjectAnalysis/{production.name}" | ||
| else: | ||
| full_name = f"analysis.{production.event.name}/{production.name}" | ||
| self.logger = logger.getChild(full_name) | ||
| self.logger.setLevel(LOGGER_LEVEL) | ||
@@ -147,4 +144,7 @@ | ||
| self.production.status = "finished" | ||
| self.production.meta.pop("job id") | ||
| # Need to determine the correct list of post-processing jobs here | ||
| # self.production.meta.pop("job id") | ||
| def collect_assets(self): | ||
@@ -285,2 +285,5 @@ """ | ||
| def build(self): | ||
| pass | ||
| def build_report(self, reportformat="html"): | ||
@@ -287,0 +290,0 @@ """ |
@@ -13,2 +13,4 @@ import sys | ||
| from asimov.pipelines.pesummary import PESummary | ||
| discovered_pipelines = entry_points(group="asimov.pipelines") | ||
@@ -22,2 +24,3 @@ | ||
| "lalinference": LALInference, | ||
| "pesummary": PESummary, | ||
| } | ||
@@ -24,0 +27,0 @@ |
@@ -16,3 +16,3 @@ """BayesWave Pipeline specification.""" | ||
| from ..git import AsimovFileNotFound | ||
| from ..pipeline import Pipeline, PipelineException, PipelineLogger | ||
| from ..pipeline import Pipeline, PipelineException | ||
| from ..storage import AlreadyPresentException, Store | ||
@@ -39,5 +39,10 @@ | ||
| super(BayesWave, self).__init__(production, category) | ||
| self.logger.warning( | ||
| "The Bayeswave interface built into asimov will be removed " | ||
| "in v0.7 of asimov, and replaced with an integration from an " | ||
| "external package." | ||
| ) | ||
| self.logger.info("Using the Bayeswave pipeline") | ||
| if not production.pipeline.lower() == "bayeswave": | ||
| raise PipelineException | ||
| raise PipelineException("Pipeline mismatch") | ||
@@ -169,3 +174,2 @@ try: | ||
| self.logger.info(" ".join(command)) | ||
| if dryrun: | ||
@@ -175,3 +179,2 @@ print(" ".join(command)) | ||
| else: | ||
| pipe = subprocess.Popen( | ||
@@ -259,7 +262,5 @@ command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT | ||
| self.collect_pages() | ||
| except FileNotFoundError: | ||
| PipelineLogger( | ||
| message=b"Failed to copy megaplot pages.", | ||
| production=self.production.name, | ||
| ) | ||
| except FileNotFoundError as e: | ||
| self.logger.error("Failed to copy the megaplot output") | ||
| self.logger.exception(e) | ||
@@ -269,8 +270,5 @@ try: | ||
| self.store_assets() | ||
| except Exception: | ||
| PipelineLogger( | ||
| message=b"Failed to store PSDs.", | ||
| issue=self.production.event.issue_object, | ||
| production=self.production.name, | ||
| ) | ||
| except Exception as e: | ||
| self.logger.error("Failed to store the PSDs") | ||
| self.logger.exception(e) | ||
@@ -354,2 +352,4 @@ if "supress" in self.production.meta["quality"]: | ||
| self.logger.info((" ".join(command))) | ||
| if dryrun: | ||
@@ -459,7 +459,14 @@ print(" ".join(command)) | ||
| psds = {} | ||
| results_dir = glob.glob(f"{self.production.rundir}/trigtime_*")[0] | ||
| for det in self.production.meta["interferometers"]: | ||
| asset = os.path.join( | ||
| results_dir, "post", "clean", f"glitch_median_PSD_forLI_{det}.dat" | ||
| asset = glob.glob( | ||
| os.path.join( | ||
| self.production.rundir, | ||
| "trigtime*", | ||
| "post", | ||
| "clean", | ||
| f"glitch_median_PSD_forLI_{det}.dat", | ||
| ) | ||
| ) | ||
| if len(asset) > 0: | ||
| asset = asset[0] | ||
| if os.path.exists(asset): | ||
@@ -494,3 +501,3 @@ psds[det] = asset | ||
| store = Store(root=config.get("storage", "directory")) | ||
| sample_rate = self.production.meta["quality"]["sample-rate"] | ||
| sample_rate = self.production.meta["likelihood"]["sample rate"] | ||
| orig_PSD_file = np.genfromtxt( | ||
@@ -497,0 +504,0 @@ os.path.join( |
@@ -12,5 +12,8 @@ """Bilby Pipeline specification.""" | ||
| from .. import config | ||
| from ..pipeline import Pipeline, PipelineException, PipelineLogger, PESummaryPipeline | ||
| from ..pipeline import Pipeline, PipelineException, PipelineLogger | ||
| from .. import auth | ||
| from .pesummary import PESummary | ||
| class Bilby(Pipeline): | ||
@@ -34,6 +37,11 @@ """ | ||
| super(Bilby, self).__init__(production, category) | ||
| self.logger.warning( | ||
| "The Bilby interface built into asimov will be removed " | ||
| "in v0.7 of asimov, and replaced with an integration from an " | ||
| "external package." | ||
| ) | ||
| self.logger.info("Using the bilby pipeline") | ||
| if not production.pipeline.lower() == "bilby": | ||
| raise PipelineException | ||
| raise PipelineException("Pipeline mismatch") | ||
@@ -64,2 +72,3 @@ def detect_completion(self): | ||
| @auth.refresh_scitoken | ||
| def before_submit(self): | ||
@@ -69,13 +78,5 @@ """ | ||
| """ | ||
| self.logger.info("Running the before_submit hook") | ||
| sub_files = glob.glob(f"{self.production.rundir}/submit/*.submit") | ||
| for sub_file in sub_files: | ||
| if "dag" in sub_file: | ||
| continue | ||
| with open(sub_file, "r") as f_handle: | ||
| original = f_handle.read() | ||
| with open(sub_file, "w") as f_handle: | ||
| self.logger.info(f"Adding preserve_relative_paths to {sub_file}") | ||
| f_handle.write("preserve_relative_paths = True\n" + original) | ||
| pass | ||
| @auth.refresh_scitoken | ||
| def build_dag(self, psds=None, user=None, clobber_psd=False, dryrun=False): | ||
@@ -209,4 +210,2 @@ """ | ||
| try: | ||
| # to do: Check that this is the correct name of the output DAG file for billby (it | ||
| # probably isn't) | ||
| if "job label" in self.production.meta: | ||
@@ -229,3 +228,2 @@ job_label = self.production.meta["job label"] | ||
| # with set_directory(self.production.rundir): | ||
| self.logger.info(f"Working in {os.getcwd()}") | ||
@@ -271,3 +269,8 @@ | ||
| """ | ||
| return {"samples": self.samples()} | ||
| return { | ||
| "samples": self.samples(), | ||
| "config": self.production.event.repository.find_prods( | ||
| self.production.name, self.category | ||
| )[0], | ||
| } | ||
@@ -289,3 +292,3 @@ def samples(self, absolute=False): | ||
| def after_completion(self): | ||
| post_pipeline = PESummaryPipeline(production=self.production) | ||
| post_pipeline = PESummary(production=self.production) | ||
| self.logger.info("Job has completed. Running PE Summary.") | ||
@@ -314,5 +317,5 @@ cluster = post_pipeline.submit_dag() | ||
| except FileNotFoundError: | ||
| messages[ | ||
| log.split("/")[-1] | ||
| ] = "There was a problem opening this log file." | ||
| messages[log.split("/")[-1]] = ( | ||
| "There was a problem opening this log file." | ||
| ) | ||
| return messages | ||
@@ -341,5 +344,5 @@ | ||
| except FileNotFoundError: | ||
| messages[ | ||
| log.split("/")[-1] | ||
| ] = "There was a problem opening this log file." | ||
| messages[log.split("/")[-1]] = ( | ||
| "There was a problem opening this log file." | ||
| ) | ||
| return messages | ||
@@ -346,0 +349,0 @@ |
@@ -35,4 +35,9 @@ """LALInference Pipeline specification.""" | ||
| self.logger = logger | ||
| self.logger.warning( | ||
| "The LALInference interface built into asimov will be removed " | ||
| "in v0.7 of asimov, and replaced with an integration from an " | ||
| "external package." | ||
| ) | ||
| if not production.pipeline.lower() == "lalinference": | ||
| raise PipelineException | ||
| raise PipelineException("Pipeline mismatch") | ||
@@ -195,2 +200,4 @@ def detect_completion(self): | ||
| """ | ||
| if not dryrun: | ||
| os.chdir(self.production.rundir) | ||
@@ -240,3 +247,3 @@ with set_directory(self.production.rundir): | ||
| cluster = self.run_pesummary() | ||
| self.production.meta["job id"] = int(cluster) | ||
| self.production.job_id = int(cluster) | ||
| self.production.status = "processing" | ||
@@ -243,0 +250,0 @@ |
+32
-13
@@ -15,3 +15,3 @@ """RIFT Pipeline specification.""" | ||
| from asimov.pipeline import Pipeline, PipelineException, PipelineLogger | ||
| from asimov.pipeline import PESummaryPipeline | ||
| from asimov.pipelines.pesummary import PESummary | ||
@@ -40,3 +40,3 @@ | ||
| if not production.pipeline.lower() == "rift": | ||
| raise PipelineException | ||
| raise PipelineException("Pipeline mismatch") | ||
@@ -67,3 +67,3 @@ if "bootstrap" in self.production.meta: | ||
| self.logger.info("Job has completed. Running PE Summary.") | ||
| post_pipeline = PESummaryPipeline(production=self.production) | ||
| post_pipeline = PESummary(production=self.production) | ||
| cluster = post_pipeline.submit_dag() | ||
@@ -193,15 +193,34 @@ | ||
| os.environ["LIGO_USER_NAME"] = f"{user}" | ||
| os.environ[ | ||
| "LIGO_ACCOUNTING" | ||
| ] = f"{self.production.meta['scheduler']['accounting group']}" | ||
| if "accounting group" in self.production.meta["scheduler"]: | ||
| os.environ["LIGO_USER_NAME"] = f"{user}" | ||
| os.environ["LIGO_ACCOUNTING"] = ( | ||
| f"{self.production.meta['scheduler']['accounting group']}" | ||
| ) | ||
| if "gpu architectures" in self.production.meta["scheduler"]: | ||
| # Write details of required GPU architectures into an environment variable. | ||
| os.environ["RIFT_REQUIRE_GPUS"] = "&&".join( | ||
| [ | ||
| f"""(DeviceName=!="{device}")""" | ||
| for device in self.production.meta["scheduler"]["gpu architectures"] | ||
| ] | ||
| ) | ||
| if "avoid hosts" in self.production.meta["scheduler"]: | ||
| # Collect alist of specific hosts to avoid on the OSG | ||
| os.environ["RIFT_AVOID_HOSTS"] = "&&".join( | ||
| [ | ||
| f"""(TARGET.Machine =!="{machine}")""" | ||
| for machine in self.production.meta["scheduler"]["avoid hosts"] | ||
| ] | ||
| ) | ||
| if "singularity image" in self.production.meta["scheduler"]: | ||
| # Collect the correct information for the singularity image | ||
| os.environ[ | ||
| "SINGULARITY_RIFT_IMAGE" | ||
| ] = f"{self.production.meta['scheduler']['singularity image']}" | ||
| os.environ[ | ||
| "SINGULARITY_BASE_EXE_DIR" | ||
| ] = f"{self.production.meta['scheduler']['singularity base exe directory']}" | ||
| os.environ["SINGULARITY_RIFT_IMAGE"] = ( | ||
| f"{self.production.meta['scheduler']['singularity image']}" | ||
| ) | ||
| os.environ["SINGULARITY_BASE_EXE_DIR"] = ( | ||
| f"{self.production.meta['scheduler']['singularity base exe directory']}" | ||
| ) | ||
@@ -208,0 +227,0 @@ try: |
+2
-0
@@ -65,2 +65,3 @@ """ | ||
| ) | ||
| review_ob = cls() | ||
@@ -121,2 +122,3 @@ messages = sorted(messages, key=lambda k: k.timestamp) | ||
| out["status"] = self.status | ||
| return out | ||
@@ -123,0 +125,0 @@ |
+0
-37
| import collections | ||
| import glob | ||
| import os | ||
@@ -8,4 +7,2 @@ from contextlib import contextmanager | ||
| import numpy as np | ||
| from asimov import logger | ||
@@ -36,36 +33,2 @@ | ||
| def find_calibrations(time): | ||
| """ | ||
| Find the calibration file for a given time. | ||
| """ | ||
| if time < 1190000000: | ||
| dir = "/home/cal/public_html/uncertainty/O2C02" | ||
| virgo = "/home/carl-johan.haster/projects/O2/C02_reruns/V_calibrationUncertaintyEnvelope_magnitude5p1percent_phase40mraddeg20microsecond.txt" # NoQA | ||
| elif time < 1290000000: | ||
| dir = "/home/cal/public_html/uncertainty/O3C01" | ||
| virgo = "/home/cbc/pe/O3/calibrationenvelopes/Virgo/V_O3a_calibrationUncertaintyEnvelope_magnitude5percent_phase35milliradians10microseconds.txt" # NoQA | ||
| elif time > 1268306607: | ||
| raise ValueError( | ||
| "This method cannot be used to add calibration envelope data beyond O3." | ||
| ) | ||
| data_llo = glob.glob(f"{dir}/L1/*LLO*FinalResults.txt") | ||
| times_llo = { | ||
| int(datum.split("GPSTime_")[1].split("_C0")[0]): datum for datum in data_llo | ||
| } | ||
| data_lho = glob.glob(f"{dir}/H1/*LHO*FinalResults.txt") | ||
| times_lho = { | ||
| int(datum.split("GPSTime_")[1].split("_C0")[0]): datum for datum in data_lho | ||
| } | ||
| keys_llo = np.array(list(times_llo.keys())) | ||
| keys_lho = np.array(list(times_lho.keys())) | ||
| return { | ||
| "H1": times_lho[keys_lho[np.argmin(np.abs(keys_lho - time))]], | ||
| "L1": times_llo[keys_llo[np.argmin(np.abs(keys_llo - time))]], | ||
| "V1": virgo, | ||
| } | ||
| def update(d, u, inplace=True): | ||
@@ -72,0 +35,0 @@ """Recursively update a dictionary.""" |
+37
-2
@@ -0,1 +1,37 @@ | ||
| 0.6.0 | ||
| ===== | ||
| This is a major feature release which introduces some change in functionality which may have a minor effect on backwards compatibility. | ||
| New features | ||
| ------------ | ||
| *New analysis types*: asimov 0.6.0 introduces two new kinds of analysis: subject analyses, which allow an analysis to access configurations and results from multiple other analyses defined on a given event or subject, and project analyses which allow access to multiple analyses to be accessed across multiple events. These will allow much more complicated workflows than before which allow combinations of large numbers of subsidiary results. | ||
| Changes | ||
| ------- | ||
| *Deprecation warnings*: Deprecation warnings have been added to each of the pipelines (bilby, pesummary, lalinference, rift, and bayeswave) which are currently supported "out of the box" by asimov, as these will be removed in asimov 0.7 and replaced with plugins. | ||
| *Bilby*: The online pe argument has been removed from the bilby configuration template as this has been deprecated. | ||
| *Removal of gitlab interfaces*: More logic related to the disused and deprecated gitlab issue tracker interface has been removed. | ||
| *Removal of calibration envelope logic*: We have removed the logic required to find calibration files on the LIGO Scientific Collaboration computing resources. These have been moved to the asimov-gwdata pipeline, which will be used in O4 collaboration analyses. | ||
| Deprecation | ||
| ----------- | ||
| + All pipeline interfaces in the main package have been deprecated, and will be removed in v0.7.0 | ||
| Merges and fixes | ||
| ---------------- | ||
| + `ligo!120 <https://git.ligo.org/asimov/asimov/-/merge_requests/120>`_: Fixes the event deletion CLI option. | ||
| + `ligo!66 <https://git.ligo.org/asimov/asimov/-/merge_requests/66>`_: Updates the post-processing interface. | ||
| + `ligo!128 <https://git.ligo.org/asimov/asimov/-/merge_requests/128>`_: Updates to the README | ||
| + `ligo!157 <https://git.ligo.org/asimov/asimov/-/merge_requests/157>`_: Fixes to the interface between asimov and lensingflow | ||
| 0.5.8 | ||
@@ -41,3 +77,3 @@ ===== | ||
| 0.5.6 | ||
@@ -58,3 +94,2 @@ ===== | ||
| 0.5.5 | ||
@@ -61,0 +96,0 @@ ===== |
+198
-1
@@ -0,1 +1,4 @@ | ||
| Analyses | ||
| ======== | ||
| Analyses are the fundamental operations which asimov conducts on data. | ||
@@ -10,3 +13,3 @@ | ||
| as these only require access to the data for a single event. | ||
| Before version 0.4 these were called `Productions`. | ||
| Before version 0.6 these were called `Productions`. | ||
@@ -26,1 +29,195 @@ Event analyses | ||
| An analysis is defined as a series of configuration variables in an asimov project's ledger, which are then used to configure analysis pipelines. | ||
| .. _states: | ||
| Analysis state | ||
| -------------- | ||
| In order to track each of the jobs asimov monitors it employs a simple state machine on each production. | ||
| This state is tracked within :ref:`the production ledger<ledger>` for each production with the value of ``status``. | ||
| Under normal running conditions the sequence of these states is | ||
| :: | ||
| ready --> running --> finished --> processing --> uploaded | ||
| A number of additional states are also possible which interupt the normal flow of the job through ``asimov``'s workflow. | ||
| + ``ready`` : This job should be started automatically. This state must be applied manually. The job will then be started once its dependencies are met. | ||
| + ``stopped`` : This run has been manually stopped; stop tracking it. Must be applied manually. | ||
| + ``stuck`` : A problem has been discovered with this run, and needs manual intervention. | ||
| + ``uploaded`` : This event has been uploaded to the event repository. | ||
| + ``restart`` : This job should be restarted by Olivaw. Must be applied manually. | ||
| + ``finished`` : This job has finished running and the results are ready to be processed on the next bot check. | ||
| + ``processing`` : The results of this job are currently being processed by ``PESummary``. | ||
| + ``uploaded`` : This job has been uploaded to the data store. | ||
| .. note:: | ||
| The way that analyses are handled by asimov changed considerably in ``asimov 0.6.0``, but generally you shouldn't notice any major differences if you've been using ``Productions`` in the past. | ||
| Creating Analyses | ||
| ================= | ||
| The easiest way to create a new analysis is using an YAML Blueprint file. | ||
| We settled on using these because analyses often contain a very large number of settings, and trying to set everything up on the command line becomes rapidly impractical, and difficult to reproduce reliably, without writing a shell script. | ||
| It is also possible to create simple analyses via the command line, but a blueprint is normally the best way. | ||
| A Blueprint for a simple analysis | ||
| --------------------------------- | ||
| A simple analysis is designed to only perform analysis on a single subject or event. | ||
| The blueprint for these analyses can be very short if you don't need to specify many settings for the analysis. | ||
| For example, ``Bayeswave`` is a gravitational wave analysis pipeline which is designed to perform analysis on a single stretch of data, and a single gravitational wave event. | ||
| It doesn't require information about other gravitational wave events to be shared with the analysis, and so is best modelled in asimov as a simple analysis. | ||
| A blueprint to set up a default ``Bayeswave`` run is just a handful of lines long. | ||
| .. code-block:: yaml | ||
| kind: analysis | ||
| name: sample-analysis | ||
| pipeline: bayeswave | ||
| comment: This is a simple analysis pipeline. | ||
| Save this as ``bayeswave-blueprint.yaml``, and you can then add this analysis to an event in a project by running | ||
| .. code-block:: console | ||
| $ asimov apply -e <subject name> -f bayeswave-blueprint.yaml | ||
| replacing ``<subject name>`` with the name of the subject you're adding the analysis to. | ||
| It's also possible to make a simple analysis depend on the results of a previous analysis using the ``needs`` keyword in the blueprint. | ||
| The pipeline ``giskard`` needs a datafile which is produced by the ``bayeswave`` pipeline defined in the first blueprint, so it can be created with this blueprint: | ||
| .. code-block:: yaml | ||
| kind: analysis | ||
| name: stage-2-analysis | ||
| pipeline: giskard | ||
| comment: Search for evidence of gravitational wave bending. | ||
| needs: | ||
| - sample-analysis | ||
| Here we defined the requirement by the *name* of the previous analysis, but we can also use various properties of the analysis. | ||
| This can be useful if you don't want to rely on having consistent naming between events or even projects, but you want to be able to reuse a blueprint for many subjects or even many projects. | ||
| You can update the previous blueprint to always require a ``Bayeswave`` pipeline to have completed before starting the ``giskard`` analysis: | ||
| .. code-block:: yaml | ||
| kind: analysis | ||
| name: stage-2-analysis | ||
| pipeline: giskard | ||
| comment: Search for evidence of gravitational wave bending. | ||
| waveform: | ||
| approximant: impecableOstritchv56PHMX | ||
| needs: | ||
| - "pipeline:bayeswave" | ||
| We can use any of the metadata for an analysis to create the dependencies. | ||
| For example, we can require an analysis which used the ``impecableOstritchv56PHMX`` waveform by stating ``"waveform.approximant:impecableOstritchv56PHMX"`` in the ``needs`` section. | ||
| You can also define mutliple criteria for an analyses dependencies, and asimov will wait until all of the requirements are satisfied before starting. | ||
| For example: | ||
| .. code-block:: yaml | ||
| kind: analysis | ||
| name: stage-3-analysis | ||
| pipeline: calvin | ||
| comment: A third step analysis. | ||
| needs: | ||
| - "pipeline:giskard" | ||
| - "waveform.approximant:impecableOstritchv56PHMX" | ||
| A Blueprint for a project analysis | ||
| ---------------------------------- | ||
| Creating a project analysis in asimov is very similar to a simple analysis, except that we can also provide a list of subjects which should be included in the analysis. | ||
| For example, the ``gladia`` pipeline is used to perform a joint analysis between two gravitational waves. | ||
| To create a ``gladia`` pipeline which analyses two events, ``GW150914`` and ``GW151012`` you need to add a ``subjects`` list to the blueprint, for example: | ||
| .. code-block:: yaml | ||
| kind: projectanalysis | ||
| name: gladia-joint | ||
| pipeline: gladia | ||
| comment: An example joint analysis. | ||
| subjects: | ||
| - GW150914 | ||
| - GW151012 | ||
| Creating a Project Analysis Pipeline | ||
| ==================================== | ||
| For the most part a Project Analysis pipeline is similar to a simple analysis pipeline. | ||
| The main difference will be how you access metadata from each event. | ||
| In the template configuration file project analyses have access to the ``analysis.subjects`` property, which provides a list of subjects available to the analysis. | ||
| These then give access to all of the metadata for each subject. | ||
| The example below uses two subjects, and to make the sample template easier to read we've assigned each to its own liquid variable. | ||
| .. code-block:: liquid | ||
| {%- assign subject_1 = analysis.subjects[0] -%} | ||
| {%- assign subject_2 = analysis.subjects[1] -%} | ||
| [event_1_settings] | ||
| {%- assign ifos = subject_1.meta['interferometers'] -%} | ||
| channel-dict = { {% for ifo in ifos %}{{ subject_1.meta['data']['channels'][ifo] }},{% endfor %} } | ||
| psd-dict = { {% for ifo in ifos %}{{ifo}}:{{subject_1.psds[ifo]}},{% endfor %} } | ||
| [event_2_settings] | ||
| {%- assign ifos = subject_2.meta['interferometers'] -%} | ||
| channel-dict = { {% for ifo in ifos %}{{ subject_2.meta['data']['channels'][ifo] }},{% endfor %} } | ||
| psd-dict = { {% for ifo in ifos %}{{ifo}}:{{subject_2.psds[ifo]}},{% endfor %} } | ||
| Postprocessing Workflows | ||
| ======================== | ||
| It's common to have workflows where one process produces a result which then needs to have some additional processing required which may not fit neatly into the notion of an analysis. | ||
| For example, in gravitational wave transient analyses it is common to perform parameter estimation in an ``Analysis``, but then want to run a script which will plot the outputs after the analysis is complete. | ||
| Indeed, there are tools which are designed to do this for a wide range of pipelines, in order to produce results in a common format. | ||
| In asimov these jobs are called "Postprocessing pipelines", and they share much of the same functionality as a full analysis. | ||
| Where an analysis is applied to a single event, and provides the settings which are required for a single analysis, in general postprocessing analyses are designed to be applied identically to any Analysis when it completes, if it satisfies the Pipeline's criteria. | ||
| As a concrete example, let's look at the blueprint for a postprocessing analysis. | ||
| .. code-block:: yaml | ||
| kind: postprocessing | ||
| name: combined summary pages for bilby | ||
| analyses: | ||
| - pipeline:bilby | ||
| stages: | ||
| - name: combined pages | ||
| pipeline: pesummary | ||
| This blueprint describes postprocessing using a pipeline called ``pesummary`` which applies to all events (aka subjects) in the project, and all analyses which have "bilby" as their pipeline. | ||
| In contrast to a normal Analysis, it is possible to define multiple stages to a Postprocessing workflow; for example, this blueprint creates a workflow with two stages: | ||
| .. code-block:: yaml | ||
| kind: postprocessing | ||
| name: standard pe postprocessing | ||
| analyses: | ||
| - pipeline:bilby | ||
| - pipeline:rift | ||
| stages: | ||
| - name: simple PE summary | ||
| pipeline: pesummary | ||
| - name: less simple PE summary | ||
| pipeline: pesummary | ||
| needs: | ||
| - simple PE summary | ||
| This workflow has two stages, with ``less simple PE summary`` requiring ``simple PE summary`` to complete before it is started. | ||
@@ -0,1 +1,3 @@ | ||
| :page_template: homepage.html | ||
| .. raw:: html | ||
@@ -109,3 +111,6 @@ | ||
| Citing asimov | ||
| ------------- | ||
| If you use asimov in a publication please consider consulting our :ref:`citation-guide`. | ||
@@ -132,10 +137,18 @@ .. | ||
| installation | ||
| olivaw/projects | ||
| olivaw/events | ||
| olivaw/productions | ||
| olivaw/running | ||
| olivaw/monitoring | ||
| olivaw/reporting | ||
| user-guide/projects | ||
| user-guide/events | ||
| user-guide/productions | ||
| user-guide/running | ||
| user-guide/monitoring | ||
| user-guide/reporting | ||
| storage | ||
| olivaw/review | ||
| citing | ||
| .. toctree:: | ||
| :maxdepth: 2 | ||
| :caption: The Ledger | ||
| ledger | ||
@@ -175,6 +188,12 @@ .. toctree:: | ||
| asimov-repository | ||
| code-overview | ||
| ledger | ||
| pipelines-dev | ||
| hooks | ||
| building-docs | ||
@@ -202,4 +221,10 @@ asimov-repository | ||
| api/git | ||
| state | ||
| pipelines | ||
| api/gitlab | ||
| api/ini | ||
| api/ledger | ||
| api/locutus | ||
| api/logging | ||
| api/mattermost | ||
| api/olivaw | ||
| api/pipeline | ||
| config | ||
@@ -206,0 +231,0 @@ |
@@ -0,1 +1,6 @@ | ||
| <<<<<<< HEAD | ||
| .. _ledger: | ||
| ======= | ||
| >>>>>>> v0.4-release | ||
| The Asimov Ledger | ||
@@ -102,3 +107,5 @@ ================= | ||
| For example | ||
| :: | ||
| data: | ||
@@ -113,3 +120,5 @@ calibration: | ||
| For example | ||
| :: | ||
| data: | ||
@@ -124,3 +133,5 @@ channels: | ||
| For example | ||
| :: | ||
| data: | ||
@@ -135,3 +146,5 @@ frame-types: | ||
| For example | ||
| :: | ||
| data: | ||
@@ -146,3 +159,5 @@ segments: | ||
| For example | ||
| :: | ||
| data: | ||
@@ -191,3 +206,11 @@ data files: | ||
| ~~~~~~~~ | ||
| .. code-block:: yaml | ||
| scheduler: | ||
| accounting group: ligo.dev.o4.cbc.pe.bilby | ||
| request cpus: 4 | ||
| Prior settings | ||
@@ -250,1 +273,28 @@ -------------- | ||
| boundary: periodic | ||
| Postprocessing settings | ||
| ----------------------- | ||
| Examples | ||
| ~~~~~~~~ | ||
| .. code-block:: yaml | ||
| postprocessing: | ||
| pesummary: | ||
| accounting group: ligo.dev.o4.cbc.pe.lalinference | ||
| cosmology: Planck15_lal | ||
| evolve spins: forward | ||
| multiprocess: 4 | ||
| redshift: exact | ||
| regenerate posteriors: | ||
| - redshift | ||
| - mass_1_source | ||
| - mass_2_source | ||
| - chirp_mass_source | ||
| - total_mass_source | ||
| - final_mass_source | ||
| - final_mass_source_non_evolved | ||
| - radiated_energy | ||
| skymap samples: 2000 |
@@ -0,1 +1,6 @@ | ||
| <<<<<<< HEAD | ||
| .. _pipeline-dev: | ||
| ======= | ||
| >>>>>>> v0.4-release | ||
| Developing new pipelines | ||
@@ -56,2 +61,3 @@ ======================== | ||
| :: | ||
| {"psds": {"L1": /path/to/L1/psd, "H1": /path/to/H1/psd}} | ||
@@ -162,2 +168,230 @@ | ||
| asimov.pipelines = | ||
| mypipeline = asimov:MyPipeline | ||
| mypipeline = mypipeline.asimov:MyPipeline | ||
| ``setup.py`` | ||
| ~~~~~~~~~~~~ | ||
| .. code-block:: python | ||
| import setuptools | ||
| setuptools.setup( | ||
| name = "MyPipeline", | ||
| ..., | ||
| entry_points = { | ||
| "asimov.pipelines": ["mypipeline = mypipeline.asimov:MyPipeline"] | ||
| }, | ||
| .. _templates: | ||
| Pipeline Configuration Templates | ||
| -------------------------------- | ||
| All of the pipelines which ``asimov`` is designed to work with use some manner of configuration file to define their operation. | ||
| Previously, creating these configuration files could be a tedious manual process, but ``asimov`` allows these files to be *templated*, combining various pieces of data and metadata from the production ledger with a template configuration file to produce the configuration which is then used to generate the DAG files which run the analysis. | ||
| Details of the metadata stored in the ledger can be found in the :ref:`documentation for the ledger format<ledger>`. | ||
| In order to run a ``bilby`` job a config file is required for ``bilby_pipe``. | ||
| ``Asimov`` can produce this from a template. | ||
| Config file templates can be written using the liquid templating language, and should be kept in a directory which is specified in the `asimov` configuration file under the ``templating>directory`` configuration value e.g. | ||
| .. code-block:: ini | ||
| [templating] | ||
| directory = config-templates | ||
| The liquid language allows some logic to be included in the template. | ||
| This can be used to only include a given value if an interferometer is included in the analysis. | ||
| For example: | ||
| .. code-block:: ini | ||
| spline-calibration-envelope-dict={ | ||
| {% if production.meta['interferometers'] contains "H1" %} | ||
| H1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['H1'] }}, | ||
| {% endif %} | ||
| {% if production.meta['interferometers'] contains "L1" %} | ||
| L1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['L1'] }}, | ||
| {% endif %} | ||
| {% if production.meta['interferometers'] contains "V1" %} | ||
| V1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['V1'] }} | ||
| {% endif %} | ||
| } | ||
| Adds only the calibration files for the required detectors to the configuration file. | ||
| The majority of the data passed to the template can be found in the ``production.meta`` dictionary. | ||
| These are stored in the same nested format as the production ledger; evbent-wide values are inherited by the production, so in the example ledger below the sample rate can be retrieved from ``production.meta['quality']['sample-rate']``, for example. | ||
| There are also a number of additional variables are available for convenience: | ||
| + ``production.quality`` is an alias for ``production.meta['quality']`` | ||
| + ``production.psds`` provides the dictionary of PSDs for this event's specified sample rate. | ||
| + ``production.event`` provides access to the data from the event (e.g. for the repository directory path, located at ``production.event.repository.directory``) | ||
| A full example ``bilby`` template is available below: | ||
| .. code-block:: ini | ||
| ################################################################################ | ||
| ## Calibration arguments | ||
| ################################################################################ | ||
| calibration-model=CubicSpline | ||
| spline-calibration-envelope-dict={ {% if production.meta['interferometers'] contains "H1" %}H1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['H1'] }},{% endif %}{% if production.meta['interferometers'] contains "L1" %}L1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['L1'] }},{% endif %}{% if production.meta['interferometers'] contains "V1" %}V1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['V1'] }}{% endif %} } | ||
| spline-calibration-nodes=10 | ||
| spline-calibration-amplitude-uncertainty-dict=None | ||
| spline-calibration-phase-uncertainty-dict=None | ||
| ################################################################################ | ||
| ## Data generation arguments | ||
| ################################################################################ | ||
| ignore-gwpy-data-quality-check=True | ||
| gps-tuple=None | ||
| gps-file=None | ||
| timeslide-file=None | ||
| timeslide-dict=None | ||
| trigger-time={{ production.meta['event time'] }} | ||
| gaussian-noise=False | ||
| n-simulation=0 | ||
| data-dict=None | ||
| data-format=None | ||
| channel-dict={ {% if production.meta['interferometers'] contains "H1" %}{{ production.meta['data']['channels']['H1'] }},{% endif %} {% if production.meta['interferometers'] contains "L1" %}{{ production.meta['data']['channels']['L1'] }},{% endif %}{% if production.meta['interferometers'] contains "V1" %}{{ production.meta['data']['channels']['V1'] }}{% endif %} } | ||
| ################################################################################ | ||
| ## Detector arguments | ||
| ################################################################################ | ||
| coherence-test=False | ||
| detectors={{ production.meta['interferometers'] }} | ||
| duration={{ production.meta['quality']['segment-length'] }} | ||
| generation-seed=None | ||
| psd-dict={ {% if production.meta['interferometers'] contains "H1" %}H1:{{ production.psds['H1'] }},{% endif %} {% if production.meta['interferometers'] contains "L1" %}L1:{{ production.psds['L1'] }},{% endif %} {% if production.meta['interferometers'] contains "V1" %}V1:{{ production.psds['V1'] }}{% endif %} } | ||
| psd-fractional-overlap=0.5 | ||
| post-trigger-duration=2.0 | ||
| sampling-frequency={{ production.meta['quality']['sample-rate'] }} | ||
| psd-length={{ production.meta['quality']['psd-length'] }} | ||
| psd-maximum-duration=1024 | ||
| psd-method=median | ||
| psd-start-time=None | ||
| maximum-frequency=1024 | ||
| minimum-frequency={{ production.meta['quality']['reference-frequency'] }} | ||
| zero-noise=False | ||
| tukey-roll-off=0.4 | ||
| resampling-method=lal | ||
| ################################################################################ | ||
| ## Injection arguments | ||
| ################################################################################ | ||
| injection=False | ||
| injection-dict=None | ||
| injection-file=None | ||
| injection-numbers=None | ||
| injection-waveform-approximant=None | ||
| ################################################################################ | ||
| ## Job submission arguments | ||
| ################################################################################ | ||
| accounting=ligo.dev.o3.cbc.pe.lalinference | ||
| label={{ production.name }} | ||
| local=False | ||
| local-generation=False | ||
| local-plot=False | ||
| outdir={{ production.rundir }} | ||
| periodic-restart-time=28800 | ||
| request-memory=4.0 | ||
| request-memory-generation=None | ||
| request-cpus=4 | ||
| singularity-image=None | ||
| scheduler=condor | ||
| scheduler-args=None | ||
| scheduler-module=None | ||
| scheduler-env=None | ||
| transfer-files=False | ||
| log-directory=None | ||
| online-pe=False | ||
| osg=False | ||
| ################################################################################ | ||
| ## Likelihood arguments | ||
| ################################################################################ | ||
| distance-marginalization=True | ||
| distance-marginalization-lookup-table=None | ||
| phase-marginalization=True | ||
| time-marginalization=True | ||
| jitter-time=True | ||
| reference-frame={% if production.meta['interferometers'] contains "H1" %}H1{% endif %}{% if production.meta['interferometers'] contains "L1" %}L1{% endif %}{% if production.meta['interferometers'] contains "V1" %}V1{% endif %} | ||
| time-reference={% if production.meta['interferometers'] contains "H1" %}H1{% elsif production.meta['interferometers'] contains "L1" %}L1{% elsif production.meta['interferometers'] contains "V1" %}V1{% endif %} | ||
| likelihood-type=GravitationalWaveTransient | ||
| roq-folder=None | ||
| roq-scale-factor=1 | ||
| extra-likelihood-kwargs=None | ||
| ################################################################################ | ||
| ## Output arguments | ||
| ################################################################################ | ||
| create-plots=True | ||
| plot-calibration=False | ||
| plot-corner=False | ||
| plot-marginal=False | ||
| plot-skymap=False | ||
| plot-waveform=False | ||
| plot-format=png | ||
| create-summary=False | ||
| email=None | ||
| existing-dir=None | ||
| webdir=/home/pe.o3/public_html/LVC/o3b-catalog/{{ production.event.name }}/{{ production.name }} | ||
| summarypages-arguments=None | ||
| ################################################################################ | ||
| ## Prior arguments | ||
| ################################################################################ | ||
| default-prior=BBHPriorDict | ||
| deltaT=0.2 | ||
| prior-file=4s | ||
| prior-dict=None | ||
| convert-to-flat-in-component-mass=False | ||
| ################################################################################ | ||
| ## Post processing arguments | ||
| ################################################################################ | ||
| postprocessing-executable=None | ||
| postprocessing-arguments=None | ||
| single-postprocessing-executable=None | ||
| single-postprocessing-arguments=None | ||
| ################################################################################ | ||
| ## Sampler arguments | ||
| ################################################################################ | ||
| sampler=dynesty | ||
| sampling-seed=None | ||
| n-parallel=5 | ||
| sampler-kwargs={'queue_size': 4, 'nlive': 2000, 'sample': 'rwalk', 'walks': 100, 'n_check_point': 2000, 'nact': 10, 'npool': 4} | ||
| ################################################################################ | ||
| ## Waveform arguments | ||
| ################################################################################ | ||
| waveform-generator=bilby.gw.waveform_generator.WaveformGenerator | ||
| reference-frequency={{ production.meta['quality']['reference-frequency'] }} | ||
| waveform-approximant={{ production.meta['approximant'] }} | ||
| catch-waveform-errors=False | ||
| pn-spin-order=-1 | ||
| pn-tidal-order=-1 | ||
| pn-phase-order=-1 | ||
| pn-amplitude-order=0 | ||
| mode-array=None | ||
| frequency-domain-source-model=lal_binary_black_hole |
@@ -16,6 +16,6 @@ ================== | ||
| + :ref:`lalinference<LALInference pipelines>` | ||
| + :ref:`bayeswave<pipelines/BayesWave pipelines>` | ||
| + ``bilby`` | ||
| + ``RIFT`` | ||
| + :ref:`LALInference<lalinference-pipelines>` | ||
| + :ref:`BayesWave<bayeswave-pipelines>` | ||
| + :ref:`Bilby<bilby-pipelines>` | ||
| + :ref:`RIFT<rift-pipelines>` | ||
@@ -25,21 +25,3 @@ Adding new pipelines | ||
| New pipelines can be added to asimov by overloading the various methods in the :class:``asimov.pipeline.Pipeline`` class. | ||
| The most important of these is the ``build_dag`` method, which is used by the asimov framework to construct the DAG file to be submitted to the condor scheduler. | ||
| An example of a complete pipeline interface can be seen in the code for :class:``asimov.pipelines.lalinference.LALinference``. | ||
| Pipeline hooks | ||
| -------------- | ||
| It is possible to customise the run process of the asimov pipeline runner using hooks. | ||
| By overloading the hook methods (listed below) inherited from the ``asimov.pipeline.Pipeline`` class additional operations can | ||
| be conducted during the processing workflow. | ||
| Hooks should take no arguments. | ||
| Implemented hooks are: | ||
| :: | ||
| before_submit() --- Executed immediately before the DAG file for a pipeline is generated. | ||
| after_completion() --- Executed once execution has successfully | ||
| New pipelines can be added to asimov by overloading the various methods in the ``asimov.pipeline.Pipeline`` class. | ||
| Details for how you can develop new pipeline interfaces can be found in the :ref:`pipelines development guide<pipeline-dev>`. |
@@ -0,1 +1,3 @@ | ||
| .. _bayeswave-pipelines: | ||
| BayesWave pipelines | ||
@@ -2,0 +4,0 @@ =================== |
@@ -0,1 +1,3 @@ | ||
| .. _bilby-pipelines: | ||
| Bilby pipelines | ||
@@ -40,2 +42,3 @@ =============== | ||
| This section takes a list of types of marginalization to apply to the analysis (see below for an example of the syntax). | ||
| ``distance`` | ||
@@ -50,2 +53,3 @@ Activates distance marginalization. | ||
| This section allows ROQs to be defined for the likelihood function. | ||
| ``folder`` | ||
@@ -52,0 +56,0 @@ The location of the ROQs. |
@@ -12,5 +12,8 @@ .. _lalinference-pipelines: | ||
| .. note:: | ||
| The current integration with LALInference is fully reviewed and is suitable for use in collaboration analyses. | ||
| .. warning:: | ||
| **v0.4.0** | ||
| The integration with LALInference has been deprecated. | ||
| It *must not* be used for collaboration parameter estimation analyses. | ||
| Examples | ||
@@ -17,0 +20,0 @@ -------- |
@@ -0,1 +1,3 @@ | ||
| .. _rift-pipelines: | ||
| RIFT pipelines | ||
@@ -10,4 +12,8 @@ ============== | ||
| .. note:: | ||
| The current integration with RIFT is fully reviewed and is suitable for use with all collaboration analyses. | ||
| .. warning:: | ||
| **v0.4.0** | ||
| The current integration with RIFT is experimental, and is not reviewed. | ||
| It *must not* be used for collaboration parameter estimation analyses. | ||
| A reviewed version is expected to be available in the v0.5 series of releases. | ||
@@ -14,0 +20,0 @@ |
+7
-19
| Metadata-Version: 2.1 | ||
| Name: asimov | ||
| Version: 0.5.8 | ||
| Version: 0.6.0 | ||
| Summary: A Python package for managing and interacting with data analysis jobs. | ||
@@ -49,6 +49,7 @@ Home-page: https://git.ligo.org/asimov/asimov | ||
| [](https://anaconda.org/conda-forge/ligo-asimov) | ||
| [](https://conda.anaconda.org/conda-forge) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/master) | ||
| [](https://anaconda.org/conda-forge/asimov/) | ||
|  | ||
|  | ||
@@ -59,16 +60,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows. | ||
| ## Branch notes | ||
| These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development. | ||
| More generally useful documentation will move to the main documentation before moving to production. | ||
| ### Starting the logging server | ||
| Run in ``asimov`` directory: | ||
| ``` | ||
| export FLASK_APP=server | ||
| flask run | ||
| ``` | ||
| ## Features | ||
@@ -112,3 +99,3 @@ | ||
| ``` | ||
| $ conda install -c conda-forge ligo-asimov | ||
| $ conda install -c conda-forge asimov | ||
| ``` | ||
@@ -122,2 +109,3 @@ | ||
| ## Get started | ||
@@ -124,0 +112,0 @@ |
+6
-18
@@ -7,6 +7,7 @@ # Asimov | ||
| [](https://anaconda.org/conda-forge/ligo-asimov) | ||
| [](https://conda.anaconda.org/conda-forge) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates) | ||
| [](https://git.ligo.org/asimov/asimov/-/commits/master) | ||
| [](https://anaconda.org/conda-forge/asimov/) | ||
|  | ||
|  | ||
@@ -17,16 +18,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows. | ||
| ## Branch notes | ||
| These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development. | ||
| More generally useful documentation will move to the main documentation before moving to production. | ||
| ### Starting the logging server | ||
| Run in ``asimov`` directory: | ||
| ``` | ||
| export FLASK_APP=server | ||
| flask run | ||
| ``` | ||
| ## Features | ||
@@ -70,3 +57,3 @@ | ||
| ``` | ||
| $ conda install -c conda-forge ligo-asimov | ||
| $ conda install -c conda-forge asimov | ||
| ``` | ||
@@ -80,2 +67,3 @@ | ||
| ## Get started | ||
@@ -82,0 +70,0 @@ |
+1
-0
@@ -59,1 +59,2 @@ from setuptools import setup | ||
| ) | ||
+160
-5
@@ -9,12 +9,167 @@ """ | ||
| from importlib import reload | ||
| from unittest.mock import patch | ||
| import os | ||
| import io | ||
| import shutil | ||
| import contextlib | ||
| from click.testing import CliRunner | ||
| import asimov | ||
| from asimov.event import Production | ||
| from asimov.cli.application import apply_page | ||
| from asimov.cli import manage, project | ||
| from asimov.ledger import YAMLLedger | ||
| from asimov.pipeline import PipelineException | ||
| from asimov.cli.application import apply_page | ||
| class TestBaseAnalysis(unittest.TestCase): | ||
| pass | ||
| EVENTS = ["GW150914_095045", "GW151226_033853", "GW170814_103043"] | ||
| @classmethod | ||
| def setUpClass(cls): | ||
| cls.cwd = os.getcwd() | ||
| @classmethod | ||
| def tearDownClass(self): | ||
| os.chdir(self.cwd) | ||
| try: | ||
| shutil.rmtree(f"{self.cwd}/tests/tmp/") | ||
| except FileNotFoundError: | ||
| pass | ||
| def tearDown(self): | ||
| os.chdir(self.cwd) | ||
| shutil.rmtree(f"{self.cwd}/tests/tmp/") | ||
| def setUp(self): | ||
| reload(asimov) | ||
| reload(manage) | ||
| os.makedirs(f"{self.cwd}/tests/tmp/project") | ||
| os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| runner = CliRunner() | ||
| result = runner.invoke( | ||
| project.init, ["Test Project", "--root", f"{self.cwd}/tests/tmp/project"] | ||
| ) | ||
| assert result.exit_code == 0 | ||
| assert result.output == "● New project created successfully!\n" | ||
| self.ledger = YAMLLedger(".asimov/ledger.yml") | ||
| f = io.StringIO() | ||
| with contextlib.redirect_stdout(f): | ||
| apply_page( | ||
| file=f"{self.cwd}/tests/test_data/testing_pe.yaml", | ||
| event=None, | ||
| ledger=self.ledger, | ||
| ) | ||
| apply_page( | ||
| file=f"{self.cwd}/tests/test_data/testing_events.yaml", | ||
| ledger=self.ledger, | ||
| ) | ||
| class TestSimpleAnalysis(TestBaseAnalysis): | ||
| pass | ||
| class TestEventAnalysis(TestBaseAnalysis): | ||
| pass | ||
| class TestProjectAnalysis(TestBaseAnalysis): | ||
| pass | ||
| class TestSubjectAnalysis(TestBaseAnalysis): | ||
| """ | ||
| Tests for subject analysis code. | ||
| """ | ||
| pipelines = {"bayeswave"} | ||
| EVENTS = ["GW150914_095045", "GW151226_033853", "GW170814_103043"] | ||
| def setUp(self): | ||
| super().setUp() | ||
| def test_all_events_present(self): | ||
| """Test that all of the expected events are in the ledger.""" | ||
| self.assertEqual(list(self.ledger.events.keys()), self.EVENTS) | ||
| def test_that_analyses_are_matched(self): | ||
| pass | ||
| # class TestProjectAnalysis(TestBaseAnalysis): | ||
| # """ | ||
| # Tests for Project Analysis code. | ||
| # """ | ||
| # def setUp(self): | ||
| # super().setUp() | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_joint_analysis.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # def test_that_subjects_are_accessible(self): | ||
| # """Test that it is possible to get a list of subjects from a project analysis.""" | ||
| # self.assertTrue( | ||
| # self.ledger.project_analyses[0].subjects | ||
| # == ["GW150914_095045", "GW151226_033853"] | ||
| # ) | ||
| # self.assertTrue( | ||
| # self.ledger.project_analyses[0].events | ||
| # == ["GW150914_095045", "GW151226_033853"] | ||
| # ) | ||
| # def test_that_all_waveform_analyses_are_returned(self): | ||
| # """Test that the query returns all of the jobs specifying a specific waveform.""" | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # self.assertEqual(len(self.ledger.events), 3) | ||
| # p_analysis = self.ledger.project_analyses | ||
| # print(p_analysis[1]._analysis_spec) | ||
| # analyses = p_analysis[1].analyses | ||
| # self.assertEqual(len(analyses), 3) | ||
| # def test_that_all_uploaded_analyses_are_returned(self): | ||
| # """Test that the query returns all of the jobs specifying a specific waveform.""" | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # p_analysis = self.ledger.project_analyses | ||
| # analyses = p_analysis[2].analyses | ||
| # self.assertEqual(len(analyses), 3) | ||
| # def test_that_all_reviewed_analyses_are_returned(self): | ||
| # """Test that the query returns all of the jobs with a specific review state.""" | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # p_analysis = self.ledger.project_analyses | ||
| # analyses = p_analysis[3].analyses | ||
| # self.assertEqual(len(analyses), 2) | ||
| # def test_mutliple_filters(self): | ||
| # """Test that the query returns all of the jobs with a specific review state.""" | ||
| # apply_page( | ||
| # file=f"{self.cwd}/tests/test_data/test_analyses.yaml", | ||
| # ledger=self.ledger, | ||
| # ) | ||
| # p_analysis = self.ledger.project_analyses | ||
| # analyses = p_analysis[4].analyses | ||
| # self.assertEqual(len(analyses), 1) | ||
| # for analysis in analyses: | ||
| # self.assertTrue(str(analysis.pipeline).lower() == "lalinference") | ||
| # self.assertTrue(str(analysis.meta['waveform']['approximant']).lower() == "imrphenomxphm") |
@@ -64,3 +64,3 @@ """ | ||
| runner = CliRunner() | ||
| result = runner.invoke(manage.manage, ['build']) | ||
@@ -131,10 +131,10 @@ for event in EVENTS: | ||
| f = io.StringIO() | ||
| with contextlib.redirect_stdout(f): | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger) | ||
| for event in EVENTS: | ||
| for pipeline in pipelines: | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger) | ||
| #f = io.StringIO() | ||
| #with contextlib.redirect_stdout(f): | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger) | ||
| for event in EVENTS: | ||
| for pipeline in pipelines: | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger) | ||
@@ -141,0 +141,0 @@ def test_buildsubmit_all_events(self): |
+35
-10
@@ -35,19 +35,34 @@ import os | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/events/gwtc-2-1/GW150914_095045.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = f"{self.cwd}/tests/test_data/events_blueprint.yaml", ledger=self.ledger) | ||
| def tearDown(self): | ||
| del(self.ledger) | ||
| shutil.rmtree(f"{self.cwd}/tests/tmp/project") | ||
| def test_simple_dag(self): | ||
| """Check that all jobs are run when there are no dependencies specified.""" | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_simple_dag.yaml", event='GW150914_095045', ledger=self.ledger) | ||
| def test_dependency_list(self): | ||
| """Check that all jobs are run when the dependencies are a chain.""" | ||
| self.assertTrue(len(self.ledger.get_event('GW150914_095045')[0].productions)==0) | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger) | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| self.assertEqual(len(event.get_all_latest()), 2) | ||
| self.assertTrue(len(event.productions[0]._needs) == 0) | ||
| self.assertTrue(len(event.productions[0].dependencies) == 0) | ||
| self.assertTrue(len(event.productions[1].dependencies) == 1) | ||
| def test_dependency_tree(self): | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger) | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| self.assertTrue(len(event.graph.edges) == 1) | ||
| def test_linear_dag(self): | ||
| """Check that all jobs are run when the dependencies are a chain.""" | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", event='GW150914_095045', ledger=self.ledger) | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger) | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| self.assertEqual(len(event.get_all_latest()), 1) | ||
| def test_simple_dag(self): | ||
| """Check that all jobs are run when there are no dependencies specified.""" | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_simple_dag.yaml", ledger=self.ledger) | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| self.assertEqual(len(event.get_all_latest()), 2) | ||
@@ -59,2 +74,12 @@ def test_complex_dag(self): | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| print([ev.name for ev in event.get_all_latest()]) | ||
| self.assertEqual(len(event.get_all_latest()), 2) | ||
| def test_query_dag(self): | ||
| """Check that all jobs are run when the dependencies are not a chain.""" | ||
| apply_page(file = f"{self.cwd}/tests/test_data/test_query_dag.yaml", event='GW150914_095045', ledger=self.ledger) | ||
| event = self.ledger.get_event('GW150914_095045')[0] | ||
| self.assertEqual(len(event.get_all_latest()), 1) | ||
@@ -6,3 +6,3 @@ --- | ||
| comment: PSD production | ||
| status: finished | ||
| status: uploaded | ||
| --- | ||
@@ -13,3 +13,3 @@ kind: analysis | ||
| comment: PSD production | ||
| status: wait | ||
| status: ready | ||
| needs: | ||
@@ -22,3 +22,3 @@ - Prod0 | ||
| comment: PSD production | ||
| status: wait | ||
| status: ready | ||
| needs: | ||
@@ -31,3 +31,3 @@ - Prod1 | ||
| comment: PSD production | ||
| status: wait | ||
| status: ready | ||
| needs: | ||
@@ -40,3 +40,3 @@ - Prod0 | ||
| comment: PSD production | ||
| status: wait | ||
| status: ready | ||
| needs: | ||
@@ -43,0 +43,0 @@ - Prod2 |
@@ -5,2 +5,4 @@ kind: analysis | ||
| comment: Bayeswave on-source PSD estimation job | ||
| event: GW150914_095045 | ||
| status: ready | ||
| --- | ||
@@ -14,1 +16,3 @@ kind: analysis | ||
| - Prod0 | ||
| event: GW150914_095045 | ||
| status: ready |
@@ -5,7 +5,9 @@ kind: analysis | ||
| comment: Bayeswave on-source PSD estimation job | ||
| event: GW150914_095045 | ||
| --- | ||
| kind: analysis | ||
| name: Prod1 | ||
| event: GW150914_095045 | ||
| pipeline: bilby | ||
| approximant: IMRPhenomXPHM | ||
| comment: Bilby parameter estimation job |
| """Tests for the LALInference interface.""" | ||
| import unittest | ||
| from unittest.mock import Mock, patch | ||
| from unittest.mock import patch | ||
| import shutil | ||
@@ -10,17 +9,2 @@ import os | ||
| import io | ||
| import contextlib | ||
| from click.testing import CliRunner | ||
| from asimov.utils import set_directory | ||
| from asimov import config | ||
| from asimov.cli import project, manage | ||
| from asimov.cli import configuration | ||
| from asimov.cli.application import apply_page | ||
| from asimov.ledger import YAMLLedger | ||
| from asimov.event import Event | ||
| from asimov.pipeline import PipelineException | ||
| from asimov.pipelines.bayeswave import BayesWave | ||
@@ -44,2 +28,3 @@ from asimov.event import Event | ||
| @unittest.skip("Skipped until Bayeswave is added to the testing environment correctly.") | ||
| class BayeswaveTests(unittest.TestCase): | ||
@@ -58,2 +43,6 @@ """Test bayeswave interface. | ||
| cls.cwd = os.getcwd() | ||
| repo = git.Repo.init(cls.cwd+"/tests/test_data/s000000xx/") | ||
| os.chdir(cls.cwd+"/tests/test_data/s000000xx/") | ||
| os.system("git add C01_offline/Prod1_test.ini C01_offline/s000000xx_gpsTime.txt") | ||
| os.system("git commit -m 'test'") | ||
@@ -138,13 +127,13 @@ def setUp(self): | ||
| f = io.StringIO() | ||
| with contextlib.redirect_stdout(f): | ||
| production = self.ledger.get_event(event)[0].productions[0] | ||
| with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))): | ||
| production.make_config(f"{production.name}.ini") | ||
| # We need to make the workdir as this ought to be done by bayeswave_pipe | ||
| os.makedirs(os.path.join("working", event, production.name)) | ||
| #with contextlib.redirect_stdout(f): | ||
| production = self.ledger.get_event(event)[0].productions[0] | ||
| with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))): | ||
| # We need to make the workdir as this ought to be done by bayeswave_pipe | ||
| production.make_config(f"{production.name}.ini") | ||
| os.makedirs(os.path.join("working", event, production.name)) | ||
| mock_popen.returncode=0 | ||
| mock_popen.return_value.communicate.return_value=(b"Blah blah blah To submit: just run this", b"Lots of stuff on stderr") | ||
| mock_popen.returncode=0 | ||
| mock_popen.return_value.communicate.return_value=(b"Blah blah blah To submit: just run this", b"Lots of stuff on stderr") | ||
| production.pipeline.build_dag(dryrun=False) | ||
| production.pipeline.build_dag(dryrun=False) | ||
@@ -156,5 +145,4 @@ with contextlib.redirect_stdout(f): | ||
| production.pipeline.submit_dag(dryrun=False) | ||
| self.ledger.update_event(production.event) | ||
| self.ledger.update_event(production.event) | ||
@@ -192,21 +180,24 @@ self.assertEqual(production.job_id, 999) | ||
| @patch('subprocess.Popen.communicate') | ||
| @patch('subprocess.Popen') | ||
| def test_bad_dag_build(self, mock_popen, mock_pcomm): | ||
| """Check that things behave as expected if the DAG file can't be build.""" | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger) | ||
| event = "GW150914_095045" | ||
| pipeline = "bayeswave" | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger) | ||
| apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger) | ||
| @classmethod | ||
| def tearDownClass(cls): | ||
| """Destroy all the products of this test.""" | ||
| os.system(f"{cls.cwd}/tests/tmp/-rf") | ||
| os.system(f"{cls.cwd}/tests/test_data/s000000xx/.git -rf") | ||
| try: | ||
| shutil.rmtree("/tmp/S000000xx") | ||
| except: | ||
| pass | ||
| mock_popen.returncode=1 #"Could not build the DAG file" | ||
| mock_popen.return_value.communicate.return_value=(b"Could not be created", b"Lots of stuff on stderr") | ||
| def tearDown(self): | ||
| os.system(f"{self.cwd}/tests/tmp/-rf") | ||
| production = self.ledger.get_event(event)[0].productions[0] | ||
| with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))): | ||
| production.make_config(f"{production.name}.ini") | ||
| with self.assertRaises(PipelineException): | ||
| production.pipeline.build_dag(dryrun=False) | ||
| def setUp(self): | ||
| """Create a pipeline.""" | ||
| self.event = Event.from_yaml(TEST_YAML.format(self.cwd)) | ||
| self.pipeline = LALInference(self.event.productions[0]) | ||
| out = self.pipeline.build_dag() | ||
| def test_dag(self): | ||
| """Check that a DAG is actually produced.""" | ||
| print(f"{self.cwd}/tests/tmp/s000000xx/C01_offline/Prod1/lalinference_1248617392-1248617397.dag") | ||
| self.assertEqual(os.path.exists(f"{self.cwd}/tests/tmp/s000000xx/C01_offline/Prod1/lalinference_1248617392-1248617397.dag"), 1) |
@@ -54,3 +54,3 @@ """Tests for the RIFT interface.""" | ||
| # @unittest.skip("Skipped temporarily while RIFT is updated") | ||
| @unittest.skip("Skipped temporarily while RIFT is updated") | ||
| def test_submit_cli(self): | ||
@@ -72,4 +72,6 @@ """Check that a RIFT config file can be built.""" | ||
| result = runner.invoke(manage.submit, "--dryrun") | ||
| print(result.output) | ||
| self.assertTrue("util_RIFT_pseudo_pipe.py" in result.output) | ||
| @unittest.skip("Skipped temporarily while RIFT is updated") | ||
| def test_build_api(self): | ||
@@ -99,3 +101,3 @@ """Check that a RIFT config file can be built.""" | ||
| @unittest.skip("Skipped temporarily while RIFT is updated") | ||
| def test_build_api_non_default_calibration(self): | ||
@@ -102,0 +104,0 @@ """Check that a RIFT correctly picks up non C01 calibration.""" |
+22
-17
@@ -38,3 +38,3 @@ import unittest | ||
| productions: | ||
| - Prod0: | ||
| - name: Prod0 | ||
| pipeline: lalinference | ||
@@ -66,3 +66,3 @@ comment: PSD production | ||
| productions: | ||
| - Prod0: | ||
| - name: Prod0 | ||
| pipeline: lalinference | ||
@@ -100,5 +100,5 @@ comment: PSD production | ||
| def test_empty_register_creation(self): | ||
| """Check that productions initialise an empty register if no review information attached.""" | ||
| self.assertEqual(len(self.event_no_review.productions[0].review), 0) | ||
| # def test_empty_register_creation(self): | ||
| # """Check that productions initialise an empty register if no review information attached.""" | ||
| # self.assertEqual(len(self.event_no_review.productions[0].review), 0) | ||
@@ -131,2 +131,5 @@ def test_review_message_sort(self): | ||
| def setUp(self): | ||
| project_dir = f"{self.cwd}/tests/tmp/" | ||
| if os.path.exists(project_dir): | ||
| shutil.rmtree(project_dir) | ||
| os.makedirs(f"{self.cwd}/tests/tmp/project") | ||
@@ -150,2 +153,16 @@ os.chdir(f"{self.cwd}/tests/tmp/project") | ||
| # def test_show_review_no_review(self): | ||
| # """Check that the CLI can show a review report with no reviews""" | ||
| # with patch("asimov.current_ledger", new=YAMLLedger(".asimov/ledger.yml")): | ||
| # reload(asimov) | ||
| # reload(review) | ||
| # runner = CliRunner() | ||
| # for event in EVENTS: | ||
| # result = runner.invoke(review.review, | ||
| # ['status', event]) | ||
| # print(result.output) | ||
| # self.assertTrue(f"No review information exists for this production." in result.output) | ||
| def test_add_review_to_event_reject(self): | ||
@@ -187,14 +204,2 @@ """Check that the CLI can add an event review""" | ||
| def test_show_review_no_review(self): | ||
| """Check that the CLI can show a review report with no reviews""" | ||
| with patch("asimov.current_ledger", new=YAMLLedger(".asimov/ledger.yml")): | ||
| reload(asimov) | ||
| reload(review) | ||
| runner = CliRunner() | ||
| for event in EVENTS: | ||
| result = runner.invoke(review.review, | ||
| ['status', event]) | ||
| self.assertTrue(f"No review information exists for this production." in result.output) | ||
| def test_show_review_with_review(self): | ||
@@ -201,0 +206,0 @@ """Check that the CLI can show a review report with no reviews""" |
+38
-37
@@ -25,3 +25,3 @@ """Test that yaml event formats are parsed and written correctly.""" | ||
| productions: | ||
| - Prod0: | ||
| - name: Prod0 | ||
| pipeline: lalinference | ||
@@ -41,3 +41,3 @@ comment: PSD production | ||
| productions: | ||
| - Prod0: | ||
| - name: Prod0 | ||
| comment: PSD production | ||
@@ -56,3 +56,3 @@ status: wait | ||
| productions: | ||
| - Prod0: | ||
| - name: Prod0 | ||
| comment: PSD production | ||
@@ -93,6 +93,6 @@ status: wait | ||
| def test_no_name_error(self): | ||
| """Check an exception is raised if the event name is missing.""" | ||
| with self.assertRaises(asimov.event.DescriptionException): | ||
| asimov.event.Event.from_yaml(BAD_YAML.format(self.cwd)) | ||
| # def test_no_name_error(self): | ||
| # """Check an exception is raised if the event name is missing.""" | ||
| # with self.assertRaises(asimov.event.DescriptionException): | ||
| # asimov.event.Event.from_yaml(BAD_YAML.format(self.cwd)) | ||
@@ -127,4 +127,4 @@ class ProductionTests(unittest.TestCase): | ||
| production = dict(status="wait", pipeline="lalinference") | ||
| with self.assertRaises(TypeError): | ||
| asimov.event.Production.from_dict(production, event=self.event) | ||
| with self.assertRaises(ValueError): | ||
| asimov.event.Production.from_dict(production, subject=self.event) | ||
@@ -134,4 +134,4 @@ def test_missing_pipeline(self): | ||
| production = {"S000000x": dict(status="wait")} | ||
| with self.assertRaises(asimov.event.DescriptionException): | ||
| asimov.event.Production.from_dict(production, event=self.event) | ||
| with self.assertRaises(ValueError): | ||
| asimov.event.Production.from_dict(production, subject=self.event) | ||
@@ -141,4 +141,4 @@ def test_missing_status(self): | ||
| production = {"S000000x": dict(pipeline="lalinference")} | ||
| with self.assertRaises(asimov.event.DescriptionException): | ||
| asimov.event.Production.from_dict(production, event=self.event) | ||
| with self.assertRaises(ValueError): | ||
| asimov.event.Production.from_dict(production, subject=self.event) | ||
@@ -158,14 +158,14 @@ def test_production_prior_read(self): | ||
| productions: | ||
| - Prod0: | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 0.05] | ||
| status: wait | ||
| - Prod1: | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 1.0] | ||
| status: wait | ||
| - name: Prod0 | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 0.05] | ||
| status: wait | ||
| - name: Prod1 | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 0.8] | ||
| status: wait | ||
| """ | ||
@@ -179,4 +179,4 @@ event = asimov.event.Event.from_yaml( | ||
| self.assertEqual(prod0.meta['priors']['q'][1], 0.05) | ||
| self.assertEqual(prod1.priors['q'][1], 1.00) | ||
| self.assertEqual(prod1.meta['priors']['q'][1], 1.00) | ||
| self.assertEqual(prod1.priors['q'][1], 0.8) | ||
| self.assertEqual(prod1.meta['priors']['q'][1], 0.8) | ||
@@ -196,14 +196,14 @@ def test_production_prior_preserved(self): | ||
| productions: | ||
| - Prod0: | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| - name: Prod0 | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 0.05] | ||
| status: wait | ||
| - Prod1: | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| status: wait | ||
| - name: Prod1 | ||
| comment: PSD production | ||
| pipeline: lalinference | ||
| priors: | ||
| q: [0.0, 1.0] | ||
| status: wait | ||
| status: wait | ||
| """ | ||
@@ -215,2 +215,3 @@ event = asimov.event.Event.from_yaml( | ||
| event_YAML = event.to_yaml() | ||
| event2 = asimov.event.Event.from_yaml(event_YAML, ledger=self.ledger) | ||
@@ -217,0 +218,0 @@ prod0 = event.productions[0] |
-260
| """ | ||
| Code for interacting with a gitlab instance. | ||
| """ | ||
| import datetime | ||
| import time | ||
| import configparser | ||
| import gitlab | ||
| import yaml | ||
| from liquid import Liquid | ||
| from asimov import config | ||
| from .event import Event | ||
| from .ledger import Ledger | ||
| STATE_PREFIX = "C01" | ||
| class GitlabLedger(Ledger): | ||
| """ | ||
| Connect to a gitlab-based issue tracker. | ||
| """ | ||
| def __init__( | ||
| self, configs, milestone=None, subset=None, update=False, repo=True, label=None | ||
| ): | ||
| """ | ||
| Search through a repository's issues and find all of the ones | ||
| for events. | ||
| """ | ||
| _, self.repository = self._connect_gitlab() | ||
| if subset == [None]: | ||
| subset = None | ||
| if not label: | ||
| event_label = config.get("gitlab", "event_label") | ||
| else: | ||
| event_label = label | ||
| try: | ||
| sleep_time = int(config.get("gitlab", "rest_time")) | ||
| except configparser.NoOptionError: | ||
| sleep_time = 30 | ||
| issues = self.repository.issues.list(labels=[event_label], per_page=1000) | ||
| output = [] | ||
| if subset: | ||
| for issue in issues: | ||
| if issue.title in subset: | ||
| output += [EventIssue(issue, self.repository, update, repo=repo)] | ||
| if update: | ||
| time.sleep(sleep_time) | ||
| else: | ||
| for issue in issues: | ||
| output += [EventIssue(issue, self.repository, update, repo=repo)] | ||
| if update: | ||
| time.sleep(sleep_time) | ||
| self.data["events"] = output | ||
| self.events = {ev["name"]: ev for ev in self.data["events"]} | ||
| def _connect_gitlab(self): | ||
| """ | ||
| Connect to the gitlab server. | ||
| Returns | ||
| ------- | ||
| server : `Gitlab` | ||
| The gitlab server. | ||
| repository: `Gitlab.project` | ||
| The gitlab project. | ||
| """ | ||
| server = gitlab.gitlab.Gitlab( | ||
| config.get("gitlab", "server"), private_token=config.get("gitlab", "token") | ||
| ) | ||
| repository = server.projects.get(config.get("gitlab", "tracking_repository")) | ||
| return server, repository | ||
| def get_event(self, event=None): | ||
| if event: | ||
| return self.events[event] | ||
| else: | ||
| return self.events.values() | ||
| @classmethod | ||
| def add_event(self, event_object, issue_template=None): | ||
| """ | ||
| Create an issue for an event. | ||
| """ | ||
| if not issue_template: | ||
| from pkg_resources import resource_filename | ||
| issue_template = resource_filename("asimov", "gitlabissue.md") | ||
| with open(issue_template, "r") as template_file: | ||
| liq = Liquid(template_file.read()) | ||
| rendered = liq.render( | ||
| event_object=event_object, yaml=event_object.to_yaml() | ||
| ) | ||
| self.repository.issues.create( | ||
| {"title": event_object.name, "description": rendered} | ||
| ) | ||
| def update_event(self, event): | ||
| event.update_data() | ||
| def save(self): | ||
| pass | ||
| @classmethod | ||
| def create(cls): | ||
| raise NotImplementedError( | ||
| "You must create a gitlab issue tracker manually first." | ||
| ) | ||
| pass | ||
| class EventIssue(Event): | ||
| """ | ||
| Use an issue on the gitlab issue tracker to | ||
| hold variable data for the program. | ||
| Parameters | ||
| ---------- | ||
| issue : `gitlab.issue` | ||
| The issue which represents the event. | ||
| update : bool | ||
| Flag to determine if the git repository is updated | ||
| when it is loaded. Defaults to False to prevent | ||
| excessive load on the git server. | ||
| """ | ||
| def __init__(self, issue, repository, update=False, repo=True): | ||
| self.from_issue(self, issue, update, repo) | ||
| self.issue_object = issue | ||
| self.title = issue.title | ||
| self.text = issue.description | ||
| self.issue_id = issue.id | ||
| self.labels = issue.labels | ||
| self.data = self.parse_notes() | ||
| def from_issue(self, issue, update=False, repo=True): | ||
| """ | ||
| Parse an issue description to generate this event. | ||
| Parameters | ||
| ---------- | ||
| update : bool | ||
| Flag to determine if the repository is updated when loaded. | ||
| Defaults to False. | ||
| """ | ||
| text = issue.text.split("---") | ||
| event = self.from_yaml(text[1], issue, update=update, repo=repo) | ||
| event.text = text | ||
| # event.from_notes() | ||
| return event | ||
| def _refresh(self): | ||
| if self.repository: | ||
| self.issue_object = self.repository.issues.get(self.issue_object.iid) | ||
| self.text = self.issue_object.description.split("---") | ||
| else: | ||
| pass | ||
| @property | ||
| def state(self): | ||
| """ | ||
| Get the state of the event's runs. | ||
| """ | ||
| for label in self.issue_object.labels: | ||
| if f"{STATE_PREFIX}::" in label: | ||
| return label[len(STATE_PREFIX) + 2:] | ||
| return None | ||
| @state.setter | ||
| def state(self, state): | ||
| """ | ||
| Set the event state. | ||
| """ | ||
| self._refresh() | ||
| for label in self.issue_object.labels: | ||
| if f"{STATE_PREFIX}::" in label: | ||
| # Need to remove all of the other scoped labels first. | ||
| self.issue_object.labels.remove(label) | ||
| self.issue_object.labels += ["{}::{}".format(STATE_PREFIX, state)] | ||
| self.issue_object.save() | ||
| def add_note(self, text): | ||
| """ | ||
| Add a comment to the event issue. | ||
| A footer will be added to identify this as being created by the | ||
| supervisor and not the user. | ||
| """ | ||
| self._refresh() | ||
| now = datetime.datetime.now() | ||
| header = """""" | ||
| footer = f"""\nAdded at {now:%H}:{now:%M}, {now:%Y}-{now:%m}-{now:%d} by the run supervision robot :robot:.""" | ||
| self.issue_object.notes.create({"body": header + text + footer}) | ||
| self.issue_object.save() | ||
| def add_label(self, label, state=True): | ||
| """ | ||
| Add a new label to an event issue. | ||
| Parameters | ||
| ---------- | ||
| label : str | ||
| The name of the label. | ||
| """ | ||
| self._refresh() | ||
| if state: | ||
| self.issue_object.labels += [f"{STATE_PREFIX}:{label}"] | ||
| else: | ||
| self.issue_object.labels += [f"{label}"] | ||
| self.issue_object.save() | ||
| def update_data(self): | ||
| """ | ||
| Store event data in the comments on the event repository. | ||
| """ | ||
| self._refresh() | ||
| self.issue_object.description = self.event_object.to_issue() | ||
| self.issue_object.save() | ||
| def parse_notes(self): | ||
| """ | ||
| Read issue information from the comments on the issue. | ||
| Only notes which start | ||
| ``` | ||
| # Run information | ||
| ``` | ||
| will be parsed. | ||
| """ | ||
| data = {} | ||
| notes = self.issue_object.notes.list(per_page=200) | ||
| note_data = [] | ||
| for note in reversed(notes): | ||
| if "---\n" in note.body: | ||
| data = note.body.split("---") | ||
| if len(data) > 0: | ||
| data = data[1] | ||
| else: | ||
| continue | ||
| data = yaml.safe_load(data) | ||
| note_data.append(data) | ||
| return note_data |
| --------------------- | ||
| The production ledger | ||
| --------------------- | ||
| In order to construct and monitor on-going PE jobs, asimov stores metadata for each gravitational wave event and for each PE job (or "production" in `asimov` terminology). | ||
| These metadata are stored in the event's *production ledger*, which is specified in `yaml` format. | ||
| A number of fields are required for `asimov` to correctly process an event, while a number of additional fields will allow it to perform more operations automatically, or over-ride defaults. | ||
| Normally you shouldn't need to edit the ledger file directly, but if you're setting up new analyses or want to check the configuration of a pre-existing analysis then it can be helpful to have an understanding of the quantities contained in it. | ||
| from asimov import config | ||
| from asimov.cli import connect_gitlab, ACTIVE_STATES, known_pipelines | ||
| from asimov import gitlab | ||
| from asimov.condor import CondorJobList, CondorJob | ||
| import yaml | ||
| # server, repository = connect_gitlab() | ||
| # events = gitlab.find_events(repository, | ||
| # #milestone=config.get("olivaw", "milestone"), | ||
| # subset=["GW200115A"], | ||
| # update=False, | ||
| # repo=True) | ||
| ledger = "/home/pe.o3/public_html/LVC/o3a-final/ledger.yaml" | ||
| with open(ledger, "r") as filehandle: | ||
| events = yaml.safe_load(filehandle.read()) | ||
| jobs = CondorJobList() | ||
| #for job in jobs.jobs.values(): | ||
| # print(job) | ||
| for event in events: | ||
| productions = {} | ||
| for prod in event['productions']: | ||
| productions.update(prod) | ||
| for name, production in productions.items(): | ||
| if production['status'] == "running": | ||
| print(event['name'], name) | ||
| if not "job id" in production: | ||
| print("Job ID is missing") | ||
| continue | ||
| if production['job id'] in jobs.jobs: | ||
| print(jobs.jobs[production['job id']]) | ||
| else: | ||
| print("\t Job is not running") | ||
| # on_deck = [production | ||
| # for production in event.productions | ||
| # if production.status.lower() in ACTIVE_STATES] | ||
| # for production in on_deck: | ||
| # print(production.name) | ||
| # print(production.meta['job id'] in jobs.jobs.keys()) | ||
Alert delta unavailable
Currently unable to show alert delta for PyPI packages.
871245
9.92%190
4.97%10928
10.05%