from typing import Optional, List, Union
from torch_geometric.typing import OptPairTensor, Adj, Size, OptTensor

import torch
from torch import Tensor
from torch.nn import Parameter
import torch.nn.functional as F
from torch.nn import Sequential, Linear, ReLU, Dropout
from torch.nn import BatchNorm1d, LayerNorm, InstanceNorm1d
from torch_sparse import SparseTensor
from torch_scatter import scatter, scatter_softmax
from torch_geometric.nn.conv import MessagePassing
from torch_geometric.nn.norm import MessageNorm

from ..inits import reset

class MLP(Sequential):
    def __init__(self, channels: List[int], norm: Optional[str] = None,
                 bias: bool = True, dropout: float = 0.):
        m = []
        for i in range(1, len(channels)):
            m.append(Linear(channels[i - 1], channels[i], bias))

            if i < len(channels) - 1:
                if norm and norm == 'batch':
                    m.append(BatchNorm1d(channels[i], affine=True))
                elif norm and norm == 'layer':
                    m.append(LayerNorm(channels[i], elementwise_affine=True))
                elif norm and norm == 'instance':
                    m.append(InstanceNorm1d(channels[i], affine=False))
                elif norm:
                    raise NotImplementedError(
                        f'Normalization layer "{norm}" not supported.')

        super(MLP, self).__init__(*m)

[docs]class GENConv(MessagePassing): r"""The GENeralized Graph Convolution (GENConv) from the `"DeeperGCN: All You Need to Train Deeper GCNs" <>`_ paper. Supports SoftMax & PowerMean aggregation. The message construction is: .. math:: \mathbf{x}_i^{\prime} = \mathrm{MLP} \left( \mathbf{x}_i + \mathrm{AGG} \left( \left\{ \mathrm{ReLU} \left( \mathbf{x}_j + \mathbf{e_{ji}} \right) +\epsilon : j \in \mathcal{N}(i) \right\} \right) \right) .. note:: For an example of using :obj:`GENConv`, see `examples/ <>`_. Args: in_channels (int): Size of each input sample. out_channels (int): Size of each output sample. aggr (str, optional): The aggregation scheme to use (:obj:`"softmax"`, :obj:`"softmax_sg"`, :obj:`"power"`, :obj:`"add"`, :obj:`"mean"`, :obj:`max`). (default: :obj:`"softmax"`) t (float, optional): Initial inverse temperature for softmax aggregation. (default: :obj:`1.0`) learn_t (bool, optional): If set to :obj:`True`, will learn the value :obj:`t` for softmax aggregation dynamically. (default: :obj:`False`) p (float, optional): Initial power for power mean aggregation. (default: :obj:`1.0`) learn_p (bool, optional): If set to :obj:`True`, will learn the value :obj:`p` for power mean aggregation dynamically. (default: :obj:`False`) msg_norm (bool, optional): If set to :obj:`True`, will use message normalization. (default: :obj:`False`) learn_msg_scale (bool, optional): If set to :obj:`True`, will learn the scaling factor of message normalization. (default: :obj:`False`) norm (str, optional): Norm layer of MLP layers (:obj:`"batch"`, :obj:`"layer"`, :obj:`"instance"`) (default: :obj:`batch`) num_layers (int, optional): The number of MLP layers. (default: :obj:`2`) eps (float, optional): The epsilon value of the message construction function. (default: :obj:`1e-7`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.conv.GenMessagePassing`. """ def __init__(self, in_channels: int, out_channels: int, aggr: str = 'softmax', t: float = 1.0, learn_t: bool = False, p: float = 1.0, learn_p: bool = False, msg_norm: bool = False, learn_msg_scale: bool = False, norm: str = 'batch', num_layers: int = 2, eps: float = 1e-7, **kwargs): super(GENConv, self).__init__(aggr=None, **kwargs) self.in_channels = in_channels self.out_channels = out_channels self.aggr = aggr self.eps = eps assert aggr in ['softmax', 'softmax_sg', 'power'] channels = [in_channels] for i in range(num_layers - 1): channels.append(in_channels * 2) channels.append(out_channels) self.mlp = MLP(channels, norm=norm) self.msg_norm = MessageNorm(learn_msg_scale) if msg_norm else None self.initial_t = t self.initial_p = p if learn_t and aggr == 'softmax': self.t = Parameter(torch.Tensor([t]), requires_grad=True) else: self.t = t if learn_p: self.p = Parameter(torch.Tensor([p]), requires_grad=True) else: self.p = p
[docs] def reset_parameters(self): reset(self.mlp) if self.msg_norm is not None: self.msg_norm.reset_parameters() if self.t and isinstance(self.t, Tensor): if self.p and isinstance(self.p, Tensor):
[docs] def forward(self, x: Union[Tensor, OptPairTensor], edge_index: Adj, edge_attr: OptTensor = None, size: Size = None) -> Tensor: """""" if isinstance(x, Tensor): x: OptPairTensor = (x, x) # Node and edge feature dimensionalites need to match. if isinstance(edge_index, Tensor): if edge_attr is not None: assert x[0].size(-1) == edge_attr.size(-1) elif isinstance(edge_index, SparseTensor): edge_attr = if edge_attr is not None: assert x[0].size(-1) == edge_attr.size(-1) # propagate_type: (x: OptPairTensor, edge_attr: OptTensor) out = self.propagate(edge_index, x=x, edge_attr=edge_attr, size=size) if self.msg_norm is not None: out = self.msg_norm(x, out) x_r = x[1] if x_r is not None: out += x_r return self.mlp(out)
def message(self, x_j: Tensor, edge_attr: OptTensor) -> Tensor: msg = x_j if edge_attr is None else x_j + edge_attr return F.relu(msg) + self.eps def aggregate(self, inputs: Tensor, index: Tensor, dim_size: Optional[int] = None) -> Tensor: if self.aggr == 'softmax': out = scatter_softmax(inputs * self.t, index, dim=self.node_dim) return scatter(inputs * out, index, dim=self.node_dim, dim_size=dim_size, reduce='sum') elif self.aggr == 'softmax_sg': out = scatter_softmax(inputs * self.t, index, dim=self.node_dim).detach() return scatter(inputs * out, index, dim=self.node_dim, dim_size=dim_size, reduce='sum') else: min_value, max_value = 1e-7, 1e1 torch.clamp_(inputs, min_value, max_value) out = scatter(torch.pow(inputs, self.p), index, dim=self.node_dim, dim_size=dim_size, reduce='mean') torch.clamp_(out, min_value, max_value) return torch.pow(out, 1 / self.p) def __repr__(self): return '{}({}, {}, aggr={})'.format(self.__class__.__name__, self.in_channels, self.out_channels, self.aggr)