Source code for elliptic_toolkit.dataset

import os
from functools import singledispatch
import warnings

import pandas as pd
import numpy as np
import torch

from torch_geometric.io import fs


def download_dataset(
    root: str = "elliptic_bitcoin_dataset",
    raw_file_names=[
        'elliptic_txs_features.csv',
        'elliptic_txs_edgelist.csv',
        'elliptic_txs_classes.csv',
    ],
    force: bool = False,
    url: str = 'https://data.pyg.org/datasets/elliptic',
):
    """Download the Elliptic Bitcoin dataset from PyTorch Geometric's
    dataset repository.

    Args:
        root (str, optional): The root directory where the dataset will be
            stored. Defaults to "elliptic_bitcoin_dataset".
        raw_file_names (list, optional): List of raw file names to download.
            Defaults to ['elliptic_txs_features.csv',
            'elliptic_txs_edgelist.csv', 'elliptic_txs_classes.csv'].
        force (bool, optional): Whether to force re-downloading the dataset
            if it already exists. Defaults to False.
        url (str, optional): The base URL for the dataset files. Defaults to
            'https://data.pyg.org/datasets/elliptic'.
    """
    if not fs.exists(root):
        os.mkdir(root)
    for name in raw_file_names:
        if fs.exists(os.path.join(root, name)):
            if force:
                fs.rm(os.path.join(root, name))
            else:
                continue
        fs.cp(f'{url}/{os.path.basename(name)}.zip', root, extract=True)
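
# A minimal usage sketch (assumes the PyG mirror above is reachable and the
# working directory is writable):
#
#     download_dataset()                # fetch the three CSVs once
#     download_dataset(force=True)      # re-download, overwriting local copies
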

def process_dataset(
    folder_path: str = "elliptic_bitcoin_dataset",
    features_file: str = "elliptic_txs_features.csv",
    classes_file: str = "elliptic_txs_classes.csv",
    edges_file: str = "elliptic_txs_edgelist.csv",
):
    """
    Loads, validates, and processes the Elliptic Bitcoin dataset.

    Returns
    -------
    nodes_df : pandas.DataFrame
        DataFrame with shape (203769, 167). Columns:
            - 'time': Discrete time step (int)
            - 'feat_0' ... 'feat_164': Node features (float)
            - 'class': Node label (int: 1 for illicit, 0 for licit,
              -1 for unknown/missing)
        The 'class' column uses -1 to indicate missing labels (transductive
        setting). The 'txId' column is dropped in the returned DataFrame;
        the row order matches the input file.
    edges_df : pandas.DataFrame
        DataFrame with shape (234355, 2). Columns:
            - 'txId1': Source node index (int, row index in nodes_df)
            - 'txId2': Target node index (int, row index in nodes_df)
        Each row represents a directed edge in the transaction graph, with
        node indices corresponding to rows in nodes_df.

    Notes
    -----
    - All IDs in 'edges_df' are mapped to row indices in 'nodes_df'.
    - The function performs strict validation on shapes, unique values, and
      label distribution.
    """
    classes_path = os.path.join(folder_path, classes_file)
    features_path = os.path.join(folder_path, features_file)
    edges_path = os.path.join(folder_path, edges_file)

    classes_df = pd.read_csv(classes_path)
    features_df = pd.read_csv(features_path, header=None)
    edges_df = pd.read_csv(edges_path)

    # Basic checks
    # features checks
    assert features_df.shape == (203769, 167)
    assert features_df[0].nunique() == 203769  # txId is unique
    assert features_df[1].nunique() == 49  # time has 49 unique values

    # classes checks
    assert all(classes_df.columns == ['txId', 'class'])
    assert classes_df.shape == (203769, 2)
    assert set(classes_df['class'].unique()) == set(['unknown', '1', '2'])
    classes_counts = classes_df['class'].value_counts()
    assert classes_counts['unknown'] == 157205
    assert classes_counts['1'] == 4545
    assert classes_counts['2'] == 42019
    assert set(classes_df['txId']) == set(features_df[0])

    # edges checks
    assert edges_df.shape == (234355, 2)
    assert all(edges_df.columns == ['txId1', 'txId2'])
    assert set(edges_df['txId1']).issubset(set(features_df[0]))
    assert set(edges_df['txId2']).issubset(set(features_df[0]))

    features_names = ['txId', 'time'] + [f'feat_{i}' for i in range(165)]
    features_df.columns = features_names

    class_map = {'unknown': -1, '1': 1, '2': 0}
    classes_df['class'] = classes_df['class'].map(class_map)

    nodes_df = features_df.join(
        classes_df.set_index('txId')['class'], on='txId', how='left')

    txid_to_idx = pd.Series(nodes_df.index, index=nodes_df['txId'])

    # Map txId1 and txId2 in edges_df to node indices
    edges_df['txId1'] = edges_df['txId1'].map(txid_to_idx)
    edges_df['txId2'] = edges_df['txId2'].map(txid_to_idx)

    return nodes_df.drop(columns=['txId']), edges_df
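
# A minimal sketch of the expected output (assumes download_dataset() has
# already populated the default root):
#
#     nodes_df, edges_df = process_dataset()
#     nodes_df.shape   # (203769, 167): 'time', 'feat_0'..'feat_164', 'class'
#     edges_df.shape   # (234355, 2): row indices into nodes_df
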

@singledispatch
def temporal_split(times, test_size=0.2):
    """
    Split data into temporal train/test sets based on unique time steps.

    Parameters
    ----------
    times : np.ndarray, torch.Tensor, or pandas.DataFrame
        The time information or data to split. DataFrames must contain a
        'time' column.
    test_size : float, default=0.2
        Proportion of unique time steps to include in the test split
        (between 0.0 and 1.0).

    Returns
    -------
    For array/tensor input:
        train_indices, test_indices : array-like
            Indices for the training and test sets.
    For DataFrame input:
        (X_train, y_train), (X_test, y_test) : tuple of tuples
            X_train : pandas.DataFrame
                Training features (all columns except 'class').
            y_train : pandas.Series
                Training labels (the 'class' column).
            X_test : pandas.DataFrame
                Test features (all columns except 'class').
            y_test : pandas.Series
                Test labels (the 'class' column).
        Or, if return_X_y=False:
            train_df, test_df : pandas.DataFrame
                The full training and test DataFrames, already sliced by
                time.

    Type-specific behavior
    ----------------------
    - np.ndarray: Uses numpy operations to split by unique time values.
    - torch.Tensor: Uses torch operations to split by unique time values
      (no CPU/GPU transfer).
    - pandas.DataFrame: Splits based on the 'time' column. If
      return_X_y=True, unpacks X and y based on the 'class' column;
      otherwise, returns the sliced DataFrames.
    """
    raise NotImplementedError("temporal_split not implemented for this type")

def _temporal_split(times, mod, test_size):
    """
    Core logic for temporal splitting, used by temporal_split for both
    numpy arrays and torch tensors.

    Issues a warning if n_train or n_test is zero.

    Parameters
    ----------
    times : array-like
        Array of time values (numpy or torch).
    mod : module
        Module to use (np or torch) for unique, isin, where.
    test_size : float
        Proportion of unique time steps to include in the test split.

    Returns
    -------
    train_indices, test_indices : array-like
        Indices for the training and test sets.
    """
    assert 0.0 < test_size < 1.0, "test_size must be between 0.0 and 1.0"
    unique_times = mod.unique(times)
    n_test = int(len(unique_times) * test_size)
    n_train = len(unique_times) - n_test
    if n_train == 0 or n_test == 0:
        msg = (
            f"temporal_split: n_train or n_test is zero. "
            f"n_train={n_train}, n_test={n_test}, "
            f"total unique_times={len(unique_times)}. "
            f"Check your test_size ({test_size}) and data."
        )
        if n_train == 0:
            msg += " All data assigned to test set."
        if n_test == 0:
            msg += " All data assigned to train set."
        warnings.warn(msg)
    train_times = unique_times[:n_train]
    test_times = unique_times[n_train:]
    train_mask = mod.isin(times, train_times)
    test_mask = mod.isin(times, test_times)
    train_indices = mod.where(train_mask)[0]
    test_indices = mod.where(test_mask)[0]
    return train_indices, test_indices


@temporal_split.register(np.ndarray)
def _(times, test_size=0.2):
    """
    Temporal split for numpy arrays. See _temporal_split for details.
    """
    return _temporal_split(times, np, test_size)


@temporal_split.register(torch.Tensor)
def _(times, test_size=0.2):
    """
    Temporal split for torch tensors. See _temporal_split for details.
    """
    return _temporal_split(times, torch, test_size)


@temporal_split.register(pd.DataFrame)
def _(nodes_df, test_size=0.2, return_X_y=True):
    """
    Temporal split for pandas DataFrames. Splits based on the 'time'
    column. If return_X_y=True, returns (X_train, y_train),
    (X_test, y_test) tuples; otherwise, returns the full train/test
    DataFrames.
    """
    train_indices, test_indices = temporal_split(
        nodes_df['time'].values, test_size=test_size)
    train_df = nodes_df.iloc[train_indices].reset_index(drop=True)
    test_df = nodes_df.iloc[test_indices].reset_index(drop=True)
    if not return_X_y:
        return train_df, test_df
    X_train, y_train = train_df.drop(columns=['class']), train_df['class']
    X_test, y_test = test_df.drop(columns=['class']), test_df['class']
    return (X_train, y_train), (X_test, y_test)
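
# A minimal dispatch sketch (illustrative values, not from the dataset).
# With 5 unique time steps and test_size=0.2, int(5 * 0.2) = 1 step is held
# out, so the last time step becomes the test set:
#
#     times = np.array([1, 1, 2, 3, 4, 5])
#     train_idx, test_idx = temporal_split(times, test_size=0.2)
#     # train_idx -> [0, 1, 2, 3, 4] (times 1..4), test_idx -> [5] (time 5)
#
# The same call dispatches on type, e.g. for a DataFrame with a 'time' column:
#
#     (X_tr, y_tr), (X_te, y_te) = temporal_split(nodes_df, test_size=0.2)
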

def load_labeled_data(test_size=0.2, root="elliptic_bitcoin_dataset"):
    """
    Utility function that loads the dataset, keeps only the labeled nodes,
    and splits them temporally into train and test sets.

    Parameters
    ----------
    test_size : float, default=0.2
        Proportion of unique time steps to include in the test split
        (between 0.0 and 1.0).
    root : str, optional
        The root directory where the dataset is stored. Defaults to
        "elliptic_bitcoin_dataset".

    Returns
    -------
    (X_train, y_train), (X_test, y_test) : tuple of tuples
        X_train, y_train : training features and labels.
        X_test, y_test : test features and labels.
    """
    nodes_df, edges_df = process_dataset(folder_path=root)
    nodes_df = nodes_df[nodes_df['class'] != -1]  # keep only labeled nodes
    (X_train, y_train), (X_test, y_test) = temporal_split(
        nodes_df, test_size=test_size)
    return (X_train, y_train), (X_test, y_test)
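
# End-to-end sketch (assumes the dataset was downloaded first and that
# scikit-learn is available; the classifier choice is purely illustrative):
#
#     download_dataset()
#     (X_train, y_train), (X_test, y_test) = load_labeled_data(test_size=0.2)
#
#     from sklearn.ensemble import RandomForestClassifier
#     clf = RandomForestClassifier().fit(X_train, y_train)
#     clf.score(X_test, y_test)
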