Source code for torch_geometric.graphgym.config

import functools
import inspect
import logging
import os
import os.path as osp
import warnings
from collections.abc import Iterable
from dataclasses import asdict
from typing import Any

import torch_geometric.graphgym.register as register
from torch_geometric.io import fs

try:  # Define global config object
    from yacs.config import CfgNode as CN
    cfg = CN()
except ImportError:
    cfg = None
    warnings.warn("Could not define global config object. Please install "
                  "'yacs' via 'pip install yacs' in order to use GraphGym")


def set_cfg(cfg):
    r"""This function sets the default config values.

    1) Note that for an experiment, only part of the arguments will be used.
       The remaining unused arguments won't affect anything.
       So feel free to register any argument in graphgym.contrib.config.
    2) We support *at most* two levels of configs, *e.g.*,
       :obj:`cfg.dataset.name`.

    :return: Configuration used by the experiment.
    """
    if cfg is None:
        return cfg

    # ----------------------------------------------------------------------- #
    # Basic options
    # ----------------------------------------------------------------------- #

    # Set print destination: stdout / file / both
    cfg.print = 'both'
    # Select device: 'cpu', 'cuda', 'auto'
    cfg.accelerator = 'auto'
    # Number of devices: e.g., set cfg.devices=2 for 2 GPUs
    cfg.devices = 1
    # Output directory
    cfg.out_dir = 'results'
    # Config name (in out_dir)
    cfg.cfg_dest = 'config.yaml'
    # Names of registered custom metric functions to be used
    # (use defaults if none)
    cfg.custom_metrics = []
    # Random seed
    cfg.seed = 0
    # Print rounding
    cfg.round = 4
    # Tensorboard support for each run
    cfg.tensorboard_each_run = False
    # Tensorboard support for aggregated results
    cfg.tensorboard_agg = True
    # Additional number of workers for data loading
    cfg.num_workers = 0
    # Max threads used by PyTorch
    cfg.num_threads = 6
    # The metric for selecting the best epoch for each run
    cfg.metric_best = 'auto'
    # argmax or argmin in aggregating results
    cfg.metric_agg = 'argmax'
    # Whether to visualize embeddings
    cfg.view_emb = False
    # Whether to log GPU memory usage
    cfg.gpu_mem = False
    # Whether to run benchmark analysis
    cfg.benchmark = False

    # ----------------------------------------------------------------------- #
    # Globally shared variables:
    # These variables will be set dynamically based on the input dataset.
    # Do not directly set them here or in .yaml files.
    # ----------------------------------------------------------------------- #

    cfg.share = CN()
    # Size of input dimension
    cfg.share.dim_in = 1
    # Size of output dimension, i.e., number of labels to be predicted
    cfg.share.dim_out = 1
    # Number of dataset splits: train/val/test
    cfg.share.num_splits = 1

    # ----------------------------------------------------------------------- #
    # Dataset options
    # ----------------------------------------------------------------------- #
    cfg.dataset = CN()
    # Name of the dataset
    cfg.dataset.name = 'Cora'
    # if PyG: look for it in the PyTorch Geometric datasets
    # if NetworkX/nx: load data in NetworkX format
    cfg.dataset.format = 'PyG'
    # Dir to load the dataset from. If the dataset is downloaded, this is
    # the cache dir.
    cfg.dataset.dir = './datasets'
    # Task: node, edge, graph, link_pred
    cfg.dataset.task = 'node'
    # Type of task: classification, regression, classification_binary,
    # classification_multi
    cfg.dataset.task_type = 'classification'
    # Transductive / inductive. Graph classification is always inductive.
    cfg.dataset.transductive = True
    # Split ratio of the dataset. Len=2: train/val. Len=3: train/val/test.
    cfg.dataset.split = [0.8, 0.1, 0.1]
    # Whether to shuffle the graphs for splitting
    cfg.dataset.shuffle_split = True
    # Whether to use a random split or a custom split: random / custom
    cfg.dataset.split_mode = 'random'
    # Whether to use an encoder for general attribute features
    cfg.dataset.encoder = True
    # Name of the general encoder
    cfg.dataset.encoder_name = 'db'
    # Whether to add batchnorm after the general encoder
    cfg.dataset.encoder_bn = True
    # Whether to use an encoder for the node features
    cfg.dataset.node_encoder = False
    # Name of the node encoder
    cfg.dataset.node_encoder_name = 'Atom'
    # Whether to add batchnorm after the node encoder
    cfg.dataset.node_encoder_bn = True
    # Whether to use an encoder for the edge features
    cfg.dataset.edge_encoder = False
    # Name of the edge encoder
    cfg.dataset.edge_encoder_name = 'Bond'
    # Whether to add batchnorm after the edge encoder
    cfg.dataset.edge_encoder_bn = True
    # Dimension of the encoded features. For now, the node and edge
    # encoding dimensions are the same.
    cfg.dataset.encoder_dim = 128
    # Dimension of edge features. Updated by the real dim of the dataset.
    cfg.dataset.edge_dim = 128
    # ============== Link/edge tasks only
    # all or disjoint
    cfg.dataset.edge_train_mode = 'all'
    # Used in disjoint edge_train_mode. The proportion of edges used for
    # message-passing.
    cfg.dataset.edge_message_ratio = 0.8
    # The ratio of negative samples to positive samples
    cfg.dataset.edge_negative_sampling_ratio = 1.0
    # Whether to resample disjoint splits when dataset.edge_train_mode is
    # 'disjoint'
    cfg.dataset.resample_disjoint = False
    # Whether to resample negative edges at training time
    # (link prediction only)
    cfg.dataset.resample_negative = False
    # Which transformation function is applied to the dataset
    cfg.dataset.transform = 'none'
    # Whether to cache the split dataset.
    # NOTE: Use with caution, as a cached dataset may not have exactly the
    # same settings as the config file.
    cfg.dataset.cache_save = False
    cfg.dataset.cache_load = False
    # Whether to remove the original node features from the dataset
    cfg.dataset.remove_feature = False
    # Simplify TU datasets for synthetic tasks
    cfg.dataset.tu_simple = True
    # Convert to an undirected graph (saves 2*E edges)
    cfg.dataset.to_undirected = False
    # Dataset location: local, snowflake
    cfg.dataset.location = 'local'
    # Define label: table name
    cfg.dataset.label_table = 'none'
    # Define label: column name
    cfg.dataset.label_column = 'none'

    # ----------------------------------------------------------------------- #
    # Training options
    # ----------------------------------------------------------------------- #
    cfg.train = CN()
    # Total graph mini-batch size
    cfg.train.batch_size = 16
    # Sampling strategy for the train loader
    cfg.train.sampler = 'full_batch'
    # Whether to sample nodes for mini-batches
    cfg.train.sample_node = False
    # Number of sampled nodes per graph
    cfg.train.node_per_graph = 32
    # Radius: same, extend. same: same as cfg.gnn.layers_mp,
    # extend: layers+1
    cfg.train.radius = 'extend'
    # Evaluate the model on test data every eval_period epochs
    cfg.train.eval_period = 10
    # Option to skip training epoch evaluation
    cfg.train.skip_train_eval = False
    # Save a model checkpoint every ckpt_period epochs
    cfg.train.ckpt_period = 100
    # Enable checkpointing; set to False to disable and save I/O
    cfg.train.enable_ckpt = True
    # Resume training from the latest checkpoint in the output directory
    cfg.train.auto_resume = False
    # The epoch to resume from. -1 means resume from the latest epoch.
    cfg.train.epoch_resume = -1
    # Clean checkpoints: only keep the last checkpoint
    cfg.train.ckpt_clean = True
    # Number of iterations per epoch (for sampling-based loaders only)
    cfg.train.iter_per_epoch = 32
    # GraphSAINTRandomWalkSampler: random walk length
    cfg.train.walk_length = 4
    # NeighborSampler: number of sampled nodes per layer
    cfg.train.neighbor_sizes = [20, 15, 10, 5]

    # ----------------------------------------------------------------------- #
    # Validation options
    # ----------------------------------------------------------------------- #
    cfg.val = CN()
    # Whether to sample nodes for mini-batches
    cfg.val.sample_node = False
    # Sampling strategy for the val/test loader
    cfg.val.sampler = 'full_batch'
    # Number of sampled nodes per graph
    cfg.val.node_per_graph = 32
    # Radius: same, extend. same: same as cfg.gnn.layers_mp,
    # extend: layers+1
    cfg.val.radius = 'extend'

    # ----------------------------------------------------------------------- #
    # Model options
    # ----------------------------------------------------------------------- #
    cfg.model = CN()
    # Model type to use
    cfg.model.type = 'gnn'
    # Auto match computational budget: match upper bound / lower bound
    cfg.model.match_upper = True
    # Loss function: cross_entropy, mse
    cfg.model.loss_fun = 'cross_entropy'
    # Size average for the loss function: 'mean' or 'sum'
    cfg.model.size_average = 'mean'
    # Threshold for binary classification
    cfg.model.thresh = 0.5
    # ============== Link/edge tasks only
    # Edge decoding methods:
    # - dot: compute dot(u, v) to predict a link (binary)
    # - cosine_similarity: use cosine_similarity(u, v) to predict a link
    #   (binary)
    # - concat: use u||v followed by an nn.Linear to obtain the edge
    #   embedding (multi-class)
    cfg.model.edge_decoding = 'dot'
    # ===================================
    # ================== Graph tasks only
    # Pooling methods:
    # - add: global add pool
    # - mean: global mean pool
    # - max: global max pool
    cfg.model.graph_pooling = 'add'
    # ===================================

    # ----------------------------------------------------------------------- #
    # GNN options
    # ----------------------------------------------------------------------- #
    cfg.gnn = CN()
    # Prediction head. Uses cfg.dataset.task by default.
    cfg.gnn.head = 'default'
    # Number of layers before message passing
    cfg.gnn.layers_pre_mp = 0
    # Number of layers for message passing
    cfg.gnn.layers_mp = 2
    # Number of layers after message passing
    cfg.gnn.layers_post_mp = 0
    # Hidden layer dim. Automatically set if train.auto_match = True.
    cfg.gnn.dim_inner = 16
    # Type of graph conv: generalconv, gcnconv, sageconv, gatconv, ...
    cfg.gnn.layer_type = 'generalconv'
    # Stage type: 'stack', 'skipsum', 'skipconcat'
    cfg.gnn.stage_type = 'stack'
    # How many layers to skip each time
    cfg.gnn.skip_every = 1
    # Whether to use batch norm
    cfg.gnn.batchnorm = True
    # Activation
    cfg.gnn.act = 'relu'
    # Dropout
    cfg.gnn.dropout = 0.0
    # Aggregation type: add, mean, max
    # Note: only for certain layers that explicitly set the aggregation
    # type, e.g., when cfg.gnn.layer_type = 'generalconv'
    cfg.gnn.agg = 'add'
    # Normalize adj
    cfg.gnn.normalize_adj = False
    # Message direction: single, both
    cfg.gnn.msg_direction = 'single'
    # Whether to add a message from the node itself: none, add, cat
    cfg.gnn.self_msg = 'concat'
    # Number of attention heads
    cfg.gnn.att_heads = 1
    # After concatenating attention heads, add a linear layer
    cfg.gnn.att_final_linear = False
    # After concatenating attention heads, add batch norm to that
    # linear layer
    cfg.gnn.att_final_linear_bn = False
    # Normalize after message passing
    cfg.gnn.l2norm = True
    # Randomly use fewer edges for message passing
    cfg.gnn.keep_edge = 0.5
    # Clear cached feature_new
    cfg.gnn.clear_feature = True

    # ----------------------------------------------------------------------- #
    # Optimizer options
    # ----------------------------------------------------------------------- #
    cfg.optim = CN()
    # Optimizer: sgd, adam
    cfg.optim.optimizer = 'adam'
    # Base learning rate
    cfg.optim.base_lr = 0.01
    # L2 regularization
    cfg.optim.weight_decay = 5e-4
    # SGD momentum
    cfg.optim.momentum = 0.9
    # Scheduler: none, steps, cos
    cfg.optim.scheduler = 'cos'
    # Steps for the 'steps' policy (in epochs)
    cfg.optim.steps = [30, 60, 90]
    # Learning rate multiplier for the 'steps' policy
    cfg.optim.lr_decay = 0.1
    # Maximal number of epochs
    cfg.optim.max_epoch = 200

    # ----------------------------------------------------------------------- #
    # Batch norm options
    # ----------------------------------------------------------------------- #
    cfg.bn = CN()
    # BN epsilon
    cfg.bn.eps = 1e-5
    # BN momentum (BN momentum in PyTorch = 1 - BN momentum in Caffe2)
    cfg.bn.mom = 0.1

    # ----------------------------------------------------------------------- #
    # Memory options
    # ----------------------------------------------------------------------- #
    cfg.mem = CN()
    # Perform ReLU inplace
    cfg.mem.inplace = False

    # Set user-customized cfgs
    for func in register.config_dict.values():
        func(cfg)
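The loop at the end of set_cfg() hands cfg to every function stored in register.config_dict, which is how user code extends the defaults. A minimal sketch using register_config from torch_geometric.graphgym.register (the group name 'example' and its field are hypothetical):

from torch_geometric.graphgym.register import register_config

@register_config('example')
def set_cfg_example(cfg):
    cfg.example = CN()       # hypothetical custom option group
    cfg.example.alpha = 0.5  # at most two levels, per the docstring above

Note that registration must happen before set_cfg(cfg) runs, since that loop is the only point where the registered functions are invoked.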
def assert_cfg(cfg):
    r"""Checks config values and performs any necessary post-processing."""
    if cfg.dataset.task not in ['node', 'edge', 'graph', 'link_pred']:
        raise ValueError(f"Task '{cfg.dataset.task}' not supported. Must be "
                         f"one of node, edge, graph, link_pred")
    if 'classification' in cfg.dataset.task_type and \
            cfg.model.loss_fun == 'mse':
        cfg.model.loss_fun = 'cross_entropy'
        logging.warning(
            'model.loss_fun changed to cross_entropy for classification.')
    if cfg.dataset.task_type == 'regression' and \
            cfg.model.loss_fun == 'cross_entropy':
        cfg.model.loss_fun = 'mse'
        logging.warning('model.loss_fun changed to mse for regression.')
    if cfg.dataset.task == 'graph' and cfg.dataset.transductive:
        cfg.dataset.transductive = False
        logging.warning('dataset.transductive changed '
                        'to False for graph task.')
    if cfg.gnn.layers_post_mp < 1:
        cfg.gnn.layers_post_mp = 1
        logging.warning('Layers after message passing should be >=1')
    if cfg.gnn.head == 'default':
        cfg.gnn.head = cfg.dataset.task
    cfg.run_dir = cfg.out_dir
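Note that a mismatched task type and loss function is silently corrected rather than rejected. A small illustration, assuming the global cfg defined above:

cfg.dataset.task_type = 'regression'
cfg.model.loss_fun = 'cross_entropy'
assert_cfg(cfg)
assert cfg.model.loss_fun == 'mse'  # changed, with a warning logged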
def dump_cfg(cfg):
    r"""Dumps the config to the output directory specified in
    :obj:`cfg.out_dir`.

    Args:
        cfg (CfgNode): Configuration node
    """
    os.makedirs(cfg.out_dir, exist_ok=True)
    cfg_file = osp.join(cfg.out_dir, cfg.cfg_dest)
    with open(cfg_file, 'w') as f:
        cfg.dump(stream=f)
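With the defaults above, a single call writes the merged configuration to cfg.out_dir joined with cfg.cfg_dest:

dump_cfg(cfg)  # creates cfg.out_dir if needed, then writes 'results/config.yaml'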
def load_cfg(cfg, args):
    r"""Loads configurations from the file system and the command line.

    Args:
        cfg (CfgNode): Configuration node
        args (ArgumentParser): Command argument parser
    """
    cfg.merge_from_file(args.cfg_file)
    cfg.merge_from_list(args.opts)
    assert_cfg(cfg)
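load_cfg() expects an argument object exposing cfg_file (the path to a YAML file) and opts (a flat list of alternating dotted keys and values, as consumed by yacs' merge_from_list). A minimal sketch, assuming a file named example.yaml exists; the file name and override values are illustrative:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--cfg', dest='cfg_file', default='example.yaml')
parser.add_argument('opts', nargs=argparse.REMAINDER)

# Equivalent to: python main.py --cfg example.yaml optim.base_lr 0.005
args = parser.parse_args(['--cfg', 'example.yaml', 'optim.base_lr', '0.005'])
load_cfg(cfg, args)  # YAML file first, then CLI overrides, then assert_cfg()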
def makedirs_rm_exist(dir):
    if osp.isdir(dir):
        fs.rm(dir)
    os.makedirs(dir, exist_ok=True)
def get_fname(fname):
    r"""Extracts the base file name from a file path, stripping any
    :obj:`.yaml` or :obj:`.yml` extension.

    Args:
        fname (str): Path to the yaml format configuration file
    """
    fname = osp.basename(fname)
    if fname.endswith('.yaml'):
        fname = fname[:-5]
    elif fname.endswith('.yml'):
        fname = fname[:-4]
    return fname
def set_out_dir(out_dir, fname):
    r"""Creates the directory for the full experiment run.

    Args:
        out_dir (str): Directory for output, specified in :obj:`cfg.out_dir`
        fname (str): Filename for the yaml format configuration file
    """
    fname = get_fname(fname)
    cfg.out_dir = osp.join(out_dir, fname)
    # Make output directory
    if cfg.train.auto_resume:
        os.makedirs(cfg.out_dir, exist_ok=True)
    else:
        makedirs_rm_exist(cfg.out_dir)
def set_run_dir(out_dir):
    r"""Creates the directory for each random seed experiment run.

    Args:
        out_dir (str): Directory for output, specified in :obj:`cfg.out_dir`
    """
    cfg.run_dir = osp.join(out_dir, str(cfg.seed))
    # Make output directory
    if cfg.train.auto_resume:
        os.makedirs(cfg.run_dir, exist_ok=True)
    else:
        makedirs_rm_exist(cfg.run_dir)
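Together, get_fname(), set_out_dir(), and set_run_dir() yield a <out_dir>/<config name>/<seed>/ layout. An illustrative sketch (paths are hypothetical):

set_out_dir('results', 'configs/example.yaml')
# cfg.out_dir == 'results/example'
set_run_dir(cfg.out_dir)
# cfg.run_dir == 'results/example/0' for the default seed

With cfg.train.auto_resume disabled, both helpers remove any existing directory before recreating it, so results from a previous run are discarded.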
set_cfg(cfg)


def from_config(func):
    # Wraps `func` so that missing arguments can be filled in from a
    # dict-like or dataclass `cfg` object passed at call time.
    if inspect.isclass(func):
        params = list(inspect.signature(func.__init__).parameters.values())[1:]
    else:
        params = list(inspect.signature(func).parameters.values())

    arg_names = [p.name for p in params]
    has_defaults = [p.default != inspect.Parameter.empty for p in params]

    @functools.wraps(func)
    def wrapper(*args, cfg: Any = None, **kwargs):
        if cfg is not None:
            cfg = dict(cfg) if isinstance(cfg, Iterable) else asdict(cfg)
            iterator = zip(arg_names[len(args):], has_defaults[len(args):])
            for arg_name, has_default in iterator:
                if arg_name in kwargs:
                    continue
                elif arg_name in cfg:
                    kwargs[arg_name] = cfg[arg_name]
                elif not has_default:
                    raise ValueError(f"'cfg.{arg_name}' undefined")
        return func(*args, **kwargs)

    return wrapper
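A small sketch of the decorator in use (the function and config keys below are illustrative, not part of this module):

@from_config
def build_optimizer(params, base_lr, momentum=0.9):
    return {'params': params, 'lr': base_lr, 'momentum': momentum}

out = build_optimizer([], cfg={'base_lr': 0.01, 'weight_decay': 5e-4})
# base_lr is taken from the cfg, momentum falls back to its default,
# and the unused 'weight_decay' key is simply ignored.

build_optimizer([], cfg={})  # raises ValueError: 'cfg.base_lr' undefined

Explicit positional and keyword arguments always take precedence; only parameters that are otherwise missing are looked up in the cfg.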