etl-sync-job-builder

安装量: 41
排名: #17519

安装

npx skills add https://github.com/patricio0312rev/skills --skill etl-sync-job-builder
ETL/Sync Job Builder
Build reliable, incremental data synchronization pipelines.
ETL Job Pattern
// jobs/sync-users.ts
/** Declarative description of a sync job: what it is, where it reads, where it writes. */
interface SyncJob {
  /** Human-readable job identifier (used in logs). */
  name: string;
  /** System records are read from. */
  source: "database" | "api" | "file";
  /** System records are written to. */
  destination: "database" | "warehouse" | "s3";
  /** How often the job runs — presumably a cron expression (see Scheduling); confirm with the scheduler. */
  schedule: string;
}
export
class
ETLJob
{
constructor
(
private
name
:
string
,
private
watermarkKey
:
string
)
{
}
async
run
(
)
{
console
.
log
(
`
🔄 Starting
${
this
.
name
}
...
`
)
;
try
{
// 1. Get last watermark
const
lastSync
=
await
this
.
getWatermark
(
)
;
console
.
log
(
`
Last sync:
${
lastSync
}
`
)
;
// 2. Extract data
const
data
=
await
this
.
extract
(
lastSync
)
;
console
.
log
(
`
Extracted
${
data
.
length
}
records
`
)
;
// 3. Transform data
const
transformed
=
await
this
.
transform
(
data
)
;
// 4. Load data
await
this
.
load
(
transformed
)
;
// 5. Update watermark
await
this
.
updateWatermark
(
new
Date
(
)
)
;
console
.
log
(
`
${
this
.
name
}
complete
`
)
;
}
catch
(
error
)
{
console
.
error
(
`
${
this
.
name
}
failed:
`
,
error
)
;
throw
error
;
}
}
private
async
extract
(
since
:
Date
)
{
// Extract logic
return
[
]
;
}
private
async
transform
(
data
:
any
[
]
)
{
// Transform logic
return
data
;
}
private
async
load
(
data
:
any
[
]
)
{
// Load logic
}
private
async
getWatermark
(
)
:
Promise
<
Date
>
{
const
watermark
=
await
prisma
.
syncWatermark
.
findUnique
(
{
where
:
{
key
:
this
.
watermarkKey
}
,
}
)
;
return
watermark
?.
lastSync
||
new
Date
(
0
)
;
}
private
async
updateWatermark
(
timestamp
:
Date
)
{
await
prisma
.
syncWatermark
.
upsert
(
{
where
:
{
key
:
this
.
watermarkKey
}
,
create
:
{
key
:
this
.
watermarkKey
,
lastSync
:
timestamp
}
,
update
:
{
lastSync
:
timestamp
}
,
}
)
;
}
}
Watermark Strategy
// Track sync progress
// One row per sync job; `key` identifies the job and `lastSync` is the
// high-water mark: the next run fetches only records updated after it.
model SyncWatermark {
  key String @id // e.g. "orders_sync"
  lastSync DateTime // newest source timestamp already synced
  metadata Json? // optional job-specific state (counts, cursors, ...)
  @@index([lastSync])
}
// Incremental sync using watermark
/**
 * One incremental pass over orders: fetch everything updated since the last
 * watermark, upsert it into the destination in batches, then advance the
 * watermark to the newest record actually processed (not "now", so records
 * written mid-run are not skipped next time).
 */
async function syncOrdersIncremental() {
  // Get last sync time (epoch if this job has never run)
  const watermark = await prisma.syncWatermark.findUnique({
    where: { key: "orders_sync" },
  });
  const lastSync = watermark?.lastSync ?? new Date(0);

  // Fetch only new/updated records, oldest first so the watermark can be
  // advanced to the last processed record.
  const newOrders = await sourceDb.order.findMany({
    where: {
      updated_at: { gt: lastSync },
    },
    orderBy: { updated_at: "asc" },
  });

  console.log(`📦 Syncing ${newOrders.length} orders...`);

  // Process in batches
  const BATCH_SIZE = 100;
  for (let i = 0; i < newOrders.length; i += BATCH_SIZE) {
    const batch = newOrders.slice(i, i + BATCH_SIZE);
    // BUG FIX: createMany({ skipDuplicates: true }) silently ignored rows
    // that already exist, so *updated* source records never reached the
    // destination even though the query explicitly fetches them. Upsert each
    // row instead — still idempotent, and applies updates.
    await Promise.all(
      batch.map((order) =>
        destinationDb.order.upsert({
          where: { id: order.id },
          create: order,
          update: order,
        })
      )
    );
  }

  // Update watermark to latest record (only if something was processed)
  if (newOrders.length > 0) {
    const latestTimestamp = newOrders[newOrders.length - 1].updated_at;
    await prisma.syncWatermark.upsert({
      where: { key: "orders_sync" },
      create: { key: "orders_sync", lastSync: latestTimestamp },
      update: { lastSync: latestTimestamp },
    });
  }

  console.log(`✅ Sync complete`);
}
Idempotent Upsert Pattern
// Idempotent sync - safe to run multiple times
async
function
syncUsersIdempotent
(
users
:
User
[
]
)
{
for
(
const
user
of
users
)
{
await
prisma
.
user
.
upsert
(
{
where
:
{
id
:
user
.
id
}
,
create
:
user
,
update
:
{
email
:
user
.
email
,
name
:
user
.
name
,
updated_at
:
user
.
updated_at
,
}
,
}
)
;
}
}
// Batch upsert for better performance
async
function
syncUsersBatch
(
users
:
User
[
]
)
{
// PostgreSQL: Use ON CONFLICT
await
prisma
.
$executeRaw
`
INSERT INTO users (id, email, name, updated_at)
SELECT * FROM UNNEST(
${
users
.
map
(
(
u
)
=>
u
.
id
)
}
::bigint[],
${
users
.
map
(
(
u
)
=>
u
.
email
)
}
::text[],
${
users
.
map
(
(
u
)
=>
u
.
name
)
}
::text[],
${
users
.
map
(
(
u
)
=>
u
.
updated_at
)
}
::timestamp[]
)
ON CONFLICT (id) DO UPDATE SET
email = EXCLUDED.email,
name = EXCLUDED.name,
updated_at = EXCLUDED.updated_at
WHERE users.updated_at < EXCLUDED.updated_at
`
;
}
Retry Logic with Exponential Backoff
async
function
syncWithRetry
<
T
>
(
operation
:
(
)
=>
Promise
<
T
>
,
maxRetries
:
number
=
3
,
baseDelay
:
number
=
1000
)
:
Promise
<
T
>
{
for
(
let
attempt
=
0
;
attempt
<=
maxRetries
;
attempt
++
)
{
try
{
return
await
operation
(
)
;
}
catch
(
error
)
{
if
(
attempt
===
maxRetries
)
throw
error
;
const
delay
=
baseDelay
*
Math
.
pow
(
2
,
attempt
)
;
console
.
log
(
`
Retry
${
attempt
+
1
}
/
${
maxRetries
}
after
${
delay
}
ms
`
)
;
await
sleep
(
delay
)
;
}
}
throw
new
Error
(
"Max retries exceeded"
)
;
}
// Usage
await
syncWithRetry
(
async
(
)
=>
{
return
await
syncOrders
(
)
;
}
,
3
,
1000
)
;
Change Data Capture (CDC)
// Listen to database changes
import { Prisma, PrismaClient } from "@prisma/client";
// Shared client used by the watermark/CDC helpers below.
const prisma = new PrismaClient();
// PostgreSQL: Listen to logical replication
// NOTE(review): CREATE PUBLICATION configures *logical replication*, but the
// code below consumes LISTEN/NOTIFY — a separate mechanism that only fires if
// a trigger calls pg_notify('orders_changed', ...). As written the publication
// is never consumed; confirm which mechanism is actually intended.
async function setupCDC() {
  await prisma.$executeRaw`
    CREATE PUBLICATION orders_publication FOR TABLE orders;
  `;
  // Subscribe to changes (using pg library)
  // assumes `pg` is a configured node-postgres pool in scope — TODO confirm
  const client = await pg.connect();
  client.query("LISTEN orders_changed;");
  client.on("notification", async (msg) => {
    // Payload is presumably JSON of shape { operation, data }, produced by
    // the notifying trigger — verify against the trigger definition.
    const change = JSON.parse(msg.payload);
    // Mirror inserts and updates into the destination; deletes are ignored here.
    if (change.operation === "INSERT" || change.operation === "UPDATE") {
      await syncOrder(change.data);
    }
  });
}
Conflict Resolution
/**
 * Policy for resolving a record that differs between the sync source and its
 * destination.
 */
interface ConflictResolution {
  /** Which side (or combination of sides) wins when both changed. */
  strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}
async
function
syncWithConflictResolution
(
sourceRecord
:
any
,
destRecord
:
any
,
strategy
:
ConflictResolution
[
"strategy"
]
)
{
if
(
strategy
===
"source-wins"
)
{
return
sourceRecord
;
}
if
(
strategy
===
"dest-wins"
)
{
return
destRecord
;
}
if
(
strategy
===
"latest-wins"
)
{
return
sourceRecord
.
updated_at
>
destRecord
.
updated_at
?
sourceRecord
:
destRecord
;
}
if
(
strategy
===
"merge"
)
{
// Merge non-null fields
return
{
...
destRecord
,
...
Object
.
fromEntries
(
Object
.
entries
(
sourceRecord
)
.
filter
(
(
[
_
,
v
]
)
=>
v
!=
null
)
)
,
}
;
}
}
Monitoring & Observability
// Track sync job metrics
/** One record of observability data emitted per completed sync run. */
interface SyncMetrics {
  jobName: string;
  startTime: Date;
  endTime: Date;
  /** Total records seen this run. */
  recordsProcessed: number;
  recordsInserted: number;
  recordsUpdated: number;
  recordsSkipped: number;
  /** Count of per-record failures (run may still complete). */
  errors: number;
  durationMs: number;
}
/**
 * Persists one run's metrics to the syncMetric table and echoes a
 * human-readable summary to the console.
 */
async function logSyncMetrics(metrics: SyncMetrics) {
  // Durable copy first, console summary second.
  await prisma.syncMetric.create({
    data: metrics,
  });

  console.log(`📊 Sync Metrics
    Job: ${metrics.jobName}
    Records: ${metrics.recordsProcessed}
    Inserted: ${metrics.recordsInserted}
    Updated: ${metrics.recordsUpdated}
    Errors: ${metrics.errors}
    Duration: ${metrics.durationMs}ms`);
}
Full ETL Job Example
// jobs/sync-orders-to-warehouse.ts
export
class
OrdersETLJob
extends
ETLJob
{
constructor
(
)
{
super
(
"orders-etl"
,
"orders_warehouse_sync"
)
;
}
async
extract
(
since
:
Date
)
:
Promise
<
Order
[
]
>
{
return
prisma
.
order
.
findMany
(
{
where
:
{
updated_at
:
{
gt
:
since
}
,
}
,
include
:
{
items
:
true
,
user
:
true
,
}
,
orderBy
:
{
updated_at
:
"asc"
}
,
}
)
;
}
async
transform
(
orders
:
Order
[
]
)
:
Promise
<
WarehouseOrder
[
]
>
{
return
orders
.
map
(
(
order
)
=>
(
{
order_id
:
order
.
id
,
user_email
:
order
.
user
.
email
,
total_amount
:
order
.
total
,
item_count
:
order
.
items
.
length
,
status
:
order
.
status
,
order_date
:
order
.
created_at
,
synced_at
:
new
Date
(
)
,
}
)
)
;
}
async
load
(
data
:
WarehouseOrder
[
]
)
:
Promise
<
void
>
{
const
batchSize
=
100
;
for
(
let
i
=
0
;
i
<
data
.
length
;
i
+=
batchSize
)
{
const
batch
=
data
.
slice
(
i
,
i
+
batchSize
)
;
await
warehouseDb
.
$executeRaw
`
INSERT INTO orders_fact (
order_id, user_email, total_amount, item_count,
status, order_date, synced_at
)
VALUES
${
batch
.
map
(
(
o
)
=>
`
(
${
o
.
order_id
}
, '
${
o
.
user_email
}
',
${
o
.
total_amount
}
,
${
o
.
item_count
}
, '
${
o
.
status
}
', '
${
o
.
order_date
}
',
'
${
o
.
synced_at
}
'
)
`
)
.
join
(
","
)
}
ON CONFLICT (order_id) DO UPDATE SET
total_amount = EXCLUDED.total_amount,
status = EXCLUDED.status,
synced_at = EXCLUDED.synced_at
`
;
}
}
}
// Run job
// FIX: the promise was floating — a failed run died as an unhandled
// rejection. Catch it, log, and signal failure via the exit code.
new OrdersETLJob().run().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
Scheduling
// Schedule ETL jobs
import
cron
from
"node-cron"
;
// Run every hour
cron
.
schedule
(
"0 * * * *"
,
async
(
)
=>
{
await
new
OrdersETLJob
(
)
.
run
(
)
;
}
)
;
// Run every 15 minutes
cron
.
schedule
(
"/15 * * * "
,
async
(
)
=>
{
await
syncUsersIncremental
(
)
;
}
)
;
// Run nightly at 2 AM
cron
.
schedule
(
"0 2 * * *"
,
async
(
)
=>
{
await
fullDataSync
(
)
;
}
)
;
Error Handling & Recovery
async
function
syncWithErrorHandling
(
)
{
const
checkpoint
=
await
getCheckpoint
(
)
;
let
processedRecords
=
0
;
try
{
const
records
=
await
fetchRecords
(
checkpoint
)
;
for
(
const
record
of
records
)
{
try
{
await
processRecord
(
record
)
;
processedRecords
++
;
// Save checkpoint every 100 records
if
(
processedRecords
%
100
===
0
)
{
await
saveCheckpoint
(
record
.
id
)
;
}
}
catch
(
error
)
{
// Log error but continue
console
.
error
(
`
Failed to process record
${
record
.
id
}
:
`
,
error
)
;
await
logFailedRecord
(
record
.
id
,
error
)
;
}
}
await
saveCheckpoint
(
"completed"
)
;
}
catch
(
error
)
{
// Critical failure - job will retry from checkpoint
console
.
error
(
"Job failed:"
,
error
)
;
throw
error
;
}
}
Best Practices
Incremental sync
Use watermarks, don't full-scan
Idempotent operations
Safe to retry
Batch processing
Process 100-1000 records at a time
Checkpointing
Resume from failure point
Retry with backoff
Handle transient failures
Monitor metrics
Track job health
Test thoroughly
Including failure scenarios

Output Checklist
- ETL job class created
- Watermark tracking implemented
- Incremental sync logic
- Idempotent upsert operations
- Retry logic with backoff
- Conflict resolution strategy
- Monitoring and metrics
- Error handling and recovery
- Job scheduling configured
- Testing including failure cases
返回排行榜