diff --git a/setup.py b/setup.py index 91ea4695f..9e908294d 100644 --- a/setup.py +++ b/setup.py @@ -80,6 +80,7 @@ "pandda_xchem = dlstbx.wrapper.pandda_xchem:PanDDAWrapper", "pipedream_xchem = dlstbx.wrapper.pipedream_xchem:PipedreamWrapper", "phaser_ellg = dlstbx.wrapper.phaser_ellg:PhasereLLGWrapper", + "ligandrestraints = dlstbx.wrapper.ligandrestraints:LigandRestraintsWrapper", "rlv = dlstbx.wrapper.rlv:RLVWrapper", "scaleit = dlstbx.wrapper.scaleit:ScaleitWrapper", "screen19 = dlstbx.wrapper.screen19:Screen19Wrapper", diff --git a/src/dlstbx/crud.py b/src/dlstbx/crud.py index 84340d81f..02c53a9d7 100644 --- a/src/dlstbx/crud.py +++ b/src/dlstbx/crud.py @@ -19,6 +19,24 @@ def get_data_collection( return query.first() +def get_latest_dcid_for_dtag( + dtag: str, + session: sqlalchemy.orm.session.Session, +) -> int | None: + """Return the most recent dataCollectionId for the BLSample named `dtag`.""" + query = ( + session.query(models.DataCollection.dataCollectionId) + .join( + models.BLSample, + models.BLSample.blSampleId == models.DataCollection.BLSAMPLEID, + ) + .filter(models.BLSample.name == dtag) + .order_by(models.DataCollection.dataCollectionId.desc()) + ) + result = query.first() + return result.dataCollectionId if result else None + + def get_gridinfo_for_dcid( dcid: int, dcgid: int, diff --git a/src/dlstbx/services/ispybsvc.py b/src/dlstbx/services/ispybsvc.py index 630ce602e..6373f8968 100644 --- a/src/dlstbx/services/ispybsvc.py +++ b/src/dlstbx/services/ispybsvc.py @@ -12,7 +12,14 @@ import pydantic import sqlalchemy.orm import workflows.recipe -from ispyb.sqlalchemy import PDB, ProteinHasPDB +from ispyb.sqlalchemy import ( + PDB, + BLSession, + Person, + Proposal, + ProteinHasPDB, + SessionHasPerson, +) from workflows.services.common_service import CommonService import dlstbx.services.ispybsvc_buffer as buffer @@ -1016,6 +1023,70 @@ def do_retrieve_proposal_title(self, parameters, **kwargs): ) return False + def do_notify_visit_team_leader( + self, + parameters, + session: sqlalchemy.orm.Session, + rw, + transaction, + **kwargs, + ): + """Email the visit's Team Leader that XChemCollate has finished. + + Looks up the Team Leader for the given visit in ISPyB and sends a simple + notification to the mailnotification queue (consumed by the DLS Mail + Notifications / zocalo Mailer service).""" + + visit = parameters("visit") + results_directory = parameters("results_directory") + + # recipients = self._lookup_visit_team_leader_emails(session, visit) + # if not recipients: + # self.log.warning( + # "No Team Leader found for visit %s; skipping collate-finished email", + # visit, + # ) + # return {"success": True} + recipients = ["qvu59474@diamond.ac.uk"] + + message = { + "parameters": { + "recipients": recipients, + "subject": f"Auto pipeline finished for {visit}", + }, + "content": [ + f"Beamline visit processing has now finished for {visit}.\n", + f"Results are available at {results_directory}", + ], + } + self.log.info("Sending collate-finished email to %r", recipients) + rw.transport.send("mailnotification", message, transaction=transaction) + return {"success": True} + + def _lookup_visit_team_leader_emails(self, session, visit): + """Return ``@diamond.ac.uk`` for the Team Leader(s) of a visit.""" + try: + proposal, visit_number = visit.split("-") + proposal_code, proposal_number = proposal[:2], proposal[2:] + except (AttributeError, ValueError): + self.log.error("Could not parse visit %r for team leader lookup", visit) + return [] + + logins = ( + session.query(Person.login) + .join(SessionHasPerson, SessionHasPerson.personId == Person.personId) + .join(BLSession, BLSession.sessionId == SessionHasPerson.sessionId) + .join(Proposal, Proposal.proposalId == BLSession.proposalId) + .filter( + Proposal.proposalCode == proposal_code, + Proposal.proposalNumber == proposal_number, + BLSession.visit_number == visit_number, + SessionHasPerson.role == "Team Leader", + ) + .all() + ) + return [f"{login}@diamond.ac.uk" for (login,) in logins if login] + def do_insert_phasing_analysis_results(self, parameters, **kwargs): """Write phasing results to ISPyB""" diff --git a/src/dlstbx/services/trigger_xchem.py b/src/dlstbx/services/trigger_xchem.py index ac634b8f8..0b93f246f 100644 --- a/src/dlstbx/services/trigger_xchem.py +++ b/src/dlstbx/services/trigger_xchem.py @@ -17,7 +17,6 @@ import sqlalchemy.engine import sqlalchemy.orm import workflows.recipe -import yaml from ispyb.sqlalchemy import ( AutoProc, AutoProcProgram, @@ -25,21 +24,22 @@ AutoProcScaling, AutoProcScalingStatistics, BLSample, - BLSession, Container, Crystal, DataCollection, ProcessingJob, ProcessingJobParameter, Proposal, + Protein, ) from sqlalchemy import or_ from workflows.services.common_service import CommonService import dlstbx.ispybtbx -from dlstbx.crud import get_protein_for_dcid +from dlstbx.crud import get_latest_dcid_for_dtag, get_protein_for_dcid from dlstbx.util import ChainMapWithReplacement from dlstbx.util.prometheus_metrics import BasePrometheusMetrics, NoMetrics +from dlstbx.util.soakdb import find_xchem_visit_dir class PrometheusMetrics(BasePrometheusMetrics): @@ -51,7 +51,7 @@ def create_metrics(self): ) -class PanDDAParameters(pydantic.BaseModel): +class ModelBuildingParameters(pydantic.BaseModel): dcid: int = pydantic.Field(gt=0) comparator_threshold: int = pydantic.Field(default=350) automatic: Optional[bool] = False @@ -62,10 +62,24 @@ class PanDDAParameters(pydantic.BaseModel): backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try") backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier") pipedream: Optional[bool] = True - reprocessing: Optional[bool] = False + overwrite: Optional[bool] = False + bulk_array: Optional[bool] = None -class XChemCollate_Parameters(pydantic.BaseModel): +class HitIndentificationParameters(pydantic.BaseModel): + dcid: int = pydantic.Field(gt=0) + xchem_visit_dir: str + comparator_threshold: int = pydantic.Field(default=300) + automatic: Optional[bool] = False + comment: Optional[str] = None + scaling_id: list[int] + timeout: float = pydantic.Field(default=180, alias="timeout-minutes") + pipedream: Optional[bool] = True + overwrite: Optional[bool] = False + bulk_array: Optional[bool] = False + + +class CollateParameters(pydantic.BaseModel): dcid: int = pydantic.Field(gt=0) program_id: int = pydantic.Field(gt=0) automatic: Optional[bool] = False @@ -77,7 +91,7 @@ class XChemCollate_Parameters(pydantic.BaseModel): backoff_max_try: int = pydantic.Field(default=10, alias="backoff-max-try") backoff_multiplier: float = pydantic.Field(default=2, alias="backoff-multiplier") pipedream: Optional[bool] = False - reprocessing: Optional[bool] = False + overwrite: Optional[bool] = False class DLSTriggerXChem(CommonService): @@ -198,63 +212,57 @@ def upsert_proc(self, rw, dcid, procname, recipe_parameters): self.log.info(f"{procname}_id trigger: Processing job {jobid} triggered") @pydantic.validate_call(config={"arbitrary_types_allowed": True}) - def trigger_pandda_xchem( + def trigger_modelbuilding( self, rw: workflows.recipe.RecipeWrapper, *, message: Dict, - parameters: PanDDAParameters, + parameters: ModelBuildingParameters, session: sqlalchemy.orm.session.Session, transaction: int, **kwargs, ): - """Trigger a PanDDA job for an XChem fragment screening experiment. - Trigger uses the 'final.pdb' and 'final.mtz' files which are output from the - upstream DIMPLE job - Recipe parameters are described below with appropriate ispyb placeholder "{}" - values: - - target: set this to "pandda_xchem" - - dcid: the dataCollectionId for the given data collection i.e. "{ispyb_dcid}" - - pdb: the output pdb from dimple i.e. "{ispyb_results_directory}/dimple/final.pdb" - - mtz: the output mtz from dimple i.e. "{ispyb_results_directory}/dimple/final.mtz" - - comparator_threshold: the minimum number of comparator datasets needed to begin PanDDA - - comment: a comment to be stored in the ProcessingJob.comment field - - timeout-minutes: (optional) the max time (in minutes) allowed to wait for - processing PanDDA jobs - - automatic: boolean value passed to ProcessingJob.automatic field - Example recipe parameters: - { "target": "pandda_xchem", - "dcid": 123456, - "comparator_threshold": 300, - "scaling_id": [123456], - "automatic": true, - "comment": "PanDDA2 triggered by dimple", - "timeout-minutes": 60, - } + """Select a dimple model to take forward and stage a dataset for the + downstream hit-identification pipelines. + + Waits (with exponential backoff) for related upstream pipelines and + dimple jobs to finish, then selects the 'best' dataset by + I/sigI * completeness * #unique-reflections, preferring those cases + processed in the user-defined spacegroup and the most recent processing + batch. Reads the soakDB for ligand info, skipping DMSO solvent screens + and crystals with no CompoundSMILES. + + Copies the chosen dimple files into the shared model_building directory, + writes the ligand .smiles file, and fires a single ligand-restraints job + (grade2 default) per dcid. On success that recipe sends control to + trigger_hitidentification. """ dcid = parameters.dcid scaling_id = parameters.scaling_id[0] comparator_threshold = parameters.comparator_threshold pipedream = parameters.pipedream - reprocessing = parameters.reprocessing + overwrite = parameters.overwrite + bulk_array = parameters.bulk_array protein_info = get_protein_for_dcid(parameters.dcid, session) # protein_id = getattr(protein_info, "proteinId") proposal_id = getattr(protein_info, "proposalId") acronym = getattr(protein_info, "acronym") + # TEMPORARY PROPOSAL FILTER + ALLOWED_PROPOSALS = ["lb42888", "sw44043", "sw44107", "lb36049"] + PROPOSAL_ALIASES = {"mx41448": "lb42888"} + query = (session.query(Proposal)).filter(Proposal.proposalId == proposal_id) proposal = query.first() proposal_code = proposal.proposalCode proposal_number = proposal.proposalNumber - proposal_string = proposal_code + proposal_number - - # TEMPORARY PROPOSAL FILTER - allowed_proposals = ["lb42888", "sw44043", "sw44107", "lb36049"] + data_proposal = proposal_code + proposal_number + proposal_string = PROPOSAL_ALIASES.get(data_proposal, data_proposal) # 0. Check that this is an XChem expt & locate .SQLite database - if proposal_string not in allowed_proposals: + if proposal_string not in ALLOWED_PROPOSALS: self.log.debug( f"Not triggering PanDDA2 pipeline for dcid={dcid} proposal {proposal_string}" ) @@ -273,6 +281,14 @@ def trigger_pandda_xchem( location = int(query.one()[1]) container_code = query.one()[2] + # Check for crystal recollections + latest_dcid = get_latest_dcid_for_dtag(dtag, session) + if latest_dcid and latest_dcid != dcid: + self.log.info( + f"Exiting PanDDA2/Pipedream trigger: dcid {dcid} is not the latest for dtag {dtag}; Recollection underway?" + ) + return {"success": True} + # Get the user defined spacegroup query = ( session.query(Crystal.spaceGroup) @@ -285,106 +301,20 @@ def trigger_pandda_xchem( # Find corresponding XChem visit directory and database xchem_dir = pathlib.Path(f"/dls/labxchem/data/{proposal_string}") - yaml_files = [] # user settings - match_dirs = [] # labxchem visit - - for subdir in xchem_dir.iterdir(): - user_yaml = subdir / ".user.yaml" - if user_yaml.exists(): - yaml_files.append(user_yaml) - - for yaml_file in yaml_files: - with open(yaml_file, "r") as file: - expt_yaml = yaml.load(file, Loader=yaml.SafeLoader) - - acr = expt_yaml["data"]["acronym"] - directory = yaml_file.parents[0] - if acr == acronym: - match_dirs.append(directory) - # match_yaml = expt_yaml - self.log.info(f"Found user yaml for dtag {dtag} at {yaml_file}") - - # account for potentially multiple labxchem visits for a single target in proposal - if len(match_dirs) == 1: - match_dir = match_dirs[0] - elif len(match_dirs) > 1: - for path in match_dirs: - try: - db_path = str( - path / "processing/database" / "soakDBDataFile.sqlite" - ) - conn = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=10 - ) - df = pd.read_sql_query( - f"SELECT * from mainTable WHERE Puck = '{container_code}' AND PuckPosition = {location} AND CrystalName = '{dtag}'", - conn, - ) - conn.close() - - if not df.empty: - match_dir = path - self.log.info(f"labxchem visit {path} found for dtag {dtag}") - break - - except Exception as e: - self.log.info( - f"Exception whilst reading ligand information from {db_path} for dtag {dtag}, dcid {dcid}: {e}" - ) - - if "match_dir" not in locals(): - self.log.info( - f"No user yaml found in {xchem_dir}, proceeding with default settings..." - ) - # Try reading from SoakDB .sqlite - for subdir in xchem_dir.iterdir(): - if (subdir / ".user.yaml").exists(): - continue - try: - db_path = str( - subdir / "processing/database" / "soakDBDataFile.sqlite" - ) - con = sqlite3.connect( - f"file:{db_path}?mode=ro", uri=True, timeout=10 - ) - cur = con.cursor() - cur.execute("SELECT Protein FROM soakDB") - name = cur.fetchone()[0] - con.close() - - if name is not None: - # visit = dir.parts[-1] - expt_yaml = {} - expt_yaml["data"] = {"acronym": name} - with open(subdir / ".user.yaml", "w") as f: - yaml.dump(expt_yaml, f) - - if name == acronym: - match_dir = subdir - # match_yaml = expt_yaml - - except Exception as e: - self.log.info(f"Problem reading .sqlite database for {subdir}: {e}") - - if "match_dir" not in locals(): + xchem_visit_dir = find_xchem_visit_dir( + xchem_dir, acronym, container_code, location, dtag, self.log + ) + + if xchem_visit_dir is None: self.log.debug( f"Exiting PanDDA2/Pipedream trigger: No labxchem directory found for {acronym}." ) return {"success": True} - else: - xchem_visit_dir = match_dir - processing_dir = xchem_visit_dir / "processing" - # user_settings = match_yaml["autoprocessing"] - if xchem_visit_dir: - self.log.debug( - f"Found a corresponding .sqlite database in XChem visit {xchem_visit_dir} for target {acronym}." - ) - else: - self.log.debug( - f"Exiting PanDDA2/Pipedream trigger: can't find .sqlite database in XChem visit {xchem_dir} for target {acronym}." - ) - return {"success": True} + processing_dir = xchem_visit_dir / "processing" + self.log.debug( + f"Found a corresponding .sqlite database in XChem visit {xchem_visit_dir} for target {acronym}." + ) # 1. Trigger when all upstream pipelines & related dimple jobs have finished program_list = [ @@ -690,24 +620,25 @@ def trigger_pandda_xchem( ) return {"success": True} - # 3. Create dataset directory structure + # 3. Create dataset directory structure (single shared model_building dir) auto_dir = processing_dir / "auto" analysis_dir = auto_dir / "analysis" - pandda_dir = analysis_dir / "pandda2" - model_dir = pandda_dir / "model_building" + model_dir = analysis_dir / "model_building" dataset_dir = model_dir / dtag compound_dir = dataset_dir / "compound" self.log.info(f"Creating directory {dataset_dir}") - if not reprocessing: + if not overwrite: try: compound_dir.mkdir(parents=True, exist_ok=False) except FileExistsError: self.log.info( - f"Exiting PanDDA2/Pipedream trigger: {dataset_dir} already exists" + f"Exiting model_building trigger: {dataset_dir} already exists" ) return {"success": True} + else: + compound_dir.mkdir(parents=True, exist_ok=True) # Copy the dimple files of the selected dataset shutil.copy(pdb, str(dataset_dir / "dimple.pdb")) @@ -717,76 +648,149 @@ def trigger_pandda_xchem( with open(compound_dir / f"{CompoundCode}.smiles", "w") as smi_file: smi_file.write(CompoundSMILES) - # Create seperate pipedream directory - if pipedream: - pipedream_dir = analysis_dir / "pipedream" - model_dir_pd = pipedream_dir / "model_building" - dataset_dir_pd = model_dir_pd / dtag - compound_dir_pd = dataset_dir_pd / "compound" - self.log.info(f"Creating directory {dataset_dir_pd}") - pathlib.Path(compound_dir_pd).mkdir(parents=True, exist_ok=True) - shutil.copy(pdb, str(dataset_dir_pd / "dimple.pdb")) - shutil.copy(mtz, str(dataset_dir_pd / "dimple.mtz")) - shutil.copy(upstream_mtz, str(dataset_dir_pd / f"{dtag}.free.mtz")) - - with open(compound_dir_pd / f"{CompoundCode}.smiles", "w") as smi_file: - smi_file.write(CompoundSMILES) - - # 4. Job launch logic + # 4. Fire a single ligand-restraints job; it will trigger hitidentification on success recipe_parameters = { "dcid": dcid, "xchem_visit_dir": str(xchem_visit_dir), "processing_directory": str(processing_dir), "model_directory": str(model_dir), "dtag": dtag, - "n_datasets": 1, "scaling_id": scaling_id, "comparator_threshold": comparator_threshold, "database_path": str(db_master), "upstream_mtz": pathlib.Path(upstream_mtz).parts[-1], - "smiles": str(CompoundSMILES), "pipedream": pipedream, - "reprocessing": reprocessing, + "overwrite": overwrite, } - dataset_list = sorted([p.parts[-1] for p in model_dir.iterdir() if p.is_dir()]) - dataset_count = sum(1 for p in model_dir.iterdir() if p.is_dir()) - self.log.info(f"Dataset count is: {dataset_count}") + self.log.info(f"Launching ligand-restraints for dtag {dtag} (dcid {dcid})") + self.upsert_proc(rw, dcid, "Grade2", recipe_parameters) + return {"success": True} - if dataset_count < comparator_threshold: - self.log.info( - f"{dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." - ) + @pydantic.validate_call(config={"arbitrary_types_allowed": True}) + def trigger_hitidentification( + self, + rw: workflows.recipe.RecipeWrapper, + *, + message: Dict, + parameters: HitIndentificationParameters, + session: sqlalchemy.orm.session.Session, + transaction: int, + **kwargs, + ): + """Launch PanDDA2 / Pipedream once restraints for this dcid are ready. - if pipedream: - self.log.info(f"Launching Pipedream for dtag {dtag}") - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + Records the current dcid and its dtag in model_dir/.batch_dcids.json as a + {dcid: dtag} map, so dtags can be read back at threshold without + re-querying ispyb. Pipedream fires for the current dcid on every call. + PanDDA2 is gated by the count of recorded dcids vs. comparator_threshold: + below threshold → skip; at threshold → fire one per-dcid PanDDA2 job for + each recorded dcid; above threshold → single PanDDA2 for the current dtag. + + bulk_array=True: iterate model_dir directly, write the dataset list to + .bulk_array.json, and fire one array job over dtags in model_building. + """ + dcid = parameters.dcid + scaling_id = parameters.scaling_id[0] + comparator_threshold = parameters.comparator_threshold + pipedream = parameters.pipedream + overwrite = parameters.overwrite + bulk_array = parameters.bulk_array + + # Re-derive paths from labxchem visit parameter + xchem_visit_dir = pathlib.Path(parameters.xchem_visit_dir) + processing_dir = xchem_visit_dir / "processing" + model_dir = processing_dir / "auto" / "analysis" / "model_building" + db_master = processing_dir / "database" / "soakDBDataFile.sqlite" + + # Resolve dtag for the current dcid + query = ( + session.query(BLSample.name) + .join(DataCollection, BLSample.blSampleId == DataCollection.BLSAMPLEID) + .filter(DataCollection.dataCollectionId == dcid) + ) + row = query.first() + if not row: + self.log.info(f"Exiting hitidentification trigger: no BLSample for {dcid}") return {"success": True} + dtag = row[0] - elif dataset_count == comparator_threshold: - recipe_parameters["n_datasets"] = len(dataset_list) + recipe_parameters = { + "dcid": dcid, + "xchem_visit_dir": str(xchem_visit_dir), + "processing_directory": str(processing_dir), + "model_directory": str(model_dir), + "dtag": dtag, + "n_datasets": 1, + "scaling_id": scaling_id, + "comparator_threshold": comparator_threshold, + "database_path": str(db_master), + "pipedream": pipedream, + "overwrite": overwrite, + } - with open(model_dir / ".batch.json", "w") as f: + if bulk_array: + dataset_list = sorted( + [p.parts[-1] for p in model_dir.iterdir() if p.is_dir()] + ) + dataset_count = len(dataset_list) + recipe_parameters["n_datasets"] = dataset_count + with open(model_dir / ".bulk_array.json", "w") as f: json.dump(dataset_list, f) - # cannot pass as ispyb_parameter - self.log.info( - f"{dataset_count} = comparator dataset threshold of {comparator_threshold}, launching PanDDA2 array job" + f"bulk_array=True, launching PanDDA2 array job over {dataset_count} datasets" ) - self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) - + self.upsert_proc(rw, dcid, "PanDDA2-array", recipe_parameters) if pipedream: self.log.info(f"Launching Pipedream for dtag {dtag}") - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + self.upsert_proc(rw, dcid, "Pipedream-array", recipe_parameters) + return {"success": True} - elif dataset_count > comparator_threshold: - self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") - self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) + # Record this dcid and its dtag in the hidden gating json, so the dtag + # can be read back at threshold without re-querying ispyb. JSON keys are + # strings, so dcids round-trip as str. + dcids_file = model_dir / ".batch_dcids.json" + if dcids_file.exists(): + with open(dcids_file, "r") as f: + recorded_dcids = json.load(f) + else: + recorded_dcids = {} + if str(dcid) not in recorded_dcids: + recorded_dcids[str(dcid)] = dtag + with open(dcids_file, "w") as f: + json.dump(recorded_dcids, f) - if pipedream: - self.log.info(f"Launching Pipedream for dtag {dtag}") - self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + dataset_count = len(recorded_dcids) + self.log.info(f"Recorded waiting dcid count is: {dataset_count}") + if pipedream: + self.log.info(f"Launching Pipedream for dtag {dtag}") + self.upsert_proc(rw, dcid, "Pipedream", recipe_parameters) + + if dataset_count < comparator_threshold: + self.log.info( + f"{dataset_count} < comparator dataset threshold of {comparator_threshold}, skipping PanDDA2 for now..." + ) + return {"success": True} + + if dataset_count == comparator_threshold: + self.log.info( + f"{dataset_count} = comparator dataset threshold of {comparator_threshold}, launching per-dcid PanDDA2 jobs" + ) + for batch_dcid, batch_dtag in recorded_dcids.items(): + batch_dcid = int(batch_dcid) + batch_params = { + **recipe_parameters, + "dcid": batch_dcid, + "dtag": batch_dtag, + "n_datasets": 1, + } + self.upsert_proc(rw, batch_dcid, "PanDDA2", batch_params) + return {"success": True} + + # dataset_count > comparator_threshold + self.log.info(f"Launching single PanDDA2 job for dtag {dtag}") + self.upsert_proc(rw, dcid, "PanDDA2", recipe_parameters) return {"success": True} @pydantic.validate_call(config={"arbitrary_types_allowed": True}) @@ -795,23 +799,34 @@ def trigger_xchem_collate( rw: workflows.recipe.RecipeWrapper, *, message: Dict, - parameters: XChemCollate_Parameters, + parameters: CollateParameters, session: sqlalchemy.orm.session.Session, transaction: int, **kwargs, ): - """Trigger an XChem Collate job for an XChem fragment screening experiment. - Recipe parameters are described below with appropriate ispyb placeholder "{}" - values: + """Trigger an XChem Collate job once a target's collection run is complete. + + Gathers every dcid for this target (matched by Protein acronym) under the + proposal of the triggering dcid's visit, then gates on those jobs: + aborts if any PanDDA2/Pipedream/xia2 program newer than program_id has + started (a fresh batch is underway), and checkpoints with exponential + backoff while any of them are still running. Once processing has settled + and no other XChemCollate job is already in flight for the target, fires + a single XChemCollate job keyed on the highest dcid. + + Recipe parameters (ispyb placeholders shown as "{}"): - target: set this to "xchem_collate" - - dcid: the dataCollectionId for the given data collection i.e. "{ispyb_dcid}" - - comment: a comment to be stored in the ProcessingJob.comment field - - timeout-minutes: (optional) the max time (in minutes) allowed to wait for - processing PanDDA jobs - - automatic: boolean value passed to ProcessingJob.automatic field + - dcid: the dataCollectionId i.e. "{ispyb_dcid}" + - program_id: the AutoProcProgramId of the triggering job + - scaling_id: list of scaling ids i.e. ["{scaling_id}"] + - processing_directory: the labxchem visit processing dir + - pipedream / overwrite: forwarded to the collate wrapper + - comment: stored in the ProcessingJob.comment field + - automatic: boolean passed to ProcessingJob.automatic Example recipe parameters: { "target": "xchem_collate", "dcid": 123456, + "program_id": 123456, "scaling_id": [123456], "processing_directory": '/dls/labxchem/data/lb42888/lb42888-1/processing', "automatic": true, @@ -822,7 +837,7 @@ def trigger_xchem_collate( program_id = parameters.program_id scaling_id = parameters.scaling_id[0] processing_directory = pathlib.Path(parameters.processing_directory) - # reprocessing = parameters.reprocessing + overwrite = parameters.overwrite pipedream = parameters.pipedream _, ispyb_info = dlstbx.ispybtbx.ispyb_filter({}, {"ispyb_dcid": dcid}, session) @@ -833,20 +848,31 @@ def trigger_xchem_collate( protein_info = get_protein_for_dcid(parameters.dcid, session) acronym = getattr(protein_info, "acronym") - # get all dcids for the visit - query = ( - session.query(Proposal, BLSession, DataCollection) - .join(BLSession, BLSession.proposalId == Proposal.proposalId) - .join(DataCollection, DataCollection.SESSIONID == BLSession.sessionId) + # get the dcids for the protein target under the current proposal + dcids = [ + row[0] + for row in session.query(DataCollection.dataCollectionId) + .join(BLSample, BLSample.blSampleId == DataCollection.BLSAMPLEID) + .join(Crystal, Crystal.crystalId == BLSample.crystalId) + .join(Protein, Protein.proteinId == Crystal.proteinId) + .join(Proposal, Proposal.proposalId == Protein.proposalId) + .join( + ProcessingJob, + ProcessingJob.dataCollectionId == DataCollection.dataCollectionId, + ) + .join( + AutoProcProgram, + AutoProcProgram.processingJobId == ProcessingJob.processingJobId, + ) .filter(Proposal.proposalCode == visit_proposal[0:2]) .filter(Proposal.proposalNumber == visit_proposal[2::]) - .filter(BLSession.visit_number == visit_number) - ) + .filter(Protein.acronym == acronym) + .distinct() + .all() + ] - df = pd.read_sql(query.statement, query.session.bind) - dcids = df["dataCollectionId"].tolist() - - # trigger on the final PanDDA/Pipedream program_id for the visit + # trigger on the final PanDDA/Pipedream program_id from the current + # processing batch query = ( ( session.query(AutoProcProgram, ProcessingJob.dataCollectionId).join( @@ -855,7 +881,11 @@ def trigger_xchem_collate( ) ) .filter(ProcessingJob.dataCollectionId.in_(dcids)) - .filter(AutoProcProgram.processingPrograms.in_(["PanDDA2", "Pipedream"])) + .filter( + AutoProcProgram.processingPrograms.in_( + ["xia2 dials", "PanDDA2", "Pipedream"] + ) + ) .filter(AutoProcProgram.autoProcProgramId > program_id) # noqa E711 ) @@ -865,7 +895,7 @@ def trigger_xchem_collate( ) return {"success": True} - # has processing finished for the current visit? checkpoint if not + # has processing finished? checkpoint if not min_start_time = datetime.now() - timedelta(hours=8) query = ( ( @@ -967,6 +997,7 @@ def trigger_xchem_collate( "processing_directory": str(processing_directory), "scaling_id": scaling_id, "pipedream": pipedream, + "overwrite": overwrite, } # Upsert on max dcid self.upsert_proc(rw, max(dcids), "XChem-Collate", recipe_parameters) diff --git a/src/dlstbx/util/pandda.py b/src/dlstbx/util/pandda.py new file mode 100644 index 000000000..9eae51a96 --- /dev/null +++ b/src/dlstbx/util/pandda.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +import gemmi +import numpy as np +import yaml + +PROTEIN_RESIDUES = [ + "ALA", + "ARG", + "ASN", + "ASP", + "CYS", + "GLN", + "GLU", + "HIS", + "ILE", + "LEU", + "LYS", + "MET", + "PHE", + "PRO", + "SER", + "THR", + "TRP", + "TYR", + "VAL", + "GLY", +] + + +def save_xmap(xmap, xmap_file): + """Convenience script for saving ccp4 files.""" + ccp4 = gemmi.Ccp4Map() + ccp4.grid = xmap + ccp4.update_ccp4_header() + ccp4.write_ccp4_map(str(xmap_file)) + + +def read_pandda_map(xmap_file): + """PanDDA 2 maps are often truncated, and PanDDA 1 maps can have misasigned + spacegroups. This method handles both.""" + dmap_ccp4 = gemmi.read_ccp4_map(str(xmap_file), setup=False) + dmap_ccp4.grid.spacegroup = gemmi.find_spacegroup_by_name("P1") + dmap_ccp4.setup(0.0) + dmap = dmap_ccp4.grid + return dmap + + +def map_sigma(xmap, sigma=1): + ccp4 = gemmi.read_ccp4_map(str(xmap)) + ccp4.setup(0.0) + grid = ccp4.grid + grid_array = np.array(grid, copy=False) + non_zero_std = np.std(grid_array[(grid_array < -0.05) | (grid_array > 0.05)]) + return non_zero_std * sigma + + +def mask_map(dmap, coord, radius=10.0): + """Simple routine to mask density to region around a specified point.""" + mask = gemmi.FloatGrid(dmap.nu, dmap.nv, dmap.nw) + mask.set_unit_cell(dmap.unit_cell) + mask.set_points_around( + gemmi.Position(coord[0], coord[1], coord[2]), radius=radius, value=1.0 + ) + + dmap_array = np.array(dmap, copy=False) + dmap_array[:, :, :] = dmap_array[:, :, :] * np.array(mask)[:, :, :] + + return dmap + + +def remove_nearby_atoms(pdb_file, coord, radius, pandda_model): + """An inelegant method for removing residues near the event centroid and + creating a new, truncated pdb file. GEMMI doesn't have a super nice way to + remove residues according to a specific criteria.""" + st = gemmi.read_structure(str(pdb_file)) + new_st = st.clone() # Clone to keep metadata + + coord_array = np.array([coord[0], coord[1], coord[2]]) + + # Delete all residues for a clean chain. Yes this is an arcane way to do it. + chains_to_delete = [] + for model in st: + for chain in model: + chains_to_delete.append((model.num, chain.name)) + + for model in new_st: + for chain in model: + for res in chain: + del chain[-1] + + # Add non-rejected residues to a new structure + for j, model in enumerate(st): + for k, chain in enumerate(model): + for res in chain: + add_res = True + for atom in res: + pos = atom.pos + distance = np.linalg.norm( + coord_array - np.array([pos.x, pos.y, pos.z]) + ) + if distance < radius: + add_res = False + + if add_res: + new_st[j][k].add_residue(res) + new_st.write_pdb(str(pandda_model)) + + +def remove_waters_from_ligand(pandda_model, logger=None): + st = gemmi.read_structure(str(pandda_model)) + st.setup_entities() + + LIGAND_RES_NAME = "LIG" + + # Collect ligand atoms with their VdW radii + ligand_atoms = [] # list of (pos, vdw_radius) + for chain in st[0]: + for res in chain: + if res.name == LIGAND_RES_NAME: + for atom in res: + vdw = atom.element.vdw_r + ligand_atoms.append((atom.pos, vdw)) + + max_vdw = max(r for _, r in ligand_atoms) + search_radius = max_vdw # + O_VDW=1.5 + + ns = gemmi.NeighborSearch(st[0], st.cell, search_radius).populate() + + # Find waters where any atom overlaps within VdW radii + waters_to_remove = set() + for lig_pos, lig_vdw in ligand_atoms: + for mark in ns.find_atoms(lig_pos, "\0", radius=search_radius): + cra = mark.to_cra(st[0]) + if cra.residue.entity_type != gemmi.EntityType.Water: + continue + wat_vdw = cra.atom.element.vdw_r + cutoff = lig_vdw + wat_vdw + dist = lig_pos.dist(mark.pos) + if dist < cutoff: + waters_to_remove.add((cra.chain.name, cra.residue.seqid)) + + # Remove waters + for chain in st[0]: + to_delete = [ + i + for i, res in enumerate(chain) + if res.entity_type == gemmi.EntityType.Water + and (chain.name, res.seqid) in waters_to_remove + ] + for i in reversed(to_delete): # reversed so indices stay valid + del chain[i] + + st.write_pdb(str(pandda_model.parents[0] / pandda_model.name)) + if logger: + logger.info(f"Removed {len(waters_to_remove)} waters in {pandda_model.name}") + + +def get_contact_chain(protein_st, ligand_st): + """A simple estimation of the contact chain based on which chain has the most + atoms nearby.""" + ligand_pos_list = [] + for model in protein_st: + for chain in model: + for res in chain: + for atom in res: + pos = atom.pos + ligand_pos_list.append([pos.x, pos.y, pos.z]) + centroid = np.linalg.norm(np.array(ligand_pos_list), axis=0) + + chain_counts = {} + for model in protein_st: + for chain in model: + chain_counts[chain.name] = 0 + for res in chain: + if res.name not in PROTEIN_RESIDUES: + continue + for atom in res: + pos = atom.pos + distance = np.linalg.norm( + np.array([pos.x, pos.y, pos.z]) - centroid + ) + if distance < 5.0: + chain_counts[chain.name] += 1 + + return min(chain_counts, key=lambda _x: chain_counts[_x]) + + +def merge_build(receptor, ligand, contact_chain): + # Get the receptor chain + receptor_chain = receptor[0][contact_chain] + + # Get current ligand ids + seqid_nums = [] + for receptor_res in receptor_chain: + num = receptor_res.seqid.num + seqid_nums.append(num) + + # Assign a new, unused ligand id + if len(seqid_nums) == 0: + min_ligand_seqid = 100 + else: + min_ligand_seqid = max(seqid_nums) + 100 + + # Update the ligand residue sequenceid + ligand_residue = ligand[0][0][0] + ligand_residue.seqid.num = min_ligand_seqid + + # Add the ligand residue + receptor_chain.add_residue(ligand_residue, pos=-1) + + return receptor + + +def get_pandda_settings(yaml_file): + with open(yaml_file, "r") as file: + expt_yaml = yaml.load(file, Loader=yaml.SafeLoader) + settings = expt_yaml.get("autoprocessing", {}).get("pandda", {}) + if settings: + args_string = " ".join(f"--{k}={v}" for k, v in settings.items()) + else: + args_string = "" + return args_string diff --git a/src/dlstbx/util/pipedream_xchem_helpers.py b/src/dlstbx/util/pipedream_xchem_helpers.py new file mode 100644 index 000000000..eda95a6cf --- /dev/null +++ b/src/dlstbx/util/pipedream_xchem_helpers.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path + +import portalocker + +# Fixed body of pipedream_parameters.yaml. Only the five path/mode keys at the +# top vary between runs; everything from Cluster_partition down is constant. +PIPEDREAM_PARAMETERS_TEMPLATE = """\ +Mode: "{mode}" # 'pending_analysis' or 'specific_datasets' - the former will parse your database file for all datasets with RefinementOutcome "1 - Pending Analysis", the later will use a specified list of datasets provided in the csv file specified below + +Processing_directory: {processing_dir} +Output_directory: {output_dir} # Optional - defaults to Processing_directory/analysis/Pipedream/Pipedream_ if not set +Database_path: {db_path} +Dataset_csv_path: {csv_path} # Only required if Mode is 'specific_datasets' + +# Cluster Configuration (Optional) +Cluster_partition: "cs05r" # Options: cs05r, cs04r (default: cs05r) +Job_priority: "normal" # Options: normal, low, high (default: normal) + # low = nice 1000 (runs after other jobs), high = nice -100 + +Remove_crystallisation_components: true # Optional - removes DMS, EDO, GOL, SO4, PO4, PEG from input PDBs if true (can skip if not modelled in site of interest in MR model) +Refinement_parameters: #For more information see https://www.globalphasing.com/buster/manual/pipedream/manual/index.html#_details_of_command_line_arguments + keepwater: true #DO NOT remove waters that are present in the input model (default is to remove them) + WaterUpdatePkmaps: true #Update water pkmaps during refinement + TLS: "TLSbasic" #"TLSbasic" turns on TLS refinement and autoncs. Leave blank for no TLS. + remediate: true #Run SideAide to refit side chains + sidechainrebuild: true #Allow SideAide to rebuild stubbed sidechains + runpepflip: true #Run pepflip to check for and correct peptide bond flips + rhocommands: + - -xclusters # Produces ligand fits for the best possible binding sites. Leave blank for default and fit to best sites. + - -nochirals # Ignore CHIRAL restraints in fitting/output. Chiral centres can then invert as needed. +""" + + +def write_pipedream_parameters( + processing_dir, + pipedream_dir, + *, + mode="specific_datasets", + logger=None, +): + """Write a pipedream_parameters.yaml for manual export_pipedream.py runs. + + The refinement/cluster block is fixed; only the paths change, and they all + derive from the two directories the wrapper already computes: + + Database_path = processing_dir/database/soakDBDataFile.sqlite (master) + Output_directory = pipedream_dir/Pipedream_results + Dataset_csv_path = Output_directory/datasets.csv + + The file is written into Output_directory and its path returned. + """ + processing_dir = Path(processing_dir) + output_dir = Path(pipedream_dir) / "Pipedream_results" + params_path = output_dir / "pipedream_parameters.yaml" + if params_path.exists(): + if logger: + logger.info( + f"Pipedream parameters already exist, leaving as-is: {params_path}" + ) + return params_path + + text = PIPEDREAM_PARAMETERS_TEMPLATE.format( + mode=mode, + processing_dir=processing_dir, + output_dir=output_dir, + db_path=processing_dir / "database" / "soakDBDataFile.sqlite", + csv_path=output_dir / "datasets.csv", + ) + + output_dir.mkdir(parents=True, exist_ok=True) + params_path.write_text(text, encoding="utf-8") + if logger: + logger.info(f"Wrote pipedream parameters to {params_path}") + return params_path + + +def process_pdb_file(dimple_pdb: Path, logger=None): + """Strip common crystallisation components from a dimple pdb in-place.""" + if not dimple_pdb.exists(): + if logger: + logger.debug(f"Dimple pdb {dimple_pdb} does not exist") + return True + + with open(dimple_pdb, "r", encoding="utf-8") as f: + lines = f.readlines() + + # Count removals by component type + original_count = len(lines) + components_to_remove = ["DMS", "EDO", "GOL", "SO4", "PO4", "PEG"] + removed_counts = dict.fromkeys(components_to_remove, 0) + + kept_lines = [] + for line in lines: + if any(res in line for res in components_to_remove): + # Count which component was found + for comp in components_to_remove: + if comp in line: + removed_counts[comp] += 1 + break + else: + kept_lines.append(line) + + # Write cleaned file + with open(dimple_pdb, "w", encoding="utf-8") as f: + f.writelines(kept_lines) + + removed_total = original_count - len(kept_lines) + if removed_total > 0 and logger: + component_summary = ", ".join( + [f"{comp}: {count}" for comp, count in removed_counts.items() if count > 0] + ) + logger.debug(f"Removed {removed_total} lines. ({component_summary})") + + +def save_dataset_metadata( + pipedream_dir, + input_dir, + output_dir, + compound_code, + smiles_string, + pipedream_cmd, + dtag, + logger=None, +): + metadata = { + "Input_dir": input_dir, + "CompoundCode": compound_code, + "PipedreamDirectory": output_dir, + "ReportHTML": f"{output_dir}/report-{compound_code}/index.html", + "LigandReportHTML": f"{output_dir}/report-{compound_code}/ligand/index.html", + "ExpectedSummary": f"{output_dir}/pipedream_summary.json", + "PipedreamCommand": pipedream_cmd, + "ExpectedCIF": os.path.join(input_dir, f"{compound_code}.cif"), + "ExpectedPDB": os.path.join(input_dir, f"{compound_code}.pdb"), + "InputSMILES": smiles_string, + } + + output_yaml = {} + output_yaml[dtag] = metadata + json_file = f"{pipedream_dir}/Pipedream_output.json" + if not os.path.exists(json_file): + open(json_file, "w").close() + + # Acquire a lock + with portalocker.Lock(json_file, timeout=5): + if os.path.exists(json_file) and os.path.getsize(json_file) > 0: + with open(json_file, "r", encoding="utf-8") as f: + try: + data = json.load(f) + except Exception as e: + if logger: + logger.debug( + f"Cannot continue with pipedream postprocessing: {e}" + ) + return + else: + data = {} + + data.update(output_yaml) + + with open(json_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + + +def cleanup_setvar_files(pipedream_dir, logger=None): + """Delete __.setvar.lis autoBUSTER logs left in the pipedream dir. + + BUSTER drops one append-only setvar log per process into its working + directory (the shared pipedream dir), named after that process's PID. They + are diagnostic only and never read back, and accumulate over a visit. This + is called from collate, by which point processing for the visit has + finished, so any remaining logs are orphaned and safe to remove. + """ + removed = 0 + for f in Path(pipedream_dir).glob("*.setvar.lis"): + try: + f.unlink() + removed += 1 + except OSError as e: + if logger: + logger.warning(f"Could not remove setvar log {f.name}: {e}") + + if removed and logger: + logger.info(f"Removed {removed} setvar log(s) from {pipedream_dir}") diff --git a/src/dlstbx/util/soakdb.py b/src/dlstbx/util/soakdb.py new file mode 100644 index 000000000..06b653d1e --- /dev/null +++ b/src/dlstbx/util/soakdb.py @@ -0,0 +1,206 @@ +from __future__ import annotations + +import shutil +import sqlite3 +from itertools import groupby +from pathlib import Path + +import yaml + + +def _soakdb_path(visit_dir: Path) -> Path: + return visit_dir / "processing/database" / "soakDBDataFile.sqlite" + + +def _read_protein(db_path: Path) -> str | None: + """Return the Protein (target acronym) recorded in a soakDB database.""" + con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=10) + try: + row = con.execute("SELECT Protein FROM soakDB").fetchone() + finally: + con.close() + return row[0] if row else None + + +def _has_crystal(db_path: Path, container_code, location, dtag) -> bool: + """True if mainTable holds a row for this puck/position/crystal.""" + query = ( + "SELECT 1 FROM mainTable WHERE Puck = ? AND PuckPosition = ? " + "AND CrystalName = ? LIMIT 1" + ) + con = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True, timeout=10) + try: + return ( + con.execute(query, (container_code, location, dtag)).fetchone() is not None + ) + finally: + con.close() + + +def find_xchem_visit_dir( + xchem_dir: Path, acronym, container_code, location, dtag, log +) -> Path | None: + """Locate the labxchem visit directory under `xchem_dir` whose target + matches `acronym`. + + Prefers cached `.user.yaml` files (disambiguating by the crystal's + puck/position when several visits share a target); falls back to reading + the `Protein` field from each visit's soakDB database, caching the result + to `.user.yaml` for next time. Returns None if no visit matches.""" + # tier 1: match via cached .user.yaml + candidates = [] + for subdir in xchem_dir.iterdir(): + user_yaml = subdir / ".user.yaml" + if not user_yaml.is_file(): + continue + with open(user_yaml) as f: + expt_yaml = yaml.load(f, Loader=yaml.SafeLoader) + if expt_yaml["data"]["acronym"] == acronym: + candidates.append(subdir) + log.info(f"Found user yaml for dtag {dtag} at {user_yaml}") + + if len(candidates) == 1: + return candidates[0] + + # several visits share this target: disambiguate by the crystal's location + for visit_dir in candidates: + db_path = _soakdb_path(visit_dir) + if not db_path.is_file(): + log.info(f"No .sqlite database at {db_path} for dtag {dtag}, skipping") + continue + try: + if _has_crystal(db_path, container_code, location, dtag): + log.info(f"labxchem visit {visit_dir} found for dtag {dtag}") + return visit_dir + except Exception as e: + log.info( + f"Exception whilst reading ligand information from {db_path} " + f"for dtag {dtag}: {e}" + ) + + # tier 2: no cached match — read Protein from each soakDB, caching as we go + log.info(f"No matching user yaml in {xchem_dir}, reading soakDB databases...") + match_dir = None + for subdir in xchem_dir.iterdir(): + if (subdir / ".user.yaml").exists(): + continue + db_path = _soakdb_path(subdir) + if not db_path.is_file(): + continue + try: + name = _read_protein(db_path) + except Exception as e: + log.info(f"Problem reading .sqlite database for {subdir}: {e}") + continue + if name is not None: + with open(subdir / ".user.yaml", "w") as f: + yaml.dump({"data": {"acronym": name}}, f) + if name == acronym: + match_dir = subdir + return match_dir + + +def prepare_auto_db(processing_dir: Path) -> Path: + """Create (if needed) and sync the auto soakDB copy from the master, then + return its path. The copy is what updatable_crystals() and the bulk update + operate on.""" + db_master = processing_dir / "database" / "soakDBDataFile.sqlite" + db_copy = processing_dir / "auto/database" / "autosoakDBDataFile.sqlite" + + if not db_copy.exists(): + Path(db_copy.parents[0]).mkdir(parents=True, exist_ok=True) + shutil.copy(db_master, db_copy) + + sync_schema_from_master(db_master, db_copy, "mainTable") + sync_rows_from_master(db_master, db_copy, "mainTable") + return db_copy + + +def sync_schema_from_master(db_master, db_copy, table): + """Add any columns present in master but missing from copy. + Preserves column type from master.""" + + master_conn = sqlite3.connect(db_master) + try: + master_col_defs = { + row[1]: row[2] for row in master_conn.execute(f"PRAGMA table_info({table})") + } + finally: + master_conn.close() + + copy_conn = sqlite3.connect(db_copy) + try: + copy_cols = {row[1] for row in copy_conn.execute(f"PRAGMA table_info({table})")} + + new_cols = set(master_col_defs.keys()) - copy_cols + for col in new_cols: + col_type = master_col_defs[col] + copy_conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}") + + copy_conn.commit() + finally: + copy_conn.close() + + +def sync_rows_from_master(master_path, copy_path, table): + conn = sqlite3.connect(copy_path) + try: + conn.execute(f"ATTACH DATABASE '{master_path}' AS master") + + # Insert only rows from master that don't already exist in the copy + conn.execute(f""" + INSERT OR IGNORE INTO {table} + SELECT * FROM master.{table} + """) + + conn.commit() + conn.execute("DETACH DATABASE master") + finally: + conn.close() + + +def updatable_crystals(database_path, overwrite=False) -> set[str]: + """CrystalNames this run is allowed to write — both the skip-set for + building db_dicts/exporting files and the gate for the bulk update. + + default: rows not yet given a RefinementOutcome. + overwrite: every crystal row, including manually-curated ones.""" + if overwrite: + where = "CrystalName IS NOT NULL" + # where = "(LastUpdated_by = 'gda2' OR LastUpdated_by IS NULL)" + else: + where = ( + "RefinementOutcome IS NULL OR RefinementOutcome = '1 - Analysis Pending'" + ) + conn = sqlite3.connect(database_path, timeout=30) + try: + rows = conn.execute( + f"SELECT CrystalName FROM mainTable WHERE {where}" + ).fetchall() + finally: + conn.close() + return {row[0] for row in rows} + + +def update_data_source_bulk(db_dicts, database_path): + # db_dicts is already restricted to updatable_crystals(), so the bulk + # update only needs to match on CrystalName. + keyed = sorted(db_dicts, key=lambda d: tuple(sorted(d))) + + conn = sqlite3.connect(database_path, timeout=30) + try: + cursor = conn.cursor() + for keys, group in groupby(keyed, key=lambda d: tuple(sorted(d))): + columns = [k for k in keys if k != "CrystalName"] + sql = ( + "UPDATE mainTable SET " + + ", ".join([f"{col} = :{col}" for col in columns]) + + " WHERE CrystalName = :CrystalName" + ) + cursor.executemany(sql, list(group)) + conn.commit() + except Exception: + conn.rollback() + raise + finally: + conn.close() diff --git a/src/dlstbx/util/symlink.py b/src/dlstbx/util/symlink.py index d5079d766..e8b57612f 100644 --- a/src/dlstbx/util/symlink.py +++ b/src/dlstbx/util/symlink.py @@ -3,6 +3,18 @@ import os +def safe_symlink(src, dst, logger=None): + """Create a symlink at dst pointing to src, replacing any existing file or + link already there.""" + try: + if os.path.islink(dst) or os.path.exists(dst): + os.remove(dst) + os.symlink(src, dst) + except Exception as e: + if logger: + logger.error(f"Error creating symlink from {src} to {dst}: {e}") + + def create_parent_symlink( destination_path, symlink_name, levels=2, overwrite_symlink=False ): diff --git a/src/dlstbx/util/xchem_collate_helpers.py b/src/dlstbx/util/xchem_collate_helpers.py new file mode 100644 index 000000000..b98563fed --- /dev/null +++ b/src/dlstbx/util/xchem_collate_helpers.py @@ -0,0 +1,358 @@ +from __future__ import annotations + +import json +import shutil +from datetime import datetime +from pathlib import Path + +import pandas as pd + +from dlstbx.util.soakdb import update_data_source_bulk +from dlstbx.util.symlink import safe_symlink + + +def traffic_light(value, green, orange=None, reverse=False): + """Return the 'green'/'orange'/'red' band for a metric, or None if it + can't be parsed. Set reverse=True for metrics where higher is better + (e.g. Ramachandran favoured).""" + try: + if value in (None, "", "NA"): + return None + val = float(value) + if orange is None: + if reverse: + return "green" if val > green else "red" + return "green" if val < green else "red" + if reverse: + # Higher is better + if val > green: + return "green" + return "orange" if val > orange else "red" + # Lower is better (R-factor, resolution, RMSD, ...) + if val < green: + return "green" + return "orange" if val < orange else "red" + except (ValueError, TypeError): + return None + + +def find_pipedream_model(pipedream_dir, dtag, rscc_thresh): + """Locate the Pipedream postrefine model for a dataset and its best rhofit + RSCC.""" + + RHOFIT_HIT_LOG = "Hit_corr.log" + + pipdream_dtag = pipedream_dir / dtag + if not pipdream_dtag.is_dir(): + return None, None + + rhofit_dir = next(pipdream_dtag.glob("rhofit-*"), None) + postrefine_dir = next(pipdream_dtag.glob("postrefine-*"), None) + if not rhofit_dir or not postrefine_dir: + return None, None + + hit_log = rhofit_dir / RHOFIT_HIT_LOG + if not hit_log.exists(): + return None, None + + with open(hit_log) as f: + rscc = max(float(line.split()[1]) for line in f if line.strip()) + + refine_pdb = postrefine_dir / "refine.pdb" + if refine_pdb.exists() and rscc > rscc_thresh: + return refine_pdb, rscc + return None, None + + +def find_pandda_model(panddas_dir, dtag) -> Path | None: + panddas_dtag = panddas_dir / "processed_datasets" / f"{dtag}" + pandda_model = panddas_dtag / "modelled_structures" / f"{dtag}-pandda-model.pdb" + model_path = pandda_model if pandda_model.exists() else None + return model_path + + +def pipedream_refinement_metrics( + pipedream_model, compound_code, db_timestamp, logger=None +): + """Extract refinement & validation statistics from the pipedream_summary.json + that sits alongside the selected postrefine model, returning a dict of soakDB + mainTable columns. Mirrors https://github.com/Daren-fearon/pipedream_xchem/. + + Note: RefinementOutcome, RefinementLigandConfidence, RefinementLigandCC and + RefinementBoundConformation are intentionally left out - they are set from + the rhofit rscc by the caller.""" + + postrefine_dir = Path(pipedream_model).parent + pipedream_out = postrefine_dir.parent + summary_path = pipedream_out / "pipedream_summary.json" + + with open(summary_path) as f: + summary = json.load(f) + + ligands = summary.get("ligandfitting", {}).get("ligands", []) + first_ligand = ligands[0] if ligands else {} + molprobity = first_ligand.get("validationstatistics", {}).get("molprobity", {}) + # postrefinement[1] is the final refinement step (matches collate script) + postref = first_ligand.get("postrefinement", []) + postref_final = postref[1] if len(postref) > 1 else {} + + # High resolution from data processing input, rounded for display + reshigh = summary.get("dataprocessing", {}).get("inputdata", {}).get("reshigh") + try: + resolution = round(float(reshigh), 2) + except (TypeError, ValueError): + resolution = None + + def _round(value, digits=3): + return round(value, digits) if isinstance(value, (int, float)) else value + + r = _round(postref_final.get("R")) + rfree = _round(postref_final.get("Rfree")) + molprob = molprobity.get("molprobityscore") + rama_out = molprobity.get("ramaoutlierpercent") + rama_fav = molprobity.get("ramafavoredpercent") + rmsd_bonds = molprobity.get("rmsbonds") + rmsd_angles = molprobity.get("rmsangles") + + # BUSTER mmCIF model/reflections and the report HTML live in the + # postrefine / report directories of the same Pipedream output + mmcif_model = postrefine_dir / "BUSTER_model.cif" + mmcif_reflections = postrefine_dir / "BUSTER_refln.cif" + report = pipedream_out / f"report-{compound_code}" / "index.html" + + if logger and not mmcif_model.exists(): + logger.warning(f"BUSTER model CIF not found at {mmcif_model}") + if logger and not mmcif_reflections.exists(): + logger.warning(f"BUSTER reflections CIF not found at {mmcif_reflections}") + + return { + "RefinementResolution": resolution, + "RefinementResolutionTL": traffic_light(resolution, 2.0, 2.5), + "RefinementRcryst": r, + "RefinementRcrystTraficLight": traffic_light(r, 0.20, 0.25), + "RefinementRfree": rfree, + "RefinementRfreeTraficLight": traffic_light(rfree, 0.25, 0.30), + "RefinementOutcomePerson": "gda2", + "RefinementOutcomeDate": db_timestamp, + "RefinementPDB_latest": str(pipedream_model), + "RefinementMTZ_latest": str(postrefine_dir / "refine.mtz"), + "RefinementMMCIFmodel_latest": str(mmcif_model), + "RefinementMMCIFreflections_latest": str(mmcif_reflections), + "RefinementMolProbityScore": molprob, + "RefinementMolProbityScoreTL": traffic_light(molprob, 2, 3), + "RefinementRamachandranOutliers": rama_out, + "RefinementRamachandranOutliersTL": traffic_light(rama_out, 0.3, 1), + "RefinementRamachandranFavored": rama_fav, + "RefinementRamachandranFavoredTL": traffic_light( + rama_fav, 98, 95, reverse=True + ), + "RefinementRmsdBonds": rmsd_bonds, + "RefinementRmsdBondsTL": traffic_light(rmsd_bonds, 0.012, 0.018), + "RefinementRmsdAngles": rmsd_angles, + "RefinementRmsdAnglesTL": traffic_light(rmsd_angles, 1.5, 2.0), + "RefinementStatus": "finished", + "RefinementBusterReportHTML": str(report), + "RefinementDate": db_timestamp, + } + + +def export_pipedream_files( + dataset_dir, compound_code, pipedream_dir, dtag, logger=None +): + """Export Pipedream results to model_building directory""" + + compound_dir = dataset_dir / "compound" # in model_building + target_cif = compound_dir / f"{compound_code}.cif" + target_pdb = compound_dir / f"{compound_code}.pdb" + symlink_cif = dataset_dir / f"{compound_code}.cif" + + rhofit_dir = pipedream_dir / dtag / f"rhofit-{compound_code}" + output_cif_file = rhofit_dir / "best.cif" + refined_pdb_file = rhofit_dir / "best.pdb" + + if refined_pdb_file.exists() and output_cif_file.exists(): + shutil.copy2(refined_pdb_file, target_pdb) + shutil.copy2(output_cif_file, target_cif) + safe_symlink(target_cif, symlink_cif, logger) + elif logger: + logger.info(f"Could not export restraints files for {dataset_dir}") + + mtz_file_dest = pipedream_dir / dtag / f"postrefine-{compound_code}" / "refine.mtz" + postrefine_pdb = pipedream_dir / dtag / f"postrefine-{compound_code}" / "refine.pdb" + + safe_symlink(postrefine_pdb, dataset_dir / "refine.pdb", logger) + safe_symlink(mtz_file_dest, dataset_dir / "refine.mtz", logger) + safe_symlink(postrefine_pdb, dataset_dir / "refine.split.bound-state.pdb", logger) + + safe_symlink( + pipedream_dir / dtag / f"postrefine-{compound_code}" / "refine_2fofc.map", + dataset_dir / f"{dtag}_2fofc.map", + logger, + ) + safe_symlink( + pipedream_dir / dtag / f"postrefine-{compound_code}" / "refine_fofc.map", + dataset_dir / f"{dtag}_fofc.map", + logger, + ) + + +def symlink_score_buckets(panddas_dir, pandda_dir, updatable, logger): + """Build score-bucketed, symlinked copies of the PanDDA processed_datasets + dir so models can be browsed by ligand score. Only datasets still in + `updatable` (not yet processed by the autopipeline) are bucketed. + + Scores come from the per-dataset best_score.txt written by pandda_xchem.""" + + processed_dataset_dir = panddas_dir / "processed_datasets" + events_csv = panddas_dir / "analyses" / "pandda_analyse_events.csv" + sites_csv = panddas_dir / "analyses" / "pandda_analyse_sites.csv" + + if not events_csv.exists(): + logger.info(f"No {events_csv}, skipping score bucketing") + return + + # scores[dtag] = best ligand score + scores = {} + for dataset in processed_dataset_dir.iterdir(): + if dataset.name not in updatable: + continue + score_file = dataset / "best_score.txt" + if score_file.exists(): + scores[dataset.name] = float(score_file.read_text().strip()) + + sorted_scores = sorted(scores.items(), key=lambda x: x[1], reverse=True) + + df = pd.read_csv(events_csv, index_col=0) + buckets = [0.6, 0.8, 0.9, 1] # boundaries + for j in range(len(buckets) - 1): + bucket_dtags = [ + dtag + for dtag, score in sorted_scores + if buckets[j] < score <= buckets[j + 1] + ] + logger.info( + f"Datasets scored {buckets[j]}-{buckets[j + 1]}: {len(bucket_dtags)}" + ) + if not bucket_dtags: + continue + + # Mirror the panddas layout: analyses/ and processed_datasets/ siblings + bucket_root = pandda_dir / f"score_{buckets[j]}-{buckets[j + 1]}" + bucket_processed = bucket_root / "processed_datasets" + analyses_dir = bucket_root / "analyses" + bucket_processed.mkdir(parents=True, exist_ok=True) + analyses_dir.mkdir(parents=True, exist_ok=True) + + # Add this batch's datasets alongside any symlinked by previous runs + for dtag in bucket_dtags: + safe_symlink(processed_dataset_dir / dtag, bucket_processed / dtag, logger) + + # Filter the events csv on every dataset now in the bucket (this batch + # plus earlier ones) so prior batches stay represented. + all_bucket_dtags = [p.name for p in bucket_processed.iterdir()] + shutil.copy(sites_csv, analyses_dir) + filtered = df[df["dtag"].isin(all_bucket_dtags)].reset_index(drop=True) + filtered.to_csv(analyses_dir / "pandda_analyse_events.csv") + + +def update_xchem_database( + model_dir, pipedream_dir, panddas_dir, db_copy, updatable, logger +): + """Performs model selection & exports results to XChem SoakDB database. + `updatable` is the CrystalName set this run is allowed to write, captured + before any RefinementOutcome is set.""" + + # Build list of dicts for batch updating rows in SQLite + db_dicts = [] + for dataset_dir in model_dir.iterdir(): + if not dataset_dir.is_dir(): + continue + dtag = dataset_dir.name + if dtag not in updatable: + logger.info(f"{dtag} not in set of updatable CrystalNames") + continue + compound_dir = dataset_dir / "compound" + cif_files = list(compound_dir.glob("*.cif")) + + if len(cif_files) > 1: + logger.error(f"Multiple .cif files in {compound_dir}") + + CompoundCode = cif_files[0].stem + db_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f")[:-4] + + pipedream_model, rscc = find_pipedream_model( + pipedream_dir, dtag, rscc_thresh=0.7 + ) + pandda_model = find_pandda_model(panddas_dir, dtag) + + # Export + if pipedream_model: + logger.info(f"Selected Pipedream model for {dtag}") + + # Determine ligand confidence based on overall ligandcc value + if rscc >= 0.8: + RefinementLigandConfidence = "4 - High Confidence" + RefinementOutome = "4 - CompChem ready" + elif rscc >= 0.7: + RefinementLigandConfidence = "2 - Correct ligand, weak density" + RefinementOutome = "3 - In Refinement" + + # Full refinement/validation statistics from the Pipedream summary json + try: + metrics = pipedream_refinement_metrics( + pipedream_model, CompoundCode, db_timestamp, logger + ) + except Exception as e: + logger.error(f"Could not read Pipedream summary for {dtag}: {e}") + metrics = {} + + db_dicts.append( + { + "CrystalName": dtag, + "RefinementBoundConformation": str(pipedream_model), + "RefinementOutcome": RefinementOutome, + "RefinementLigandConfidence": RefinementLigandConfidence, + "RefinementLigandCC": rscc, + "RefinementCIF": str( + dataset_dir / "compound" / f"{CompoundCode}.cif" + ), + "RefinementCIFprogram": "Grade2", + "LastUpdated": db_timestamp, + "LastUpdated_by": "gda2", + **metrics, + } + ) + + export_pipedream_files( + dataset_dir, CompoundCode, pipedream_dir, dtag, logger + ) + + elif pandda_model: + logger.info(f"Selected PanDDA2 model for {dtag}") + db_dicts.append( + { + "CrystalName": dtag, + "RefinementBoundConformation": str(pandda_model), + "RefinementOutcome": "2 - PANDDA model", + "RefinementCIFprogram": "Grade2", + "LastUpdated": db_timestamp, + "LastUpdated_by": "gda2", + } + ) + else: + logger.info(f"No model selected for {dtag}") + db_dicts.append( + { + "CrystalName": dtag, + "RefinementOutcome": "7 - Analysed & Rejected", + "LastUpdated": db_timestamp, + "LastUpdated_by": "gda2", + } + ) + + # Now update the database with the formed dicts + try: + update_data_source_bulk(db_dicts, db_copy) + logger.debug(f"Bulk updated {db_copy} for {len(db_dicts)} datasets") + except Exception as e: + logger.debug(f"Could not bulk update {db_copy}: {e}") diff --git a/src/dlstbx/wrapper/ligandrestraints.py b/src/dlstbx/wrapper/ligandrestraints.py new file mode 100644 index 000000000..832881215 --- /dev/null +++ b/src/dlstbx/wrapper/ligandrestraints.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import subprocess +from pathlib import Path + +from dlstbx.wrapper import Wrapper + + +class LigandRestraintsWrapper(Wrapper): + _logger_name = "dlstbx.wrap.ligandrestraints" + + def run(self): + assert hasattr(self, "recwrap"), "No recipewrapper object found" + self.log.info( + f"Running recipewrap file {self.recwrap.recipe_step['parameters']['recipewrapper']}" + ) + + params = self.recwrap.recipe_step["job_parameters"] + processing_dir = Path(params.get("processing_directory")) + dtag = params.get("dtag") + + model_dir = processing_dir / "auto" / "analysis" / "model_building" + dataset_dir = model_dir / dtag + compound_dir = dataset_dir / "compound" + + smiles_files = list(compound_dir.glob("*.smiles")) + if len(smiles_files) == 0: + self.log.error( + f"No .smiles file present in {compound_dir}, cannot continue for dtag {dtag}" + ) + return False + elif len(smiles_files) > 1: + self.log.error( + f"Multiple .smiles files found in {compound_dir}: {smiles_files}, warning for dtag {dtag}" + ) + return False + + smiles_file = smiles_files[0] + CompoundCode = smiles_file.stem + + restraints_log = dataset_dir / "restraints.log" + attachments = [restraints_log] + restraints_command = f"grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f > {restraints_log}" + + try: + subprocess.run( + restraints_command, + shell=True, + capture_output=True, + text=True, + cwd=compound_dir, + check=True, + timeout=60 * 60, + ) + except subprocess.CalledProcessError as e: + self.log.error( + f"Ligand restraint generation command: '{restraints_command}' failed for dataset {dtag}" + ) + self.log.info(e.stdout) + self.log.error(e.stderr) + self.send_attachments_to_ispyb(attachments) + return False + + (compound_dir / f"{CompoundCode}.restraints.cif").rename( + compound_dir / f"{CompoundCode}.cif" + ) + (compound_dir / f"{CompoundCode}.xyz.pdb").rename( + compound_dir / f"{CompoundCode}.pdb" + ) + + self.log.info(f"Restraints generated successfully for dtag {dtag}") + self.send_attachments_to_ispyb(attachments) + return True + + def send_attachments_to_ispyb(self, attachments): + for f in attachments: + if f.exists(): + file_type = "Log" if f.suffix in (".log", ".out") else "Result" + importance_rank = 2 if file_type == "Log" else 1 + try: + self.record_result_individual_file( + { + "file_path": str(f.parents[0]), + "file_name": f.name, + "file_type": file_type, + "importance_rank": importance_rank, + } + ) + self.log.info(f"Uploaded {f.name} as an attachment") + except Exception: + self.log.warning( + f"Could not attach {f.name} to ISPyB", exc_info=True + ) diff --git a/src/dlstbx/wrapper/pandda_xchem.py b/src/dlstbx/wrapper/pandda_xchem.py index de073cbbd..6003f577c 100644 --- a/src/dlstbx/wrapper/pandda_xchem.py +++ b/src/dlstbx/wrapper/pandda_xchem.py @@ -7,7 +7,6 @@ from pathlib import Path import gemmi -import numpy as np import yaml from dlstbx.util.mvs.helpers import ( @@ -15,6 +14,17 @@ save_cropped_map, ) from dlstbx.util.mvs.viewer_pandda import gen_html_pandda +from dlstbx.util.pandda import ( + get_contact_chain, + get_pandda_settings, + map_sigma, + mask_map, + merge_build, + read_pandda_map, + remove_nearby_atoms, + remove_waters_from_ligand, + save_xmap, +) from dlstbx.wrapper import Wrapper @@ -38,12 +48,12 @@ def run(self): auto_dir = processing_dir / "auto" analysis_dir = Path(auto_dir / "analysis") pandda_dir = analysis_dir / "pandda2" - model_dir = pandda_dir / "model_building" + model_dir = analysis_dir / "model_building" panddas_dir = Path(pandda_dir / "panddas") - Path(panddas_dir).mkdir(exist_ok=True) + Path(panddas_dir).mkdir(parents=True, exist_ok=True) - reprocessing = params.get("reprocessing") - n_datasets = int(params.get("n_datasets")) + overwrite = params.get("overwrite") + n_datasets = int(params.get("n_datasets") or 1) if n_datasets > 1: # array job case batch = True @@ -59,7 +69,6 @@ def run(self): self.log.info(f"Processing dtag: {dtag}") - smiles = params.get("smiles") smiles_files = list(compound_dir.glob("*.smiles")) if len(smiles_files) == 0: @@ -75,46 +84,11 @@ def run(self): smiles_file = smiles_files[0] CompoundCode = smiles_file.stem + smiles = smiles_file.read_text().strip() - # ------------------------------------------------------- - # Ligand restraint generation - - restraints_log = dataset_dir / "restraints.log" - attachments = [restraints_log] # synchweb attachments - restraints_command = f"grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f > {restraints_log}" - # acedrg_command = f"module load ccp4; acedrg -i {smiles_file} -o {CompoundCode}" - - try: - subprocess.run( - restraints_command, - shell=True, - capture_output=True, - text=True, - cwd=compound_dir, - check=True, - timeout=30 * 60, - ) - - except subprocess.CalledProcessError as e: - self.log.error( - f"Ligand restraint generation command: '{restraints_command}' failed for dataset {dtag}" - ) - - self.log.info(e.stdout) - self.log.error(e.stderr) - self.send_attachments_to_ispyb(attachments, batch) - return False - - restraints = compound_dir / f"{CompoundCode}.restraints.cif" - restraints.rename(compound_dir / f"{CompoundCode}.cif") - ligand_pdb = compound_dir / f"{CompoundCode}.xyz.pdb" - ligand_pdb.rename(compound_dir / f"{CompoundCode}.pdb") - + # Restraints (grade2) were generated upstream by the ligand-restraints job. ligand_cif = compound_dir / f"{CompoundCode}.cif" - - self.log.info( - f"Restraints generated succesfully for dtag {dtag}, launching PanDDA2" - ) + attachments = [] # ------------------------------------------------------- # PanDDA2 @@ -123,11 +97,11 @@ def run(self): pandda2_log = dataset_pdir / "pandda2.log" attachments.extend([pandda2_log, ligand_cif]) - if reprocessing and dataset_pdir.exists(): + if overwrite and dataset_pdir.exists(): shutil.rmtree(dataset_pdir) # add any user specified pandda parameters - args_string = self.get_pandda_settings(user_yaml) + args_string = get_pandda_settings(user_yaml) pandda2_command = f"source {PANDDA_2_DIR}/venv/bin/activate; \ python -u /dls_sw/i04-1/software/PanDDA2/scripts/process_dataset.py --data_dirs={model_dir} --out_dir={panddas_dir} --dtag={dtag} --use_ligand_data=True --local_cpus=4 {args_string}" @@ -212,13 +186,13 @@ def run(self): # Rhofit can be confused by hunting non-binding site density. This can be avoided # by truncating the map to near the binding site - dmap = self.read_pandda_map(event_map) - dmap = self.mask_map(dmap, event_coord) - self.save_xmap(dmap, restricted_build_dmap) + dmap = read_pandda_map(event_map) + dmap = mask_map(dmap, event_coord) + save_xmap(dmap, restricted_build_dmap) # Rhofit masks the protein before building. If the original protein # model clips the event then this results in autobuilding becoming impossible. - self.remove_nearby_atoms( + remove_nearby_atoms( pdb_file, event_coord, 10.0, @@ -226,7 +200,7 @@ def run(self): ) cifs = list(ligand_dir.glob("*.cif")) - cut = self.map_sigma(restricted_build_dmap) + cut = map_sigma(restricted_build_dmap) rhofit_log = dataset_pdir / "rhofit.log" attachments.extend([event_map, z_map, rhofit_log]) @@ -299,6 +273,9 @@ def run(self): self.log.info(f"Best ligand score for {dtag} = {best_score}: {best_build_path}") + # Persist the best score so xchem_collate can bucket datasets by score + (dataset_pdir / "best_score.txt").write_text(str(best_score)) + # ------------------------------------------------------- # Merge the protein structure with best fitted ligand -> pandda model @@ -309,8 +286,8 @@ def run(self): protein_st = gemmi.read_structure(str(protein_st_file)) ligand_st = gemmi.read_structure(str(ligand_st_file)) - contact_chain = self.get_contact_chain(protein_st, ligand_st) - self.merge_build(protein_st, ligand_st, contact_chain) + contact_chain = get_contact_chain(protein_st, ligand_st) + merge_build(protein_st, ligand_st, contact_chain) if pandda_model.exists(): # backup previous model shutil.copy2(pandda_model, modelled_dir / "pandda-internal-fitted.pdb") @@ -318,7 +295,7 @@ def run(self): protein_st.write_pdb(str(pandda_model)) try: - self.remove_waters_from_ligand(pandda_model) + remove_waters_from_ligand(pandda_model, self.log) except Exception as e: self.log.error(f"Exception removing waters from {pandda_model}: {e}") @@ -361,245 +338,6 @@ def run(self): self.log.info(f"Auto PanDDA2 pipeline finished successfully for dtag {dtag}") return True - def save_xmap(self, xmap, xmap_file): - """Convenience script for saving ccp4 files.""" - ccp4 = gemmi.Ccp4Map() - ccp4.grid = xmap - ccp4.update_ccp4_header() - ccp4.write_ccp4_map(str(xmap_file)) - - def read_pandda_map(self, xmap_file): - """PanDDA 2 maps are often truncated, and PanDDA 1 maps can have misasigned spacegroups. - This method handles both.""" - dmap_ccp4 = gemmi.read_ccp4_map(str(xmap_file), setup=False) - dmap_ccp4.grid.spacegroup = gemmi.find_spacegroup_by_name("P1") - dmap_ccp4.setup(0.0) - dmap = dmap_ccp4.grid - return dmap - - def map_sigma(self, xmap, sigma=1): - ccp4 = gemmi.read_ccp4_map(str(xmap)) - ccp4.setup(0.0) - grid = ccp4.grid - grid_array = np.array(grid, copy=False) - non_zero_std = np.std(grid_array[(grid_array < -0.05) | (grid_array > 0.05)]) - return non_zero_std * sigma - - def expand_event_map(self, bdc, ground_state_file, xmap_file, coord, out_file): - """DEPRECATED. A method for recalculating event maps over the full cell.""" - ground_state_ccp4 = gemmi.read_ccp4_map(str(ground_state_file), setup=False) - ground_state_ccp4.grid.spacegroup = gemmi.find_spacegroup_by_name("P1") - ground_state_ccp4.setup(0.0) - ground_state = ground_state_ccp4.grid - - xmap_ccp4 = gemmi.read_ccp4_map(str(xmap_file), setup=False) - xmap_ccp4.grid.spacegroup = gemmi.find_spacegroup_by_name("P1") - xmap_ccp4.setup(0.0) - xmap = xmap_ccp4.grid - - mask = gemmi.FloatGrid(xmap.nu, xmap.nv, xmap.nw) - mask.set_unit_cell(xmap.unit_cell) - mask.set_points_around( - gemmi.Position(coord[0], coord[1], coord[2]), radius=10.0, value=1.0 - ) - - event_map = gemmi.FloatGrid(xmap.nu, xmap.nv, xmap.nw) - event_map.set_unit_cell(xmap.unit_cell) - event_map_array = np.array(event_map, copy=False) - event_map_array[:, :, :] = np.array(xmap)[:, :, :] - ( - bdc * np.array(ground_state)[:, :, :] - ) - event_map_array[:, :, :] = event_map_array[:, :, :] * np.array(mask)[:, :, :] - - event_map_non_zero = event_map_array[event_map_array != 0.0] - cut = np.std(event_map_non_zero) - - return cut - - def mask_map(self, dmap, coord, radius=10.0): - """Simple routine to mask density to region around a specified point.""" - mask = gemmi.FloatGrid(dmap.nu, dmap.nv, dmap.nw) - mask.set_unit_cell(dmap.unit_cell) - mask.set_points_around( - gemmi.Position(coord[0], coord[1], coord[2]), radius=radius, value=1.0 - ) - - dmap_array = np.array(dmap, copy=False) - dmap_array[:, :, :] = dmap_array[:, :, :] * np.array(mask)[:, :, :] - - return dmap - - def remove_nearby_atoms(self, pdb_file, coord, radius, pandda_model): - """An inelegant method for removing residues near the event centroid and creating - a new, truncated pdb file. GEMMI doesn't have a super nice way to remove - residues according to a specific criteria.""" - st = gemmi.read_structure(str(pdb_file)) - new_st = st.clone() # Clone to keep metadata - - coord_array = np.array([coord[0], coord[1], coord[2]]) - - # Delete all residues for a clean chain. Yes this is an arcane way to do it. - chains_to_delete = [] - for model in st: - for chain in model: - chains_to_delete.append((model.num, chain.name)) - - for model in new_st: - for chain in model: - for res in chain: - del chain[-1] - - # Add non-rejected residues to a new structure - for j, model in enumerate(st): - for k, chain in enumerate(model): - for res in chain: - add_res = True - for atom in res: - pos = atom.pos - distance = np.linalg.norm( - coord_array - np.array([pos.x, pos.y, pos.z]) - ) - if distance < radius: - add_res = False - - if add_res: - new_st[j][k].add_residue(res) - new_st.write_pdb(str(pandda_model)) - - def remove_waters_from_ligand(self, pandda_model): - st = gemmi.read_structure(str(pandda_model)) - st.setup_entities() - - LIGAND_RES_NAME = "LIG" - - # Collect ligand atoms with their VdW radii - ligand_atoms = [] # list of (pos, vdw_radius) - for chain in st[0]: - for res in chain: - if res.name == LIGAND_RES_NAME: - for atom in res: - vdw = atom.element.vdw_r - ligand_atoms.append((atom.pos, vdw)) - - max_vdw = max(r for _, r in ligand_atoms) - search_radius = max_vdw # + O_VDW=1.5 - - ns = gemmi.NeighborSearch(st[0], st.cell, search_radius).populate() - - # Find waters where any atom overlaps within VdW radii - waters_to_remove = set() - for lig_pos, lig_vdw in ligand_atoms: - for mark in ns.find_atoms(lig_pos, "\0", radius=search_radius): - cra = mark.to_cra(st[0]) - if cra.residue.entity_type != gemmi.EntityType.Water: - continue - wat_vdw = cra.atom.element.vdw_r - cutoff = lig_vdw + wat_vdw - dist = lig_pos.dist(mark.pos) - if dist < cutoff: - waters_to_remove.add((cra.chain.name, cra.residue.seqid)) - - # Remove waters - for chain in st[0]: - to_delete = [ - i - for i, res in enumerate(chain) - if res.entity_type == gemmi.EntityType.Water - and (chain.name, res.seqid) in waters_to_remove - ] - for i in reversed(to_delete): # reversed so indices stay valid - del chain[i] - - st.write_pdb(str(pandda_model.parents[0] / pandda_model.name)) - self.log.info(f"Removed {len(waters_to_remove)} waters in {pandda_model.name}") - - def get_contact_chain(self, protein_st, ligand_st): - """A simple estimation of the contact chain based on which chain has the most atoms - nearby.""" - ligand_pos_list = [] - for model in protein_st: - for chain in model: - for res in chain: - for atom in res: - pos = atom.pos - ligand_pos_list.append([pos.x, pos.y, pos.z]) - centroid = np.linalg.norm(np.array(ligand_pos_list), axis=0) - - PROTEIN_RESIDUES = [ - "ALA", - "ARG", - "ASN", - "ASP", - "CYS", - "GLN", - "GLU", - "HIS", - "ILE", - "LEU", - "LYS", - "MET", - "PHE", - "PRO", - "SER", - "THR", - "TRP", - "TYR", - "VAL", - "GLY", - ] - - chain_counts = {} - for model in protein_st: - for chain in model: - chain_counts[chain.name] = 0 - for res in chain: - if res.name not in PROTEIN_RESIDUES: - continue - for atom in res: - pos = atom.pos - distance = np.linalg.norm( - np.array([pos.x, pos.y, pos.z]) - centroid - ) - if distance < 5.0: - chain_counts[chain.name] += 1 - - return min(chain_counts, key=lambda _x: chain_counts[_x]) - - def merge_build(self, receptor, ligand, contact_chain): - # Get the receptor chain - receptor_chain = receptor[0][contact_chain] - - # Get current ligand ids - seqid_nums = [] - for receptor_res in receptor_chain: - num = receptor_res.seqid.num - seqid_nums.append(num) - - # Assign a new, unused ligand id - if len(seqid_nums) == 0: - min_ligand_seqid = 100 - else: - min_ligand_seqid = max(seqid_nums) + 100 - - # Update the ligand residue sequenceid - ligand_residue = ligand[0][0][0] - ligand_residue.seqid.num = min_ligand_seqid - - # Add the ligand residue - receptor_chain.add_residue(ligand_residue, pos=-1) - - return receptor - - def get_pandda_settings(self, yaml_file): - with open(yaml_file, "r") as file: - expt_yaml = yaml.load(file, Loader=yaml.SafeLoader) - settings = expt_yaml.get("autoprocessing", {}).get("pandda", {}) - if settings: - args_string = " ".join(f"--{k}={v}" for k, v in settings.items()) - else: - args_string = "" - return args_string - def send_attachments_to_ispyb(self, attachments, batch): if batch: # synchweb attachments not supported for array job processing return diff --git a/src/dlstbx/wrapper/pipedream_xchem.py b/src/dlstbx/wrapper/pipedream_xchem.py index dd01dd7e2..4fa0c261d 100644 --- a/src/dlstbx/wrapper/pipedream_xchem.py +++ b/src/dlstbx/wrapper/pipedream_xchem.py @@ -5,10 +5,12 @@ import subprocess from pathlib import Path -import portalocker - from dlstbx.util.mvs.helpers import save_cropped_map from dlstbx.util.mvs.viewer_pipedream import gen_html_pipedream +from dlstbx.util.pipedream_xchem_helpers import ( + process_pdb_file, + save_dataset_metadata, +) from dlstbx.wrapper import Wrapper @@ -27,7 +29,8 @@ def run(self): auto_dir = processing_dir / "auto" analysis_dir = Path(auto_dir / "analysis") pipedream_dir = analysis_dir / "pipedream" - model_dir = pipedream_dir / "model_building" + Path(pipedream_dir).mkdir(parents=True, exist_ok=True) + model_dir = analysis_dir / "model_building" dtag = params.get("dtag") smiles = params.get("smiles") @@ -40,7 +43,6 @@ def run(self): self.log.info(f"Processing dtag: {dtag}") - dataset_dir = model_dir / dtag compound_dir = dataset_dir / "compound" smiles_files = list(compound_dir.glob("*.smiles")) @@ -59,44 +61,12 @@ def run(self): smiles_file = smiles_files[0] CompoundCode = smiles_file.stem - # ------------------------------------------------------- - # Ligand restraint generation - - restraints_log = dataset_dir / "restraints.log" - attachments = [restraints_log] # synchweb attachments - - restraints_command = f"grade2 --in {smiles_file} --itype smi --out {CompoundCode} -f > {restraints_log}" - - try: - subprocess.run( - restraints_command, - shell=True, - capture_output=True, - text=True, - cwd=compound_dir, - check=True, - timeout=30 * 60, - ) - - except subprocess.CalledProcessError as e: - self.log.error( - f"Ligand restraint generation command: '{restraints_command}' failed for dataset {dtag}" - ) - self.log.info(e.stdout) - self.log.error(e.stderr) - self.send_attachments_to_ispyb(attachments) - return False - - restraints = compound_dir / f"{CompoundCode}.restraints.cif" - restraints.rename(compound_dir / f"{CompoundCode}.cif") - ligand_pdb = compound_dir / f"{CompoundCode}.xyz.pdb" - ligand_pdb.rename(compound_dir / f"{CompoundCode}.pdb") - + # Restraints (grade2) were generated upstream by the ligand-restraints job. ligand_cif = compound_dir / f"{CompoundCode}.cif" - self.log.info(f"Restraints generated succesfully for dtag {dtag}") + attachments = [] self.log.info(f"Removing crystallisation components from pdb file for {dtag}") - self.process_pdb_file(dimple_pdb) + process_pdb_file(dimple_pdb, self.log) self.log.info(f"Launching pipedream for {dtag}") # ------------------------------------------------------- @@ -155,7 +125,7 @@ def run(self): buster_report = report_dir / "report.pdf" pipedream_summary = out_dir / "pipedream_summary.json" - self.save_dataset_metadata( + save_dataset_metadata( str(pipedream_dir), str(compound_dir), str(out_dir), @@ -163,6 +133,7 @@ def run(self): smiles, pipedream_command, dtag, + self.log, ) pictures_dir = report_dir / "ligand/pictures" @@ -260,94 +231,6 @@ def run(self): self.send_attachments_to_ispyb(attachments) return True - def process_pdb_file(self, dimple_pdb: Path): - if dimple_pdb.exists(): - with open(dimple_pdb, "r", encoding="utf-8") as f: - lines = f.readlines() - - # Count removals by component type - original_count = len(lines) - components_to_remove = ["DMS", "EDO", "GOL", "SO4", "PO4", "PEG"] - removed_counts = dict.fromkeys(components_to_remove, 0) - - kept_lines = [] - for line in lines: - if any(res in line for res in components_to_remove): - # Count which component was found - for comp in components_to_remove: - if comp in line: - removed_counts[comp] += 1 - break - else: - kept_lines.append(line) - - # Write cleaned file - with open(dimple_pdb, "w", encoding="utf-8") as f: - f.writelines(kept_lines) - - removed_total = original_count - len(kept_lines) - if removed_total > 0: - component_summary = ", ".join( - [ - f"{comp}: {count}" - for comp, count in removed_counts.items() - if count > 0 - ] - ) - self.log.debug(f"Removed {removed_total} lines. ({component_summary})") - - else: - self.log.debug(f"Dimple pdb {dimple_pdb} does not exist") - return True - - def save_dataset_metadata( - self, - pipedream_dir, - input_dir, - output_dir, - compound_code, - smiles_string, - pipedream_cmd, - dtag, - ): - metadata = { - "Input_dir": input_dir, - "CompoundCode": compound_code, - "PipedreamDirectory": output_dir, - "ReportHTML": f"{output_dir}/report-{compound_code}/index.html", - "LigandReportHTML": f"{output_dir}/report-{compound_code}/ligand/index.html", - "ExpectedSummary": f"{output_dir}/pipedream_summary.json", - "PipedreamCommand": pipedream_cmd, - "ExpectedCIF": os.path.join(input_dir, f"{compound_code}.cif"), - "ExpectedPDB": os.path.join(input_dir, f"{compound_code}.pdb"), - "InputSMILES": smiles_string, - } - - output_yaml = {} - output_yaml[dtag] = metadata - json_file = f"{pipedream_dir}/Pipedream_output.json" - if not os.path.exists(json_file): - open(json_file, "w").close() - - # Acquire a lock - with portalocker.Lock(json_file, timeout=5): - if os.path.exists(json_file) and os.path.getsize(json_file) > 0: - with open(json_file, "r", encoding="utf-8") as f: - try: - data = json.load(f) - except Exception as e: - self.log.debug( - f"Cannot continue with pipedream postprocessing: {e}" - ) - return - else: - data = {} - - data.update(output_yaml) - - with open(json_file, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) - def send_attachments_to_ispyb(self, attachments): for f in attachments: if f.exists(): @@ -374,12 +257,3 @@ def send_attachments_to_ispyb(self, attachments): self.log.warning( f"Could not attach {f.name} to ISPyB", exc_info=True ) - - def get_rscc_rhofit(self, rhofit_dir) -> float: - RHOFIT_HIT_LOG = "Hit_corr.log" - # Actually gets the best RSCC of any fit, not -strictly- the one rhofit chose - with open(rhofit_dir / RHOFIT_HIT_LOG, "r") as f: - lines = f.readlines() - - rscc = max(float(line.split()[1]) for line in lines if line.strip()) - return rscc diff --git a/src/dlstbx/wrapper/xchem_collate.py b/src/dlstbx/wrapper/xchem_collate.py index 4c3a6aff3..a73b1dae3 100644 --- a/src/dlstbx/wrapper/xchem_collate.py +++ b/src/dlstbx/wrapper/xchem_collate.py @@ -1,13 +1,18 @@ from __future__ import annotations -import datetime -import os import shutil -import sqlite3 import subprocess -from itertools import groupby from pathlib import Path +from dlstbx.util.pipedream_xchem_helpers import ( + cleanup_setvar_files, + write_pipedream_parameters, +) +from dlstbx.util.soakdb import prepare_auto_db, updatable_crystals +from dlstbx.util.xchem_collate_helpers import ( + symlink_score_buckets, + update_xchem_database, +) from dlstbx.wrapper import Wrapper @@ -15,6 +20,10 @@ class XChemCollateWrapper(Wrapper): _logger_name = "dlstbx.wrap.xchem_collate" def run(self): + """Performs collation of PanDDA2 & Pipedream results for a labxchem visit. + Runs automated model selection and re-integrates results back into soakDB, + and XChem evironment.""" + assert hasattr(self, "recwrap"), "No recipewrapper object found" self.log.info( f"Running recipewrap file {self.recwrap.recipe_step['parameters']['recipewrapper']}" @@ -22,11 +31,12 @@ def run(self): params = self.recwrap.recipe_step["job_parameters"] pipedream = params.get("pipedream") + overwrite = params.get("overwrite") processing_dir = Path(params.get("processing_directory")) auto_dir = processing_dir / "auto" analysis_dir = auto_dir / "analysis" pandda_dir = analysis_dir / "pandda2" - model_dir = pandda_dir / "model_building" + model_dir = analysis_dir / "model_building" panddas_dir = Path(pandda_dir / "panddas") pipedream_dir = analysis_dir / "pipedream" @@ -56,22 +66,38 @@ def run(self): self.log.error(e.stderr) # ------------------------------------------------------- - # Perform model selection (pandda/pipedream) & re-integrate into XChem environment + # Perform model selection (PanDDA2/Pipedream) & re-integrate into XChem environment try: - self.update_xchem_database( - processing_dir, model_dir, pipedream_dir, panddas_dir - ) + db_copy = prepare_auto_db(processing_dir) + updatable = updatable_crystals(db_copy, overwrite) except Exception as e: - self.log.error(f"Exception updating database for {processing_dir}: {e}") + self.log.error(f"Could not prepare auto db for {processing_dir}: {e}") + db_copy, updatable = None, None + + if updatable is not None: + try: + symlink_score_buckets(panddas_dir, pandda_dir, updatable, self.log) + except Exception as e: + self.log.error(f"Exception bucketing scores for {panddas_dir}: {e}") + + try: + update_xchem_database( + model_dir, pipedream_dir, panddas_dir, db_copy, updatable, self.log + ) + except Exception as e: + self.log.error(f"Exception updating database for {processing_dir}: {e}") # ------------------------------------------------------- # Perform Pipedream collate --> html output if pipedream: - pipedream_command = f"module load mamba; mamba activate /dls/science/groups/i04-1/software/micromamba/envs/xchem; \ - python /dls/science/groups/i04-1/software/pipedream_xchem/collate_pipedream_results.py --input {pipedream_dir / 'Pipedream_output.json'} --no-browser --no-plots -v" + xchem_python = ( + "/dls/science/groups/i04-1/software/micromamba/envs/xchem/bin/python" + ) + pipedream_command = f"{xchem_python} /dls/science/groups/i04-1/software/pipedream_xchem/collate_pipedream_results.py \ + --input {pipedream_dir / 'Pipedream_output.json'} --output-dir {pipedream_dir / 'Pipedream_results'} --no-browser --no-plots -v" - self.log.info(f"Running XChemCollate command: {pipedream_command}") + self.log.info(f"Running Collate command: {pipedream_command}") try: subprocess.run( @@ -79,23 +105,34 @@ def run(self): shell=True, capture_output=True, text=True, - cwd=panddas_dir, + cwd=pipedream_dir, check=True, timeout=params.get("timeout-minutes") * 60, ) except subprocess.CalledProcessError as e: - self.log.error(f"XChemCollate command: '{pipedream_command}' failed") - self.log.info(e.stdout) - self.log.error(e.stderr) + self.log.error( + f"Pipedream collate command failed (exit {e.returncode})\n" + f"--- stdout ---\n{e.stdout}\n--- stderr ---\n{e.stderr}" + ) + + try: + write_pipedream_parameters( + processing_dir, pipedream_dir, logger=self.log + ) + except Exception as e: + self.log.error( + f"Could not write pipedream parameters for {pipedream_dir}: {e}" + ) + else: self.log.info( f"Skipping collation of Pipedream results for {pipedream_dir}" ) # ------------------------------------------------------- - # Perform XChemAlign collate - xca_dir = analysis_dir / "xchem_align" + # Perform XChemAlign collate step + xca_dir = processing_dir / "analysis" / "xchemalign" config = xca_dir / "config.yaml" assemblies = xca_dir / "assemblies.yaml" @@ -104,14 +141,16 @@ def run(self): f"No config/assemblies .yaml in {str(xca_dir)}, skipping autoXCA" ) else: - autoxca_dir = auto_dir / "xchem_align" # do relative paths work from here? + autoxca_dir = auto_dir / "xchemalign" + autoxca_dir.mkdir(parents=True, exist_ok=True) shutil.copy(config, autoxca_dir / "config.yaml") shutil.copy(assemblies, autoxca_dir / "assemblies.yaml") - xca_command = f"source /dls/science/groups/i04-1/software/xchem-align/act; conda activate /dls/science/groups/i04-1/software/xchem-align/env_xchem_align; \ - python -m xchemalign.collator -d {xca_dir}; python -m xchemalign.aligner -d {xca_dir}" + xca_python = "/dls/science/groups/i04-1/software/xchem-align/env_xchem_align/bin/python" + xca_command = f"{xca_python} -m xchemalign.collator -d {xca_dir} && \ + {xca_python} -m xchemalign.aligner -d {xca_dir}" - self.log.info("Running XCA command: {xca_command}") + self.log.info(f"Running XCA command: {xca_command}") try: subprocess.run( @@ -125,205 +164,16 @@ def run(self): ) except subprocess.CalledProcessError as e: - self.log.error(f"XCA command: '{xca_command}' failed") - self.log.info(e.stdout) - self.log.error(e.stderr) - - self.log.info("Auto XChemCollate finished successfully") - return True - - def find_pipedream_model(self, pipedream_dir, dtag, rscc_thresh): - RHOFIT_HIT_LOG = "Hit_corr.log" - - dataset_dir = pipedream_dir / dtag - rhofit_dir = next(dataset_dir.glob("rhofit-*"), None) - postrefine_dir = next(dataset_dir.glob("postrefine-*"), None) - - if not rhofit_dir: - return None - - if not postrefine_dir: - return None - - hit_log = rhofit_dir / RHOFIT_HIT_LOG - if not hit_log.exists(): - return None - - with open(hit_log) as f: - rscc = max(float(line.split()[1]) for line in f if line.strip()) - - refine_pdb = postrefine_dir / "refine.pdb" - return ( - (refine_pdb, rscc) - if refine_pdb.exists() and rscc > rscc_thresh - else (None, None) - ) - - def find_pandda_model(self, panddas_dir, dtag) -> Path | None: - pandda_dataset_dir = panddas_dir / "processed_datasets" / f"{dtag}" - pandda_model = ( - pandda_dataset_dir / "modelled_structures" / f"{dtag}-pandda-model.pdb" - ) - model_path = pandda_model if pandda_model.exists() else None - return model_path - - def update_xchem_database( - self, processing_dir, model_dir, pipedream_dir, panddas_dir - ): - """Exports results to SoakDB database""" - - db_master = processing_dir / "database" / "soakDBDataFile.sqlite" - db_copy = processing_dir / "auto/database" / "autosoakDBDataFile.sqlite" - - if not db_copy.exists(): - Path(db_copy.parents[0]).mkdir(parents=True, exist_ok=True) - shutil.copy(db_master, db_copy) - - self.sync_schema_from_master(db_master, db_copy, "mainTable") - self.sync_rows_from_master(db_master, db_copy, "mainTable") - - # Build list of update dicts for batch update - db_dicts = [] - for dir in model_dir.iterdir(): # or iterate through results.json entries - dtag = dir.name - db_timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f")[:-4] - pipedream_model, rscc = self.find_pipedream_model( - pipedream_dir, dtag, rscc_thresh=0.7 - ) - pandda_model = self.find_pandda_model(panddas_dir, dtag) - - # Export - if pipedream_model: - self.log.info(f"Selected Pipedream model for {dtag}") - - # Determine ligand confidence based on overall ligandcc value - if rscc >= 0.8: - RefinementLigandConfidence = "4 - High Confidence" - RefinementOutome = "4 - CompChem ready" - elif rscc >= 0.7: - RefinementLigandConfidence = "2 - Correct ligand, weak density" - RefinementOutome = "3 - In Refinement" - - db_dicts.append( - { - "CrystalName": dtag, - "RefinementBoundConformation": str(pipedream_model), - "RefinementOutcome": RefinementOutome, - "RefinementLigandConfidence": RefinementLigandConfidence, - "RefinementLigandCC": rscc, - # "RefinementCIF": str(model_dir / dtag / compound /) - "RefinementCIFprogram": "Grade2", - "LastUpdated": db_timestamp, - "LastUpdated_by": "gda2", - } - ) - - elif pandda_model: - self.log.info(f"Selected PanDDA2 model for {dtag}") - db_dicts.append( - { - "CrystalName": dtag, - "RefinementBoundConformation": str(pandda_model), - "RefinementOutcome": "2 - PANDDA model", - "RefinementCIFprogram": "Grade2", - "LastUpdated": db_timestamp, - "LastUpdated_by": "gda2", - } - ) - else: - self.log.info(f"No model selected for {dtag}") - db_dicts.append( - { - "CrystalName": dtag, - "RefinementOutcome": "7 - Analysed & Rejected", - "LastUpdated": db_timestamp, - "LastUpdated_by": "gda2", - } + self.log.error( + f"XCA command failed (exit {e.returncode})\n" + f"--- stdout ---\n{e.stdout}\n--- stderr ---\n{e.stderr}" ) + # Clean up orphaned autoBUSTER setvar logs left in the pipedream dir try: - self.update_data_source_bulk(db_dicts, db_copy) - self.log.debug(f"Bulk updated {db_copy} for {len(db_dicts)} datasets") + cleanup_setvar_files(pipedream_dir, self.log) except Exception as e: - self.log.debug(f"Could not bulk update {db_copy}: {e}") - - def update_data_source_bulk(self, db_dicts, database_path): - # Group dicts that share the same columns together - keyed = sorted(db_dicts, key=lambda d: tuple(sorted(d))) + self.log.error(f"Could not clean up setvar logs in {pipedream_dir}: {e}") - conn = sqlite3.connect(database_path, timeout=30) - try: - cursor = conn.cursor() - for keys, group in groupby(keyed, key=lambda d: tuple(sorted(d))): - columns = [k for k in keys if k != "CrystalName"] - sql = ( - "UPDATE mainTable SET " - + ", ".join([f"{col} = :{col}" for col in columns]) - + " WHERE CrystalName = :CrystalName" - + " AND RefinementOutcome IS NULL" - ) - cursor.executemany(sql, list(group)) - conn.commit() - except Exception: - conn.rollback() - raise - finally: - conn.close() - - def update_data_source(self, db_dict, dtag, database_path): - sql = ( - "UPDATE mainTable SET " - + ", ".join([f"{k} = :{k}" for k in db_dict]) - + f" WHERE CrystalName = '{dtag}'" - + " AND RefinementOutcome IS NULL" - ) - conn = sqlite3.connect(database_path, timeout=60) - # conn.execute("PRAGMA journal_mode=WAL;") - cursor = conn.cursor() - cursor.execute(sql, db_dict) - conn.commit() - conn.close() - - def sync_rows_from_master(self, master_path, copy_path, table): - conn = sqlite3.connect(copy_path) - conn.execute(f"ATTACH DATABASE '{master_path}' AS master") - - # Insert only rows from master that don't already exist in the copy - conn.execute(f""" - INSERT OR IGNORE INTO {table} - SELECT * FROM master.{table} - """) - - conn.commit() - conn.execute("DETACH DATABASE master") - conn.close() - - def sync_schema_from_master(self, db_master, db_copy, table): - """Add any columns present in master but missing from copy. - Preserves column type from master.""" - - with sqlite3.connect(db_master) as master_conn: - master_col_defs = { - row[1]: row[2] - for row in master_conn.execute(f"PRAGMA table_info({table})") - } - - with sqlite3.connect(db_copy) as copy_conn: - copy_cols = { - row[1] for row in copy_conn.execute(f"PRAGMA table_info({table})") - } - - new_cols = set(master_col_defs.keys()) - copy_cols - for col in new_cols: - col_type = master_col_defs[col] - copy_conn.execute(f"ALTER TABLE {table} ADD COLUMN {col} {col_type}") - - copy_conn.commit() - - def safe_symlink(self, src, dst): - try: - if os.path.islink(dst) or os.path.exists(dst): - os.remove(dst) - os.symlink(src, dst) - except Exception as e: - self.log.error(f"Error creating symlink from {src} to {dst}: {e}") + self.log.info("Auto XChemCollate finished successfully") + return True