Installation
npx skills add https://github.com/patricio0312rev/skills --skill etl-sync-job-builder
- ETL/Sync Job Builder
- Build reliable, incremental data synchronization pipelines.
- ETL Job Pattern
// jobs/sync-users.ts
// Declarative description of a sync job: where data is read from, where it
// is written, and how often it runs.
interface SyncJob {
  name: string;                                 // unique job identifier
  source: "database" | "api" | "file";          // where records come from
  destination: "database" | "warehouse" | "s3"; // where records are written
  schedule: string;                             // cron expression, e.g. "0 * * * *"
}
- export
- class
- ETLJob
- {
- constructor
- (
- private
- name
- :
- string
- ,
- private
- watermarkKey
- :
- string
- )
- {
- }
- async
- run
- (
- )
- {
- console
- .
- log
- (
- `
- 🔄 Starting
- ${
- this
- .
- name
- }
- ...
- `
- )
- ;
- try
- {
- // 1. Get last watermark
- const
- lastSync
- =
- await
- this
- .
- getWatermark
- (
- )
- ;
- console
- .
- log
- (
- `
- Last sync:
- ${
- lastSync
- }
- `
- )
- ;
- // 2. Extract data
- const
- data
- =
- await
- this
- .
- extract
- (
- lastSync
- )
- ;
- console
- .
- log
- (
- `
- Extracted
- ${
- data
- .
- length
- }
- records
- `
- )
- ;
- // 3. Transform data
- const
- transformed
- =
- await
- this
- .
- transform
- (
- data
- )
- ;
- // 4. Load data
- await
- this
- .
- load
- (
- transformed
- )
- ;
- // 5. Update watermark
- await
- this
- .
- updateWatermark
- (
- new
- Date
- (
- )
- )
- ;
- console
- .
- log
- (
- `
- ✅
- ${
- this
- .
- name
- }
- complete
- `
- )
- ;
- }
- catch
- (
- error
- )
- {
- console
- .
- error
- (
- `
- ❌
- ${
- this
- .
- name
- }
- failed:
- `
- ,
- error
- )
- ;
- throw
- error
- ;
- }
- }
- private
- async
- extract
- (
- since
- :
- Date
- )
- {
- // Extract logic
- return
- [
- ]
- ;
- }
- private
- async
- transform
- (
- data
- :
- any
- [
- ]
- )
- {
- // Transform logic
- return
- data
- ;
- }
- private
- async
- load
- (
- data
- :
- any
- [
- ]
- )
- {
- // Load logic
- }
- private
- async
- getWatermark
- (
- )
- :
- Promise
- <
- Date
- >
- {
- const
- watermark
- =
- await
- prisma
- .
- syncWatermark
- .
- findUnique
- (
- {
- where
- :
- {
- key
- :
- this
- .
- watermarkKey
- }
- ,
- }
- )
- ;
- return
- watermark
- ?.
- lastSync
- ||
- new
- Date
- (
- 0
- )
- ;
- }
- private
- async
- updateWatermark
- (
- timestamp
- :
- Date
- )
- {
- await
- prisma
- .
- syncWatermark
- .
- upsert
- (
- {
- where
- :
- {
- key
- :
- this
- .
- watermarkKey
- }
- ,
- create
- :
- {
- key
- :
- this
- .
- watermarkKey
- ,
- lastSync
- :
- timestamp
- }
- ,
- update
- :
- {
- lastSync
- :
- timestamp
- }
- ,
- }
- )
- ;
- }
- }
- Watermark Strategy
// Track sync progress
// One row per sync job; `key` is the job's watermark key (e.g. "orders_sync").
model SyncWatermark {
  key      String   @id
  lastSync DateTime // upper bound of the last successful sync window
  metadata Json?    // optional job-specific state (cursor, counts, ...)

  @@index([lastSync])
}
- // Incremental sync using watermark
- async
- function
- syncOrdersIncremental
- (
- )
- {
- // Get last sync time
- const
- watermark
- =
- await
- prisma
- .
- syncWatermark
- .
- findUnique
- (
- {
- where
- :
- {
- key
- :
- "orders_sync"
- }
- ,
- }
- )
- ;
- const
- lastSync
- =
- watermark
- ?.
- lastSync
- ||
- new
- Date
- (
- 0
- )
- ;
- // Fetch only new/updated records
- const
- newOrders
- =
- await
- sourceDb
- .
- order
- .
- findMany
- (
- {
- where
- :
- {
- updated_at
- :
- {
- gt
- :
- lastSync
- }
- ,
- }
- ,
- orderBy
- :
- {
- updated_at
- :
- "asc"
- }
- ,
- }
- )
- ;
- console
- .
- log
- (
- `
- 📦 Syncing
- ${
- newOrders
- .
- length
- }
- orders...
- `
- )
- ;
- // Process in batches
- for
- (
- let
- i
- =
- 0
- ;
- i
- <
- newOrders
- .
- length
- ;
- i
- +=
- 100
- )
- {
- const
- batch
- =
- newOrders
- .
- slice
- (
- i
- ,
- i
- +
- 100
- )
- ;
- await
- destinationDb
- .
- order
- .
- createMany
- (
- {
- data
- :
- batch
- ,
- skipDuplicates
- :
- true
- ,
- // Idempotency
- }
- )
- ;
- }
- // Update watermark to latest record
- if
- (
- newOrders
- .
- length
- >
- 0
- )
- {
- const
- latestTimestamp
- =
- newOrders
- [
- newOrders
- .
- length
- -
- 1
- ]
- .
- updated_at
- ;
- await
- prisma
- .
- syncWatermark
- .
- upsert
- (
- {
- where
- :
- {
- key
- :
- "orders_sync"
- }
- ,
- create
- :
- {
- key
- :
- "orders_sync"
- ,
- lastSync
- :
- latestTimestamp
- }
- ,
- update
- :
- {
- lastSync
- :
- latestTimestamp
- }
- ,
- }
- )
- ;
- }
- console
- .
- log
- (
- `
- ✅ Sync complete
- `
- )
- ;
- }
- Idempotent Upsert Pattern
- // Idempotent sync - safe to run multiple times
- async
- function
- syncUsersIdempotent
- (
- users
- :
- User
- [
- ]
- )
- {
- for
- (
- const
- user
- of
- users
- )
- {
- await
- prisma
- .
- user
- .
- upsert
- (
- {
- where
- :
- {
- id
- :
- user
- .
- id
- }
- ,
- create
- :
- user
- ,
- update
- :
- {
- email
- :
- user
- .
- email
- ,
- name
- :
- user
- .
- name
- ,
- updated_at
- :
- user
- .
- updated_at
- ,
- }
- ,
- }
- )
- ;
- }
- }
// Batch upsert for better performance
/**
 * Single-statement bulk upsert for PostgreSQL: UNNEST the per-column arrays
 * back into rows, then ON CONFLICT upsert. The trailing WHERE makes it
 * last-write-wins — an existing row is only overwritten by strictly newer data.
 *
 * NOTE(review): this relies on Prisma binding each `${users.map(...)}` array
 * as a single Postgres array parameter, and on `SELECT *` preserving the
 * id/email/name/updated_at column order of the UNNEST — verify both against
 * the Prisma raw-query documentation before production use.
 */
async function syncUsersBatch(users: User[]) {
  // PostgreSQL: Use ON CONFLICT
  await prisma.$executeRaw`
    INSERT INTO users (id, email, name, updated_at)
    SELECT * FROM UNNEST(
      ${users.map((u) => u.id)}::bigint[],
      ${users.map((u) => u.email)}::text[],
      ${users.map((u) => u.name)}::text[],
      ${users.map((u) => u.updated_at)}::timestamp[]
    )
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      name = EXCLUDED.name,
      updated_at = EXCLUDED.updated_at
    WHERE users.updated_at < EXCLUDED.updated_at
  `;
}
- Retry Logic with Exponential Backoff
- async
- function
- syncWithRetry
- <
- T
- >
- (
- operation
- :
- (
- )
- =>
- Promise
- <
- T
- >
- ,
- maxRetries
- :
- number
- =
- 3
- ,
- baseDelay
- :
- number
- =
- 1000
- )
- :
- Promise
- <
- T
- >
- {
- for
- (
- let
- attempt
- =
- 0
- ;
- attempt
- <=
- maxRetries
- ;
- attempt
- ++
- )
- {
- try
- {
- return
- await
- operation
- (
- )
- ;
- }
- catch
- (
- error
- )
- {
- if
- (
- attempt
- ===
- maxRetries
- )
- throw
- error
- ;
- const
- delay
- =
- baseDelay
- *
- Math
- .
- pow
- (
- 2
- ,
- attempt
- )
- ;
- console
- .
- log
- (
- `
- Retry
- ${
- attempt
- +
- 1
- }
- /
- ${
- maxRetries
- }
- after
- ${
- delay
- }
- ms
- `
- )
- ;
- await
- sleep
- (
- delay
- )
- ;
- }
- }
- throw
- new
- Error
- (
- "Max retries exceeded"
- )
- ;
- }
- // Usage
- await
- syncWithRetry
- (
- async
- (
- )
- =>
- {
- return
- await
- syncOrders
- (
- )
- ;
- }
- ,
- 3
- ,
- 1000
- )
- ;
- Change Data Capture (CDC)
// Listen to database changes
import { PrismaClient } from "@prisma/client";

// Single shared Prisma client used by the CDC example below.
const prisma = new PrismaClient();
// PostgreSQL: Listen to logical replication
/**
 * Wires up change-data-capture for the orders table.
 *
 * NOTE(review): the CREATE PUBLICATION is for logical replication, but the
 * code below actually uses LISTEN/NOTIFY — a separate mechanism that only
 * works if a trigger NOTIFYs "orders_changed" with a JSON payload shaped
 * like { operation, data }. Confirm which mechanism is intended.
 * `pg` is presumably a configured node-postgres pool in scope — verify.
 */
async function setupCDC() {
  await prisma.$executeRaw`
    CREATE PUBLICATION orders_publication FOR TABLE orders;
  `;

  // Subscribe to changes (using pg library)
  const client = await pg.connect();
  client.query("LISTEN orders_changed;");

  client.on("notification", async (msg) => {
    const change = JSON.parse(msg.payload);
    // Only inserts/updates are replicated; deletes are ignored here.
    if (change.operation === "INSERT" || change.operation === "UPDATE") {
      await syncOrder(change.data);
    }
  });
}
- Conflict Resolution
// How to resolve a record that exists on both sides with different data.
interface ConflictResolution {
  strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}
- async
- function
- syncWithConflictResolution
- (
- sourceRecord
- :
- any
- ,
- destRecord
- :
- any
- ,
- strategy
- :
- ConflictResolution
- [
- "strategy"
- ]
- )
- {
- if
- (
- strategy
- ===
- "source-wins"
- )
- {
- return
- sourceRecord
- ;
- }
- if
- (
- strategy
- ===
- "dest-wins"
- )
- {
- return
- destRecord
- ;
- }
- if
- (
- strategy
- ===
- "latest-wins"
- )
- {
- return
- sourceRecord
- .
- updated_at
- >
- destRecord
- .
- updated_at
- ?
- sourceRecord
- :
- destRecord
- ;
- }
- if
- (
- strategy
- ===
- "merge"
- )
- {
- // Merge non-null fields
- return
- {
- ...
- destRecord
- ,
- ...
- Object
- .
- fromEntries
- (
- Object
- .
- entries
- (
- sourceRecord
- )
- .
- filter
- (
- (
- [
- _
- ,
- v
- ]
- )
- =>
- v
- !=
- null
- )
- )
- ,
- }
- ;
- }
- }
- Monitoring & Observability
// Track sync job metrics
// One record per completed (or failed) job run.
interface SyncMetrics {
  jobName: string;
  startTime: Date;
  endTime: Date;
  recordsProcessed: number; // total records seen this run
  recordsInserted: number;
  recordsUpdated: number;
  recordsSkipped: number;
  errors: number;           // per-record failures (run may still "succeed")
  durationMs: number;
}
- async
- function
- logSyncMetrics
- (
- metrics
- :
- SyncMetrics
- )
- {
- await
- prisma
- .
- syncMetric
- .
- create
- (
- {
- data
- :
- metrics
- ,
- }
- )
- ;
- console
- .
- log
- (
- `
- 📊 Sync Metrics
- Job:
- ${
- metrics
- .
- jobName
- }
- Records:
- ${
- metrics
- .
- recordsProcessed
- }
- Inserted:
- ${
- metrics
- .
- recordsInserted
- }
- Updated:
- ${
- metrics
- .
- recordsUpdated
- }
- Errors:
- ${
- metrics
- .
- errors
- }
- Duration:
- ${
- metrics
- .
- durationMs
- }
- ms
- `
- )
- ;
- }
- Full ETL Job Example
- // jobs/sync-orders-to-warehouse.ts
- export
- class
- OrdersETLJob
- extends
- ETLJob
- {
- constructor
- (
- )
- {
- super
- (
- "orders-etl"
- ,
- "orders_warehouse_sync"
- )
- ;
- }
- async
- extract
- (
- since
- :
- Date
- )
- :
- Promise
- <
- Order
- [
- ]
- >
- {
- return
- prisma
- .
- order
- .
- findMany
- (
- {
- where
- :
- {
- updated_at
- :
- {
- gt
- :
- since
- }
- ,
- }
- ,
- include
- :
- {
- items
- :
- true
- ,
- user
- :
- true
- ,
- }
- ,
- orderBy
- :
- {
- updated_at
- :
- "asc"
- }
- ,
- }
- )
- ;
- }
- async
- transform
- (
- orders
- :
- Order
- [
- ]
- )
- :
- Promise
- <
- WarehouseOrder
- [
- ]
- >
- {
- return
- orders
- .
- map
- (
- (
- order
- )
- =>
- (
- {
- order_id
- :
- order
- .
- id
- ,
- user_email
- :
- order
- .
- user
- .
- email
- ,
- total_amount
- :
- order
- .
- total
- ,
- item_count
- :
- order
- .
- items
- .
- length
- ,
- status
- :
- order
- .
- status
- ,
- order_date
- :
- order
- .
- created_at
- ,
- synced_at
- :
- new
- Date
- (
- )
- ,
- }
- )
- )
- ;
- }
- async
- load
- (
- data
- :
- WarehouseOrder
- [
- ]
- )
- :
- Promise
- <
- void
- >
- {
- const
- batchSize
- =
- 100
- ;
- for
- (
- let
- i
- =
- 0
- ;
- i
- <
- data
- .
- length
- ;
- i
- +=
- batchSize
- )
- {
- const
- batch
- =
- data
- .
- slice
- (
- i
- ,
- i
- +
- batchSize
- )
- ;
- await
- warehouseDb
- .
- $executeRaw
- `
- INSERT INTO orders_fact (
- order_id, user_email, total_amount, item_count,
- status, order_date, synced_at
- )
- VALUES
- ${
- batch
- .
- map
- (
- (
- o
- )
- =>
- `
- (
- ${
- o
- .
- order_id
- }
- , '
- ${
- o
- .
- user_email
- }
- ',
- ${
- o
- .
- total_amount
- }
- ,
- ${
- o
- .
- item_count
- }
- , '
- ${
- o
- .
- status
- }
- ', '
- ${
- o
- .
- order_date
- }
- ',
- '
- ${
- o
- .
- synced_at
- }
- '
- )
- `
- )
- .
- join
- (
- ","
- )
- }
- ON CONFLICT (order_id) DO UPDATE SET
- total_amount = EXCLUDED.total_amount,
- status = EXCLUDED.status,
- synced_at = EXCLUDED.synced_at
- `
- ;
- }
- }
- }
- // Run job
- new
- OrdersETLJob
- (
- )
- .
- run
- (
- )
- ;
- Scheduling
- // Schedule ETL jobs
- import
- cron
- from
- "node-cron"
- ;
- // Run every hour
- cron
- .
- schedule
- (
- "0 * * * *"
- ,
- async
- (
- )
- =>
- {
- await
- new
- OrdersETLJob
- (
- )
- .
- run
- (
- )
- ;
- }
- )
- ;
- // Run every 15 minutes
- cron
- .
- schedule
- (
- "/15 * * * "
- ,
- async
- (
- )
- =>
- {
- await
- syncUsersIncremental
- (
- )
- ;
- }
- )
- ;
- // Run nightly at 2 AM
- cron
- .
- schedule
- (
- "0 2 * * *"
- ,
- async
- (
- )
- =>
- {
- await
- fullDataSync
- (
- )
- ;
- }
- )
- ;
- Error Handling & Recovery
- async
- function
- syncWithErrorHandling
- (
- )
- {
- const
- checkpoint
- =
- await
- getCheckpoint
- (
- )
- ;
- let
- processedRecords
- =
- 0
- ;
- try
- {
- const
- records
- =
- await
- fetchRecords
- (
- checkpoint
- )
- ;
- for
- (
- const
- record
- of
- records
- )
- {
- try
- {
- await
- processRecord
- (
- record
- )
- ;
- processedRecords
- ++
- ;
- // Save checkpoint every 100 records
- if
- (
- processedRecords
- %
- 100
- ===
- 0
- )
- {
- await
- saveCheckpoint
- (
- record
- .
- id
- )
- ;
- }
- }
- catch
- (
- error
- )
- {
- // Log error but continue
- console
- .
- error
- (
- `
- Failed to process record
- ${
- record
- .
- id
- }
- :
- `
- ,
- error
- )
- ;
- await
- logFailedRecord
- (
- record
- .
- id
- ,
- error
- )
- ;
- }
- }
- await
- saveCheckpoint
- (
- "completed"
- )
- ;
- }
- catch
- (
- error
- )
- {
- // Critical failure - job will retry from checkpoint
- console
- .
- error
- (
- "Job failed:"
- ,
- error
- )
- ;
- throw
- error
- ;
- }
- }
- Best Practices
- **Incremental sync** — use watermarks; don't full-scan the source.
- **Idempotent operations** — every write must be safe to retry.
- **Batch processing** — process 100–1000 records at a time.
- **Checkpointing** — resume from the failure point, not from scratch.
- **Retry with backoff** — handle transient failures gracefully.
- **Monitor metrics** — track job health over time.
- **Test thoroughly** — including failure scenarios.
Output Checklist
ETL job class created
Watermark tracking implemented
Incremental sync logic
Idempotent upsert operations
Retry logic with backoff
Conflict resolution strategy
Monitoring and metrics
Error handling and recovery
Job scheduling configured
Testing including failure cases
← Back to leaderboard