You're Invited: Meet the Socket Team at RSAC and BSidesSF 2026, March 23–26. RSVP
Socket
Book a DemoSign in
Socket

asimov

Package Overview
Dependencies
Maintainers
2
Versions
66
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

asimov - pypi Package Compare versions

Comparing version
0.5.8
to
0.6.0
+66
asimov/auth.py
"""
This module contains logic to assist with various
authorisation tasks against the open science grid.
This will ultimately be a good candidate to spin-out
into its own package.
"""
import configparser
import functools
import logging
import subprocess

from asimov import config
def refresh_scitoken(func):
    """
    Decorator to refresh an existing scitoken.

    Before invoking the wrapped callable this runs, in order,
    ``kinit``, ``htgettoken`` and ``condor_vault_storer`` using the
    options stored in the ``authentication`` section of the asimov
    configuration.  If that configuration section is missing, the
    refresh is silently skipped and the wrapped callable is still run.

    Parameters
    ----------
    func : callable
        The function to wrap.

    Returns
    -------
    callable
        A wrapper which refreshes authentication tokens and then
        calls ``func``, returning its result.
    """

    def _run_logged(command, logger):
        # Run a single token-refresh command, logging the invocation
        # and its combined stdout/stderr output.
        pipe = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
        )
        logger.info(" ".join(command))
        out, err = pipe.communicate()
        logger.info(out)
        # stderr is redirected into stdout above, so ``err`` should
        # always be None here; the check is kept defensively.
        if err and len(err) > 0:
            logger.error(err)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        logger = logging.getLogger("asimov").getChild("auth")
        try:
            _run_logged(
                ["kinit"] + [config.get("authentication", "kinit options")],
                logger,
            )
            _run_logged(
                ["htgettoken"]
                + [config.get("authentication", "htgettoken options")],
                logger,
            )
            _run_logged(
                ["condor_vault_storer"]
                + [config.get("authentication", "scopes")],
                logger,
            )
        except configparser.NoSectionError:
            # If authentication isn't set up then just ignore this.
            pass
        # Bug fix: the original discarded the wrapped function's
        # return value; propagate it to the caller.
        return func(*args, **kwargs)

    return wrapper
# This file isn't actually required except by asimov.
"""Defines the interface with generic analysis pipelines."""
import os
import warnings
warnings.filterwarnings("ignore", module="htcondor")
import htcondor # NoQA
from asimov import utils # NoQA
from asimov import config, logger, logging, LOGGER_LEVEL # NoQA
import otter # NoQA
from ..storage import Store # NoQA
from ..pipeline import Pipeline, PipelineException, PipelineLogger # NoQA
class PESummary(Pipeline):
    """
    A postprocessing pipeline add-in using PESummary.

    This builds and (optionally) submits an HTCondor job which runs
    ``summarypages`` over the sample files, PSDs, and calibration
    envelopes collected from the analyses this step follows.
    """

    # Path to the ``summarypages`` entry point inside the configured
    # pipeline environment; evaluated once at class definition time.
    executable = os.path.join(
        config.get("pipelines", "environment"), "bin", "summarypages"
    )
    name = "PESummary"

    def __init__(self, production, category=None):
        """
        Parameters
        ----------
        production : Analysis
            The production (analysis) this postprocessing step belongs to.
        category : str, optional
            The category of the job; defaults to the production's category.
        """
        self.production = production
        self.category = category if category else production.category
        self.logger = logger
        # Bug fix: ``self.subject`` was previously assigned only
        # part-way through ``submit_dag`` (after its first use), so
        # both ``results`` and the start of ``submit_dag`` raised
        # AttributeError.  Assign it up front instead.
        self.subject = production.event
        self.meta = self.production.meta["postprocessing"][self.name.lower()]

    def results(self):
        """
        Fetch the results file from this post-processing step.

        A dictionary of results will be returned with the description
        of each results file as the key.  These may be nested if it
        makes sense for the output, for example skymaps.

        For example

        {'metafile': '/home/asimov/working/samples/metafile.hd5',
         'skymaps': {'H1': '/another/file/path', ...}
        }

        Returns
        -------
        dict
            A dictionary of the results.
        """
        self.outputs = os.path.join(
            config.get("project", "root"),
            config.get("general", "webroot"),
            self.subject.name,
        )
        self.outputs = os.path.join(self.outputs, self.production.name)
        self.outputs = os.path.join(self.outputs, "pesummary")
        metafile = os.path.join(self.outputs, "samples", "posterior_samples.h5")
        return dict(metafile=metafile)

    def submit_dag(self, dryrun=False):
        """
        Run PESummary on the results of this job.

        Parameters
        ----------
        dryrun : bool, optional
            If True, print the command and submit description instead
            of submitting to the scheduler, and return cluster id 0.

        Returns
        -------
        int
            The cluster id assigned by the scheduler (0 for dry runs).
        """
        command = self._build_command()

        with utils.set_directory(self.subject.work_dir):
            with open("pesummary.sh", "w") as bash_file:
                bash_file.write(f"{self.executable} " + " ".join(command))

        self.logger.info(
            f"PE summary command: {self.executable} {' '.join(command)}",
        )

        if dryrun:
            print("PESUMMARY COMMAND")
            print("-----------------")
            print(" ".join(command))

        submit_description = {
            "executable": self.executable,
            "arguments": " ".join(command),
            "output": f"{self.subject.work_dir}/pesummary.out",
            "error": f"{self.subject.work_dir}/pesummary.err",
            "log": f"{self.subject.work_dir}/pesummary.log",
            # Robustness: default to a single CPU rather than raising
            # KeyError when "multiprocess" is not configured.
            "request_cpus": self.meta.get("multiprocess", 1),
            "getenv": "true",
            "batch_name": f"Summary Pages/{self.subject.name}/{self.production.name}",
            "request_memory": "8192MB",
            "should_transfer_files": "YES",
            "request_disk": "8192MB",
        }

        if "accounting group" in self.meta:
            submit_description["accounting_group_user"] = config.get("condor", "user")
            submit_description["accounting_group"] = self.meta["accounting group"]

        if dryrun:
            print("SUBMIT DESCRIPTION")
            print("------------------")
            print(submit_description)

        if not dryrun:
            hostname_job = htcondor.Submit(submit_description)
            try:
                # There should really be a specified submit node, and if there is, use it.
                schedulers = htcondor.Collector().locate(
                    htcondor.DaemonTypes.Schedd, config.get("condor", "scheduler")
                )
                schedd = htcondor.Schedd(schedulers)
            except:  # NoQA
                # If you can't find a specified scheduler, use the first one you find
                schedd = htcondor.Schedd()
            with schedd.transaction() as txn:
                cluster_id = hostname_job.queue(txn)
        else:
            cluster_id = 0

        return cluster_id

    def _build_command(self):
        """Assemble the ``summarypages`` argument list for this production."""
        configfile = self.production.event.repository.find_prods(
            self.production.name, self.category
        )[0]
        label = str(self.production.name)
        command = [
            "--webdir",
            os.path.join(
                config.get("project", "root"),
                config.get("general", "webroot"),
                self.production.event.name,
                self.production.name,
                "pesummary",
            ),
            "--labels",
            label,
        ]
        command += ["--gw"]
        command += [
            "--approximant",
            self.production.meta["waveform"]["approximant"],
        ]
        command += [
            "--f_low",
            str(min(self.production.meta["quality"]["minimum frequency"].values())),
            "--f_ref",
            str(self.production.meta["waveform"]["reference frequency"]),
        ]
        if "cosmology" in self.meta:
            command += [
                "--cosmology",
                self.meta["cosmology"],
            ]
        if "redshift" in self.meta:
            command += ["--redshift_method", self.meta["redshift"]]
        if "skymap samples" in self.meta:
            command += [
                "--nsamples_for_skymap",
                str(self.meta["skymap samples"]),
            ]
        if "evolve spins" in self.meta:
            if "forwards" in self.meta["evolve spins"]:
                # NOTE(review): "fowards" typo is preserved here as it may
                # match the pesummary CLI option spelling — confirm upstream.
                command += ["--evolve_spins_fowards", "True"]
            if "backwards" in self.meta["evolve spins"]:
                command += ["--evolve_spins_backwards", "precession_averaged"]
        if "nrsur" in self.production.meta["waveform"]["approximant"].lower():
            command += ["--NRSur_fits"]
        if "multiprocess" in self.meta:
            command += ["--multi_process", str(self.meta["multiprocess"])]
        if "regenerate" in self.meta:
            # NOTE(review): guard checks "regenerate" but the value is read
            # from "regenerate posteriors" — KeyError if only the former is
            # set.  Behavior preserved; confirm which key is intended.
            command += ["--regenerate", " ".join(self.meta["regenerate posteriors"])]
        if "calculate" in self.meta:
            if "precessing snr" in self.meta["calculate"]:
                command += ["--calculate_precessing_snr"]
        # Config file
        command += [
            "--config",
            os.path.join(
                self.production.event.repository.directory, self.category, configfile
            ),
        ]
        # Samples
        command += ["--samples"]
        command += [self.production._previous_assets().get("samples", {})]
        # PSDs
        psds = {
            ifo: os.path.abspath(psd)
            for ifo, psd in self.production._previous_assets().get("psds", {}).items()
        }
        if len(psds) > 0:
            command += ["--psds"]
            for key, value in psds.items():
                command += [f"{key}:{value}"]
        # Calibration envelopes
        cals = {
            ifo: os.path.abspath(psd)
            for ifo, psd in self.production._previous_assets()
            .get("calibration", {})
            .items()
        }
        if len(cals) > 0:
            command += ["--calibration"]
            for key, value in cals.items():
                command += [f"{key}:{value}"]
        return command
.. _citation-guide:
Citation Guide
==============
If you used asimov in part of your analysis, or it contributed towards results in a paper we'd appreciate it if you would cite the original paper which describes asimov, which was published in the Journal of Open Source Software.
Williams, D. et al., "Asimov: A framework for coordinating parameter estimation workflows", *The Journal of Open Source Software*, vol. 8, no. 84, Art. no. 4170, 2023. doi:10.21105/joss.04170.
BibTeX Entry
------------
.. code-block:: bibtex
@ARTICLE{asimov-paper,
author = {{Williams}, Daniel and {Veitch}, John and {Chiofalo}, Maria and {Schmidt}, Patricia and {Udall}, Rhiannon and {Vajpeyi}, Avi and {Hoy}, Charlie},
title = "{Asimov: A framework for coordinating parameter estimation workflows}",
journal = {The Journal of Open Source Software},
keywords = {Python, astronomy, gravitational waves, General Relativity and Quantum Cosmology, Physics - Data Analysis, Statistics and Probability},
year = 2023,
month = apr,
volume = {8},
number = {84},
eid = {4170},
pages = {4170},
doi = {10.21105/joss.04170},
archivePrefix = {arXiv},
eprint = {2207.01468},
primaryClass = {gr-qc},
}
Citing specific versions of the software
----------------------------------------
We use Zenodo to publish releases, and each release receives a DOI which can be cited.
These should be cited, where relevant, in addition to the paper above, in the format:
Daniel Williams, Duncan Macleod, Avi Vajpeyi, James Clark, & Richard O'Shaughnessy. (2024). transientlunatic/asimov: v0.5.8 (v0.5.8). Zenodo. https://doi.org/10.5281/zenodo.13992299
(See `this page <https://doi.org/10.5281/zenodo.4024432>`_ for the latest version, and links to previous versions' DOIs, where links to BibTeX snippets for relevant versions are also available).
data:
channels:
H1: H1:DCS-CALIB_STRAIN_C02
L1: L1:DCS-CALIB_STRAIN_C02
frame types:
H1: H1_HOFT_C02
L1: L1_HOFT_C02
segment length: 4
event time: 1126259462.391
interferometers:
- H1
- L1
kind: event
likelihood:
psd length: 4
reference frequency: 20
sample rate: 2048
segment start: 1126259460.391
start frequency: 13.333333333333334
window length: 4
name: GW150914_095045
priors:
amplitude order: 1
chirp mass:
maximum: 41.97447913941358
minimum: 21.418182160215295
luminosity distance:
maximum: 10000
minimum: 10
mass 1:
maximum: 1000
minimum: 1
mass ratio:
maximum: 1.0
minimum: 0.05
quality:
minimum frequency:
H1: 20
L1: 20
---
data:
channels:
H1: H1:DCS-CALIB_STRAIN_C02
L1: L1:DCS-CALIB_STRAIN_C02
frame types:
H1: H1_HOFT_C02
L1: L1_HOFT_C02
segment length: 8
event time: 1128678900.444336
interferometers:
- H1
- L1
kind: event
likelihood:
psd length: 8
reference frequency: 20
sample rate: 2048
segment start: 1128678894.444336
start frequency: 13.333333333333334
window length: 8
name: GW151012_095443
priors:
amplitude order: 1
chirp mass:
maximum: 30.55016955177058
minimum: 11.331236548800117
luminosity distance:
maximum: 10000
minimum: 10
mass 1:
maximum: 1000
minimum: 1
mass ratio:
maximum: 1.0
minimum: 0.05
quality:
minimum frequency:
H1: 20
L1: 20
kind: ProjectAnalysis
name: TestPA
pipeline: TestPipeline
subjects:
- GW150914_095045
- GW151012_095443
kind: analysis
name: Prod0
event: GW150914_095045
pipeline: bayeswave
---
kind: analysis
name: Prod1
event: GW150914_095045
pipeline: bilby
waveform:
approximant: IMRPhenomXPHM
---
kind: analysis
name: Prod0
event: GW151226_033853
pipeline: bayeswave
---
kind: analysis
name: Prod1
event: GW151226_033853
pipeline: bilby
waveform:
approximant: IMRPhenomXPHM
---
kind: analysis
name: Prod2
event: GW151226_033853
pipeline: lalinference
waveform:
approximant: SEOBNRv4PHM
status: uploaded
---
kind: analysis
name: Prod3
event: GW151226_033853
pipeline: lalinference
waveform:
approximant: SEOBNRv4PHM
status: uploaded
review:
- status: approved
message: "blah"
---
kind: analysis
name: Prod4
event: GW151226_033853
pipeline: lalinference
waveform:
approximant: IMRPhenomXPHM
status: uploaded
review:
- status: approved
message: "blah"
kind: ProjectAnalysis
# This kind of analysis is used where multiple events need to be
# analysed jointly.
name: JointTest
subjects:
- GW150914_095045
- GW151226_033853
analyses:
- pipeline:bayeswave
pipeline: asimov-testing-project
---
kind: ProjectAnalysis
# This kind of analysis is used where multiple events need to be
# analysed jointly.
name: JointTestWaveform
subjects:
- GW150914_095045
- GW151226_033853
analyses:
- waveform.approximant:IMRPhenomXPHM
pipeline: asimov-testing-project
---
kind: ProjectAnalysis
# This kind of analysis is used where multiple events need to be
# analysed jointly.
name: JointTestStatus
subjects:
- GW150914_095045
- GW151226_033853
analyses:
- status:uploaded
pipeline: asimov-testing-project
---
kind: ProjectAnalysis
# This kind of analysis is used where multiple events need to be
# analysed jointly.
name: JointTestReview
subjects:
- GW150914_095045
- GW151226_033853
analyses:
- review:approved
pipeline: asimov-testing-project
---
kind: ProjectAnalysis
# This kind of analysis is used where multiple events need to be
# analysed jointly.
name: JointTestReview2
subjects:
- GW150914_095045
- GW151226_033853
analyses:
- review:approved
- waveform.approximant:IMRPhenomXPHM
pipeline: asimov-testing-project
kind: postprocessing
name: standard pe postprocessing
stages:
- name: simple PE summary
pipeline: pesummary
- name: less simple PE summary
pipeline: pesummary
needs:
- simple PE summary
---
kind: analysis
name: analysis with postprocessing
event: GW150914_095045
postprocessing:
- standard pe postprocessing
pipeline: bilby
---
kind: postprocessing
name: combined summary pages for bilby
subject: all
analyses:
- pipeline:bilby
stages:
- name: combined pages
pipeline: pesummary
kind: analysis
name: Prod0
pipeline: bayeswave
comment: Bayeswave on-source PSD estimation job
event: GW150914_095045
status: uploaded
---
kind: analysis
name: Prod1
event: GW150914_095045
pipeline: bilby
approximant: IMRPhenomXPHM
comment: Bilby parameter estimation job
needs:
- pipeline:Bayeswave
data:
channels:
H1: H1:DCS-CALIB_STRAIN_C02
L1: L1:DCS-CALIB_STRAIN_C02
frame types:
H1: H1_HOFT_C02
L1: L1_HOFT_C02
segment length: 4
event time: 1126259462.391
interferometers:
- H1
- L1
kind: event
likelihood:
psd length: 4
reference frequency: 20
sample rate: 2048
segment start: 1126259460.391
start frequency: 13.333333333333334
window length: 4
name: GW150914_095045
priors:
amplitude order: 1
chirp mass:
maximum: 41.97447913941358
minimum: 21.418182160215295
luminosity distance:
maximum: 10000
minimum: 10
mass 1:
maximum: 1000
minimum: 1
mass ratio:
maximum: 1.0
minimum: 0.05
quality:
minimum frequency:
H1: 20
L1: 20
---
data:
channels:
H1: H1:DCS-CALIB_STRAIN_C02
L1: L1:DCS-CALIB_STRAIN_C02
frame types:
H1: H1_HOFT_C02
L1: L1_HOFT_C02
segment length: 16
event time: 1135136350.647758
interferometers:
- H1
- L1
kind: event
likelihood:
psd length: 16
reference frequency: 20
sample rate: 4096
segment start: 1135136336.647758
start frequency: 13.333333333333334
window length: 16
name: GW151226_033853
priors:
amplitude order: 1
chirp mass:
maximum: 11.825459113009993
minimum: 7.60927765774096
luminosity distance:
maximum: 10000
minimum: 100
mass 1:
maximum: 1000
minimum: 1
mass ratio:
maximum: 1.0
minimum: 0.05
quality:
minimum frequency:
H1: 20
L1: 20
---
data:
channels:
H1: H1:DCH-CLEAN_STRAIN_C02
L1: L1:DCH-CLEAN_STRAIN_C02
V1: V1:Hrec_hoft_V1O2Repro2A_16384Hz
frame types:
H1: H1_CLEANED_HOFT_C02
L1: L1_CLEANED_HOFT_C02
V1: V1O2Repro2A
segment length: 4
event time: 1186741861.52678
interferometers:
- H1
- L1
- V1
kind: event
likelihood:
psd length: 4
reference frequency: 20
sample rate: 2048
segment start: 1186741859.52678
start frequency: 13.333333333333334
window length: 4
name: GW170814_103043
priors:
amplitude order: 1
chirp mass:
maximum: 36.75187106272061
minimum: 19.520822504718232
luminosity distance:
maximum: 10000
minimum: 100
mass 1:
maximum: 1000
minimum: 1
mass ratio:
maximum: 1.0
minimum: 0.05
quality:
minimum frequency:
H1: 20
L1: 20
V1: 20
# """These tests are designed to be run on all pipelines to ensure that
# they are generally compliant within asimov's specifications. They can
# include checking that ini files have the correct information and that
# the pipelines report information as expected.
# """
# import os
# import io
# import unittest
# from unittest.mock import patch
# import shutil
# import git
# import contextlib
# from click.testing import CliRunner
# from importlib import reload
# import asimov.event
# from asimov.cli.project import make_project
# from asimov.cli.application import apply_page
# from asimov.ledger import YAMLLedger
# from asimov.pipelines import known_pipelines
# from asimov.testing import AsimovTestCase
# from asimov import config
# from asimov.utils import set_directory
# from asimov.cli import monitor
# class TestSimplePostProcessing(AsimovTestCase):
# def event_setup(self):
# f = io.StringIO()
# with contextlib.redirect_stdout(f):
# apply_page(
# f"{self.cwd}/tests/test_data/testing_pe.yaml",
# event=None,
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/testing_events.yaml",
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_postprocessing.yaml",
# ledger=self.ledger,
# )
# def test_analysis_has_postprocesses(self):
# self.event_setup()
# post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# self.assertEqual(len(post), 1)
# def test_analysis_postprocess_stages(self):
# self.event_setup()
# post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# self.assertEqual(len(post[0].stages), 2)
# def test_analysis_postprocess_stages_into_pipelines(self):
# self.event_setup()
# post = self.ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# self.assertEqual(post[0].stages[0].pipeline.name.lower(), "pesummary")
# def test_analysis_postprocess_dag(self):
# """Test that the DAG of analysis stages is created"""
# self.event_setup()
# # Mark the analysis as finished
# os.chdir(f"{self.cwd}/tests/tmp/project")
# reload(asimov)
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# self.assertEqual(len(post[0]._stages_dag), 2)
# self.assertEqual(len(post[0].next), 1)
# self.assertEqual(post[0].next[0], "simple PE summary")
# os.chdir(self.cwd)
# @patch('asimov.pipelines.bilby.Bilby.samples')
# def test_analysis_postprocess_run_next(self, mock_samples):
# mock_samples.return_value = ["/home/test/samples"]
# self.event_setup()
# os.chdir(f"{self.cwd}/tests/tmp/project")
# reload(asimov)
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# f = io.StringIO()
# with contextlib.redirect_stdout(f):
# string = """'output': 'working/GW150914_095045/pesummary.out', 'error': 'working/GW150914_095045/pesummary.err', 'log': 'working/GW150914_095045/pesummary.log', 'request_cpus': 4, 'getenv': 'true', 'batch_name': 'Summary Pages/GW150914_095045', 'request_memory': '8192MB', 'should_transfer_files': 'YES', 'request_disk': '8192MB'"""
# post[0].run_next(dryrun=True)
# self.assertTrue(string in f.getvalue())
# os.chdir(self.cwd)
# @patch('asimov.pipelines.bilby.Bilby.samples')
# def test_analysis_postprocess_run_mocked_samples(self, mock_samples):
# mock_samples.return_value = ["/home/test/samples"]
# self.event_setup()
# os.chdir(f"{self.cwd}/tests/tmp/project")
# reload(asimov)
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# post = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].postprocessing
# f = io.StringIO()
# with contextlib.redirect_stdout(f):
# post[0].run_next(dryrun=True)
# self.assertTrue("--samples /home/test/samples" in f.getvalue())
# os.chdir(self.cwd)
# # Next we need to test that asimov monitor will actually start a post-processing job correctly.
# @patch('asimov.condor.CondorJobList')
# @patch('asimov.pipelines.bilby.Bilby.detect_completion')
# @patch('asimov.pipelines.bilby.Bilby.collect_assets')
# @patch('asimov.pipelines.bilby.Bilby.samples')
# def test_analysis_postprocess_monitor_mocked_samples(self, mock_samples, mock_assets, mock_complete, mock_condor):
# """Check that asimov monitor attempts to start a post-processing job"""
# class MockJobList:
# def __init__(self):
# pass
# def refresh(self):
# return []
# mock_samples.return_value = ["/home/test/samples"]
# mock_assets.return_value = {"samples": "/home/test/samples"}
# mock_complete.return_value = True
# mock_condor.return_value = MockJobList()
# self.event_setup()
# os.chdir(f"{self.cwd}/tests/tmp/project")
# runner = CliRunner()
# reload(asimov)
# reload(monitor)
# # Mark the analysis as running
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# # Run the monitor
# result = runner.invoke(monitor.monitor, "--dry-run")
# os.chdir(self.cwd)
# self.assertTrue("--samples /home/test/samples" in result.output)
# class TestEventPostProcessing(AsimovTestCase):
# def event_setup(self):
# # f = io.StringIO()
# # with contextlib.redirect_stdout(f):
# apply_page(
# f"{self.cwd}/tests/test_data/testing_pe.yaml",
# event=None,
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/testing_events.yaml",
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_postprocessing.yaml",
# ledger=self.ledger,
# )
# def test_subject_has_postprocesses(self):
# self.event_setup()
# post = self.ledger.get_event("GW150914_095045")[0].postprocessing
# self.assertEqual(len(post), 1)
# def test_correct_analyses(self):
# """Test that the correct analyses are returned."""
# self.event_setup()
# post = self.ledger.get_event("GW150914_095045")[0].postprocessing
# self.assertEqual(str(list(post[0].analyses)[0].pipeline), "bilby")
# def test_fresh_is_true_no_finished_analyses(self):
# """Test that when no analyses are finished the job is not fresh"""
# self.event_setup()
# post_job = self.ledger.get_event("GW150914_095045")[0].postprocessing[0]
# self.assertEqual(post_job.fresh, True)
# # Next we need to test that asimov monitor will actually start a post-processing job correctly.
# @patch('asimov.condor.CondorJobList')
# @patch('asimov.pipelines.bilby.Bilby.detect_completion')
# @patch('asimov.pipelines.bilby.Bilby.collect_assets')
# @patch('asimov.pipelines.bilby.Bilby.samples')
# def test_analysis_postprocess_monitor_mocked_samples(self, mock_samples, mock_assets, mock_complete, mock_condor):
# """Check that asimov monitor attempts to start a post-processing job"""
# class MockJobList:
# def __init__(self):
# pass
# def refresh(self):
# return []
# mock_samples.return_value = ["/home/test/samples"]
# mock_complete.return_value = True
# mock_condor.return_value = MockJobList()
# self.event_setup()
# os.chdir(f"{self.cwd}/tests/tmp/project")
# runner = CliRunner()
# reload(asimov)
# reload(monitor)
# # Mark the analysis as running
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# reload(monitor)
# # Run the monitor
# result = runner.invoke(monitor.monitor, "--dry-run")
# os.chdir(self.cwd)
# self.assertTrue("--samples /home/test/samples /home/test/samples" in result.output)
# # Next we need to test that asimov monitor will actually start a post-processing job correctly.
# @patch('asimov.condor.CondorJobList')
# @patch('asimov.pipelines.bilby.Bilby.detect_completion')
# @patch('asimov.pipelines.bilby.Bilby.collect_assets')
# @patch('asimov.pipelines.bilby.Bilby.samples')
# def test_analysis_postprocess_only_ready(self, mock_samples, mock_assets, mock_complete, mock_condor):
# """Check that asimov monitor attempts to start a post-processing job only if one of the analyses is ready"""
# class MockJobList:
# def __init__(self):
# pass
# def refresh(self):
# return []
# mock_samples.return_value = ["/home/test/samples"]
# #mock_assets.return_value = {"samples": "/home/test/samples"}
# mock_complete.return_value = True
# mock_condor.return_value = MockJobList()
# self.event_setup()
# os.chdir(f"{self.cwd}/tests/tmp/project")
# runner = CliRunner()
# reload(asimov)
# reload(monitor)
# # Mark the analysis as running
# analysis = asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1]
# analysis.status = "finished"
# asimov.current_ledger.get_event("GW150914_095045")[0].analyses[-1].status = "finished"
# asimov.current_ledger.update_event(analysis.event)
# reload(asimov)
# reload(monitor)
# result = runner.invoke(monitor.monitor, "--dry-run")
# os.chdir(self.cwd)
# self.assertTrue("--samples /home/test/samples /home/test/samples" in result.output)
+7
-19
Metadata-Version: 2.1
Name: asimov
Version: 0.5.8
Version: 0.6.0
Summary: A Python package for managing and interacting with data analysis jobs.

@@ -49,6 +49,7 @@ Home-page: https://git.ligo.org/asimov/asimov

[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/version.svg)](https://anaconda.org/conda-forge/ligo-asimov)
[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/installer/conda.svg)](https://conda.anaconda.org/conda-forge)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/infrastructure-updates/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/master/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/master)
[![conda-forge version](https://anaconda.org/conda-forge/asimov/badges/version.svg)](https://anaconda.org/conda-forge/asimov/)
![pypi](https://img.shields.io/pypi/v/asimov.svg)
![tests](https://git.ligo.org/asimov/asimov/badges/master/pipeline.svg)

@@ -59,16 +60,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows.

## Branch notes
These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development.
More generally useful documentation will move to the main documentation before moving to production.
### Starting the logging server
Run in ``asimov`` directory:
```
export FLASK_APP=server
flask run
```
## Features

@@ -112,3 +99,3 @@

```
$ conda install -c conda-forge ligo-asimov
$ conda install -c conda-forge asimov
```

@@ -122,2 +109,3 @@

## Get started

@@ -124,0 +112,0 @@

@@ -23,2 +23,3 @@ .gitlab-ci.yml

asimov/asimov.conf
asimov/auth.py
asimov/condor.py

@@ -28,3 +29,2 @@ asimov/database.py

asimov/git.py
asimov/gitlab.py
asimov/ini.py

@@ -68,2 +68,3 @@ asimov/ledger.py

asimov/configs/lalinference.ini
asimov/configs/pesummary.ini
asimov/configs/rift.ini

@@ -74,2 +75,3 @@ asimov/pipelines/__init__.py

asimov/pipelines/lalinference.py
asimov/pipelines/pesummary.py
asimov/pipelines/rift.py

@@ -87,2 +89,3 @@ asimov/report-theme/body.html

docs/source/building-docs.rst
docs/source/citing.rst
docs/source/clusters.rst

@@ -106,3 +109,2 @@ docs/source/code-of-conduct.rst

docs/source/textmark.png
docs/source/yamlformat.rst
docs/source/advanced/projects.rst

@@ -136,3 +138,2 @@ docs/source/api/asimov.rst

scripts/check-ifo.py
scripts/condor_tests.py
scripts/find_calibration.py

@@ -154,2 +155,3 @@ scripts/make_events.py

tests/test_pipelines_generic.py
tests/test_postprocessing.py
tests/test_review.py

@@ -170,8 +172,15 @@ tests/test_specific_events.py

tests/test_data/event_non_standard_settings.yaml
tests/test_data/events_blueprint.yaml
tests/test_data/lalinference_settings_nodeps.yaml
tests/test_data/project_analysis.yaml
tests/test_data/rift_settings_nodeps.yaml
tests/test_data/test_analyses.yaml
tests/test_data/test_complex_dag.yaml
tests/test_data/test_joint_analysis.yaml
tests/test_data/test_linear_dag.yaml
tests/test_data/test_postprocessing.yaml
tests/test_data/test_query_dag.yaml
tests/test_data/test_rift.yaml
tests/test_data/test_simple_dag.yaml
tests/test_data/testing_events.yaml
tests/test_data/testing_pe.yaml

@@ -178,0 +187,0 @@ tests/test_data/priors/bbh.prior.template

@@ -6,3 +6,2 @@ """

__author__ = """Daniel Williams"""

@@ -95,10 +94,8 @@ __email__ = "daniel.williams@ligo.org"

try:
if config.get("ledger", "engine") == "gitlab":
from .gitlab import GitlabLedger
current_ledger = GitlabLedger()
elif config.get("ledger", "engine") == "yamlfile":
if config.get("ledger", "engine") == "yamlfile":
from .ledger import YAMLLedger
current_ledger = YAMLLedger(config.get("ledger", "location"))
elif config.get("ledger", "engine") == "gitlab":
logger.error("The gitlab interface has been removed from v0.6 of asimov")
else:

@@ -105,0 +102,0 @@ current_ledger = None

@@ -22,25 +22,57 @@ """

on all events, including event and simple analyses.
This type of analysis is useful for defining a population analysis, for example.
This type of analysis is useful for defining a population analysis or analysing multiple events together, for example.
"""
import os
import configparser
from copy import deepcopy
from warnings import warn
import pathlib
from functools import reduce
import operator
from liquid import Liquid
from asimov import config
from asimov import config, logger, LOGGER_LEVEL
from asimov.pipelines import known_pipelines
from asimov.utils import update
from asimov.utils import update, diff_dict
from asimov.storage import Store
from .review import Review
from .ini import RunConfiguration
status_map = {
"cancelled": "light",
"finished": "success",
"uploaded": "success",
"processing": "primary",
"running": "primary",
"stuck": "warning",
"restart": "secondary",
"ready": "secondary",
"wait": "light",
"stop": "danger",
"manual": "light",
"stopped": "light",
}
review_map = {
"deprecated": "warning",
"none": "default",
"approved": "success",
"rejected": "danger",
"checked": "info",
}
class Analysis:
"""
The base class for all other types of analysis.
TODO: Add a check to make sure names cannot conflict
"""
meta = {}
meta_defaults = {"scheduler": {}, "sampler": {}, "review": {}, "likelihood": {}}
_reviews = Review()
@property

@@ -51,2 +83,7 @@ def review(self):

"""
if "review" in self.meta:
if len(self.meta["review"]) > 0:
self._reviews = Review.from_dict(self.meta["review"], production=self)
self.meta.pop("review")
return self._reviews

@@ -56,13 +93,68 @@ def _process_dependencies(self, needs):

Process the dependencies list for this production.
The dependencies can be provided either as the name of a production,
or a query against the analysis's attributes.
Parameters
----------
needs : list
A list of all the requirements
Returns
-------
list
A list of all the requirements processed for evaluation.
"""
return needs
all_requirements = []
for need in deepcopy(needs):
try:
requirement = need.split(":")
requirement = [requirement[0].split("."), requirement[1]]
except IndexError:
requirement = [["name"], need]
except AttributeError:
requirement = need
all_requirements.append(requirement)
return all_requirements
@property
def job_id(self):
"""
Get the ID number of this job as it resides in the scheduler.
"""
if "scheduler" in self.meta:
if "job id" in self.meta["scheduler"]:
return self.meta["scheduler"]["job id"]
else:
return None
@job_id.setter
def job_id(self, value):
if "scheduler" not in self.meta:
self.meta["scheduler"] = {}
self.meta["scheduler"]["job id"] = value
@property
def dependencies(self):
if "needs" in self.meta:
dependencies = self._process_dependencies(self.meta["needs"])
"""Return a list of analyses which this analysis depends upon."""
all_matches = []
if len(self._needs) == 0:
return []
else:
dependencies = None
return dependencies
matches = set({}) # set(self.event.analyses)
# matches.remove(self)
requirements = self._process_dependencies(deepcopy(self._needs))
for attribute, match in requirements:
filtered_analyses = list(
filter(
lambda x: x.matches_filter(attribute, match),
self.event.analyses,
)
)
matches = set.union(matches, set(filtered_analyses))
for analysis in matches:
all_matches.append(analysis.name)
return all_matches
@property

@@ -78,3 +170,3 @@ def priors(self):

def finished(self):
finished_states = ["uploaded"]
finished_states = ["finished", "processing", "uploaded"]
return self.status in finished_states

@@ -89,5 +181,54 @@

self.status_str = value.lower()
if self.event.issue_object is not None:
self.event.issue_object.update_data()
def matches_filter(self, attribute, match):
    """
    Checks to see if this analysis matches a given filtering
    criterion.

    A variety of different attributes can be used for filtering.
    The primary attributes are

    - review status
    - processing status
    - name

    In addition, any quantity contained in the analysis metadata
    may be used by accessing it in the nested structure of this
    data, with levels of the hierarchy separated with period
    characters. For example, to access the waveform approximant
    the correct attribute would be `waveform.approximant`.

    Parameters
    ----------
    attribute : list of str
        The components of the attribute to be tested, e.g.
        ``["waveform", "approximant"]``.
    match : str
        The string to be matched against the value of the attribute

    Returns
    -------
    bool
        Returns True if this analysis matches the query,
        otherwise returns False.
    """
    # Exactly one of the original's four flags could ever be set, so the
    # flag variables combined with non-short-circuit ``|`` are replaced
    # by equivalent early returns.
    if attribute[0] == "review":
        return match.lower() == str(self.review.status).lower()
    if attribute[0] == "status":
        return match.lower() == self.status.lower()
    if attribute[0] == "name":
        return match == self.name
    # Walk the nested metadata dictionary along the attribute path.
    try:
        return reduce(operator.getitem, attribute, self.meta) == match
    except KeyError:
        # The attribute path does not exist in the metadata.
        return False
def results(self, filename=None, handle=False, hash=None):

@@ -111,9 +252,10 @@ store = Store(root=config.get("storage", "results_store"))

"""
Return the run directory for this event.
Return the run directory for this analysis.
"""
if "rundir" in self.meta:
return self.meta["rundir"]
return os.path.abspath(self.meta["rundir"])
elif "working directory" in self.subject.meta:
value = os.path.join(self.subject.meta["working directory"], self.name)
self.meta["rundir"] = value
return os.path.abspath(self.meta["rundir"])
# TODO: Make sure this is saved back to the ledger

@@ -123,4 +265,34 @@ else:

def make_config(self, filename, template_directory=None):
@rundir.setter
def rundir(self, value):
    """
    Set the run directory.
    """
    # The original branched on whether "rundir" was already present but
    # performed the identical assignment in both branches; a single
    # assignment is equivalent.
    self.meta["rundir"] = value
def get_meta(self, key):
    """
    Get the value of a metadata attribute, or return None if it doesn't
    exist.

    Parameters
    ----------
    key : str
        The name of the metadata attribute.

    Returns
    -------
    object or None
        The stored value, or None when the key is absent.
    """
    # dict.get already returns None for a missing key.
    return self.meta.get(key)
def set_meta(self, key, value):
    """
    Set a metadata attribute which doesn't currently exist.

    Parameters
    ----------
    key : str
        The name of the metadata attribute to create.
    value : object
        The value to store.

    Raises
    ------
    ValueError
        If the attribute already exists; use direct assignment to
        ``self.meta`` to overwrite existing values.
    """
    # Guard clause; the original raised a bare ValueError with no message.
    if key in self.meta:
        raise ValueError(f"The metadata attribute '{key}' already exists.")
    self.meta[key] = value
    # Persist the change to the ledger immediately.
    self.event.ledger.update_event(self.event)
def make_config(self, filename, template_directory=None, dryrun=False):
"""
Make the configuration file for this production.

@@ -139,7 +311,7 @@

template = f"{self.meta['template']}.ini"
else:
template = f"{self.pipeline}.ini"
pipeline = known_pipelines[self.pipeline]
pipeline = self.pipeline
try:

@@ -158,8 +330,131 @@ template_directory = config.get("templating", "directory")

liq = Liquid(template_file)
rendered = liq.render(production=self, config=config)
rendered = liq.render(production=self, analysis=self, config=config)
with open(filename, "w") as output_file:
output_file.write(rendered)
def build_report(self):
    """Delegate report building to this analysis's pipeline, if one is set."""
    if self.pipeline:
        self.pipeline.build_report()
def html(self):
    """
    An HTML representation of this production.

    Builds a Bootstrap "card" describing the analysis: name, comment,
    status badge (via the module-level ``status_map``), pipeline output,
    run directory, approximant, and any review messages.

    Returns
    -------
    str
        The assembled HTML fragment.
    """
    production = self
    card = ""
    card += f"<div class='asimov-analysis asimov-analysis-{self.status}'>"
    card += f"<h4>{self.name}"
    if self.comment:
        card += (
            f""" <small class="asimov-comment text-muted">{self.comment}</small>"""
        )
    card += "</h4>"
    if self.status:
        card += f"""<p class="asimov-status">
    <span class="badge badge-pill badge-{status_map[self.status]}">{self.status}</span>
    </p>"""
    if self.pipeline:
        card += f"""<p class="asimov-pipeline-name">{self.pipeline.name}</p>"""
    # NOTE(review): this condition repeats the previous one; the two
    # branches could be merged.
    if self.pipeline:
        # self.pipeline.collect_pages()
        card += self.pipeline.html()
    if self.rundir:
        card += f"""<p class="asimov-rundir"><code>{production.rundir}</code></p>"""
    else:
        card += """&nbsp;"""
    if "approximant" in production.meta:
        card += f"""<p class="asimov-attribute">Waveform approximant:
    <span class="asimov-approximant">{production.meta['approximant']}</span>
    </p>"""
    # NOTE(review): this "&nbsp;" is appended unconditionally, in addition
    # to the conditional one in the rundir branch above — confirm intended.
    card += """&nbsp;"""
    card += """</div>"""
    if len(self.review) > 0:
        for review in self.review:
            card += review.html()
    return card
def to_dict(self, event=True):
    """
    Return this production as a dictionary.

    Parameters
    ----------
    event : bool
        If set to True the output is designed to be included nested within an event.
        The event name is not included in the representation, and the production name is provided as a key.

    Returns
    -------
    dict
        When ``event`` is True, ``{name: data}``; otherwise the data
        dictionary itself (including the event name).
    """
    # Start from a deep copy of all metadata so later mutation of the
    # output cannot affect this analysis.
    dictionary = deepcopy(self.meta)
    if not event:
        dictionary["event"] = self.event.name
    dictionary["name"] = self.name
    # The pipeline may still be a plain string before it has been
    # resolved to a pipeline object.
    if isinstance(self.pipeline, str):
        dictionary["pipeline"] = self.pipeline
    else:
        dictionary["pipeline"] = self.pipeline.name.lower()
    dictionary["comment"] = self.comment
    if self.review:
        dictionary["review"] = self.review.to_dicts()
    dictionary["needs"] = self._needs  # self.dependencies
    # NOTE(review): the per-key copies below, and the loop that follows,
    # re-assign keys already captured by the deepcopy above (the loop
    # shares references with self.meta rather than copying) — confirm
    # whether this shadowing of the deep copies is intended.
    if "data" in self.meta:
        dictionary["data"] = self.meta["data"]
    if "likelihood" in self.meta:
        dictionary["likelihood"] = self.meta["likelihood"]
    if "quality" in self.meta:
        dictionary["quality"] = self.meta["quality"]
    if "priors" in self.meta:
        dictionary["priors"] = self.meta["priors"]
    if "waveform" in self.meta:
        dictionary["waveform"] = self.meta["waveform"]
    for key, value in self.meta.items():
        dictionary[key] = value
    dictionary["status"] = self.status
    dictionary["job id"] = self.job_id
    # Remove duplicates of pipeline defaults
    if self.pipeline.name.lower() in self.event.ledger.data["pipelines"]:
        defaults = deepcopy(
            self.event.ledger.data["pipelines"][self.pipeline.name.lower()]
        )
    else:
        defaults = {}
    # if "postprocessing" in self.event.ledger.data:
    #     defaults["postprocessing"] = deepcopy(
    #         self.event.ledger.data["postprocessing"]
    #     )
    # Fold in the event-level defaults, then keep only the values which
    # differ from those defaults.
    defaults = update(defaults, deepcopy(self.event.meta))
    dictionary = diff_dict(defaults, dictionary)
    if "repository" in self.meta:
        dictionary["repository"] = self.repository.url
    # Strip bookkeeping keys which should never be serialised.
    if "ledger" in dictionary:
        dictionary.pop("ledger")
    if "pipelines" in dictionary:
        dictionary.pop("pipelines")
    if "productions" in dictionary:
        dictionary.pop("productions")
    if not event:
        output = dictionary
    else:
        output = {self.name: dictionary}
    return output
class SimpleAnalysis(Analysis):

@@ -171,5 +466,22 @@ """

def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs):
self.ledger = kwargs.get("ledger", None)
self.event = self.subject = subject
self.name = name
pathlib.Path(
os.path.join(config.get("logging", "directory"), self.event.name, name)
).mkdir(parents=True, exist_ok=True)
self.logger = logger.getChild("analysis").getChild(
f"{self.event.name}/{self.name}"
)
self.logger.setLevel(LOGGER_LEVEL)
# fh = logging.FileHandler(logfile)
# formatter = logging.Formatter("%(asctime)s - %(message)s", "%Y-%m-%d %H:%M:%S")
# fh.setFormatter(formatter)
# self.logger.addHandler(fh)
if status:

@@ -179,8 +491,70 @@ self.status_str = status.lower()

self.status_str = "none"
self.pipeline = pipeline.lower()
self.comment = comment
self.meta = deepcopy(self.subject.meta)
self.meta = update(self.meta, kwargs)
self.meta = deepcopy(self.meta_defaults)
# Start by adding pipeline defaults
if "pipelines" in self.event.ledger.data:
if pipeline in self.event.ledger.data["pipelines"]:
self.meta = update(
self.meta, deepcopy(self.event.ledger.data["pipelines"][pipeline])
)
if "postprocessing" in self.event.ledger.data:
self.meta["postprocessing"] = deepcopy(
self.event.ledger.data["postprocessing"]
)
# self.meta["pipeline"] = pipeline
# Update with the subject defaults
self.meta = update(self.meta, deepcopy(self.subject.meta))
if "productions" in self.meta:
self.meta.pop("productions")
self.meta = update(self.meta, deepcopy(kwargs))
self.pipeline = pipeline.lower()
self.pipeline = known_pipelines[pipeline.lower()](self)
if "needs" in self.meta:
self._needs = self.meta.pop("needs")
else:
self._needs = []
self.comment = kwargs.get("comment", None)
def _previous_assets(self):
    """Gather the pipeline assets published by this analysis's dependencies."""
    assets = {}
    if self.dependencies:
        # Index the event's productions by name for lookup.
        by_name = {
            production.name: production for production in self.event.productions
        }
        for upstream in self.dependencies:
            assets.update(by_name[upstream].pipeline.collect_assets())
    return assets
@classmethod
def from_dict(cls, parameters, subject=None, ledger=None):
    """
    Construct a SimpleAnalysis from a dictionary of parameters.

    Parameters
    ----------
    parameters : dict
        The analysis description; must contain at least "pipeline" and "name".
    subject : optional
        The subject (event) this analysis belongs to.
    ledger : optional
        The ledger to record the analysis in; may also be supplied
        inside ``parameters``.

    Raises
    ------
    ValueError
        If "pipeline" or "name" is missing from ``parameters``.
    """
    parameters = deepcopy(parameters)
    # Check that pars is a dictionary
    if not {"pipeline", "name"} <= parameters.keys():
        # Fixed message: the original's implicit concatenation produced
        # "missing.Found" with no separating space (cf. the
        # ProjectAnalysis.from_dict message).
        raise ValueError(
            "Some of the required parameters are missing. "
            f"Found {parameters.keys()}"
        )
    if "status" not in parameters:
        parameters["status"] = "ready"
    if "event" in parameters:
        parameters.pop("event")
    pipeline = parameters.pop("pipeline")
    name = parameters.pop("name")
    if "comment" not in parameters:
        parameters["comment"] = None
    if "ledger" in parameters:
        ledger = parameters.pop("ledger")
    return cls(
        name=name, pipeline=pipeline, ledger=ledger, subject=subject, **parameters
    )
class SubjectAnalysis(Analysis):

@@ -191,5 +565,124 @@ """

pass
def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs):
    """
    An analysis attached to a single subject (event).

    Parameters
    ----------
    subject : asimov event
        The subject this analysis operates on.
    name : str
        The name of this analysis.
    pipeline : str
        The name of the pipeline running this analysis.
    status : str, optional
        The initial processing status.
    comment : str, optional
        A comment on this analysis (read from ``kwargs`` below).
    """
    self.event = self.subject = subject
    self.name = name
    self.category = "subject_analyses"
    self.logger = logger.getChild("event").getChild(f"{self.name}")
    self.logger.setLevel(LOGGER_LEVEL)
    if status:
        self.status_str = status.lower()
    else:
        self.status_str = "none"
    # Layer metadata: class defaults, then the subject's, then kwargs.
    self.meta = deepcopy(self.meta_defaults)
    self.meta = update(self.meta, deepcopy(self.subject.meta))
    self.meta = update(self.meta, deepcopy(kwargs))
    # "needs" specifies which of the subject's analyses feed this one.
    self._analysis_spec = self.meta.get("needs")
    if self._analysis_spec:
        requirements = self._process_dependencies(self._analysis_spec)
        self.analyses = []
        for attribute, match in requirements:
            matches = set(self.subject.analyses)
            filtered_analyses = list(
                filter(
                    lambda x: x.matches_filter(attribute, match), subject.analyses
                )
            )
            matches = set.intersection(matches, set(filtered_analyses))
            for analysis in matches:
                self.analyses.append(analysis)
        self.productions = self.analyses
    if "needs" in self.meta:
        self.meta.pop("needs")
    # NOTE(review): the string assignment below is immediately overwritten
    # by the pipeline object — confirm the first line is needed.
    self.pipeline = pipeline.lower()
    self.pipeline = known_pipelines[pipeline.lower()](self)
    # NOTE(review): "needs" was already popped above, so this branch can
    # never fire and self._needs is always [] here — confirm intended.
    if "needs" in self.meta:
        self._needs = self.meta.pop("needs")
    else:
        self._needs = []
    if "comment" in kwargs:
        self.comment = kwargs["comment"]
    else:
        self.comment = None
def to_dict(self, event=True):
    """
    Return this production as a dictionary.

    Parameters
    ----------
    event : bool
        If set to True the output is designed to be included nested within an event.
        The event name is not included in the representation, and the production name is provided as a key.

    Returns
    -------
    dict
        When ``event`` is True, ``{name: data}``; otherwise the data
        dictionary itself (including the event name).
    """
    dictionary = {}
    # Seed the output with all metadata.
    dictionary = update(dictionary, self.meta)
    if not event:
        dictionary["event"] = self.event.name
    dictionary["name"] = self.name
    dictionary["status"] = self.status
    # The pipeline may still be a plain string before resolution.
    if isinstance(self.pipeline, str):
        dictionary["pipeline"] = self.pipeline
    else:
        dictionary["pipeline"] = self.pipeline.name.lower()
    dictionary["comment"] = self.comment
    dictionary["analyses"] = self._analysis_spec
    if self.review:
        dictionary["review"] = self.review.to_dicts()
    dictionary["needs"] = deepcopy(self._needs)  # self.dependencies
    if "quality" in self.meta:
        dictionary["quality"] = self.meta["quality"]
    if "priors" in self.meta:
        dictionary["priors"] = self.meta["priors"]
    for key, value in self.meta.items():
        dictionary[key] = value
    if "repository" in self.meta:
        dictionary["repository"] = self.repository.url
    # Strip bookkeeping keys which should never be serialised.
    if "ledger" in dictionary:
        dictionary.pop("ledger")
    if "pipelines" in dictionary:
        dictionary.pop("pipelines")
    if not event:
        output = dictionary
    else:
        output = {self.name: dictionary}
    return output
@classmethod
def from_dict(cls, parameters, subject):
    """
    Construct a SubjectAnalysis from a dictionary of parameters.

    Parameters
    ----------
    parameters : dict
        The analysis description; must contain at least "pipeline" and "name".
    subject : asimov event
        The subject this analysis belongs to.

    Raises
    ------
    ValueError
        If "pipeline" or "name" is missing from ``parameters``.
    """
    parameters = deepcopy(parameters)
    # Check that pars is a dictionary
    if not {"pipeline", "name"} <= parameters.keys():
        # Fixed message: the original's implicit concatenation produced
        # "missing.Found" with no separating space (cf. the
        # ProjectAnalysis.from_dict message).
        raise ValueError(
            "Some of the required parameters are missing. "
            f"Found {parameters.keys()}"
        )
    if "status" not in parameters:
        parameters["status"] = "ready"
    if "event" in parameters:
        parameters.pop("event")
    pipeline = parameters.pop("pipeline")
    name = parameters.pop("name")
    if "comment" not in parameters:
        parameters["comment"] = None
    return cls(subject, name, pipeline, **parameters)
class ProjectAnalysis(Analysis):

@@ -200,12 +693,222 @@ """

def __init__(
self, subjects, analyses, name, pipeline, status=None, comment=None, **kwargs
):
meta_defaults = {"scheduler": {}, "sampler": {}, "review": {}}
def __init__(self, name, pipeline, ledger=None, **kwargs):
""" """
super().__init__()
self.name = name
self.logger = logger.getChild("project analyses").getChild(f"{self.name}")
self.logger.setLevel(LOGGER_LEVEL)
self.ledger = ledger
self.subjects = subjects
self.analyses = analyses
self._subjects = kwargs["subjects"]
self._events = self._subjects
if "analyses" in kwargs.keys():
self._analysis_spec = kwargs["analyses"]
else:
self._analysis_spec = {}
requirements = self._process_dependencies(self._analysis_spec)
self.analyses = []
# set up the working directory
if "working_directory" in kwargs:
self.work_dir = kwargs["working_directory"]
else:
self.work_dir = os.path.join("working", "project analyses", f"{self.name}")
if not os.path.exists(self.work_dir):
os.makedirs(self.work_dir)
self.repository = None
self._subject_obs = []
for subject in self.subjects:
if self._analysis_spec:
matches = set(subject.analyses)
for attribute, match in requirements:
filtered_analyses = list(
filter(
lambda x: x.matches_filter(attribute, match),
subject.analyses,
)
)
matches = set.intersection(matches, set(filtered_analyses))
for analysis in matches:
self.analyses.append(analysis)
if "status" in kwargs:
self.status_str = kwargs["status"].lower()
else:
self.status_str = "none"
self.pipeline = pipeline # .lower()
if isinstance(pipeline, str):
# try:
self.pipeline = known_pipelines[str(pipeline).lower()](self)
# except KeyError:
self.logger.warning(f"The pipeline {pipeline} could not be found.")
if "needs" in self.meta:
self._needs = self.meta.pop("needs")
else:
self._needs = []
if "comment" in kwargs:
self.comment = kwargs["comment"]
else:
self.comment = None
self.meta = deepcopy(self.meta_defaults)
# Start by adding pipeline defaults
if "pipelines" in self.ledger.data:
if pipeline in self.ledger.data["pipelines"]:
self.meta = update(
self.meta, deepcopy(self.ledger.data["pipelines"][pipeline])
)
self.meta = update(self.meta, deepcopy(kwargs))
def __repr__(self):
    """Return a human-friendly description of this project analysis."""
    n_events = len(self.events)
    n_analyses = len(self.analyses)
    return f"<Project analysis for {n_events} events and {n_analyses} analyses>"
@property
def subjects(self):
    """Return a list of subjects for this project analysis."""
    # Resolve each stored subject name to its event via the ledger;
    # get_event returns a list, of which the first entry is taken.
    resolved = []
    for subject_name in self._subjects:
        resolved.append(self.ledger.get_event(subject_name)[0])
    return resolved
@property
def events(self):
    """Alias for the ``subjects`` property; events and subjects are synonymous here."""
    # ``subjects`` is a property, so it must not be called: the original
    # ``return self.subjects()`` attempted to call the returned list,
    # raising TypeError on every access.
    return self.subjects
@classmethod
def from_dict(cls, parameters, ledger=None):
    """
    Construct a ProjectAnalysis from a dictionary of parameters.

    Requires at least "pipeline" and "name" in ``parameters``; fills in
    defaults for status, comment and analyses, and strips any "event" key.
    """
    parameters = deepcopy(parameters)
    # Both a pipeline and a name are required.
    if not {"pipeline", "name"} <= parameters.keys():
        raise ValueError(
            f"Some of the required parameters are missing. "
            f"Found {parameters.keys()}"
        )
    parameters.setdefault("status", "ready")
    parameters.pop("event", None)
    pipeline = parameters.pop("pipeline")
    name = parameters.pop("name")
    parameters.setdefault("comment", None)
    parameters.setdefault("analyses", [])
    return cls(name=name, pipeline=pipeline, ledger=ledger, **parameters)
@property
def dependencies(self):
    """Return a list of analyses which this analysis depends upon.

    Each entry in ``self._needs`` is turned into a (attribute, match)
    requirement and matched against every analysis of every subject;
    the union of all matches is returned by name.
    """
    all_matches = []
    if len(self._needs) == 0:
        return []
    else:
        matches = set({})  # set(self.event.analyses)
        # matches.remove(self)
        requirements = self._process_dependencies(deepcopy(self._needs))
        # Collect every analysis belonging to every subject.
        analyses = []
        for subject in self.subjects:
            # NOTE(review): ``self.subjects`` already resolves names via
            # ledger.get_event, so calling get_event again on the result
            # looks redundant — confirm against Ledger.get_event's accepted
            # argument types.
            sub = self.ledger.get_event(subject)[0]
            # NOTE(review): appending here on every property access grows
            # self._subject_obs each time dependencies is read.
            self._subject_obs.append(sub)
            for analysis in sub.analyses:
                analyses.append(analysis)
        for attribute, match in requirements:
            filtered_analyses = list(
                filter(
                    lambda x: x.matches_filter(attribute, match),
                    analyses,
                )
            )
            matches = set.union(matches, set(filtered_analyses))
        for analysis in matches:
            all_matches.append(analysis.name)
        return all_matches
def to_dict(self):
    """
    Return this project production as a dictionary.

    Returns
    -------
    dict
        A serialisable description of this project analysis, including
        its subjects and analysis specification.
    """
    dictionary = {}
    # Seed the output with all metadata.
    dictionary = update(dictionary, self.meta)
    dictionary["name"] = self.name
    dictionary["status"] = self.status
    # The pipeline may still be a plain string before resolution.
    if isinstance(self.pipeline, str):
        dictionary["pipeline"] = self.pipeline
    else:
        dictionary["pipeline"] = self.pipeline.name.lower()
    dictionary["comment"] = self.comment
    if self.review:
        dictionary["review"] = self.review.copy()  # .to_dicts()
    dictionary["needs"] = self.dependencies
    if "quality" in self.meta:
        dictionary["quality"] = self.meta["quality"]
    if "priors" in self.meta:
        dictionary["priors"] = self.meta["priors"]
    for key, value in self.meta.items():
        dictionary[key] = value
    if "repository" in self.meta:
        dictionary["repository"] = self.repository.url
    # Strip bookkeeping keys which should never be serialised.
    if "ledger" in dictionary:
        dictionary.pop("ledger")
    if "pipelines" in dictionary:
        dictionary.pop("pipelines")
    dictionary["subjects"] = self._subjects
    dictionary["analyses"] = self._analysis_spec
    output = dictionary
    return output
@property
def rundir(self):
    """
    Return the run directory for this project analysis, defaulting to
    (and caching) the working directory when none has been set.
    """
    if "rundir" not in self.meta and self.work_dir:
        # Cache the working directory as the run directory.
        self.meta["rundir"] = self.work_dir
    return self.meta.get("rundir")
@rundir.setter
def rundir(self, value):
    """
    Set the run directory.
    """
    # The original branched on whether "rundir" was already present but
    # performed the identical assignment in both branches; a single
    # assignment is equivalent.
    self.meta["rundir"] = value
class GravitationalWaveTransient(SimpleAnalysis):

@@ -216,6 +919,91 @@ """

def __init__(self, subject, name, pipeline, status=None, comment=None, **kwargs):
super().init(subject, name, pipeline, status=None, comment=None, **kwargs)
def __init__(self, subject, name, pipeline, **kwargs):
    """
    A specific analysis on a GW transient event.

    Parameters
    ----------
    subject : `asimov.event`
        The event this analysis is running on.
    name : str
        The name of this analysis.
    status : str
        The status of this analysis.
    pipeline : str
        This analysis's pipeline.
    comment : str
        A comment on this analysis.
    """
    self.category = config.get("general", "calibration_directory")
    super().__init__(subject, name, pipeline, **kwargs)
    self._checks()
    # NOTE(review): the PSDs are collected here and again at the end of
    # this constructor — confirm whether both calls are required.
    self.psds = self._collect_psds()
    self.xml_psds = self._collect_psds(format="xml")
    # Migrate legacy top-level keys into their structured sections.
    if "cip jobs" in self.meta:
        # TODO: Should probably raise a deprecation warning
        self.meta["sampler"]["cip jobs"] = self.meta["cip jobs"]
    # Ensure the structured metadata sections exist.
    if "scheduler" not in self.meta:
        self.meta["scheduler"] = {}
    if "likelihood" not in self.meta:
        self.meta["likelihood"] = {}
    if "marginalization" not in self.meta["likelihood"]:
        self.meta["likelihood"]["marginalization"] = {}
    if "data" not in self.meta:
        self.meta["data"] = {}
    if "data files" not in self.meta["data"]:
        self.meta["data"]["data files"] = {}
    if "lmax" in self.meta:
        # TODO: Should probably raise a deprecation warning
        self.meta["sampler"]["lmax"] = self.meta["lmax"]
    # Check that the upper frequency is included, otherwise calculate it
    if "quality" in self.meta:
        if ("maximum frequency" not in self.meta["quality"]) and (
            "sample rate" in self.meta["likelihood"]
        ):
            self.meta["quality"]["maximum frequency"] = {}
            # Account for the PSD roll-off with the 0.875 factor
            for ifo in self.meta["interferometers"]:
                self.meta["quality"]["maximum frequency"][ifo] = int(
                    0.875 * self.meta["likelihood"]["sample rate"] / 2
                )
    # Derive the likelihood segment start from the event time and the
    # data segment length when it has not been given explicitly.
    if ("quality" in self.meta) and ("event time" in self.meta):
        if ("segment start" not in self.meta["quality"]) and (
            "segment length" in self.meta["data"]
        ):
            self.meta["likelihood"]["segment start"] = (
                self.meta["event time"] - self.meta["data"]["segment length"] + 2
            )
            # self.event.meta['likelihood']['segment start'] = self.meta['data']['segment start']
    # Update waveform data
    if "waveform" not in self.meta:
        self.logger.info("Didn't find waveform information in the metadata")
        self.meta["waveform"] = {}
    if "approximant" in self.meta:
        self.logger.warning(
            "Found deprecated approximant information, "
            "moving to waveform area of ledger"
        )
        approximant = self.meta.pop("approximant")
        self.meta["waveform"]["approximant"] = approximant
    if "reference frequency" in self.meta["likelihood"]:
        self.logger.warning(
            "Found deprecated ref freq information, "
            "moving to waveform area of ledger"
        )
        ref_freq = self.meta["likelihood"].pop("reference frequency")
        self.meta["waveform"]["reference frequency"] = ref_freq
    # Gather the PSDs for the job
    self.psds = self._collect_psds()
def _checks(self):

@@ -238,3 +1026,3 @@ """

):
warn(
self.logger.warn(
"The upper-cutoff frequency is not equal to 0.875 times the Nyquist frequency."

@@ -261,21 +1049,2 @@ )

@property
def psds(self):
"""
Return the PSDs stored for this transient event.
"""
if "psds" in self.meta and self.quality:
if self.quality["sample-rate"] in self.meta["psds"]:
self.psds = self.meta["psds"][self.quality["sample-rate"]]
else:
self.psds = {}
else:
self.psds = {}
for ifo, psd in self.psds.items():
if self.subject.repository:
self.psds[ifo] = os.path.join(self.subject.repository.directory, psd)
else:
self.psds[ifo] = psd
def get_timefile(self):

@@ -314,4 +1083,77 @@ """

def get_configuration(self):
"""
Get the configuration file contents for this event.
"""
if "ini" in self.meta:
ini_loc = self.meta["ini"]
else:
# We'll need to search the repository for it.
try:
ini_loc = self.subject.repository.find_prods(self.name, self.category)[
0
]
if not os.path.exists(ini_loc):
raise ValueError("Could not open the ini file.")
except IndexError:
raise ValueError("Could not open the ini file.")
try:
ini = RunConfiguration(ini_loc)
except ValueError:
raise ValueError("Could not open the ini file")
except configparser.MissingSectionHeaderError:
raise ValueError("This isn't a valid ini file")
class Production(SimpleAnalysis):
pass
return ini
def _collect_psds(self, format="ascii"):
    """
    Collect the required psds for this production.

    Parameters
    ----------
    format : str
        Either "ascii" (stored under the "psds" key) or "xml" (stored
        under "xml psds").

    Returns
    -------
    dict
        Mapping of interferometer to PSD, or an empty dict when none
        can be found.

    Raises
    ------
    ValueError
        If ``format`` is not "ascii" or "xml".
    """
    psds = {}
    # If the PSDs are specifically provided in the ledger,
    # use those.
    if format == "ascii":
        keyword = "psds"
    elif format == "xml":
        keyword = "xml psds"
    else:
        raise ValueError(f"This PSD format ({format}) is not recognised.")
    if keyword in self.meta:
        # if self.meta["likelihood"]["sample rate"] in self.meta[keyword]:
        psds = self.meta[keyword]  # [self.meta["likelihood"]["sample rate"]]
    # First look through the list of the job's dependencies
    # to see if they're provided by a job there.
    elif self.dependencies:
        productions = {}
        for production in self.event.productions:
            productions[production.name] = production
        for previous_job in self.dependencies:
            try:
                # Check if the job provides PSDs as an asset and were produced with compatible settings
                if keyword in productions[previous_job].pipeline.collect_assets():
                    if self._check_compatible(productions[previous_job]):
                        psds = productions[previous_job].pipeline.collect_assets()[
                            keyword
                        ]
                        # Stop at the first dependency with compatible PSDs.
                        break
                    else:
                        self.logger.info(
                            f"The PSDs from {previous_job} are not compatible with this job."
                        )
                else:
                    psds = {}
            # NOTE(review): this broad except silently swallows any error
            # from the dependency lookup or asset collection — consider
            # narrowing (e.g. to KeyError) or at least logging it.
            except Exception:
                psds = {}
    # Otherwise return no PSDs
    else:
        psds = {}
    for ifo, psd in psds.items():
        self.logger.debug(f"PSD-{ifo}: {psd}")
    return psds

@@ -18,2 +18,5 @@ [general]

[asimov start]
accounting = ligo.dev.o4.cbc.pe.lalinference
[pipelines]

@@ -20,0 +23,0 @@ environment = /cvmfs/oasis.opensciencegrid.org/ligo/sw/conda/envs/igwn-py39

@@ -12,3 +12,5 @@ """

import asimov.event
from asimov.analysis import ProjectAnalysis
from asimov import current_ledger as ledger
from asimov.ledger import Ledger
from asimov.utils import update

@@ -28,3 +30,3 @@

def apply_page(file, event, ledger=ledger):
def apply_page(file, event=None, ledger=ledger):
if file[:4] == "http":

@@ -46,3 +48,2 @@ r = requests.get(file)

for document in quick_parse:
if document["kind"] == "event":

@@ -77,6 +78,7 @@ logger.info("Found an event")

logger.exception(e)
production = asimov.event.Production.from_dict(document, event=event_o)
production = asimov.event.Production.from_dict(
parameters=document, subject=event_o, ledger=ledger
)
try:
event_o.add_production(production)
ledger.update_event(event_o)
ledger.add_analysis(production, event=event_o)
click.echo(

@@ -95,2 +97,74 @@ click.style("●", fg="green")

elif document["kind"].lower() == "postprocessing":
# Handle a project analysis
logger.info("Found a postprocessing description")
document.pop("kind")
if event:
event_s = event
if event:
try:
event_o = ledger.get_event(event_s)[0]
level = event_o
except KeyError as e:
click.echo(
click.style("●", fg="red")
+ f" Could not apply postprocessing, couldn't find the event {event}"
)
logger.exception(e)
else:
level = ledger
try:
if document["name"] in level.data.get("postprocessing stages", {}):
click.echo(
click.style("●", fg="red")
+ f" Could not apply postprocessing, as {document['name']} is already in the ledger."
)
logger.error(
f" Could not apply postprocessing, as {document['name']} is already in the ledger."
)
else:
if "postprocessing stages" not in level.data:
level.data["postprocessing stages"] = {}
if isinstance(level, asimov.event.Event):
level.meta["postprocessing stages"][document["name"]] = document
elif isinstance(level, Ledger):
level.data["postprocessing stages"][document["name"]] = document
level.name = "the project"
ledger.save()
click.echo(
click.style("●", fg="green")
+ f" Successfully added {document['name']} to {level.name}."
)
logger.info(f"Added {document['name']}")
except ValueError as e:
click.echo(
click.style("●", fg="red")
+ f" Could not apply {document['name']} to project as "
+ "a post-process already exists with this name"
)
logger.exception(e)
elif document["kind"].lower() == "projectanalysis":
# Handle a project analysis
logger.info("Found a project analysis")
document.pop("kind")
analysis = ProjectAnalysis.from_dict(document, ledger=ledger)
try:
ledger.add_analysis(analysis)
click.echo(
click.style("●", fg="green")
+ f" Successfully added {analysis.name} to this project."
)
ledger.save()
logger.info(f"Added {analysis.name}")
except ValueError as e:
click.echo(
click.style("●", fg="red")
+ f" Could not apply {analysis.name} to project as "
+ "an analysis already exists with this name"
)
logger.exception(e)
elif document["kind"] == "configuration":

@@ -112,6 +186,3 @@ logger.info("Found configurations")

hook.load()(ledger).run(event)
click.echo(
click.style("●", fg="green")
+ f"{event} has been applied."
)
click.echo(click.style("●", fg="green") + f"{event} has been applied.")

@@ -121,4 +192,3 @@ break

click.echo(
click.style("●", fg="red")
+ f"No hook found matching {hookname}. "
click.style("●", fg="red") + f"No hook found matching {hookname}. "
f"Installed hooks are {', '.join(discovered_hooks.names)}"

@@ -125,0 +195,0 @@ )

@@ -15,4 +15,4 @@ import ast

from asimov import current_ledger as ledger
from asimov.utils import find_calibrations, update
from asimov.event import DescriptionException, Event
from asimov.utils import update
from asimov.event import Event

@@ -136,3 +136,3 @@

@click.argument("delete")
@click.argument("event", default=None)
@event.command()

@@ -146,27 +146,2 @@ def delete(event):

# @click.argument("event")
# @click.option("--yaml", "yaml", default=None)
# @click.option("--ini", "ini", default=None)
# @event.command()
# def populate(event, yaml, ini):
# """
# Populate an event ledger with data from ini or yaml files.
# """
# event = ledger.get_event(event)
# # Check the calibration files for this event
# click.echo("Check the calibration.")
# click.echo(event.name)
# calibration(event=event.name)
# # Check the IFOs for this event
# click.echo("Check the IFO list")
# try:
# checkifo(event.name)
# except:
# pass
# if yaml:
# add_data(event.name, yaml)
@click.argument("event", default=None)

@@ -284,33 +259,10 @@ @click.option("--json", "json_data", default=None)

def calibration(event, calibration):
"""
Add calibration files to an event from a filepath.
"""
event = ledger.get_event(event)[0]
try:
event._check_calibration()
except DescriptionException:
print(event.name)
time = event.meta["event time"]
if not calibration[0]:
try:
calibrations = find_calibrations(time)
except ValueError:
calibrations = {}
else:
calibrations = {}
for cal in calibration:
calibrations[cal.split(":")[0]] = cal.split(":")[1]
print(calibrations)
update(event.meta["data"]["calibration"], calibrations)
ledger.update_event(event)
# @click.argument("data")
# @click.argument("event")
# @event.command()
# def load(event, data):
# event = ledger.get_event(event)
# with open(data, "r") as datafile:
# data = yaml.safe_load(datafile.read())
# event.meta = update(event.meta, data)
# ledger.update_event(event)
calibrations = {}
for cal in calibration:
calibrations[cal.split(":")[0]] = cal.split(":")[1]
update(event.meta["data"]["calibration"], calibrations)
ledger.update_event(event)
"""
Olivaw management commands
"""
import os

@@ -15,2 +16,3 @@ import pathlib

from asimov.pipeline import PipelineException
from asimov.git import EventRepo

@@ -46,2 +48,28 @@

logger.setLevel(LOGGER_LEVEL)
for analysis in ledger.project_analyses:
if analysis.status in {"ready"}:
# Need to ensure a directory exists for these!
subj_string = "_".join([f"{subject}" for subject in analysis._subjects])
project_analysis_dir = os.path.join(
"checkouts", "project-analyses", subj_string
)
if not os.path.exists(project_analysis_dir):
os.makedirs(project_analysis_dir)
click.echo(
click.style("●", fg="green")
+ f" Building project analysis {analysis.name}"
)
analysis.pipeline.before_config()
analysis.make_config(
filename=os.path.join(project_analysis_dir, f"{analysis.name}.ini"),
dryrun=dryrun,
)
click.echo(
click.style("●", fg="green")
+ f" Created configuration for {analysis.name}"
)
for event in ledger.get_event(event):

@@ -143,2 +171,206 @@

logger.setLevel(LOGGER_LEVEL)
# this should add the required repository field to the project analysis
# with the correct type
# FIXME: for now, this only accountd for the case where we have multiple analysis
# done but not yet several productions for the same event and pipelines. This would
# require extra checks when adding to the dictionary
interest_dict = {}
for analysis in ledger.project_analyses:
if analysis.pipeline.name not in interest_dict.keys():
interest_dict[analysis.pipeline.name] = []
if "interest status" in analysis.meta.keys():
interest_dict[analysis.pipeline.name].append(
{
"interest status": analysis.meta["interest status"],
"subjects": set(analysis.subjects),
}
)
for analysis in ledger.project_analyses:
# need to change the logic of analysis set up as to account for
# dependencies
to_analyse = True
extra_prio = False
if analysis.status not in {"ready"}:
to_analyse = False
elif analysis._needs:
# check if the parent jobs are said to be interesting
# this does not account for the old logic in the project analyses
interested_pipelines = 0
for old_analysis in analysis._needs:
if old_analysis in interest_dict.keys():
if len(interest_dict[old_analysis]) > 0:
for cases in interest_dict[old_analysis]:
if cases["interest status"] is True and cases[
"subjects"
] == set(analysis.subjects):
interested_pipelines += 1
if interested_pipelines < int(analysis.meta["needs settings"]["minimum"]):
to_analyse = False
# check if we need to account for extra priority comming from atlenstics
if "extra priority" in analysis.meta["needs settings"].keys():
if (
analysis.meta["needs settings"]["extra priority"]
== "atlenstics_compatibility"
):
# we need to verify if extra priority is needed
if "atlenstics_compatibility" in interest_dict.keys():
# need to add a check on the length of the list to avoid some failures
if len(interest_dict["atlenstics_compatibility"]) > 0:
if (
interest_dict["atlenstics_compatibility"][0][
"interest status"
]
is True
):
extra_prio = True
running_and_requiring_priority_check = False
if analysis.status in {"running"} and analysis._needs:
if "needs settings" in analysis.meta.keys():
if "logic" in analysis.meta["needs settings"]:
if analysis.meta["needs settings"]["logic"] == "add_priority":
running_and_requiring_priority_check = True
if to_analyse:
# Need to ensure a directory exists for these!
subjects = analysis._subjects
subj_string = "_".join([f"{subjects[i]}" for i in range(len(subjects))])
project_analysis_dir = os.path.join(
"checkouts",
"project-analyses",
subj_string,
)
if analysis.repository is None:
analysis.repository = EventRepo.create(project_analysis_dir)
else:
if isinstance(analysis.repository, str):
if (
"git@" in analysis.repository
or "https://" in analysis.repository
):
analysis.repository = EventRepo.from_url(
analysis.repository,
analysis.event.name,
directory=None,
update=update,
)
else:
analysis.repository = EventRepo.create(analysis.repository)
click.echo(
click.style("●", fg="green")
+ f" Submitting project analysis {analysis.name}"
)
pipe = analysis.pipeline
try:
pipe.build_dag(dryrun=dryrun)
except PipelineException as e:
logger.error(
"The pipeline failed to build a DAG file.",
)
logger.exception(e)
click.echo(
click.style("●", fg="red") + f" Failed to submit {analysis.name}"
)
except ValueError:
logger.info("Unable to submit an unbuilt project analysis")
click.echo(
click.style("●", fg="red")
+ f" Unable to submit {analysis.name} as it hasn't been built yet."
)
click.echo("Try running `asimov manage build` first.")
try:
pipe.submit_dag(dryrun=dryrun)
if not dryrun:
click.echo(
click.style("●", fg="green") + f" Submitted {analysis.name}"
)
analysis.status = "running"
ledger.update_analysis_in_project_analysis(analysis)
# directly add the extra priority related if needed
if extra_prio:
job_id = analysis.scheduler["job id"]
extra_prio = 20
condor.change_job_priority(job_id, extra_prio, use_old=False)
except PipelineException as e:
analysis.status = "stuck"
click.echo(
click.style("●", fg="red") + f" Unable to submit {analysis.name}"
)
logger.exception(e)
ledger.update_analysis_in_project_analysis(analysis)
ledger.save()
logger.error(
f"The pipeline failed to submit the DAG file to the cluster. {e}",
)
if not dryrun:
# Refresh the job list
job_list = condor.CondorJobList()
job_list.refresh()
# Update the ledger
ledger.save()
else:
click.echo(
click.style("●", fg="yellow")
+ f"Project analysis {analysis.name} not ready to submit"
)
# addition to see if we need to adjust the priority of a running job
if running_and_requiring_priority_check:
# enquire about the old priority
try:
current_prio = int(
condor.get_job_priority(analysis.meta["scheduler"]["job id"])
)
except TypeError:
# can happen when the job has done running
current_prio = 0
# calculate the priority it is expected to have
interested_pipelines = 0
for old_analysis in analysis._needs:
if old_analysis in interest_dict.keys():
if len(interest_dict[old_analysis]) > 0:
if interest_dict[old_analysis][-1]["interest status"] is True:
interested_pipelines += 1
if interested_pipelines < 2:
theoretical_prio = 0
else:
theoretical_prio = int((interested_pipelines - 2) * 10)
extra_prio = False
if "extra priority" in analysis.meta["needs settings"].keys():
if (
analysis.meta["needs settings"]["extra priority"]
== "atlenstics_compatibility"
):
# we need to verify if extra priority is needed
if "atlenstics_compatibility" in interest_dict.keys():
if len(interest_dict["atlenstics_compatibility"]) > 0:
if (
interest_dict["atlenstics_compatibility"][0][
"interest status"
]
is True
):
extra_prio = True
if extra_prio:
theoretical_prio += 20
# check if we currently have the correct priority or if an adaptation is needed
if theoretical_prio != current_prio:
logger.info(
f"Adjusting priority of {analysis.name} from {current_prio} to {theoretical_prio}"
)
condor.change_job_priority(
analysis.meta["scheduler"]["job id"],
theoretical_prio,
use_old=False,
)
for event in ledger.get_event(event):

@@ -184,3 +416,3 @@ ready_productions = event.get_all_latest()

logger.error(
"The pipeline failed to build a DAG file.",
"failed to build a DAG file.",
)

@@ -192,4 +424,3 @@ logger.exception(e)

)
except ValueError as e:
print("ERROR", e)
except ValueError:
logger.info("Unable to submit an unbuilt production")

@@ -257,3 +488,2 @@ click.echo(

click.echo("\t (No results available)")
# print(production.results())

@@ -296,2 +526,1 @@

pass
# print(production.results())
import shutil
import configparser
import sys
import traceback
import os
import sys
import click

@@ -34,2 +35,3 @@ from copy import deepcopy

"arguments": "monitor --chain",
"accounting_group": config.get("asimov start", "accounting"),
"output": os.path.join(".asimov", "asimov_cron.out"),

@@ -124,3 +126,249 @@ "on_exit_remove": "false",

# also check the analyses in the project analyses
for analysis in ledger.project_analyses:
click.secho(f"Subjects: {analysis.subjects}", bold=True)
if analysis.status.lower() in ACTIVE_STATES:
logger.debug(f"Available analyses: project_analyses/{analysis.name}")
click.echo(
"\t- "
+ click.style(f"{analysis.name}", bold=True)
+ click.style(f"[{analysis.pipeline}]", fg="green")
)
# ignore the analysis if it is set to ready as it has not been started yet
if analysis.status.lower() == "ready":
click.secho(f" \t ● {analysis.status.lower()}", fg="green")
logger.debug(f"Ready production: project_analyses/{analysis.name}")
continue
# check if there are jobs that need to be stopped
if analysis.status.lower() == "stop":
pipe = analysis.pipeline
logger.debug(f"Stop production project_analyses/{analysis.name}")
if not dry_run:
pipe.eject_job()
analysis.status = "stopped"
ledger.update_analysis_in_project_analysis(analysis)
click.secho(" \t Stopped", fg="red")
else:
click.echo("\t\t{analysis.name} --> stopped")
continue
# deal with the condor jobs
analysis_scheduler = analysis.meta["scheduler"].copy()
try:
if "job id" in analysis_scheduler:
if not dry_run:
if analysis_scheduler["job id"] in job_list.jobs:
job = job_list.jobs[analysis_scheduler["job id"]]
else:
job = None
else:
logger.debug(
f"Running analysis: {event}/{analysis.name}, cluster {analysis.job_id}"
)
click.echo("\t\tRunning under condor")
else:
raise ValueError
if not dry_run:
if job.status.lower() == "idle":
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} is in the queue (condor id: {analysis_scheduler['job id']})"
)
elif job.status.lower() == "running":
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} is running (condor id: {analysis_scheduler['job id']})"
)
if "profiling" not in analysis.meta:
analysis.meta["profiling"] = {}
if hasattr(analysis.pipeline, "while_running"):
analysis.pipeline.while_running()
analysis.status = "running"
ledger.update_analysis_in_project_analysis(analysis)
elif job.status.lower() == "completed":
pipe.after_completion()
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} has finished and post-processing has been started"
)
job_list.refresh()
elif job.status.lower() == "held":
click.echo(
" \t "
+ click.style("●", "yellow")
+ f" {analysis.name} is held on the scheduler"
+ f" (condor id: {analysis_scheduler['job id']})"
)
analysis.status = "stuck"
ledger.update_analysis_in_project_analysis(analysis)
else:
continue
except (ValueError, AttributeError):
if analysis.pipeline:
pipe = analysis.pipeline
if analysis.status.lower() == "stop":
pipe.eject_job()
analysis.status = "stopped"
ledger.update_analysis_in_project_analysis(analysis)
click.echo(
" \t "
+ click.style("●", "red")
+ f" {analysis.name} has been stopped"
)
job_list.refresh()
elif analysis.status.lower() == "finished":
pipe.after_completion()
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} has finished and post-processing has been started"
)
job_list.refresh()
elif analysis.status.lower() == "processing":
if pipe.detect_completion_processing():
try:
pipe.after_processing()
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} has been finalised and stored"
)
except ValueError as e:
click.echo(e)
else:
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} has finished and post-processing"
+ f" is stuck ({analysis_scheduler['job id']})"
)
elif (
pipe.detect_completion()
and analysis.status.lower() == "processing"
):
click.echo(
" \t "
+ click.style("●", "green")
+ f" {analysis.name} has finished and post-processing is running"
)
elif (
pipe.detect_completion()
and analysis.status.lower() == "running"
):
if "profiling" not in analysis.meta:
analysis.meta["profiling"] = {}
try:
config.get("condor", "scheduler")
analysis.meta["profiling"] = condor.collect_history(
analysis_scheduler["job id"]
)
analysis_scheduler["job id"] = None
ledger.update_analysis_in_project_analysis(analysis)
except (
configparser.NoOptionError,
configparser.NoSectionError,
):
logger.warning(
"Could not collect condor profiling data as no "
+ "scheduler was specified in the config file."
)
except ValueError as e:
logger.error("Could not collect condor profiling data.")
logger.exception(e)
pass
analysis.status = "finished"
ledger.update_analysis_in_project_analysis(analysis)
pipe.after_completion()
click.secho(
f" \t ● {analysis.name} - Completion detected",
fg="green",
)
job_list.refresh()
else:
# job may have been evicted from the clusters
click.echo(
" \t "
+ click.style("●", "yellow")
+ f" {analysis.name} is stuck; attempting a rescue"
)
try:
pipe.resurrect()
except (
Exception
): # Sorry, but there are many ways the above command can fail
analysis.status = "stuck"
click.echo(
" \t "
+ click.style("●", "red")
+ f" {analysis.name} is stuck; automatic rescue was not possible"
)
ledger.update_analysis_in_project_analysis(analysis)
if analysis.status == "stuck":
click.echo(
" \t "
+ click.style("●", "yellow")
+ f" {analysis.name} is stuck"
)
ledger.update_analysis_in_project_analysis(analysis)
ledger.save()
if chain:
ctx.invoke(report.html)
all_analyses = set(ledger.project_analyses)
complete = {
analysis
for analysis in ledger.project_analyses
if analysis.status in {"finished", "uploaded"}
}
others = all_analyses - complete
if len(others) > 0:
click.echo(
"There are also these analyses waiting for other analyses to complete:"
)
for analysis in others:
needs = ", ".join(analysis._needs)
click.echo(f"\t{analysis.name} which needs {needs}")
# need to check for post monitor hooks for each of the analyses
for analysis in ledger.project_analyses:
# check for post monitoring
if "hooks" in ledger.data:
if "postmonitor" in ledger.data["hooks"]:
discovered_hooks = entry_points(group="asimov.hooks.postmonitor")
for hook in discovered_hooks:
# do not run cbcflow every time
if hook.name in list(
ledger.data["hooks"]["postmonitor"].keys()
) and hook.name not in ["cbcflow"]:
try:
hook.load()(deepcopy(ledger)).run()
except Exception:
pass
if chain:
ctx.invoke(report.html)
for event in sorted(ledger.get_event(event), key=lambda e: e.name):
stuck = 0

@@ -135,2 +383,3 @@ running = 0

]
for production in on_deck:

@@ -166,6 +415,6 @@

try:
if "job id" in production.meta:
if "job id" in production.meta["scheduler"]:
if not dry_run:
if production.meta["job id"] in job_list.jobs:
job = job_list.jobs[production.meta["job id"]]
if production.job_id in job_list.jobs:
job = job_list.jobs[production.job_id]
else:

@@ -175,3 +424,3 @@ job = None

logger.debug(
f"Running analysis: {event}/{production.name}, cluster {production.meta['job id']}"
f"Running analysis: {event}/{production.name}, cluster {production.job_id}"
)

@@ -183,20 +432,6 @@ click.echo("\t\tRunning under condor")

if not dry_run:
if (
job.status.lower() == "running"
and production.status == "processing"
):
if job.status.lower() == "idle":
click.echo(
" \t "
+ click.style("●", "green")
+ f" Postprocessing for {production.name} is running"
+ f" (condor id: {production.job_id})"
)
production.meta["postprocessing"]["status"] = "running"
elif job.status.lower() == "idle":
click.echo(
" \t "
+ click.style("●", "green")
+ f" {production.name} is in the queue (condor id: {production.job_id})"

@@ -277,3 +512,2 @@ )

)
production.meta["postprocessing"]["status"] = "stuck"
elif (

@@ -300,3 +534,3 @@ pipe.detect_completion()

)
production.meta["job id"] = None
production.job_id = None
except (

@@ -333,3 +567,5 @@ configparser.NoOptionError,

pipe.resurrect()
except Exception: # Sorry, but there are many ways the above command can fail
except (
Exception
): # Sorry, but there are many ways the above command can fail
production.status = "stuck"

@@ -363,5 +599,4 @@ click.echo(

for production in others:
needs = ", ".join(production.meta["needs"])
needs = ", ".join(production._needs)
click.echo(f"\t{production.name} which needs {needs}")
# Post-monitor hooks

@@ -372,9 +607,27 @@ if "hooks" in ledger.data:

for hook in discovered_hooks:
if hook.name in list(ledger.data["hooks"]["postmonitor"].keys()):
# do not run cbcflow every time
if hook.name in list(
ledger.data["hooks"]["postmonitor"].keys()
) and hook.name not in ["cbcflow"]:
try:
hook.load()(deepcopy(ledger)).run()
except Exception:
pass
except Exception as exc:
logger.warning("%s experienced %s", hook.name, type(exc))
traceback_lines = traceback.format_exc().splitlines()
traceback_text = "Traceback:\n" + "\n".join(traceback_lines)
logger.warning(traceback_text)
if chain:
ctx.invoke(report.html)
# run the cbcflow hook once to update all the info if needed
if "hooks" in ledger.data:
if "postmonitor" in ledger.data["hooks"]:
discovered_hooks = entry_points(group="asimov.hooks.postmonitor")
for hook in discovered_hooks:
if hook.name == "cbcflow":
logger.info("Found cbcflow postmonitor hook, trying to run it")
try:
hook.load()(deepcopy(ledger)).run()
except Exception:
logger.warning("Unable to run the cbcflow hook")

@@ -99,7 +99,3 @@ from copy import copy

if config.get("ledger", "engine") == "gitlab":
raise NotImplementedError(
"The Gitlab interface has been removed from this version." ""
)
elif config.get("ledger", "engine") == "yamlfile":
if config.get("ledger", "engine") == "yamlfile":
ledger.events[event.name] = event.to_dict()

@@ -106,0 +102,0 @@ ledger.save()

"""
Project management tools.
zProject management tools.
"""

@@ -197,7 +197,2 @@

)
elif config.get("ledger", "engine") == "gitlab":
raise NotImplementedError(
"The gitlab interface has been removed from this version of asimov."
)
config.set("ledger", "engine", "yamlfile")

@@ -204,0 +199,0 @@ config.set("ledger", "location", os.path.join(".asimov", "ledger.yml"))

"""
Reporting functions
"""
from datetime import datetime

@@ -5,0 +6,0 @@

@@ -20,2 +20,4 @@ """

@click.option("--other_subjects", "-o_e", "other_subjects", default=None)
@click.option("--pipeline", "-p", default=None)
@click.option("--message", "-m", "message", default=None)

@@ -26,40 +28,98 @@ @click.argument("status", required=False, default=None)

@review.command()
def add(event, production, status, message):
def add(event, production, status, message, other_subjects=None, pipeline=None):
"""
Add a review signoff or rejection to an event.
Arguments:
----------
event: str
The event for which we need to add a review for a given analysis.
If we are considering a project analysis, this will be used as the first
subject
production: str
The production for which we need to add the review status
status: str
The status of the review. Can be one of
"rejected", "approved", "preferred", "deprecated"
message: str, optional
The message to add to the review
other_subjects: str, optional
The other subjects to be considered in the project analysis and
for which we want to add a review status.
pipeline: str, optional
The pipeline used in the project analysis and for which we want to add
the review status.
"""
valid = {"REJECTED", "APPROVED", "PREFERRED", "DEPRECATED"}
events = current_ledger.get_event(event)
if events is None:
click.echo(
click.style("●", fg="red") + f" Could not find an event called {event}"
)
if other_subjects is None:
valid = {"REJECTED", "APPROVED", "PREFERRED", "DEPRECATED"}
events = current_ledger.get_event(event)
if events is None:
click.echo(
click.style("●", fg="red") + f" Could not find an event called {event}"
)
else:
for event in events:
production = [
production_o
for production_o in event.productions
if production_o.name == production
][0]
click.secho(event.name, bold=True)
if status.upper() in valid:
message = ReviewMessage(
message=message, status=status, production=production
)
production.review.add(message)
else:
click.echo(
click.style("●", fg="red")
+ f" Did not understand the review status {status.lower()}."
+ " The review status must be one of "
+ "{APPROVED, REJECTED, PREFERRED, DEPRECATED}"
)
if hasattr(event, "issue_object"):
production.event.update_data()
current_ledger.update_event(event)
click.echo(
click.style("●", fg="green")
+ f" {event.name}/{production.name} {status.lower()}"
)
else:
for event in events:
production = [
production_o
for production_o in event.productions
if production_o.name == production
][0]
click.secho(event.name, bold=True)
found = False
if status.upper() in valid:
subjects = list(other_subjects.replace("[", "").replace("]", "").split(","))
subjects = [subject.strip() for subject in subjects]
subjects = [event] + subjects
for analysis in current_ledger.project_analyses:
if (
(analysis.name == production)
and (analysis.pipeline.name == pipeline)
and (set(analysis.subjects) == set(subjects))
):
found = True
click.secho(analysis.name, bold=True)
click.secho(analysis.pipeline)
click.secho(" ".join(analysis.subjects))
message = ReviewMessage(
message=message, status=status, production=production
)
production.review.add(message)
else:
analysis.review.add(message)
click.echo(
click.style("●", fg="red")
+ f" Did not understand the review status {status.lower()}."
+ " The review status must be one of "
+ "{APPROVED, REJECTED, PREFERRED, DEPRECATED}"
click.style("●", fg="green")
+ f" {event.name}/{production.name} {status.lower()}"
)
if hasattr(event, "issue_object"):
production.event.update_data()
current_ledger.update_event(event)
click.echo(
click.style("●", fg="green")
+ f" {event.name}/{production.name} {status.lower()}"
if not found:
click.secho(
f"Unable to find a project analysis for pipeline {pipeline}, "
f"production {production} and subjects {set(subjects)}",
fg="red",
)

@@ -66,0 +126,0 @@

@@ -9,2 +9,3 @@ """

"""
import os

@@ -102,11 +103,11 @@ import datetime

HISTORY_CLASSADS = [
"CompletionDate",
"CpusProvisioned",
"GpusProvisioned",
"CumulativeSuspensionTime",
"EnteredCurrentStatus",
"MaxHosts",
"RemoteWallClockTime",
"RequestCpus",
]
"CompletionDate",
"CpusProvisioned",
"GpusProvisioned",
"CumulativeSuspensionTime",
"EnteredCurrentStatus",
"MaxHosts",
"RemoteWallClockTime",
"RequestCpus",
]
try:

@@ -179,2 +180,5 @@ jobs = schedd.history(

for key, value in kwargs.items():
setattr(self, key, value)
def __repr__(self):

@@ -359,1 +363,58 @@ out = f"<htcondor job | {self.idno} | {self.status} "

f.write(yaml.dump(self.jobs))
def get_job_priority(job_id):
"""
Returns the priority of a job given its id.
This is useful when some conitioning happens and should lead
to a change in the analysis priority compared to other events.
This returns None if the information cannot be found.
Parameters
----------
- job_id: the condor job id for which we want to get the priority
"""
# make collector to query the info
schedd = htcondor.Schedd()
# query job info
job_info = schedd.query(f"ClusterId == {job_id}.0")
if job_info:
priority = job_info[0].get("JobPrio", None)
return priority
else:
return None
def change_job_priority(job_id, extra_priority, use_old=False):
"""
Function to change the job priority for a given job.
Parameters:
-----------
- job_id: the condor job id for which we want to change the priority
- extra_priority: the extra priority that we want to add to the job
- use_old: if True, we add the new priority to the old one. Else, we simply replace it
"""
# setup a schedduler to query the priority
schedd = htcondor.Schedd()
main_job_info = schedd.query(f"ClusterId == {job_id}")
all_jobs = schedd.query()
if main_job_info:
# look for all the jobs needing to be updated (also child jobs)
jobs_to_update = []
for job in all_jobs:
if "JobBatchId" in job.keys():
if job["JobBatchId"] == main_job_info[0]["JobBatchId"]:
jobs_to_update.append(job["ClusterId"])
for j_id in jobs_to_update:
if use_old:
extra_priority += get_job_priority(j_id)
schedd.edit(f"ClusterId == {j_id}", {"JobPrio": extra_priority})
logger.info(f"Changed the priority of job {j_id} to {extra_priority}")
else:
logger.warning(f"Unable to adapt the priority for job {job_id}")

@@ -21,7 +21,7 @@ ;

seglen={{ data['segment length'] }}
window={{ likelihood['window length'] }}
flow={{ quality['lowest minimum frequency'] }}
window={{ likelihood['window length'] | default: likelihood['segment length'] }}
flow={{ quality['lowest minimum frequency'] | default: 20}}
srate={{ likelihood['sample rate'] }}
PSDlength={{ likelihood['psd length'] }}
rolloff={{ likelihood['roll off time'] | default: 0.4 }}
PSDlength={{ likelihood['psd length'] | default: likelihood['segment length'] }}
rolloff={{ likelihood['roll off time'] | default: 1.0 }}
ifo-list={{ ifos }}

@@ -28,0 +28,0 @@ segment-start={{ likelihood['segment start'] }}

@@ -67,3 +67,7 @@ {%- if production.event.repository -%}

generation-seed=42
psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production.psds[ifo]}},{% endfor %} }
{%- if production._previous_assets() contains "psds" %}
psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production._previous_assets()['psds'][ifo]}},{% endfor %} }
{%- elif production.meta contains "psds" %}
psd-dict={ {% for ifo in ifos %}{{ifo}}:{{production.meta['psds'][ifo]}},{% endfor %} }
{% endif %}
psd-fractional-overlap=0.5

@@ -119,3 +123,2 @@ post-trigger-duration={{ likelihood['post trigger time'] | default: 2.0 }}

log-directory=None
online-pe=False
osg={{ scheduler['osg'] | default: False }}

@@ -122,0 +125,0 @@ desired-sites={{ scheduler['desired sites'] | default: None }}

@@ -222,2 +222,2 @@ {%- if production.event.repository %}

# force-eta-range : the usual doesn't awlays work; this uses 20:1 prior range, should be set consistently to above
# force-eta-range="[0.0453514739,0.24999999999]"
# force-eta-range="[0.0453514739,0.24999999999]"

@@ -19,2 +19,3 @@ """

"""
from tinydb import Query, TinyDB

@@ -21,0 +22,0 @@

@@ -5,7 +5,3 @@ """

import glob
import os
import pathlib
from copy import deepcopy
import configparser
import subprocess

@@ -16,12 +12,7 @@

from ligo.gracedb.rest import GraceDb, HTTPError
from liquid import Liquid
from asimov import config, logger, LOGGER_LEVEL
from asimov.pipelines import known_pipelines
from asimov.storage import Store
from asimov.utils import update, diff_dict
from asimov.analysis import SubjectAnalysis, GravitationalWaveTransient
from .git import EventRepo
from .ini import RunConfiguration
from .review import Review

@@ -47,6 +38,5 @@ status_map = {

def __init__(self, message, issue=None, production=None):
def __init__(self, message, production=None):
super(DescriptionException, self).__init__(message)
self.message = message
self.issue = issue
self.production = production

@@ -70,14 +60,3 @@

def submit_comment(self):
"""
Submit this exception as a comment on the gitlab
issue for the event.
"""
if self.issue:
self.issue.add_label("yaml-error", state=False)
self.issue.add_note(self.__repr__())
else:
print(self.__repr__())
class Event:

@@ -126,2 +105,7 @@ """

if "ledger" in kwargs:
self.ledger = kwargs["ledger"]
else:
self.ledger = None
if repository:

@@ -150,10 +134,2 @@ if "git@" in repository or "https://" in repository:

self.issue_object = None
if "issue" in kwargs:
if kwargs["issue"]:
self.issue_object = kwargs.pop("issue")
self.from_notes()
else:
self.issue_object = None
self.productions = []

@@ -164,25 +140,12 @@ self.graph = nx.DiGraph()

for production in kwargs["productions"]:
try:
if ("analyses" in production) or ("productions" in production):
self.add_production(
Production.from_dict(
production, event=self, issue=self.issue_object
)
SubjectAnalysis.from_dict(production, subject=self)
)
except DescriptionException as error:
error.submit_comment()
self.productions = []
self.graph = nx.DiGraph()
if "productions" in kwargs:
for production in kwargs["productions"]:
try:
else:
self.add_production(
Production.from_dict(
production, event=self, issue=self.issue_object
production, subject=self, ledger=self.ledger
)
)
except DescriptionException as error:
error.submit_comment()
self._check_required()

@@ -200,2 +163,6 @@

@property
def analyses(self):
return self.productions
def __eq__(self, other):

@@ -270,3 +237,2 @@ if isinstance(other, Event):

"""
if production.name in [production_o.name for production_o in self.productions]:

@@ -281,10 +247,9 @@ raise ValueError(

if production.dependencies:
dependencies = production.dependencies
dependencies = [
production
for production in self.productions
if production.name in dependencies
]
for dependency in dependencies:
self.graph.add_edge(dependency, production)
for dependency in production.dependencies:
if dependency == production:
continue
analysis_dict = {
production.name: production for production in self.productions
}
self.graph.add_edge(analysis_dict[dependency], production)

@@ -295,7 +260,7 @@ def __repr__(self):

@classmethod
def from_dict(cls, data, issue=None, update=False, ledger=None):
def from_dict(cls, data, update=False, ledger=None):
"""
Convert a dictionary representation of the event object to an Event object.
"""
event = cls(**data, issue=issue, update=update, ledger=ledger)
event = cls(**data, update=update, ledger=ledger)
if ledger:

@@ -306,22 +271,20 @@ ledger.add_event(event)

@classmethod
def from_yaml(cls, data, issue=None, update=False, repo=True, ledger=None):
def from_yaml(cls, data, update=False, repo=True, ledger=None):
"""
Parse YAML to generate this event.
Parse YAML to generate this event.
Parameters
----------
data : str
YAML-formatted event specification.
issue : int
The gitlab issue which stores this event.
update : bool
Flag to determine if the repository is updated when loaded.
Defaults to False.
ledger : `asimov.ledger.Ledger`
An asimov ledger which the event should be included in.
| Parameters
----------
data : str
YAML-formatted event specification.
update : bool
Flag to determine if the repository is updated when loaded.
Defaults to False.
ledger : `asimov.ledger.Ledger`
An asimov ledger which the event should be included in.
Returns
-------
Event
An event.
Returns
-------
Event
An event.
"""

@@ -341,7 +304,15 @@ data = yaml.safe_load(data)

try:
calibration = data["data"]["calibration"]
except KeyError:
calibration = {}
if "productions" in data:
if isinstance(data["productions"], type(None)):
data["productions"] = []
if "working directory" not in data:
data["working directory"] = os.path.join(
config.get("general", "rundir_default"), data["name"]
)
if not repo and "repository" in data:
data.pop("repository")
event = cls.from_dict(data, update=update, ledger=ledger)
if "productions" in data:

@@ -353,15 +324,2 @@ if isinstance(data["productions"], type(None)):

if "interferometers" in data and "event time" in data:
if calibration.keys() != data["interferometers"]:
# We need to fetch the calibration data
from asimov.utils import find_calibrations
try:
data["data"]["calibration"] = find_calibrations(data["event time"])
except ValueError:
logger.warning(
f"Could not find calibration files for {data['name']}"
)
if "working directory" not in data:

@@ -374,43 +332,6 @@ data["working directory"] = os.path.join(

data.pop("repository")
event = cls.from_dict(data, issue=issue, update=update, ledger=ledger)
event = cls.from_dict(data, update=update, ledger=ledger)
if issue:
event.issue_object = issue
event.from_notes()
return event
@classmethod
def from_issue(cls, issue, update=False, repo=True):
"""
Parse an issue description to generate this event.
Parameters
----------
update : bool
Flag to determine if the repository is updated when loaded.
Defaults to False.
"""
text = issue.text.split("---")
event = cls.from_yaml(text[1], issue, update=update, repo=repo)
event.text = text
# event.from_notes()
return event
def from_notes(self):
"""
Update the event data from information in the issue comments.
Uses nested dictionary update code from
https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth#3233356
"""
notes_data = self.issue_object.parse_notes()
for note in notes_data:
update(self.meta, note)
def get_gracedb(self, gfile, destination):

@@ -478,19 +399,5 @@ """

for production in self.productions:
# Remove duplicate data
prod_dict = production.to_dict()[production.name]
dupes = []
prod_names = []
for key, value in prod_dict.items():
if production.name in prod_names:
continue
if key in data:
if data[key] == value:
dupes.append(key)
for dupe in dupes:
prod_dict.pop(dupe)
prod_names.append(production.name)
data["productions"].append({production.name: prod_dict})
data["productions"].append(production.to_dict(event=False))
data["working directory"] = self.work_dir
if "issue" in data:
data.pop("issue")
if "ledger" in data:

@@ -505,9 +412,4 @@ data.pop("ledger")

data = self.to_dict()
return yaml.dump(data, default_flow_style=False)
def to_issue(self):
self.text[1] = "\n" + self.to_yaml()
return "---".join(self.text)
def draw_dag(self):

@@ -533,12 +435,43 @@ """

for production in self.productions
if production.finished is False
if (production.finished is False and production.status not in {"wait"})
]
)
ends = [
x
for x in unfinished.reverse().nodes()
if unfinished.reverse().out_degree(x) == 0
]
return set(ends) # only want to return one version of each production!
ends = []
for production in unfinished.reverse().nodes():
if (
"needs settings" not in production.meta
or production.meta["needs settings"] == "default"
):
if (
unfinished.reverse().out_degree(production) == 0
and production.finished is False
):
ends.append(production)
elif "needs settings" in production.meta:
interested_pipelines = 0
if (
"minimum" in production.meta["needs settings"]
and production.meta["needs settings"]["condition"]
== "is_interesting"
):
for prod in unfinished.reverse().nodes():
if (
prod.pipeline.name != production.pipeline.name
and prod.pipeline.name in production._needs
):
if prod.meta["interest status"] is True:
interested_pipelines += 1
if (
interested_pipelines
>= production.meta["needs settings"]["minimum"]
):
ends.append(production)
ready_values = {end for end in ends if end.status.lower() == "ready"}
return set(ready_values) # only want to return one version of each production!
def build_report(self):

@@ -570,717 +503,2 @@ for production in self.productions:

class Production:
"""
A specific production run.
Parameters
----------
event : `asimov.event`
The event this production is running on.
name : str
The name of this production.
status : str
The status of this production.
pipeline : str
This production's pipeline.
comment : str
A comment on this production.
"""
def __init__(self, event, name, status, pipeline, comment=None, **kwargs):
    """
    Set up a production (one analysis run) attached to an event.

    Builds the production's metadata by layering, in order: the ledger's
    per-pipeline defaults, the project/event defaults, and finally the
    keyword arguments for this specific production. Also derives data
    settings (maximum frequency, segment start) and migrates deprecated
    metadata keys into their current locations.

    Parameters
    ----------
    event : `asimov.event.Event`
        The event this production is running on (a one-element sequence
        containing the event is also accepted).
    name : str
        The name of this production.
    status : str
        The status of this production; falsy values become "none".
    pipeline : str
        This production's pipeline name (looked up in ``known_pipelines``).
    comment : str, optional
        A comment on this production.
    **kwargs
        Production-specific metadata which overrides all defaults.
    """
    # `event` may arrive wrapped in a sequence; unwrap it.
    self.event = event if isinstance(event, Event) else event[0]
    self.subject = self.event
    self.name = name
    # Ensure the per-production log directory exists before logging starts.
    pathlib.Path(
        os.path.join(config.get("logging", "directory"), self.event.name, name)
    ).mkdir(parents=True, exist_ok=True)
    self.logger = logger.getChild("analysis").getChild(
        f"{self.event.name}/{self.name}"
    )
    self.logger.setLevel(LOGGER_LEVEL)
    # fh = logging.FileHandler(logfile)
    # formatter = logging.Formatter("%(asctime)s - %(message)s", "%Y-%m-%d %H:%M:%S")
    # fh.setFormatter(formatter)
    # self.logger.addHandler(fh)
    self.category = config.get("general", "calibration_directory")
    if status:
        self.status_str = status.lower()
    else:
        self.status_str = "none"
    self.comment = comment
    # Start by adding pipeline defaults
    if "pipelines" in self.event.ledger.data:
        if pipeline in self.event.ledger.data["pipelines"]:
            self.meta = deepcopy(self.event.ledger.data["pipelines"][pipeline])
        else:
            self.meta = {}
    else:
        self.meta = {}
    if "postprocessing" in self.event.ledger.data:
        self.meta["postprocessing"] = deepcopy(
            self.event.ledger.data["postprocessing"]
        )
    # Update with the event and project defaults
    if "ledger" in self.event.meta:
        # The ledger back-reference must not leak into serialised metadata.
        self.event.meta.pop("ledger")
    self.meta = update(self.meta, deepcopy(self.event.meta))
    if "productions" in self.meta:
        self.meta.pop("productions")
    # Production-specific kwargs take final precedence.
    self.meta = update(self.meta, kwargs)
    # Guarantee the standard metadata sections exist.
    if "sampler" not in self.meta:
        self.meta["sampler"] = {}
    if "cip jobs" in self.meta:
        # TODO: Should probably raise a deprecation warning
        self.meta["sampler"]["cip jobs"] = self.meta["cip jobs"]
    if "scheduler" not in self.meta:
        self.meta["scheduler"] = {}
    if "likelihood" not in self.meta:
        self.meta["likelihood"] = {}
    if "marginalization" not in self.meta["likelihood"]:
        self.meta["likelihood"]["marginalization"] = {}
    if "data" not in self.meta:
        self.meta["data"] = {}
    if "data files" not in self.meta["data"]:
        self.meta["data"]["data files"] = {}
    if "lmax" in self.meta:
        # TODO: Should probably raise a deprecation warning
        self.meta["sampler"]["lmax"] = self.meta["lmax"]
    # Temporarily keep the pipeline *name* here: pipeline constructors
    # read `production.pipeline.lower()` to verify they match, so the
    # string must be in place before the pipeline object replaces it.
    self.pipeline = pipeline
    self.pipeline = known_pipelines[pipeline.lower()](self)
    if "review" in self.meta:
        self.review = Review.from_dict(self.meta["review"], production=self)
        self.meta.pop("review")
    else:
        self.review = Review()
    # Check that the upper frequency is included, otherwise calculate it
    if (
        "quality" in self.meta
        and ("maximum frequency" not in self.meta["quality"])
        and ("sample rate" in self.meta["likelihood"])
        and len(self.meta["interferometers"]) > 0
    ) or (
        list(self.meta.get("quality", {}).get("maximum frequency", {}).keys())
        != self.meta.get("interferometers")
        and ("sample rate" in self.meta["likelihood"])
    ):
        if "maximum frequency" not in self.meta["quality"]:
            self.meta["quality"]["maximum frequency"] = {}
        # Account for the PSD roll-off with the 0.875 factor
        for ifo in self.meta["interferometers"]:
            psd_rolloff = self.meta.get("likelihood", {}).get(
                "roll off factor", 0.875
            )
            if ifo not in self.meta["quality"]["maximum frequency"]:
                self.meta["quality"]["maximum frequency"][ifo] = int(
                    psd_rolloff * self.meta["likelihood"]["sample rate"] / 2
                )
    # Add a warning about roll-offs
    if not ("roll off time" in self.meta["likelihood"]):
        self.logger.warning(
            "Using the default roll off settings (0.4-seconds); note that these may result in spectral leakage."
        )
    # Get the data quality recommendations
    if "quality" in self.event.meta:
        self.quality = self.event.meta["quality"]
    else:
        self.quality = {}
    if "quality" in self.meta:
        if "quality" in kwargs:
            self.meta["quality"].update(kwargs["quality"])
        self.quality = self.meta["quality"]
    if ("quality" in self.meta) and ("event time" in self.meta):
        if ("segment start" not in self.meta["quality"]) and (
            "segment length" in self.meta["data"]
        ):
            # Derive the analysis segment start from the event time.
            self.meta["likelihood"]["segment start"] = (
                self.meta["event time"]
                - self.meta["data"]["segment length"]
                + self.meta.get("likelihood", {}).get("post trigger time", 2)
            )
            # self.event.meta['likelihood']['segment start'] = self.meta['data']['segment start']
    # Update waveform data
    if "waveform" not in self.meta:
        self.logger.info("Didn't find waveform information in the metadata")
        self.meta["waveform"] = {}
    if "approximant" in self.meta:
        self.logger.warn(
            "Found deprecated approximant information, "
            "moving to waveform area of ledger"
        )
        approximant = self.meta.pop("approximant")
        self.meta["waveform"]["approximant"] = approximant
    if "reference frequency" in self.meta["likelihood"]:
        self.logger.warning(
            "Found deprecated ref freq information, "
            "moving to waveform area of ledger"
        )
        ref_freq = self.meta["likelihood"].pop("reference frequency")
        self.meta["waveform"]["reference frequency"] = ref_freq
    # Gather the PSDs for the job
    self.psds = self._collect_psds()
    # Gather the appropriate prior data for this production
    if "priors" in self.meta:
        self.priors = self.meta["priors"]
        if (
            "amplitude order" in self.meta["priors"]
            and "pn amplitude order" not in self.meta["waveform"]
        ):
            self.meta["waveform"]["pn amplitude order"] = self.meta["priors"][
                "amplitude order"
            ]
def __hash__(self):
return int(f"{hash(self.name)}{abs(hash(self.event.name))}")
def __eq__(self, other):
return (self.name == other.name) & (self.event == other.event)
def _process_dependencies(self, needs):
"""
Process the dependencies list for this production.
"""
return needs
@property
def dependencies(self):
if "needs" in self.meta:
return self._process_dependencies(self.meta["needs"])
else:
return None
def results(self, filename=None, handle=False, hash=None):
    """
    Fetch results for this production from the results store.

    Parameters
    ----------
    filename : str, optional
        The stored file to fetch. When omitted (or empty) the list of
        all resources recorded for this production is returned instead.
    handle : bool, optional
        When True, return an open read handle rather than the path.
    hash : optional
        A specific content hash to fetch from the store.

    Returns
    -------
    The resource listing, a file path, an open handle, or ``None`` when
    nothing has been stored for this production.
    """
    store = Store(root=config.get("storage", "results_store"))
    if not filename:
        try:
            return store.manifest.list_resources(self.event.name, self.name)
        except KeyError:
            # Nothing recorded for this event/production pair yet.
            return None
    path = store.fetch_file(self.event.name, self.name, filename, hash=hash)
    if handle:
        return open(path, "r")
    return path
@property
def rel_psds(self):
"""
Return the relative path to a PSD for a given event repo.
"""
rels = {}
for ifo, psds in self.psds.items():
psd = self.psds[ifo]
psd = psd.split("/")
rels[ifo] = "/".join(psd[-3:])
return rels
@property
def reference_frame(self):
"""
Calculate the appropriate reference frame.
"""
ifos = self.meta["interferometers"]
if len(ifos) == 1:
return ifos[0]
else:
return "".join(ifos[:2])
def get_meta(self, key):
"""
Get the value of a metadata attribute, or return None if it doesn't
exist.
"""
if key in self.meta:
return self.meta[key]
else:
return None
def set_meta(self, key, value):
"""
Set a metadata attribute which doesn't currently exist.
"""
if key not in self.meta:
self.meta[key] = value
self.event.ledger.update_event(self.event)
else:
raise ValueError
@property
def finished(self):
finished_states = ["uploaded"]
return self.status in finished_states
@property
def status(self):
return self.status_str.lower()
@status.setter
def status(self, value):
self.status_str = value.lower()
self.event.ledger.update_event(self.event)
@property
def job_id(self):
if "job id" in self.meta:
return self.meta["job id"]
else:
return None
@job_id.setter
def job_id(self, value):
self.meta["job id"] = value
if self.event.issue_object:
self.event.issue_object.update_data()
def to_dict(self, event=True):
    """
    Return this production as a dictionary.
    Parameters
    ----------
    event : bool
        If set to True the output is designed to be included nested within an event.
        The event name is not included in the representation, and the production name is provided as a key.
    """
    dictionary = deepcopy(self.meta)
    if not event:
        # Stand-alone form: record which event this production belongs to.
        dictionary["event"] = self.event.name
    dictionary["name"] = self.name
    dictionary["status"] = self.status
    dictionary["pipeline"] = self.pipeline.name.lower()
    dictionary["comment"] = self.comment
    dictionary["review"] = self.review.to_dicts()
    if "data" in self.meta:
        dictionary["data"] = self.meta["data"]
    if "likelihood" in self.meta:
        dictionary["likelihood"] = self.meta["likelihood"]
    if "quality" in self.meta:
        dictionary["quality"] = self.meta["quality"]
    if "priors" in self.meta:
        dictionary["priors"] = self.meta["priors"]
    if "waveform" in self.meta:
        dictionary["waveform"] = self.meta["waveform"]
    dictionary["needs"] = self.dependencies
    dictionary["job id"] = self.job_id
    # Remove duplicates of pipeline defaults
    if self.pipeline.name.lower() in self.event.ledger.data["pipelines"]:
        defaults = deepcopy(
            self.event.ledger.data["pipelines"][self.pipeline.name.lower()]
        )
    else:
        defaults = {}
    if "postprocessing" in self.event.ledger.data:
        defaults["postprocessing"] = deepcopy(
            self.event.ledger.data["postprocessing"]
        )
    if "ledger" in self.event.meta:
        # The ledger back-reference must not be serialised.
        self.event.meta.pop("ledger")
    defaults = update(defaults, deepcopy(self.event.meta))
    # Keep only the keys that differ from the layered defaults.
    dictionary = diff_dict(defaults, dictionary)
    # NOTE(review): as written this loop only skips "operations" entries
    # and has no other effect — looks like a remnant; verify upstream.
    for key, value in self.meta.items():
        if key == "operations":
            continue
    if "repository" in self.meta:
        # NOTE(review): reads `self.repository`, which is not assigned in
        # this class as visible here — presumably set elsewhere; confirm.
        dictionary["repository"] = self.repository.url
    # Strip structural keys that belong to the ledger, not the production.
    if "ledger" in dictionary:
        dictionary.pop("ledger")
    if "pipelines" in dictionary:
        dictionary.pop("pipelines")
    if "productions" in dictionary:
        dictionary.pop("productions")
    if not event:
        output = dictionary
    else:
        # Nested form: key the settings by the production's name.
        output = {self.name: dictionary}
    return output
@property
def rundir(self):
"""
Return the run directory for this event.
"""
if "rundir" in self.meta:
return os.path.abspath(self.meta["rundir"])
elif "working directory" in self.event.meta:
value = os.path.join(self.event.meta["working directory"], self.name)
self.meta["rundir"] = value
return os.path.abspath(value)
else:
return None
@rundir.setter
def rundir(self, value):
"""
Set the run directory.
"""
if "rundir" not in self.meta:
self.meta["rundir"] = value
if self.event.issue_object is not None:
self.event.issue_object.update_data()
else:
raise ValueError
def get_psds(self, format="ascii", sample_rate=None):
    """
    Get the PSDs for this production.
    Parameters
    ----------
    format : {ascii, xml}
        The format of the PSD to be returned.
        Defaults to the ascii format.
    sample_rate : int
        The sample rate of the PSD to be returned.
        Defaults to None, in which case the sample rate in the event data is used.
    Returns
    -------
    list
        A list of PSD files for the production.

    Raises
    ------
    DescriptionException
        If no sample rate is supplied and none is recorded in the
        likelihood metadata.
    """
    if sample_rate is None:
        try:
            if (
                "likelihood" in self.meta
                and "sample rate" in self.meta["likelihood"]
            ):
                sample_rate = self.meta["likelihood"]["sample rate"]
            else:
                # NOTE(review): this raise is immediately caught by the
                # broad `except Exception` below and re-wrapped — the
                # double wrapping appears redundant as written.
                raise DescriptionException(
                    "The sample rate for this event cannot be found.",
                    issue=self.event.issue_object,
                    production=self.name,
                )
        except Exception as e:
            raise DescriptionException(
                "The sample rate for this event cannot be found.",
                issue=self.event.issue_object,
                production=self.name,
            ) from e
    if (len(self.psds) > 0) and (format == "ascii"):
        return self.psds
    elif format == "xml":
        # TODO: This is a hack, and we need a better way to sort this.
        files = glob.glob(
            f"{self.event.repository.directory}/{self.category}/psds/{sample_rate}/*.xml.gz"
        )
        return files
    # NOTE(review): falls through and returns None when format is "ascii"
    # but no PSDs are recorded — confirm callers handle a None return.
def get_timefile(self):
    """
    Locate (or create) the GPS time file for this event.

    Returns
    -------
    str
        The location of the time file.
    """
    try:
        return self.event.repository.find_timefile(self.category)
    except FileNotFoundError:
        pass
    # No time file exists yet: write one containing the event time and
    # commit it to the event repository.
    new_file = os.path.join("gps.txt")
    event_time = self.event.meta["event time"]
    with open(new_file, "w") as timefile:
        timefile.write(f"{event_time}")
    self.logger.info(
        f"Created a new time file in {new_file} with time {event_time}"
    )
    self.event.repository.add_file(
        new_file,
        os.path.join(self.category, new_file),
        "Added a new GPS timefile.",
    )
    return new_file
def get_coincfile(self):
    """
    Find this event's coinc.xml file, downloading it from GraceDB when
    it is not already present in the event repository.

    Returns
    -------
    str
        The location of the coinc file.
    """
    try:
        coinc = self.event.repository.find_coincfile(self.category)
        return coinc
    except FileNotFoundError:
        self.logger.info("Could not find a coinc.xml file")
        savepath = os.path.abspath(
            os.path.join(
                self.event.repository.directory, self.category, "coinc.xml"
            ),
        )
        # Log the destination rather than printing to stdout (a stray
        # debug `print(savepath)` previously lived here).
        self.logger.info(f"Downloading coinc.xml to {savepath}")
        self.event.get_gracedb(
            "coinc.xml",
            savepath,
        )
        coinc = self.event.repository.find_coincfile(self.category)
        return coinc
def get_configuration(self):
    """
    Get the configuration file contents for this event.

    Returns
    -------
    RunConfiguration
        The parsed configuration for this production.

    Raises
    ------
    ValueError
        If the configuration file cannot be located or parsed.
    """
    if "ini" in self.meta:
        ini_loc = self.meta["ini"]
    else:
        # We'll need to search the repository for it.
        try:
            ini_loc = self.event.repository.find_prods(self.name, self.category)[0]
            if not os.path.exists(ini_loc):
                raise ValueError("Could not open the ini file.")
        except IndexError as e:
            # Chain the cause so the repository lookup failure is visible.
            raise ValueError("Could not open the ini file.") from e
    try:
        ini = RunConfiguration(ini_loc)
    except ValueError as e:
        raise ValueError("Could not open the ini file") from e
    except configparser.MissingSectionHeaderError as e:
        raise ValueError("This isn't a valid ini file") from e
    return ini
@classmethod
def from_dict(cls, parameters, event, issue=None):
    """
    Build a production from its ledger (dictionary) representation.

    Parameters
    ----------
    parameters : dict
        Either ``{name: settings}`` or a flat settings dictionary
        describing the production.
    event : `asimov.event.Event`
        The event this production belongs to.
    issue : optional
        The tracking issue for this event, used in error reporting.

    Raises
    ------
    DescriptionException
        If the required ``status`` or ``pipeline`` keys are missing.
    """
    name, pars = list(parameters.items())[0]
    # Check that pars is a dictionary
    if not isinstance(pars, dict):
        # Flat form: the whole `parameters` dict describes the production.
        if "event" in parameters:
            parameters.pop("event")
        if "status" not in parameters:
            parameters["status"] = "ready"
        return cls(event=event, **parameters)
    # Check all of the required parameters are included
    if not {"status", "pipeline"} <= pars.keys():
        raise DescriptionException(
            f"Some of the required parameters are missing from {name}", issue, name
        )
    if "comment" not in pars:
        pars["comment"] = None
    if "event" in pars:
        # BUG FIX: previously `pars.pop(event)` popped with the Event
        # object as the key (a KeyError); the string key was intended.
        pars.pop("event")
    return cls(event, name, **pars)
def __repr__(self):
return f"<Production {self.name} for {self.event} | status: {self.status}>"
def _collect_psds(self, format="ascii"):
    """
    Collect the required psds for this production.

    Parameters
    ----------
    format : {ascii, xml}
        The PSD format to collect; selects which metadata keyword is
        consulted ("psds" or "xml psds").

    Returns
    -------
    dict
        Mapping of interferometer name to PSD file, possibly empty.

    Raises
    ------
    ValueError
        If *format* is neither "ascii" nor "xml".
    """
    psds = {}
    # If the PSDs are specifically provided in the ledger,
    # use those.
    if format == "ascii":
        keyword = "psds"
    elif format == "xml":
        keyword = "xml psds"
    else:
        raise ValueError(f"This PSD format ({format}) is not recognised.")
    if keyword in self.meta:
        # if self.meta["likelihood"]["sample rate"] in self.meta[keyword]:
        psds = self.meta[keyword]  # [self.meta["likelihood"]["sample rate"]]
    # First look through the list of the job's dependencies
    # to see if they're provided by a job there.
    elif self.dependencies:
        productions = {}
        for production in self.event.productions:
            productions[production.name] = production
        for previous_job in self.dependencies:
            try:
                # Check if the job provides PSDs as an asset and were produced with compatible settings
                if keyword in productions[previous_job].pipeline.collect_assets():
                    if self._check_compatible(productions[previous_job]):
                        psds = productions[previous_job].pipeline.collect_assets()[
                            keyword
                        ]
                        # Stop at the first compatible provider.
                        break
                    else:
                        self.logger.info(
                            f"The PSDs from {previous_job} are not compatible with this job."
                        )
                else:
                    psds = {}
            except Exception:
                # NOTE(review): broad swallow — any failure while probing a
                # dependency (missing production, pipeline error) silently
                # yields no PSDs; consider logging the exception.
                psds = {}
    # Otherwise return no PSDs
    else:
        psds = {}
    for ifo, psd in psds.items():
        self.logger.debug(f"PSD-{ifo}: {psd}")
    return psds
def _check_compatible(self, other_production):
"""
Check that the data settings in two productions are sufficiently compatible
that one can be used as a dependency of the other.
"""
compatible = True
# Check that the sample rate of this analysis is equal or less than that of the preceeding analysis
# to ensure that PSDs have points at the correct frequencies.
try:
compatible = self.meta.get("likelihood", {}).get(
"sample rate", None
) <= other_production.meta.get("likelihood", {}).get("sample rate", None)
except TypeError:
# One or more sample rates are missing so these can't be deemed compatible.
return False
tests = [
("data", "channels"),
("data", "frame types"),
("data", "segment length"),
]
for test in tests:
compatible &= self.meta.get(test[0], {}).get(
test[1], None
) == other_production.meta.get(test[0], {}).get(test[1], None)
return compatible
def make_config(self, filename, template_directory=None, dryrun=False):
    """
    Make the configuration file for this production.
    Parameters
    ----------
    filename : str
        The location at which the config file should be saved.
    template_directory : str, optional
        The path to the directory containing the pipeline config templates.
        Defaults to the directory specified in the asimov configuration file.
    dryrun : bool, optional
        When True the rendered configuration is printed rather than saved.
    """
    self.logger.info("Creating config file.")
    # Refresh the PSD lists so the template sees the current assets.
    self.psds = self._collect_psds()
    self.xml_psds = self._collect_psds(format="xml")
    if "template" in self.meta:
        template = f"{self.meta['template']}.ini"
    else:
        template = f"{self.pipeline.name.lower()}.ini"
    pipeline = self.pipeline
    if hasattr(pipeline, "config_template"):
        template_file = pipeline.config_template
    else:
        try:
            template_directory = config.get("templating", "directory")
            template_file = os.path.join(f"{template_directory}", template)
            if not os.path.exists(template_file):
                raise Exception
        except Exception:
            # Fall back to the template bundled with asimov itself.
            from pkg_resources import resource_filename

            template_file = resource_filename("asimov", f"configs/{template}")
    self.logger.info(f"Using {template_file}")
    liq = Liquid(template_file)
    rendered = liq.render(production=self, config=config)
    if not dryrun:
        with open(filename, "w") as output_file:
            output_file.write(rendered)
        # Log the actual destination; this previously logged a literal
        # placeholder via an f-string with no substitution.
        self.logger.info(f"Saved as {filename}")
    else:
        print(rendered)
def build_report(self):
if self.pipeline:
self.pipeline.build_report()
def html(self):
    """
    An HTML representation of this production.

    Builds a Bootstrap-style card: status badge, pipeline name and
    pipeline-provided markup, run directory, waveform approximant, and
    any review messages.
    """
    production = self
    card = ""
    # Outer card, classed by status so the stylesheet can colour it.
    card += f"<div class='asimov-analysis asimov-analysis-{self.status}'>"
    card += f"<h4>{self.name}"
    if self.comment:
        card += (
            f""" <small class="asimov-comment text-muted">{self.comment}</small>"""
        )
    card += "</h4>"
    if self.status:
        # `status_map` translates a status string to a badge colour class.
        card += f"""<p class="asimov-status">
<span class="badge badge-pill badge-{status_map[self.status]}">{self.status}</span>
</p>"""
    if self.pipeline:
        card += f"""<p class="asimov-pipeline-name">{self.pipeline.name}</p>"""
    if self.pipeline:
        # self.pipeline.collect_pages()
        card += self.pipeline.html()
    if self.rundir:
        card += f"""<p class="asimov-rundir"><code>{production.rundir}</code></p>"""
    else:
        card += """&nbsp;"""
    if "approximant" in production.meta:
        card += f"""<p class="asimov-attribute">Waveform approximant:
<span class="asimov-approximant">{production.meta['approximant']}</span>
</p>"""
        card += """&nbsp;"""
    card += """</div>"""
    if len(self.review) > 0:
        for review in self.review:
            card += review.html()
    return card
Production = GravitationalWaveTransient

@@ -61,3 +61,6 @@ import glob

os.makedirs(location, exist_ok=True)
repo = git.Repo.init(location)
try:
repo = git.Repo.init(location, initial_branch="master")
except Exception:
repo = git.Repo.init(location)
os.makedirs(os.path.join(location, directory), exist_ok=True)

@@ -193,3 +196,5 @@ with open(os.path.join(location, directory, ".gitkeep"), "w") as f:

"""
coinc_file = glob.glob(os.path.join(os.getcwd(), self.directory, category, "*coinc*.xml"))
coinc_file = glob.glob(
os.path.join(os.getcwd(), self.directory, category, "*coinc*.xml")
)

@@ -218,3 +223,8 @@ if len(coinc_file) > 0:

self.update()
path = f"{os.path.join(os.getcwd(), self.directory, category)}/{name}.ini"
if category is not None:
path = f"{os.path.join(os.getcwd(), self.directory, category)}/{name}.ini"
else:
category = "project_analyses"
path = f"{os.path.join(os.getcwd(), self.directory)}/{name}.ini"
return [path]

@@ -286,3 +296,3 @@

----------
event : `asimov.gitlab.EventIssue`
event : `asimov.event.Event`
The event which the preferred upload is being prepared for.

@@ -289,0 +299,0 @@ prods : list

@@ -9,2 +9,3 @@ """

"""
import ast

@@ -11,0 +12,0 @@ import getpass

"""
Code for the project ledger.
"""
import yaml

@@ -8,2 +9,3 @@

import shutil
from functools import reduce

@@ -13,2 +15,3 @@ import asimov

from asimov import config
from asimov.analysis import ProjectAnalysis
from asimov.event import Event, Production

@@ -33,13 +36,4 @@ from asimov.utils import update, set_directory

DatabaseLedger.create()
elif engine == "gitlab":
raise NotImplementedError(
"This hasn't been ported to the new interface yet. Stay tuned!"
)
elif engine == "gitlab":
raise NotImplementedError(
"This hasn't been ported to the new interface yet. Stay tuned!"
)
class YAMLLedger(Ledger):

@@ -57,7 +51,7 @@ def __init__(self, location=None):

]
self.events = {ev["name"]: ev for ev in self.data["events"]}
self._all_events = [
Event(**self.events[event], ledger=self)
for event in self.events.keys()
]
Event(**self.events[event], ledger=self) for event in self.events.keys()
]
self.data.pop("events")

@@ -73,2 +67,3 @@

data["events"] = []
data["project analyses"] = []
data["project"] = {}

@@ -86,2 +81,13 @@ data["project"]["name"] = name

def update_analysis_in_project_analysis(self, analysis):
    """
    Function to update an analysis contained in the project analyses
    """
    # Scan the stored project analyses for entries matching this
    # analysis's name and replace their serialised form wholesale.
    for i in range(len(self.data["project analyses"])):
        if self.data["project analyses"][i]["name"] == analysis.name:
            dict_to_save = analysis.to_dict().copy()
            # `status` lives on the object rather than in to_dict();
            # copy it in so the persisted record reflects current state.
            dict_to_save["status"] = analysis.status
            self.data["project analyses"][i] = dict_to_save
            # NOTE(review): save() runs once per matching entry, inside
            # the loop — confirm intended nesting against upstream.
            self.save()
def delete_event(self, event_name):

@@ -125,14 +131,47 @@ """

def add_event(self, event):
def add_subject(self, subject):
"""Add a new subject to the ledger."""
if "events" not in self.data:
self.data["events"] = []
self.events[event.name] = event.to_dict()
self.events[subject.name] = subject.to_dict()
self.save()
def add_production(self, event, production):
event.add_production(production)
self.events[event.name] = event.to_dict()
def add_event(self, event):
    """Add a new event to the ledger (alias for ``add_subject``)."""
    self.add_subject(subject=event)
def add_analysis(self, analysis, event=None):
    """
    Add an analysis to the ledger.
    This method can accept any of the forms of analysis supported by asimov, and
    will determine the correct way to add them to the ledger.
    Parameters
    ----------
    analysis : `asimov.Analysis`
        The analysis to be added to the ledger.
    event : str, optional
        The name of the event which the analysis should be added to.
        This is not required for project analyses.

    Raises
    ------
    ValueError
        If a project analysis with the same name already exists.
    Examples
    --------
    """
    if isinstance(analysis, ProjectAnalysis):
        # Project-wide analyses live in a flat list, keyed by name.
        names = [ana["name"] for ana in self.data["project analyses"]]
        if analysis.name not in names:
            self.data["project analyses"].append(analysis.to_dict())
        else:
            raise ValueError(
                "An analysis with that name already exists in the ledger."
            )
    else:
        # Event-scoped analysis: attach it to its event and refresh the
        # serialised copy of that event.
        # NOTE(review): this branch requires `event` (an Event object,
        # despite the docstring saying str) — confirm with callers.
        event.add_production(analysis)
        self.events[event.name] = event.to_dict()
    self.save()
def add_production(self, event, production):
    """Add a production to an event (backwards-compatible alias for ``add_analysis``)."""
    self.add_analysis(analysis=production, event=event)
def get_defaults(self):

@@ -157,2 +196,9 @@ """

@property
def project_analyses(self):
    """Rehydrate every stored project analysis into a ``ProjectAnalysis`` object."""
    return [
        ProjectAnalysis.from_dict(analysis, ledger=self)
        for analysis in self.data["project analyses"]
    ]
def get_event(self, event=None):

@@ -193,6 +239,10 @@ if event:

productions = filter(
lambda x: x.meta[parameter] == value
if (parameter in x.meta)
else (
getattr(x, parameter) == value if hasattr(x, parameter) else False
lambda x: (
x.meta[parameter] == value
if (parameter in x.meta)
else (
getattr(x, parameter) == value
if hasattr(x, parameter)
else False
)
),

@@ -255,43 +305,22 @@ productions,

def get_productions(self, event=None, filters=None):
"""Get a list of productions either for a single event or for all events.
def get_productions(self, event, filters=None, query=None):
"""
Get all of the productions for a given event.
"""
Parameters
----------
event : str
The name of the event to pull productions from.
Optional; if no event is specified then all of the productions are
returned.
if not filters and not query:
productions = self.db.query("production", "event", event)
filters : dict
A dictionary of parameters to filter on.
Examples
--------
FIXME: Add docs.
"""
if event:
productions = self.get_event(event).productions
else:
productions = []
for event_i in self.get_event():
for production in event_i.productions:
productions.append(production)
def apply_filter(productions, parameter, value):
productions = filter(
lambda x: x.meta[parameter] == value
if (parameter in x.meta)
else (
getattr(x, parameter) == value if hasattr(x, parameter) else False
),
productions,
queries_1 = self.db.Q["event"] == event
queries = [
self.db.Q[parameter] == value for parameter, value in filters.items()
]
productions = self.db.tables["production"].search(
queries_1 & reduce(lambda x, y: x & y, queries)
)
return productions
if filters:
for parameter, value in filters.items():
productions = apply_filter(productions, parameter, value)
return list(productions)
event = self.get_event(event)
return [
Production.from_dict(dict(production), event) for production in productions
]
"""
The locutus script.
"""
import os

@@ -5,0 +6,0 @@

@@ -13,5 +13,3 @@ """

# from .gitlab import EventIssue
logging.getLogger("werkzeug").setLevel(logging.WARNING)

@@ -81,3 +79,3 @@ logging.getLogger("MARKDOWN").setLevel(logging.WARNING)

The free-form message for the log message.
channels : list, or str, {"file", "mattermost", "gitlab"}, optional
channels : list, or str, {"file", "mattermost"}, optional
The list of places where the log message should be sent.

@@ -111,3 +109,3 @@ Defaults to file only.

The free-form message for the log message.
channels : list, or str, {"file", "mattermost", "gitlab"}, optional
channels : list, or str, {"file", "mattermost"}, optional
The list of places where the log message should be sent.

@@ -130,3 +128,3 @@ Defaults to file only.

The free-form message for the log message.
channels : list, or str, {"file", "mattermost", "gitlab"}, optional
channels : list, or str, {"file", "mattermost"}, optional
The list of places where the log message should be sent.

@@ -149,3 +147,3 @@ Defaults to file only.

The free-form message for the log message.
channels : list, or str, {"file", "mattermost", "gitlab"}, optional
channels : list, or str, {"file", "mattermost"}, optional
The list of places where the log message should be sent.

@@ -218,3 +216,3 @@ Defaults to file only.

The free-form message for the log message.
channels : list, or str, {"file", "mattermost", "gitlab"}, optional
channels : list, or str, {"file", "mattermost"}, optional
The list of places where the log message should be sent.

@@ -221,0 +219,0 @@ Defaults to file only.

@@ -9,5 +9,5 @@ """Defines the interface with generic analysis pipelines."""

import asimov.analysis
warnings.filterwarnings("ignore", module="htcondor")
import htcondor # NoQA

@@ -47,12 +47,3 @@

def submit_comment(self):
"""
Submit this exception as a comment on the gitlab
issue for the event.
"""
if self.issue:
self.issue.add_label("pipeline-error", state=False)
self.issue.add_note(self.__repr__())
class PipelineLogger:

@@ -99,7 +90,13 @@ """Log things for pipelines."""

self.category = production.category
try:
self.category = production.category
except AttributeError:
self.category = None
self.logger = logger.getChild(
f"analysis.{production.event.name}/{production.name}"
)
if isinstance(production, asimov.analysis.ProjectAnalysis):
full_name = f"ProjectAnalysis/{production.name}"
else:
full_name = f"analysis.{production.event.name}/{production.name}"
self.logger = logger.getChild(full_name)
self.logger.setLevel(LOGGER_LEVEL)

@@ -147,4 +144,7 @@

self.production.status = "finished"
self.production.meta.pop("job id")
# Need to determine the correct list of post-processing jobs here
# self.production.meta.pop("job id")
def collect_assets(self):

@@ -285,2 +285,5 @@ """

def build(self):
pass
def build_report(self, reportformat="html"):

@@ -287,0 +290,0 @@ """

@@ -13,2 +13,4 @@ import sys

from asimov.pipelines.pesummary import PESummary
discovered_pipelines = entry_points(group="asimov.pipelines")

@@ -22,2 +24,3 @@

"lalinference": LALInference,
"pesummary": PESummary,
}

@@ -24,0 +27,0 @@

@@ -16,3 +16,3 @@ """BayesWave Pipeline specification."""

from ..git import AsimovFileNotFound
from ..pipeline import Pipeline, PipelineException, PipelineLogger
from ..pipeline import Pipeline, PipelineException
from ..storage import AlreadyPresentException, Store

@@ -39,5 +39,10 @@

super(BayesWave, self).__init__(production, category)
self.logger.warning(
"The Bayeswave interface built into asimov will be removed "
"in v0.7 of asimov, and replaced with an integration from an "
"external package."
)
self.logger.info("Using the Bayeswave pipeline")
if not production.pipeline.lower() == "bayeswave":
raise PipelineException
raise PipelineException("Pipeline mismatch")

@@ -169,3 +174,2 @@ try:

self.logger.info(" ".join(command))
if dryrun:

@@ -175,3 +179,2 @@ print(" ".join(command))

else:
pipe = subprocess.Popen(

@@ -259,7 +262,5 @@ command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT

self.collect_pages()
except FileNotFoundError:
PipelineLogger(
message=b"Failed to copy megaplot pages.",
production=self.production.name,
)
except FileNotFoundError as e:
self.logger.error("Failed to copy the megaplot output")
self.logger.exception(e)

@@ -269,8 +270,5 @@ try:

self.store_assets()
except Exception:
PipelineLogger(
message=b"Failed to store PSDs.",
issue=self.production.event.issue_object,
production=self.production.name,
)
except Exception as e:
self.logger.error("Failed to store the PSDs")
self.logger.exception(e)

@@ -354,2 +352,4 @@ if "supress" in self.production.meta["quality"]:

self.logger.info((" ".join(command)))
if dryrun:

@@ -459,7 +459,14 @@ print(" ".join(command))

psds = {}
results_dir = glob.glob(f"{self.production.rundir}/trigtime_*")[0]
for det in self.production.meta["interferometers"]:
asset = os.path.join(
results_dir, "post", "clean", f"glitch_median_PSD_forLI_{det}.dat"
asset = glob.glob(
os.path.join(
self.production.rundir,
"trigtime*",
"post",
"clean",
f"glitch_median_PSD_forLI_{det}.dat",
)
)
if len(asset) > 0:
asset = asset[0]
if os.path.exists(asset):

@@ -494,3 +501,3 @@ psds[det] = asset

store = Store(root=config.get("storage", "directory"))
sample_rate = self.production.meta["quality"]["sample-rate"]
sample_rate = self.production.meta["likelihood"]["sample rate"]
orig_PSD_file = np.genfromtxt(

@@ -497,0 +504,0 @@ os.path.join(

@@ -12,5 +12,8 @@ """Bilby Pipeline specification."""

from .. import config
from ..pipeline import Pipeline, PipelineException, PipelineLogger, PESummaryPipeline
from ..pipeline import Pipeline, PipelineException, PipelineLogger
from .. import auth
from .pesummary import PESummary
class Bilby(Pipeline):

@@ -34,6 +37,11 @@ """

super(Bilby, self).__init__(production, category)
self.logger.warning(
"The Bilby interface built into asimov will be removed "
"in v0.7 of asimov, and replaced with an integration from an "
"external package."
)
self.logger.info("Using the bilby pipeline")
if not production.pipeline.lower() == "bilby":
raise PipelineException
raise PipelineException("Pipeline mismatch")

@@ -64,2 +72,3 @@ def detect_completion(self):

@auth.refresh_scitoken
def before_submit(self):

@@ -69,13 +78,5 @@ """

"""
self.logger.info("Running the before_submit hook")
sub_files = glob.glob(f"{self.production.rundir}/submit/*.submit")
for sub_file in sub_files:
if "dag" in sub_file:
continue
with open(sub_file, "r") as f_handle:
original = f_handle.read()
with open(sub_file, "w") as f_handle:
self.logger.info(f"Adding preserve_relative_paths to {sub_file}")
f_handle.write("preserve_relative_paths = True\n" + original)
pass
@auth.refresh_scitoken
def build_dag(self, psds=None, user=None, clobber_psd=False, dryrun=False):

@@ -209,4 +210,2 @@ """

try:
# to do: Check that this is the correct name of the output DAG file for billby (it
# probably isn't)
if "job label" in self.production.meta:

@@ -229,3 +228,2 @@ job_label = self.production.meta["job label"]

# with set_directory(self.production.rundir):
self.logger.info(f"Working in {os.getcwd()}")

@@ -271,3 +269,8 @@

"""
return {"samples": self.samples()}
return {
"samples": self.samples(),
"config": self.production.event.repository.find_prods(
self.production.name, self.category
)[0],
}

@@ -289,3 +292,3 @@ def samples(self, absolute=False):

def after_completion(self):
post_pipeline = PESummaryPipeline(production=self.production)
post_pipeline = PESummary(production=self.production)
self.logger.info("Job has completed. Running PE Summary.")

@@ -314,5 +317,5 @@ cluster = post_pipeline.submit_dag()

except FileNotFoundError:
messages[
log.split("/")[-1]
] = "There was a problem opening this log file."
messages[log.split("/")[-1]] = (
"There was a problem opening this log file."
)
return messages

@@ -341,5 +344,5 @@

except FileNotFoundError:
messages[
log.split("/")[-1]
] = "There was a problem opening this log file."
messages[log.split("/")[-1]] = (
"There was a problem opening this log file."
)
return messages

@@ -346,0 +349,0 @@

@@ -35,4 +35,9 @@ """LALInference Pipeline specification."""

self.logger = logger
self.logger.warning(
"The LALInference interface built into asimov will be removed "
"in v0.7 of asimov, and replaced with an integration from an "
"external package."
)
if not production.pipeline.lower() == "lalinference":
raise PipelineException
raise PipelineException("Pipeline mismatch")

@@ -195,2 +200,4 @@ def detect_completion(self):

"""
if not dryrun:
os.chdir(self.production.rundir)

@@ -240,3 +247,3 @@ with set_directory(self.production.rundir):

cluster = self.run_pesummary()
self.production.meta["job id"] = int(cluster)
self.production.job_id = int(cluster)
self.production.status = "processing"

@@ -243,0 +250,0 @@

@@ -15,3 +15,3 @@ """RIFT Pipeline specification."""

from asimov.pipeline import Pipeline, PipelineException, PipelineLogger
from asimov.pipeline import PESummaryPipeline
from asimov.pipelines.pesummary import PESummary

@@ -40,3 +40,3 @@

if not production.pipeline.lower() == "rift":
raise PipelineException
raise PipelineException("Pipeline mismatch")

@@ -67,3 +67,3 @@ if "bootstrap" in self.production.meta:

self.logger.info("Job has completed. Running PE Summary.")
post_pipeline = PESummaryPipeline(production=self.production)
post_pipeline = PESummary(production=self.production)
cluster = post_pipeline.submit_dag()

@@ -193,15 +193,34 @@

os.environ["LIGO_USER_NAME"] = f"{user}"
os.environ[
"LIGO_ACCOUNTING"
] = f"{self.production.meta['scheduler']['accounting group']}"
if "accounting group" in self.production.meta["scheduler"]:
os.environ["LIGO_USER_NAME"] = f"{user}"
os.environ["LIGO_ACCOUNTING"] = (
f"{self.production.meta['scheduler']['accounting group']}"
)
if "gpu architectures" in self.production.meta["scheduler"]:
# Write details of required GPU architectures into an environment variable.
os.environ["RIFT_REQUIRE_GPUS"] = "&&".join(
[
f"""(DeviceName=!="{device}")"""
for device in self.production.meta["scheduler"]["gpu architectures"]
]
)
if "avoid hosts" in self.production.meta["scheduler"]:
# Collect alist of specific hosts to avoid on the OSG
os.environ["RIFT_AVOID_HOSTS"] = "&&".join(
[
f"""(TARGET.Machine =!="{machine}")"""
for machine in self.production.meta["scheduler"]["avoid hosts"]
]
)
if "singularity image" in self.production.meta["scheduler"]:
# Collect the correct information for the singularity image
os.environ[
"SINGULARITY_RIFT_IMAGE"
] = f"{self.production.meta['scheduler']['singularity image']}"
os.environ[
"SINGULARITY_BASE_EXE_DIR"
] = f"{self.production.meta['scheduler']['singularity base exe directory']}"
os.environ["SINGULARITY_RIFT_IMAGE"] = (
f"{self.production.meta['scheduler']['singularity image']}"
)
os.environ["SINGULARITY_BASE_EXE_DIR"] = (
f"{self.production.meta['scheduler']['singularity base exe directory']}"
)

@@ -208,0 +227,0 @@ try:

@@ -65,2 +65,3 @@ """

)
review_ob = cls()

@@ -121,2 +122,3 @@ messages = sorted(messages, key=lambda k: k.timestamp)

out["status"] = self.status
return out

@@ -123,0 +125,0 @@

import collections
import glob
import os

@@ -8,4 +7,2 @@ from contextlib import contextmanager

import numpy as np
from asimov import logger

@@ -36,36 +33,2 @@

def find_calibrations(time):
    """
    Find the calibration envelope files for a given GPS time.

    Parameters
    ----------
    time : int
        GPS time of the event.

    Returns
    -------
    dict
        Mapping from detector name (``"H1"``, ``"L1"``, ``"V1"``) to the
        path of the calibration envelope file closest in time to ``time``.

    Raises
    ------
    ValueError
        If ``time`` falls after the supported O3 calibration epoch.
    """

    def _closest(pattern, gps_time):
        # Pick the results file whose GPS time, embedded in the filename
        # between "GPSTime_" and "_C0", is nearest to the requested time.
        files = glob.glob(pattern)
        by_time = {
            int(datum.split("GPSTime_")[1].split("_C0")[0]): datum for datum in files
        }
        keys = np.array(list(by_time.keys()))
        return by_time[keys[np.argmin(np.abs(keys - gps_time))]]

    if time < 1190000000:
        # O2 epoch: C02 calibration uncertainties.
        cal_dir = "/home/cal/public_html/uncertainty/O2C02"
        virgo = "/home/carl-johan.haster/projects/O2/C02_reruns/V_calibrationUncertaintyEnvelope_magnitude5p1percent_phase40mraddeg20microsecond.txt"  # NoQA
    elif time < 1290000000:
        # O3 epoch: C01 calibration uncertainties.
        # NOTE(review): times in (1268306607, 1290000000) are handled here
        # even though the branch below suggests they should be rejected;
        # the original branch ordering is preserved — confirm intent.
        cal_dir = "/home/cal/public_html/uncertainty/O3C01"
        virgo = "/home/cbc/pe/O3/calibrationenvelopes/Virgo/V_O3a_calibrationUncertaintyEnvelope_magnitude5percent_phase35milliradians10microseconds.txt"  # NoQA
    elif time > 1268306607:
        raise ValueError(
            "This method cannot be used to add calibration envelope data beyond O3."
        )
    return {
        "H1": _closest(f"{cal_dir}/H1/*LHO*FinalResults.txt", time),
        "L1": _closest(f"{cal_dir}/L1/*LLO*FinalResults.txt", time),
        "V1": virgo,
    }
def update(d, u, inplace=True):

@@ -72,0 +35,0 @@ """Recursively update a dictionary."""

@@ -0,1 +1,37 @@

0.6.0
=====
This is a major feature release which introduces some change in functionality which may have a minor effect on backwards compatibility.
New features
------------
*New analysis types*: asimov 0.6.0 introduces two new kinds of analysis: subject analyses, which allow an analysis to access configurations and results from multiple other analyses defined on a given event or subject, and project analyses, which allow results from multiple analyses to be accessed across multiple events. These will allow much more complicated workflows than before, combining large numbers of subsidiary results.
Changes
-------
*Deprecation warnings*: Deprecation warnings have been added to each of the pipelines (bilby, pesummary, lalinference, rift, and bayeswave) which are currently supported "out of the box" by asimov, as these will be removed in asimov 0.7 and replaced with plugins.
*Bilby*: The online pe argument has been removed from the bilby configuration template as this has been deprecated.
*Removal of gitlab interfaces*: More logic related to the disused and deprecated gitlab issue tracker interface has been removed.
*Removal of calibration envelope logic*: We have removed the logic required to find calibration files on the LIGO Scientific Collaboration computing resources. These have been moved to the asimov-gwdata pipeline, which will be used in O4 collaboration analyses.
Deprecation
-----------
+ All pipeline interfaces in the main package have been deprecated, and will be removed in v0.7.0
Merges and fixes
----------------
+ `ligo!120 <https://git.ligo.org/asimov/asimov/-/merge_requests/120>`_: Fixes the event deletion CLI option.
+ `ligo!66 <https://git.ligo.org/asimov/asimov/-/merge_requests/66>`_: Updates the post-processing interface.
+ `ligo!128 <https://git.ligo.org/asimov/asimov/-/merge_requests/128>`_: Updates to the README
+ `ligo!157 <https://git.ligo.org/asimov/asimov/-/merge_requests/157>`_: Fixes to the interface between asimov and lensingflow
0.5.8

@@ -41,3 +77,3 @@ =====

0.5.6

@@ -58,3 +94,2 @@ =====

0.5.5

@@ -61,0 +96,0 @@ =====

@@ -0,1 +1,4 @@

Analyses
========
Analyses are the fundamental operations which asimov conducts on data.

@@ -10,3 +13,3 @@

as these only require access to the data for a single event.
Before version 0.4 these were called `Productions`.
Before version 0.6 these were called `Productions`.

@@ -26,1 +29,195 @@ Event analyses

An analysis is defined as a series of configuration variables in an asimov project's ledger, which are then used to configure analysis pipelines.
.. _states:
Analysis state
--------------
In order to track each of the jobs asimov monitors it employs a simple state machine on each production.
This state is tracked within :ref:`the production ledger<ledger>` for each production with the value of ``status``.
Under normal running conditions the sequence of these states is
::
ready --> running --> finished --> processing --> uploaded
A number of additional states are also possible which interrupt the normal flow of the job through ``asimov``'s workflow.
+ ``ready`` : This job should be started automatically. This state must be applied manually. The job will then be started once its dependencies are met.
+ ``stopped`` : This run has been manually stopped; stop tracking it. Must be applied manually.
+ ``stuck`` : A problem has been discovered with this run, and needs manual intervention.
+ ``uploaded`` : This event has been uploaded to the event repository.
+ ``restart`` : This job should be restarted by Olivaw. Must be applied manually.
+ ``finished`` : This job has finished running and the results are ready to be processed on the next bot check.
+ ``processing`` : The results of this job are currently being processed by ``PESummary``.
+ ``uploaded`` : This job has been uploaded to the data store.
.. note::
The way that analyses are handled by asimov changed considerably in ``asimov 0.6.0``, but generally you shouldn't notice any major differences if you've been using ``Productions`` in the past.
Creating Analyses
=================
The easiest way to create a new analysis is using a YAML Blueprint file.
We settled on using these because analyses often contain a very large number of settings, and trying to set everything up on the command line becomes rapidly impractical, and difficult to reproduce reliably, without writing a shell script.
It is also possible to create simple analyses via the command line, but a blueprint is normally the best way.
A Blueprint for a simple analysis
---------------------------------
A simple analysis is designed to only perform analysis on a single subject or event.
The blueprint for these analyses can be very short if you don't need to specify many settings for the analysis.
For example, ``Bayeswave`` is a gravitational wave analysis pipeline which is designed to perform analysis on a single stretch of data, and a single gravitational wave event.
It doesn't require information about other gravitational wave events to be shared with the analysis, and so is best modelled in asimov as a simple analysis.
A blueprint to set up a default ``Bayeswave`` run is just a handful of lines long.
.. code-block:: yaml
kind: analysis
name: sample-analysis
pipeline: bayeswave
comment: This is a simple analysis pipeline.
Save this as ``bayeswave-blueprint.yaml``, and you can then add this analysis to an event in a project by running
.. code-block:: console
$ asimov apply -e <subject name> -f bayeswave-blueprint.yaml
replacing ``<subject name>`` with the name of the subject you're adding the analysis to.
It's also possible to make a simple analysis depend on the results of a previous analysis using the ``needs`` keyword in the blueprint.
The pipeline ``giskard`` needs a datafile which is produced by the ``bayeswave`` pipeline defined in the first blueprint, so it can be created with this blueprint:
.. code-block:: yaml
kind: analysis
name: stage-2-analysis
pipeline: giskard
comment: Search for evidence of gravitational wave bending.
needs:
- sample-analysis
Here we defined the requirement by the *name* of the previous analysis, but we can also use various properties of the analysis.
This can be useful if you don't want to rely on having consistent naming between events or even projects, but you want to be able to reuse a blueprint for many subjects or even many projects.
You can update the previous blueprint to always require a ``Bayeswave`` pipeline to have completed before starting the ``giskard`` analysis:
.. code-block:: yaml
kind: analysis
name: stage-2-analysis
pipeline: giskard
comment: Search for evidence of gravitational wave bending.
waveform:
approximant: impecableOstritchv56PHMX
needs:
- "pipeline:bayeswave"
We can use any of the metadata for an analysis to create the dependencies.
For example, we can require an analysis which used the ``impecableOstritchv56PHMX`` waveform by stating ``"waveform.approximant:impecableOstritchv56PHMX"`` in the ``needs`` section.
You can also define multiple criteria for an analysis's dependencies, and asimov will wait until all of the requirements are satisfied before starting.
For example:
.. code-block:: yaml
kind: analysis
name: stage-3-analysis
pipeline: calvin
comment: A third step analysis.
needs:
- "pipeline:giskard"
- "waveform.approximant:impecableOstritchv56PHMX"
A Blueprint for a project analysis
----------------------------------
Creating a project analysis in asimov is very similar to a simple analysis, except that we can also provide a list of subjects which should be included in the analysis.
For example, the ``gladia`` pipeline is used to perform a joint analysis between two gravitational waves.
To create a ``gladia`` pipeline which analyses two events, ``GW150914`` and ``GW151012`` you need to add a ``subjects`` list to the blueprint, for example:
.. code-block:: yaml
kind: projectanalysis
name: gladia-joint
pipeline: gladia
comment: An example joint analysis.
subjects:
- GW150914
- GW151012
Creating a Project Analysis Pipeline
====================================
For the most part a Project Analysis pipeline is similar to a simple analysis pipeline.
The main difference will be how you access metadata from each event.
In the template configuration file project analyses have access to the ``analysis.subjects`` property, which provides a list of subjects available to the analysis.
These then give access to all of the metadata for each subject.
The example below uses two subjects, and to make the sample template easier to read we've assigned each to its own liquid variable.
.. code-block:: liquid
{%- assign subject_1 = analysis.subjects[0] -%}
{%- assign subject_2 = analysis.subjects[1] -%}
[event_1_settings]
{%- assign ifos = subject_1.meta['interferometers'] -%}
channel-dict = { {% for ifo in ifos %}{{ subject_1.meta['data']['channels'][ifo] }},{% endfor %} }
psd-dict = { {% for ifo in ifos %}{{ifo}}:{{subject_1.psds[ifo]}},{% endfor %} }
[event_2_settings]
{%- assign ifos = subject_2.meta['interferometers'] -%}
channel-dict = { {% for ifo in ifos %}{{ subject_2.meta['data']['channels'][ifo] }},{% endfor %} }
psd-dict = { {% for ifo in ifos %}{{ifo}}:{{subject_2.psds[ifo]}},{% endfor %} }
Postprocessing Workflows
========================
It's common to have workflows where one process produces a result which then needs some additional processing that may not fit neatly into the notion of an analysis.
For example, in gravitational wave transient analyses it is common to perform parameter estimation in an ``Analysis``, but then want to run a script which will plot the outputs after the analysis is complete.
Indeed, there are tools which are designed to do this for a wide range of pipelines, in order to produce results in a common format.
In asimov these jobs are called "Postprocessing pipelines", and they share much of the same functionality as a full analysis.
Where an analysis is applied to a single event, and provides the settings which are required for a single analysis, in general postprocessing analyses are designed to be applied identically to any Analysis when it completes, if it satisfies the Pipeline's criteria.
As a concrete example, let's look at the blueprint for a postprocessing analysis.
.. code-block:: yaml
kind: postprocessing
name: combined summary pages for bilby
analyses:
- pipeline:bilby
stages:
- name: combined pages
pipeline: pesummary
This blueprint describes postprocessing using a pipeline called ``pesummary`` which applies to all events (aka subjects) in the project, and all analyses which have "bilby" as their pipeline.
In contrast to a normal Analysis, it is possible to define multiple stages to a Postprocessing workflow; for example, this blueprint creates a workflow with two stages:
.. code-block:: yaml
kind: postprocessing
name: standard pe postprocessing
analyses:
- pipeline:bilby
- pipeline:rift
stages:
- name: simple PE summary
pipeline: pesummary
- name: less simple PE summary
pipeline: pesummary
needs:
- simple PE summary
This workflow has two stages, with ``less simple PE summary`` requiring ``simple PE summary`` to complete before it is started.

@@ -0,1 +1,3 @@

:page_template: homepage.html
.. raw:: html

@@ -109,3 +111,6 @@

Citing asimov
-------------
If you use asimov in a publication please consider consulting our :ref:`citation-guide`.

@@ -132,10 +137,18 @@ ..

installation
olivaw/projects
olivaw/events
olivaw/productions
olivaw/running
olivaw/monitoring
olivaw/reporting
user-guide/projects
user-guide/events
user-guide/productions
user-guide/running
user-guide/monitoring
user-guide/reporting
storage
olivaw/review
citing
.. toctree::
:maxdepth: 2
:caption: The Ledger
ledger

@@ -175,6 +188,12 @@ .. toctree::

asimov-repository
code-overview
ledger
pipelines-dev
hooks
building-docs

@@ -202,4 +221,10 @@ asimov-repository

api/git
state
pipelines
api/gitlab
api/ini
api/ledger
api/locutus
api/logging
api/mattermost
api/olivaw
api/pipeline
config

@@ -206,0 +231,0 @@

@@ -0,1 +1,6 @@

.. _ledger:
The Asimov Ledger

@@ -102,3 +107,5 @@ =================

For example
::
data:

@@ -113,3 +120,5 @@ calibration:

For example
::
data:

@@ -124,3 +133,5 @@ channels:

For example
::
data:

@@ -135,3 +146,5 @@ frame-types:

For example
::
data:

@@ -146,3 +159,5 @@ segments:

For example
::
data:

@@ -191,3 +206,11 @@ data files:

~~~~~~~~
.. code-block:: yaml
scheduler:
accounting group: ligo.dev.o4.cbc.pe.bilby
request cpus: 4
Prior settings

@@ -250,1 +273,28 @@ --------------

boundary: periodic
Postprocessing settings
-----------------------
Examples
~~~~~~~~
.. code-block:: yaml
postprocessing:
pesummary:
accounting group: ligo.dev.o4.cbc.pe.lalinference
cosmology: Planck15_lal
evolve spins: forward
multiprocess: 4
redshift: exact
regenerate posteriors:
- redshift
- mass_1_source
- mass_2_source
- chirp_mass_source
- total_mass_source
- final_mass_source
- final_mass_source_non_evolved
- radiated_energy
skymap samples: 2000

@@ -0,1 +1,6 @@

.. _pipeline-dev:
Developing new pipelines

@@ -56,2 +61,3 @@ ========================

::
{"psds": {"L1": /path/to/L1/psd, "H1": /path/to/H1/psd}}

@@ -162,2 +168,230 @@

asimov.pipelines =
mypipeline = asimov:MyPipeline
mypipeline = mypipeline.asimov:MyPipeline
``setup.py``
~~~~~~~~~~~~
.. code-block:: python
import setuptools
setuptools.setup(
name = "MyPipeline",
...,
entry_points = {
"asimov.pipelines": ["mypipeline = mypipeline.asimov:MyPipeline"]
},
.. _templates:
Pipeline Configuration Templates
--------------------------------
All of the pipelines which ``asimov`` is designed to work with use some manner of configuration file to define their operation.
Previously, creating these configuration files could be a tedious manual process, but ``asimov`` allows these files to be *templated*, combining various pieces of data and metadata from the production ledger with a template configuration file to produce the configuration which is then used to generate the DAG files which run the analysis.
Details of the metadata stored in the ledger can be found in the :ref:`documentation for the ledger format<ledger>`.
In order to run a ``bilby`` job a config file is required for ``bilby_pipe``.
``Asimov`` can produce this from a template.
Config file templates can be written using the liquid templating language, and should be kept in a directory which is specified in the `asimov` configuration file under the ``templating>directory`` configuration value e.g.
.. code-block:: ini
[templating]
directory = config-templates
The liquid language allows some logic to be included in the template.
This can be used to only include a given value if an interferometer is included in the analysis.
For example:
.. code-block:: ini
spline-calibration-envelope-dict={
{% if production.meta['interferometers'] contains "H1" %}
H1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['H1'] }},
{% endif %}
{% if production.meta['interferometers'] contains "L1" %}
L1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['L1'] }},
{% endif %}
{% if production.meta['interferometers'] contains "V1" %}
V1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['V1'] }}
{% endif %}
}
Adds only the calibration files for the required detectors to the configuration file.
The majority of the data passed to the template can be found in the ``production.meta`` dictionary.
These are stored in the same nested format as the production ledger; event-wide values are inherited by the production, so in the example ledger below the sample rate can be retrieved from ``production.meta['quality']['sample-rate']``, for example.
There are also a number of additional variables are available for convenience:
+ ``production.quality`` is an alias for ``production.meta['quality']``
+ ``production.psds`` provides the dictionary of PSDs for this event's specified sample rate.
+ ``production.event`` provides access to the data from the event (e.g. for the repository directory path, located at ``production.event.repository.directory``)
A full example ``bilby`` template is available below:
.. code-block:: ini
################################################################################
## Calibration arguments
################################################################################
calibration-model=CubicSpline
spline-calibration-envelope-dict={ {% if production.meta['interferometers'] contains "H1" %}H1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['H1'] }},{% endif %}{% if production.meta['interferometers'] contains "L1" %}L1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['L1'] }},{% endif %}{% if production.meta['interferometers'] contains "V1" %}V1:{{ production.event.repository.directory }}/{{ production.meta['calibration']['V1'] }}{% endif %} }
spline-calibration-nodes=10
spline-calibration-amplitude-uncertainty-dict=None
spline-calibration-phase-uncertainty-dict=None
################################################################################
## Data generation arguments
################################################################################
ignore-gwpy-data-quality-check=True
gps-tuple=None
gps-file=None
timeslide-file=None
timeslide-dict=None
trigger-time={{ production.meta['event time'] }}
gaussian-noise=False
n-simulation=0
data-dict=None
data-format=None
channel-dict={ {% if production.meta['interferometers'] contains "H1" %}{{ production.meta['data']['channels']['H1'] }},{% endif %} {% if production.meta['interferometers'] contains "L1" %}{{ production.meta['data']['channels']['L1'] }},{% endif %}{% if production.meta['interferometers'] contains "V1" %}{{ production.meta['data']['channels']['V1'] }}{% endif %} }
################################################################################
## Detector arguments
################################################################################
coherence-test=False
detectors={{ production.meta['interferometers'] }}
duration={{ production.meta['quality']['segment-length'] }}
generation-seed=None
psd-dict={ {% if production.meta['interferometers'] contains "H1" %}H1:{{ production.psds['H1'] }},{% endif %} {% if production.meta['interferometers'] contains "L1" %}L1:{{ production.psds['L1'] }},{% endif %} {% if production.meta['interferometers'] contains "V1" %}V1:{{ production.psds['V1'] }}{% endif %} }
psd-fractional-overlap=0.5
post-trigger-duration=2.0
sampling-frequency={{ production.meta['quality']['sample-rate'] }}
psd-length={{ production.meta['quality']['psd-length'] }}
psd-maximum-duration=1024
psd-method=median
psd-start-time=None
maximum-frequency=1024
minimum-frequency={{ production.meta['quality']['reference-frequency'] }}
zero-noise=False
tukey-roll-off=0.4
resampling-method=lal
################################################################################
## Injection arguments
################################################################################
injection=False
injection-dict=None
injection-file=None
injection-numbers=None
injection-waveform-approximant=None
################################################################################
## Job submission arguments
################################################################################
accounting=ligo.dev.o3.cbc.pe.lalinference
label={{ production.name }}
local=False
local-generation=False
local-plot=False
outdir={{ production.rundir }}
periodic-restart-time=28800
request-memory=4.0
request-memory-generation=None
request-cpus=4
singularity-image=None
scheduler=condor
scheduler-args=None
scheduler-module=None
scheduler-env=None
transfer-files=False
log-directory=None
online-pe=False
osg=False
################################################################################
## Likelihood arguments
################################################################################
distance-marginalization=True
distance-marginalization-lookup-table=None
phase-marginalization=True
time-marginalization=True
jitter-time=True
reference-frame={% if production.meta['interferometers'] contains "H1" %}H1{% endif %}{% if production.meta['interferometers'] contains "L1" %}L1{% endif %}{% if production.meta['interferometers'] contains "V1" %}V1{% endif %}
time-reference={% if production.meta['interferometers'] contains "H1" %}H1{% elsif production.meta['interferometers'] contains "L1" %}L1{% elsif production.meta['interferometers'] contains "V1" %}V1{% endif %}
likelihood-type=GravitationalWaveTransient
roq-folder=None
roq-scale-factor=1
extra-likelihood-kwargs=None
################################################################################
## Output arguments
################################################################################
create-plots=True
plot-calibration=False
plot-corner=False
plot-marginal=False
plot-skymap=False
plot-waveform=False
plot-format=png
create-summary=False
email=None
existing-dir=None
webdir=/home/pe.o3/public_html/LVC/o3b-catalog/{{ production.event.name }}/{{ production.name }}
summarypages-arguments=None
################################################################################
## Prior arguments
################################################################################
default-prior=BBHPriorDict
deltaT=0.2
prior-file=4s
prior-dict=None
convert-to-flat-in-component-mass=False
################################################################################
## Post processing arguments
################################################################################
postprocessing-executable=None
postprocessing-arguments=None
single-postprocessing-executable=None
single-postprocessing-arguments=None
################################################################################
## Sampler arguments
################################################################################
sampler=dynesty
sampling-seed=None
n-parallel=5
sampler-kwargs={'queue_size': 4, 'nlive': 2000, 'sample': 'rwalk', 'walks': 100, 'n_check_point': 2000, 'nact': 10, 'npool': 4}
################################################################################
## Waveform arguments
################################################################################
waveform-generator=bilby.gw.waveform_generator.WaveformGenerator
reference-frequency={{ production.meta['quality']['reference-frequency'] }}
waveform-approximant={{ production.meta['approximant'] }}
catch-waveform-errors=False
pn-spin-order=-1
pn-tidal-order=-1
pn-phase-order=-1
pn-amplitude-order=0
mode-array=None
frequency-domain-source-model=lal_binary_black_hole

@@ -16,6 +16,6 @@ ==================

+ :ref:`lalinference<LALInference pipelines>`
+ :ref:`bayeswave<pipelines/BayesWave pipelines>`
+ ``bilby``
+ ``RIFT``
+ :ref:`LALInference<lalinference-pipelines>`
+ :ref:`BayesWave<bayeswave-pipelines>`
+ :ref:`Bilby<bilby-pipelines>`
+ :ref:`RIFT<rift-pipelines>`

@@ -25,21 +25,3 @@ Adding new pipelines

New pipelines can be added to asimov by overloading the various methods in the :class:``asimov.pipeline.Pipeline`` class.
The most important of these is the ``build_dag`` method, which is used by the asimov framework to construct the DAG file to be submitted to the condor scheduler.
An example of a complete pipeline interface can be seen in the code for :class:``asimov.pipelines.lalinference.LALinference``.
Pipeline hooks
--------------
It is possible to customise the run process of the asimov pipeline runner using hooks.
By overloading the hook methods (listed below) inherited from the ``asimov.pipeline.Pipeline`` class additional operations can
be conducted during the processing workflow.
Hooks should take no arguments.
Implemented hooks are:
::
before_submit() --- Executed immediately before the DAG file for a pipeline is generated.
after_completion() --- Executed once execution has successfully
New pipelines can be added to asimov by overloading the various methods in the ``asimov.pipeline.Pipeline`` class.
Details for how you can develop new pipeline interfaces can be found in the :ref:`pipelines development guide<pipeline-dev>`.

@@ -0,1 +1,3 @@

.. _bayeswave-pipelines:
BayesWave pipelines

@@ -2,0 +4,0 @@ ===================

@@ -0,1 +1,3 @@

.. _bilby-pipelines:
Bilby pipelines

@@ -40,2 +42,3 @@ ===============

This section takes a list of types of marginalization to apply to the analysis (see below for an example of the syntax).
``distance``

@@ -50,2 +53,3 @@ Activates distance marginalization.

This section allows ROQs to be defined for the likelihood function.
``folder``

@@ -52,0 +56,0 @@ The location of the ROQs.

@@ -12,5 +12,8 @@ .. _lalinference-pipelines:

.. note::
The current integration with LALInference is fully reviewed and is suitable for use in collaboration analyses.
.. warning::
**v0.4.0**
The integration with LALInference has been deprecated.
It *must not* be used for collaboration parameter estimation analyses.
Examples

@@ -17,0 +20,0 @@ --------

@@ -0,1 +1,3 @@

.. _rift-pipelines:
RIFT pipelines

@@ -10,4 +12,8 @@ ==============

.. note::
The current integration with RIFT is fully reviewed and is suitable for use with all collaboration analyses.
.. warning::
**v0.4.0**
The current integration with RIFT is experimental, and is not reviewed.
It *must not* be used for collaboration parameter estimation analyses.
A reviewed version is expected to be available in the v0.5 series of releases.

@@ -14,0 +20,0 @@

Metadata-Version: 2.1
Name: asimov
Version: 0.5.8
Version: 0.6.0
Summary: A Python package for managing and interacting with data analysis jobs.

@@ -49,6 +49,7 @@ Home-page: https://git.ligo.org/asimov/asimov

[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/version.svg)](https://anaconda.org/conda-forge/ligo-asimov)
[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/installer/conda.svg)](https://conda.anaconda.org/conda-forge)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/infrastructure-updates/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/master/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/master)
[![conda-forge version](https://anaconda.org/conda-forge/asimov/badges/version.svg)](https://anaconda.org/conda-forge/asimov/)
![pypi](https://img.shields.io/pypi/v/asimov.svg)
![tests](https://git.ligo.org/asimov/asimov/badges/master/pipeline.svg)

@@ -59,16 +60,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows.

## Branch notes
These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development.
More generally useful documentation will move to the main documentation before moving to production.
### Starting the logging server
Run in ``asimov`` directory:
```
export FLASK_APP=server
flask run
```
## Features

@@ -112,3 +99,3 @@

```
$ conda install -c conda-forge ligo-asimov
$ conda install -c conda-forge asimov
```

@@ -122,2 +109,3 @@

## Get started

@@ -124,0 +112,0 @@

@@ -7,6 +7,7 @@ # Asimov

[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/version.svg)](https://anaconda.org/conda-forge/ligo-asimov)
[![Anaconda-Server Badge](https://anaconda.org/conda-forge/ligo-asimov/badges/installer/conda.svg)](https://conda.anaconda.org/conda-forge)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/infrastructure-updates/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/infrastructure-updates)
[![coverage report](https://git.ligo.org/asimov/asimov/badges/master/coverage.svg)](https://git.ligo.org/asimov/asimov/-/commits/master)
[![conda-forge version](https://anaconda.org/conda-forge/asimov/badges/version.svg)](https://anaconda.org/conda-forge/asimov/)
![pypi](https://img.shields.io/pypi/v/asimov.svg)
![tests](https://git.ligo.org/asimov/asimov/badges/master/pipeline.svg)

@@ -17,16 +18,2 @@ Asimov was developed to manage and automate the parameter estimation analyses used by the LIGO, Virgo, and KAGRA collaborations to analyse gravitational wave signals, but it aims to provide tools which can be used for other workflows.

## Branch notes
These notes relate to in-development features on this branch, and what's described here is only expected to be relevant during development.
More generally useful documentation will move to the main documentation before moving to production.
### Starting the logging server
Run in ``asimov`` directory:
```
export FLASK_APP=server
flask run
```
## Features

@@ -70,3 +57,3 @@

```
$ conda install -c conda-forge ligo-asimov
$ conda install -c conda-forge asimov
```

@@ -80,2 +67,3 @@

## Get started

@@ -82,0 +70,0 @@

@@ -59,1 +59,2 @@ from setuptools import setup

)

@@ -9,12 +9,167 @@ """

from importlib import reload
from unittest.mock import patch
import os
import io
import shutil
import contextlib
from click.testing import CliRunner
import asimov
from asimov.event import Production
from asimov.cli.application import apply_page
from asimov.cli import manage, project
from asimov.ledger import YAMLLedger
from asimov.pipeline import PipelineException
from asimov.cli.application import apply_page
class TestBaseAnalysis(unittest.TestCase):
pass
EVENTS = ["GW150914_095045", "GW151226_033853", "GW170814_103043"]
@classmethod
def setUpClass(cls):
cls.cwd = os.getcwd()
@classmethod
def tearDownClass(self):
os.chdir(self.cwd)
try:
shutil.rmtree(f"{self.cwd}/tests/tmp/")
except FileNotFoundError:
pass
def tearDown(self):
os.chdir(self.cwd)
shutil.rmtree(f"{self.cwd}/tests/tmp/")
def setUp(self):
reload(asimov)
reload(manage)
os.makedirs(f"{self.cwd}/tests/tmp/project")
os.chdir(f"{self.cwd}/tests/tmp/project")
runner = CliRunner()
result = runner.invoke(
project.init, ["Test Project", "--root", f"{self.cwd}/tests/tmp/project"]
)
assert result.exit_code == 0
assert result.output == "● New project created successfully!\n"
self.ledger = YAMLLedger(".asimov/ledger.yml")
f = io.StringIO()
with contextlib.redirect_stdout(f):
apply_page(
file=f"{self.cwd}/tests/test_data/testing_pe.yaml",
event=None,
ledger=self.ledger,
)
apply_page(
file=f"{self.cwd}/tests/test_data/testing_events.yaml",
ledger=self.ledger,
)
class TestSimpleAnalysis(TestBaseAnalysis):
pass
class TestEventAnalysis(TestBaseAnalysis):
pass
class TestProjectAnalysis(TestBaseAnalysis):
pass
class TestSubjectAnalysis(TestBaseAnalysis):
"""
Tests for subject analysis code.
"""
pipelines = {"bayeswave"}
EVENTS = ["GW150914_095045", "GW151226_033853", "GW170814_103043"]
def setUp(self):
super().setUp()
def test_all_events_present(self):
"""Test that all of the expected events are in the ledger."""
self.assertEqual(list(self.ledger.events.keys()), self.EVENTS)
def test_that_analyses_are_matched(self):
pass
# class TestProjectAnalysis(TestBaseAnalysis):
# """
# Tests for Project Analysis code.
# """
# def setUp(self):
# super().setUp()
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_joint_analysis.yaml",
# ledger=self.ledger,
# )
# def test_that_subjects_are_accessible(self):
# """Test that it is possible to get a list of subjects from a project analysis."""
# self.assertTrue(
# self.ledger.project_analyses[0].subjects
# == ["GW150914_095045", "GW151226_033853"]
# )
# self.assertTrue(
# self.ledger.project_analyses[0].events
# == ["GW150914_095045", "GW151226_033853"]
# )
# def test_that_all_waveform_analyses_are_returned(self):
# """Test that the query returns all of the jobs specifying a specific waveform."""
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# self.assertEqual(len(self.ledger.events), 3)
# p_analysis = self.ledger.project_analyses
# print(p_analysis[1]._analysis_spec)
# analyses = p_analysis[1].analyses
# self.assertEqual(len(analyses), 3)
# def test_that_all_uploaded_analyses_are_returned(self):
# """Test that the query returns all of the jobs specifying a specific waveform."""
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# p_analysis = self.ledger.project_analyses
# analyses = p_analysis[2].analyses
# self.assertEqual(len(analyses), 3)
# def test_that_all_reviewed_analyses_are_returned(self):
# """Test that the query returns all of the jobs with a specific review state."""
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# p_analysis = self.ledger.project_analyses
# analyses = p_analysis[3].analyses
# self.assertEqual(len(analyses), 2)
# def test_mutliple_filters(self):
# """Test that the query returns all of the jobs with a specific review state."""
# apply_page(
# file=f"{self.cwd}/tests/test_data/test_analyses.yaml",
# ledger=self.ledger,
# )
# p_analysis = self.ledger.project_analyses
# analyses = p_analysis[4].analyses
# self.assertEqual(len(analyses), 1)
# for analysis in analyses:
# self.assertTrue(str(analysis.pipeline).lower() == "lalinference")
# self.assertTrue(str(analysis.meta['waveform']['approximant']).lower() == "imrphenomxphm")

@@ -64,3 +64,3 @@ """

runner = CliRunner()
result = runner.invoke(manage.manage, ['build'])

@@ -131,10 +131,10 @@ for event in EVENTS:

f = io.StringIO()
with contextlib.redirect_stdout(f):
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger)
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger)
for event in EVENTS:
for pipeline in pipelines:
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger)
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger)
#f = io.StringIO()
#with contextlib.redirect_stdout(f):
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger)
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger)
for event in EVENTS:
for pipeline in pipelines:
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger)
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger)

@@ -141,0 +141,0 @@ def test_buildsubmit_all_events(self):

@@ -35,19 +35,34 @@ import os

apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger)
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/events/gwtc-2-1/GW150914_095045.yaml", event=None, ledger=self.ledger)
apply_page(file = f"{self.cwd}/tests/test_data/events_blueprint.yaml", ledger=self.ledger)
def tearDown(self):
del(self.ledger)
shutil.rmtree(f"{self.cwd}/tests/tmp/project")
def test_simple_dag(self):
"""Check that all jobs are run when there are no dependencies specified."""
apply_page(file = f"{self.cwd}/tests/test_data/test_simple_dag.yaml", event='GW150914_095045', ledger=self.ledger)
def test_dependency_list(self):
"""Check that all jobs are run when the dependencies are a chain."""
self.assertTrue(len(self.ledger.get_event('GW150914_095045')[0].productions)==0)
apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger)
event = self.ledger.get_event('GW150914_095045')[0]
self.assertEqual(len(event.get_all_latest()), 2)
self.assertTrue(len(event.productions[0]._needs) == 0)
self.assertTrue(len(event.productions[0].dependencies) == 0)
self.assertTrue(len(event.productions[1].dependencies) == 1)
def test_dependency_tree(self):
apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger)
event = self.ledger.get_event('GW150914_095045')[0]
self.assertTrue(len(event.graph.edges) == 1)
def test_linear_dag(self):
"""Check that all jobs are run when the dependencies are a chain."""
apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", event='GW150914_095045', ledger=self.ledger)
apply_page(file = f"{self.cwd}/tests/test_data/test_linear_dag.yaml", ledger=self.ledger)
event = self.ledger.get_event('GW150914_095045')[0]
self.assertEqual(len(event.get_all_latest()), 1)
def test_simple_dag(self):
"""Check that all jobs are run when there are no dependencies specified."""
apply_page(file = f"{self.cwd}/tests/test_data/test_simple_dag.yaml", ledger=self.ledger)
event = self.ledger.get_event('GW150914_095045')[0]
self.assertEqual(len(event.get_all_latest()), 2)

@@ -59,2 +74,12 @@ def test_complex_dag(self):

event = self.ledger.get_event('GW150914_095045')[0]
print([ev.name for ev in event.get_all_latest()])
self.assertEqual(len(event.get_all_latest()), 2)
def test_query_dag(self):
"""Check that all jobs are run when the dependencies are not a chain."""
apply_page(file = f"{self.cwd}/tests/test_data/test_query_dag.yaml", event='GW150914_095045', ledger=self.ledger)
event = self.ledger.get_event('GW150914_095045')[0]
self.assertEqual(len(event.get_all_latest()), 1)

@@ -6,3 +6,3 @@ ---

comment: PSD production
status: finished
status: uploaded
---

@@ -13,3 +13,3 @@ kind: analysis

comment: PSD production
status: wait
status: ready
needs:

@@ -22,3 +22,3 @@ - Prod0

comment: PSD production
status: wait
status: ready
needs:

@@ -31,3 +31,3 @@ - Prod1

comment: PSD production
status: wait
status: ready
needs:

@@ -40,3 +40,3 @@ - Prod0

comment: PSD production
status: wait
status: ready
needs:

@@ -43,0 +43,0 @@ - Prod2

@@ -5,2 +5,4 @@ kind: analysis

comment: Bayeswave on-source PSD estimation job
event: GW150914_095045
status: ready
---

@@ -14,1 +16,3 @@ kind: analysis

- Prod0
event: GW150914_095045
status: ready

@@ -5,7 +5,9 @@ kind: analysis

comment: Bayeswave on-source PSD estimation job
event: GW150914_095045
---
kind: analysis
name: Prod1
event: GW150914_095045
pipeline: bilby
approximant: IMRPhenomXPHM
comment: Bilby parameter estimation job
"""Tests for the LALInference interface."""
import unittest
from unittest.mock import Mock, patch
from unittest.mock import patch
import shutil

@@ -10,17 +9,2 @@ import os

import io
import contextlib
from click.testing import CliRunner
from asimov.utils import set_directory
from asimov import config
from asimov.cli import project, manage
from asimov.cli import configuration
from asimov.cli.application import apply_page
from asimov.ledger import YAMLLedger
from asimov.event import Event
from asimov.pipeline import PipelineException
from asimov.pipelines.bayeswave import BayesWave

@@ -44,2 +28,3 @@ from asimov.event import Event

@unittest.skip("Skipped until Bayeswave is added to the testing environment correctly.")
class BayeswaveTests(unittest.TestCase):

@@ -58,2 +43,6 @@ """Test bayeswave interface.

cls.cwd = os.getcwd()
repo = git.Repo.init(cls.cwd+"/tests/test_data/s000000xx/")
os.chdir(cls.cwd+"/tests/test_data/s000000xx/")
os.system("git add C01_offline/Prod1_test.ini C01_offline/s000000xx_gpsTime.txt")
os.system("git commit -m 'test'")

@@ -138,13 +127,13 @@ def setUp(self):

f = io.StringIO()
with contextlib.redirect_stdout(f):
production = self.ledger.get_event(event)[0].productions[0]
with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))):
production.make_config(f"{production.name}.ini")
# We need to make the workdir as this ought to be done by bayeswave_pipe
os.makedirs(os.path.join("working", event, production.name))
#with contextlib.redirect_stdout(f):
production = self.ledger.get_event(event)[0].productions[0]
with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))):
# We need to make the workdir as this ought to be done by bayeswave_pipe
production.make_config(f"{production.name}.ini")
os.makedirs(os.path.join("working", event, production.name))
mock_popen.returncode=0
mock_popen.return_value.communicate.return_value=(b"Blah blah blah To submit: just run this", b"Lots of stuff on stderr")
mock_popen.returncode=0
mock_popen.return_value.communicate.return_value=(b"Blah blah blah To submit: just run this", b"Lots of stuff on stderr")
production.pipeline.build_dag(dryrun=False)
production.pipeline.build_dag(dryrun=False)

@@ -156,5 +145,4 @@ with contextlib.redirect_stdout(f):

production.pipeline.submit_dag(dryrun=False)
self.ledger.update_event(production.event)
self.ledger.update_event(production.event)

@@ -192,21 +180,24 @@ self.assertEqual(production.job_id, 999)

@patch('subprocess.Popen.communicate')
@patch('subprocess.Popen')
def test_bad_dag_build(self, mock_popen, mock_pcomm):
"""Check that things behave as expected if the DAG file can't be build."""
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe.yaml", event=None, ledger=self.ledger)
apply_page(file = "https://git.ligo.org/asimov/data/-/raw/main/defaults/production-pe-priors.yaml", event=None, ledger=self.ledger)
event = "GW150914_095045"
pipeline = "bayeswave"
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{event}.yaml", event=None, ledger=self.ledger)
apply_page(file = f"https://git.ligo.org/asimov/data/-/raw/main/tests/{pipeline}.yaml", event=event, ledger=self.ledger)
@classmethod
def tearDownClass(cls):
"""Destroy all the products of this test."""
os.system(f"{cls.cwd}/tests/tmp/-rf")
os.system(f"{cls.cwd}/tests/test_data/s000000xx/.git -rf")
try:
shutil.rmtree("/tmp/S000000xx")
except:
pass
mock_popen.returncode=1 #"Could not build the DAG file"
mock_popen.return_value.communicate.return_value=(b"Could not be created", b"Lots of stuff on stderr")
def tearDown(self):
os.system(f"{self.cwd}/tests/tmp/-rf")
production = self.ledger.get_event(event)[0].productions[0]
with set_directory(os.path.join("checkouts", event, config.get("general", "calibration_directory"))):
production.make_config(f"{production.name}.ini")
with self.assertRaises(PipelineException):
production.pipeline.build_dag(dryrun=False)
def setUp(self):
"""Create a pipeline."""
self.event = Event.from_yaml(TEST_YAML.format(self.cwd))
self.pipeline = LALInference(self.event.productions[0])
out = self.pipeline.build_dag()
def test_dag(self):
"""Check that a DAG is actually produced."""
print(f"{self.cwd}/tests/tmp/s000000xx/C01_offline/Prod1/lalinference_1248617392-1248617397.dag")
self.assertEqual(os.path.exists(f"{self.cwd}/tests/tmp/s000000xx/C01_offline/Prod1/lalinference_1248617392-1248617397.dag"), 1)

@@ -54,3 +54,3 @@ """Tests for the RIFT interface."""

# @unittest.skip("Skipped temporarily while RIFT is updated")
@unittest.skip("Skipped temporarily while RIFT is updated")
def test_submit_cli(self):

@@ -72,4 +72,6 @@ """Check that a RIFT config file can be built."""

result = runner.invoke(manage.submit, "--dryrun")
print(result.output)
self.assertTrue("util_RIFT_pseudo_pipe.py" in result.output)
@unittest.skip("Skipped temporarily while RIFT is updated")
def test_build_api(self):

@@ -99,3 +101,3 @@ """Check that a RIFT config file can be built."""

@unittest.skip("Skipped temporarily while RIFT is updated")
def test_build_api_non_default_calibration(self):

@@ -102,0 +104,0 @@ """Check that a RIFT correctly picks up non C01 calibration."""

@@ -38,3 +38,3 @@ import unittest

productions:
- Prod0:
- name: Prod0
pipeline: lalinference

@@ -66,3 +66,3 @@ comment: PSD production

productions:
- Prod0:
- name: Prod0
pipeline: lalinference

@@ -100,5 +100,5 @@ comment: PSD production

def test_empty_register_creation(self):
"""Check that productions initialise an empty register if no review information attached."""
self.assertEqual(len(self.event_no_review.productions[0].review), 0)
# def test_empty_register_creation(self):
# """Check that productions initialise an empty register if no review information attached."""
# self.assertEqual(len(self.event_no_review.productions[0].review), 0)

@@ -131,2 +131,5 @@ def test_review_message_sort(self):

def setUp(self):
project_dir = f"{self.cwd}/tests/tmp/"
if os.path.exists(project_dir):
shutil.rmtree(project_dir)
os.makedirs(f"{self.cwd}/tests/tmp/project")

@@ -150,2 +153,16 @@ os.chdir(f"{self.cwd}/tests/tmp/project")

# def test_show_review_no_review(self):
# """Check that the CLI can show a review report with no reviews"""
# with patch("asimov.current_ledger", new=YAMLLedger(".asimov/ledger.yml")):
# reload(asimov)
# reload(review)
# runner = CliRunner()
# for event in EVENTS:
# result = runner.invoke(review.review,
# ['status', event])
# print(result.output)
# self.assertTrue(f"No review information exists for this production." in result.output)
def test_add_review_to_event_reject(self):

@@ -187,14 +204,2 @@ """Check that the CLI can add an event review"""

def test_show_review_no_review(self):
"""Check that the CLI can show a review report with no reviews"""
with patch("asimov.current_ledger", new=YAMLLedger(".asimov/ledger.yml")):
reload(asimov)
reload(review)
runner = CliRunner()
for event in EVENTS:
result = runner.invoke(review.review,
['status', event])
self.assertTrue(f"No review information exists for this production." in result.output)
def test_show_review_with_review(self):

@@ -201,0 +206,0 @@ """Check that the CLI can show a review report with no reviews"""

@@ -25,3 +25,3 @@ """Test that yaml event formats are parsed and written correctly."""

productions:
- Prod0:
- name: Prod0
pipeline: lalinference

@@ -41,3 +41,3 @@ comment: PSD production

productions:
- Prod0:
- name: Prod0
comment: PSD production

@@ -56,3 +56,3 @@ status: wait

productions:
- Prod0:
- name: Prod0
comment: PSD production

@@ -93,6 +93,6 @@ status: wait

def test_no_name_error(self):
"""Check an exception is raised if the event name is missing."""
with self.assertRaises(asimov.event.DescriptionException):
asimov.event.Event.from_yaml(BAD_YAML.format(self.cwd))
# def test_no_name_error(self):
# """Check an exception is raised if the event name is missing."""
# with self.assertRaises(asimov.event.DescriptionException):
# asimov.event.Event.from_yaml(BAD_YAML.format(self.cwd))

@@ -127,4 +127,4 @@ class ProductionTests(unittest.TestCase):

production = dict(status="wait", pipeline="lalinference")
with self.assertRaises(TypeError):
asimov.event.Production.from_dict(production, event=self.event)
with self.assertRaises(ValueError):
asimov.event.Production.from_dict(production, subject=self.event)

@@ -134,4 +134,4 @@ def test_missing_pipeline(self):

production = {"S000000x": dict(status="wait")}
with self.assertRaises(asimov.event.DescriptionException):
asimov.event.Production.from_dict(production, event=self.event)
with self.assertRaises(ValueError):
asimov.event.Production.from_dict(production, subject=self.event)

@@ -141,4 +141,4 @@ def test_missing_status(self):

production = {"S000000x": dict(pipeline="lalinference")}
with self.assertRaises(asimov.event.DescriptionException):
asimov.event.Production.from_dict(production, event=self.event)
with self.assertRaises(ValueError):
asimov.event.Production.from_dict(production, subject=self.event)

@@ -158,14 +158,14 @@ def test_production_prior_read(self):

productions:
- Prod0:
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 0.05]
status: wait
- Prod1:
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 1.0]
status: wait
- name: Prod0
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 0.05]
status: wait
- name: Prod1
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 0.8]
status: wait
"""

@@ -179,4 +179,4 @@ event = asimov.event.Event.from_yaml(

self.assertEqual(prod0.meta['priors']['q'][1], 0.05)
self.assertEqual(prod1.priors['q'][1], 1.00)
self.assertEqual(prod1.meta['priors']['q'][1], 1.00)
self.assertEqual(prod1.priors['q'][1], 0.8)
self.assertEqual(prod1.meta['priors']['q'][1], 0.8)

@@ -196,14 +196,14 @@ def test_production_prior_preserved(self):

productions:
- Prod0:
comment: PSD production
pipeline: lalinference
priors:
- name: Prod0
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 0.05]
status: wait
- Prod1:
comment: PSD production
pipeline: lalinference
priors:
status: wait
- name: Prod1
comment: PSD production
pipeline: lalinference
priors:
q: [0.0, 1.0]
status: wait
status: wait
"""

@@ -215,2 +215,3 @@ event = asimov.event.Event.from_yaml(

event_YAML = event.to_yaml()
event2 = asimov.event.Event.from_yaml(event_YAML, ledger=self.ledger)

@@ -217,0 +218,0 @@ prod0 = event.productions[0]

"""
Code for interacting with a gitlab instance.
"""
import datetime
import time
import configparser
import gitlab
import yaml
from liquid import Liquid
from asimov import config
from .event import Event
from .ledger import Ledger
STATE_PREFIX = "C01"
class GitlabLedger(Ledger):
    """
    Connect to a gitlab-based issue tracker.

    Events are stored as issues on a gitlab project; this ledger loads
    every issue carrying the configured event label and wraps each one
    as an ``EventIssue``.
    """

    def __init__(
        self, configs, milestone=None, subset=None, update=False, repo=True, label=None
    ):
        """
        Search through a repository's issues and find all of the ones
        for events.

        Parameters
        ----------
        configs :
            Configuration data (not read by this initialiser).
        milestone : str, optional
            Milestone filter (not read by this initialiser).
        subset : list, optional
            If provided, only issues whose titles appear in this list
            are loaded.
        update : bool, optional
            If True the event repository is updated as each issue is
            loaded, and a rest period is observed between issues to
            avoid overloading the git server.
        repo : bool, optional
            Passed through to ``EventIssue``.
        label : str, optional
            The issue label which marks an event; defaults to the
            ``gitlab.event_label`` configuration value.
        """
        _, self.repository = self._connect_gitlab()
        # The CLI may hand us [None] when no subset was specified.
        if subset == [None]:
            subset = None
        if not label:
            event_label = config.get("gitlab", "event_label")
        else:
            event_label = label
        try:
            sleep_time = int(config.get("gitlab", "rest_time"))
        except configparser.NoOptionError:
            # Default rest period (seconds) between issue updates.
            sleep_time = 30
        issues = self.repository.issues.list(labels=[event_label], per_page=1000)
        output = []
        if subset:
            for issue in issues:
                if issue.title in subset:
                    output += [EventIssue(issue, self.repository, update, repo=repo)]
                    if update:
                        time.sleep(sleep_time)
        else:
            for issue in issues:
                output += [EventIssue(issue, self.repository, update, repo=repo)]
                if update:
                    time.sleep(sleep_time)
        # NOTE(review): assumes ``self.data`` is initialised by the
        # Ledger base class before this assignment — confirm.
        self.data["events"] = output
        self.events = {ev["name"]: ev for ev in self.data["events"]}

    def _connect_gitlab(self):
        """
        Connect to the gitlab server.

        Returns
        -------
        server : `Gitlab`
            The gitlab server.
        repository : `Gitlab.project`
            The gitlab project.
        """
        server = gitlab.gitlab.Gitlab(
            config.get("gitlab", "server"), private_token=config.get("gitlab", "token")
        )
        repository = server.projects.get(config.get("gitlab", "tracking_repository"))
        return server, repository

    def get_event(self, event=None):
        """Return the named event, or all events when no name is given."""
        if event:
            return self.events[event]
        else:
            return self.events.values()

    def add_event(self, event_object, issue_template=None):
        """
        Create an issue for an event.

        Parameters
        ----------
        event_object : `Event`
            The event to create an issue for.
        issue_template : str, optional
            Path to a Liquid template for the issue body; defaults to
            the ``gitlabissue.md`` template bundled with asimov.

        Notes
        -----
        This was previously declared a ``classmethod`` while reading
        ``self.repository`` (an instance attribute set in ``__init__``),
        which could never work; it is an instance method.
        """
        if not issue_template:
            from pkg_resources import resource_filename

            issue_template = resource_filename("asimov", "gitlabissue.md")
        with open(issue_template, "r") as template_file:
            liq = Liquid(template_file.read())
            rendered = liq.render(
                event_object=event_object, yaml=event_object.to_yaml()
            )
        self.repository.issues.create(
            {"title": event_object.name, "description": rendered}
        )

    def update_event(self, event):
        """Push an event's data back to its tracker issue."""
        event.update_data()

    def save(self):
        """No-op: the gitlab tracker is updated issue-by-issue."""
        pass

    @classmethod
    def create(cls):
        """Gitlab ledgers cannot be created programmatically."""
        raise NotImplementedError(
            "You must create a gitlab issue tracker manually first."
        )
class EventIssue(Event):
    """
    Use an issue on the gitlab issue tracker to
    hold variable data for the program.

    Parameters
    ----------
    issue : `gitlab.issue`
        The issue which represents the event.
    update : bool
        Flag to determine if the git repository is updated
        when it is loaded. Defaults to False to prevent
        excessive load on the git server.
    """

    def __init__(self, issue, repository, update=False, repo=True):
        # NOTE(review): this passes ``self`` explicitly to a *bound*
        # method whose signature is (self, issue, update, repo) — one
        # positional argument too many, which raises TypeError.
        # Presumably ``self.from_issue(issue, update, repo)`` was
        # intended; confirm before relying on this class.
        self.from_issue(self, issue, update, repo)
        self.issue_object = issue
        self.title = issue.title
        self.text = issue.description
        self.issue_id = issue.id
        self.labels = issue.labels
        # NOTE(review): the ``repository`` parameter is never stored,
        # yet ``_refresh`` reads ``self.repository`` — verify where
        # that attribute is meant to be assigned.
        self.data = self.parse_notes()

    def from_issue(self, issue, update=False, repo=True):
        """
        Parse an issue description to generate this event.

        Parameters
        ----------
        issue :
            Object whose ``text`` attribute holds the issue description
            containing YAML between ``---`` fences.
        update : bool
            Flag to determine if the repository is updated when loaded.
            Defaults to False.

        Returns
        -------
        event : `Event`
            The event built from the YAML between the first pair of
            ``---`` fences.
        """
        # The event data is the YAML document between ``---`` fences.
        text = issue.text.split("---")
        event = self.from_yaml(text[1], issue, update=update, repo=repo)
        event.text = text
        # event.from_notes()
        return event

    def _refresh(self):
        # Re-fetch the issue so that label/description edits made on
        # the server are not clobbered by a stale local copy.
        if self.repository:
            self.issue_object = self.repository.issues.get(self.issue_object.iid)
            self.text = self.issue_object.description.split("---")
        else:
            pass

    @property
    def state(self):
        """
        Get the state of the event's runs.

        Returns the suffix of the first scoped label of the form
        ``<STATE_PREFIX>::<state>``, or None when no such label exists.
        """
        for label in self.issue_object.labels:
            if f"{STATE_PREFIX}::" in label:
                # len(STATE_PREFIX) + 2 skips the prefix and the "::".
                return label[len(STATE_PREFIX) + 2:]
        return None

    @state.setter
    def state(self, state):
        """
        Set the event state.
        """
        self._refresh()
        for label in self.issue_object.labels:
            if f"{STATE_PREFIX}::" in label:
                # Need to remove all of the other scoped labels first.
                self.issue_object.labels.remove(label)
        self.issue_object.labels += ["{}::{}".format(STATE_PREFIX, state)]
        self.issue_object.save()

    def add_note(self, text):
        """
        Add a comment to the event issue.

        A footer will be added to identify this as being created by the
        supervisor and not the user.
        """
        self._refresh()
        now = datetime.datetime.now()
        header = """"""
        footer = f"""\nAdded at {now:%H}:{now:%M}, {now:%Y}-{now:%m}-{now:%d} by the run supervision robot :robot:."""
        self.issue_object.notes.create({"body": header + text + footer})
        self.issue_object.save()

    def add_label(self, label, state=True):
        """
        Add a new label to an event issue.

        Parameters
        ----------
        label : str
            The name of the label.
        state : bool
            When True the label is prefixed with ``<STATE_PREFIX>:``.
        """
        self._refresh()
        if state:
            # NOTE(review): single colon here, but the ``state``
            # property matches on ``::`` — labels added this way will
            # not be recognised by ``state``. Confirm which is intended.
            self.issue_object.labels += [f"{STATE_PREFIX}:{label}"]
        else:
            self.issue_object.labels += [f"{label}"]
        self.issue_object.save()

    def update_data(self):
        """
        Store event data in the comments on the event repository.
        """
        self._refresh()
        # NOTE(review): reads ``self.event_object``, which is never
        # assigned in this class — confirm where it comes from.
        self.issue_object.description = self.event_object.to_issue()
        self.issue_object.save()

    def parse_notes(self):
        """
        Read issue information from the comments on the issue.

        Only notes whose body contains a ``---`` YAML fence are parsed;
        the text between the first pair of fences is loaded as YAML.
        Notes are processed in reverse of the order returned by the
        API.

        Returns
        -------
        list
            The parsed YAML payload of each matching note.
        """
        data = {}
        notes = self.issue_object.notes.list(per_page=200)
        note_data = []
        for note in reversed(notes):
            if "---\n" in note.body:
                data = note.body.split("---")
                if len(data) > 0:
                    data = data[1]
                else:
                    continue
                data = yaml.safe_load(data)
                note_data.append(data)
        return note_data
---------------------
The production ledger
---------------------
In order to construct and monitor ongoing PE jobs, asimov stores metadata for each gravitational wave event and for each PE job (or "production" in `asimov` terminology).
These metadata are stored in the event's *production ledger*, which is specified in `yaml` format.
A number of fields are required for `asimov` to correctly process an event, while several optional fields allow it to perform more operations automatically or override defaults.
Normally you shouldn't need to edit the ledger file directly, but if you're setting up new analyses or want to check the configuration of a pre-existing analysis then it can be helpful to have an understanding of the quantities contained in it.
"""Cross-check running productions in the ledger against the condor queue."""
from asimov import config
from asimov.cli import connect_gitlab, ACTIVE_STATES, known_pipelines
from asimov import gitlab
from asimov.condor import CondorJobList, CondorJob
import yaml

# Events were previously loaded from the gitlab tracker:
# server, repository = connect_gitlab()
# events = gitlab.find_events(repository,
#     #milestone=config.get("olivaw", "milestone"),
#     subset=["GW200115A"],
#     update=False,
#     repo=True)

# Load the production ledger directly from its YAML file instead.
ledger = "/home/pe.o3/public_html/LVC/o3a-final/ledger.yaml"
with open(ledger, "r") as filehandle:
    events = yaml.safe_load(filehandle.read())

# Snapshot of the jobs currently known to htcondor.
jobs = CondorJobList()
# for job in jobs.jobs.values():
#     print(job)

# Report every production marked "running" in the ledger which has no
# matching job in the condor queue (or no recorded job id at all).
for event in events:
    # Productions are stored as a list of single-entry mappings; merge
    # them into one name -> metadata dictionary.
    productions = {}
    for prod in event['productions']:
        productions.update(prod)
    for name, production in productions.items():
        if production['status'] == "running":
            print(event['name'], name)
            if "job id" not in production:
                print("Job ID is missing")
                continue
            if production['job id'] in jobs.jobs:
                print(jobs.jobs[production['job id']])
            else:
                print("\t Job is not running")

# on_deck = [production
#            for production in event.productions
#            if production.status.lower() in ACTIVE_STATES]
# for production in on_deck:
#     print(production.name)
#     print(production.meta['job id'] in jobs.jobs.keys())