Source code for torch_geometric.nn.conv.mf_conv

import torch

from torch_geometric.nn.conv import MessagePassing
from torch_geometric.utils import remove_self_loops, add_self_loops, degree
from torch.nn import Linear, ModuleList

[docs]class MFConv(MessagePassing): r"""The graph neural network operator from the `"Convolutional Networks on Graphs for Learning Molecular Fingerprints" <>`_ paper .. math:: \mathbf{x}^{\prime}_i = \mathbf{W}^{(\deg(i))} \left( \mathbf{x}_i + \sum_{j \in \mathcal{N}(i)} \mathbf{x}_j \right) which trains a distinct weight matrix for each possible vertex degree. Args: in_channels (int): Size of each input sample. out_channels (int): Size of each output sample. max_degree (int, optional): The maximum node degree to consider when updating weights (default: :obj:`10`) root_weight (bool, optional): If set to :obj:`True`, the layer will transform central node features differently than neighboring node features. (default: obj:`False`) bias (bool, optional): If set to :obj:`False`, the layer will not learn an additive bias. (default: :obj:`True`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.conv.MessagePassing`. """ def __init__(self, in_channels, out_channels, max_degree=10, root_weight=False, bias=True, **kwargs): super(MFConv, self).__init__(aggr='add', **kwargs) self.in_channels = in_channels self.out_channels = out_channels self.max_degree = max_degree self.root_weight = root_weight self.rel_lins = ModuleList([ Linear(in_channels, out_channels, bias=bias) for _ in range(max_degree + 1) ]) if root_weight: self.root_lins = ModuleList([ Linear(in_channels, out_channels, bias=False) for _ in range(max_degree + 1) ]) self.reset_parameters()
[docs] def reset_parameters(self): for lin in self.rel_lins: lin.reset_parameters() if self.root_weight: for lin in self.root_lins: lin.reset_parameters()
[docs] def forward(self, x, edge_index): edge_index, _ = remove_self_loops(edge_index) deg = degree(edge_index[1 if self.flow == 'source_to_target' else 0], x.size(0), dtype=torch.long) deg.clamp_(max=self.max_degree) if not self.root_weight: edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(self.node_dim)) h = self.propagate(edge_index, x=x) out = x.new_empty(list(x.size())[:-1] + [self.out_channels]) for i in deg.unique().tolist(): idx = (deg == i).nonzero().view(-1) r = self.rel_lins[i](h.index_select(self.node_dim, idx)) if self.root_weight: r = r + self.root_lins[i](x.index_select(self.node_dim, idx)) out.index_copy_(self.node_dim, idx, r) return out
def message(self, x_j): return x_j def __repr__(self): return '{}({}, {})'.format(self.__class__.__name__, self.in_channels, self.out_channels)