tensorflow-data-pipelines

TensorFlow Data Pipelines

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Install skill "tensorflow-data-pipelines" with this command: npx skills add thebushidocollective/han/thebushidocollective-han-tensorflow-data-pipelines

Build efficient, scalable data pipelines using the tf.data API for optimal training performance. This skill covers dataset creation, transformations, batching, shuffling, prefetching, and advanced optimization techniques to maximize GPU/TPU utilization.

Dataset Creation

From Tensor Slices

import tensorflow as tf
import numpy as np

# Create dataset from numpy arrays
x_train = np.random.rand(1000, 28, 28, 1)
y_train = np.random.randint(0, 10, 1000)

# Method 1: from_tensor_slices
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))

# Apply transformations
dataset = dataset.shuffle(buffer_size=1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Iterate through dataset
for batch_x, batch_y in dataset.take(2):
    print(f"Batch shape: {batch_x.shape}, Labels shape: {batch_y.shape}")

From Generator Functions

def data_generator():
    """Generator function for custom data loading."""
    for i in range(1000):
        # Simulate loading data from disk or API
        x = np.random.rand(28, 28, 1).astype(np.float32)
        y = np.random.randint(0, 10)
        yield x, y

# Create dataset from generator
dataset = tf.data.Dataset.from_generator(
    data_generator,
    output_signature=(
        tf.TensorSpec(shape=(28, 28, 1), dtype=tf.float32),
        tf.TensorSpec(shape=(), dtype=tf.int32)
    )
)

dataset = dataset.batch(32).prefetch(tf.data.AUTOTUNE)

From Dataset Range

# Create simple range dataset
dataset = tf.data.Dataset.range(1000)

# Use with custom mapping
dataset = dataset.map(lambda x: (tf.random.normal([28, 28, 1]), x % 10))
dataset = dataset.batch(32)

Data Transformation

Normalization Pipeline

def normalize(image, label):
    """Normalize pixel values."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Apply normalization
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Data Augmentation Pipeline

def augment(image, label):
    """Apply random augmentations."""
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_brightness(image, 0.2)
    image = tf.image.random_contrast(image, 0.8, 1.2)
    return image, label

def normalize(image, label):
    """Normalize pixel values."""
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Build complete pipeline
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after normalization
    .shuffle(1000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Multiple Transformations

# tfa.image.rotate comes from the separate tensorflow-addons package
import tensorflow_addons as tfa

def resize_image(image, label):
    """Resize images to target size."""
    image = tf.image.resize(image, [224, 224])
    return image, label

def apply_random_rotation(image, label):
    """Apply random rotation augmentation."""
    angle = tf.random.uniform([], -0.2, 0.2)  # angle in radians
    image = tfa.image.rotate(image, angle)
    return image, label

# Chain multiple transformations
# (images and labels are assumed to be arrays loaded earlier;
# normalize and augment are the functions defined above)
dataset = (
    tf.data.Dataset.from_tensor_slices((images, labels))
    .map(resize_image, num_parallel_calls=tf.data.AUTOTUNE)
    .map(normalize, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .map(augment, num_parallel_calls=tf.data.AUTOTUNE)
    .map(apply_random_rotation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

Batching and Shuffling

Basic Batching Configuration

# Batch size
BATCH_SIZE = 64

# Buffer size to shuffle the dataset
# (TF data is designed to work with possibly infinite sequences,
# so it doesn't attempt to shuffle the entire sequence in memory. Instead,
# it maintains a buffer in which it shuffles elements).
BUFFER_SIZE = 10000

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
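The effect of the shuffle buffer is easiest to see on a tiny dataset. The following is a minimal sketch, not part of the original skill; the helper name shuffled_order and the seed value are illustrative assumptions.

```python
import tensorflow as tf

def shuffled_order(buffer_size, seed=0):
    """Return one pass over range(10) shuffled with the given buffer size."""
    ds = tf.data.Dataset.range(10).shuffle(buffer_size, seed=seed)
    return [int(x) for x in ds.as_numpy_iterator()]

# A buffer of 1 holds a single element, so nothing can be reordered.
no_shuffle = shuffled_order(buffer_size=1)

# A buffer of 3 only mixes nearby elements: the element at output position i
# is drawn from the first i + 3 inputs, so its value is at most i + 2.
local_shuffle = shuffled_order(buffer_size=3)

# A buffer covering the whole dataset allows any ordering.
full_shuffle = shuffled_order(buffer_size=10)

print(no_shuffle)
print(local_shuffle)
print(full_shuffle)
```

When the data on disk is sorted (for example by class), a small buffer leaves most of that ordering intact, which is why a buffer close to the dataset size is usually preferred when memory allows.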

Dynamic Batching

# Variable batch sizes based on sequence length
def batch_by_sequence_length(dataset, batch_size, max_length):
    """Batch sequences by length for efficient padding."""
    def key_func(x, y):
        # Bucket by length
        return tf.cast(tf.size(x) / max_length * 10, tf.int64)

    def reduce_func(key, dataset):
        # padded_batch pads each sequence to the longest one in its bucket;
        # plain batch() raises an error when lengths differ
        return dataset.padded_batch(batch_size)

    return dataset.group_by_window(
        key_func=key_func,
        reduce_func=reduce_func,
        window_size=batch_size
    )
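Variable-length sequences cannot be stacked into a regular tensor until they share a length. As a small sketch of the padding step on its own (the toy sequences are made up for illustration), padded_batch() pads each batch to its longest member:

```python
import tensorflow as tf

# Hypothetical toy sequences of different lengths.
sequences = [[1], [2, 3], [4, 5, 6], [7, 8]]

ds = tf.data.Dataset.from_generator(
    lambda: iter(sequences),
    output_signature=tf.TensorSpec(shape=[None], dtype=tf.int32),
)

# With no padded_shapes given, each batch is padded with zeros
# to the length of the longest sequence in that batch.
batches = [b.tolist() for b in ds.padded_batch(2).as_numpy_iterator()]

print(batches)  # [[[1, 0], [2, 3]], [[4, 5, 6], [7, 8, 0]]]
```

Grouping sequences of similar length before padding keeps the amount of padding per batch small, which is the point of bucketing by length.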

Stratified Sampling

def create_stratified_dataset(features, labels, batch_size, num_classes):
    """Create dataset with balanced class sampling.

    features and labels are NumPy arrays; num_classes is the number of
    distinct label values (0 .. num_classes - 1).
    """
    # Separate by class
    datasets = []
    for class_id in range(num_classes):
        mask = labels == class_id
        class_dataset = tf.data.Dataset.from_tensor_slices(
            (features[mask], labels[mask])
        )
        datasets.append(class_dataset)

    # Sample equally from each class
    balanced_dataset = tf.data.Dataset.sample_from_datasets(
        datasets,
        weights=[1.0 / num_classes] * num_classes
    )

    return balanced_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

Performance Optimization

Caching Strategies

# Cache in memory (for small datasets)
dataset = dataset.cache()

# Cache to disk (for larger datasets)
dataset = dataset.cache('/tmp/dataset_cache')

# Optimal caching placement
dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(expensive_preprocessing, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # Cache after expensive operations
    .shuffle(buffer_size)
    .map(cheap_augmentation, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(batch_size)
    .prefetch(tf.data.AUTOTUNE)
)
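Where cache() sits relative to random operations changes what each epoch sees. The sketch below illustrates this with a stand-in augmentation; add_noise is a hypothetical function introduced only for this example.

```python
import tensorflow as tf

def add_noise(x):
    # Hypothetical stand-in for a random augmentation.
    return tf.cast(x, tf.float32) + tf.random.uniform([])

base = tf.data.Dataset.range(5)

# cache() AFTER the random map: the first pass is stored and replayed,
# so every later epoch sees exactly the same "augmented" values.
frozen = base.map(add_noise).cache()
frozen_epoch_1 = list(frozen.as_numpy_iterator())
frozen_epoch_2 = list(frozen.as_numpy_iterator())

# cache() BEFORE the random map: only the deterministic part is stored,
# and the noise is drawn again on each pass.
fresh = base.cache().map(add_noise)
fresh_epoch_1 = list(fresh.as_numpy_iterator())
fresh_epoch_2 = list(fresh.as_numpy_iterator())

print(frozen_epoch_1 == frozen_epoch_2)  # the cached pass is replayed
print(fresh_epoch_1, fresh_epoch_2)      # typically differ between passes
```

Note that an in-memory cache is only filled once the dataset has been iterated to the end, so a pass cut short with take() does not populate it.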

Prefetching

# Automatic prefetching
dataset = dataset.prefetch(tf.data.AUTOTUNE)

# Manual prefetch buffer size
dataset = dataset.prefetch(buffer_size=2)

# Complete optimized pipeline
optimized_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(10000)
    .batch(64)
    .prefetch(tf.data.AUTOTUNE)
)

Parallel Data Loading

# Use num_parallel_calls for CPU-bound operations
dataset = dataset.map(
    preprocessing_function,
    num_parallel_calls=tf.data.AUTOTUNE
)

# Interleave for parallel file reading
def make_dataset_from_file(filename):
    return tf.data.TextLineDataset(filename)

filenames = tf.data.Dataset.list_files('/path/to/data/*.csv')
dataset = filenames.interleave(
    make_dataset_from_file,
    cycle_length=4,
    num_parallel_calls=tf.data.AUTOTUNE
)
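To make the behaviour of interleave() concrete, here is a small self-contained sketch that writes two throwaway files and reads them in alternation. The file names and contents are invented for the example, and it runs sequentially (no num_parallel_calls) so the output order is predictable.

```python
import os
import tempfile
import tensorflow as tf

# Write two tiny hypothetical shards to a temporary directory.
tmp_dir = tempfile.mkdtemp()
for shard, rows in enumerate([["a1", "a2", "a3"], ["b1", "b2", "b3"]]):
    with open(os.path.join(tmp_dir, f"shard_{shard}.csv"), "w") as f:
        f.write("\n".join(rows) + "\n")

# shuffle=False keeps the file order stable between runs.
filenames = tf.data.Dataset.list_files(
    os.path.join(tmp_dir, "*.csv"), shuffle=False
)

# cycle_length=2 keeps both files open at once; block_length=1 takes
# one line from each open file in turn.
lines = filenames.interleave(
    tf.data.TextLineDataset,
    cycle_length=2,
    block_length=1,
)
interleaved = [line.decode() for line in lines.as_numpy_iterator()]

print(interleaved)  # lines alternate between the two shards
```

Adding num_parallel_calls=tf.data.AUTOTUNE lets the files be read concurrently; whether the output order stays fixed then depends on the deterministic setting.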

Memory Management

# Use take() and skip() for train/val split without loading all data
total_size = 10000
train_size = int(0.8 * total_size)

full_dataset = tf.data.Dataset.from_tensor_slices((x, y))

train_dataset = (
    full_dataset
    .take(train_size)
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    full_dataset
    .skip(train_size)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
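A take()/skip() split is only clean when both calls see the elements in the same order. If the data is shuffled before splitting, the shuffle has to be fixed across iterations; otherwise examples can move between the training and validation sets. A minimal sketch on a toy range dataset (sizes and seed are illustrative):

```python
import tensorflow as tf

total_size = 10
train_size = int(0.8 * total_size)

# reshuffle_each_iteration=False pins the shuffled order, so take() and
# skip() partition the same sequence and the splits cannot overlap.
full = tf.data.Dataset.range(total_size).shuffle(
    total_size, seed=42, reshuffle_each_iteration=False
)

train_ids = [int(x) for x in full.take(train_size)]
val_ids = [int(x) for x in full.skip(train_size)]

print(len(train_ids), len(val_ids))            # 8 2
print(set(train_ids).isdisjoint(val_ids))      # True
```

Any per-epoch shuffling of the training data can then be added after take(), as in the pipeline above.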

Advanced Patterns

Iterating with For Loops

# Basic iteration
for i in tf.data.Dataset.range(3):
    tf.print('iteration:', i)

# With dataset iterator
for i in iter(tf.data.Dataset.range(3)):
    tf.print('iteration:', i)

Distributed Dataset

# Distribute dataset across devices
for i in tf.distribute.OneDeviceStrategy('cpu').experimental_distribute_dataset(
        tf.data.Dataset.range(3)):
    tf.print('iteration:', i)

# Multi-GPU distribution
strategy = tf.distribute.MirroredStrategy()
distributed_dataset = strategy.experimental_distribute_dataset(dataset)

Training Loop Integration

# Execute training loop over dataset
for images, labels in train_ds:
    if optimizer.iterations > TRAIN_STEPS:
        break
    train_step(images, labels)

Vectorized Operations

def f(args):
    embeddings, index = args
    # embeddings [vocab_size, embedding_dim]
    # index []
    # desired result: [embedding_dim]
    return tf.gather(params=embeddings, indices=index)

@tf.function
def f_auto_vectorized(embeddings, indices):
    # embeddings [num_heads, vocab_size, embedding_dim]
    # indices [num_heads]
    # desired result: [num_heads, embedding_dim]
    return tf.vectorized_map(f, [embeddings, indices])

concrete_vectorized = f_auto_vectorized.get_concrete_function(
    tf.TensorSpec(shape=[None, 100, 16], dtype=tf.float32),
    tf.TensorSpec(shape=[None], dtype=tf.int32))

Model Integration

Training with tf.data

# Use dataset with model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
model.fit(train_dataset, epochs=1)

Validation Dataset

# Create separate train and validation datasets
train_dataset = (
    tf.data.Dataset.from_tensor_slices((x_train, y_train))
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

val_dataset = (
    tf.data.Dataset.from_tensor_slices((x_val, y_val))
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

# Train with validation
history = model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=10
)

Custom Training Loop

@tf.function
def train_step(images, labels):
    with tf.GradientTape() as tape:
        predictions = model(images, training=True)
        loss = loss_fn(labels, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return loss

# Training loop with dataset
for epoch in range(epochs):
    for images, labels in train_dataset:
        loss = train_step(images, labels)
    # Report the loss of the last batch in the epoch
    print(f'Epoch {epoch}, Loss: {loss.numpy():.4f}')

File-Based Datasets

TFRecord Files

# Reading TFRecord files
def parse_tfrecord(example_proto):
    feature_description = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    image = tf.io.decode_raw(parsed['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = parsed['label']
    return image, label

# Load TFRecord dataset
tfrecord_dataset = (
    tf.data.TFRecordDataset(['data_shard_1.tfrecord', 'data_shard_2.tfrecord'])
    .map(parse_tfrecord, num_parallel_calls=tf.data.AUTOTUNE)
    .shuffle(10000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)
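A parser like the one above only works if the records were written with a matching layout. The following sketch shows one way such files could be produced and read back; the helper names serialize_example and parse_example, the file name, and the sample data are assumptions made for this example.

```python
import os
import tempfile
import numpy as np
import tensorflow as tf

def serialize_example(image, label):
    """Encode a float32 image as raw bytes plus an int64 label."""
    feature = {
        'image': tf.train.Feature(bytes_list=tf.train.BytesList(
            value=[image.astype(np.float32).tobytes()])),
        'label': tf.train.Feature(int64_list=tf.train.Int64List(
            value=[int(label)])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()

def parse_example(example_proto):
    """Read back the same layout: raw float32 bytes and an int64 label."""
    spec = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    parsed = tf.io.parse_single_example(example_proto, spec)
    image = tf.reshape(tf.io.decode_raw(parsed['image'], tf.float32), [28, 28, 1])
    return image, parsed['label']

# Hypothetical sample data written to a temporary file.
path = os.path.join(tempfile.mkdtemp(), 'sample.tfrecord')
images = np.random.rand(4, 28, 28, 1).astype(np.float32)
labels = np.array([3, 1, 4, 1])

with tf.io.TFRecordWriter(path) as writer:
    for image, label in zip(images, labels):
        writer.write(serialize_example(image, label))

restored = list(
    tf.data.TFRecordDataset([path]).map(parse_example).as_numpy_iterator()
)
print(len(restored), restored[0][0].shape)  # 4 (28, 28, 1)
```

The dtype passed to decode_raw must match the dtype used when writing; raw bytes carry no type or shape information of their own.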

CSV Files

# Load CSV dataset
def parse_csv(line):
    columns = tf.io.decode_csv(line, record_defaults=[0.0] * 785)
    label = tf.cast(columns[0], tf.int32)
    features = tf.stack(columns[1:])
    features = tf.reshape(features, [28, 28, 1])
    return features, label

csv_dataset = (
    tf.data.TextLineDataset(['data.csv'])
    .skip(1)  # Skip header
    .map(parse_csv, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Image Files

def load_and_preprocess_image(path, label):
    """Load image from file and preprocess."""
    image = tf.io.read_file(path)
    image = tf.image.decode_jpeg(image, channels=3)
    image = tf.image.resize(image, [224, 224])
    image = tf.cast(image, tf.float32) / 255.0
    return image, label

# Create dataset from image paths
image_paths = ['/path/to/image1.jpg', '/path/to/image2.jpg', ...]
labels = [0, 1, ...]

image_dataset = (
    tf.data.Dataset.from_tensor_slices((image_paths, labels))
    .map(load_and_preprocess_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()
    .shuffle(1000)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

Data Validation

DataLoader Generation

# Generate TensorFlow dataset with batching
def gen_dataset(
    batch_size=1,
    is_training=False,
    shuffle=False,
    input_pipeline_context=None,
    preprocess=None,
    drop_remainder=True,
    total_steps=None
):
    """Generate dataset with specified configuration."""
    dataset = tf.data.Dataset.from_tensor_slices((features, labels))

    if shuffle:
        dataset = dataset.shuffle(buffer_size=10000)

    if preprocess:
        dataset = dataset.map(preprocess, num_parallel_calls=tf.data.AUTOTUNE)

    dataset = dataset.batch(batch_size, drop_remainder=drop_remainder)

    if is_training:
        dataset = dataset.repeat()

    dataset = dataset.prefetch(tf.data.AUTOTUNE)

    if total_steps:
        dataset = dataset.take(total_steps)

    return dataset
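The interaction between repeat() and take() in a function like this can be checked on a toy dataset. This is a small sketch with made-up sizes: repeat() makes the dataset infinite, and take() caps it at a fixed number of batches.

```python
import tensorflow as tf

# 10 elements in batches of 4 with drop_remainder=True: 2 batches per pass.
base = tf.data.Dataset.range(10).batch(4, drop_remainder=True)
batches_per_pass = int(base.cardinality())

# repeat() with no count makes the dataset infinite ...
repeated = base.repeat()
is_infinite = bool(repeated.cardinality() == tf.data.INFINITE_CARDINALITY)

# ... and take() limits it to a fixed number of steps.
capped = repeated.take(5)
num_steps = sum(1 for _ in capped)

print(batches_per_pass, is_infinite, num_steps)  # 2 True 5
```

An infinite dataset passed to model.fit() needs steps_per_epoch (or a take() cap like this one), since Keras cannot otherwise tell where an epoch ends.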

When to Use This Skill

Use the tensorflow-data-pipelines skill when you need to:

  • Load and preprocess large datasets that don't fit in memory

  • Implement data augmentation for training robustness

  • Optimize data loading to prevent GPU/TPU idle time

  • Create custom data generators for specialized formats

  • Build multi-modal pipelines with images, text, and audio

  • Implement efficient batching strategies for variable-length sequences

  • Cache preprocessed data to speed up training

  • Handle distributed training across multiple devices

  • Parse TFRecord, CSV, or other file formats

  • Implement stratified sampling for imbalanced datasets

  • Create reproducible data shuffling

  • Build real-time data augmentation pipelines

  • Optimize memory usage with streaming datasets

  • Implement prefetching for pipeline parallelism

  • Create validation and test splits efficiently

Best Practices

  • Always use prefetch() - Add .prefetch(tf.data.AUTOTUNE) at the end of pipeline to overlap data loading with training

  • Use num_parallel_calls=AUTOTUNE - Let TensorFlow automatically tune parallelism for map operations

  • Cache after expensive operations - Place .cache() after preprocessing but before augmentation and shuffling

  • Shuffle before batching - Call .shuffle() before .batch() to ensure random batches

  • Use appropriate buffer sizes - Shuffle buffer should be >= dataset size for perfect shuffling, or at least several thousand

  • Normalize data in pipeline - Apply normalization in map() function for consistency across train/val/test

  • Batch after transformations - Apply .batch() after all element-wise transformations for efficiency

  • Use drop_remainder for training - Set drop_remainder=True in batch() to ensure consistent batch sizes

  • Leverage AUTOTUNE - Use tf.data.AUTOTUNE for automatic performance tuning instead of manual values

  • Apply augmentation after caching - Cache deterministic preprocessing, apply random augmentation after

  • Use interleave for file reading - Parallel file reading with interleave() for large multi-file datasets

  • Repeat for infinite datasets - Use .repeat() for training datasets to avoid dataset exhaustion

  • Use take/skip for splits - Efficiently split datasets without loading all data into memory

  • Monitor pipeline performance - Use TensorFlow Profiler to identify bottlenecks in data pipeline

  • Shard data for distribution - Use shard() for distributed training across multiple workers
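As a short illustration of the last point, shard() gives each worker a disjoint slice of the dataset. The worker count below is a hypothetical value chosen for the example.

```python
import tensorflow as tf

num_workers = 2  # hypothetical worker count for illustration
dataset = tf.data.Dataset.range(10)

# Worker i keeps every num_workers-th element, starting at index i.
worker_0 = [int(x) for x in dataset.shard(num_shards=num_workers, index=0)]
worker_1 = [int(x) for x in dataset.shard(num_shards=num_workers, index=1)]

print(worker_0)  # [0, 2, 4, 6, 8]
print(worker_1)  # [1, 3, 5, 7, 9]
```

Sharding early in the pipeline (for example on the list of file names) avoids having every worker read and then discard data that belongs to the others.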

Common Pitfalls

  • Shuffling after batching - Shuffles batches instead of individual samples, reducing randomness

  • Not using prefetch - GPU sits idle waiting for data, wasting compute resources

  • Cache in wrong position - Caching after augmentation freezes the augmented samples so every epoch sees the same ones; caching before preprocessing means the expensive preprocessing is repeated every epoch

  • Buffer size too small - Insufficient shuffle buffer leads to poor randomization and training issues

  • Not using num_parallel_calls - Sequential map operations create bottlenecks in data loading

  • Loading entire dataset to memory - Use tf.data instead of loading all data with NumPy

  • Applying augmentation deterministically - Same augmentations every epoch reduce training effectiveness

  • Not setting random seeds - Irreproducible results and debugging difficulties

  • Ignoring batch remainder - Variable batch sizes cause errors in models expecting fixed dimensions

  • Repeating validation dataset - Validation should not repeat, only training datasets

  • Not using AUTOTUNE - Manual tuning is difficult and suboptimal compared to automatic optimization

  • Caching very large datasets - Exceeds memory limits and causes OOM errors

  • Too many parallel operations - Excessive parallelism causes thread contention and slowdown

  • Not monitoring data loading time - Data pipeline can become training bottleneck without monitoring

  • Applying normalization inconsistently - Different normalization for train/val/test causes poor performance
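The first pitfall in this list can be demonstrated on a toy dataset. In this sketch (sizes and seed are arbitrary), shuffling after batching only reorders whole batches, while shuffling before batching mixes individual elements.

```python
import tensorflow as tf

data = tf.data.Dataset.range(8)

# Shuffle, then batch: each batch can contain elements from anywhere
# in the dataset.
shuffle_then_batch = [
    b.tolist() for b in data.shuffle(8, seed=1).batch(4).as_numpy_iterator()
]

# Batch, then shuffle: only the order of the batches changes; their
# contents stay [0, 1, 2, 3] and [4, 5, 6, 7].
batch_then_shuffle = [
    b.tolist() for b in data.batch(4).shuffle(2, seed=1).as_numpy_iterator()
]

print(shuffle_then_batch)
print(batch_then_shuffle)
```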

Resources

  • tf.data API Guide

  • Data Pipeline Performance

  • TFRecord Format

  • Image Data Loading

  • CSV Data Loading

  • Data Augmentation Guide

  • Preprocessing Layers

  • Building Input Pipelines

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.
