"""Contains functions for dealing with the .mmtf file format."""
import msgpack
import struct
from collections import deque
from datetime import datetime
from .mmcif import get_structure_from_atom, create_entities, split_residue_id
from .structures import Chain, Ligand
[docs]def mmtf_bytes_to_mmtf_dict(bytestring):
"""Takes the raw bytestring of a .mmtf file and turns it into a normal,
fully decoded JSON dictionary.
:patam bytes bytestring: the .mmtf filestring.
:rtype: ``dict``"""
raw = msgpack.unpackb(bytestring)
return decode_dict(raw)
[docs]def decode_dict(d):
"""Takes a dictionary that might have bytestring keys, lists of bytestring
values, .mmtf binary values, or other weirdness, and returns a dully decoded
version of it which is a JSON-valid dictionary.
:param dict d: the dictionary to read.
:rtype: ``dict``"""
new = {}
for key, value in d.items():
try:
new_value = value.decode()
except: new_value = value
if isinstance(new_value, str) and new_value and new_value[0] == "\x00":
new_value = new_value.encode()
if isinstance(new_value, bytes):
new_value = parse_binary_field(new_value)
if isinstance(new_value, list) and new_value:
if isinstance(new_value[0], dict):
new_value = [decode_dict(x) for x in new_value]
elif isinstance(new_value[0], bytes):
new_value = [x.decode() for x in new_value]
new[key.decode() if isinstance(key, bytes) else key] = new_value
return new
[docs]def parse_binary_field(b):
"""Some fields in a .mmtf file cannot be unpacked by msgpack and have
special .mmtf encoding, as specified in its documentation. This function
takes such a field and decodes it.
:param bytestring b: the field to parse.
:returns: the parsed result (type varies)."""
codec, length, params = struct.unpack(">iii", b[:12])
len4 = lambda b: int(len(b[12:]) / 4)
if codec == 1: return struct.unpack("f" * length, b[12:])
elif codec == 2: return struct.unpack("b" * length, b[12:])
elif codec == 3: return struct.unpack(">" + "h" * length, b[12:])
elif codec == 4: return struct.unpack(">" + "i" * length, b[12:])
elif codec == 5:
chars = struct.unpack("c" * (length * 4), b[12:])
return [b"".join([
c for c in chars[i * 4: (i + 1) * 4] if c != b"\x00"
]).decode() for i in range(length)]
elif codec == 6:
integers = struct.unpack(">" + ("i" * len4(b)), b[12:])
return [chr(c) if c != 0 else "" for c in run_length_decode(integers)]
elif codec == 7:
integers = struct.unpack(">" + ("i" * len4(b)), b[12:])
return run_length_decode(integers)
elif codec == 8:
integers = struct.unpack(">" + ("i" * len4(b)), b[12:])
return delta_decode(run_length_decode(integers))
elif codec == 9:
integers = struct.unpack(">" + ("i" * len4(b)), b[12:])
return [n / params for n in run_length_decode(integers)]
elif codec == 10:
integers = struct.unpack(">" + ("h" * int(len(b[12:]) / 2)), b[12:])
return [n / params for n in delta_decode(recursive_decode(integers))]
else: raise ValueError(".mmtf error: {} is invalid codec".format(codec))
[docs]def run_length_decode(integers):
"""Expands a list of integers where every second integer is a count of the
integer before it.
:param list integers: the integers to decode.
:rtype: ``list``"""
x = []
for index, val in enumerate(integers[::2]):
x += [val] * integers[1::2][index]
return x
[docs]def delta_decode(integers):
"""Turns a list of integers into a new list of integers where the values in
the first are treated as deltas to be applied to the previous value.
:param list integers: the integers to decode.
:rtype: ``list``"""
array, last = [], 0
for i in integers:
last += i
array.append(last)
return array
[docs]def recursive_decode(integers, bits=16):
"""Turns a list of integers into a new list of integers where the values in
the first are merged if it looks like a higher order integer split over two
integers.
(Code here adapted from the official python-mmtf package.)
:param list integers: the integers to decode.
:rtype: ``list``"""
new = []
power = 2 ** (bits - 1)
cutoff = [power - 1, 0 - power]
index = 0
while index < len(integers):
value = 0
while integers[index] in cutoff:
value += integers[index]
index += 1
if integers[index] == 0: break
value += integers[index]
index += 1
new.append(value)
return new
[docs]def mmtf_dict_to_data_dict(mmtf_dict):
"""Converts an .mmtf dictionary into an atomium data dictionary, with the
same standard layout that the other file formats get converted into.
:param dict mmtf_dict: the .mmtf dictionary.
:rtype: ``dict``"""
data_dict = {
"description": {
"code": None, "title": None, "deposition_date": None,
"classification": None, "keywords": [], "authors": []
}, "experiment": {
"technique": None, "source_organism": None, "expression_system": None,
"missing_residues": []
}, "quality": {"resolution": None, "rvalue": None, "rfree": None},
"geometry": {"assemblies": [], "crystallography": {}}, "models": []
}
mmtf_to_data_transfer(mmtf_dict, data_dict,
"description", "code", "structureId")
mmtf_to_data_transfer(mmtf_dict, data_dict,
"description", "title", "title")
mmtf_to_data_transfer(mmtf_dict, data_dict,
"description", "deposition_date", "depositionDate", date=True)
mmtf_to_data_transfer(mmtf_dict, data_dict,
"experiment", "technique", "experimentalMethods", first=True)
mmtf_to_data_transfer(mmtf_dict, data_dict,
"quality", "resolution", "resolution", trim=3)
mmtf_to_data_transfer(mmtf_dict, data_dict,
"quality", "rvalue", "rWork", trim=3)
mmtf_to_data_transfer(mmtf_dict, data_dict,
"quality", "rfree", "rFree", trim=3)
mmtf_to_data_transfer(mmtf_dict, data_dict["geometry"],
"crystallography", "space_group", "spaceGroup")
mmtf_to_data_transfer(mmtf_dict, data_dict["geometry"],
"crystallography", "unit_cell", "unitCell", trim=3)
if data_dict["geometry"]["crystallography"].get("space_group") == "NA":
data_dict["geometry"]["crystallography"] = {}
data_dict["geometry"]["assemblies"] = [{
"id": int(a["name"]), "software": None, "delta_energy": None,
"buried_surface_area": None, "surface_area": None, "transformations": [{
"chains": [mmtf_dict["chainIdList"][i] for i in t["chainIndexList"]],
"matrix": [t["matrix"][n * 4: (n * 4) + 3] for n in range(3)],
"vector": t["matrix"][3:-4:4]} for t in a.get("transformList", [])
]
} for a in mmtf_dict.get("bioAssemblyList", [])]
update_models_list(mmtf_dict, data_dict)
return data_dict
[docs]def update_models_list(mmtf_dict, data_dict):
"""Takes a data dictionary and updates its models list with
information from a .mmtf dictionary.
:param dict mmtf_dict: the .mmtf dictionary to read.
:param dict data_dict: the data dictionary to update."""
atoms = get_atoms_list(mmtf_dict)
group_definitions = get_group_definitions_list(mmtf_dict)
groups = get_groups_list(mmtf_dict, group_definitions)
chains = get_chains_list(mmtf_dict, groups)
for model_num in range(mmtf_dict["numModels"]):
model = {"polymer": {}, "non-polymer": {}, "water": {}, "branched": {}}
for chain_num in range(mmtf_dict["chainsPerModel"][model_num]):
chain = chains[chain_num]
add_chain_to_model(chain, model, atoms)
data_dict["models"].append(model)
[docs]def get_atoms_list(mmtf_dict):
"""Creates a list of atom dictionaries from a .mmtf dictionary by zipping
together some of its fields.
:param dict mmtf_dict: the .mmtf dictionary to read.
:rtype: ``list``"""
return [{
"x": x, "y": y, "z": z, "alt_loc": a or None, "bvalue": b, "occupancy": o,
"id": i, "is_hetatm": False
} for x, y, z, a, b, i, o in zip(
mmtf_dict["xCoordList"], mmtf_dict["yCoordList"], mmtf_dict["zCoordList"],
mmtf_dict["altLocList"], mmtf_dict["bFactorList"], mmtf_dict["atomIdList"],
mmtf_dict["occupancyList"]
)]
[docs]def get_group_definitions_list(mmtf_dict):
"""Gets a list of group definitions from the .mmtf dict and packs its atom
attributes into atoms dicts.
:param dict mmtf_dict: the .mmtf dictionary to read.
:rtype: ``list``"""
group_definitions = []
for group in mmtf_dict["groupList"]:
atoms = [{
"name": name, "element": element.upper(), "charge": charge
} for name, element, charge in zip(
group["atomNameList"], group["elementList"], group["formalChargeList"],
)]
group_definitions.append({
"name": group["groupName"], "atoms": atoms
})
return group_definitions
[docs]def get_groups_list(mmtf_dict, group_definitions):
"""Creates a list of group dictionaries from a .mmtf dictionary by zipping
together some of its fields.
:param dict mmtf_dict: the .mmtf dictionary to read.
:rtype: ``list``"""
sec_struct = [
"helices", None, "helices", "strands", "helices", "strands", None, None
]
return [{
"number": id, "insert": insert, "secondary_structure": sec_struct[ss],
**group_definitions[type_]
} for id, insert, ss, type_, in zip(
mmtf_dict["groupIdList"], mmtf_dict["insCodeList"],
mmtf_dict.get("secStructList", [-1] * len(mmtf_dict["groupIdList"])),
mmtf_dict["groupTypeList"]
)]
[docs]def get_chains_list(mmtf_dict, groups):
"""Creates a list of chain dictionaries from a .mmtf dictionary by zipping
together some of its fields.
:param dict mmtf_dict: the .mmtf dictionary to read.
:rtype: ``list``"""
chains = []
for i_id, id, group_num in zip(mmtf_dict["chainIdList"],
mmtf_dict["chainNameList"], mmtf_dict["groupsPerChain"]):
chain = {"id": id, "internal_id": i_id, "groups": groups[:group_num]}
del groups[:group_num]
for entity in mmtf_dict["entityList"]:
if len(chains) in entity["chainIndexList"]:
chain["type"] = entity["type"]
chain["sequence"] = entity.get("sequence", "")
chain["full_name"] = entity.get("description", None)
break
chains.append(chain)
return chains
[docs]def add_chain_to_model(chain, model, atoms):
"""Adds a 'chain' to a model - a chain in the .mmtf dict, which can also be
a non-polymer.
:param dict chain: the 'chain' to add.
:param dict model: the model to add it to.
:param list atoms: the atoms list to work through."""
if chain["type"] == "polymer" or chain["type"] == "branched":
polymer = {
"internal_id": chain["internal_id"], "sequence": chain["sequence"],
"helices": [], "strands": [], "residues": {}
}
for i, group in enumerate(chain["groups"], start=1):
add_het_to_dict(group, chain, atoms, polymer["residues"], number=i)
add_ss_to_chain(polymer)
model["polymer"][chain["id"]] = polymer
else:
for group in chain["groups"]:
add_het_to_dict(group, chain, atoms, model[chain["type"]])
[docs]def add_het_to_dict(group, chain, atoms, d, number=None):
"""Adds a ligand or water or residue to the appropriate dict. An ID and name
will be generated for it.
:param dict group: the group template the het should be based on.
:param dict chain: the chain (in the real sense) the het is associated with.
:param list atoms: the atoms list to work through.
:param dict d: the dictionary to add to.
:param int number: if given, the residue number to use."""
het_id = f"{chain['id']}.{group['number']}{group['insert']}"
het_atoms = atoms[:len(group["atoms"])]
del atoms[:len(het_atoms)]
het_atoms = {a["id"]: {
"anisotropy": [0] * 6, **a, **g_a
} for a, g_a in zip(het_atoms, group["atoms"])}
for a in het_atoms.values(): del a["id"]
het = {
"name": group["name"], "atoms": het_atoms, "full_name": None,
"secondary_structure": group["secondary_structure"]
}
if number is None:
het["internal_id"] = chain["internal_id"]
het["polymer"] = chain["id"]
het["full_name"] = chain["full_name"]
else:
het["number"] = number
d[het_id] = het
[docs]def add_ss_to_chain(chain):
"""Updates polymer dictionary with secondary structure information, from
information temporarily stored in its residue dicts.
:param dict chain: the chain to update."""
in_ss = {"helices": False, "strands": False}
for res_id, res in chain["residues"].items():
ss = res["secondary_structure"]
if ss:
if not in_ss[ss]:
chain[ss].append([])
in_ss[ss] = True
chain[ss][-1].append(res_id)
else:
if in_ss["helices"]: in_ss["helices"] = False
if in_ss["strands"]: in_ss["strands"] = False
del res["secondary_structure"]
[docs]def mmtf_to_data_transfer(mmtf_dict, data_dict, d_cat, d_key, m_key,
date=False, first=False, trim=False):
"""A function for transfering a bit of data from a .mmtf dictionary to a
data dictionary, or doing nothing if the data doesn't exist.
:param dict mmtf_dict: the .mmtf dictionary to read.
:param dict data_dict: the data dictionary to update.
:param str d_cat: the top-level key in the data dictionary.
:param str d_key: the data dictionary field to update.
:param str m_key: the .mmtf field to read.
:param bool date: if True, the value will be converted to a date.
:param bool first: if True, the value's first item will be split used.
:param int trim: if given, the value will be rounded by this amount."""
try:
value = mmtf_dict[m_key]
if date: value = datetime.strptime(value, "%Y-%m-%d").date()
if first: value = value[0]
if trim:
try:
value = [round(v, trim) for v in value]
except: value = round(value, trim)
data_dict[d_cat][d_key] = value
except: pass
[docs]def structure_to_mmtf_string(structure):
"""Converts a :py:class:`.AtomStructure` to a .mmtf filestring.
No compression is currently performed.
:param AtomStructure structure: the structure to convert.
:rtype: ``bytes``"""
chains, ligands, waters, properties, entities = get_structures(structure)
entity_list = get_entity_list(entities, chains, ligands, waters)
chain_ids, chain_names = get_chain_ids_and_names(chains, ligands, waters)
groups_per_chain = get_groups_per_chain(chains, ligands, waters)
group_types, group_ids, groups, ins = get_groups(chains, ligands, waters)
x, y, z, alt, bfactor, ids, occupancy = zip(*properties)
chain_count = len(chains) + len(ligands) + len(set(l.chain for l in waters))
d = {
"numModels": 1, "numChains": chain_count, "chainsPerModel": [chain_count],
"xCoordList": x, "yCoordList": y, "zCoordList": z, "altLocList": alt,
"bFactorList": bfactor, "atomIdList": ids, "occupancyList": occupancy,
"entityList": entity_list, "chainIdList": chain_ids, "insCodeList": ins,
"chainNameList": chain_names, "groupsPerChain": groups_per_chain,
"groupList": groups, "groupIdList": group_ids, "groupTypeList": group_types
}
return msgpack.packb(d)
[docs]def get_structures(structure):
"""Takes an atomic structure, and creates a list of chains within it, a list
of ligands, a list of waters, a list of relevant atom properties, and a list
of entities.
:param AtomStructure structure: the structure to unpack.
:rtype: ``tuple``"""
chains, ligands, waters, atom_properties = set(), set(), set(), []
for atom in sorted(structure.atoms(), key=lambda a: a.id):
get_structure_from_atom(atom, chains, ligands, waters)
atom_properties.append(list(atom.location) + [
"", atom.bvalue, atom.id, 1
])
chains = sorted(chains, key=lambda c: c._internal_id)
ligands = sorted(ligands, key=lambda l: l._internal_id)
waters = sorted(waters, key=lambda w: w._internal_id)
entities = create_entities(chains, ligands, waters)
return (chains, ligands, waters, atom_properties, entities)
[docs]def get_entity_list(entities, chains, ligands, waters):
"""Takes a list of entity objects, as well as the objects they represent,
and turns them into a list of .mmtf dictionaries.
:param list entities: the entities to pack.
:param list chains: the chains to pack.
:param list ligands: the ligands to pack.
:param list waters: the waters to pack.
:rtype: ``list``"""
entity_list = []
for e in entities:
if isinstance(e, Chain):
entity_list.append({"type": "polymer", "chainIndexList": [
i for i, c in enumerate(chains) if c.sequence == e.sequence
], "sequence": e.sequence})
elif isinstance(e, Ligand) and not e.is_water:
entity_list.append({"type": "non-polymer", "chainIndexList": [
i + len(chains) for i, l in enumerate(ligands) if l._name == e._name
]})
else:
water_chains = set(w.chain for w in waters)
entity_list.append({"type": "water", "chainIndexList": [
i + len(chains) + len(ligands) for i in range(len(water_chains))
]})
return entity_list
[docs]def get_chain_ids_and_names(chains, ligands, waters):
"""Takes lists of chains, ligands and waters, and returns the chain IDs and
chain names that should go in the .mmtf file.
:param list chains: the chains to pack.
:param list ligands: the ligands to pack.
:param list waters: the waters to pack.
:rtype: ``list``"""
chain_ids, chain_names = [], []
for chain in chains:
chain_ids.append(chain._internal_id)
chain_names.append(chain.id)
for ligand in ligands:
chain_ids.append(ligand._internal_id)
chain_names.append(ligand.chain.id)
for water in waters:
if water._internal_id not in chain_ids:
chain_ids.append(water._internal_id)
chain_names.append(water.chain.id)
return (chain_ids, chain_names)
[docs]def get_groups_per_chain(chains, ligands, waters):
"""Takes lists of chains, ligands and waters, and returns the chain counts
that should go in the .mmtf file.
:param list chains: the chains to pack.
:param list ligands: the ligands to pack.
:param list waters: the waters to pack.
:rtype: ``list``"""
groups_per_chain = []
for chain in chains:
groups_per_chain.append(len(chain.residues()))
for ligand in ligands:
groups_per_chain.append(1)
water_chains = sorted(set(w._internal_id for w in waters))
for wc in water_chains:
groups_per_chain.append(len([w for w in waters if w._internal_id == wc]))
return groups_per_chain
[docs]def get_groups(chains, ligands, waters):
"""Creates the relevant lists of group information from chains, ligands and
waters.
:param list chains: the chains to pack.
:param list ligands: the ligands to pack.
:param list waters: the waters to pack.
:rtype: ``tuple``"""
group_types, group_ids, groups, inserts = [], [], [], []
for chain in chains:
for res in chain.residues():
add_het_to_groups(res, group_types, group_ids, groups, inserts)
for ligand in ligands + waters:
add_het_to_groups(ligand, group_types, group_ids, groups, inserts)
return (group_types, group_ids, groups, inserts)
[docs]def add_het_to_groups(het, group_type_list, group_id_list, group_list, ins_list):
"""Updates group lists with information from a single :py:class:`.Het`.
:param Het het: the Het to pack.
:param list group_type_list: the list of group types.
:param list group_id_list: the list of group IDs.
:param list group_list: the list of groups.
:param list ins_list: the list of insertion codes.
:rtype: ``tuple``"""
atoms = sorted(het.atoms(), key=lambda a: a.id)
group = {
"groupName": het._name, "atomNameList": [a._name for a in atoms],
"elementList": [a.element for a in atoms],
"formalChargeList": [a.charge for a in atoms]
}
for i, g in enumerate(group_list):
if g == group:
group_type_list.append(i)
break
else:
group_list.append(group)
group_type_list.append(len(group_list) - 1)
id_, insert = split_residue_id(atoms[0])
group_id_list.append(id_)
ins_list.append(insert if insert != "?" else "")