Source code for torch_geometric.loader.node_loader

import logging
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List, Optional, Tuple, Union

import psutil
import torch

from torch_geometric.data import Data, HeteroData
from torch_geometric.data.feature_store import FeatureStore
from torch_geometric.data.graph_store import GraphStore
from torch_geometric.loader.base import DataLoaderIterator, WorkerInitWrapper
from torch_geometric.loader.utils import (
    InputData,
    filter_custom_store,
    filter_data,
    filter_hetero_data,
    get_input_nodes,
    get_numa_nodes_cores,
)
from torch_geometric.sampler.base import (
    BaseSampler,
    HeteroSamplerOutput,
    NodeSamplerInput,
    SamplerOutput,
)
from torch_geometric.typing import InputNodes, OptTensor


class NodeLoader(torch.utils.data.DataLoader):
    r"""A data loader that performs neighbor sampling from node information,
    using a generic :class:`~torch_geometric.sampler.BaseSampler`
    implementation that defines a :meth:`sample_from_nodes` function and is
    supported on the provided input :obj:`data` object.

    Args:
        data (torch_geometric.data.Data or torch_geometric.data.HeteroData):
            The :class:`~torch_geometric.data.Data` or
            :class:`~torch_geometric.data.HeteroData` graph object.
        node_sampler (torch_geometric.sampler.BaseSampler): The sampler
            implementation to be used with this loader.
            Note that the sampler implementation must be compatible with the
            input data object.
        input_nodes (torch.Tensor or str or Tuple[str, torch.Tensor]): The
            indices of nodes for which neighbors are sampled to create
            mini-batches.
            Needs to be either given as a :obj:`torch.LongTensor` or
            :obj:`torch.BoolTensor`.
            If set to :obj:`None`, all nodes will be considered.
            In heterogeneous graphs, needs to be passed as a tuple that holds
            the node type and node indices. (default: :obj:`None`)
        input_time (torch.Tensor, optional): Optional values to override the
            timestamp for the input nodes given in :obj:`input_nodes`.
            If not set, will use the timestamps in :obj:`time_attr` as default
            (if present).
            The :obj:`time_attr` needs to be set for this to work.
            (default: :obj:`None`)
        transform (Callable, optional): A function/transform that takes in a
            sampled mini-batch and returns a transformed version.
            (default: :obj:`None`)
        filter_per_worker (bool, optional): If set to :obj:`True`, will filter
            the returned data in each worker's subprocess rather than in the
            main process.
            Setting this to :obj:`True` is generally not recommended:
            (1) it may result in too many open file handles,
            (2) it may slow down data loading,
            (3) it requires operating on CPU tensors.
            (default: :obj:`False`)
        **kwargs (optional): Additional arguments of
            :class:`torch.utils.data.DataLoader`, such as :obj:`batch_size`,
            :obj:`shuffle`, :obj:`drop_last` or :obj:`num_workers`.
    """
    def __init__(
        self,
        data: Union[Data, HeteroData, Tuple[FeatureStore, GraphStore]],
        node_sampler: BaseSampler,
        input_nodes: InputNodes = None,
        input_time: OptTensor = None,
        transform: Callable = None,
        filter_per_worker: bool = False,
        **kwargs,
    ):
        # Remove for PyTorch Lightning:
        if 'dataset' in kwargs:
            del kwargs['dataset']
        if 'collate_fn' in kwargs:
            del kwargs['collate_fn']

        # Get node type (or `None` for homogeneous graphs):
        node_type, input_nodes = get_input_nodes(data, input_nodes)

        self.data = data
        self.node_type = node_type
        self.node_sampler = node_sampler
        self.input_data = InputData(input_nodes, input_time)
        self.transform = transform
        self.filter_per_worker = filter_per_worker

        # TODO: Unify data loader affinitization in `BaseDataLoader` class.
        # CPU affinitization for loader and compute cores:
        self.num_workers = kwargs.get('num_workers', 0)
        self.is_cuda_available = torch.cuda.is_available()
        self.cpu_affinity_enabled = False
        worker_init_fn = WorkerInitWrapper(kwargs.pop('worker_init_fn', None))

        iterator = range(input_nodes.size(0))
        super().__init__(iterator, collate_fn=self.collate_fn,
                         worker_init_fn=worker_init_fn, **kwargs)
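
    # Rough sketch of the mini-batch flow implemented below, for orientation:
    # the base `torch.utils.data.DataLoader` iterates over the seed node
    # indices and passes each batch of indices to `collate_fn`, which builds a
    # `NodeSamplerInput` and calls `node_sampler.sample_from_nodes`. The
    # sampler output is then joined with node/edge features by `filter_fn`,
    # either inside the worker process (`filter_per_worker=True`) or in the
    # main process via `DataLoaderIterator` (see `_get_iterator`).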

    def collate_fn(self, index: NodeSamplerInput) -> Any:
        r"""Samples a subgraph from a batch of input nodes."""
        input_data: NodeSamplerInput = self.input_data[index]

        out = self.node_sampler.sample_from_nodes(input_data)

        if self.filter_per_worker:
            # Execute `filter_fn` in the worker process:
            out = self.filter_fn(out)

        return out

    def filter_fn(
        self,
        out: Union[SamplerOutput, HeteroSamplerOutput],
    ) -> Union[Data, HeteroData]:
        r"""Joins the sampled nodes with their corresponding features,
        returning the resulting :class:`~torch_geometric.data.Data` or
        :class:`~torch_geometric.data.HeteroData` object to be used
        downstream.
        """
        if isinstance(out, SamplerOutput):
            data = filter_data(self.data, out.node, out.row, out.col,
                               out.edge, self.node_sampler.edge_permutation)
            data.batch = out.batch
            data.input_id = out.metadata
            data.batch_size = out.metadata.size(0)

        elif isinstance(out, HeteroSamplerOutput):
            if isinstance(self.data, HeteroData):
                data = filter_hetero_data(self.data, out.node, out.row,
                                          out.col, out.edge,
                                          self.node_sampler.edge_permutation)
            else:  # Tuple[FeatureStore, GraphStore]
                data = filter_custom_store(*self.data, out.node, out.row,
                                           out.col, out.edge)

            for key, batch in (out.batch or {}).items():
                data[key].batch = batch
            data[self.node_type].input_id = out.metadata
            data[self.node_type].batch_size = out.metadata.size(0)

        else:
            raise TypeError(f"'{self.__class__.__name__}' found invalid "
                            f"type: '{type(out)}'")

        return data if self.transform is None else self.transform(data)

    def _get_iterator(self) -> Iterator:
        if self.filter_per_worker:
            return super()._get_iterator()

        # if not self.is_cuda_available and not self.cpu_affinity_enabled:
        #     TODO: Add manual page for best CPU practices
        #     link = ...
        #     Warning('Dataloader CPU affinity opt is not enabled, consider '
        #             'switching it on with enable_cpu_affinity() or see CPU '
        #             f'best practices for PyG [{link}])')

        # Execute `filter_fn` in the main process:
        return DataLoaderIterator(super()._get_iterator(), self.filter_fn)

    def __enter__(self):
        return self

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}()'

    @contextmanager
    def enable_cpu_affinity(self, loader_cores: Optional[List[int]] = None):
        r"""A context manager to enable CPU affinity for data loader workers
        (only used when running on CPU devices).

        Affinitization places data loader worker threads on specific CPU
        cores.
        In effect, it allows for more efficient local memory allocation and
        reduces remote memory calls.
        Every time a process or thread moves from one core to another,
        registers and caches need to be flushed and reloaded.
        This can become very costly if it happens often, and our threads may
        also no longer be close to their data, or be able to share data in a
        cache.

        .. warning::
            If you want to further affinitize compute threads (*i.e.* with
            OMP), please make sure that you exclude :obj:`loader_cores` from
            the list of cores available for compute.
            Otherwise, cores will be oversubscribed and performance will
            degrade.

        .. code-block:: python

            loader = NeighborLoader(data, num_workers=3)
            with loader.enable_cpu_affinity(loader_cores=[1, 2, 3]):
                for batch in loader:
                    pass

        This will be gradually extended to increase performance on dual socket
        CPUs.

        Args:
            loader_cores ([int], optional): List of CPU cores to which data
                loader workers should be affinitized.
                By default, :obj:`cpu0` is reserved for all auxiliary threads
                and ops.
                The :class:`DataLoader` will affinitize to cores starting at
                :obj:`cpu1`. (default: :obj:`node0_cores[1:num_workers]`)
        """
        if not self.is_cuda_available:
            if not self.num_workers > 0:
                raise ValueError('ERROR: affinity should be used with at '
                                 'least one DL worker')
            if loader_cores and len(loader_cores) != self.num_workers:
                raise Exception('ERROR: cpu_affinity incorrect '
                                f'number of loader_cores={loader_cores} '
                                f'for num_workers={self.num_workers}')

            worker_init_fn_old = self.worker_init_fn
            affinity_old = psutil.Process().cpu_affinity()
            nthreads_old = torch.get_num_threads()
            loader_cores = loader_cores[:] if loader_cores else None

            def init_fn(worker_id):
                try:
                    psutil.Process().cpu_affinity([loader_cores[worker_id]])
                except IndexError:
                    raise Exception('ERROR: cannot use affinity '
                                    f'id={worker_id} cpu={loader_cores}')

                worker_init_fn_old(worker_id)

            if loader_cores is None:
                numa_info = get_numa_nodes_cores()

                if numa_info and len(numa_info[0]) > self.num_workers:
                    # Take one thread per each node 0 core:
                    node0_cores = [cpus[0] for core_id, cpus in numa_info[0]]
                else:
                    node0_cores = list(range(psutil.cpu_count(logical=False)))

                if len(node0_cores) - 1 < self.num_workers:
                    raise Exception(
                        f'More workers than available cores {node0_cores[1:]}')

                # Set default loader core IDs:
                loader_cores = node0_cores[1:self.num_workers + 1]

            try:
                # Set CPU affinity for the data loader workers:
                self.worker_init_fn = init_fn
                self.cpu_affinity_enabled = True
                logging.info(f'{self.num_workers} DataLoader workers '
                             f'are assigned to CPUs {loader_cores}')
                yield
            finally:
                # Restore `omp_num_threads` and CPU affinity:
                psutil.Process().cpu_affinity(affinity_old)
                torch.set_num_threads(nthreads_old)
                self.worker_init_fn = worker_init_fn_old
                self.cpu_affinity_enabled = False
        else:
            yield
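

# Example usage (an illustrative sketch; it assumes a homogeneous `Data`
# object `data` with a boolean `train_mask` attribute and uses
# `torch_geometric.sampler.NeighborSampler` as the `node_sampler`):
#
#     from torch_geometric.loader import NodeLoader
#     from torch_geometric.sampler import NeighborSampler
#
#     sampler = NeighborSampler(data, num_neighbors=[10, 5])
#     loader = NodeLoader(data, node_sampler=sampler,
#                         input_nodes=data.train_mask,
#                         batch_size=128, shuffle=True)
#
#     for batch in loader:
#         pass  # `batch` is a `Data` object holding the sampled subgraph.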