# Source code for torch_geometric.utils.sparse

from typing import Any, Optional, Tuple, Union

import torch
from torch import Tensor

from torch_geometric.typing import SparseTensor


def dense_to_sparse(adj: Tensor) -> Tuple[Tensor, Tensor]:
    r"""Converts a dense adjacency matrix to a sparse adjacency matrix defined
    by edge indices and edge attributes.

    For a batched (3-dimensional) input, the node indices of example :obj:`b`
    are shifted by :obj:`b * adj.size(-2)` (rows) and :obj:`b * adj.size(-1)`
    (columns), so that all examples end up in one joint index space.

    Args:
        adj (Tensor): The dense adjacency matrix of shape
            :obj:`[num_nodes, num_nodes]` or
            :obj:`[batch_size, num_nodes, num_nodes]`.

    :rtype: (:class:`LongTensor`, :class:`Tensor`)

    Raises:
        ValueError: If :obj:`adj` is neither 2- nor 3-dimensional.

    Examples:

        >>> # For a single adjacency matrix
        >>> adj = torch.tensor([[3, 1],
        ...                     [2, 0]])
        >>> dense_to_sparse(adj)
        (tensor([[0, 0, 1],
                [0, 1, 0]]),
        tensor([3, 1, 2]))

        >>> # For two adjacency matrices
        >>> adj = torch.tensor([[[3, 1],
        ...                      [2, 0]],
        ...                     [[0, 1],
        ...                      [0, 2]]])
        >>> dense_to_sparse(adj)
        (tensor([[0, 0, 1, 2, 3],
                [0, 1, 0, 3, 3]]),
        tensor([3, 1, 2, 1, 2]))
    """
    if adj.dim() not in (2, 3):
        raise ValueError(f"Dense adjacency matrix 'adj' must be 2- or "
                         f"3-dimensional (got {adj.dim()} dimensions)")

    # One column per non-zero entry; one row per dimension of `adj`.
    nonzero = adj.nonzero().t()

    if nonzero.size(0) == 2:
        # Single matrix: the non-zero coordinates already are the edge index.
        src, dst = nonzero[0], nonzero[1]
        return nonzero, adj[src, dst]

    # Batched input: the leading coordinate selects the example.
    batch, src, dst = nonzero[0], nonzero[1], nonzero[2]
    edge_attr = adj[batch, src, dst]
    row = src + adj.size(-2) * batch
    col = dst + adj.size(-1) * batch
    return torch.stack([row, col], dim=0), edge_attr
def is_torch_sparse_tensor(src: Any) -> bool:
    r"""Returns :obj:`True` if the input :obj:`src` is a
    :class:`torch.Tensor` whose layout is one of :obj:`torch.sparse_coo`,
    :obj:`torch.sparse_csr` or :obj:`torch.sparse_csc`.

    Args:
        src (Any): The input object to be checked.
    """
    if not isinstance(src, Tensor):
        return False
    sparse_layouts = (torch.sparse_coo, torch.sparse_csr, torch.sparse_csc)
    return src.layout in sparse_layouts
def is_sparse(src: Any) -> bool:
    r"""Returns :obj:`True` if the input :obj:`src` is either a
    :class:`torch_sparse.SparseTensor` or a tensor accepted by
    :meth:`is_torch_sparse_tensor`.

    Args:
        src (Any): The input object to be checked.
    """
    if isinstance(src, SparseTensor):
        return True
    return is_torch_sparse_tensor(src)
def to_torch_coo_tensor(
    edge_index: Tensor,
    edge_attr: Optional[Tensor] = None,
    size: Optional[Union[int, Tuple[int, int]]] = None,
    is_coalesced: bool = False,
) -> Tensor:
    r"""Converts a sparse adjacency matrix defined by edge indices and edge
    attributes to a :class:`torch.sparse.Tensor` with layout
    `torch.sparse_coo`.
    See :meth:`~torch_geometric.utils.to_edge_index` for the reverse operation.

    Args:
        edge_index (LongTensor): The edge indices.
        edge_attr (Tensor, optional): The edge attributes. If set to
            :obj:`None`, every edge gets the value :obj:`1`. Trailing
            dimensions of :obj:`edge_attr` become dense dimensions of the
            result. (default: :obj:`None`)
        size (int or (int, int), optional): The size of the sparse matrix.
            If given as an integer, will create a quadratic sparse matrix.
            If set to :obj:`None`, will infer a quadratic sparse matrix based
            on :obj:`edge_index.max() + 1`, or of size :obj:`0` if
            :obj:`edge_index` holds no edges. (default: :obj:`None`)
        is_coalesced (bool): If set to :obj:`True`, will assume that
            :obj:`edge_index` is already coalesced and thus avoids expensive
            computation. (default: :obj:`False`)

    :rtype: :class:`torch.sparse.Tensor`

    Example:

        >>> edge_index = torch.tensor([[0, 1, 1, 2, 2, 3],
        ...                            [1, 0, 2, 1, 3, 2]])
        >>> to_torch_coo_tensor(edge_index)
        tensor(indices=tensor([[0, 1, 1, 2, 2, 3],
                               [1, 0, 2, 1, 3, 2]]),
               values=tensor([1., 1., 1., 1., 1., 1.]),
               size=(4, 4), nnz=6, layout=torch.sparse_coo)
    """
    if size is None:
        # `max()` is undefined on an empty tensor and would raise, so an
        # edge-less input is inferred to describe a graph with zero nodes.
        size = int(edge_index.max()) + 1 if edge_index.numel() > 0 else 0
    if not isinstance(size, (tuple, list)):
        size = (size, size)

    if edge_attr is None:
        edge_attr = torch.ones(edge_index.size(1), device=edge_index.device)

    # Multi-dimensional edge attributes extend the sparse shape by their
    # trailing (dense) dimensions.
    size = tuple(size) + edge_attr.size()[1:]

    adj = torch.sparse_coo_tensor(
        indices=edge_index,
        values=edge_attr,
        size=size,
        device=edge_index.device,
    )
    # Either trust the caller and only set the coalesced flag, or actually
    # sort and de-duplicate the entries.
    return adj._coalesced_(True) if is_coalesced else adj.coalesce()
def to_torch_csr_tensor(
    edge_index: Tensor,
    edge_attr: Optional[Tensor] = None,
    size: Optional[Union[int, Tuple[int, int]]] = None,
    is_coalesced: bool = False,
) -> Tensor:
    r"""Converts a sparse adjacency matrix defined by edge indices and edge
    attributes to a :class:`torch.sparse.Tensor` with layout
    `torch.sparse_csr`.
    The matrix is first built via :meth:`to_torch_coo_tensor` and then
    converted to CSR.
    See :meth:`~torch_geometric.utils.to_edge_index` for the reverse operation.

    Args:
        edge_index (LongTensor): The edge indices.
        edge_attr (Tensor, optional): The edge attributes.
            (default: :obj:`None`)
        size (int or (int, int), optional): The size of the sparse matrix.
            If given as an integer, will create a quadratic sparse matrix.
            If set to :obj:`None`, will infer a quadratic sparse matrix based
            on :obj:`edge_index.max() + 1`. (default: :obj:`None`)
        is_coalesced (bool): If set to :obj:`True`, will assume that
            :obj:`edge_index` is already coalesced and thus avoids expensive
            computation. (default: :obj:`False`)

    :rtype: :class:`torch.sparse.Tensor`

    Example:

        >>> edge_index = torch.tensor([[0, 1, 1, 2, 2, 3],
        ...                            [1, 0, 2, 1, 3, 2]])
        >>> to_torch_csr_tensor(edge_index)
        tensor(crow_indices=tensor([0, 1, 3, 5, 6]),
               col_indices=tensor([1, 0, 2, 1, 3, 2]),
               values=tensor([1., 1., 1., 1., 1., 1.]),
               size=(4, 4), nnz=6, layout=torch.sparse_csr)
    """
    coo = to_torch_coo_tensor(
        edge_index=edge_index,
        edge_attr=edge_attr,
        size=size,
        is_coalesced=is_coalesced,
    )
    return coo.to_sparse_csr()
def to_torch_csc_tensor(
    edge_index: Tensor,
    edge_attr: Optional[Tensor] = None,
    size: Optional[Union[int, Tuple[int, int]]] = None,
    is_coalesced: bool = False,
) -> Tensor:
    r"""Converts a sparse adjacency matrix defined by edge indices and edge
    attributes to a :class:`torch.sparse.Tensor` with layout
    `torch.sparse_csc`.
    The matrix is first built via :meth:`to_torch_coo_tensor` and then
    converted to CSC.
    See :meth:`~torch_geometric.utils.to_edge_index` for the reverse operation.

    Args:
        edge_index (LongTensor): The edge indices.
        edge_attr (Tensor, optional): The edge attributes.
            (default: :obj:`None`)
        size (int or (int, int), optional): The size of the sparse matrix.
            If given as an integer, will create a quadratic sparse matrix.
            If set to :obj:`None`, will infer a quadratic sparse matrix based
            on :obj:`edge_index.max() + 1`. (default: :obj:`None`)
        is_coalesced (bool): If set to :obj:`True`, will assume that
            :obj:`edge_index` is already coalesced and thus avoids expensive
            computation. (default: :obj:`False`)

    :rtype: :class:`torch.sparse.Tensor`

    Example:

        >>> edge_index = torch.tensor([[0, 1, 1, 2, 2, 3],
        ...                            [1, 0, 2, 1, 3, 2]])
        >>> to_torch_csc_tensor(edge_index)
        tensor(ccol_indices=tensor([0, 1, 3, 5, 6]),
               row_indices=tensor([1, 0, 2, 1, 3, 2]),
               values=tensor([1., 1., 1., 1., 1., 1.]),
               size=(4, 4), nnz=6, layout=torch.sparse_csc)
    """
    coo = to_torch_coo_tensor(
        edge_index=edge_index,
        edge_attr=edge_attr,
        size=size,
        is_coalesced=is_coalesced,
    )
    return coo.to_sparse_csc()
def to_edge_index(adj: Union[Tensor, SparseTensor]) -> Tuple[Tensor, Tensor]:
    r"""Converts a :class:`torch.sparse.Tensor` or a
    :class:`torch_sparse.SparseTensor` to edge indices and edge attributes.

    Args:
        adj (torch.sparse.Tensor or SparseTensor): The adjacency matrix.

    :rtype: (:class:`torch.Tensor`, :class:`torch.Tensor`)

    Raises:
        ValueError: If :obj:`adj` is a tensor whose layout is none of
            :obj:`torch.sparse_coo`, :obj:`torch.sparse_csr` or
            :obj:`torch.sparse_csc`.

    Example:

        >>> edge_index = torch.tensor([[0, 1, 1, 2, 2, 3],
        ...                            [1, 0, 2, 1, 3, 2]])
        >>> adj = to_torch_coo_tensor(edge_index)
        >>> to_edge_index(adj)
        (tensor([[0, 1, 1, 2, 2, 3],
                [1, 0, 2, 1, 3, 2]]),
        tensor([1., 1., 1., 1., 1., 1.]))
    """
    if isinstance(adj, SparseTensor):
        row, col, value = adj.coo()
        if value is None:
            # A value-less SparseTensor is reported with unit edge weights.
            value = torch.ones(row.size(0), device=row.device)
        return torch.stack([row, col], dim=0).long(), value

    layout = adj.layout

    if layout == torch.sparse_coo:
        return adj.indices().detach().long(), adj.values()

    # Compressed layouts store one axis as a pointer vector that has to be
    # expanded back into per-entry indices.
    if layout == torch.sparse_csr:
        rows = ptr2index(adj.crow_indices().detach())
        cols = adj.col_indices().detach()
    elif layout == torch.sparse_csc:
        cols = ptr2index(adj.ccol_indices().detach())
        rows = adj.row_indices().detach()
    else:
        raise ValueError(
            f"Unexpected sparse tensor layout (got '{adj.layout}')")

    return torch.stack([rows, cols], dim=0).long(), adj.values()
# Helper functions ############################################################


def get_sparse_diag(
    size: int,
    fill_value: float = 1.0,
    layout: Optional[int] = None,
    dtype: Optional[torch.dtype] = None,
    device: Optional[torch.device] = None,
) -> Tensor:
    r"""Builds a sparse :obj:`[size, size]` matrix via
    :meth:`torch.sparse.spdiags` whose main diagonal (offset :obj:`0`) is
    filled with :obj:`fill_value`.

    Args:
        size (int): The number of rows and columns.
        fill_value (float): The value placed on the diagonal.
            (default: :obj:`1.0`)
        layout: Forwarded as :obj:`layout` to :meth:`torch.sparse.spdiags`.
            (default: :obj:`None`)
        dtype (torch.dtype, optional): The data type of the diagonal values.
            (default: :obj:`None`)
        device (torch.device, optional): The device of the created tensors.
            (default: :obj:`None`)
    """
    diagonal = torch.full((1, size), fill_value, dtype=dtype, device=device)
    main_offset = torch.zeros(1, dtype=torch.long, device=device)
    return torch.sparse.spdiags(
        diagonal,
        offsets=main_offset,
        shape=(size, size),
        layout=layout,
    )


def set_sparse_value(adj: Tensor, value: Tensor) -> Tensor:
    r"""Creates a new sparse tensor that reuses the index structure and layout
    of :obj:`adj` but stores :obj:`value` as its values.

    If :obj:`value` has more than one dimension, its trailing dimensions are
    appended to the size of the result. COO results are coalesced.

    Args:
        adj (Tensor): The sparse tensor providing indices, layout and size.
        value (Tensor): The new values; the result is created on
            :obj:`value.device`.

    Raises:
        ValueError: If the layout of :obj:`adj` is none of
            :obj:`torch.sparse_coo`, :obj:`torch.sparse_csr` or
            :obj:`torch.sparse_csc`.
    """
    shape = (adj.size() + value.size()[1:]) if value.dim() > 1 else adj.size()
    layout = adj.layout

    if layout == torch.sparse_coo:
        out = torch.sparse_coo_tensor(
            indices=adj.indices(),
            values=value,
            size=shape,
            device=value.device,
        )
        return out.coalesce()

    if layout == torch.sparse_csr:
        return torch.sparse_csr_tensor(
            crow_indices=adj.crow_indices(),
            col_indices=adj.col_indices(),
            values=value,
            size=shape,
            device=value.device,
        )

    if layout == torch.sparse_csc:
        return torch.sparse_csc_tensor(
            ccol_indices=adj.ccol_indices(),
            row_indices=adj.row_indices(),
            values=value,
            size=shape,
            device=value.device,
        )

    raise ValueError(f"Unexpected sparse tensor layout (got '{adj.layout}')")


def ptr2index(ptr: Tensor) -> Tensor:
    r"""Expands a pointer vector into per-entry indices: index :obj:`i` is
    repeated :obj:`ptr[i + 1] - ptr[i]` times.
    """
    positions = torch.arange(
        ptr.numel() - 1,
        dtype=ptr.dtype,
        device=ptr.device,
    )
    counts = ptr[1:] - ptr[:-1]
    return positions.repeat_interleave(counts)


def index2ptr(index: Tensor, size: int) -> Tensor:
    r"""Converts per-entry indices into a pointer vector by delegating to
    :meth:`torch._convert_indices_from_coo_to_csr`. The output is requested
    as :obj:`int32` exactly when :obj:`index` is of type :obj:`torch.int32`.
    """
    wants_int32 = index.dtype == torch.int32
    return torch._convert_indices_from_coo_to_csr(
        index,
        size,
        out_int32=wants_int32,
    )