# Upstash Workflow Implementation Guide
This guide covers the standard patterns for implementing Upstash Workflow + QStash async workflows in the LobeHub codebase.
## 🎯 The Three Core Patterns
All workflows in LobeHub follow the same 3-layer architecture with three essential patterns:

- 🔍 **Dry-Run Mode** - Get statistics without triggering actual execution
- 🌟 **Fan-Out Pattern** - Split large batches into smaller chunks for parallel processing
- 🎯 **Single Task Execution** - Each workflow execution processes ONE item only
These patterns ensure scalable, debuggable, and cost-efficient async workflows.
## Table of Contents

- Architecture Overview
- Core Patterns
- File Structure
- Implementation Patterns
- Best Practices
- Examples
## Architecture Overview

### Standard 3-Layer Pattern
All workflows follow a standard 3-layer architecture:
```
Layer 1: Entry Point (process-*)
├─ Validates prerequisites
├─ Calculates total items to process
├─ Filters existing items
├─ Supports dry-run mode (statistics only)
└─ Triggers Layer 2 if work needed

Layer 2: Pagination (paginate-*)
├─ Handles cursor-based pagination
├─ Implements fan-out for large batches
├─ Recursively processes all pages
└─ Triggers Layer 3 for each item

Layer 3: Single Task Execution (execute-*/generate-*)
└─ Performs actual business logic for ONE item
```

Examples: `welcome-placeholder`, `agent-welcome`
## Core Patterns

### 1. Dry-Run Mode
**Purpose**: Get statistics without triggering actual execution

**Pattern**:

```typescript
// Layer 1: Entry Point
if (dryRun) {
  console.log('[workflow:process] Dry run mode, returning statistics only');
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
  };
}
```

**Use Case**: Check how many items will be processed before committing to execution

**Response**:

```json
{
  "success": true,
  "totalEligible": 100,
  "toProcess": 80,
  "alreadyProcessed": 20,
  "dryRun": true,
  "message": "[DryRun] Would process 80 items"
}
```
### 2. Fan-Out Pattern

**Purpose**: Split large batches into smaller chunks for parallel processing

**Pattern**:

```typescript
// Layer 2: Pagination
const CHUNK_SIZE = 20;

if (itemIds.length > CHUNK_SIZE) {
  // Fan-out to smaller chunks
  const chunks = chunk(itemIds, CHUNK_SIZE);
  console.log('[workflow:paginate] Fan-out mode:', {
    chunks: chunks.length,
    chunkSize: CHUNK_SIZE,
    totalItems: itemIds.length,
  });

  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`workflow:fanout:${idx + 1}/${chunks.length}`, () =>
        WorkflowClass.triggerPaginateItems({ itemIds: ids }),
      ),
    ),
  );
}
```
**Use Case**: Avoid hitting workflow step limits by splitting large batches

**Configuration**:

- `PAGE_SIZE = 50` - Items per pagination page
- `CHUNK_SIZE = 20` - Items per fan-out chunk
- If batch > `CHUNK_SIZE`, split into chunks and recursively trigger pagination
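To see how the two constants interact, the standalone sketch below (an illustration, not code from the codebase) splits one full pagination page into fan-out chunks; the `chunk` helper is a minimal stand-in for the `es-toolkit` function used in the Layer 2 code later in this guide:

```typescript
// Minimal stand-in for es-toolkit's chunk(): splits an array into
// fixed-size slices, with the last slice holding the remainder.
const chunk = <T>(items: T[], size: number): T[][] => {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
};

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

// One full pagination page of item ids
const pageIds = Array.from({ length: PAGE_SIZE }, (_, i) => `item-${i}`);

// A 50-item page fans out into chunks of at most 20 items
const chunks = chunk(pageIds, CHUNK_SIZE);
console.log(chunks.map((c) => c.length)); // → [ 20, 20, 10 ]
```

Each chunk is then handed back to the pagination endpoint as `itemIds`, so a single oversized batch never has to trigger more than `CHUNK_SIZE` executions in one workflow run.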
### 3. Single Task Execution

**Purpose**: Execute business logic for ONE item at a time

**Pattern**:

```typescript
// Layer 3: Single Task Execution
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    // Get item
    const item = await context.run('workflow:get-item', async () => {
      return getItem(itemId);
    });

    // Execute business logic for THIS item only
    const result = await context.run('workflow:execute', async () => {
      return processItem(item);
    });

    // Save result for THIS item
    await context.run('workflow:save', async () => {
      return saveResult(itemId, result);
    });

    return { success: true, itemId, result };
  },
  {
    flowControl: {
      key: 'workflow.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
```

**Key Principles**:

- Each workflow execution handles exactly ONE item
- Parallelism controlled by `flowControl` config
- Multiple items processed via Layer 2 triggering multiple Layer 3 executions
## File Structure

### Directory Layout

```
src/
├── app/(backend)/api/workflows/
│   └── {workflow-name}/
│       ├── process-{entities}/route.ts   # Layer 1
│       ├── paginate-{entities}/route.ts  # Layer 2
│       └── execute-{entity}/route.ts     # Layer 3
│
└── server/workflows/
    └── {workflowName}/
        └── index.ts                      # Workflow class
```
### Cloud Project Configuration
For lobehub-cloud specific configurations (re-exports, cloud-only workflows, deployment patterns), see:
📄 Cloud Configuration Guide
## Implementation Patterns

### 1. Workflow Class

**Location**: `src/server/workflows/{workflowName}/index.ts`
```typescript
import { Client } from '@upstash/workflow';
import debug from 'debug';

const log = debug('lobe-server:workflows:{workflow-name}');

// Workflow paths
const WORKFLOW_PATHS = {
  processItems: '/api/workflows/{workflow-name}/process-items',
  paginateItems: '/api/workflows/{workflow-name}/paginate-items',
  executeItem: '/api/workflows/{workflow-name}/execute-item',
} as const;

// Payload types
export interface ProcessItemsPayload {
  dryRun?: boolean;
  force?: boolean;
}

export interface PaginateItemsPayload {
  cursor?: string;
  itemIds?: string[]; // For fan-out chunks
}

export interface ExecuteItemPayload {
  itemId: string;
}

/**
 * Get workflow URL using APP_URL
 */
const getWorkflowUrl = (path: string): string => {
  const baseUrl = process.env.APP_URL;
  if (!baseUrl) throw new Error('APP_URL is required to trigger workflows');
  return new URL(path, baseUrl).toString();
};

/**
 * Get workflow client
 */
const getWorkflowClient = (): Client => {
  const token = process.env.QSTASH_TOKEN;
  if (!token) throw new Error('QSTASH_TOKEN is required to trigger workflows');

  const config: ConstructorParameters<typeof Client>[0] = { token };
  if (process.env.QSTASH_URL) {
    (config as Record<string, unknown>).url = process.env.QSTASH_URL;
  }
  return new Client(config);
};

/**
 * {Workflow Name} Workflow
 */
export class {WorkflowName}Workflow {
  private static client: Client;

  private static getClient(): Client {
    if (!this.client) {
      this.client = getWorkflowClient();
    }
    return this.client;
  }

  /**
   * Trigger workflow to process items (entry point)
   */
  static triggerProcessItems(payload: ProcessItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.processItems);
    log('Triggering process-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to paginate items
   */
  static triggerPaginateItems(payload: PaginateItemsPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.paginateItems);
    log('Triggering paginate-items workflow');
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Trigger workflow to execute a single item
   */
  static triggerExecuteItem(payload: ExecuteItemPayload) {
    const url = getWorkflowUrl(WORKFLOW_PATHS.executeItem);
    log('Triggering execute-item workflow: %s', payload.itemId);
    return this.getClient().trigger({ body: payload, url });
  }

  /**
   * Filter items that need processing (e.g., check Redis cache, database state)
   */
  static async filterItemsNeedingProcessing(itemIds: string[]): Promise<string[]> {
    if (itemIds.length === 0) return [];

    // Check existing state (Redis, database, etc.)
    // Return items that need processing
    return itemIds;
  }
}
```
### 2. Layer 1: Entry Point (process-*)

**Purpose**: Validates prerequisites, calculates statistics, supports dryRun mode
```typescript
import { serve } from '@upstash/workflow/nextjs';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type ProcessPayload } from '@/server/workflows/{workflowName}';

/**
 * Entry workflow for {workflow description}
 *
 * 1. Get all eligible items
 * 2. Filter items that already have results
 * 3. If dryRun, return statistics only
 * 4. If no items need processing, return early
 * 5. Trigger paginate workflow
 */
export const { POST } = serve<ProcessPayload>(
  async (context) => {
    const { dryRun, force } = context.requestPayload ?? {};

    console.log('[{workflow}:process] Starting with payload:', { dryRun, force });

    // Get all eligible items
    const allItemIds = await context.run('{workflow}:get-all-items', async () => {
      const db = await getServerDB();
      // Query database for eligible items
      return items.map((item) => item.id);
    });

    console.log('[{workflow}:process] Total eligible items:', allItemIds.length);

    if (allItemIds.length === 0) {
      return {
        success: true,
        totalEligible: 0,
        message: 'No eligible items found',
      };
    }

    // Filter items that need processing
    const itemsNeedingProcessing = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(allItemIds),
    );

    const result = {
      success: true,
      totalEligible: allItemIds.length,
      toProcess: itemsNeedingProcessing.length,
      alreadyProcessed: allItemIds.length - itemsNeedingProcessing.length,
    };

    console.log('[{workflow}:process] Check result:', result);

    // If dryRun mode, return statistics only
    if (dryRun) {
      console.log('[{workflow}:process] Dry run mode, returning statistics only');
      return {
        ...result,
        dryRun: true,
        message: `[DryRun] Would process ${itemsNeedingProcessing.length} items`,
      };
    }

    // If no items need processing, return early
    if (itemsNeedingProcessing.length === 0) {
      console.log('[{workflow}:process] All items already processed');
      return {
        ...result,
        message: 'All items already processed',
      };
    }

    // Trigger paginate workflow
    console.log('[{workflow}:process] Triggering paginate workflow');
    await context.run('{workflow}:trigger-paginate', () =>
      WorkflowClass.triggerPaginateItems({}),
    );

    return {
      ...result,
      message: `Triggered pagination for ${itemsNeedingProcessing.length} items`,
    };
  },
  {
    flowControl: {
      key: '{workflow}.process',
      parallelism: 1,
      ratePerSecond: 1,
    },
  },
);
```
### 3. Layer 2: Pagination (paginate-*)

**Purpose**: Handles cursor-based pagination, implements fan-out for large batches
```typescript
import { serve } from '@upstash/workflow/nextjs';
import { chunk } from 'es-toolkit/compat';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type PaginatePayload } from '@/server/workflows/{workflowName}';

const PAGE_SIZE = 50;
const CHUNK_SIZE = 20;

/**
 * Paginate items workflow - handles pagination and fan-out
 *
 * 1. If specific itemIds provided (from fan-out), process them directly
 * 2. Otherwise, paginate through all items using cursor
 * 3. Filter items that need processing
 * 4. If batch > CHUNK_SIZE, fan-out to smaller chunks
 * 5. Trigger execute workflow for each item
 * 6. Schedule next page if cursor exists
 */
export const { POST } = serve<PaginatePayload>(
  async (context) => {
    const { cursor, itemIds: payloadItemIds } = context.requestPayload ?? {};

    console.log('[{workflow}:paginate] Starting with payload:', {
      cursor,
      itemIdsCount: payloadItemIds?.length ?? 0,
    });

    // If specific itemIds are provided, process them directly (from fan-out)
    if (payloadItemIds && payloadItemIds.length > 0) {
      console.log('[{workflow}:paginate] Processing specific itemIds:', {
        count: payloadItemIds.length,
      });

      await Promise.all(
        payloadItemIds.map((itemId) =>
          context.run(`{workflow}:execute:${itemId}`, () =>
            WorkflowClass.triggerExecuteItem({ itemId }),
          ),
        ),
      );

      return {
        success: true,
        processedItems: payloadItemIds.length,
      };
    }

    // Paginate through all items
    const itemBatch = await context.run('{workflow}:get-batch', async () => {
      const db = await getServerDB();
      // Query database with cursor and PAGE_SIZE
      const items = await db.query(...);

      if (!items.length) return { ids: [] };

      const last = items.at(-1);
      return {
        ids: items.map((item) => item.id),
        cursor: last ? last.id : undefined,
      };
    });

    const batchItemIds = itemBatch.ids;
    const nextCursor = 'cursor' in itemBatch ? itemBatch.cursor : undefined;

    console.log('[{workflow}:paginate] Got batch:', {
      batchSize: batchItemIds.length,
      nextCursor,
    });

    if (batchItemIds.length === 0) {
      console.log('[{workflow}:paginate] No more items, pagination complete');
      return { success: true, message: 'Pagination complete' };
    }

    // Filter items that need processing
    const itemIds = await context.run('{workflow}:filter-existing', () =>
      WorkflowClass.filterItemsNeedingProcessing(batchItemIds),
    );

    console.log('[{workflow}:paginate] After filtering:', {
      needProcessing: itemIds.length,
      skipped: batchItemIds.length - itemIds.length,
    });

    // Process items if any need processing
    if (itemIds.length > 0) {
      if (itemIds.length > CHUNK_SIZE) {
        // Fan-out to smaller chunks
        const chunks = chunk(itemIds, CHUNK_SIZE);
        console.log('[{workflow}:paginate] Fan-out mode:', {
          chunks: chunks.length,
          chunkSize: CHUNK_SIZE,
          totalItems: itemIds.length,
        });

        await Promise.all(
          chunks.map((ids, idx) =>
            context.run(`{workflow}:fanout:${idx + 1}/${chunks.length}`, () =>
              WorkflowClass.triggerPaginateItems({ itemIds: ids }),
            ),
          ),
        );
      } else {
        // Process directly
        console.log('[{workflow}:paginate] Processing items directly:', {
          count: itemIds.length,
        });

        await Promise.all(
          itemIds.map((itemId) =>
            context.run(`{workflow}:execute:${itemId}`, () =>
              WorkflowClass.triggerExecuteItem({ itemId }),
            ),
          ),
        );
      }
    }

    // Schedule next page
    if (nextCursor) {
      console.log('[{workflow}:paginate] Scheduling next page:', { nextCursor });
      await context.run('{workflow}:next-page', () =>
        WorkflowClass.triggerPaginateItems({ cursor: nextCursor }),
      );
    } else {
      console.log('[{workflow}:paginate] No more pages');
    }

    return {
      success: true,
      processedItems: itemIds.length,
      skippedItems: batchItemIds.length - itemIds.length,
      nextCursor: nextCursor ?? null,
    };
  },
  {
    flowControl: {
      key: '{workflow}.paginate',
      parallelism: 20,
      ratePerSecond: 5,
    },
  },
);
```
### 4. Layer 3: Execution (execute-*/generate-*)

**Purpose**: Performs actual business logic
```typescript
import { serve } from '@upstash/workflow/nextjs';

import { getServerDB } from '@/database/server';
import { WorkflowClass, type ExecutePayload } from '@/server/workflows/{workflowName}';

/**
 * Execute item workflow - performs actual business logic
 *
 * 1. Get item data
 * 2. Perform business logic (AI generation, data processing, etc.)
 * 3. Save results
 */
export const { POST } = serve<ExecutePayload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    console.log('[{workflow}:execute] Starting:', { itemId });

    if (!itemId) {
      return { success: false, error: 'Missing itemId' };
    }

    const db = await getServerDB();

    // Get item data
    const item = await context.run('{workflow}:get-item', async () => {
      // Query database for item
      return item;
    });

    if (!item) {
      return { success: false, error: 'Item not found' };
    }

    // Perform business logic
    const result = await context.run('{workflow}:process-item', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.generate(); // or process(), execute(), etc.
    });

    // Save results
    await context.run('{workflow}:save-result', async () => {
      const workflow = new WorkflowClass(db, itemId);
      return workflow.saveToRedis(result); // or saveToDatabase(), etc.
    });

    console.log('[{workflow}:execute] Completed:', { itemId });

    return {
      success: true,
      itemId,
      result,
    };
  },
  {
    flowControl: {
      key: '{workflow}.execute',
      parallelism: 10,
      ratePerSecond: 5,
    },
  },
);
```
## Best Practices

### 1. Error Handling
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId } = context.requestPayload ?? {};

    // Validate required parameters
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }

    try {
      // Perform work
      const result = await context.run('step-name', () => doWork(itemId));
      return { success: true, itemId, result };
    } catch (error) {
      console.error('[workflow:error]', error);
      return {
        success: false,
        error: error instanceof Error ? error.message : 'Unknown error',
      };
    }
  },
  { flowControl: { ... } },
);
```
### 2. Logging
Use consistent log prefixes and structured logging:
```typescript
console.log('[{workflow}:{layer}] Starting with payload:', payload);
console.log('[{workflow}:{layer}] Processing items:', { count: items.length });
console.log('[{workflow}:{layer}] Completed:', result);
console.error('[{workflow}:{layer}:error]', error);
```
### 3. Return Values
Return consistent response shapes:
```typescript
// Success response
return {
  success: true,
  itemId,
  result,
  message: 'Optional success message',
};

// Error response
return {
  success: false,
  error: 'Error description',
  itemId, // Include context if available
};

// Statistics response (for entry point)
return {
  success: true,
  totalEligible: 100,
  toProcess: 80,
  alreadyProcessed: 20,
  dryRun: true, // If applicable
  message: 'Summary message',
};
```
### 4. flowControl Configuration

**Purpose**: Control concurrency and rate limiting for workflow executions
Tune concurrency based on layer:
```typescript
// Layer 1: Entry point - single instance only
flowControl: {
  key: '{workflow}.process',
  parallelism: 1,    // Only 1 process workflow at a time
  ratePerSecond: 1,  // 1 execution per second
}

// Layer 2: Pagination - moderate concurrency
flowControl: {
  key: '{workflow}.paginate',
  parallelism: 20,   // Up to 20 pagination workflows in parallel
  ratePerSecond: 5,  // 5 new executions per second
}

// Layer 3: Single task execution - high concurrency
flowControl: {
  key: '{workflow}.execute',
  parallelism: 10,   // Up to 10 items processed in parallel
  ratePerSecond: 5,  // 5 new items per second
}
```
**Guidelines**:

- Layer 1: Always use `parallelism: 1` to avoid duplicate processing
- Layer 2: Moderate concurrency for pagination (typically 10-20)
- Layer 3: Higher concurrency for parallel item processing (typically 5-10)
- Adjust `ratePerSecond` based on external API rate limits or resource constraints
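As a rough back-of-envelope check when tuning these numbers (illustrative only; actual QStash scheduling also depends on retries and queue depth), steady-state throughput is bounded by both knobs: `ratePerSecond` caps how fast new executions start, while `parallelism` divided by the average task duration caps how fast running ones finish:

```typescript
// Rough upper bound on items completed per second for one layer.
// Whichever limit is lower is the effective bottleneck.
const maxItemsPerSecond = (
  parallelism: number,
  ratePerSecond: number,
  avgDurationSeconds: number,
): number => Math.min(ratePerSecond, parallelism / avgDurationSeconds);

// Layer 3 defaults (parallelism: 10, ratePerSecond: 5) with a 4s AI call:
console.log(maxItemsPerSecond(10, 5, 4)); // → 2.5 (parallelism-bound, not rate-bound)
```

In that example, raising `ratePerSecond` alone would not help; for slow tasks, `parallelism` is the knob to turn first.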
### 5. context.run() Best Practices

- Use descriptive step names with prefixes: `{workflow}:step-name`
- Each step should be idempotent (safe to retry)
- Don't nest `context.run()` calls - keep them flat
- Use unique step names when processing multiple items:
```typescript
// Good: Unique step names
await Promise.all(
  items.map((item) => context.run(`{workflow}:execute:${item.id}`, () => processItem(item))),
);

// Bad: Same step name for all items
await Promise.all(
  items.map((item) =>
    context.run('{workflow}:execute', () =>
      // ❌ Not unique
      processItem(item),
    ),
  ),
);
```
### 6. Payload Validation
Always validate required parameters at the start:
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const { itemId, configId } = context.requestPayload ?? {};

    // Validate at the start
    if (!itemId) {
      return { success: false, error: 'Missing itemId in payload' };
    }

    if (!configId) {
      return { success: false, error: 'Missing configId in payload' };
    }

    // Proceed with work...
  },
  { flowControl: { ... } },
);
```
### 7. Database Connection
Get database connection once per workflow:
```typescript
export const { POST } = serve<Payload>(
  async (context) => {
    const db = await getServerDB(); // Get once

    // Use in multiple steps
    const item = await context.run('get-item', async () => {
      return itemModel.findById(db, itemId);
    });

    const result = await context.run('save-result', async () => {
      return resultModel.create(db, result);
    });
  },
  { flowControl: { ... } },
);
```
### 8. Testing
Create integration tests for workflows:
```typescript
describe('WorkflowName', () => {
  it('should process items successfully', async () => {
    // Setup test data
    const items = await createTestItems();

    // Trigger workflow
    await WorkflowClass.triggerProcessItems({ dryRun: false });

    // Wait for completion (use polling or webhook)
    await waitForCompletion();

    // Verify results
    const results = await getResults();
    expect(results).toHaveLength(items.length);
  });

  it('should support dryRun mode', async () => {
    const result = await WorkflowClass.triggerProcessItems({ dryRun: true });

    expect(result).toMatchObject({
      success: true,
      dryRun: true,
      totalEligible: expect.any(Number),
      toProcess: expect.any(Number),
    });
  });
});
```
## Examples

### Example 1: Welcome Placeholder

**Use Case**: Generate AI-powered welcome placeholders for users
**Structure**:

- Layer 1: `process-users` - Entry point, checks eligible users
- Layer 2: `paginate-users` - Paginates through active users
- Layer 3: `generate-user` - Generates placeholders for ONE user
**Core Patterns Demonstrated**:

**Dry-Run Mode**:

```typescript
// Layer 1: process-users
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${usersNeedingGeneration.length} users`,
  };
}
```
**Fan-Out Pattern**:

```typescript
// Layer 2: paginate-users
if (userIds.length > CHUNK_SIZE) {
  const chunks = chunk(userIds, CHUNK_SIZE);
  await Promise.all(
    chunks.map((ids, idx) =>
      context.run(`welcome-placeholder:fanout:${idx + 1}/${chunks.length}`, () =>
        WelcomePlaceholderWorkflow.triggerPaginateUsers({ userIds: ids }),
      ),
    ),
  );
}
```
**Single Task Execution**:

```typescript
// Layer 3: generate-user
export const { POST } = serve<GenerateUserPlaceholderPayload>(async (context) => {
  const { userId } = context.requestPayload ?? {};

  // Execute for ONE user only
  const workflow = new WelcomePlaceholderWorkflow(db, userId);
  const placeholders = await context.run('generate', () => workflow.generate());

  return { success: true, userId, placeholdersCount: placeholders.length };
});
```
**Key Features**:

- ✅ Filters users who already have cached placeholders in Redis
- ✅ Supports `paidOnly` flag to process only subscribed users
- ✅ Supports `dryRun` mode for statistics
- ✅ Uses fan-out for large user batches (`CHUNK_SIZE=20`)
- ✅ Each execution processes exactly ONE user
**Files**:

- `/api/workflows/welcome-placeholder/process-users/route.ts`
- `/api/workflows/welcome-placeholder/paginate-users/route.ts`
- `/api/workflows/welcome-placeholder/generate-user/route.ts`
- `/server/workflows/welcomePlaceholder/index.ts`
### Example 2: Agent Welcome

**Use Case**: Generate welcome messages and open questions for AI agents
**Structure**:

- Layer 1: `process-agents` - Entry point, checks eligible agents
- Layer 2: `paginate-agents` - Paginates through active agents
- Layer 3: `generate-agent` - Generates welcome data for ONE agent
**Core Patterns Demonstrated**:

**Dry-Run Mode**:

```typescript
// Layer 1: process-agents
if (dryRun) {
  return {
    ...result,
    dryRun: true,
    message: `[DryRun] Would process ${agentsNeedingGeneration.length} agents`,
  };
}
```
**Fan-Out Pattern**: Same as welcome-placeholder

**Single Task Execution**:

```typescript
// Layer 3: generate-agent
export const { POST } = serve<GenerateAgentWelcomePayload>(async (context) => {
  const { agentId } = context.requestPayload ?? {};

  // Execute for ONE agent only
  const workflow = new AgentWelcomeWorkflow(db, agentId);
  const data = await context.run('generate', () => workflow.generate());

  return { success: true, agentId, data };
});
```
**Key Features**:

- ✅ Filters agents that already have cached data in Redis
- ✅ Supports `paidOnly` flag for subscribed users' agents only
- ✅ Supports `dryRun` mode for statistics
- ✅ Uses fan-out for large agent batches (`CHUNK_SIZE=20`)
- ✅ Each execution processes exactly ONE agent
**Files**:

- `/api/workflows/agent-welcome/process-agents/route.ts`
- `/api/workflows/agent-welcome/paginate-agents/route.ts`
- `/api/workflows/agent-welcome/generate-agent/route.ts`
- `/server/workflows/agentWelcome/index.ts`
### Key Takeaways from Examples
Both workflows follow the exact same pattern:
**Layer 1 (Entry Point)**:

- Calculate statistics
- Filter existing items
- Support dry-run mode
- Trigger pagination only if needed

**Layer 2 (Pagination)**:

- Paginate with cursor (`PAGE_SIZE=50`)
- Fan-out large batches (`CHUNK_SIZE=20`)
- Trigger Layer 3 for each item
- Recursively process all pages

**Layer 3 (Execution)**:

- Process ONE item per execution
- Perform business logic
- Save results
- Return success/failure
The only differences are:

- Entity type (users vs agents)
- Business logic (placeholder generation vs welcome generation)
- Data source (different database queries)
## Common Pitfalls

### ❌ Don't: Use context.run() without unique names

```typescript
// Bad: Same step name when processing multiple items
await Promise.all(items.map((item) => context.run('process', () => process(item))));

// Good: Unique step names
await Promise.all(items.map((item) => context.run(`process:${item.id}`, () => process(item))));
```
### ❌ Don't: Forget to validate payload parameters

```typescript
// Bad: No validation
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};
  const result = await process(itemId); // May fail with undefined
});

// Good: Validate early
export const { POST } = serve<Payload>(async (context) => {
  const { itemId } = context.requestPayload ?? {};

  if (!itemId) {
    return { success: false, error: 'Missing itemId' };
  }

  const result = await process(itemId);
});
```
### ❌ Don't: Skip filtering existing items

```typescript
// Bad: No filtering, may duplicate work
const allItems = await getAllItems();
await Promise.all(allItems.map((item) => triggerExecute(item)));

// Good: Filter existing items first
const allItems = await getAllItems();
const itemsNeedingProcessing = await filterExisting(allItems);
await Promise.all(itemsNeedingProcessing.map((item) => triggerExecute(item)));
```
### ❌ Don't: Use inconsistent logging

```typescript
// Bad: Inconsistent prefixes and formats
console.log('Starting workflow');
log.info('Processing item:', itemId);
console.log(`Done with ${itemId}`);

// Good: Consistent structured logging
console.log('[workflow:layer] Starting with payload:', payload);
console.log('[workflow:layer] Processing item:', { itemId });
console.log('[workflow:layer] Completed:', { itemId, result });
```
## Environment Variables Required

### Required for all workflows

```bash
APP_URL=https://your-app.com   # Base URL for workflow endpoints
QSTASH_TOKEN=qstash_xxx        # QStash authentication token
```
### Optional (for custom QStash URL)

```bash
QSTASH_URL=https://custom-qstash.com   # Custom QStash endpoint
```
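Since both `getWorkflowUrl` and `getWorkflowClient` throw when these variables are missing, a fail-fast check at server bootstrap surfaces the problem before any workflow is triggered. The helper below is a sketch of that pattern (neither function exists in the codebase under these names):

```typescript
// Returns the names of required workflow env vars that are unset or empty.
const findMissingEnv = (env: Record<string, string | undefined>): string[] =>
  ['APP_URL', 'QSTASH_TOKEN'].filter((key) => !env[key]);

// Fail fast at startup with one clear error instead of a late trigger failure.
const assertWorkflowEnv = (env: Record<string, string | undefined>): void => {
  const missing = findMissingEnv(env);
  if (missing.length > 0) {
    throw new Error(`Missing required env vars: ${missing.join(', ')}`);
  }
};

// e.g. call assertWorkflowEnv(process.env) during server bootstrap
console.log(findMissingEnv({ APP_URL: 'https://your-app.com' })); // → [ 'QSTASH_TOKEN' ]
```

`QSTASH_URL` is intentionally left out of the required list: the client code above only applies it when present.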
## Checklist for New Workflows

### Planning Phase

- [ ] Identify entity to process (users, agents, items, etc.)
- [ ] Define business logic for single item execution
- [ ] Determine filtering logic (Redis cache, database state, etc.)
### Implementation Phase

- [ ] Define payload types with proper TypeScript interfaces
- [ ] Create workflow class with static trigger methods
- [ ] Layer 1: Implement entry point with dry-run support
- [ ] Layer 1: Add filtering logic to avoid duplicate work
- [ ] Layer 2: Implement pagination with fan-out logic
- [ ] Layer 3: Implement single task execution (ONE item per run)
- [ ] Configure appropriate `flowControl` for each layer
- [ ] Add consistent logging with workflow prefixes
- [ ] Validate all required payload parameters
- [ ] Use unique `context.run()` step names
### Quality & Deployment

- [ ] Return consistent response shapes
- [ ] Configure cloud deployment (see Cloud Guide if using lobehub-cloud)
- [ ] Write integration tests
- [ ] Test with dry-run mode first
- [ ] Test with small batch before full rollout
## Additional Resources

- Upstash Workflow Documentation
- QStash Documentation
- Example Workflows in Codebase
- Workflow Classes