Async Jobs
Architecture
Nuxflare Pro implements async job processing using Cloudflare Queues and Cron Triggers:
- Queue-based Jobs: Process tasks asynchronously through a message queue
- Cron Jobs: Execute scheduled tasks on a fixed interval
Components
Queue Consumer
// Worker that processes queued jobs
import { drizzle } from "drizzle-orm/d1";
import { eq } from "drizzle-orm";
import * as schema from "./database/schema";
// The producer sends the inserted job row, so each message body is a job record
type Job = typeof schema.jobs.$inferSelect;
export default {
  async queue(batch: MessageBatch<Job>, env: Environment) {
    // Create the database client once per batch rather than once per message
    const db = drizzle(env.AppDB, { schema });
    // Process each message in the batch
    for (const message of batch.messages) {
      const job = message.body;
      // Update job status and log completion
      await db
        .update(schema.jobs)
        .set({ status: "completed" })
        .where(eq(schema.jobs.id, job.id));
      await db.insert(schema.logs).values({
        jobId: job.id,
        message: "job completed",
      });
    }
  },
};
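The consumer above only records the "completed" state. A minimal sketch of how the "failed" state could be recorded is shown here, assuming each message is acknowledged or retried individually with the `ack()` and `retry()` methods that Cloudflare Queues messages expose. The `QueueMessage` and `JobStore` interfaces and the `handle` callback are hypothetical stand-ins so the example runs outside a Worker; they are not part of Nuxflare Pro.

```typescript
// Minimal local stand-in for a Cloudflare Queues message (assumption:
// only the members used below are modelled).
interface QueueMessage<T> {
  body: T;
  ack(): void;
  retry(): void;
}

type JobStatus = "pending" | "completed" | "failed";

interface Job {
  id: number;
  message: string;
}

// Hypothetical storage interface standing in for the Drizzle calls.
interface JobStore {
  setStatus(jobId: number, status: JobStatus): Promise<void>;
  log(jobId: number, message: string): Promise<void>;
}

// Handle each message on its own so one failing job does not affect the others.
async function processBatch(
  messages: QueueMessage<Job>[],
  store: JobStore,
  handle: (job: Job) => Promise<void>, // hypothetical per-job work
): Promise<void> {
  for (const message of messages) {
    const job = message.body;
    try {
      await handle(job);
      await store.setStatus(job.id, "completed");
      await store.log(job.id, "job completed");
      message.ack(); // acknowledge only this message
    } catch (err) {
      const reason = err instanceof Error ? err.message : String(err);
      await store.setStatus(job.id, "failed");
      await store.log(job.id, `job failed: ${reason}`);
      message.retry(); // request redelivery, subject to max_retries
    }
  }
}
```

With this shape a job marked "failed" may later be marked "completed" if a retry succeeds; whether that is desirable depends on how the status is displayed.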
Cron Worker
// Worker that runs on a schedule
import { drizzle } from "drizzle-orm/d1";
import * as schema from "./database/schema";
export default {
  async scheduled(_event: ScheduledEvent, env: Environment) {
    const db = drizzle(env.AppDB, { schema });
    await db.insert(schema.logs).values({
      message: "scheduled task executed",
    });
  },
};
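The cron worker ignores its event argument, but the Workers scheduled event carries the cron expression that fired (`cron`) and the scheduled start time in milliseconds (`scheduledTime`). As a sketch, assuming you want the execution history to show which schedule ran and when, the log message could be built from those two fields. The `ScheduledInfo` interface and the `describeRun` helper are hypothetical names used only for illustration.

```typescript
// Local stand-in for the two scheduled-event fields used here.
interface ScheduledInfo {
  cron: string; // the cron expression that triggered this run
  scheduledTime: number; // scheduled start, in milliseconds since the epoch
}

// Build a log message that identifies the schedule and its scheduled time.
function describeRun(event: ScheduledInfo): string {
  const when = new Date(event.scheduledTime).toISOString();
  return `scheduled task executed (cron "${event.cron}", scheduled for ${when})`;
}
```

Inside the worker, the result could replace the fixed "scheduled task executed" string passed to the log insert.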
Configuration
Infrastructure Setup
infra/jobs.ts
// Configure queue in your infrastructure
export const jobsQueue = new sst.cloudflare.Queue("JobQueue");
const wrangler = {
  queues: {
    consumers: [
      {
        queue: jobsQueue.name,
        max_batch_size: 10,
        max_batch_timeout: 5,
        max_retries: 1,
        max_concurrency: 5,
        retry_delay: 120,
      },
    ],
  },
};
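To make the retry settings above concrete, here is a rough sketch of the earliest times at which a message that keeps failing could be delivered. It assumes a message is delivered once plus up to `max_retries` more times, that `retry_delay` is in seconds, and that each attempt fails immediately; batching delays and processing time are ignored. The `earliestDeliveryOffsets` helper is hypothetical.

```typescript
// Only the two consumer settings that affect retries are modelled.
interface RetrySettings {
  max_retries: number;
  retry_delay: number; // seconds
}

// Offsets (in seconds from first delivery) of every delivery attempt
// for a message that fails each time.
function earliestDeliveryOffsets(settings: RetrySettings): number[] {
  const offsets: number[] = [];
  for (let attempt = 0; attempt <= settings.max_retries; attempt++) {
    offsets.push(attempt * settings.retry_delay);
  }
  return offsets;
}

// With the values from infra/jobs.ts: one initial delivery and one retry
// no sooner than two minutes later.
const offsets = earliestDeliveryOffsets({ max_retries: 1, retry_delay: 120 });
console.log(offsets); // [ 0, 120 ]
```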
Cron Setup
infra/cron.ts
// Configure cron schedule
export const emails = new sst.cloudflare.Cron("Cron", {
  schedules: ["0 * * * *"], // every hour
  job: {
    handler: "packages/functions/src/cron.ts",
    link: [d1],
  },
});
Usage
Creating Jobs
// Submit a job to the queue
const job = await useDB()
  .insert(jobs)
  .values({
    message: input.message,
    teamId: team.id,
    status: "pending",
  })
  .returning()
  .get();
await jobsQueue.send(job);
Monitoring Jobs
The UI provides:
- Job status monitoring
- Execution logs
- Cron job execution history
Jobs can be in the following states:
- pending: Waiting to be processed
- completed: Successfully processed
- failed: Failed to process
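The three states can be expressed as a TypeScript union so that status values are checked at compile time. The sketch below also shows one way a monitoring view might tally jobs per state. The names `JOB_STATUSES`, `isJobStatus`, and `countByStatus` are hypothetical and not part of Nuxflare Pro.

```typescript
// The job states listed above, as a single source of truth.
const JOB_STATUSES = ["pending", "completed", "failed"] as const;
type JobStatus = (typeof JOB_STATUSES)[number];

// Runtime check for values read from the database or a request.
function isJobStatus(value: unknown): value is JobStatus {
  return JOB_STATUSES.some((status) => status === value);
}

// Count jobs per state, e.g. for a status summary in the UI.
function countByStatus(jobs: { status: JobStatus }[]): Record<JobStatus, number> {
  const counts: Record<JobStatus, number> = { pending: 0, completed: 0, failed: 0 };
  for (const job of jobs) {
    counts[job.status] += 1;
  }
  return counts;
}
```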
Database Schema
Jobs and logs are stored in D1:
- jobs: Stores job details and status
- logs: Tracks job execution and cron events
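The actual schema file is not shown here, so the following row shapes are only inferred from the fields used in the snippets on this page (`message`, `teamId`, `status`, `jobId`). Column types, the `id` fields, and the helper functions are assumptions. The sketch illustrates how job logs and cron logs could be told apart, given that the cron worker inserts log rows without a `jobId`.

```typescript
// Assumed row shapes; the real D1 tables may have more columns or other types.
interface JobRow {
  id: number;
  message: string;
  teamId: string;
  status: "pending" | "completed" | "failed";
}

interface LogRow {
  id: number;
  jobId: number | null; // null for cron events, which are not tied to a job
  message: string;
}

// Execution logs that belong to one job.
function logsForJob(logs: LogRow[], job: JobRow): LogRow[] {
  return logs.filter((log) => log.jobId === job.id);
}

// Log rows written by the cron worker (no associated job).
function cronLogs(logs: LogRow[]): LogRow[] {
  return logs.filter((log) => log.jobId === null);
}
```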