upstash-workflow

安装量: 86
排名: #9215

安装

npx skills add https://github.com/lobehub/lobehub --skill upstash-workflow
Upstash Workflow Implementation Guide
This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
🎯 The Three Core Patterns
All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:
🔍 Dry-Run Mode
- Get statistics without triggering actual execution
🌟 Fan-Out Pattern
- Split large batches into smaller chunks for parallel processing
🎯 Single Task Execution
- Each workflow execution processes
ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
Table of Contents
Architecture Overview
Core Patterns
File Structure
Implementation Patterns
Best Practices
Examples
Architecture Overview
Standard 3-Layer Pattern
All workflows follow a standard 3-layer architecture:
Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed
Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item
Layer 3: Single Task Execution (execute-/generate-)
└─ Performs actual business logic for ONE item
Examples
:
welcome-placeholder
,
agent-welcome
Core Patterns
1. Dry-Run Mode
Purpose
Get statistics without triggering actual execution
Pattern
:
// Layer 1: Entry Point
if
(
dryRun
)
{
console
.
log
(
'[workflow:process] Dry run mode, returning statistics only'
)
;
return
{
...
result
,
dryRun
:
true
,
message
:
`
[DryRun] Would process
${
itemsNeedingProcessing
.
length
}
items
`
,
}
;
}
Use Case
Check how many items will be processed before committing to execution
Response
:
{
success
:
true
,
totalEligible
:
100
,
toProcess
:
80
,
alreadyProcessed
:
20
,
dryRun
:
true
,
message
:
"[DryRun] Would process 80 items"
}
2. Fan-Out Pattern
Purpose
Split large batches into smaller chunks for parallel processing
Pattern
:
// Layer 2: Pagination
const
CHUNK_SIZE
=
20
;
if
(
itemIds
.
length
>
CHUNK_SIZE
)
{
// Fan-out to smaller chunks
const
chunks
=
chunk
(
itemIds
,
CHUNK_SIZE
)
;
console
.
log
(
'[workflow:paginate] Fan-out mode:'
,
{
chunks
:
chunks
.
length
,
chunkSize
:
CHUNK_SIZE
,
totalItems
:
itemIds
.
length
,
}
)
;
await
Promise
.
all
(
chunks
.
map
(
(
ids
,
idx
)
=>
context
.
run
(
`
workflow:fanout:
${
idx
+
1
}
/
${
chunks
.
length
}
`
,
(
)
=>
WorkflowClass
.
triggerPaginateItems
(
{
itemIds
:
ids
}
)
,
)
,
)
,
)
;
}
Use Case
Avoid hitting workflow step limits by splitting large batches
Configuration
:
PAGE_SIZE = 50
- Items per pagination page
CHUNK_SIZE = 20
- Items per fan-out chunk
If batch > CHUNK_SIZE, split into chunks and recursively trigger pagination
3. Single Task Execution
Purpose
Execute business logic for ONE item at a time
Pattern
:
// Layer 3: Single Task Execution
export
const
{
POST
}
=
serve
<
ExecutePayload
>
(
async
(
context
)
=>
{
const
{
itemId
}
=
context
.
requestPayload
??
{
}
;
if
(
!
itemId
)
{
return
{
success
:
false
,
error
:
'Missing itemId'
}
;
}
// Get item
const
item
=
await
context
.
run
(
'workflow:get-item'
,
async
(
)
=>
{
return
getItem
(
itemId
)
;
}
)
;
// Execute business logic for THIS item only
const
result
=
await
context
.
run
(
'workflow:execute'
,
async
(
)
=>
{
return
processItem
(
item
)
;
}
)
;
// Save result for THIS item
await
context
.
run
(
'workflow:save'
,
async
(
)
=>
{
return
saveResult
(
itemId
,
result
)
;
}
)
;
return
{
success
:
true
,
itemId
,
result
}
;
}
,
{
flowControl
:
{
key
:
'workflow.execute'
,
parallelism
:
10
,
ratePerSecond
:
5
,
}
,
}
,
)
;
Key Principles
:
Each workflow execution handles
exactly ONE item
Parallelism controlled by
flowControl
config
Multiple items processed via Layer 2 triggering multiple Layer 3 executions
File Structure
Directory Layout
src/
├── app/(backend)/api/workflows/
│ └── {workflow-name}/
│ ├── process-{entities}/route.ts # Layer 1
│ ├── paginate-{entities}/route.ts # Layer 2
│ └── execute-{entity}/route.ts # Layer 3
└── server/workflows/
└── {workflowName}/
└── index.ts # Workflow class
Cloud Project Configuration
For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:
📄
Cloud Configuration Guide
Implementation Patterns
1. Workflow Class
Location
:
src/server/workflows/{workflowName}/index.ts
import
{
Client
}
from
'@upstash/workflow'
;
import
debug
from
'debug'
;
const
log
=
debug
(
'lobe-server:workflows:{workflow-name}'
)
;
// Workflow paths
const
WORKFLOW_PATHS
=
{
processItems
:
'/api/workflows/{workflow-name}/process-items'
,
paginateItems
:
'/api/workflows/{workflow-name}/paginate-items'
,
executeItem
:
'/api/workflows/{workflow-name}/execute-item'
,
}
as
const
;
// Payload types
export
interface
ProcessItemsPayload
{
dryRun
?
:
boolean
;
force
?
:
boolean
;
}
export
interface
PaginateItemsPayload
{
cursor
?
:
string
;
itemIds
?
:
string
[
]
;
// For fanout chunks
}
export
interface
ExecuteItemPayload
{
itemId
:
string
;
}
/**
* Get workflow URL using APP_URL
*/
const
getWorkflowUrl
=
(
path
:
string
)
:
string
=>
{
const
baseUrl
=
process
.
env
.
APP_URL
;
if
(
!
baseUrl
)
throw
new
Error
(
'APP_URL is required to trigger workflows'
)
;
return
new
URL
(
path
,
baseUrl
)
.
toString
(
)
;
}
;
/**
* Get workflow client
*/
const
getWorkflowClient
=
(
)
:
Client
=>
{
const
token
=
process
.
env
.
QSTASH_TOKEN
;
if
(
!
token
)
throw
new
Error
(
'QSTASH_TOKEN is required to trigger workflows'
)
;
const
config
:
ConstructorParameters
<
typeof
Client
>
[
0
]
=
{
token
}
;
if
(
process
.
env
.
QSTASH_URL
)
{
(
config
as
Record
<
string
,
unknown
>
)
.
url
=
process
.
env
.
QSTASH_URL
;
}
return
new
Client
(
config
)
;
}
;
/**
* {Workflow Name} Workflow
*/
export
class
{
WorkflowName
}
Workflow
{
private
static
client
:
Client
;
private
static
getClient
(
)
:
Client
{
if
(
!
this
.
client
)
{
this
.
client
=
getWorkflowClient
(
)
;
}
return
this
.
client
;
}
/**
* Trigger workflow to process items (entry point)
*/
static
triggerProcessItems
(
payload
:
ProcessItemsPayload
)
{
const
url
=
getWorkflowUrl
(
WORKFLOW_PATHS
.
processItems
)
;
log
(
'Triggering process-items workflow'
)
;
return
this
.
getClient
(
)
.
trigger
(
{
body
:
payload
,
url
}
)
;
}
/**
* Trigger workflow to paginate items
*/
static
triggerPaginateItems
(
payload
:
PaginateItemsPayload
)
{
const
url
=
getWorkflowUrl
(
WORKFLOW_PATHS
.
paginateItems
)
;
log
(
'Triggering paginate-items workflow'
)
;
return
this
.
getClient
(
)
.
trigger
(
{
body
:
payload
,
url
}
)
;
}
/**
* Trigger workflow to execute a single item
*/
static
triggerExecuteItem
(
payload
:
ExecuteItemPayload
)
{
const
url
=
getWorkflowUrl
(
WORKFLOW_PATHS
.
executeItem
)
;
log
(
'Triggering execute-item workflow: %s'
,
payload
.
itemId
)
;
return
this
.
getClient
(
)
.
trigger
(
{
body
:
payload
,
url
}
)
;
}
/**
* Filter items that need processing (e.g., check Redis cache, database state)
*/
static
async
filterItemsNeedingProcessing
(
itemIds
:
string
[
]
)
:
Promise
<
string
[
]
>
{
if
(
itemIds
.
length
===
0
)
return
[
]
;
// Check existing state (Redis, database, etc.)
// Return items that need processing
return
itemIds
;
}
}
2. Layer 1: Entry Point (process-*)
Purpose
Validates prerequisites, calculates statistics, supports dryRun mode
import
{
serve
}
from
'@upstash/workflow/nextjs'
;
import
{
getServerDB
}
from
'@/database/server'
;
import
{
WorkflowClass
,
type
ProcessPayload
}
from
'@/server/workflows/{workflowName}'
;
/**
* Entry workflow for
* 1. Get all eligible items
* 2. Filter items that already have results
* 3. If dryRun, return statistics only
* 4. If no items need processing, return early
* 5. Trigger paginate workflow
*/
export
const
{
POST
}
=
serve
<
ProcessPayload
>
(
async
(
context
)
=>
{
const
{
dryRun
,
force
}
=
context
.
requestPayload
??
{
}
;
console
.
log
(
'[{workflow}:process] Starting with payload:'
,
{
dryRun
,
force
}
)
;
// Get all eligible items
const
allItemIds
=
await
context
.
run
(
'{workflow}:get-all-items'
,
async
(
)
=>
{
const
db
=
await
getServerDB
(
)
;
// Query database for eligible items
return
items
.
map
(
(
item
)
=>
item
.
id
)
;
}
)
;
console
.
log
(
'[{workflow}:process] Total eligible items:'
,
allItemIds
.
length
)
;
if
(
allItemIds
.
length
===
0
)
{
return
{
success
:
true
,
totalEligible
:
0
,
message
:
'No eligible items found'
,
}
;
}
// Filter items that need processing
const
itemsNeedingProcessing
=
await
context
.
run
(
'{workflow}:filter-existing'
,
(
)
=>
WorkflowClass
.
filterItemsNeedingProcessing
(
allItemIds
)
,
)
;
const
result
=
{
success
:
true
,
totalEligible
:
allItemIds
.
length
,
toProcess
:
itemsNeedingProcessing
.
length
,
alreadyProcessed
:
allItemIds
.
length
-
itemsNeedingProcessing
.
length
,
}
;
console
.
log
(
'[{workflow}:process] Check result:'
,
result
)
;
// If dryRun mode, return statistics only
if
(
dryRun
)
{
console
.
log
(
'[{workflow}:process] Dry run mode, returning statistics only'
)
;
return
{
...
result
,
dryRun
:
true
,
message
:
`
[DryRun] Would process
${
itemsNeedingProcessing
.
length
}
items
`
,
}
;
}
// If no items need processing, return early
if
(
itemsNeedingProcessing
.
length
===
0
)
{
console
.
log
(
'[{workflow}:process] All items already processed'
)
;
return
{
...
result
,
message
:
'All items already processed'
,
}
;
}
// Trigger paginate workflow
console
.
log
(
'[{workflow}:process] Triggering paginate workflow'
)
;
await
context
.
run
(
'{workflow}:trigger-paginate'
,
(
)
=>
WorkflowClass
.
triggerPaginateItems
(
{
}
)
)
;
return
{
...
result
,
message
:
`
Triggered pagination for
${
itemsNeedingProcessing
.
length
}
items
`
,
}
;
}
,
{
flowControl
:
{
key
:
'{workflow}.process'
,
parallelism
:
1
,
ratePerSecond
:
1
,
}
,
}
,
)
;
3. Layer 2: Pagination (paginate-*)
Purpose
Handles cursor-based pagination, implements fanout for large batches
import
{
serve
}
from
'@upstash/workflow/nextjs'
;
import
{
chunk
}
from
'es-toolkit/compat'
;
import
{
getServerDB
}
from
'@/database/server'
;
import
{
WorkflowClass
,
type
PaginatePayload
}
from
'@/server/workflows/{workflowName}'
;
const
PAGE_SIZE
=
50
;
const
CHUNK_SIZE
=
20
;
/**
* Paginate items workflow - handles pagination and fanout
* 1. If specific itemIds provided (from fanout), process them directly
* 2. Otherwise, paginate through all items using cursor
* 3. Filter items that need processing
* 4. If batch > CHUNK_SIZE, fanout to smaller chunks
* 5. Trigger execute workflow for each item
* 6. Schedule next page if cursor exists
*/
export
const
{
POST
}
=
serve
<
PaginatePayload
>
(
async
(
context
)
=>
{
const
{
cursor
,
itemIds
:
payloadItemIds
}
=
context
.
requestPayload
??
{
}
;
console
.
log
(
'[{workflow}:paginate] Starting with payload:'
,
{
cursor
,
itemIdsCount
:
payloadItemIds
?.
length
??
0
,
}
)
;
// If specific itemIds are provided, process them directly (from fanout)
if
(
payloadItemIds
&&
payloadItemIds
.
length
>
0
)
{
console
.
log
(
'[{workflow}:paginate] Processing specific itemIds:'
,
{
count
:
payloadItemIds
.
length
,
}
)
;
await
Promise
.
all
(
payloadItemIds
.
map
(
(
itemId
)
=>
context
.
run
(
`
{workflow}:execute:
${
itemId
}
`
,
(
)
=>
WorkflowClass
.
triggerExecuteItem
(
{
itemId
}
)
,
)
,
)
,
)
;
return
{
success
:
true
,
processedItems
:
payloadItemIds
.
length
,
}
;
}
// Paginate through all items
const
itemBatch
=
await
context
.
run
(
'{workflow}:get-batch'
,
async
(
)
=>
{
const
db
=
await
getServerDB
(
)
;
// Query database with cursor and PAGE_SIZE
const
items
=
await
db
.
query
(
...
)
;
if
(
!
items
.
length
)
return
{
ids
:
[
]
}
;
const
last
=
items
.
at
(
-
1
)
;
return
{
ids
:
items
.
map
(
item
=>
item
.
id
)
,
cursor
:
last
?
last
.
id
:
undefined
,
}
;
}
)
;
const
batchItemIds
=
itemBatch
.
ids
;
const
nextCursor
=
'cursor'
in
itemBatch
?
itemBatch
.
cursor
:
undefined
;
console
.
log
(
'[{workflow}:paginate] Got batch:'
,
{
batchSize
:
batchItemIds
.
length
,
nextCursor
,
}
)
;
if
(
batchItemIds
.
length
===
0
)
{
console
.
log
(
'[{workflow}:paginate] No more items, pagination complete'
)
;
return
{
success
:
true
,
message
:
'Pagination complete'
}
;
}
// Filter items that need processing
const
itemIds
=
await
context
.
run
(
'{workflow}:filter-existing'
,
(
)
=>
WorkflowClass
.
filterItemsNeedingProcessing
(
batchItemIds
)
,
)
;
console
.
log
(
'[{workflow}:paginate] After filtering:'
,
{
needProcessing
:
itemIds
.
length
,
skipped
:
batchItemIds
.
length
-
itemIds
.
length
,
}
)
;
// Process items if any need processing
if
(
itemIds
.
length
>
0
)
{
if
(
itemIds
.
length
>
CHUNK_SIZE
)
{
// Fanout to smaller chunks
const
chunks
=
chunk
(
itemIds
,
CHUNK_SIZE
)
;
console
.
log
(
'[{workflow}:paginate] Fanout mode:'
,
{
chunks
:
chunks
.
length
,
chunkSize
:
CHUNK_SIZE
,
totalItems
:
itemIds
.
length
,
}
)
;
await
Promise
.
all
(
chunks
.
map
(
(
ids
,
idx
)
=>
context
.
run
(
`
{workflow}:fanout:
${
idx
+
1
}
/
${
chunks
.
length
}
`
,
(
)
=>
WorkflowClass
.
triggerPaginateItems
(
{
itemIds
:
ids
}
)
,
)
,
)
,
)
;
}
else
{
// Process directly
console
.
log
(
'[{workflow}:paginate] Processing items directly:'
,
{
count
:
itemIds
.
length
,
}
)
;
await
Promise
.
all
(
itemIds
.
map
(
(
itemId
)
=>
context
.
run
(
`
{workflow}:execute:
${
itemId
}
`
,
(
)
=>
WorkflowClass
.
triggerExecuteItem
(
{
itemId
}
)
,
)
,
)
,
)
;
}
}
// Schedule next page
if
(
nextCursor
)
{
console
.
log
(
'[{workflow}:paginate] Scheduling next page:'
,
{
nextCursor
}
)
;
await
context
.
run
(
'{workflow}:next-page'
,
(
)
=>
WorkflowClass
.
triggerPaginateItems
(
{
cursor
:
nextCursor
}
)
,
)
;
}
else
{
console
.
log
(
'[{workflow}:paginate] No more pages'
)
;
}
return
{
success
:
true
,
processedItems
:
itemIds
.
length
,
skippedItems
:
batchItemIds
.
length
-
itemIds
.
length
,
nextCursor
:
nextCursor
??
null
,
}
;
}
,
{
flowControl
:
{
key
:
'{workflow}.paginate'
,
parallelism
:
20
,
ratePerSecond
:
5
,
}
,
}
,
)
;
4. Layer 3: Execution (execute-
/generate-
)
Purpose
Performs actual business logic
import
{
serve
}
from
'@upstash/workflow/nextjs'
;
import
{
getServerDB
}
from
'@/database/server'
;
import
{
WorkflowClass
,
type
ExecutePayload
}
from
'@/server/workflows/{workflowName}'
;
/**
* Execute item workflow - performs actual business logic
* 1. Get item data
* 2. Perform business logic (AI generation, data processing, etc.)
* 3. Save results
*/
export
const
{
POST
}
=
serve
<
ExecutePayload
>
(
async
(
context
)
=>
{
const
{
itemId
}
=
context
.
requestPayload
??
{
}
;
console
.
log
(
'[{workflow}:execute] Starting:'
,
{
itemId
}
)
;
if
(
!
itemId
)
{
return
{
success
:
false
,
error
:
'Missing itemId'
}
;
}
const
db
=
await
getServerDB
(
)
;
// Get item data
const
item
=
await
context
.
run
(
'{workflow}:get-item'
,
async
(
)
=>
{
// Query database for item
return
item
;
}
)
;
if
(
!
item
)
{
return
{
success
:
false
,
error
:
'Item not found'
}
;
}
// Perform business logic
const
result
=
await
context
.
run
(
'{workflow}:process-item'
,
async
(
)
=>
{
const
workflow
=
new
WorkflowClass
(
db
,
itemId
)
;
return
workflow
.
generate
(
)
;
// or process(), execute(), etc.
}
)
;
// Save results
await
context
.
run
(
'{workflow}:save-result'
,
async
(
)
=>
{
const
workflow
=
new
WorkflowClass
(
db
,
itemId
)
;
return
workflow
.
saveToRedis
(
result
)
;
// or saveToDatabase(), etc.
}
)
;
console
.
log
(
'[{workflow}:execute] Completed:'
,
{
itemId
}
)
;
return
{
success
:
true
,
itemId
,
result
,
}
;
}
,
{
flowControl
:
{
key
:
'{workflow}.execute'
,
parallelism
:
10
,
ratePerSecond
:
5
,
}
,
}
,
)
;
Best Practices
1. Error Handling
export
const
{
POST
}
=
serve
<
Payload
>
(
async
(
context
)
=>
{
const
{
itemId
}
=
context
.
requestPayload
??
{
}
;
// Validate required parameters
if
(
!
itemId
)
{
return
{
success
:
false
,
error
:
'Missing itemId in payload'
}
;
}
try
{
// Perform work
const
result
=
await
context
.
run
(
'step-name'
,
(
)
=>
doWork
(
itemId
)
)
;
return
{
success
:
true
,
itemId
,
result
}
;
}
catch
(
error
)
{
console
.
error
(
'[workflow:error]'
,
error
)
;
return
{
success
:
false
,
error
:
error
instanceof
Error
?
error
.
message
:
'Unknown error'
}
;
}
}
,
{
flowControl
:
{
...
}
}
,
)
;
2. Logging
Use consistent log prefixes and structured logging:
console
.
log
(
'[{workflow}:{layer}] Starting with payload:'
,
payload
)
;
console
.
log
(
'[{workflow}:{layer}] Processing items:'
,
{
count
:
items
.
length
}
)
;
console
.
log
(
'[{workflow}:{layer}] Completed:'
,
result
)
;
console
.
error
(
'[{workflow}:{layer}:error]'
,
error
)
;
3. Return Values
Return consistent response shapes:
// Success response
return
{
success
:
true
,
itemId
,
result
,
message
:
'Optional success message'
,
}
;
// Error response
return
{
success
:
false
,
error
:
'Error description'
,
itemId
,
// Include context if available
}
;
// Statistics response (for entry point)
return
{
success
:
true
,
totalEligible
:
100
,
toProcess
:
80
,
alreadyProcessed
:
20
,
dryRun
:
true
,
// If applicable
message
:
'Summary message'
,
}
;
4. flowControl Configuration
Purpose
Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
// Layer 1: Entry point - single instance only
flowControl
:
{
key
:
'{workflow}.process'
,
parallelism
:
1
,
// Only 1 process workflow at a time
ratePerSecond
:
1
,
// 1 execution per second
}
// Layer 2: Pagination - moderate concurrency
flowControl
:
{
key
:
'{workflow}.paginate'
,
parallelism
:
20
,
// Up to 20 pagination workflows in parallel
ratePerSecond
:
5
,
// 5 new executions per second
}
// Layer 3: Single task execution - high concurrency
flowControl
:
{
key
:
'{workflow}.execute'
,
parallelism
:
10
,
// Up to 10 items processed in parallel
ratePerSecond
:
5
,
// 5 new items per second
}
Guidelines
:
Layer 1
Always use
parallelism: 1
to avoid duplicate processing
Layer 2
Moderate concurrency for pagination (typically 10-20)
Layer 3
Higher concurrency for parallel item processing (typically 5-10)
Adjust
ratePerSecond
based on external API rate limits or resource constraints
5. context.run() Best Practices
Use descriptive step names with prefixes:
{workflow}:step-name
Each step should be idempotent (safe to retry)
Don't nest context.run() calls - keep them flat
Use unique step names when processing multiple items:
// Good: Unique step names
await
Promise
.
all
(
items
.
map
(
(
item
)
=>
context
.
run
(
`
{workflow}:execute:
${
item
.
id
}
`
,
(
)
=>
processItem
(
item
)
)
)
,
)
;
// Bad: Same step name for all items
await
Promise
.
all
(
items
.
map
(
(
item
)
=>
context
.
run
(
`
{workflow}:execute
`
,
(
)
=>
// ❌ Not unique
processItem
(
item
)
,
)
,
)
,
)
;
6. Payload Validation
Always validate required parameters at the start:
export
const
{
POST
}
=
serve
<
Payload
>
(
async
(
context
)
=>
{
const
{
itemId
,
configId
}
=
context
.
requestPayload
??
{
}
;
// Validate at the start
if
(
!
itemId
)
{
return
{
success
:
false
,
error
:
'Missing itemId in payload'
}
;
}
if
(
!
configId
)
{
return
{
success
:
false
,
error
:
'Missing configId in payload'
}
;
}
// Proceed with work...
}
,
{
flowControl
:
{
...
}
}
,
)
;
7. Database Connection
Get database connection once per workflow:
export
const
{
POST
}
=
serve
<
Payload
>
(
async
(
context
)
=>
{
const
db
=
await
getServerDB
(
)
;
// Get once
// Use in multiple steps
const
item
=
await
context
.
run
(
'get-item'
,
async
(
)
=>
{
return
itemModel
.
findById
(
db
,
itemId
)
;
}
)
;
const
result
=
await
context
.
run
(
'save-result'
,
async
(
)
=>
{
return
resultModel
.
create
(
db
,
result
)
;
}
)
;
}
,
{
flowControl
:
{
...
}
}
,
)
;
8. Testing
Create integration tests for workflows:
describe
(
'WorkflowName'
,
(
)
=>
{
it
(
'should process items successfully'
,
async
(
)
=>
{
// Setup test data
const
items
=
await
createTestItems
(
)
;
// Trigger workflow
await
WorkflowClass
.
triggerProcessItems
(
{
dryRun
:
false
}
)
;
// Wait for completion (use polling or webhook)
await
waitForCompletion
(
)
;
// Verify results
const
results
=
await
getResults
(
)
;
expect
(
results
)
.
toHaveLength
(
items
.
length
)
;
}
)
;
it
(
'should support dryRun mode'
,
async
(
)
=>
{
const
result
=
await
WorkflowClass
.
triggerProcessItems
(
{
dryRun
:
true
}
)
;
expect
(
result
)
.
toMatchObject
(
{
success
:
true
,
dryRun
:
true
,
totalEligible
:
expect
.
any
(
Number
)
,
toProcess
:
expect
.
any
(
Number
)
,
}
)
;
}
)
;
}
)
;
Examples
Example 1: Welcome Placeholder
Use Case
Generate AI-powered welcome placeholders for users
Structure
:
Layer 1:
process-users
- Entry point, checks eligible users
Layer 2:
paginate-users
- Paginates through active users
Layer 3:
generate-user
-
Generates placeholders for ONE user
Core Patterns Demonstrated
:
Dry-Run Mode
:
// Layer 1: process-users
if
(
dryRun
)
{
return
{
...
result
,
dryRun
:
true
,
message
:
`
[DryRun] Would process
${
usersNeedingGeneration
.
length
}
users
`
,
}
;
}
Fan-Out Pattern
:
// Layer 2: paginate-users
if
(
userIds
.
length
>
CHUNK_SIZE
)
{
const
chunks
=
chunk
(
userIds
,
CHUNK_SIZE
)
;
await
Promise
.
all
(
chunks
.
map
(
(
ids
,
idx
)
=>
context
.
run
(
`
welcome-placeholder:fanout:
${
idx
+
1
}
/
${
chunks
.
length
}
`
,
(
)
=>
WelcomePlaceholderWorkflow
.
triggerPaginateUsers
(
{
userIds
:
ids
}
)
,
)
,
)
,
)
;
}
Single Task Execution
:
// Layer 3: generate-user
export
const
{
POST
}
=
serve
<
GenerateUserPlaceholderPayload
>
(
async
(
context
)
=>
{
const
{
userId
}
=
context
.
requestPayload
??
{
}
;
// Execute for ONE user only
const
workflow
=
new
WelcomePlaceholderWorkflow
(
db
,
userId
)
;
const
placeholders
=
await
context
.
run
(
'generate'
,
(
)
=>
workflow
.
generate
(
)
)
;
return
{
success
:
true
,
userId
,
placeholdersCount
:
placeholders
.
length
}
;
}
)
;
Key Features
:
✅ Filters users who already have cached placeholders in Redis
✅ Supports
paidOnly
flag to process only subscribed users
✅ Supports
dryRun
mode for statistics
✅ Uses fan-out for large user batches (CHUNK_SIZE=20)
✅ Each execution processes exactly ONE user
Files
:
/api/workflows/welcome-placeholder/process-users/route.ts
/api/workflows/welcome-placeholder/paginate-users/route.ts
/api/workflows/welcome-placeholder/generate-user/route.ts
/server/workflows/welcomePlaceholder/index.ts
Example 2: Agent Welcome
Use Case
Generate welcome messages and open questions for AI agents
Structure
:
Layer 1:
process-agents
- Entry point, checks eligible agents
Layer 2:
paginate-agents
- Paginates through active agents
Layer 3:
generate-agent
-
Generates welcome data for ONE agent
Core Patterns Demonstrated
:
Dry-Run Mode
:
// Layer 1: process-agents
if
(
dryRun
)
{
return
{
...
result
,
dryRun
:
true
,
message
:
`
[DryRun] Would process
${
agentsNeedingGeneration
.
length
}
agents
`
,
}
;
}
Fan-Out Pattern
Same as welcome-placeholder Single Task Execution : // Layer 3: generate-agent export const { POST } = serve < GenerateAgentWelcomePayload

( async ( context ) => { const { agentId } = context . requestPayload ?? { } ; // Execute for ONE agent only const workflow = new AgentWelcomeWorkflow ( db , agentId ) ; const data = await context . run ( 'generate' , ( ) => workflow . generate ( ) ) ; return { success : true , agentId , data } ; } ) ; Key Features : ✅ Filters agents who already have cached data in Redis ✅ Supports paidOnly flag for subscribed users' agents only ✅ Supports dryRun mode for statistics ✅ Uses fan-out for large agent batches (CHUNK_SIZE=20) ✅ Each execution processes exactly ONE agent Files : /api/workflows/agent-welcome/process-agents/route.ts /api/workflows/agent-welcome/paginate-agents/route.ts /api/workflows/agent-welcome/generate-agent/route.ts /server/workflows/agentWelcome/index.ts Key Takeaways from Examples Both workflows follow the exact same pattern : Layer 1 (Entry Point): Calculate statistics Filter existing items Support dry-run mode Trigger pagination only if needed Layer 2 (Pagination): Paginate with cursor (PAGE_SIZE=50) Fan-out large batches (CHUNK_SIZE=20) Trigger Layer 3 for each item Recursively process all pages Layer 3 (Execution): Process ONE item per execution Perform business logic Save results Return success/failure The only differences are: Entity type (users vs agents) Business logic (placeholder generation vs welcome generation) Data source (different database queries) Common Pitfalls ❌ Don't: Use context.run() without unique names // Bad: Same step name when processing multiple items await Promise . all ( items . map ( ( item ) => context . run ( 'process' , ( ) => process ( item ) ) ) ) ; // Good: Unique step names await Promise . all ( items . map ( ( item ) => context . run ( process: ${ item . id } , ( ) => process ( item ) ) ) ) ; ❌ Don't: Forget to validate payload parameters // Bad: No validation export const { POST } = serve < Payload

( async ( context ) => { const { itemId } = context . requestPayload ?? { } ; const result = await process ( itemId ) ; // May fail with undefined } ) ; // Good: Validate early export const { POST } = serve < Payload

( async ( context ) => { const { itemId } = context . requestPayload ?? { } ; if ( ! itemId ) { return { success : false , error : 'Missing itemId' } ; } const result = await process ( itemId ) ; } ) ; ❌ Don't: Skip filtering existing items // Bad: No filtering, may duplicate work const allItems = await getAllItems ( ) ; await Promise . all ( allItems . map ( ( item ) => triggerExecute ( item ) ) ) ; // Good: Filter existing items first const allItems = await getAllItems ( ) ; const itemsNeedingProcessing = await filterExisting ( allItems ) ; await Promise . all ( itemsNeedingProcessing . map ( ( item ) => triggerExecute ( item ) ) ) ; ❌ Don't: Use inconsistent logging // Bad: Inconsistent prefixes and formats console . log ( 'Starting workflow' ) ; log . info ( 'Processing item:' , itemId ) ; console . log ( Done with ${ itemId } ) ; // Good: Consistent structured logging console . log ( '[workflow:layer] Starting with payload:' , payload ) ; console . log ( '[workflow:layer] Processing item:' , { itemId } ) ; console . log ( '[workflow:layer] Completed:' , { itemId , result } ) ; Environment Variables Required

Required for all workflows

APP_URL

https://your-app.com

Base URL for workflow endpoints

QSTASH_TOKEN

qstash_xxx

QStash authentication token

Optional (for custom QStash URL)

QSTASH_URL

https://custom-qstash.com

Custom QStash endpoint

Checklist for New Workflows
Planning Phase
Identify entity to process (users, agents, items, etc.)
Define business logic for single item execution
Determine filtering logic (Redis cache, database state, etc.)
Implementation Phase
Define payload types with proper TypeScript interfaces
Create workflow class with static trigger methods
Layer 1
Implement entry point with
dry-run
support
Layer 1
Add filtering logic to avoid duplicate work
Layer 2
Implement pagination with
fan-out
logic
Layer 3
Implement single task execution (ONE item per run) Configure appropriate flowControl for each layer Add consistent logging with workflow prefixes Validate all required payload parameters Use unique context.run() step names Quality & Deployment Return consistent response shapes Configure cloud deployment (see Cloud Guide if using lobehub-cloud) Write integration tests Test with dry-run mode first Test with small batch before full rollout Additional Resources Upstash Workflow Documentation QStash Documentation Example Workflows in Codebase Workflow Classes
返回排行榜