queue-job-processor

安装量: 48
排名: #15546

安装

npx skills add https://github.com/patricio0312rev/skills --skill queue-job-processor

Queue Job Processor

Build robust background job processing with BullMQ and Redis.

Core Workflow Setup Redis: Configure connection Create queues: Define job queues Implement workers: Process jobs Add job types: Type-safe job definitions Configure retries: Handle failures Add monitoring: Dashboard and alerts Installation npm install bullmq ioredis npm install -D @types/ioredis

Redis Connection // lib/redis.ts import IORedis from 'ioredis';

export const redis = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, // Required for BullMQ enableReadyCheck: false, });

export const redisSubscriber = new IORedis(process.env.REDIS_URL!, { maxRetriesPerRequest: null, enableReadyCheck: false, });

Queue Setup Define Job Types // jobs/types.ts export interface EmailJobData { to: string; subject: string; template: string; variables: Record; }

export interface ImageProcessingJobData { imageId: string; userId: string; operations: Array<{ type: 'resize' | 'crop' | 'watermark'; params: Record; }>; }

export interface ReportJobData { reportId: string; userId: string; type: 'daily' | 'weekly' | 'monthly'; dateRange: { start: string; end: string; }; }

export interface WebhookJobData { url: string; payload: Record; headers?: Record; retryCount?: number; }

export type JobData = | { type: 'email'; data: EmailJobData } | { type: 'image-processing'; data: ImageProcessingJobData } | { type: 'report'; data: ReportJobData } | { type: 'webhook'; data: WebhookJobData };

Create Queues // queues/index.ts import { Queue, QueueOptions } from 'bullmq'; import { redis } from '../lib/redis'; import { EmailJobData, ImageProcessingJobData, ReportJobData, WebhookJobData, } from './types';

const defaultOptions: QueueOptions = { connection: redis, defaultJobOptions: { attempts: 3, backoff: { type: 'exponential', delay: 1000, }, removeOnComplete: { count: 1000, // Keep last 1000 completed jobs age: 24 * 3600, // Keep for 24 hours }, removeOnFail: { count: 5000, // Keep last 5000 failed jobs }, }, };

export const emailQueue = new Queue('email', defaultOptions);

export const imageQueue = new Queue('image-processing', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, attempts: 5, timeout: 5 * 60 * 1000, // 5 minutes }, });

export const reportQueue = new Queue('reports', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, timeout: 30 * 60 * 1000, // 30 minutes }, });

export const webhookQueue = new Queue('webhooks', { ...defaultOptions, defaultJobOptions: { ...defaultOptions.defaultJobOptions, attempts: 5, backoff: { type: 'exponential', delay: 5000, }, }, });

Workers Email Worker // workers/email.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { EmailJobData } from '../jobs/types'; import { sendEmail } from '../lib/email';

const emailWorker = new Worker( 'email', async (job: Job) => { const { to, subject, template, variables } = job.data;

console.log(`Processing email job ${job.id} to ${to}`);

// Update progress
await job.updateProgress(10);

// Render template
const html = await renderTemplate(template, variables);
await job.updateProgress(50);

// Send email
const result = await sendEmail({
  to,
  subject,
  html,
});

await job.updateProgress(100);

return { messageId: result.messageId, sentAt: new Date() };

}, { connection: redis, concurrency: 10, // Process 10 emails at a time limiter: { max: 100, // Max 100 jobs duration: 60000, // Per minute }, } );

// Event handlers emailWorker.on('completed', (job, result) => { console.log(Email job ${job.id} completed:, result); });

emailWorker.on('failed', (job, error) => { console.error(Email job ${job?.id} failed:, error); });

emailWorker.on('progress', (job, progress) => { console.log(Email job ${job.id} progress: ${progress}%); });

export { emailWorker };

Image Processing Worker // workers/image.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { ImageProcessingJobData } from '../jobs/types'; import sharp from 'sharp'; import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });

const imageWorker = new Worker( 'image-processing', async (job: Job) => { const { imageId, userId, operations } = job.data;

console.log(`Processing image ${imageId} for user ${userId}`);

// Download original image
const originalBuffer = await downloadImage(imageId);
let image = sharp(originalBuffer);

// Apply operations
for (let i = 0; i < operations.length; i++) {
  const op = operations[i];

  switch (op.type) {
    case 'resize':
      image = image.resize(op.params.width, op.params.height, {
        fit: op.params.fit || 'cover',
      });
      break;
    case 'crop':
      image = image.extract({
        left: op.params.left,
        top: op.params.top,
        width: op.params.width,
        height: op.params.height,
      });
      break;
    case 'watermark':
      image = image.composite([
        { input: op.params.watermarkPath, gravity: 'southeast' },
      ]);
      break;
  }

  await job.updateProgress(((i + 1) / operations.length) * 80);
}

// Convert and upload
const processedBuffer = await image.webp({ quality: 85 }).toBuffer();

const key = `processed/${userId}/${imageId}.webp`;
await s3.send(
  new PutObjectCommand({
    Bucket: process.env.S3_BUCKET,
    Key: key,
    Body: processedBuffer,
    ContentType: 'image/webp',
  })
);

await job.updateProgress(100);

return {
  url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${key}`,
  size: processedBuffer.length,
};

}, { connection: redis, concurrency: 5, } );

imageWorker.on('failed', async (job, error) => { // Notify user of failure if (job) { await notifyUser(job.data.userId, { type: 'image-processing-failed', imageId: job.data.imageId, error: error.message, }); } });

export { imageWorker };

Webhook Worker with Retries // workers/webhook.worker.ts import { Worker, Job } from 'bullmq'; import { redis } from '../lib/redis'; import { WebhookJobData } from '../jobs/types';

const webhookWorker = new Worker( 'webhooks', async (job: Job) => { const { url, payload, headers = {} } = job.data;

const response = await fetch(url, {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'X-Webhook-Signature': generateSignature(payload),
    ...headers,
  },
  body: JSON.stringify(payload),
  signal: AbortSignal.timeout(30000), // 30s timeout
});

if (!response.ok) {
  // Retry for server errors
  if (response.status >= 500) {
    throw new Error(`Webhook failed: ${response.status}`);
  }
  // Don't retry for client errors
  return {
    success: false,
    status: response.status,
    message: 'Client error, not retrying',
  };
}

return {
  success: true,
  status: response.status,
};

}, { connection: redis, concurrency: 20, } );

export { webhookWorker };

Adding Jobs Service Layer // services/jobs.service.ts import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues'; import { JobsOptions } from 'bullmq';

export class JobService { // Send email static async sendEmail(data: EmailJobData, options?: JobsOptions) { return emailQueue.add('send-email', data, { ...options, priority: data.template === 'password-reset' ? 1 : 10, }); }

// Bulk emails static async sendBulkEmails(emails: EmailJobData[]) { const jobs = emails.map((data, index) => ({ name: 'send-email', data, opts: { delay: index * 100, // Stagger by 100ms }, }));

return emailQueue.addBulk(jobs);

}

// Process image static async processImage(data: ImageProcessingJobData) { return imageQueue.add('process', data, { jobId: image-${data.imageId}, // Prevent duplicates }); }

// Schedule report static async scheduleReport(data: ReportJobData, runAt: Date) { return reportQueue.add('generate', data, { delay: runAt.getTime() - Date.now(), }); }

// Send webhook static async sendWebhook(data: WebhookJobData) { return webhookQueue.add('deliver', data); } }

API Usage // app/api/users/route.ts import { JobService } from '@/services/jobs.service';

export async function POST(req: Request) { const data = await req.json();

// Create user const user = await db.user.create({ data });

// Queue welcome email await JobService.sendEmail({ to: user.email, subject: 'Welcome!', template: 'welcome', variables: { name: user.name }, });

return Response.json(user); }

Scheduled Jobs (Cron) // schedulers/index.ts import { Queue, QueueScheduler } from 'bullmq'; import { redis } from '../lib/redis';

// Daily report scheduler export async function setupSchedulers() { // Clean up old jobs daily await reportQueue.add( 'cleanup', {}, { repeat: { pattern: '0 0 * * *', // Every day at midnight }, } );

// Hourly metrics aggregation await metricsQueue.add( 'aggregate', {}, { repeat: { pattern: '0 * * * *', // Every hour }, } );

// Weekly digest await emailQueue.add( 'weekly-digest', { template: 'weekly-digest' }, { repeat: { pattern: '0 9 * * 1', // Every Monday at 9 AM }, } ); }

Job Events & Monitoring Event Listeners // monitoring/events.ts import { QueueEvents } from 'bullmq'; import { redis } from '../lib/redis';

const emailQueueEvents = new QueueEvents('email', { connection: redis });

emailQueueEvents.on('completed', ({ jobId, returnvalue }) => { console.log(Job ${jobId} completed with:, returnvalue); metrics.increment('email.completed'); });

emailQueueEvents.on('failed', ({ jobId, failedReason }) => { console.error(Job ${jobId} failed:, failedReason); metrics.increment('email.failed');

// Alert on repeated failures alertOnFailure(jobId, failedReason); });

emailQueueEvents.on('delayed', ({ jobId, delay }) => { console.log(Job ${jobId} delayed by ${delay}ms); });

emailQueueEvents.on('progress', ({ jobId, data }) => { console.log(Job ${jobId} progress:, data); });

emailQueueEvents.on('stalled', ({ jobId }) => { console.warn(Job ${jobId} stalled); metrics.increment('email.stalled'); });

Bull Board Dashboard // app/api/admin/queues/route.ts import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { emailQueue, imageQueue, reportQueue, webhookQueue } from '@/queues';

const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath('/api/admin/queues');

createBullBoard({ queues: [ new BullMQAdapter(emailQueue), new BullMQAdapter(imageQueue), new BullMQAdapter(reportQueue), new BullMQAdapter(webhookQueue), ], serverAdapter, });

export const GET = serverAdapter.getRouter(); export const POST = serverAdapter.getRouter();

Error Handling // workers/base.worker.ts import { Worker, Job, UnrecoverableError } from 'bullmq';

// Custom error for non-retryable failures export class NonRetryableError extends UnrecoverableError { constructor(message: string) { super(message); this.name = 'NonRetryableError'; } }

// Worker with error handling const worker = new Worker( 'queue-name', async (job: Job) => { try { // Validate input if (!job.data.requiredField) { throw new NonRetryableError('Missing required field'); }

  // Process job
  return await processJob(job.data);
} catch (error) {
  if (error instanceof NonRetryableError) {
    throw error; // Won't retry
  }

  // Log and rethrow for retry
  console.error(`Job ${job.id} error:`, error);
  throw error;
}

}, { connection: redis, } );

// Handle worker errors worker.on('error', (error) => { console.error('Worker error:', error); });

Graceful Shutdown // server.ts import { emailWorker, imageWorker, reportWorker } from './workers';

const workers = [emailWorker, imageWorker, reportWorker];

async function gracefulShutdown() { console.log('Shutting down workers...');

// Close workers gracefully await Promise.all( workers.map((worker) => worker.close().catch((err) => { console.error('Error closing worker:', err); }) ) );

// Close Redis connections await redis.quit(); await redisSubscriber.quit();

console.log('Workers shut down'); process.exit(0); }

process.on('SIGTERM', gracefulShutdown); process.on('SIGINT', gracefulShutdown);

Best Practices Idempotent jobs: Jobs should be safe to retry Unique job IDs: Prevent duplicate processing Set timeouts: Prevent stuck jobs Use progress updates: For long-running jobs Handle failures gracefully: Alert and log Clean up old jobs: Remove completed/failed jobs Graceful shutdown: Wait for jobs to complete Monitor queues: Use Bull Board or similar Output Checklist

Every queue implementation should include:

Redis connection with proper config Typed job data interfaces Queue with default options Worker with concurrency limits Retry and backoff configuration Event handlers for monitoring Error handling (retryable vs non-retryable) Graceful shutdown handling Bull Board or monitoring dashboard Scheduled/recurring jobs (if needed)

返回排行榜