Async Jobs

Architecture

Nuxflare Pro implements async job processing using Cloudflare Queues and Cron triggers:

  1. Queue-based Jobs: Process tasks asynchronously through a message queues
  2. Cron Jobs: Execute scheduled tasks on a fixed interval

Components

Queue Consumer

Nuxflare Queue Consumer Screenshot

// Worker that processes queued jobs
import { drizzle } from "drizzle-orm/d1";
import { eq } from "drizzle-orm";
import * as schema from "./database/schema";

export default {
  async queue(batch: MessageBatch, env: Environment) {
    // Process jobs in batches
    for (const message of batch.messages) {
      const job = message.body;
      const db = drizzle(env.AppDB, { schema });

      // Update job status and log completion
      await db
        .update(schema.jobs)
        .set({ status: "completed" })
        .where(eq(schema.jobs.id, job.id));

      await db.insert(schema.logs).values({
        jobId: job.id,
        message: "job completed",
      });
    }
  },
};

Cron Worker

// Worker that runs on a schedule
import { drizzle } from "drizzle-orm/d1";
import * as schema from "./database/schema";

export default {
  async scheduled(_event: ScheduledEvent, env: Environment) {
    const db = drizzle(env.AppDB, { schema });
    await db.insert(schema.logs).values({
      message: "scheduled task executed",
    });
  },
};

Configuration

Infrastructure Setup

infra/jobs.ts
// Configure queue in your infrastructure
export const jobsQueue = new sst.cloudflare.Queue("JobQueue");

const wrangler = {
  queues: {
    consumers: [
      {
        queue: jobsQueue.name,
        max_batch_size: 10,
        max_batch_timeout: 5,
        max_retries: 1,
        max_concurrency: 5,
        retry_delay: 120,
      },
    ],
  },
};

Cron Setup

infra/cron.ts
// Configure cron schedule
export const emails = new sst.cloudflare.Cron("Cron", {
  schedules: ["0 * * * *"], // every hour
  job: {
    handler: "packages/functions/src/cron.ts",
    link: [d1],
  },
});

Usage

Creating Jobs

// Submit a job to the queue
const job = await useDB()
  .insert(jobs)
  .values({
    message: input.message,
    teamId: team.id,
    status: "pending",
  })
  .returning()
  .get();

await jobsQueue.send(job);

Monitoring Jobs

The UI provides:

  • Job status monitoring

Nuxflare Queue Job Status Screenshot

  • Execution logs

Nuxflare Queue Logs Screenshot

  • Cron job execution history

Nuxflare Queue Cron Logs Screenshot

Jobs can be in the following states:

  • pending: Waiting to be processed
  • completed: Successfully processed
  • failed: Failed to process

Database Schema

Jobs and logs are stored in D1:

  • jobs: Stores job details and status
  • logs: Tracks job execution and cron events