Source code for torch_geometric.nn.dense.linear

import copy
import math
from typing import Any, Dict, Optional, Union

import torch
import torch.nn.functional as F
from torch import Tensor, nn
from torch.nn.parameter import Parameter

import torch_geometric.typing
from torch_geometric.nn import inits
from torch_geometric.typing import pyg_lib
from torch_geometric.utils import index_sort
from torch_geometric.utils.sparse import index2ptr


def is_uninitialized_parameter(x: Any) -> bool:
    if not hasattr(nn.parameter, 'UninitializedParameter'):
        return False
    return isinstance(x, nn.parameter.UninitializedParameter)


def reset_weight_(weight: Tensor, in_channels: int,
                  initializer: Optional[str] = None) -> Tensor:
    if in_channels <= 0:
        pass
    elif initializer == 'glorot':
        inits.glorot(weight)
    elif initializer == 'uniform':
        bound = 1.0 / math.sqrt(in_channels)
        torch.nn.init.uniform_(weight.data, -bound, bound)
    elif initializer == 'kaiming_uniform':
        inits.kaiming_uniform(weight, fan=in_channels, a=math.sqrt(5))
    elif initializer is None:
        inits.kaiming_uniform(weight, fan=in_channels, a=math.sqrt(5))
    else:
        raise RuntimeError(f"Weight initializer '{initializer}' not supported")

    return weight


def reset_bias_(bias: Optional[Tensor], in_channels: int,
                initializer: Optional[str] = None) -> Optional[Tensor]:
    if bias is None or in_channels <= 0:
        pass
    elif initializer == 'zeros':
        inits.zeros(bias)
    elif initializer is None:
        inits.uniform(in_channels, bias)
    else:
        raise RuntimeError(f"Bias initializer '{initializer}' not supported")

    return bias


[docs]class Linear(torch.nn.Module): r"""Applies a linear tranformation to the incoming data .. math:: \mathbf{x}^{\prime} = \mathbf{x} \mathbf{W}^{\top} + \mathbf{b} similar to :class:`torch.nn.Linear`. It supports lazy initialization and customizable weight and bias initialization. Args: in_channels (int): Size of each input sample. Will be initialized lazily in case it is given as :obj:`-1`. out_channels (int): Size of each output sample. bias (bool, optional): If set to :obj:`False`, the layer will not learn an additive bias. (default: :obj:`True`) weight_initializer (str, optional): The initializer for the weight matrix (:obj:`"glorot"`, :obj:`"uniform"`, :obj:`"kaiming_uniform"` or :obj:`None`). If set to :obj:`None`, will match default weight initialization of :class:`torch.nn.Linear`. (default: :obj:`None`) bias_initializer (str, optional): The initializer for the bias vector (:obj:`"zeros"` or :obj:`None`). If set to :obj:`None`, will match default bias initialization of :class:`torch.nn.Linear`. (default: :obj:`None`) Shapes: - **input:** features :math:`(*, F_{in})` - **output:** features :math:`(*, F_{out})` """ def __init__(self, in_channels: int, out_channels: int, bias: bool = True, weight_initializer: Optional[str] = None, bias_initializer: Optional[str] = None): super().__init__() self.in_channels = in_channels self.out_channels = out_channels self.weight_initializer = weight_initializer self.bias_initializer = bias_initializer if in_channels > 0: self.weight = Parameter(torch.Tensor(out_channels, in_channels)) else: self.weight = nn.parameter.UninitializedParameter() self._hook = self.register_forward_pre_hook( self.initialize_parameters) if bias: self.bias = Parameter(torch.Tensor(out_channels)) else: self.register_parameter('bias', None) self._load_hook = self._register_load_state_dict_pre_hook( self._lazy_load_hook) self.reset_parameters() def __deepcopy__(self, memo): out = Linear(self.in_channels, self.out_channels, self.bias is not None, self.weight_initializer, self.bias_initializer) if self.in_channels > 0: out.weight = copy.deepcopy(self.weight, memo) if self.bias is not None: out.bias = copy.deepcopy(self.bias, memo) return out
[docs] def reset_parameters(self): r"""Resets all learnable parameters of the module.""" reset_weight_(self.weight, self.in_channels, self.weight_initializer) reset_bias_(self.bias, self.in_channels, self.bias_initializer)
[docs] def forward(self, x: Tensor) -> Tensor: r""" Args: x (torch.Tensor): The input features. """ return F.linear(x, self.weight, self.bias)
@torch.no_grad() def initialize_parameters(self, module, input): if is_uninitialized_parameter(self.weight): self.in_channels = input[0].size(-1) self.weight.materialize((self.out_channels, self.in_channels)) self.reset_parameters() self._hook.remove() delattr(self, '_hook') def _save_to_state_dict(self, destination, prefix, keep_vars): if (is_uninitialized_parameter(self.weight) or torch.onnx.is_in_onnx_export()): destination[prefix + 'weight'] = self.weight else: destination[prefix + 'weight'] = self.weight.detach() if self.bias is not None: if torch.onnx.is_in_onnx_export(): destination[prefix + 'bias'] = self.bias else: destination[prefix + 'bias'] = self.bias.detach() def _lazy_load_hook(self, state_dict, prefix, local_metadata, strict, missing_keys, unexpected_keys, error_msgs): weight = state_dict.get(prefix + 'weight', None) if weight is not None and is_uninitialized_parameter(weight): self.in_channels = -1 self.weight = nn.parameter.UninitializedParameter() if not hasattr(self, '_hook'): self._hook = self.register_forward_pre_hook( self.initialize_parameters) elif weight is not None and is_uninitialized_parameter(self.weight): self.in_channels = weight.size(-1) self.weight.materialize((self.out_channels, self.in_channels)) if hasattr(self, '_hook'): self._hook.remove() delattr(self, '_hook') def __repr__(self) -> str: return (f'{self.__class__.__name__}({self.in_channels}, ' f'{self.out_channels}, bias={self.bias is not None})')
[docs]class HeteroLinear(torch.nn.Module): r"""Applies separate linear tranformations to the incoming data according to types .. math:: \mathbf{x}^{\prime}_{\kappa} = \mathbf{x}_{\kappa} \mathbf{W}^{\top}_{\kappa} + \mathbf{b}_{\kappa} for type :math:`\kappa`. It supports lazy initialization and customizable weight and bias initialization. Args: in_channels (int): Size of each input sample. Will be initialized lazily in case it is given as :obj:`-1`. out_channels (int): Size of each output sample. num_types (int): The number of types. is_sorted (bool, optional): If set to :obj:`True`, assumes that :obj:`type_vec` is sorted. This avoids internal re-sorting of the data and can improve runtime and memory efficiency. (default: :obj:`False`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.Linear`. Shapes: - **input:** features :math:`(*, F_{in})`, type vector :math:`(*)` - **output:** features :math:`(*, F_{out})` """ def __init__(self, in_channels: int, out_channels: int, num_types: int, is_sorted: bool = False, **kwargs): super().__init__() self.in_channels = in_channels self.out_channels = out_channels self.num_types = num_types self.is_sorted = is_sorted self.kwargs = kwargs if torch_geometric.typing.WITH_PYG_LIB: self.lins = None if self.in_channels == -1: self.weight = nn.parameter.UninitializedParameter() self._hook = self.register_forward_pre_hook( self.initialize_parameters) else: self.weight = torch.nn.Parameter( torch.Tensor(num_types, in_channels, out_channels)) if kwargs.get('bias', True): self.bias = Parameter(torch.Tensor(num_types, out_channels)) else: self.register_parameter('bias', None) else: self.lins = torch.nn.ModuleList([ Linear(in_channels, out_channels, **kwargs) for _ in range(num_types) ]) self.register_parameter('weight', None) self.register_parameter('bias', None) self.reset_parameters()
[docs] def reset_parameters(self): r"""Resets all learnable parameters of the module.""" if torch_geometric.typing.WITH_PYG_LIB: reset_weight_(self.weight, self.in_channels, self.kwargs.get('weight_initializer', None)) reset_weight_(self.bias, self.in_channels, self.kwargs.get('bias_initializer', None)) else: for lin in self.lins: lin.reset_parameters()
[docs] def forward(self, x: Tensor, type_vec: Tensor) -> Tensor: r""" Args: x (torch.Tensor): The input features. type_vec (torch.Tensor): A vector that maps each entry to a type. """ if torch_geometric.typing.WITH_PYG_LIB: assert self.weight is not None perm: Optional[Tensor] = None if not self.is_sorted: if (type_vec[1:] < type_vec[:-1]).any(): type_vec, perm = index_sort(type_vec, self.num_types) x = x[perm] type_vec_ptr = index2ptr(type_vec, self.num_types) out = pyg_lib.ops.segment_matmul(x, type_vec_ptr, self.weight) if self.bias is not None: out += self.bias[type_vec] if perm is not None: # Restore original order (if necessary). out_unsorted = torch.empty_like(out) out_unsorted[perm] = out out = out_unsorted else: assert self.lins is not None out = x.new_empty(x.size(0), self.out_channels) for i, lin in enumerate(self.lins): mask = type_vec == i out[mask] = lin(x[mask]) return out
@torch.no_grad() def initialize_parameters(self, module, input): if is_uninitialized_parameter(self.weight): self.in_channels = input[0].size(-1) self.weight.materialize( (self.num_types, self.in_channels, self.out_channels)) self.reset_parameters() self._hook.remove() delattr(self, '_hook') def __repr__(self) -> str: return (f'{self.__class__.__name__}({self.in_channels}, ' f'{self.out_channels}, num_types={self.num_types}, ' f'bias={self.kwargs.get("bias", True)})')
[docs]class HeteroDictLinear(torch.nn.Module): r"""Applies separate linear tranformations to the incoming data dictionary .. math:: \mathbf{x}^{\prime}_{\kappa} = \mathbf{x}_{\kappa} \mathbf{W}^{\top}_{\kappa} + \mathbf{b}_{\kappa} for key :math:`\kappa`. It supports lazy initialization and customizable weight and bias initialization. Args: in_channels (int or Dict[Any, int]): Size of each input sample. If passed an integer, :obj:`types` will be a mandatory argument. initialized lazily in case it is given as :obj:`-1`. out_channels (int): Size of each output sample. types (List[Any], optional): The keys of the input dictionary. (default: :obj:`None`) **kwargs (optional): Additional arguments of :class:`torch_geometric.nn.Linear`. """ def __init__( self, in_channels: Union[int, Dict[Any, int]], out_channels: int, types: Optional[Any] = None, **kwargs, ): super().__init__() if isinstance(in_channels, dict): self.types = list(in_channels.keys()) if any([i == -1 for i in in_channels.values()]): self._hook = self.register_forward_pre_hook( self.initialize_parameters) if types is not None and set(self.types) != set(types): raise ValueError("The provided 'types' do not match with the " "keys in the 'in_channels' dictionary") else: if types is None: raise ValueError("Please provide a list of 'types' if passing " "'in_channels' as an integer") if in_channels == -1: self._hook = self.register_forward_pre_hook( self.initialize_parameters) self.types = types in_channels = {node_type: in_channels for node_type in types} self.in_channels = in_channels self.out_channels = out_channels self.kwargs = kwargs self.lins = torch.nn.ModuleDict({ key: Linear(channels, self.out_channels, **kwargs) for key, channels in self.in_channels.items() }) self.reset_parameters()
[docs] def reset_parameters(self): r"""Resets all learnable parameters of the module.""" for lin in self.lins.values(): lin.reset_parameters()
[docs] def forward( self, x_dict: Dict[Any, Tensor], ) -> Dict[Any, Tensor]: r""" Args: x_dict (Dict[Any, torch.Tensor]): A dictionary holding input features for each individual type. """ if torch_geometric.typing.WITH_GMM: xs = [x_dict[key] for key in x_dict.keys()] weights = [self.lins[key].weight.t() for key in x_dict.keys()] if self.kwargs.get('bias', True): biases = [self.lins[key].bias for key in x_dict.keys()] else: biases = None outs = pyg_lib.ops.grouped_matmul(xs, weights, biases) return {key: out for key, out in zip(x_dict.keys(), outs)} return {key: self.lins[key](x) for key, x in x_dict.items()}
@torch.no_grad() def initialize_parameters(self, module, input): for key, x in input[0].items(): lin = self.lins[key] if is_uninitialized_parameter(lin.weight): self.lins[key].initialize_parameters(None, x) self.reset_parameters() self._hook.remove() self.in_channels = {key: x.size(-1) for key, x in input[0].items()} delattr(self, '_hook') def __repr__(self) -> str: return (f'{self.__class__.__name__}({self.in_channels}, ' f'{self.out_channels}, bias={self.kwargs.get("bias", True)})')