ray-data


Install

npx skills add https://github.com/orchestra-research/ai-research-skills --skill ray-data
Ray Data - Scalable ML Data Processing
Distributed data processing library for ML and AI workloads.
When to use Ray Data
Use Ray Data when:
Processing large datasets (>100GB) for ML training
Distributing data preprocessing across a cluster
Building batch inference pipelines
Loading multi-modal data (images, audio, video)
Scaling data processing from a laptop to a cluster
Key features:
Streaming execution: process data larger than memory
GPU support: accelerate transforms with GPUs
Framework integration: PyTorch, TensorFlow, Hugging Face
Multi-modal: images, Parquet, CSV, JSON, audio, video
Use alternatives instead:
Pandas: small data (<1GB) on a single machine
Dask: tabular data, SQL-like operations
Spark: enterprise ETL, SQL queries

Quick start
Installation
pip install -U 'ray[data]'
Load and transform data
import ray

# Read Parquet files
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# Transform data (lazy execution). batch_format="pandas" passes each batch to the function as a pandas DataFrame.
ds = ds.map_batches(
    lambda df: df["text"].str.lower().to_frame("processed"),
    batch_format="pandas",
)

# Consume data
for batch in ds.iter_batches(batch_size=100):
    print(batch)

Integration with Ray Train
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# Create dataset
train_ds = ray.data.read_parquet("s3://bucket/train/*.parquet")

def train_func(config):
    # Access this worker's shard of the "train" dataset
    train_shard = ray.train.get_dataset_shard("train")
    for epoch in range(10):
        for batch in train_shard.iter_batches(batch_size=32):
            # Train on batch
            pass

# Train with Ray
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_ds},
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
trainer.fit()

Reading data
From cloud storage
import ray

# Parquet (recommended for ML)
ds = ray.data.read_parquet("s3://bucket/data/*.parquet")

# CSV
ds = ray.data.read_csv("s3://bucket/data/*.csv")

# JSON
ds = ray.data.read_json("gs://bucket/data/*.json")

# Images
ds = ray.data.read_images("s3://bucket/images/")

From Python objects

# From a list
ds = ray.data.from_items([{"id": i, "value": i * 2} for i in range(1000)])

# From a range
ds = ray.data.range(1000000)  # Synthetic data

# From pandas
import pandas as pd

df = pd.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
ds = ray.data.from_pandas(df)

Transformations
Map batches (vectorized)

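# Batch transformation (fast). By default each batch is a dict of NumPy arrays;
# this assumes ds has a numeric "value" column, like the from_items dataset above.
def process_batch(batch):
    batch["doubled"] = batch["value"] * 2
    return batch

ds = ds.map_batches(process_batch, batch_size=1000)

Row transformations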

# Row-by-row (slower). Each row is a dict.
def process_row(row):
    row["squared"] = row["value"] ** 2
    return row

ds = ds.map(process_row)

Filter

# Filter rows
ds = ds.filter(lambda row: row["value"] > 100)

Group by and aggregate

# Group by column
counts = ds.groupby("category").count()

# Custom aggregation. map_groups receives each group as a batch and must return a batch,
# so the sum is wrapped in a one-element list.
sums = ds.groupby("category").map_groups(lambda group: {"sum": [group["value"].sum()]})

GPU-accelerated transforms

# Use GPU for preprocessing
def preprocess_images_gpu(batch):
    import torch

    # Assumes all images in the batch share one shape, so they stack into a single array
    images = torch.as_tensor(batch["image"]).cuda()
    # GPU preprocessing: scale uint8 pixel values to [0, 1]
    processed = images.float() / 255.0
    return {"processed": processed.cpu().numpy()}

ds = ds.map_batches(
    preprocess_images_gpu,
    batch_size=64,
    num_gpus=1,  # Request one GPU for each task
)

Writing data

# Write to Parquet
ds.write_parquet("s3://bucket/output/")

# Write to CSV
ds.write_csv("output/")

# Write to JSON
ds.write_json("output/")

Performance optimization
Repartition

# Control parallelism
ds = ds.repartition(100)  # 100 blocks for a 100-core cluster
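The rule of thumb of 100 blocks for a 100-core cluster can be checked with a small scheduling model. This pure-Python sketch is not Ray's implementation. It assumes each block becomes one task that uses one CPU core and that all blocks take equal time. The helper names `split_into_blocks` and `scheduling_waves` are hypothetical.

```python
import math
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_blocks(rows: Sequence[T], num_blocks: int) -> List[List[T]]:
    """Split rows into num_blocks contiguous blocks whose sizes differ by at most one."""
    if num_blocks <= 0:
        raise ValueError("num_blocks must be positive")
    base, extra = divmod(len(rows), num_blocks)
    blocks: List[List[T]] = []
    start = 0
    for i in range(num_blocks):
        size = base + (1 if i < extra else 0)  # the first `extra` blocks get one extra row
        blocks.append(list(rows[start:start + size]))
        start += size
    return blocks


def scheduling_waves(num_blocks: int, num_cores: int) -> int:
    """Rounds of tasks needed if each block is a one-core task and all blocks take equal time."""
    return math.ceil(num_blocks / num_cores)


blocks = split_into_blocks(range(1_000_000), 100)
print(len(blocks), len(blocks[0]))  # 100 10000
for num_blocks in (10, 100, 150):
    busy = min(num_blocks, 100)
    print(f"{num_blocks:>3} blocks on 100 cores: {scheduling_waves(num_blocks, 100)} wave(s), "
          f"{busy} cores busy in the first wave")
```

Under these assumptions, 10 blocks leave 90 cores without work. With 150 blocks, the second wave runs only 50 tasks, so half the cores sit idle while it finishes.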

Batch size tuning

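# Larger batches mean fewer function calls and faster vectorized ops, at the cost of more memory per batch
ds = ds.map_batches(process_fn, batch_size=10000)  # vs. batch_size=100; process_fn is your batch function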
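A toy cost model shows why larger batches help. Every call to the batch function pays a fixed overhead, so fewer, larger batches pay that overhead fewer times. The constants below (1 ms of overhead per call, 1 microsecond of work per row) are made-up illustration values, not Ray measurements. The model also ignores the extra memory that larger batches need.

```python
import math


def num_batch_calls(num_rows: int, batch_size: int) -> int:
    """Number of batch-function calls needed to cover num_rows (the last batch may be partial)."""
    return math.ceil(num_rows / batch_size)


def estimated_seconds(num_rows: int, batch_size: int,
                      overhead_per_call_s: float, work_per_row_s: float) -> float:
    """Toy model: a fixed overhead per call plus a constant amount of work per row."""
    return num_batch_calls(num_rows, batch_size) * overhead_per_call_s + num_rows * work_per_row_s


# Hypothetical constants chosen only to illustrate the trend.
OVERHEAD_PER_CALL_S = 1e-3
WORK_PER_ROW_S = 1e-6
ROWS = 1_000_000

for batch_size in (100, 10_000):
    calls = num_batch_calls(ROWS, batch_size)
    secs = estimated_seconds(ROWS, batch_size, OVERHEAD_PER_CALL_S, WORK_PER_ROW_S)
    print(f"batch_size={batch_size:>6}: {calls:>6} calls, ~{secs:.1f} s")
```

With these assumed constants, the model gives about 11 s for batch_size=100 (10,000 calls) and about 1.1 s for batch_size=10000 (100 calls). The per-row work is identical in both cases; only the total call overhead changes.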

Streaming execution

# Process data larger than memory
ds = ray.data.read_parquet("s3://huge-dataset/")
for batch in ds.iter_batches(batch_size=1000):
    process(batch)  # Batches are streamed; the full dataset is never loaded into memory at once
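Streaming means the consumer pulls batches as it goes instead of materializing the whole dataset first. The pure-Python generator below models only that consumer-side behavior. It uses a hypothetical `stream_batches` helper and an in-memory row source that stands in for files. It is a conceptual sketch, not a description of how Ray Data's streaming executor works.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def stream_batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size rows, pulling rows from the source only on demand."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def row_source(num_rows: int, counter: Dict[str, int]) -> Iterator[dict]:
    """Hypothetical stand-in for a file reader: produces one row at a time and counts them."""
    for i in range(num_rows):
        counter["produced"] += 1
        yield {"id": i}


counter = {"produced": 0}
stream = stream_batches(row_source(2_500, counter), batch_size=1_000)
first = next(stream)
print(len(first), counter["produced"])  # 1000 1000 (only the first batch has been read)
sizes = [len(first)] + [len(batch) for batch in stream]
print(sizes)  # [1000, 1000, 500]
```

At any point, at most one batch of rows is held by the consumer. That property is what lets a loop like the one above walk a dataset larger than memory.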

Common patterns
Batch inference
import ray

# Load model
def load_model():
    # Called from the class constructor, so each worker loads the model once
    return MyModel()  # MyModel is a placeholder for your own model class

# Inference function: a callable class, so Ray Data keeps the loaded model on long-lived workers
class BatchInference:
    def __init__(self):
        self.model = load_model()

    def __call__(self, batch):
        predictions = self.model(batch["input"])
        return {"prediction": predictions}

# Run distributed inference
ds = ray.data.read_parquet("s3://data/")
predictions = ds.map_batches(
    BatchInference,
    batch_size=32,
    num_gpus=1,
    concurrency=4,  # number of inference workers (actors), one GPU each
)
predictions.write_parquet("s3://output/")

Data preprocessing pipeline

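# Multi-step pipeline (clean_data, tokenize and augment are your own batch functions)
ds = (
    ray.data.read_parquet("s3://raw/")
    .map_batches(clean_data)
    .map_batches(tokenize)
    .map_batches(augment)
)
# write_parquet triggers execution and returns None, so it is called separately
ds.write_parquet("s3://processed/")

Integration with ML frameworks
PyTorch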

# Iterate over batches as PyTorch tensors
for batch in ds.iter_torch_batches(batch_size=32):
    # batch is a dict mapping column names to torch.Tensor
    inputs, labels = batch["features"], batch["label"]

TensorFlow

# Convert to a TensorFlow dataset
tf_ds = ds.to_tf(feature_columns="image", label_columns="label", batch_size=32)
for features, labels in tf_ds:
    # Train model
    pass
Supported data formats
Format | Read | Write | Use case
Parquet | Yes | Yes | ML data (recommended)
CSV | Yes | Yes | Tabular data
JSON | Yes | Yes | Semi-structured data
Images | Yes | Yes | Computer vision
NumPy | Yes | Yes | Arrays
Pandas | Yes | Yes | DataFrames
Performance benchmarks
Scaling (processing 100GB of data):
1 node (16 cores): ~30 minutes
4 nodes (64 cores): ~8 minutes
16 nodes (256 cores): ~2 minutes
GPU acceleration (image preprocessing):
CPU only: 1,000 images/sec
1 GPU: 5,000 images/sec
4 GPUs: 18,000 images/sec
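The benchmark figures above can be converted into speedup and scaling-efficiency numbers, which show how close each configuration comes to linear scaling. This sketch only reuses the quoted figures. The helper names are hypothetical.

```python
# Benchmark figures copied from the list above (100GB scaling run and image preprocessing).
MINUTES_BY_NODES = {1: 30.0, 4: 8.0, 16: 2.0}
CPU_IMAGES_PER_SEC = 1_000.0
IMAGES_PER_SEC_BY_GPUS = {1: 5_000.0, 4: 18_000.0}


def speedup_from_times(baseline_time: float, new_time: float) -> float:
    """Speedup when comparing wall-clock times: lower time is better."""
    return baseline_time / new_time


def speedup_from_rates(baseline_rate: float, new_rate: float) -> float:
    """Speedup when comparing throughputs: higher rate is better."""
    return new_rate / baseline_rate


def scaling_efficiency(speedup: float, resource_multiple: float) -> float:
    """Fraction of ideal linear scaling achieved (1.0 means perfectly linear)."""
    return speedup / resource_multiple


node_results = {}
for nodes, minutes in MINUTES_BY_NODES.items():
    s = speedup_from_times(MINUTES_BY_NODES[1], minutes)
    node_results[nodes] = (s, scaling_efficiency(s, nodes))
    print(f"{nodes:>2} node(s): {s:5.2f}x faster, {node_results[nodes][1]:.0%} of linear")

gpu_vs_cpu = speedup_from_rates(CPU_IMAGES_PER_SEC, IMAGES_PER_SEC_BY_GPUS[1])
four_vs_one_gpu = speedup_from_rates(IMAGES_PER_SEC_BY_GPUS[1], IMAGES_PER_SEC_BY_GPUS[4])
print(f"1 GPU vs CPU only: {gpu_vs_cpu:.1f}x")
print(f"4 GPUs vs 1 GPU: {four_vs_one_gpu:.1f}x, "
      f"{scaling_efficiency(four_vs_one_gpu, 4):.0%} of linear")
```

With these figures, 4 nodes give a 3.75x speedup and 16 nodes a 15x speedup, both about 94% of linear. One GPU is 5x faster than CPU only, and 4 GPUs deliver 3.6x the throughput of one GPU, or 90% of linear.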
Use cases
Production deployments:
Pinterest: last-mile data processing for model training
ByteDance: scaling offline inference with multi-modal LLMs
Spotify: ML platform for batch inference
References
Transformations Guide: map, filter, groupby operations
Integration Guide: Ray Train, PyTorch, TensorFlow
Resources
Docs: https://docs.ray.io/en/latest/data/data.html
GitHub: https://github.com/ray-project/ray (36,000+ stars)
Version: Ray 2.40.0+
Examples: https://docs.ray.io/en/latest/data/examples/overview.html