Source code for torch_geometric.nn.conv.rgcn_conv

from typing import Optional, Tuple, Union

import torch
from torch import Tensor
from torch.nn import Parameter
from torch.nn import Parameter as Param

import torch_geometric.backend
import torch_geometric.typing
from torch_geometric import is_compiling
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.inits import glorot, zeros
from torch_geometric.typing import (
    Adj,
    OptTensor,
    SparseTensor,
    pyg_lib,
    torch_sparse,
)
from torch_geometric.utils import index_sort, one_hot, scatter, spmm
from torch_geometric.utils.sparse import index2ptr


def masked_edge_index(edge_index: Adj, edge_mask: Tensor) -> Adj:
    if isinstance(edge_index, Tensor):
        return edge_index[:, edge_mask]
    return torch_sparse.masked_select_nnz(edge_index, edge_mask, layout='coo')


[docs]class RGCNConv(MessagePassing): r"""The relational graph convolutional operator from the `"Modeling Relational Data with Graph Convolutional Networks" <https://arxiv.org/abs/1703.06103>`_ paper. .. math:: \mathbf{x}^{\prime}_i = \mathbf{\Theta}_{\textrm{root}} \cdot \mathbf{x}_i + \sum_{r \in \mathcal{R}} \sum_{j \in \mathcal{N}_r(i)} \frac{1}{|\mathcal{N}_r(i)|} \mathbf{\Theta}_r \cdot \mathbf{x}_j, where :math:`\mathcal{R}` denotes the set of relations, *i.e.* edge types. Edge type needs to be a one-dimensional :obj:`torch.long` tensor which stores a relation identifier :math:`\in \{ 0, \ldots, |\mathcal{R}| - 1\}` for each edge. .. note:: This implementation is as memory-efficient as possible by iterating over each individual relation type. Therefore, it may result in low GPU utilization in case the graph has a large number of relations. As an alternative approach, :class:`FastRGCNConv` does not iterate over each individual type, but may consume a large amount of memory to compensate. We advise to check out both implementations to see which one fits your needs. .. note:: :class:`RGCNConv` can use `dynamic shapes <https://docs.nvidia.com/deeplearning/tensorrt/developer-guide/index .html#work_dynamic_shapes>`_, which means that the shape of the interim tensors can be determined at runtime. If your device doesn't support dynamic shapes, use :class:`FastRGCNConv` instead. Args: in_channels (int or tuple): Size of each input sample. A tuple corresponds to the sizes of source and target dimensionalities. In case no input features are given, this argument should correspond to the number of nodes in your graph. out_channels (int): Size of each output sample. num_relations (int): Number of relations. num_bases (int, optional): If set, this layer will use the basis-decomposition regularization scheme where :obj:`num_bases` denotes the number of bases to use. (default: :obj:`None`) num_blocks (int, optional): If set, this layer will use the block-diagonal-decomposition regularization scheme where :obj:`num_blocks` denotes the number of blocks to use. (default: :obj:`None`) aggr (str, optional): The aggregation scheme to use (:obj:`"add"`, :obj:`"mean"`, :obj:`"max"`). (default: :obj:`"mean"`) root_weight (bool, optional): If set to :obj:`False`, the layer will not add transformed root node features to the output. (default: :obj:`True`) is_sorted (bool, optional): If set to :obj:`True`, assumes that :obj:`edge_index` is sorted by :obj:`edge_type`. This avoids internal re-sorting of the data and can improve runtime and memory efficiency. (default: :obj:`False`) bias (bool, optional): If set to :obj:`False`, the layer will not learn an additive bias. (default: :obj:`True`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.conv.MessagePassing`. """ def __init__( self, in_channels: Union[int, Tuple[int, int]], out_channels: int, num_relations: int, num_bases: Optional[int] = None, num_blocks: Optional[int] = None, aggr: str = 'mean', root_weight: bool = True, is_sorted: bool = False, bias: bool = True, **kwargs, ): kwargs.setdefault('aggr', aggr) super().__init__(node_dim=0, **kwargs) if num_bases is not None and num_blocks is not None: raise ValueError('Can not apply both basis-decomposition and ' 'block-diagonal-decomposition at the same time.') self.in_channels = in_channels self.out_channels = out_channels self.num_relations = num_relations self.num_bases = num_bases self.num_blocks = num_blocks self.is_sorted = is_sorted if isinstance(in_channels, int): in_channels = (in_channels, in_channels) self.in_channels_l = in_channels[0] self._use_segment_matmul_heuristic_output: Optional[bool] = None if num_bases is not None: self.weight = Parameter( torch.empty(num_bases, in_channels[0], out_channels)) self.comp = Parameter(torch.empty(num_relations, num_bases)) elif num_blocks is not None: assert (in_channels[0] % num_blocks == 0 and out_channels % num_blocks == 0) self.weight = Parameter( torch.empty(num_relations, num_blocks, in_channels[0] // num_blocks, out_channels // num_blocks)) self.register_parameter('comp', None) else: self.weight = Parameter( torch.empty(num_relations, in_channels[0], out_channels)) self.register_parameter('comp', None) if root_weight: self.root = Param(torch.empty(in_channels[1], out_channels)) else: self.register_parameter('root', None) if bias: self.bias = Param(torch.empty(out_channels)) else: self.register_parameter('bias', None) self.reset_parameters()
[docs] def reset_parameters(self): super().reset_parameters() glorot(self.weight) glorot(self.comp) glorot(self.root) zeros(self.bias)
[docs] def forward(self, x: Union[OptTensor, Tuple[OptTensor, Tensor]], edge_index: Adj, edge_type: OptTensor = None): r"""Runs the forward pass of the module. Args: x (torch.Tensor or tuple, optional): The input node features. Can be either a :obj:`[num_nodes, in_channels]` node feature matrix, or an optional one-dimensional node index tensor (in which case input features are treated as trainable node embeddings). Furthermore, :obj:`x` can be of type :obj:`tuple` denoting source and destination node features. edge_index (torch.Tensor or SparseTensor): The edge indices. edge_type (torch.Tensor, optional): The one-dimensional relation type/index for each edge in :obj:`edge_index`. Should be only :obj:`None` in case :obj:`edge_index` is of type :class:`torch_sparse.SparseTensor`. (default: :obj:`None`) """ # Convert input features to a pair of node features or node indices. x_l: OptTensor = None if isinstance(x, tuple): x_l = x[0] else: x_l = x if x_l is None: x_l = torch.arange(self.in_channels_l, device=self.weight.device) x_r: Tensor = x_l if isinstance(x, tuple): x_r = x[1] size = (x_l.size(0), x_r.size(0)) if isinstance(edge_index, SparseTensor): edge_type = edge_index.storage.value() assert edge_type is not None # propagate_type: (x: Tensor, edge_type_ptr: OptTensor) out = torch.zeros(x_r.size(0), self.out_channels, device=x_r.device) weight = self.weight if self.num_bases is not None: # Basis-decomposition ================= weight = (self.comp @ weight.view(self.num_bases, -1)).view( self.num_relations, self.in_channels_l, self.out_channels) if self.num_blocks is not None: # Block-diagonal-decomposition ===== if not torch.is_floating_point( x_r) and self.num_blocks is not None: raise ValueError('Block-diagonal decomposition not supported ' 'for non-continuous input features.') for i in range(self.num_relations): tmp = masked_edge_index(edge_index, edge_type == i) h = self.propagate(tmp, x=x_l, edge_type_ptr=None, size=size) h = h.view(-1, weight.size(1), weight.size(2)) h = torch.einsum('abc,bcd->abd', h, weight[i]) out = out + h.contiguous().view(-1, self.out_channels) else: # No regularization/Basis-decomposition ======================== use_segment_matmul = torch_geometric.backend.use_segment_matmul # If `use_segment_matmul` is not specified, use a simple heuristic # to determine whether `segment_matmul` can speed up computation # given the observed input sizes: if use_segment_matmul is None: segment_count = scatter(torch.ones_like(edge_type), edge_type, dim_size=self.num_relations) self._use_segment_matmul_heuristic_output = ( torch_geometric.backend.use_segment_matmul_heuristic( num_segments=self.num_relations, max_segment_size=int(segment_count.max()), in_channels=self.weight.size(1), out_channels=self.weight.size(2), )) assert self._use_segment_matmul_heuristic_output is not None use_segment_matmul = self._use_segment_matmul_heuristic_output if (use_segment_matmul and torch_geometric.typing.WITH_SEGMM and not is_compiling() and self.num_bases is None and x_l.is_floating_point() and isinstance(edge_index, Tensor)): if not self.is_sorted: if (edge_type[1:] < edge_type[:-1]).any(): edge_type, perm = index_sort( edge_type, max_value=self.num_relations) edge_index = edge_index[:, perm] edge_type_ptr = index2ptr(edge_type, self.num_relations) out = self.propagate(edge_index, x=x_l, edge_type_ptr=edge_type_ptr, size=size) else: for i in range(self.num_relations): tmp = masked_edge_index(edge_index, edge_type == i) if not torch.is_floating_point(x_r): out = out + self.propagate( tmp, x=weight[i, x_l], edge_type_ptr=None, size=size, ) else: h = self.propagate(tmp, x=x_l, edge_type_ptr=None, size=size) out = out + (h @ weight[i]) root = self.root if root is not None: if not torch.is_floating_point(x_r): out = out + root[x_r] else: out = out + x_r @ root if self.bias is not None: out = out + self.bias return out
def message(self, x_j: Tensor, edge_type_ptr: OptTensor) -> Tensor: if (torch_geometric.typing.WITH_SEGMM and not is_compiling() and edge_type_ptr is not None): # TODO Re-weight according to edge type degree for `aggr=mean`. return pyg_lib.ops.segment_matmul(x_j, edge_type_ptr, self.weight) return x_j def message_and_aggregate(self, adj_t: Adj, x: Tensor) -> Tensor: if isinstance(adj_t, SparseTensor): adj_t = adj_t.set_value(None) return spmm(adj_t, x, reduce=self.aggr) def __repr__(self) -> str: return (f'{self.__class__.__name__}({self.in_channels}, ' f'{self.out_channels}, num_relations={self.num_relations})')
[docs]class FastRGCNConv(RGCNConv): r"""See :class:`RGCNConv`."""
[docs] def forward(self, x: Union[OptTensor, Tuple[OptTensor, Tensor]], edge_index: Adj, edge_type: OptTensor = None): self.fuse = False assert self.aggr in ['add', 'sum', 'mean'] # Convert input features to a pair of node features or node indices. x_l: OptTensor = None if isinstance(x, tuple): x_l = x[0] else: x_l = x if x_l is None: x_l = torch.arange(self.in_channels_l, device=self.weight.device) x_r: Tensor = x_l if isinstance(x, tuple): x_r = x[1] size = (x_l.size(0), x_r.size(0)) # propagate_type: (x: Tensor, edge_type: OptTensor) out = self.propagate(edge_index, x=x_l, edge_type=edge_type, size=size) root = self.root if root is not None: if not torch.is_floating_point(x_r): out = out + root[x_r] else: out = out + x_r @ root if self.bias is not None: out = out + self.bias return out
def message(self, x_j: Tensor, edge_type: Tensor, edge_index_j: Tensor) -> Tensor: weight = self.weight if self.num_bases is not None: # Basis-decomposition ================= weight = (self.comp @ weight.view(self.num_bases, -1)).view( self.num_relations, self.in_channels_l, self.out_channels) if self.num_blocks is not None: # Block-diagonal-decomposition ======= if not torch.is_floating_point(x_j): raise ValueError('Block-diagonal decomposition not supported ' 'for non-continuous input features.') weight = weight[edge_type].view(-1, weight.size(2), weight.size(3)) x_j = x_j.view(-1, 1, weight.size(1)) return torch.bmm(x_j, weight).view(-1, self.out_channels) else: # No regularization/Basis-decomposition ======================== if not torch.is_floating_point(x_j): weight_index = edge_type * weight.size(1) + edge_index_j return weight.view(-1, self.out_channels)[weight_index] return torch.bmm(x_j.unsqueeze(-2), weight[edge_type]).squeeze(-2) def aggregate(self, inputs: Tensor, edge_type: Tensor, index: Tensor, dim_size: Optional[int] = None) -> Tensor: # Compute normalization in separation for each `edge_type`. if self.aggr == 'mean': norm = one_hot(edge_type, self.num_relations, dtype=inputs.dtype) norm = scatter(norm, index, dim=0, dim_size=dim_size)[index] norm = torch.gather(norm, 1, edge_type.view(-1, 1)) norm = 1. / norm.clamp_(1.) inputs = norm * inputs return scatter(inputs, index, dim=self.node_dim, dim_size=dim_size)