From 9045b9ae8f25cd1b75361d1b8fa94f3096277c86 Mon Sep 17 00:00:00 2001 From: Julia Varga Date: Mon, 12 Jan 2026 19:16:36 +0200 Subject: [PATCH] Added mmCIF format support with automatic detection. Implemented parse_cif() to extract same data as parse_pdb(). Converts mmCIF to PDB format for OpenBabel. Drop-in replacement, no downstream changes needed. --- plip/structure/preparation.py | 422 +++++++++++++++++++++++++++++++--- 1 file changed, 394 insertions(+), 28 deletions(-) diff --git a/plip/structure/preparation.py b/plip/structure/preparation.py index 554dade..cd4bc2b 100644 --- a/plip/structure/preparation.py +++ b/plip/structure/preparation.py @@ -2,7 +2,7 @@ import os import re import tempfile -from collections import namedtuple +from collections import namedtuple, defaultdict from operator import itemgetter import numpy as np @@ -22,13 +22,47 @@ class PDBParser: + """Parser for both PDB and mmCIF format files. + Automatically detects format and produces unified output.""" + def __init__(self, pdbpath, as_string): self.as_string = as_string self.pdbpath = pdbpath self.num_fixed_lines = 0 self.covlinkage = namedtuple("covlinkage", "id1 chain1 pos1 conf1 id2 chain2 pos2 conf2") self.pdb_file_was_corrected = False - self.proteinmap, self.modres, self.covalent, self.altconformations, self.corrected_pdb = self.parse_pdb() + + # Detect format + self.format = self._detect_format() + logger.debug(f'detected format: {self.format}') + + # Parse according to format + self.proteinmap, self.modres, self.covalent, self.altconformations, self.corrected_pdb = self.parse() + + def _detect_format(self): + """Detect whether input is PDB or mmCIF format.""" + if self.as_string: + content = self.pdbpath + else: + with open(self.pdbpath, 'r') as f: + # Read first few lines to detect format + first_lines = ''.join([f.readline() for _ in range(20)]) + + content = first_lines + + # mmCIF indicators + cif_indicators = ['data_', 'loop_', '_atom_site.', '_entry.id', '_cell.'] + if any(indicator in content for indicator in cif_indicators): + return 'cif' + + return 'pdb' + + def parse(self): + """Main parsing dispatcher.""" + if self.format == 'cif': + return self.parse_cif() + else: + return self.parse_pdb() def parse_pdb(self): """Extracts additional information from PDB files. @@ -40,11 +74,11 @@ def parse_pdb(self): IV. Alternative conformations """ if self.as_string: - fil = self.pdbpath.rstrip('\n').split('\n') # Removing trailing newline character + fil = self.pdbpath.rstrip('\n').split('\n') else: - f = read(self.pdbpath) - fil = f.readlines() - f.close() + with open(self.pdbpath, 'r') as f: + fil = f.readlines() + corrected_lines = [] i, j = 0, 0 # idx and PDB numbering d = {} @@ -54,33 +88,32 @@ def parse_pdb(self): previous_ter = False model_dict = {0: list()} + # Standard without fixing if not config.NOFIX: if not config.PLUGIN_MODE: - lastnum = 0 # Atom numbering (has to be consecutive) + lastnum = 0 other_models = False - # Model 0 stores header and similar additional data - # or the full file if no MODEL entries exist in the file current_model = 0 + for line in fil: corrected_line, newnum = self.fix_pdbline(line, lastnum) if corrected_line is not None: if corrected_line.startswith('MODEL'): - # reset atom number when new model is encountered lastnum = 0 - try: # Get number of MODEL (1,2,3) + try: model_num = int(corrected_line[10:14]) - # initialize storage for new model model_dict[model_num] = list() current_model = model_num - if model_num > 1: # MODEL 2,3,4 etc. + if model_num > 1: other_models = True except ValueError: logger.debug(f'ignoring invalid MODEL entry: {corrected_line}') else: lastnum = newnum model_dict[current_model].append(corrected_line) - # select model + + # Select model try: if other_models: logger.info(f'selecting model {config.MODEL} for analysis') @@ -105,7 +138,6 @@ def parse_pdb(self): for line in corrected_lines: if line.startswith(("ATOM", "HETATM")): - # Retrieve alternate conformations atomid, location = int(line[6:11]), line[16] location = 'A' if location == ' ' else location if location != 'A': @@ -119,17 +151,334 @@ def parse_pdb(self): j += 2 d[i] = j previous_ter = False - # Numbering Changes at TER records + if line.startswith("TER"): previous_ter = True - # Get modified residues + if line.startswith("MODRES"): modres.add(line[12:15].strip()) - # Get covalent linkages between ligands + if line.startswith("LINK"): covalent.append(self.get_linkage(line)) + return d, modres, covalent, alt, corrected_pdb + def parse_cif(self): + """Parse mmCIF format file and extract the same information as parse_pdb.""" + if self.as_string: + content = self.pdbpath.rstrip('\n') + else: + with open(self.pdbpath, 'r') as f: + content = f.read() + + lines = content.split('\n') + + # Parse mmCIF into dictionary structure + cif_data = self._parse_cif_structure(lines) + + # Extract information + proteinmap = {} + modres = set() + covalent = [] + alt = [] + + # Get atom site data + atom_sites = cif_data.get('_atom_site', []) + + if not atom_sites: + logger.warning('no atom site data found in mmCIF file') + return {}, set(), [], [], "" + + # Process models + model_dict = defaultdict(list) + + # Group atoms by model + for atom_data in atom_sites: + model_num = int(atom_data.get('pdbx_PDB_model_num', '1')) + model_dict[model_num].append(atom_data) + + # Select model + available_models = sorted(model_dict.keys()) + if len(available_models) > 1: + logger.info(f'multiple models found: {available_models}') + if config.MODEL in available_models: + selected_model = config.MODEL + logger.info(f'selecting model {selected_model} for analysis') + else: + selected_model = available_models[0] + logger.warning(f'model {config.MODEL} not found, using model {selected_model}') + config.MODEL = selected_model + else: + selected_model = available_models[0] if available_models else 1 + + selected_atoms = model_dict[selected_model] + + # Build proteinmap and detect alternate conformations + i = 0 # OpenBabel idx (continuous) + j = 0 # PDB numbering (with TER gaps) + + prev_chain = None + prev_resnum = None + prev_is_het = None + + pdb_lines = [] + + for atom_data in selected_atoms: + group_pdb = atom_data.get('group_PDB', 'ATOM') + atom_id = int(atom_data.get('id', '0')) + type_symbol = atom_data.get('type_symbol', 'X') + atom_label = atom_data.get('label_atom_id', 'X') + alt_id = atom_data.get('label_alt_id', '.') + comp_id = atom_data.get('label_comp_id', 'UNK') + asym_id = atom_data.get('label_asym_id', 'A') + seq_id = atom_data.get('label_seq_id', '.') + auth_seq_id = atom_data.get('auth_seq_id', '1') + auth_asym_id = atom_data.get('auth_asym_id', asym_id) + + # Use auth identifiers preferentially (as in original PDB) + chain_id = auth_asym_id if auth_asym_id != '?' else asym_id + res_num = auth_seq_id if auth_seq_id != '?' else seq_id + + # Coordinates + x = float(atom_data.get('Cartn_x', '0.0')) + y = float(atom_data.get('Cartn_y', '0.0')) + z = float(atom_data.get('Cartn_z', '0.0')) + + occupancy = atom_data.get('occupancy', '1.00') + b_factor = atom_data.get('B_iso_or_equiv', '0.00') + + # Detect chain/residue changes for TER insertion + is_het = (group_pdb == 'HETATM') + + # Check if we need to insert TER + if prev_chain is not None and prev_is_het is not None: + if (chain_id != prev_chain or + (not prev_is_het and is_het) or + (prev_is_het and not is_het and chain_id == prev_chain)): + # Insert TER in mapping + j += 1 + pdb_lines.append('TER\n') + + i += 1 + j += 1 + proteinmap[i] = j + + # Track alternate conformations + if alt_id not in ['.', '?', '', 'A']: + alt.append(j) + + # Convert alt_id for PDB format + pdb_alt_id = ' ' if alt_id in ['.', '?', ''] else alt_id + + # Create PDB line (relaxed format for OpenBabel) + try: + res_num_int = int(res_num) + except (ValueError, TypeError): + res_num_int = 1 + + pdb_line = f"{group_pdb:<6}{j:>5} {atom_label:<4}{pdb_alt_id}{comp_id:>3} {chain_id}{res_num_int:>4} {x:>8.3f}{y:>8.3f}{z:>8.3f}{occupancy:>6}{b_factor:>6} {type_symbol:>2}\n" + pdb_lines.append(pdb_line) + + prev_chain = chain_id + prev_resnum = res_num + prev_is_het = is_het + + # Extract modified residues + mod_residues = cif_data.get('_pdbx_struct_mod_residue', []) + for mod_res in mod_residues: + label_comp_id = mod_res.get('label_comp_id', '') + if label_comp_id: + modres.add(label_comp_id) + + # Extract covalent linkages + struct_conns = cif_data.get('_struct_conn', []) + for conn in struct_conns: + conn_type = conn.get('conn_type_id', '') + # Only process covalent links + if conn_type.lower() in ['covale', 'covale_base', 'covale_sugar', 'covale_phosphate', 'disulf']: + try: + id1 = conn.get('ptnr1_label_comp_id', '') + chain1 = conn.get('ptnr1_auth_asym_id', '') + pos1_str = conn.get('ptnr1_auth_seq_id', '0') + conf1 = conn.get('pdbx_ptnr1_label_alt_id', '') + + id2 = conn.get('ptnr2_label_comp_id', '') + chain2 = conn.get('ptnr2_auth_asym_id', '') + pos2_str = conn.get('ptnr2_auth_seq_id', '0') + conf2 = conn.get('pdbx_ptnr2_label_alt_id', '') + + # Handle missing/invalid values + if conf1 in ['.', '?', '']: conf1 = '' + if conf2 in ['.', '?', '']: conf2 = '' + if chain1 in ['.', '?', '']: chain1 = 'A' + if chain2 in ['.', '?', '']: chain2 = 'A' + + try: + pos1 = int(pos1_str) + pos2 = int(pos2_str) + + covalent.append(self.covlinkage( + id1=id1, chain1=chain1, pos1=pos1, conf1=conf1, + id2=id2, chain2=chain2, pos2=pos2, conf2=conf2 + )) + except (ValueError, TypeError): + logger.debug(f'skipping struct_conn with invalid residue numbers') + except Exception as e: + logger.debug(f'error parsing struct_conn entry: {e}') + + # Create corrected PDB string + corrected_pdb = ''.join(pdb_lines) + self.pdb_file_was_corrected = True + + return proteinmap, modres, covalent, alt, corrected_pdb + + def _parse_cif_structure(self, lines): + """Parse mmCIF structure into a dictionary of categories. + Returns dict with category names as keys and list of dicts as values.""" + cif_data = {} + current_category = None + current_columns = [] + in_loop = False + + i = 0 + while i < len(lines): + line = lines[i].strip() + + # Skip comments and empty lines + if not line or line.startswith('#'): + i += 1 + continue + + # Handle loop + if line.startswith('loop_'): + in_loop = True + current_columns = [] + current_category = None + i += 1 + continue + + # Handle data block + if line.startswith('data_'): + in_loop = False + current_category = None + i += 1 + continue + + # Handle category/column definition + if line.startswith('_'): + if in_loop: + # Column definition in loop + parts = line.split('.', 1) + if len(parts) == 2: + category = parts[0] + column = parts[1].split()[0] + + if current_category is None: + current_category = category + cif_data[current_category] = [] + + current_columns.append(column) + i += 1 + continue + else: + # Single value + parts = line.split(None, 1) + if len(parts) >= 1: + full_key = parts[0] + # Get value (could be on same line or next line) + if len(parts) == 2: + value = parts[1].strip().strip('"').strip("'") + else: + # Value might be on next line + i += 1 + if i < len(lines): + value = lines[i].strip().strip('"').strip("'") + else: + value = '' + + # Store as single-item list for consistency + if full_key not in cif_data: + cif_data[full_key] = [] + cif_data[full_key].append({full_key: value}) + i += 1 + continue + + # Data values in loop + if in_loop and current_category and current_columns: + # Check if this line starts a new category (next loop_ or _) + if line.startswith('_') or line.startswith('loop_'): + in_loop = False + current_category = None + current_columns = [] + continue + + # Parse data values + values = self._parse_cif_line(line) + + # If we have fewer values than columns, might be multiline + while len(values) < len(current_columns) and i + 1 < len(lines): + i += 1 + next_line = lines[i].strip() + if next_line.startswith('_') or next_line.startswith('loop_') or next_line.startswith('#'): + i -= 1 + break + values.extend(self._parse_cif_line(next_line)) + + if len(values) >= len(current_columns): + row_data = {} + for col_idx, col_name in enumerate(current_columns): + if col_idx < len(values): + row_data[col_name] = values[col_idx].strip('"').strip("'") + cif_data[current_category].append(row_data) + + # If we have extra values, they belong to next row(s) + extra_values = values[len(current_columns):] + while len(extra_values) >= len(current_columns): + row_data = {} + for col_idx, col_name in enumerate(current_columns): + row_data[col_name] = extra_values[col_idx].strip('"').strip("'") + cif_data[current_category].append(row_data) + extra_values = extra_values[len(current_columns):] + + i += 1 + + return cif_data + + def _parse_cif_line(self, line): + """Parse a single line of CIF data values, handling quotes and semicolons.""" + values = [] + current_value = '' + in_quotes = False + quote_char = None + + i = 0 + while i < len(line): + char = line[i] + + if not in_quotes: + if char in ['"', "'"]: + in_quotes = True + quote_char = char + elif char.isspace(): + if current_value: + values.append(current_value) + current_value = '' + else: + current_value += char + else: + if char == quote_char: + in_quotes = False + quote_char = None + else: + current_value += char + + i += 1 + + if current_value: + values.append(current_value) + + return values + def fix_pdbline(self, pdbline, lastnum): """Fix a PDB line if information is missing.""" pdbqt_conversion = { @@ -139,49 +488,59 @@ def fix_pdbline(self, pdbline, lastnum): new_num = 0 forbidden_characters = "[^a-zA-Z0-9_]" pdbline = pdbline.strip('\n') - # Some MD / Docking tools produce empty lines, leading to segfaults + + # Empty or too long lines if len(pdbline.strip()) == 0: self.num_fixed_lines += 1 return None, lastnum - if len(pdbline) > 100: # Should be 80 long + if len(pdbline) > 100: self.num_fixed_lines += 1 return None, lastnum - # TER Entries also have continuing numbering, consider them as well + + # TER entries if pdbline.startswith('TER'): - if not pdbline[6:11]: # pdb files saved from PyMol skip the number in TER entries + if not pdbline[6:11].strip(): new_num = lastnum else: new_num = lastnum + 1 + if pdbline.startswith('ATOM'): new_num = lastnum + 1 current_num = int(pdbline[6:11]) resnum = pdbline[22:27].strip() resname = pdbline[17:21].strip() + # Invalid residue number try: int(resnum) except ValueError: pdbline = pdbline[:22] + ' 0 ' + pdbline[27:] fixed = True + # Invalid characters in residue name if re.match(forbidden_characters, resname.strip()): pdbline = pdbline[:17] + 'UNK ' + pdbline[21:] fixed = True + if lastnum + 1 != current_num: pdbline = pdbline[:6] + (5 - len(str(new_num))) * ' ' + str(new_num) + ' ' + pdbline[12:] fixed = True + # No chain assigned if pdbline[21] == ' ': pdbline = pdbline[:21] + 'A' + pdbline[22:] fixed = True + if pdbline.endswith('H'): self.num_fixed_lines += 1 return None, lastnum - # Sometimes, converted PDB structures contain PDBQT atom types. Fix that. + + # PDBQT atom types for pdbqttype in pdbqt_conversion: if pdbline.strip().endswith(pdbqttype): pdbline = pdbline.strip()[:-2] + ' ' + pdbqt_conversion[pdbqttype] + '\n' self.num_fixed_lines += 1 + if pdbline.startswith('HETATM'): new_num = lastnum + 1 try: @@ -189,18 +548,22 @@ def fix_pdbline(self, pdbline, lastnum): except ValueError: current_num = None logger.debug(f'invalid HETATM entry: {pdbline}') + if lastnum + 1 != current_num: pdbline = pdbline[:6] + (5 - len(str(new_num))) * ' ' + str(new_num) + ' ' + pdbline[12:] fixed = True - # No chain assigned or number assigned as chain + + # No chain assigned if pdbline[21] == ' ': pdbline = pdbline[:21] + 'Z' + pdbline[22:] fixed = True - # No residue number assigned + + # No residue number if pdbline[23:26] == ' ': pdbline = pdbline[:23] + '999' + pdbline[26:] fixed = True - # Non-standard Ligand Names + + # Non-standard ligand names ligname = pdbline[17:21].strip() if len(ligname) > 3: pdbline = pdbline[:17] + ligname[:3] + ' ' + pdbline[21:] @@ -211,14 +574,17 @@ def fix_pdbline(self, pdbline, lastnum): if len(ligname.strip()) == 0: pdbline = pdbline[:17] + 'LIG ' + pdbline[21:] fixed = True + if pdbline.endswith('H'): self.num_fixed_lines += 1 return None, lastnum - # Sometimes, converted PDB structures contain PDBQT atom types. Fix that. + + # PDBQT atom types for pdbqttype in pdbqt_conversion: if pdbline.strip().endswith(pdbqttype): pdbline = pdbline.strip()[:-2] + ' ' + pdbqt_conversion[pdbqttype] + ' ' self.num_fixed_lines += 1 + self.num_fixed_lines += 1 if fixed else 0 return pdbline + '\n', max(new_num, lastnum) @@ -1702,4 +2068,4 @@ def output_path(self): @output_path.setter def output_path(self, path): - self._output_path = tilde_expansion(path) \ No newline at end of file + self._output_path = tilde_expansion(path)