Queue adapter
Pick where your background jobs run (Postgres via pg-boss, Redis via BullMQ, or Cloudflare Queues) by passing one adapter to .build({ queue }). The same job code runs on any of them.
The queue adapter is the slot that decides where your background work runs. You write a job once; the adapter is the plug-in backend that stores, schedules, and hands jobs to a worker. Swap pgBossAdapter for bullMQAdapter and the exact same job code moves from Postgres to Redis, with no handler change and no dispatch change. QUESTPIE ships three adapters and an open QueueAdapter contract so you can write your own.
This page is the infrastructure reference: which adapter to choose, how each one is configured, the capability flags that gate listen / runOnce / schedule, and how to plug in a custom backend. For how to define and dispatch jobs (job(), queue.<name>.publish, the handler context), see Jobs.
Prerequisites
Read Jobs first: it owns job(), queue.<name>.publish/schedule, and the worker entrypoints. This page assumes you know how a job is defined and dispatched, and focuses only on the adapter that backs the queue. See also Configuration for the .build({ ... }) model.
What it does
- Choose your backend with one line. Pass an adapter to .build({ queue: { adapter } }); everything above it (the typed app.queue client, dispatch, retries, scheduling) stays identical.
- Stay portable. Job handlers and publish calls never name the backend. Moving from pg-boss in dev to BullMQ in production is an adapter swap, nothing else.
- Run on Postgres, Redis, or Cloudflare. Reuse your existing database (pg-boss), a Redis instance (BullMQ), or a push-based serverless queue (Cloudflare Queues).
- Advertise capabilities. Each adapter declares whether it supports long-running workers, one-batch draining, push delivery, cron scheduling, and singleton dedup, so the runtime can fail fast or branch.
- Keep drivers out of your bundle. Each adapter lives at its own import subpath (questpie/adapters/pg-boss, …), so pg-boss / bullmq / Cloudflare bindings load only when you import them.
- Extensible by contract. QueueAdapter is the same seam the three built-ins implement, so you can bring any broker.
Quick start
Pick one adapter and pass it to .build(). Codegen fills jobs from your jobs/ files; you only supply the adapter.
import { pgBossAdapter } from "questpie/adapters/pg-boss";
export default questpie(/* modules */).build({
// `jobs` is filled by codegen from your jobs/ files, you only supply the adapter.
queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});

That single line wires app.queue to Postgres. Define a job, run questpie generate, and dispatch it as shown in Jobs; the adapter does the rest.
The adapter is required once any job exists
If your app has at least one job but no queue: { adapter }, the queue service throws: "Queue adapter is required when jobs are defined. Provide adapter in .build({ queue: { adapter: ... } })." With no jobs at all, app.queue is an empty {} and no adapter is needed.
Choosing an adapter
| Adapter | Import | Backed by | Best for |
|---|---|---|---|
| pg-boss | questpie/adapters/pg-boss | Postgres | The default, reuse your app database, zero extra infrastructure. |
| BullMQ | questpie/adapters/bullmq | Redis | High-throughput queues when you already run Redis. |
| Cloudflare Queues | questpie/adapters/cloudflare-queues | CF Queues | Serverless on Cloudflare Workers (push delivery). |
Each adapter advertises what it can do through capability flags. These gate which worker methods you may call:
| Capability | pg-boss | BullMQ | Cloudflare Queues |
|---|---|---|---|
| longRunningConsumer (queue.listen()) | yes | yes | no |
| runOnceConsumer (queue.runOnce()) | yes | yes | no |
| pushConsumer (queue.createPushConsumer()) | no | no | yes |
| scheduling (options.cron, queue.<name>.schedule()) | yes | yes | no |
| singleton (singletonKey dedup) | yes | yes | no |
Detect capabilities at runtime with `app.queue.capabilities`
The resolved flags are exposed on app.queue.capabilities: longRunningConsumer, runOnceConsumer, pushConsumer, scheduling, and singleton. If one codebase targets multiple runtimes, check the relevant flag before calling listen / runOnce / schedule rather than catching the throw. When an adapter omits capabilities, the runtime infers them: longRunningConsumer / runOnceConsumer / pushConsumer default to "is the matching method implemented?", scheduling to "are both schedule and unschedule functions?", and singleton to false.
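The inference rule can be sketched as a small pure function. This is only an illustration under assumptions: `AdapterLike` and `resolveCapabilities` are hypothetical names, the flag interface is a local mirror of the five flags listed here, and the sketch assumes explicitly declared flags take precedence per flag. QUESTPIE's internal implementation may be organized differently.

```typescript
// Local mirror of the five flags named on this page.
interface QueueAdapterCapabilities {
  longRunningConsumer: boolean;
  runOnceConsumer: boolean;
  pushConsumer: boolean;
  scheduling: boolean;
  singleton: boolean;
}

// Hypothetical minimal adapter shape: only the members the inference reads.
interface AdapterLike {
  capabilities?: Partial<QueueAdapterCapabilities>;
  listen?: unknown;
  runOnce?: unknown;
  createPushConsumer?: unknown;
  schedule?: unknown;
  unschedule?: unknown;
}

const isFn = (value: unknown): boolean => typeof value === "function";

// Explicit flags win; anything left unset falls back to the inference rule.
function resolveCapabilities(adapter: AdapterLike): QueueAdapterCapabilities {
  const declared = adapter.capabilities ?? {};
  return {
    longRunningConsumer: declared.longRunningConsumer ?? isFn(adapter.listen),
    runOnceConsumer: declared.runOnceConsumer ?? isFn(adapter.runOnce),
    pushConsumer: declared.pushConsumer ?? isFn(adapter.createPushConsumer),
    scheduling:
      declared.scheduling ?? (isFn(adapter.schedule) && isFn(adapter.unschedule)),
    singleton: declared.singleton ?? false,
  };
}

// A push-only adapter that declares no flags explicitly.
const pushOnly = resolveCapabilities({
  createPushConsumer: () => async () => {},
  schedule: async () => {},
  // no unschedule, so scheduling is inferred as false
});
```

In application code the same idea is a plain guard: read the flag on app.queue.capabilities and branch, instead of wrapping the call in try/catch.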
The consumer models
The adapter's capabilities decide how a worker drains the queue. The mechanics of starting a worker live in Jobs → Running a worker; here's how each model maps to the adapters:
- Long-running (queue.listen()): a persistent process polls and runs jobs until stopped. pg-boss and BullMQ.
- Run-once (queue.runOnce()): drain one bounded batch and exit, for a serverless function or scheduled tick. pg-boss and BullMQ.
- Push (queue.createPushConsumer()): the platform delivers batches to your worker; you don't poll. Cloudflare Queues only.
There is no `questpie worker` CLI command
Workers start programmatically: you import your built app and call app.queue.listen() (long-running) or app.queue.runOnce() (serverless tick), or wire app.queue.createPushConsumer() to the platform's queue entrypoint. The questpie CLI has no queue command. The full entrypoint recipes are in Jobs → Running a worker.
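A single entrypoint shared across runtimes might branch on the capability flags like this. It is a rough sketch, not the recipe from Jobs: `QueueLike`, `pickWorkerMode`, `startWorker`, and the `preferTick` switch are all hypothetical, and `QueueLike` models only the slice of app.queue used here.

```typescript
// Hypothetical slice of app.queue: just the members this sketch touches.
interface QueueLike {
  capabilities: {
    longRunningConsumer: boolean;
    runOnceConsumer: boolean;
    pushConsumer: boolean;
  };
  listen(): Promise<void>;
  runOnce(): Promise<unknown>;
}

type WorkerMode = "listen" | "runOnce" | "push";

// Decide the consumer model from the flags. `preferTick` selects runOnce on
// runtimes that cannot keep a process alive (an assumption of this sketch).
function pickWorkerMode(queue: QueueLike, preferTick = false): WorkerMode {
  const caps = queue.capabilities;
  if (caps.pushConsumer) return "push";
  if (preferTick && caps.runOnceConsumer) return "runOnce";
  if (caps.longRunningConsumer) return "listen";
  if (caps.runOnceConsumer) return "runOnce";
  throw new Error("Queue adapter exposes no consumer model");
}

async function startWorker(queue: QueueLike, preferTick = false): Promise<WorkerMode> {
  const mode = pickWorkerMode(queue, preferTick);
  if (mode === "listen") await queue.listen();
  if (mode === "runOnce") await queue.runOnce();
  // "push": nothing to start here; the platform invokes the push consumer.
  return mode;
}
```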
pg-boss (Postgres)
The default and simplest production setup: jobs live in your existing Postgres database, so there's no extra service to run.
import { pgBossAdapter } from "questpie/adapters/pg-boss";
.build({
queue: { adapter: pgBossAdapter({ connectionString: env.DATABASE_URL }) },
});

PgBossAdapterOptions is pg-boss's own ConstructorOptions (a direct re-export), so every option passes straight through to new PgBoss(...):
import { pgBossAdapter } from "questpie/adapters/pg-boss";
pgBossAdapter({
connectionString: env.DATABASE_URL, // or pass `db`, `host`/`port`/`user`/…
schema: "pgboss", // dedicated schema for queue tables (pg-boss default)
max: 10, // connection pool size
});

Behavior worth knowing:
- Queues are created on demand. The adapter calls createQueue(jobName) the first time it touches a job and caches it; you never declare queues yourself.
- PublishOptions maps ~1:1 to pg-boss send options (priority, startAfter, singletonKey, retryLimit, retryDelay, expireInSeconds), passed through directly.
- Batches are processed serially. pg-boss always hands an array to its work() callback; the adapter iterates items one at a time and reports per-item failures via boss.fail(...) so a sibling failure doesn't sink the rest of the batch.
- runOnce needs pg-boss fetch() support. It throws "PgBossAdapter.runOnce requires pg-boss fetch() support." if the installed pg-boss version doesn't expose it.
Supports long-running workers, runOnce, cron scheduling, and native singletonKey.
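The serial batch behavior described above can be pictured with a short loop. This sketch does not call pg-boss: `BatchJob`, `processBatchSerially`, and the injected `fail` callback are hypothetical stand-ins (the real adapter reports failures through boss.fail(...), whose exact arguments depend on the installed pg-boss version).

```typescript
// Hypothetical job shape; real pg-boss jobs carry more fields than this.
interface BatchJob {
  id: string;
  data: unknown;
}

type JobHandler = (job: BatchJob) => Promise<void>;
type FailFn = (jobId: string, error: Error) => Promise<void>;

// Run each job in array order. A throwing handler marks only that job as
// failed; the loop then continues with the remaining jobs in the batch.
async function processBatchSerially(
  jobs: BatchJob[],
  handler: JobHandler,
  fail: FailFn,
): Promise<{ succeeded: string[]; failed: string[] }> {
  const succeeded: string[] = [];
  const failed: string[] = [];
  for (const job of jobs) {
    try {
      await handler(job); // one at a time, never in parallel
      succeeded.push(job.id);
    } catch (error) {
      const err = error instanceof Error ? error : new Error(String(error));
      await fail(job.id, err); // report this job only
      failed.push(job.id);
    }
  }
  return { succeeded, failed };
}
```

Processing serially trades throughput for isolation: one bad payload costs one job, not the whole batch.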
BullMQ (Redis)
Use BullMQ when you already run Redis and want a dedicated, high-throughput queue.
import { bullMQAdapter } from "questpie/adapters/bullmq";
.build({
queue: {
adapter: bullMQAdapter({
connection: { host: env.REDIS_HOST, port: env.REDIS_PORT },
}),
},
});

BullMQAdapterOptions:
interface BullMQAdapterOptions {
connection: ConnectionOptions; // bullmq/ioredis connection (required)
queuePrefix?: string; // namespace key prefix for queues/workers
workerOptions?: Omit<WorkerOptions, "connection" | "prefix">; // extra bullmq Worker opts
}

Behavior worth knowing:
- Lazy connect. start() is a no-op; queues and workers connect when first used.
- One Worker per job name. listen() creates a BullMQ Worker for each job; teamSize maps to the worker's concurrency.
- Option mapping (PublishOptions → BullMQ JobsOptions): startAfter → delay (ms), singletonKey → jobId (dedup by id, not a true singleton lock), retryLimit → attempts (retryLimit + 1), retryDelay → backoff.delay in milliseconds (type: exponential when retryBackoff, else fixed), expireInSeconds → removeOnComplete: { age }, priority → priority.
- Scheduling uses BullMQ's repeatable jobs (repeat: { pattern: cron, key: "questpie:<jobName>" }); unschedule() removes every repeatable tied to that name.
`retryDelay` is still in seconds, even on BullMQ
You always pass retryDelay in seconds, matching every other adapter. The BullMQ adapter multiplies it by 1000 internally to get BullMQ's millisecond backoff.delay. Don't pre-multiply.
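To make the option mapping and the seconds-to-milliseconds conversion concrete, here is a minimal sketch of the translation. `PublishOptionsLike`, `BullJobsOptionsLike`, and `toBullMQOptions` are hypothetical names, the field types are assumptions (in particular, `startAfter` is treated as a Date or ISO string), and the real adapter may handle edge cases differently.

```typescript
// Subset of PublishOptions named on this page; field types are assumptions.
interface PublishOptionsLike {
  priority?: number;
  startAfter?: Date | string;
  singletonKey?: string;
  retryLimit?: number;
  retryDelay?: number; // seconds
  retryBackoff?: boolean;
  expireInSeconds?: number;
}

// The BullMQ job options this page says the adapter produces.
interface BullJobsOptionsLike {
  delay?: number; // milliseconds
  jobId?: string;
  attempts?: number;
  backoff?: { type: "exponential" | "fixed"; delay: number };
  removeOnComplete?: { age: number };
  priority?: number;
}

function toBullMQOptions(
  options: PublishOptionsLike,
  now: number = Date.now(),
): BullJobsOptionsLike {
  const out: BullJobsOptionsLike = {};
  if (options.startAfter !== undefined) {
    // startAfter is an absolute time; BullMQ wants a relative delay in ms.
    out.delay = Math.max(0, new Date(options.startAfter).getTime() - now);
  }
  if (options.singletonKey !== undefined) out.jobId = options.singletonKey;
  if (options.retryLimit !== undefined) out.attempts = options.retryLimit + 1;
  if (options.retryDelay !== undefined) {
    out.backoff = {
      type: options.retryBackoff ? "exponential" : "fixed",
      delay: options.retryDelay * 1000, // seconds in, milliseconds out
    };
  }
  if (options.expireInSeconds !== undefined) {
    out.removeOnComplete = { age: options.expireInSeconds };
  }
  if (options.priority !== undefined) out.priority = options.priority;
  return out;
}
```

Note that `attempts` counts the first run as well, which is why a `retryLimit` of 3 becomes 4 attempts.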
Supports long-running workers, runOnce, cron scheduling, and singletonKey (id-dedup).
Cloudflare Queues (push)
For QUESTPIE apps deployed to Cloudflare Workers. Cloudflare pushes batches to your worker, so there's no long-running listener and no in-app scheduling; those come from the platform.
import { cloudflareQueuesAdapter } from "questpie/adapters/cloudflare-queues";
.build({
queue: { adapter: cloudflareQueuesAdapter({ queue: env.MY_QUEUE }) },
});

CloudflareQueuesAdapterOptions: pass either queue or enqueue (the constructor throws if you pass neither):
interface CloudflareQueuesAdapterOptions {
// A Cloudflare Queues producer binding (or a function returning one).
queue?: CloudflareQueueBinding | (() => MaybePromise<CloudflareQueueBinding>);
// ...or your own enqueue function (returns a message id, or null).
enqueue?: (
message: CloudflareQueueEnvelope,
opts?: { delaySeconds?: number },
) => Promise<string | null>;
// Decode a raw pushed body into an envelope (defaults to requiring a `jobName` string).
decode?: (body: unknown) => CloudflareQueueEnvelope | null;
}

Wire the push consumer to your Worker's queue() entrypoint (full recipe in Jobs → Push model):
import { app } from "#questpie";
const consume = app.queue.createPushConsumer();
export default {
async queue(batch, _env, _ctx) {
await consume(batch as never);
},
};

Behavior worth knowing:
- Push-only. schedule() / unschedule() throw: "…does not support cron scheduling. Use platform cron triggers." Use Cloudflare's Cron Triggers for recurring work, calling runOnce-style draining from a scheduled handler if needed.
- publish() returns null with the queue binding form, because Cloudflare's send() exposes no stable message id.
- Delays cap at 24 hours. startAfter → delaySeconds and retryDelay → retry delaySeconds, both throwing if they exceed 24h (or if startAfter is an invalid ISO string).
- Retry handling. On a handler error the consumer calls message.retry(...). The message is only acked (dropped) when a retryLimit was published and attempts >= retryLimit + 1; with no retryLimit set, a failing message always retries. Undecodable messages and messages with no matching handler are acked and skipped.
- runtime: "cloudflare" marker. The adapter carries this so QUESTPIE's runtime-compatibility check can verify a Cloudflare Workers deploy uses a push-based queue. A custom adapter targeting Workers must set runtime: "cloudflare" and implement createPushConsumer().
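The delay cap and the retry rule from the list above reduce to two small decisions, sketched here as pure functions. The names (`startAfterToDelaySeconds`, `onHandlerError`), the error messages, and the round-up to whole seconds are assumptions for illustration; only the 24-hour limit and the `attempts >= retryLimit + 1` condition come from this page.

```typescript
const MAX_DELAY_SECONDS = 24 * 60 * 60; // the 24-hour cap described above

// Convert an ISO `startAfter` into delaySeconds, rejecting invalid input
// and anything beyond the cap. Rounding up is an assumption of this sketch.
function startAfterToDelaySeconds(startAfter: string, now: number = Date.now()): number {
  const target = Date.parse(startAfter);
  if (Number.isNaN(target)) {
    throw new Error(`Invalid startAfter: ${startAfter}`);
  }
  const seconds = Math.max(0, Math.ceil((target - now) / 1000));
  if (seconds > MAX_DELAY_SECONDS) {
    throw new Error("Delay exceeds the 24 hour maximum");
  }
  return seconds;
}

type FailureAction = "retry" | "ack";

// After a handler error: ack (drop) only once a published retryLimit is
// exhausted. Without a retryLimit, a failing message always retries.
function onHandlerError(attempts: number, retryLimit?: number): FailureAction {
  if (retryLimit !== undefined && attempts >= retryLimit + 1) return "ack";
  return "retry";
}
```

Because a missing `retryLimit` means unbounded retries, publishing one on jobs that can fail permanently avoids a message that loops until the platform's own limits intervene.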
The QueueAdapter contract
Every adapter, built-in or custom, implements this interface. It's the single seam the queue client talks to:
interface QueueAdapter {
capabilities?: Partial<QueueAdapterCapabilities>;
// Required:
start(): Promise<void>;
stop(): Promise<void>;
publish(jobName: string, payload: any, options?: PublishOptions): Promise<string | null>;
schedule(jobName: string, cron: string, payload: any,
options?: Omit<PublishOptions, "startAfter">): Promise<void>;
unschedule(jobName: string): Promise<void>;
on(event: "error", handler: (error: Error) => void): void;
// Optional, presence drives capability fallbacks:
listen?(handlers: QueueHandlerMap, options?: QueueListenOptions): Promise<void>;
runOnce?(handlers: QueueHandlerMap, options?: QueueRunOnceOptions): Promise<QueueRunOnceResult>;
createPushConsumer?(args: { handlers: QueueHandlerMap }): QueuePushConsumerHandler;
}

start / stop / publish / schedule / unschedule / on are required. listen / runOnce / createPushConsumer are optional; whether you implement each one determines the matching capability flag when capabilities leaves it unset (see the capability callout above).
Two contract details adapter authors rely on:
- Handlers are keyed by the durable job name. The framework builds a QueueHandlerMap (Record<jobName, handler>) and passes it to listen / runOnce / createPushConsumer. Each handler receives a QueueJobRecord: { id: string; data: unknown }.
- data is raw; the framework re-validates it. Adapters store and replay the raw payload; QUESTPIE re-parses data against the job's Zod schema before invoking the user handler. Your adapter never needs to validate. (See Jobs for the dispatch- and handler-time validation passes.)
Extending: a custom adapter
The queue backend is open. Implement QueueAdapter, declare your capabilities, and pass an instance to .build({ queue: { adapter } }). Your adapter plugs into the exact same seam the three built-ins use, with no privileged internal API.
import type { QueueAdapter } from "questpie/queue";
class MyAdapter implements QueueAdapter {
capabilities = {
longRunningConsumer: true,
runOnceConsumer: false,
pushConsumer: false,
scheduling: false,
singleton: false,
};
async start() { /* connect to broker */ }
async stop() { /* disconnect */ }
async publish(jobName, payload, options) { /* enqueue */ return "job-id"; }
async schedule(jobName, cron, payload, options) { /* register cron */ }
async unschedule(jobName) { /* cancel cron */ }
on(event, handler) { /* register an "error" listener */ }
// Optional, implement what your broker supports:
async listen(handlers, options) { /* long-running consumer */ }
async runOnce(handlers, options) { return { processed: 0 }; }
createPushConsumer(args) { return async (batch) => { /* drain a pushed batch */ }; }
}
// .build({ queue: { adapter: new MyAdapter() } })

This is the QUESTPIE principle in action: core, modules, and your own code reach the queue through one contract. The full "build your own adapter" loop, across every adapter kind, lives in Building a plugin.
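For a feel of what a filled-in adapter looks like, here is a toy in-memory backend that supports only publish and runOnce. It is a sketch for tests and experiments, not a shipped adapter: `InMemoryAdapter` is a hypothetical name, the two types at the top are simplified local stand-ins for the contract types (a real adapter would import them from questpie/queue and declare `implements QueueAdapter`), and the `{ processed }` result shape is assumed from the skeleton above.

```typescript
// Simplified local stand-ins for the contract types used by this sketch.
interface QueueJobRecord {
  id: string;
  data: unknown;
}
type QueueHandlerMap = Record<string, (job: QueueJobRecord) => Promise<void>>;

class InMemoryAdapter {
  capabilities = {
    longRunningConsumer: false,
    runOnceConsumer: true,
    pushConsumer: false,
    scheduling: false,
    singleton: false,
  };

  private pending: Array<{ jobName: string; record: QueueJobRecord }> = [];
  private errorHandlers: Array<(error: Error) => void> = [];
  private nextId = 1;

  async start(): Promise<void> {}

  async stop(): Promise<void> {
    this.pending = [];
  }

  // Store the raw payload untouched; validation is the framework's job.
  async publish(jobName: string, payload: unknown): Promise<string | null> {
    const id = String(this.nextId++);
    this.pending.push({ jobName, record: { id, data: payload } });
    return id;
  }

  async schedule(): Promise<void> {
    throw new Error("InMemoryAdapter does not support cron scheduling.");
  }

  async unschedule(): Promise<void> {
    throw new Error("InMemoryAdapter does not support cron scheduling.");
  }

  on(_event: "error", handler: (error: Error) => void): void {
    this.errorHandlers.push(handler);
  }

  // Drain everything queued at call time; failures go to "error" listeners.
  async runOnce(handlers: QueueHandlerMap): Promise<{ processed: number }> {
    const batch = this.pending;
    this.pending = [];
    let processed = 0;
    for (const { jobName, record } of batch) {
      const handler = handlers[jobName];
      if (!handler) continue; // no matching handler: skip the job
      try {
        await handler(record);
        processed++;
      } catch (error) {
        const err = error instanceof Error ? error : new Error(String(error));
        for (const notify of this.errorHandlers) notify(err);
      }
    }
    return { processed };
  }
}
```

The declared capabilities match what is actually implemented: no listen or createPushConsumer method exists, so the explicit flags and the inferred ones would agree.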
TypeScript
The adapter contract types are public from questpie/queue; each adapter's option types come from its own subpath:
import type {
QueueAdapter,
QueueAdapterCapabilities,
QueueHandlerMap,
QueueJobRecord,
QueuePushBatch,
} from "questpie/queue";
import type { PgBossAdapterOptions } from "questpie/adapters/pg-boss";
import type { BullMQAdapterOptions } from "questpie/adapters/bullmq";
import type {
CloudflareQueuesAdapterOptions,
CloudflareQueueEnvelope,
} from "questpie/adapters/cloudflare-queues";

questpie/queue re-exports the whole contract surface (QueueAdapter, the Queue* types, capabilities). The concrete adapters are deliberately not re-exported from questpie/queue: importing them from questpie/adapters/* keeps pg-boss / bullmq / Cloudflare bindings out of your bundle unless you use them. The job-facing types (JobDefinition, PublishOptions, QueueClient, InferJobPayload, …) are documented in Jobs → TypeScript.
Related
- Jobs: the feature this adapter serves (job(), dispatch, the handler context, and the worker entrypoints). Start here.
- Configuration: the .build({ ... }) model and runtime-compatibility checks that read the adapter.
- Building a plugin: implement QueueAdapter (and the other adapter contracts) for any backend.
- Adapters: the full list of swappable backends (queue, search, realtime, storage, email, KV).