kempnerforge.data

Data pipeline for KempnerForge.

Public API:
  • MemoryMappedDataset / HuggingFaceDataset: Dataset implementations

  • DistributedSampler: Rank-aware, deterministic sampling

  • StatefulDataLoader: Checkpoint-resumable data loading

class kempnerforge.data.DistributedSampler[source]

Bases: torch.utils.data.Sampler

Deterministic distributed sampler with skip-ahead support.

Partitions dataset indices across data-parallel ranks. Each rank sees a unique, non-overlapping subset of the data.

Parameters:
  • dataset – Dataset to sample from.

  • num_replicas – Number of data-parallel ranks (default: world_size).

  • rank – Rank of the current process (default: obtained from torch.distributed).

  • shuffle – Whether to shuffle indices.

  • seed – Base random seed for deterministic shuffling.

  • drop_last – Drop samples that don’t divide evenly across ranks.
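
A minimal usage sketch (assuming a map-style dataset; num_replicas and rank are passed explicitly here so the example runs without an initialized torch.distributed process group):

    import torch
    from torch.utils.data import DataLoader, TensorDataset
    from kempnerforge.data import DistributedSampler

    # Toy map-style dataset of 100 scalar samples.
    dataset = TensorDataset(torch.arange(100))

    sampler = DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=True, seed=0)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)

    for epoch in range(2):
        sampler.set_epoch(epoch)  # deterministic per-epoch re-shuffle
        for (batch,) in loader:
            pass  # training step goes here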

__init__(dataset, num_replicas=None, rank=None, shuffle=True, seed=0, drop_last=True)[source]
Parameters:
  • dataset (Dataset)

  • num_replicas (int | None)

  • rank (int | None)

  • shuffle (bool)

  • seed (int)

  • drop_last (bool)

Return type:

None

set_epoch(epoch)[source]

Set the epoch for deterministic re-shuffling.

Parameters:

epoch (int)

Return type:

None

set_skip(skip)[source]

Set number of samples to skip (for resumption after checkpoint).

Parameters:

skip (int)

Return type:

None

state_dict()[source]

Return checkpoint state. Keys: epoch, seed, num_replicas, rank.

Return type:

dict

load_state_dict(state)[source]

Restore from checkpoint. Only epoch is restored; rank info is local.

Parameters:

state (dict)

Return type:

None
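
A resumption sketch using these hooks (samples_seen is a hypothetical counter maintained by the training loop, not part of this API):

    import torch
    from torch.utils.data import TensorDataset
    from kempnerforge.data import DistributedSampler

    dataset = TensorDataset(torch.arange(100))
    sampler = DistributedSampler(dataset, num_replicas=2, rank=0)

    samples_seen = 24  # hypothetical: tracked by the training loop

    # Checkpoint: save sampler state alongside the progress counter.
    ckpt = {"sampler": sampler.state_dict(), "samples_seen": samples_seen}

    # Resume: restore the epoch, then skip past already-consumed samples.
    sampler.load_state_dict(ckpt["sampler"])  # only epoch is restored
    sampler.set_skip(ckpt["samples_seen"])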

class kempnerforge.data.HuggingFaceDataset[source]

Bases: Dataset

HuggingFace dataset with on-the-fly tokenization and sequence packing.

Loads a HuggingFace dataset, tokenizes text on the fly, and packs multiple documents into fixed-length sequences (separated by EOS tokens).
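
To make packing concrete, a toy illustration (a conceptual sketch of the idea, not the library's internals; exact boundary handling is an assumption):

    # Concatenate documents with EOS between them, then chunk into seq_len pieces.
    docs = [[5, 9, 3], [7, 2], [4, 4, 4, 1]]
    eos, seq_len = 0, 4

    stream = []
    for doc in docs:
        stream.extend(doc + [eos])  # EOS separates documents

    samples = [stream[i:i + seq_len] for i in range(0, len(stream) - seq_len + 1, seq_len)]
    print(samples)  # [[5, 9, 3, 0], [7, 2, 0, 4], [4, 4, 1, 0]]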

Parameters:
  • dataset_name – HuggingFace dataset name (e.g., “allenai/c4”).

  • dataset_config – Optional config name (e.g., “wikitext-2-raw-v1”).

  • split – Dataset split (“train”, “validation”, etc.).

  • text_field – Name of the text column.

  • seq_len – Sequence length for packing.

  • tokenizer_path – Path or name for HuggingFace tokenizer.

  • pack_sequences – Whether to pack multiple documents into each fixed-length sequence (default: False).
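
A construction sketch using the parameters above (seq_len and the tokenizer choice are illustrative; "en" is the English config of allenai/c4):

    from kempnerforge.data import HuggingFaceDataset

    ds = HuggingFaceDataset(
        dataset_name="allenai/c4",
        split="train",
        text_field="text",
        seq_len=2048,
        tokenizer_path="gpt2",  # any HuggingFace tokenizer name or local path
        dataset_config="en",
        pack_sequences=True,
    )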

__init__(dataset_name, split, text_field, seq_len, tokenizer_path, dataset_config=None, pack_sequences=False)[source]
Parameters:
  • dataset_name (str)

  • split (str)

  • text_field (str)

  • seq_len (int)

  • tokenizer_path (str)

  • dataset_config (str | None)

  • pack_sequences (bool)

Return type:

None

state_dict()[source]

Return checkpoint state. Keys: epoch, sample_idx, total_sequences.

Return type:

dict

load_state_dict(state)[source]

Restore from checkpoint. Restores epoch and sample_idx.

Parameters:

state (dict)

Return type:

None
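
A save/restore sketch (assuming ds is the HuggingFaceDataset constructed above; torch.save is one serializer choice, since state_dict() returns a plain dict, and the file name is illustrative):

    import torch

    torch.save(ds.state_dict(), "data_state.pt")  # keys: epoch, sample_idx, total_sequences

    state = torch.load("data_state.pt")
    ds.load_state_dict(state)  # restores epoch and sample_idx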

class kempnerforge.data.MemoryMappedDataset[source]

Bases: Dataset

Pre-tokenized dataset backed by memory-mapped numpy files.

Expects .npy files containing 1D arrays of uint16/uint32 token IDs that have been pre-packed into fixed-length sequences.

File layout: each file stores a flat array of tokens. The dataset splits them into non-overlapping chunks of seq_len tokens. Multiple files are concatenated logically.
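
A sketch of producing a compatible shard (the file name is illustrative, and dropping the trailing partial chunk is an assumption consistent with the non-overlapping layout above):

    import numpy as np

    # One flat uint16 token array per file; tokens are already ordered/packed.
    tokens = np.random.randint(0, 50_000, size=10_000, dtype=np.uint16)
    np.save("data/shard_000.npy", tokens)

    # With seq_len=1024 this shard yields 10_000 // 1024 = 9 full samples.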

Parameters:
  • data_dir – Directory containing .npy token files.

  • seq_len – Sequence length (number of tokens per sample).

  • file_pattern – Glob pattern for data files.

  • pack_sequences – Whether to pack token streams with EOS separators instead of treating them as pre-packed (default: False).

  • eos_token_id – EOS token ID used as the document separator when pack_sequences is enabled.
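
A construction sketch (the directory path is illustrative and the indexing behavior is an assumption; see close() below):

    from kempnerforge.data import MemoryMappedDataset

    ds = MemoryMappedDataset("data/", seq_len=1024)  # file_pattern defaults to "*.npy"
    try:
        first = ds[0]  # assumption: returns one seq_len-token chunk
    finally:
        ds.close()  # release the memory maps explicitly; do not rely on __del__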

__init__(data_dir, seq_len, file_pattern='*.npy', pack_sequences=False, eos_token_id=None)[source]
Parameters:
  • data_dir (str)

  • seq_len (int)

  • file_pattern (str)

  • pack_sequences (bool)

  • eos_token_id (int | None)

Return type:

None

state_dict()[source]

Return checkpoint state. Keys: epoch, total_samples.

Return type:

dict

load_state_dict(state)[source]

Restore from checkpoint. Only epoch is restored; sample count is derived.

Parameters:

state (dict)

Return type:

None

close()[source]

Release the underlying memory maps. Call close() explicitly rather than relying on __del__.

Return type:

None

class kempnerforge.data.StatefulDataLoader[source]

Bases: object

Stateful wrapper around PyTorch DataLoader.

Tracks iteration progress so training can resume from the exact position after a checkpoint load.

Parameters:
  • dataset – Dataset to load from.

  • batch_size – Per-device micro-batch size.

  • sampler – Distributed sampler (created automatically if None).

  • config – Data pipeline configuration.
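
An end-to-end resume sketch (iterating the loader like a standard DataLoader is an assumption, as is the checkpoint plumbing):

    from kempnerforge.data import MemoryMappedDataset, StatefulDataLoader

    ds = MemoryMappedDataset("data/", seq_len=1024)
    loader = StatefulDataLoader(ds, batch_size=8)  # sampler is created automatically

    state = None
    for step, batch in enumerate(loader):  # assumed standard iteration
        if step == 100:
            state = loader.state_dict()  # keys: epoch, batches_yielded, sampler
            break

    # Later, e.g. in a fresh process: rebuild the loader, then fast-forward.
    resumed = StatefulDataLoader(ds, batch_size=8)
    resumed.load_state_dict(state)  # restores sampler state and skips to the saved batch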

__init__(dataset, batch_size, sampler=None, config=None)[source]
Parameters:
  • dataset (Dataset)

  • batch_size (int)

  • sampler (DistributedSampler | None)

  • config (default: None)

Return type:

None

state_dict()[source]

Return checkpoint state. Keys: epoch, batches_yielded, sampler.

Return type:

dict

load_state_dict(state)[source]

Restore from checkpoint. Restores sampler state and skips to saved batch position.

Parameters:

state (dict)

Return type:

None

Modules

  • dataloader – Distributed, stateful DataLoader for KempnerForge.

  • dataset – Dataset implementations for KempnerForge.

  • sampler – Distributed sampler with deterministic shuffling and skip-ahead.