Source code for torch_geometric.nn.conv.res_gated_graph_conv

from typing import Callable, Optional, Tuple, Union

import torch
from torch import Tensor
from torch.nn import Parameter, Sigmoid

from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.dense.linear import Linear
from torch_geometric.nn.inits import zeros
from torch_geometric.typing import Adj, OptTensor, PairTensor

[docs]class ResGatedGraphConv(MessagePassing): r"""The residual gated graph convolutional operator from the `"Residual Gated Graph ConvNets" <>`_ paper. .. math:: \mathbf{x}^{\prime}_i = \mathbf{W}_1 \mathbf{x}_i + \sum_{j \in \mathcal{N}(i)} \eta_{i,j} \odot \mathbf{W}_2 \mathbf{x}_j where the gate :math:`\eta_{i,j}` is defined as .. math:: \eta_{i,j} = \sigma(\mathbf{W}_3 \mathbf{x}_i + \mathbf{W}_4 \mathbf{x}_j) with :math:`\sigma` denoting the sigmoid function. Args: in_channels (int or tuple): Size of each input sample, or :obj:`-1` to derive the size from the first input(s) to the forward method. A tuple corresponds to the sizes of source and target dimensionalities. out_channels (int): Size of each output sample. act (callable, optional): Gating function :math:`\sigma`. (default: :meth:`torch.nn.Sigmoid()`) edge_dim (int, optional): Edge feature dimensionality (in case there are any). (default: :obj:`None`) bias (bool, optional): If set to :obj:`False`, the layer will not learn an additive bias. (default: :obj:`True`) root_weight (bool, optional): If set to :obj:`False`, the layer will not add transformed root node features to the output. (default: :obj:`True`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.conv.MessagePassing`. Shapes: - **inputs:** node features :math:`(|\mathcal{V}|, F_{in})` or :math:`((|\mathcal{V_s}|, F_{s}), (|\mathcal{V_t}|, F_{t}))` if bipartite, edge indices :math:`(2, |\mathcal{E}|)` - **outputs:** node features :math:`(|\mathcal{V}|, F_{out})` or :math:`(|\mathcal{V_t}|, F_{out})` if bipartite """ def __init__( self, in_channels: Union[int, Tuple[int, int]], out_channels: int, act: Optional[Callable] = Sigmoid(), edge_dim: Optional[int] = None, root_weight: bool = True, bias: bool = True, **kwargs, ): kwargs.setdefault('aggr', 'add') super().__init__(**kwargs) self.in_channels = in_channels self.out_channels = out_channels self.act = act self.edge_dim = edge_dim self.root_weight = root_weight if isinstance(in_channels, int): in_channels = (in_channels, in_channels) edge_dim = edge_dim if edge_dim is not None else 0 self.lin_key = Linear(in_channels[1] + edge_dim, out_channels) self.lin_query = Linear(in_channels[0] + edge_dim, out_channels) self.lin_value = Linear(in_channels[0] + edge_dim, out_channels) if root_weight: self.lin_skip = Linear(in_channels[1], out_channels, bias=False) else: self.register_parameter('lin_skip', None) if bias: self.bias = Parameter(Tensor(out_channels)) else: self.register_parameter('bias', None) self.reset_parameters()
[docs] def reset_parameters(self): super().reset_parameters() self.lin_key.reset_parameters() self.lin_query.reset_parameters() self.lin_value.reset_parameters() if self.lin_skip is not None: self.lin_skip.reset_parameters() if self.bias is not None: zeros(self.bias)
[docs] def forward( self, x: Union[Tensor, PairTensor], edge_index: Adj, edge_attr: OptTensor = None, ) -> Tensor: if isinstance(x, Tensor): x = (x, x) # In case edge features are not given, we can compute key, query and # value tensors in node-level space, which is a bit more efficient: if self.edge_dim is None: k = self.lin_key(x[1]) q = self.lin_query(x[0]) v = self.lin_value(x[0]) else: k, q, v = x[1], x[0], x[0] # propagate_type: (k: Tensor, q: Tensor, v: Tensor, # edge_attr: OptTensor) out = self.propagate(edge_index, k=k, q=q, v=v, edge_attr=edge_attr) if self.root_weight: out = out + self.lin_skip(x[1]) if self.bias is not None: out = out + self.bias return out
def message(self, k_i: Tensor, q_j: Tensor, v_j: Tensor, edge_attr: OptTensor) -> Tensor: assert (edge_attr is not None) == (self.edge_dim is not None) if edge_attr is not None: k_i = self.lin_key([k_i, edge_attr], dim=-1)) q_j = self.lin_query([q_j, edge_attr], dim=-1)) v_j = self.lin_value([v_j, edge_attr], dim=-1)) return self.act(k_i + q_j) * v_j