data-pipeline

安装量: 193
排名: #4439

安装

npx skills add https://github.com/claude-office-skills/skills --skill data-pipeline

Data Pipeline Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates. Overview This skill covers: Data extraction from multiple sources Transformation and cleaning Loading to destinations Scheduling and monitoring Error handling and alerts ETL Patterns Basic ETL Flow ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ EXTRACT │───▶│ TRANSFORM │───▶│ LOAD │ │ │ │ │ │ │ │ • APIs │ │ • Clean │ │ • Database │ │ • Databases │ │ • Map │ │ • Warehouse │ │ • Files │ │ • Aggregate │ │ • Files │ │ • Webhooks │ │ • Enrich │ │ • APIs │ └─────────────┘ └─────────────┘ └─────────────┘ n8n ETL Workflow workflow : "Daily Sales ETL" schedule : "2am daily" nodes :

EXTRACT

- name : "Extract from Shopify" type : shopify action : get_orders filter : created_at

= yesterday

name : "Extract from Stripe" type : stripe action : get_payments filter : created

= yesterday

TRANSFORM

- name : "Merge Data" type : merge mode : combine_by_key key : order_id - name : "Transform" type : code code : | return items.map(item => ({ date: item.created_at.split('T')[0], order_id: item.id, customer_email: item.email, total: parseFloat(item.total_price), currency: item.currency, items: item.line_items.length, source: item.source_name, payment_status: item.payment.status }));

LOAD

-
name
:
"Load to BigQuery"
type
:
google_bigquery
action
:
insert_rows
table
:
sales_daily
-
name
:
"Update Google Sheets"
type
:
google_sheets
action
:
append_rows
spreadsheet
:
"Daily Sales Report"
Data Sources
Common Extractors
extractors
:
databases
:
-
postgresql
:
connection
:
connection_string
query
:
"SELECT * FROM orders WHERE date >= $1"
-
mysql
:
connection
:
connection_string
query
:
custom_sql
-
mongodb
:
connection
:
connection_string
collection
:
orders
filter
:
{
date
:
{
$gte
:
yesterday
}
}
apis
:
-
rest_api
:
url
:
"https://api.example.com/data"
method
:
GET
headers
:
{
Authorization
:
"Bearer {token}"
}
pagination
:
handle_automatically
-
graphql
:
url
:
"https://api.example.com/graphql"
query
:
graphql_query
files
:
-
csv
:
source
:
sftp/s3/google_drive
delimiter
:
","
encoding
:
utf
-
8
-
excel
:
source
:
file_path
sheet
:
"Sheet1"
-
json
:
source
:
api/file
path
:
"data.items"
saas
:
-
salesforce
:
get_objects
-
hubspot
:
get_contacts/deals
-
stripe
:
get_charges
-
shopify
:
get_orders
Transformations
Common Transformations
transformations
:
cleaning
:
-
remove_nulls
:
drop_or_fill
-
trim_whitespace
:
all_string_fields
-
deduplicate
:
by_key
-
validate
:
against_schema
mapping
:
-
rename_fields
:
{
old_name
:
new_name
}
-
convert_types
:
{
date_string
:
date
}
-
map_values
:
{
status_code
:
status_name
}
aggregation
:
-
group_by
:
[
date
,
category
]
-
sum
:
[
revenue
,
quantity
]
-
count
:
orders
-
average
:
order_value
enrichment
:
-
lookup
:
from_reference_table
-
geocode
:
from_address
-
calculate
:
derived_fields
filtering
:
-
where
:
condition
-
limit
:
n_rows
-
sample
:
percentage
Code Transform Examples
// Clean and normalize data
function
transform
(
items
)
{
return
items
.
map
(
item
=>
(
{
// Clean strings
name
:
item
.
name
?.
trim
(
)
.
toLowerCase
(
)
,
// Parse dates
date
:
new
Date
(
item
.
created_at
)
.
toISOString
(
)
.
split
(
'T'
)
[
0
]
,
// Convert types
amount
:
parseFloat
(
item
.
amount
)
||
0
,
// Map values
status
:
statusMap
[
item
.
status_code
]
||
'unknown'
,
// Calculate fields
total
:
item
.
quantity
*
item
.
unit_price
,
// Filter nested
tags
:
item
.
tags
?.
filter
(
t
=>
t
.
active
)
.
map
(
t
=>
t
.
name
)
,
// Default values
source
:
item
.
source
||
'direct'
}
)
)
;
}
// Aggregate data
function
aggregate
(
items
)
{
const
grouped
=
{
}
;
items
.
forEach
(
item
=>
{
const
key
=
`
${
item
.
date
}
_
${
item
.
category
}
`
;
if
(
!
grouped
[
key
]
)
{
grouped
[
key
]
=
{
date
:
item
.
date
,
category
:
item
.
category
,
total_revenue
:
0
,
order_count
:
0
}
;
}
grouped
[
key
]
.
total_revenue
+=
item
.
amount
;
grouped
[
key
]
.
order_count
+=
1
;
}
)
;
return
Object
.
values
(
grouped
)
;
}
Data Destinations
Common Loaders
loaders
:
data_warehouses
:
-
bigquery
:
project
:
project_id
dataset
:
analytics
table
:
sales
write_mode
:
append/truncate
-
snowflake
:
account
:
account_id
warehouse
:
compute_wh
database
:
analytics
schema
:
public
-
redshift
:
cluster
:
cluster_id
database
:
analytics
databases
:
-
postgresql
:
upsert
:
on_conflict_update
-
mysql
:
batch_insert
:
1000_rows
files
:
-
s3
:
bucket
:
data
-
lake
path
:
/processed/
{
date
}
/
format
:
parquet
-
google_cloud_storage
:
bucket
:
data
-
bucket
spreadsheets
:
-
google_sheets
:
mode
:
append/overwrite
-
airtable
:
base
:
base_id
table
:
table_name
apis
:
-
webhook
:
url
:
destination_url
batch_size
:
100
Scheduling & Monitoring
Pipeline Scheduling
scheduling
:
patterns
:
hourly
:
cron
:
"0 * * * *"
use_for
:
real_time_dashboards
daily
:
cron
:
"0 2 * * *"
use_for
:
daily_reports
weekly
:
cron
:
"0 3 * * 1"
use_for
:
weekly_summaries
on_demand
:
trigger
:
webhook/manual
use_for
:
ad_hoc_analysis
dependencies
:
-
pipeline_a
:
must_complete_before pipeline_b
-
wait_for
:
all_extracts_complete
retries
:
max_attempts
:
3
delay
:
exponential_backoff
alert_on
:
final_failure
Monitoring & Alerts
monitoring
:
metrics
:
-
rows_processed
-
execution_time
-
error_count
-
data_freshness
alerts
:
pipeline_failed
:
channels
:
[
slack
,
pagerduty
]
template
:
|
🚨 Pipeline Failed
Pipeline
:
{
pipeline_name
}
Stage
:
{
failed_stage
}
Error
:
{
error_message
}
[
View Logs
]
(
{
logs_url
}
)
data_quality
:
trigger
:
anomaly_detected
conditions
:
-
row_count
:
differs_by
>
50%
-
null_rate
:
exceeds_threshold
-
schema
:
changed_unexpectedly
stale_data
:
trigger
:
last_update
>
threshold
threshold
:
2_hours
Data Quality
Quality Checks
data_quality
:
schema_validation
:
-
required_fields
:
[
id
,
date
,
amount
]
-
field_types
:
id
:
integer
date
:
date
amount
:
number
-
allowed_values
:
status
:
[
active
,
pending
,
closed
]
statistical_checks
:
-
null_rate
:
< 5%
-
duplicate_rate
:
< 1%
-
value_range
:
amount
:
[
0
,
1000000
]
business_rules
:
-
total_equals_sum_of_line_items
-
dates_are_not_in_future
-
email_format_valid
trend_analysis
:
-
row_count
:
within_2_std_of_mean
-
total_value
:
within_expected_range
Output Example
Request
"Create a daily sales data pipeline" Output :

Daily Sales Data Pipeline

Pipeline Overview Shopify + Stripe → Transform → BigQuery + Sheets

Schedule

  • Runs: 2am daily
  • Timezone: UTC
  • Retry: 3 attempts

Extract

Shopify Orders

```yaml source: shopify filter: created_at >= yesterday fields: [id, email, total_price, line_items, created_at] Stripe Payments source : stripe filter : created

= yesterday fields : [ id , amount , status , metadata.order_id ] Transform // Join and clean data { date : order . created_at . split ( 'T' ) [ 0 ] , order_id : order . id , customer : order . email , revenue : parseFloat ( order . total_price ) , items : order . line_items . length , payment_status : payment . status } Load BigQuery Table: analytics.sales_daily Mode: Append Google Sheets Sheet: "Daily Sales Dashboard" Tab: "Raw Data" Quality Checks Row count > 0 No null order_ids Revenue sum matches Stripe Alerts Slack: #data-alerts On failure: @data-team


Data Pipeline Skill - Part of Claude Office Skills

返回排行榜