from itertools import chain
import torch
from scipy.sparse.csgraph import minimum_spanning_tree
from torch_sparse import SparseTensor
from torch_geometric.utils import to_undirected
[docs]def tree_decomposition(mol, return_vocab=False):
r"""The tree decomposition algorithm of molecules from the
`"Junction Tree Variational Autoencoder for Molecular Graph Generation"
<https://arxiv.org/abs/1802.04364>`_ paper.
Returns the graph connectivity of the junction tree, the assignment
mapping of each atom to the clique in the junction tree, and the number
of cliques.
Args:
mol (rdkit.Chem.Mol): A :obj:`rdkit` molecule.
return_vocab (bool, optional): If set to :obj:`True`, will return an
identifier for each clique (ring, bond, bridged compounds, single).
(default: :obj:`False`)
:rtype: (LongTensor, LongTensor, int)
"""
import rdkit.Chem as Chem
# Cliques = rings and bonds.
cliques = [list(x) for x in Chem.GetSymmSSSR(mol)]
xs = [0] * len(cliques)
for bond in mol.GetBonds():
if not bond.IsInRing():
cliques.append([bond.GetBeginAtomIdx(), bond.GetEndAtomIdx()])
xs.append(1)
# Generate `atom2clique` mappings.
atom2clique = [[] for i in range(mol.GetNumAtoms())]
for c in range(len(cliques)):
for atom in cliques[c]:
atom2clique[atom].append(c)
# Merge rings that share more than 2 atoms as they form bridged compounds.
for c1 in range(len(cliques)):
for atom in cliques[c1]:
for c2 in atom2clique[atom]:
if c1 >= c2 or len(cliques[c1]) <= 2 or len(cliques[c2]) <= 2:
continue
if len(set(cliques[c1]) & set(cliques[c2])) > 2:
cliques[c1] = set(cliques[c1]) | set(cliques[c2])
xs[c1] = 2
cliques[c2] = []
xs[c2] = -1
cliques = [c for c in cliques if len(c) > 0]
xs = [x for x in xs if x >= 0]
# Update `atom2clique` mappings.
atom2clique = [[] for i in range(mol.GetNumAtoms())]
for c in range(len(cliques)):
for atom in cliques[c]:
atom2clique[atom].append(c)
# Add singleton cliques in case there are more than 2 intersecting
# cliques. We further compute the "initial" clique graph.
edges = {}
for atom in range(mol.GetNumAtoms()):
cs = atom2clique[atom]
if len(cs) <= 1:
continue
# Number of bond clusters that the atom lies in.
bonds = [c for c in cs if len(cliques[c]) == 2]
# Number of ring clusters that the atom lies in.
rings = [c for c in cs if len(cliques[c]) > 4]
if len(bonds) > 2 or (len(bonds) == 2 and len(cs) > 2):
cliques.append([atom])
xs.append(3)
c2 = len(cliques) - 1
for c1 in cs:
edges[(c1, c2)] = 1
elif len(rings) > 2:
cliques.append([atom])
xs.append(3)
c2 = len(cliques) - 1
for c1 in cs:
edges[(c1, c2)] = 99
else:
for i in range(len(cs)):
for j in range(i + 1, len(cs)):
c1, c2 = cs[i], cs[j]
count = len(set(cliques[c1]) & set(cliques[c2]))
edges[(c1, c2)] = min(count, edges.get((c1, c2), 99))
# Update `atom2clique` mappings.
atom2clique = [[] for i in range(mol.GetNumAtoms())]
for c in range(len(cliques)):
for atom in cliques[c]:
atom2clique[atom].append(c)
if len(edges) > 0:
edge_index_T, weight = zip(*edges.items())
row, col = torch.tensor(edge_index_T).t()
inv_weight = 100 - torch.tensor(weight)
clique_graph = SparseTensor(row=row, col=col, value=inv_weight,
sparse_sizes=(len(cliques), len(cliques)))
junc_tree = minimum_spanning_tree(clique_graph.to_scipy('csr'))
row, col, _ = SparseTensor.from_scipy(junc_tree).coo()
edge_index = torch.stack([row, col], dim=0)
edge_index = to_undirected(edge_index, num_nodes=len(cliques))
else:
edge_index = torch.empty((2, 0), dtype=torch.long)
rows = [[i] * len(atom2clique[i]) for i in range(mol.GetNumAtoms())]
row = torch.tensor(list(chain.from_iterable(rows)))
col = torch.tensor(list(chain.from_iterable(atom2clique)))
atom2clique = torch.stack([row, col], dim=0).to(torch.long)
if return_vocab:
vocab = torch.tensor(xs, dtype=torch.long)
return edge_index, atom2clique, len(cliques), vocab
else:
return edge_index, atom2clique, len(cliques)