# ML Data Pipeline Architecture

Patterns for efficient ML data pipelines using Polars, Arrow, and ClickHouse.

**ADR**: 2026-01-22-polars-preference-hook (efficiency preferences framework)
> **Note**: A PreToolUse hook enforces the Polars preference. To use Pandas, add a `# polars-exception: <reason>` comment at the top of the file.

## When to Use This Skill

Use this skill when:

- Deciding between Polars and Pandas for a data pipeline
- Optimizing memory usage with zero-copy Arrow patterns
- Loading data from ClickHouse into PyTorch DataLoaders
- Implementing lazy evaluation for large datasets
- Migrating existing Pandas code to Polars

## 1. Decision Tree: Polars vs Pandas

```
Dataset size?
├─ < 1M rows   → Pandas OK (simpler API, richer ecosystem)
├─ 1M-10M rows → Consider Polars (2-5x faster, less memory)
└─ > 10M rows  → Use Polars (required for memory efficiency)

Operations?
├─ Simple transforms     → Either works
├─ Group-by aggregations → Polars 5-10x faster
├─ Complex joins         → Polars with lazy evaluation
└─ Streaming/chunked     → Polars scan_* functions

Integration?
├─ scikit-learn heavy → Pandas (better interop)
├─ PyTorch/custom     → Polars + Arrow (zero-copy to tensor)
└─ ClickHouse source  → Arrow stream → Polars (optimal)
```

## 2. Zero-Copy Pipeline Architecture

### The Problem with Pandas
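```python
# BAD: 3 memory copies
import pandas as pd
import torch

df = pd.read_sql(query, conn)         # Copy 1: DB → pandas
X = df[features].values               # Copy 2: pandas → numpy
tensor = torch.from_numpy(X).float()  # Copy 3: from_numpy shares memory, but .float() casts float64 → float32
# Peak memory: 3x data size
```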
### The Solution with Arrow

```python
# GOOD: 0-1 memory copies
import clickhouse_connect
import polars as pl
import torch

client = clickhouse_connect.get_client(...)
arrow_table = client.query_arrow("SELECT * FROM bars")  # Arrow table in client memory
df = pl.from_arrow(arrow_table)                          # Zero-copy for most column types
X = df.select(features).to_numpy()                       # Single allocation (2-D array)
tensor = torch.from_numpy(X)                             # View (no copy)
# Peak memory: 1.2x data size
```
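The "View (no copy)" comment can be checked directly with `np.shares_memory`, which also shows where a copy does appear: a dtype cast such as `.float()` on float64 data allocates a new buffer. A minimal sketch, using a small hypothetical float64 array as a stand-in for `df.select(features).to_numpy()`:

```python
import numpy as np
import torch

# Hypothetical float64 feature matrix (stand-in for df.select(features).to_numpy())
X = np.arange(6, dtype=np.float64).reshape(3, 2)

t = torch.from_numpy(X)  # shares X's buffer: no copy
t32 = t.float()          # float64 -> float32 cast: new buffer

shares_view = np.shares_memory(X, t.numpy())
shares_cast = np.shares_memory(X, t32.numpy())

X[0, 0] = 42.0  # mutate the NumPy array in place; only the view sees it
print(shares_view, shares_cast, t[0, 0].item(), t32[0, 0].item())
# → True False 42.0 0.0
```

Casting in Polars before `.to_numpy()` (for example to `Float32`) avoids that extra tensor-side copy.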
## 3. ClickHouse Integration Patterns

### Pattern A: Arrow Stream (Recommended)
```python
import polars as pl


def query_arrow(client, query: str) -> pl.DataFrame:
    """ClickHouse → Arrow → Polars (zero-copy chain)."""
    # clickhouse_connect's query_arrow() adds the Arrow FORMAT clause itself,
    # so the query string must not carry its own FORMAT clause.
    arrow_table = client.query_arrow(query)
    return pl.from_arrow(arrow_table)


# Usage
df = query_arrow(client, "SELECT * FROM bars WHERE ts >= '2024-01-01'")
```

### Pattern B: Polars Native (Simpler)
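Polars can read from ClickHouse directly with `pl.read_database_uri`, which uses the ConnectorX engine by default (see pola.rs for version requirements and supported URI schemes).

```python
df = pl.read_database_uri(
    query="SELECT * FROM bars",
    uri="clickhouse://user:pass@host/db",
)
```

### Pattern C: Parquet Export (Batch Jobs)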
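```python
import pyarrow.parquet as pq

# For reproducible batch processing.
# INTO OUTFILE is handled by clickhouse-client, not by the HTTP interface that
# clickhouse_connect uses, so write the Parquet file on the client side.
arrow_table = client.query_arrow("SELECT * FROM bars")
pq.write_table(arrow_table, "data.parquet")

df = pl.scan_parquet("data.parquet")  # Lazy: nothing is read until .collect()
```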
## 4. PyTorch DataLoader Integration

### Minimal Change Pattern

```python
import pandas as pd
import polars as pl
import torch
from torch.utils.data import DataLoader, TensorDataset


# Accept both pandas and polars
def prepare_data(df) -> tuple[torch.Tensor, torch.Tensor]:
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)
    X = df.select(features).to_numpy()
    y = df.select(target).to_numpy()
    return torch.from_numpy(X).float(), torch.from_numpy(y).float()


X, y = prepare_data(df)
dataset = TensorDataset(X, y)
loader = DataLoader(dataset, batch_size=32, pin_memory=True)
```

### Custom PolarsDataset (Large Data)

```python
class PolarsDataset(torch.utils.data.Dataset):
    """Memory-efficient dataset from Polars DataFrame."""

    def __init__(self, df: pl.DataFrame, features: list[str], target: str):
        self.arrow = df.to_arrow()  # Arrow backing for zero-copy slicing
        self.features = features
        self.target = target

    def __len__(self) -> int:
        return self.arrow.num_rows

    def __getitem__(self, idx: int) -> tuple[torch.Tensor, torch.Tensor]:
        row = self.arrow.slice(idx, 1)  # zero-copy one-row view
        x = torch.tensor(
            [row[f][0].as_py() for f in self.features], dtype=torch.float32
        )
        y = torch.tensor(row[self.target][0].as_py(), dtype=torch.float32)
        return x, y
```

## 5. Lazy Evaluation Patterns

### Pipeline Composition
```python
# Define transformations lazily (no computation yet)
pipeline = (
    pl.scan_parquet("raw_data.parquet")
    .filter(pl.col("timestamp") >= start_date)
    .with_columns([
        pl.col("close").pct_change().alias("returns"),
        pl.col("volume").log().alias("log_volume"),
    ])
    # Keep "timestamp" so the train/test split below can still filter on it
    .select(["timestamp", *features, target])
)

# Execute only when needed
train_df = pipeline.filter(pl.col("timestamp") < split_date).collect()
test_df = pipeline.filter(pl.col("timestamp") >= split_date).collect()
```

### Streaming for Large Files
```python
import polars as pl
import pyarrow.parquet as pq


# Process file in chunks (never loads the full file).
# pyarrow's ParquetFile.iter_batches() reads one record batch at a time.
def process_large_file(path: str, chunk_size: int = 100_000):
    parquet_file = pq.ParquetFile(path)
    for record_batch in parquet_file.iter_batches(batch_size=chunk_size):
        batch = pl.from_arrow(record_batch)
        # Process each chunk
        features = compute_features(batch)
        yield features.to_numpy()
```

## 6. Schema Validation

### Pydantic for Config

```python
from pydantic import BaseModel, field_validator


class FeatureConfig(BaseModel):
    features: list[str]
    target: str
    seq_len: int = 15

    @field_validator("features")
    @classmethod
    def validate_features(cls, v: list[str]) -> list[str]:
        required = {"returns_vs", "momentum_z", "atr_pct"}
        missing = required - set(v)
        if missing:
            raise ValueError(f"Missing required features: {missing}")
        return v
```

### DataFrame Schema Validation

```python
def validate_schema(df: pl.DataFrame, required: list[str], stage: str) -> None:
    """Fail-fast schema validation."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(
            f"[{stage}] Missing columns: {missing}\n"
            f"Available: {sorted(df.columns)}"
        )
```

## 7. Performance Benchmarks

| Operation | Pandas | Polars | Improvement |
|---|---|---|---|
| Read CSV (1GB) | 45s | 4s | 11x |
| Filter rows | 2.1s | 0.4s | 5x |
| Group-by agg | 3.8s | 0.3s | 13x |
| Sort | 5.2s | 0.4s | 13x |
| Memory peak | 10GB | 2.5GB | 4x |

Benchmark: 50M rows, 20 columns, MacBook M2.

## 8. Migration Checklist

### Phase 1: Add Arrow Support

- Add `polars` to dependencies (see PyPI for the current version)
- Implement `query_arrow()` in the data client
- Verify zero-copy with a memory profiler

### Phase 2: Polars at Entry Points

- Add a `pl.from_pandas()` wrapper at the trainer entry point
- Update `prepare_sequences()` to accept both types
- Add schema validation after conversion

### Phase 3: Full Lazy Evaluation

- Convert file reads to `pl.scan_*`
- Compose transformations lazily
- Call `.collect()` only before `.to_numpy()`

## 9. Anti-Patterns to Avoid

### DON'T: Mix APIs Unnecessarily
```python
# BAD: Convert back and forth
df_polars = pl.from_pandas(df_pandas)
df_pandas_again = df_polars.to_pandas()  # Why?
```
### DON'T: Collect Too Early

```python
# BAD: Defeats lazy evaluation
df = pl.scan_parquet("data.parquet").collect()  # Full load
filtered = df.filter(...)                        # After the fact

# GOOD: Filter before collect
df = pl.scan_parquet("data.parquet").filter(...).collect()
```

### DON'T: Ignore Memory Pressure
```python
import pyarrow.parquet as pq

# BAD: Loads entire file
df = pl.read_parquet("huge_file.parquet")

# GOOD: Stream in chunks (pyarrow reads one record batch at a time)
for record_batch in pq.ParquetFile("huge_file.parquet").iter_batches():
    process(pl.from_arrow(record_batch))
```

## References

- Polars User Guide
- Polars Migration Guide
- Apache Arrow Python
- ClickHouse Python Client
- PyTorch Data Loading
- Polars Preference Hook ADR

## Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Memory spike during load | Collecting too early | Use lazy evaluation; call `collect()` only when needed |
| Arrow conversion fails | Unsupported data type | Check for object columns and convert them to native types |
| ClickHouse connection error | Wrong port or credentials | Verify `host:8123` (HTTP, used by clickhouse_connect) or `host:9000` (native) |
| Zero-copy not working | Intermediate pandas conversion | Remove `to_pandas()` calls; stay in Arrow/Polars |
| Polars hook blocking code | Pandas used without an exception | Add a `# polars-exception: <reason>` comment at the top of the file |
| Slow group-by operations | Using pandas for large datasets | Migrate to Polars for a 5-10x speedup |
| Schema validation failure | Column names are case-sensitive | Verify exact column names from the source |
| PyTorch DataLoader OOM | Materializing the full dataset as one tensor | Use `PolarsDataset`, which keeps data in Arrow and builds tensors per row |
| Parquet scan performance | Not using predicate pushdown | Add filters before `collect()` so the lazy scan can push them down |
| Type mismatch in tensor | Float64 vs Float32 mismatch | Explicitly cast with `.cast(pl.Float32)` before `.to_numpy()` |