# Ray Data - Scalable ML Data Processing

Distributed data processing library for ML and AI workloads.

## When to use Ray Data

**Use Ray Data when:**
- Processing large datasets (>100GB) for ML training
- Need distributed data preprocessing across cluster
- Building batch inference pipelines
- Loading multi-modal data (images, audio, video)
- Scaling data processing from laptop to cluster

**Key features**:
- **Streaming execution**: Process data larger than memory
- **GPU support**: Accelerate transforms with GPUs
- **Framework integration**: PyTorch, TensorFlow, HuggingFace
- **Multi-modal**: Images, Parquet, CSV, JSON, audio, video

**Use alternatives instead**:
- **Pandas**: Small data (<1GB) on single machine
- **Dask**: Tabular data, SQL-like operations
- **Spark**: Enterprise ETL, SQL queries

## Quick start

### Installation

```bash
pip install -U 'ray[data]'
```

### Load and transform data

```python
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution).
# batch_format="pandas" makes each batch a DataFrame so the .str accessor is available.
ds = ds.map_batches(
    lambda batch: {"processed": batch["text"].str.lower().to_numpy()},
    batch_format="pandas",
)

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)
```

### Integration with Ray Train

```python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access this worker's shard of the dataset in training
    train_shard = ray.train.get_dataset_shard("train")

    for epoch in range(10):
        for batch in train_shard.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()
```

## Reading data

### From cloud storage

```python
import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")
```

### From Python objects

```python
import pandas as pd
import ray

# From list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)
```

## Transformations

### Map batches (vectorized)

```python
# Batch transformation (fast)
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)
```

### Row transformations

```python
# Row-by-row (slower)
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)
```

### Filter

```python
# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)
```

### Group by and aggregate

```python
import numpy as np

# Group by column
counts = ds.groupby("category").count()

# Custom aggregation (each returned column must be array-like, so wrap the scalar)
sums = ds.groupby("category").map_groups(
    lambda group: {"sum": np.array([group["value"].sum()])}
)
```

### GPU-accelerated transforms

```python
# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch

    images = torch.tensor(batch["image"]).cuda()

    # GPU preprocessing
    processed = images * 255

    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request GPU
)
```

## Writing data

```python
# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")
```

## Performance optimization

### Repartition

```python
# Control parallelism
ds = ds.repartition(100)  # 100 blocks for 100-core cluster
```
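
To see why matching the block count to the core count helps, here is a plain-Python sketch of an even split. The helper `even_block_sizes` and the 1M-row dataset are hypothetical and only illustrate the arithmetic; Ray Data's actual block boundaries are decided by its own splitting logic.

```python
from typing import List

def even_block_sizes(num_rows: int, num_blocks: int) -> List[int]:
    """Split num_rows into num_blocks sizes that differ by at most one row."""
    base, extra = divmod(num_rows, num_blocks)
    return [base + 1 if i < extra else base for i in range(num_blocks)]

# Hypothetical 1M-row dataset split into 100 blocks, one per core.
sizes = even_block_sizes(1_000_000, 100)
print(len(sizes), min(sizes), max(sizes))
```

With roughly equal blocks, every core receives a similar amount of work, so no single task holds up the rest of the stage.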

### Batch size tuning

```python
# Larger batches = faster vectorized ops
ds.map_batches(process_fn, batch_size=10000)  # vs batch_size=100
```
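
One reason larger batches tend to be faster is that the batch function runs fewer times, so per-call overhead is paid less often. The sketch below counts invocations for a hypothetical 1M-row dataset; `udf_invocations` is an illustrative helper, not a Ray API, and it ignores block boundaries.

```python
import math

def udf_invocations(num_rows: int, batch_size: int) -> int:
    """Approximate number of times a batch function runs over num_rows rows."""
    return math.ceil(num_rows / batch_size)

num_rows = 1_000_000  # hypothetical dataset size
small = udf_invocations(num_rows, batch_size=100)
large = udf_invocations(num_rows, batch_size=10_000)
print(f"batch_size=100: {small} calls, batch_size=10000: {large} calls")
```

The trade-off is memory: each batch must fit in the worker's memory (or GPU memory), so the largest workable batch size depends on row size.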

### Streaming execution

```python
# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Streamed, not loaded to memory
```
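
The idea behind streaming can be sketched with a plain Python generator: only one batch is held at a time, so memory use is bounded by the batch size rather than the dataset size. This is an analogy under simplified assumptions, not how Ray Data implements streaming execution; the local `iter_batches` function and the synthetic source are hypothetical.

```python
from itertools import islice
from typing import Iterable, Iterator, List

def iter_batches(rows: Iterable[int], batch_size: int) -> Iterator[List[int]]:
    """Yield lists of at most batch_size rows, pulling from the source lazily."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch

# A lazy source: values are produced on demand, never materialized as one list.
source = (i * 2 for i in range(2_500))

batch_sizes = [len(batch) for batch in iter_batches(source, batch_size=1000)]
print(batch_sizes)
```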

## Common patterns

### Batch inference

```python
import ray

# Load model (MyModel is a placeholder for your own model class)
def load_model():
    # Load once per worker
    return MyModel()

# Inference function: a callable class, so the model is built once in
# __init__ and reused for every batch passed to __call__
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,
    concurrency=2,  # Actor pool size for the callable class (example value)
)
predictions.write_parquet("s3://output/")
```

### Data preprocessing pipeline

```python
# Multi-step pipeline (write_parquet returns None, so the result is not assigned)
(
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
    .write_parquet("s3://processed/")
)
```

## Integration with ML frameworks

### PyTorch

```python
# Iterate over the dataset as batches of PyTorch tensors
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict mapping column names to tensors
    inputs, labels = batch["features"], batch["label"]
```

### TensorFlow

```python
# Convert to TensorFlow
tf_ds = ds.to_tf(feature_columns="image", label_columns="label", batch_size=32)

for features, labels in tf_ds:
    # Train model
    pass
```

## Supported data formats

| Format  | Read | Write | Use Case              |
|---------|------|-------|-----------------------|
| Parquet | ✅   | ✅    | ML data (recommended) |
| CSV     | ✅   | ✅    | Tabular data          |
| JSON    | ✅   | ✅    | Semi-structured       |
| Images  | ✅   | ❌    | Computer vision       |
| NumPy   | ✅   | ✅    | Arrays                |
| Pandas  | ✅   | ❌    | DataFrames            |

## Performance benchmarks

**Scaling** (processing 100GB data):
- 1 node (16 cores): ~30 minutes
- 4 nodes (64 cores): ~8 minutes
- 16 nodes (256 cores): ~2 minutes

**GPU acceleration** (image preprocessing):
- CPU only: 1,000 images/sec
- 1 GPU: 5,000 images/sec
- 4 GPUs: 18,000 images/sec
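
The figures above can be turned into speedup and parallel efficiency, which makes it easier to judge how close the scaling is to linear. The sketch below uses only the numbers quoted in this section; the helper name `speedup_and_efficiency` is an illustrative choice.

```python
from typing import Tuple

def speedup_and_efficiency(
    base_time: float, base_units: int, time: float, units: int
) -> Tuple[float, float]:
    """Return (speedup over the baseline, fraction of ideal linear scaling)."""
    speedup = base_time / time
    efficiency = speedup / (units / base_units)
    return speedup, efficiency

# Minutes to process 100GB at each cluster size, as quoted above.
node_minutes = {1: 30.0, 4: 8.0, 16: 2.0}
node_results = {
    nodes: speedup_and_efficiency(30.0, 1, minutes, nodes)
    for nodes, minutes in node_minutes.items()
}

# GPU throughput in images/sec; time per image is the inverse of throughput.
gpu_speedup, gpu_efficiency = speedup_and_efficiency(1 / 5_000, 1, 1 / 18_000, 4)

for nodes, (speedup, efficiency) in node_results.items():
    print(f"{nodes:>2} nodes: {speedup:.2f}x speedup, {efficiency:.1%} of linear")
print(f" 4 GPUs: {gpu_speedup:.2f}x over 1 GPU, {gpu_efficiency:.1%} of linear")
```

By these numbers, both the 4-node and 16-node runs reach about 94% of linear scaling, and 4 GPUs reach about 90% relative to a single GPU.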

## Use cases

**Production deployments**:
- Last-mile data processing for model training
- **ByteDance**: Scaling offline inference with multi-modal LLMs
- **Spotify**: ML platform for batch inference

## References

- **Transformations Guide** - Map, filter, groupby operations
- **Integration Guide** - Ray Train, PyTorch, TensorFlow

## Resources

- **Docs**: https://docs.ray.io/en/latest/data/data.html
- **GitHub**: https://github.com/ray-project/ray ⭐ 36,000+
- **Version**: Ray 2.40.0+
- **Examples**: https://docs.ray.io/en/latest/data/examples/overview.html