# Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

## Overview

This skill covers:

- Data extraction from multiple sources
- Transformation and cleaning
- Loading to destinations
- Scheduling and monitoring
- Error handling and alerts

## ETL Patterns

### Basic ETL Flow

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
```

### n8n ETL Workflow

```yaml
workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday

  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday

  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id

  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment.status
      }));

  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily

  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
```
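The `Merge Data` step joins Shopify orders with Stripe payments on `order_id`, and the `Transform` code then reads `item.payment.status`. The sketch below shows one way that join could look in plain JavaScript. It assumes each payment carries the order ID in `metadata.order_id` and that the merged record nests the payment under a `payment` key. `mergeByOrderId` is a hypothetical helper for illustration, not an n8n API.

```javascript
// Hypothetical helper: join orders to payments on the order ID.
// Assumption: payment.metadata.order_id holds the Shopify order ID.
function mergeByOrderId(orders, payments) {
  const paymentsByOrder = new Map();
  for (const payment of payments) {
    paymentsByOrder.set(String(payment.metadata.order_id), payment);
  }
  // Orders without a matching payment get payment: null, so downstream
  // code should read item.payment?.status rather than item.payment.status.
  return orders.map(order => ({
    ...order,
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

const merged = mergeByOrderId(
  [{ id: 1001, email: 'a@example.com' }, { id: 1002, email: 'b@example.com' }],
  [{ id: 'pi_1', status: 'succeeded', metadata: { order_id: '1001' } }]
);
console.log(merged.map(m => m.payment?.status ?? 'missing'));
```

Keys are compared as strings because one source may return the ID as a number and the other as text.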
## Data Sources

### Common Extractors

```yaml
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
    - mysql:
        connection: connection_string
        query: custom_sql
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}

  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query

  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
    - excel:
        source: file_path
        sheet: "Sheet1"
    - json:
        source: api/file
        path: "data.items"

  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
```
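The `json` extractor takes a `path` such as `"data.items"` that points at the record array inside a response. As a rough illustration of how such a dotted path might be resolved, here is a small sketch. `getPath` and `extractItems` are hypothetical names, and the sample response shape is an assumption.

```javascript
// Hypothetical helper: resolve a dotted path such as "data.items".
function getPath(obj, path) {
  return path.split('.').reduce(
    (current, key) => (current == null ? undefined : current[key]),
    obj
  );
}

// Return the records at `path`, or an empty list when the path is
// missing or does not point at an array.
function extractItems(response, path) {
  const items = getPath(response, path);
  return Array.isArray(items) ? items : [];
}

// Assumed response shape, for illustration only.
const response = { data: { items: [{ id: 1 }, { id: 2 }] }, meta: { page: 1 } };
console.log(extractItems(response, 'data.items').length);
console.log(extractItems(response, 'data.missing').length);
```

Returning an empty list for a missing path keeps the pipeline running. Whether that is preferable to failing loudly depends on the source, so a row-count check downstream is a sensible companion.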
## Transformations

### Common Transformations

```yaml
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema

  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}

  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value

  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields

  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
```
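Two of the entries above, `deduplicate: by_key` and `rename_fields: {old_name: new_name}`, are simple enough to sketch directly. The helper names and the sample field `cust_email` below are assumptions made for the example.

```javascript
// Keep the first record seen for each value of `key`.
function deduplicateBy(items, key) {
  const seen = new Set();
  return items.filter(item => {
    if (seen.has(item[key])) return false;
    seen.add(item[key]);
    return true;
  });
}

// Rename fields according to a { old_name: new_name } mapping;
// fields not listed in the mapping keep their names.
function renameFields(item, mapping) {
  return Object.fromEntries(
    Object.entries(item).map(([field, value]) => [mapping[field] ?? field, value])
  );
}

const rows = [
  { id: 1, cust_email: 'a@example.com' },
  { id: 1, cust_email: 'a@example.com' },
  { id: 2, cust_email: 'b@example.com' },
];
const cleaned = deduplicateBy(rows, 'id').map(r => renameFields(r, { cust_email: 'email' }));
console.log(cleaned);
```

This version keeps the first occurrence of each key. If later records should win instead (for example, the most recent update), the input could be reversed or sorted before deduplicating.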
### Code Transform Examples

```javascript
// Example status-code lookup used by transform(); the codes are
// placeholders, so adjust them to match your source system.
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

// Clean and normalize data
function transform(items) {
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    // Parse dates (throws a RangeError if created_at is not a valid date)
    date: new Date(item.created_at).toISOString().split('T')[0],
    // Convert types
    amount: parseFloat(item.amount) || 0,
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    // Calculate fields
    total: item.quantity * item.unit_price,
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data (expects numeric amounts, e.g. the output of transform())
function aggregate(items) {
  const grouped = {};

  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    grouped[key].total_revenue += item.amount;
    grouped[key].order_count += 1;
  });

  return Object.values(grouped);
}
```
## Data Destinations

### Common Loaders

```yaml
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
    - redshift:
        cluster: cluster_id
        database: analytics

  databases:
    - postgresql:
        upsert: on_conflict_update
    - mysql:
        batch_insert: 1000_rows

  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
    - google_cloud_storage:
        bucket: data-bucket

  spreadsheets:
    - google_sheets:
        mode: append/overwrite
    - airtable:
        base: base_id
        table: table_name

  apis:
    - webhook:
        url: destination_url
        batch_size: 100
```
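Several loaders above write in batches (`batch_insert: 1000_rows` for MySQL, `batch_size: 100` for webhooks). A minimal sketch of the splitting step follows. `chunk` is a hypothetical helper, and the actual write call is left out because it depends on the destination.

```javascript
// Split rows into consecutive batches of at most batchSize rows.
function chunk(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize));
  }
  return batches;
}

// 2,500 sample rows split for a 1,000-row batch insert.
const sampleRows = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
const batches = chunk(sampleRows, 1000);
console.log(batches.map(b => b.length)); // [ 1000, 1000, 500 ]
```

Each batch can then be passed to the destination's insert call, ideally with a retry around each batch rather than around the whole load.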
## Scheduling & Monitoring

### Pipeline Scheduling

```yaml
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis

  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete

  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
```
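The `retries` block combines three settings: a maximum of 3 attempts, exponentially growing delays, and an alert only after the final failure. The sketch below shows one way to express that. The 1-second base delay, the 60-second cap, and the `onFinalFailure` callback are assumptions for illustration, not values taken from the configuration.

```javascript
// Delay before the retry that follows attempt number `attempt` (1-based).
// The delay doubles on each attempt and is capped at capMs.
function backoffDelayMs(attempt, baseMs = 1000, capMs = 60000) {
  return Math.min(capMs, baseMs * 2 ** (attempt - 1));
}

// Run `task` up to maxAttempts times. onFinalFailure (hypothetical hook,
// e.g. a Slack alert) is called only when the last attempt fails.
async function runWithRetries(task, { maxAttempts = 3, baseMs = 1000, onFinalFailure } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task();
    } catch (error) {
      if (attempt === maxAttempts) {
        if (onFinalFailure) onFinalFailure(error);
        throw error;
      }
      await new Promise(resolve => setTimeout(resolve, backoffDelayMs(attempt, baseMs)));
    }
  }
}

console.log([1, 2, 3].map(n => backoffDelayMs(n))); // [ 1000, 2000, 4000 ]
```

With `max_attempts: 3` only the first two delays are ever used, since the third failure ends the run. Adding random jitter to each delay is a common refinement when many pipelines retry against the same service.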
### Monitoring & Alerts

```yaml
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness

  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 Pipeline Failed

        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}

        [View Logs]({logs_url})

    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly

    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
```
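The alert definitions rely on three small pieces of logic: filling `{placeholder}` tokens in the message template, flagging a row count that differs by more than 50%, and detecting data older than the 2-hour threshold. A minimal sketch of each follows. The function names are hypothetical, and comparing the row count against the previous run is an assumption about what `differs_by` is measured against.

```javascript
// Fill {placeholder} tokens; unknown placeholders are left untouched.
function renderTemplate(template, values) {
  return template.replace(/\{(\w+)\}/g, (match, name) =>
    name in values ? String(values[name]) : match
  );
}

// True when the row count moved by more than `threshold` (0.5 = 50%)
// relative to the previous run (assumed baseline).
function rowCountDiffers(current, previous, threshold = 0.5) {
  if (previous === 0) return current !== 0;
  return Math.abs(current - previous) / previous > threshold;
}

// True when the last update is older than maxAgeMs (default: 2 hours).
function isStale(lastUpdate, now, maxAgeMs = 2 * 60 * 60 * 1000) {
  return now.getTime() - lastUpdate.getTime() > maxAgeMs;
}

const message = renderTemplate(
  'Pipeline: {pipeline_name}\nStage: {failed_stage}\nError: {error_message}',
  { pipeline_name: 'Daily Sales ETL', failed_stage: 'load', error_message: 'timeout' }
);
console.log(message);
console.log(rowCountDiffers(400, 1000)); // true: a 60% drop
```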
## Data Quality

### Quality Checks

```yaml
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]

  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]

  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid

  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
```
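A few of these checks translate directly into code. The sketch below covers `required_fields`, `null_rate`, and `row_count: within_2_std_of_mean`. The helper names are hypothetical, and the trend check assumes a population standard deviation computed over a list of previous row counts.

```javascript
// Names of required fields that are null or undefined on a row.
function missingRequiredFields(row, required) {
  return required.filter(field => row[field] === null || row[field] === undefined);
}

// Fraction of rows (0 to 1) where `field` is null or undefined.
function nullRate(rows, field) {
  if (rows.length === 0) return 0;
  const nulls = rows.filter(r => r[field] === null || r[field] === undefined).length;
  return nulls / rows.length;
}

// True when `value` lies within k population standard deviations of the
// mean of `history` (assumed to be a non-empty list of earlier values).
function withinStdDevs(value, history, k = 2) {
  const mean = history.reduce((a, b) => a + b, 0) / history.length;
  const variance = history.reduce((a, b) => a + (b - mean) ** 2, 0) / history.length;
  return Math.abs(value - mean) <= k * Math.sqrt(variance);
}

const sample = [{ id: 1, amount: 10 }, { id: 2, amount: null }, { id: 3, amount: 30 }, { id: 4, amount: 40 }];
console.log(missingRequiredFields(sample[1], ['id', 'date', 'amount'])); // [ 'date', 'amount' ]
console.log(nullRate(sample, 'amount') < 0.05); // false: 25% of amounts are null
console.log(withinStdDevs(150, [100, 110, 90, 100])); // false: far outside the usual range
```

With only a handful of historical runs the standard deviation is a noisy estimate, so a trend check like this is best treated as a warning until enough history has accumulated.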
## Output Example

**Request**: "Create a daily sales data pipeline"

**Output**:

````markdown
# Daily Sales Data Pipeline

## Pipeline Overview

Shopify + Stripe → Transform → BigQuery + Sheets

## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform
```javascript
// Join and clean data
const row = {
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
};
```

## Load

### BigQuery
- Table: analytics.sales_daily
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- Row count > 0
- No null order_ids
- Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team
````

Data Pipeline Skill - Part of Claude Office Skills