Realtime adapter
The server-side transport that powers live queries. Turn on realtime in your config and changes are recorded to an outbox, then fanned out across every app instance over pg_notify, Redis Streams, or Cloudflare Durable Objects.
The realtime adapter is the server-side transport behind QUESTPIE's live queries. Set realtime in your runtime config and every write is recorded to a change outbox table; the adapter then signals each app instance to re-run the affected subscriptions and push fresh snapshots over SSE. With no adapter, the service polls that outbox on a timer, which is fine for a single process. Add pgNotifyAdapter() or redisStreamsAdapter({ client }) and changes propagate the instant they happen, across every horizontally scaled instance.
This page covers the server side: the realtime config, the outbox/poll model, and each transport adapter's options. The client API (live(), liveIter(), and the TanStack Query { realtime: true } flag) is covered on the Realtime page.
Prerequisites
Read Configuration (where adapters are wired in questpie.config.ts) and Realtime (the client live() API this transport serves).
What it does
- Records every change to an outbox. Each create/update/delete (and bulk variant) appends a row to the `questpie_realtime_log` table, a durable, ordered change log keyed by an auto-incrementing `seq`.
- Fans out across instances. The adapter broadcasts a tiny notice ("seq N on collection posts changed") to every process, which then drains the outbox and pushes snapshots to its own SSE subscribers.
- Zero-config default. Turn `realtime` on with `true` and you get a working transport: polling on a non-Postgres setup, or an auto-wired `pg_notify` listener when a Postgres connection is available.
- Push transports for scale. `pgNotifyAdapter` rides the database you already run; `redisStreamsAdapter` uses a Redis consumer group; `cloudflareRealtimeAdapter` is backed by a Durable Object.
- Privacy-safe by design. Only a `RealtimeNotice` crosses the wire, never row data. Each subscriber re-runs its own access-controlled query to build its snapshot, so one user's data can't leak through the transport.
- Swap with one config line. The transport is a single `realtime.adapter` value; no app-code change is needed to move from polling to Redis.
Quick start
Realtime is off until you turn it on in the server config. Set realtime in runtimeConfig and the app starts recording changes and serving the POST /realtime SSE stream:
import { runtimeConfig } from "questpie/app";
import env from "./env"; // declared + validated in env.ts, see Environment

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: true,
});

That's the whole server requirement. Subscribe from the client with client.collections.posts.live(...); see Realtime for the client API.
No `realtime` config = no live mode
live() / liveIter() / { realtime: true } all need realtime set in the server config. The client always wires client.realtime, but without server realtime the POST /realtime endpoint isn't served and subscriptions error out (via onError / a thrown generator) rather than silently degrading. Turn it on before relying on live queries.
How the transport works
The flow, from a write to a pushed snapshot:
| Step | What happens | Source |
|---|---|---|
| 1. Write commits | A create/update/delete appends one row to questpie_realtime_log (appendChange): { seq, resourceType, resource, operation, recordId?, locale?, payload }. | realtime/service.ts:217; realtime/collection.ts:9 |
| 2. Signal | The adapter's notify(event) broadcasts a minimal RealtimeNotice ({ seq, resourceType, resource, operation }) to every instance. | realtime/adapter.ts:9; realtime/service.ts:249 |
| 3. Drain | Each instance reacts to the notice (or a poll tick) by drain()ing new outbox rows past its watermark. | realtime/service.ts:418-426 |
| 4. Re-run + push | For each affected subscription, the server re-runs the query under the subscriber's session and pushes the fresh snapshot over its POST /realtime SSE stream. | realtime/service.ts (drain → listeners) |
The adapter is only the signal layer (steps 2-3). The outbox (step 1) and the access-controlled re-run (step 4) happen regardless of which adapter you pick. This is why the notice carries no payload: the snapshot is always rebuilt server-side, per subscriber.
The notice is a cue, not the data
A RealtimeNotice is Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">, just enough to say something matching changed. The full event (with payload) lives in the outbox; subscribers never receive another user's rows over the transport.
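The four steps above can be sketched in memory. This is an illustrative model only, not QUESTPIE's implementation: the types are local copies of the shapes this page describes, and the `Instance` class, the array-backed `outbox`, and the `toNotice` helper are hypothetical names introduced for the example.

```typescript
// Local stand-ins for the shapes described above; nothing is imported from questpie.
type RealtimeOperation = "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
type RealtimeResourceType = "collection" | "global";

interface RealtimeChangeEvent {
  seq: number;
  resourceType: RealtimeResourceType;
  resource: string;
  operation: RealtimeOperation;
  recordId?: string;
  locale?: string;
  payload: Record<string, unknown>;
}

type RealtimeNotice = Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">;

// Step 1: an array standing in for the questpie_realtime_log table.
const outbox: RealtimeChangeEvent[] = [];

function appendChange(change: Omit<RealtimeChangeEvent, "seq">): RealtimeChangeEvent {
  const event: RealtimeChangeEvent = { ...change, seq: outbox.length + 1 };
  outbox.push(event);
  return event;
}

// Step 2: keep only the four notice fields before broadcasting.
function toNotice(event: RealtimeChangeEvent): RealtimeNotice {
  const { seq, resourceType, resource, operation } = event;
  return { seq, resourceType, resource, operation };
}

// Step 3: each instance tracks its own watermark and reads rows past it.
class Instance {
  private watermark = 0;
  readonly seen: number[] = [];

  drain(batchSize = 500): RealtimeChangeEvent[] {
    const rows = outbox.filter((row) => row.seq > this.watermark).slice(0, batchSize);
    for (const row of rows) {
      this.watermark = row.seq;
      this.seen.push(row.seq);
    }
    // Step 4 (re-running each subscriber's query) would start from these rows.
    return rows;
  }
}

const a = new Instance();
const b = new Instance();

const written = appendChange({
  resourceType: "collection",
  resource: "posts",
  operation: "update",
  recordId: "p1",
  payload: { title: "secret draft" },
});
const notice = toNotice(written);

// Both instances react to the same notice by draining the shared outbox.
a.drain();
b.drain();
```

The point of the model is the split of responsibilities: the payload stays in the outbox, the notice only tells each instance that it has something new to drain.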
RealtimeConfig, the options
Use realtime: true to turn on the default transport. Use an object when you want to configure the transport or tune the SSE/outbox behavior:
interface RealtimeConfig {
  adapter?: RealtimeAdapter; // transport (pg_notify, redis, …). Omit → poll / auto pg_notify
  pollIntervalMs?: number; // default 2000, drain interval when polling (no adapter)
  batchSize?: number; // default 500, max outbox rows read per drain
  retentionDays?: number; // optional time-based outbox safety window
  keepAliveIntervalMs?: number; // default 8000, SSE keep-alive ping interval
}

| Option | Default | What it controls |
|---|---|---|
| `adapter` | (none) | The transport that broadcasts change notices. Omit it for the poll / auto-pg_notify default (below). |
| `pollIntervalMs` | 2000 | How often the service drains the outbox when there is no adapter (defaults to 0 if you omit it while an adapter is set). With an adapter, drains are notice-driven, so this is ignored; the poll timer only runs on the no-adapter path. |
| `batchSize` | 500 | Max outbox rows pulled per drain. Raise it for very high write volume. |
| `retentionDays` | (none) | Adds a time-based cleanup window. The service always does watermark cleanup (it deletes rows below the min consumed seq across active subscribers); retentionDays is an extra safety net for rows no subscriber will ever read. |
| `keepAliveIntervalMs` | 8000 | Interval between ping keep-alive events on each POST /realtime SSE stream. |
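To make the two cleanup rules in the retentionDays row concrete, here is a minimal sketch of how they could combine. It is not the service's code: `OutboxRow` and `selectRowsToDelete` are hypothetical, and it assumes that "below" means strictly less than the minimum consumed seq and that watermark cleanup deletes nothing when there are no active subscribers.

```typescript
// Hypothetical in-memory row shape; the real table has more columns.
interface OutboxRow {
  seq: number;
  createdAt: Date;
}

const DAY_MS = 24 * 60 * 60 * 1000;

function selectRowsToDelete(
  rows: OutboxRow[],
  consumedSeqs: number[], // last consumed seq of each active subscriber
  now: Date,
  retentionDays?: number,
): OutboxRow[] {
  // Watermark rule: rows every active subscriber has already moved past.
  const minConsumed = consumedSeqs.length > 0 ? Math.min(...consumedSeqs) : undefined;
  // Retention rule: rows older than the time window, if one is configured.
  const oldestAllowed =
    retentionDays === undefined ? undefined : now.getTime() - retentionDays * DAY_MS;

  return rows.filter((row) => {
    const belowWatermark = minConsumed !== undefined && row.seq < minConsumed;
    const expired = oldestAllowed !== undefined && row.createdAt.getTime() < oldestAllowed;
    return belowWatermark || expired;
  });
}
```

With no subscribers and no retentionDays, nothing qualifies for deletion in this model, which is the gap the time-based window is described as covering.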
Keep `keepAliveIntervalMs` under your idle timeout
The default 8000 ms is deliberately under Bun's default 10s idleTimeout, so SSE streams survive on an untuned Bun.serve. Behind a proxy with a shorter read timeout (some default to 30-60s, but custom ones can be lower), drop it further so the connection never goes idle long enough to be culled.
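If you derive the keep-alive interval from a known idle timeout, a small helper keeps the two in step. This is only a sketch: `keepAliveUnder` is a hypothetical function, and the 80% safety margin is an assumption, not a QUESTPIE recommendation.

```typescript
const DEFAULT_KEEP_ALIVE_MS = 8000;

// Returns a keep-alive interval that stays under the given idle timeout,
// never exceeding the documented 8000 ms default.
function keepAliveUnder(idleTimeoutMs: number): number {
  if (!Number.isFinite(idleTimeoutMs) || idleTimeoutMs <= 0) {
    throw new RangeError("idleTimeoutMs must be a positive number");
  }
  // Assumed margin: ping at 80% of the idle timeout at the latest.
  const withMargin = Math.floor((idleTimeoutMs * 4) / 5);
  return Math.max(1, Math.min(DEFAULT_KEEP_ALIVE_MS, withMargin));
}
```

The result would be passed as keepAliveIntervalMs in the realtime config; with a 10s idle timeout it leaves the default untouched, and with a 5s timeout it drops to 4000 ms.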
The default: poll vs auto pg_notify
Omitting adapter does not always mean naive polling. The service resolves the transport lazily on the first subscription (in ensureStarted(), triggered from subscribe(), not at process boot), like this:
- Explicit `adapter` set → use it.
- No adapter, but a Postgres connection string is available → the service lazily constructs a `pgNotifyAdapter` on the `questpie_realtime` channel for you. Most Postgres apps get push delivery from `realtime: true` with no extra config.
- No adapter and no Postgres connection → fall back to polling the outbox every `pollIntervalMs` (default 2s).
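The resolution order above reads naturally as a small decision function. The sketch below models only the order of the checks; `resolveTransport`, `ResolveInput`, and the `Transport` union are hypothetical names, and the real service does this inside ensureStarted() with its own types.

```typescript
type Transport =
  | { kind: "explicit"; adapter: unknown }
  | { kind: "pg_notify"; channel: string; connectionString: string }
  | { kind: "poll"; intervalMs: number };

interface ResolveInput {
  adapter?: unknown; // an explicit realtime.adapter, if configured
  connectionString?: string; // the Postgres URL, if one is available
  pollIntervalMs?: number;
}

function resolveTransport(input: ResolveInput): Transport {
  // 1. An explicit adapter always wins.
  if (input.adapter !== undefined) {
    return { kind: "explicit", adapter: input.adapter };
  }
  // 2. Otherwise auto-wire pg_notify on the default channel when Postgres is available.
  if (input.connectionString) {
    return {
      kind: "pg_notify",
      channel: "questpie_realtime",
      connectionString: input.connectionString,
    };
  }
  // 3. Otherwise poll the outbox on a timer.
  return { kind: "poll", intervalMs: input.pollIntervalMs ?? 2000 };
}
```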
Polling is real, just laggy
The poll path is correct, not a downgrade in behavior: it drains the same outbox and pushes the same access-controlled snapshots, just on a 2s timer instead of on-change. It's a fine default for a single instance and local dev. For multiple processes you want a real adapter (next section) so a write on instance A reaches a subscriber on instance B without waiting for B's next poll tick.
Multiple processes need a shared transport
The outbox is shared (it's a DB table), but the poll timer and the auto-pg_notify listener are per process. Polling means each instance independently notices changes on its own timer, which is workable but laggy and DB-chatty at scale. An explicit pgNotifyAdapter / redisStreamsAdapter fans a single notice out to all instances the moment a change lands. Reach for one once you run more than one process.
pgNotifyAdapter, Postgres LISTEN/NOTIFY
The simplest push transport: it broadcasts change notices over a Postgres channel with pg_notify, and every instance LISTENs. No infrastructure beyond the database you already run.
import { runtimeConfig } from "questpie/app";
import { pgNotifyAdapter } from "questpie/adapters/pg-notify";
import env from "./env";

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: {
    adapter: pgNotifyAdapter({
      connectionString: env.DATABASE_URL,
      // channel: "questpie_realtime", // default
    }),
  },
});

PgNotifyAdapterOptions is { channel?, client?, connection?, connectionString? }. You must supply one connection source.
| Option | Type | Default | Notes |
|---|---|---|---|
| `channel` | string | "questpie_realtime" | Postgres channel name. Validated against /^[a-zA-Z0-9_]+$/; an invalid name throws at construction. |
| `client` | pg.Client | none | An existing pg client to reuse. Not .end()-ed by the adapter on stop (you own its lifecycle). |
| `connection` | pg.ClientConfig | none | A pg connection config; the adapter constructs and owns the client. |
| `connectionString` | string | none | A connection URL; the adapter constructs and owns the client. |
Behavior:
- Needs the `pg` package. It's an optional peer dependency (`pg ^8.13.1`), not bundled; the adapter `await import("pg")`s it lazily. Install `pg` if it isn't already in your app.
- One connection source required. Pass exactly one of `client` / `connection` / `connectionString`, or the adapter throws "PgNotifyAdapter requires a pg Client or connection config" on start.
- Owned vs borrowed client. A `connection` / `connectionString` client is owned and closed on `stop()`; a `client` you pass is left open (`ownsClient = false`).
- Uses `LISTEN` / `UNLISTEN` + `pg_notify($1, $2)`. The notice JSON ({ seq, resourceType, resource, operation }) is the channel payload.
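Two of the behaviors above, channel validation and the JSON notice payload, are easy to model without a database. The sketch below is illustrative rather than the adapter's source: `assertValidChannel`, `encodeNotice`, `decodeNotice`, and the error text are hypothetical; only the regex and the four notice fields come from this page.

```typescript
// The pattern documented for the channel option.
const CHANNEL_PATTERN = /^[a-zA-Z0-9_]+$/;

function assertValidChannel(channel: string): string {
  if (!CHANNEL_PATTERN.test(channel)) {
    // Hypothetical message; the adapter's actual wording may differ.
    throw new Error(`Invalid realtime channel name: ${JSON.stringify(channel)}`);
  }
  return channel;
}

interface Notice {
  seq: number;
  resourceType: "collection" | "global";
  resource: string;
  operation: "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
}

// Serialize only the four notice fields, whatever else the input object carries.
function encodeNotice(notice: Notice): string {
  const { seq, resourceType, resource, operation } = notice;
  return JSON.stringify({ seq, resourceType, resource, operation });
}

// Parse a channel payload defensively; anything malformed is ignored.
function decodeNotice(payload: string): Notice | undefined {
  try {
    const value = JSON.parse(payload) as Partial<Notice>;
    if (typeof value.seq !== "number" || typeof value.resource !== "string") return undefined;
    if (typeof value.resourceType !== "string" || typeof value.operation !== "string") return undefined;
    return value as Notice;
  } catch {
    return undefined;
  }
}
```

A strict channel pattern matters because LISTEN takes an identifier rather than a bind parameter, so the name presumably ends up interpolated into SQL. Keeping the payload to four short fields also leaves it far below Postgres's default NOTIFY payload limit of just under 8000 bytes.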
You usually don't need to write this
On a Postgres app, realtime: true already auto-wires pg_notify on the default channel (see the default above). Configure pgNotifyAdapter explicitly only when you want a non-default channel, want to reuse an existing pg client, or are pointing realtime at a different database than your main db.
redisStreamsAdapter, Redis Streams
For Redis-backed fan-out across many instances, use Redis Streams with a consumer group. The client is required: QUESTPIE doesn't ship a Redis driver, so you bring your own (a small command-shaped object modeled on node-redis):
import { runtimeConfig } from "questpie/app";
import { redisStreamsAdapter } from "questpie/adapters/redis-streams";
import { createClient } from "redis";
import env from "./env";

const redis = createClient({ url: env.REDIS_URL });
await redis.connect();

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: {
    adapter: redisStreamsAdapter({
      client: redis,
      // stream: "questpie:realtime", // default
      // group: "questpie-realtime", // default
      // consumer: "consumer-<uuid>", // default: random per process
      // blockMs: 5000, batchSize: 100, // defaults
    }),
  },
});

RedisStreamsAdapterOptions is { client (required), stream?, group?, consumer?, blockMs?, batchSize? }.
| Option | Type | Default | Notes |
|---|---|---|---|
| `client` | RedisStreamsClient | none (required) | Your Redis client. Must implement xAdd + xReadGroup; xGroupCreate / xAck / quit / disconnect are optional. |
| `stream` | string | "questpie:realtime" | The Redis stream key notices are XADD-ed to. |
| `group` | string | "questpie-realtime" | Consumer group name. Created with MKSTREAM; a BUSYGROUP (group already exists) is swallowed. |
| `consumer` | string | consumer-<random uuid> | Per-process consumer id. The random default lets every instance read the stream without stealing each other's messages. |
| `blockMs` | number | 5000 | How long XREADGROUP blocks waiting for new messages. |
| `batchSize` | number | 100 | COUNT per XREADGROUP call. |
Behavior:
- Uses `XADD` / `XREADGROUP` / `XACK`. `notify()` XADDs the notice fields (seq, resourceType, resource, operation) as strings; the read loop acknowledges each delivered message.
- Resilient read loop. On a read error it backs off 500ms and retries while running; `stop()` ends the loop.
- Driver-agnostic. Any client matching the `RedisStreamsClient` shape works; the type is modeled on `node-redis` command names but isn't tied to it.
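The "resilient read loop" behavior can be sketched independently of Redis. This is a minimal model, not the adapter's implementation: `startReadLoop` and its `read` / `onMessage` callbacks are hypothetical stand-ins for a blocking XREADGROUP call and for handling plus acknowledging each message. Only the 500 ms backoff default comes from this page.

```typescript
interface ReadLoopOptions {
  read: () => Promise<string[]>; // stand-in for one blocking XREADGROUP call
  onMessage: (id: string) => void; // stand-in for handling + XACK
  backoffMs?: number; // pause after a failed read
}

function startReadLoop(options: ReadLoopOptions): { stop: () => void; done: Promise<void> } {
  const backoffMs = options.backoffMs ?? 500;
  let running = true;

  const done = (async () => {
    while (running) {
      try {
        const ids = await options.read();
        for (const id of ids) options.onMessage(id);
      } catch {
        // A failed read is not fatal: wait, then try again while still running.
        if (!running) break;
        await new Promise<void>((resolve) => setTimeout(resolve, backoffMs));
      }
    }
  })();

  return {
    // stop() flips the flag; the loop exits after the current read or backoff settles.
    stop: () => {
      running = false;
    },
    done,
  };
}
```

In this model a stop() during a long blocking read takes effect only when that read returns, which is one reason a bounded block time such as blockMs is useful.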
`RedisStreamsClient` is a structural contract
type RedisStreamsClient = {
  xAdd(stream, id, fields): Promise<string>;
  xGroupCreate?(stream, group, id, options?: { MKSTREAM?: boolean }): Promise<unknown>;
  xReadGroup(...args: any[]): Promise<unknown>;
  xAck?(stream, group, id | id[]): Promise<unknown>;
  quit?(): Promise<void>;
  disconnect?(): void;
};

Only xAdd and xReadGroup are mandatory.
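Because the contract is structural, an in-memory object with just the two mandatory methods is enough for tests. The sketch below is hypothetical throughout: `MinimalStreamsClient` spells out parameter types that this page leaves abbreviated, the shape returned by `xReadGroup` is invented for the example, and `noticeToFields` only illustrates the "fields as strings" encoding.

```typescript
// Only the two mandatory methods; the parameter types are assumptions.
type MinimalStreamsClient = {
  xAdd(stream: string, id: string, fields: Record<string, string>): Promise<string>;
  xReadGroup(...args: unknown[]): Promise<unknown>;
};

interface StoredEntry {
  id: string;
  message: Record<string, string>;
}

function createMemoryStreamsClient(): { client: MinimalStreamsClient; entries: StoredEntry[] } {
  const entries: StoredEntry[] = [];
  let cursor = 0; // index of the first entry not yet returned by xReadGroup

  const client: MinimalStreamsClient = {
    xAdd(_stream, _id, fields) {
      const id = `${entries.length + 1}-0`;
      entries.push({ id, message: { ...fields } });
      return Promise.resolve(id);
    },
    xReadGroup() {
      // Return everything appended since the last read (invented return shape).
      const unread = entries.slice(cursor);
      cursor = entries.length;
      return Promise.resolve(unread);
    },
  };

  return { client, entries };
}

// Stream fields are strings, so the numeric seq is stringified.
function noticeToFields(notice: {
  seq: number;
  resourceType: string;
  resource: string;
  operation: string;
}): Record<string, string> {
  return {
    seq: String(notice.seq),
    resourceType: notice.resourceType,
    resource: notice.resource,
    operation: notice.operation,
  };
}
```

A stub like this would be passed as `client` to redisStreamsAdapter in a test setup, assuming the adapter tolerates the missing optional methods as described above.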
cloudflareRealtimeAdapter, Durable Objects
On Cloudflare Workers, realtime is backed by a Durable Object: there's no long-lived process to hold subscriptions or a poll timer, so the DO is the fan-out point. cloudflareRealtimeAdapter({ namespace }) proxies subscribe/notify through it.
import { runtimeConfig } from "questpie/app";
import { cloudflareRealtimeAdapter } from "questpie/adapters/cloudflare-realtime";

export default runtimeConfig({
  realtime: {
    adapter: cloudflareRealtimeAdapter({
      // A Durable Object namespace binding, or a zero-arg function that
      // returns one (so you can resolve it lazily from your Worker env).
      namespace: () => getEnv().QUESTPIE_REALTIME,
      // objectName: "questpie-realtime", // default DO instance name
      // hubPath: "/__questpie/realtime", // default internal hub path
    }),
  },
});

CloudflareRealtimeAdapterOptions is { namespace (required), objectName?, hubPath? }. The namespace is a Durable Object namespace binding, or a zero-argument function that returns one (resolved lazily when the adapter first needs it, so you can defer reading it out of your Worker env).
| Option | Type | Default | Notes |
|---|---|---|---|
| `namespace` | CloudflareDurableObjectNamespaceInput | none (required) | The DO namespace binding, or () => namespace. |
| `objectName` | string | "questpie-realtime" | Name of the DO instance (idFromName) all notices route through. |
| `hubPath` | string | "/__questpie/realtime" | Internal hub path; the adapter derives ${hubPath}/subscribe and ${hubPath}/notify. |
This adapter carries a runtime: "cloudflare" discriminator so the Cloudflare fetch/queue handlers detect it, and it's wired together with the createCloudflareRealtimeDurableObjectHandler export (from questpie/adapters/cloudflare) that implements the DO itself.
Cloudflare realtime is a deployment-specific setup
Unlike pg_notify / Redis, this adapter requires a Durable Object class bound in your wrangler config plus the DO handler. It's part of the Cloudflare Workers deployment story rather than a drop-in line. The other two adapters need no Workers-specific wiring.
The outbox table
The change log lives in one Drizzle table, questpie_realtime_log, created by codegen alongside your schema (it's part of the core module). You rarely touch it directly, but it's worth knowing it exists:
| Column | Notes |
|---|---|
| `seq` | bigserial primary key; the monotonic ordering every subscriber watermarks against. |
| `resource_type` | "collection" or "global". |
| `resource` | The collection/global name. |
| `operation` | create / update / delete / bulk_update / bulk_delete. |
| `record_id`, `locale` | Nullable; the specific record + locale when applicable. |
| `payload` | jsonb (default {}); the change detail, kept server-side (never broadcast). |
| `created_at` | Insert timestamp; powers retentionDays cleanup. |
TypeScript
The transport contract and config types are exported from questpie (and questpie/realtime):
import type {
  RealtimeAdapter, // the transport contract you implement for a custom adapter
  RealtimeConfig, // the `realtime` config shape
  RealtimeChangeEvent, // a full outbox event
  RealtimeNotice, // the minimal broadcast notice
  RealtimeOperation, // "create" | "update" | "delete" | "bulk_update" | "bulk_delete"
  RealtimeResourceType, // "collection" | "global"
} from "questpie/realtime";

Each adapter's option type ships from its own entry: PgNotifyAdapterOptions from questpie/adapters/pg-notify, RedisStreamsAdapterOptions and RedisStreamsClient from questpie/adapters/redis-streams, and CloudflareRealtimeAdapterOptions from questpie/adapters/cloudflare-realtime.
Build your own transport
RealtimeAdapter is a four-method contract. Implement it to back realtime on any pub/sub system (NATS, Kafka, a cloud queue):
interface RealtimeAdapter {
  start(): Promise<void>;
  stop(): Promise<void>;
  notify(event: RealtimeChangeEvent): Promise<void>; // broadcast a change
  subscribe(handler: (notice: RealtimeNotice) => void): () => void; // receive changes; returns unsubscribe
}

notify publishes a change to all instances; subscribe registers a handler that fires on every received notice (the service uses it to trigger a drain()) and returns an unsubscribe function. You only need to move the four notice fields (seq, resourceType, resource, operation), never the payload. Wire your instance as realtime.adapter.
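As a starting point, here is a minimal single-process implementation of the contract. It is a sketch under assumptions, not a shipped adapter: the types are local copies of the shapes on this page (in an app you would import them from questpie/realtime), `InMemoryRealtimeAdapter` is a hypothetical name, and dropping notices while stopped is a design choice made for the example.

```typescript
// Local copies of the documented shapes so the sketch stands alone.
type RealtimeOperation = "create" | "update" | "delete" | "bulk_update" | "bulk_delete";

interface RealtimeChangeEvent {
  seq: number;
  resourceType: "collection" | "global";
  resource: string;
  operation: RealtimeOperation;
  recordId?: string;
  locale?: string;
  payload: Record<string, unknown>;
}

type RealtimeNotice = Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">;

interface RealtimeAdapter {
  start(): Promise<void>;
  stop(): Promise<void>;
  notify(event: RealtimeChangeEvent): Promise<void>;
  subscribe(handler: (notice: RealtimeNotice) => void): () => void;
}

class InMemoryRealtimeAdapter implements RealtimeAdapter {
  private readonly handlers = new Set<(notice: RealtimeNotice) => void>();
  private running = false;

  start(): Promise<void> {
    this.running = true;
    return Promise.resolve();
  }

  stop(): Promise<void> {
    this.running = false;
    return Promise.resolve();
  }

  notify(event: RealtimeChangeEvent): Promise<void> {
    // Assumption: notices published while stopped are dropped.
    if (!this.running) return Promise.resolve();
    // Forward only the four notice fields, never the payload.
    const { seq, resourceType, resource, operation } = event;
    const notice: RealtimeNotice = { seq, resourceType, resource, operation };
    this.handlers.forEach((handler) => handler(notice));
    return Promise.resolve();
  }

  subscribe(handler: (notice: RealtimeNotice) => void): () => void {
    this.handlers.add(handler);
    return () => {
      this.handlers.delete(handler);
    };
  }
}
```

A real transport would replace the Set with a publish to the external system in notify and a consumer that calls the registered handlers; because it only reaches handlers in the same process, this version is useful mainly for tests.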
Related
- Realtime: the client API this transport powers, namely `live()`, `liveIter()`, and the TanStack Query `{ realtime: true }` flag.
- Configuration: where `realtime` lives in `QuestpieConfig`, alongside the other adapters.
- Adapters: the hub listing every adapter kind and the swap-the-backend model.
- Building a plugin: implement `RealtimeAdapter` (or any adapter contract) yourself.
- Multi-tenancy: how access rules re-run per subscriber keep tenant data isolated over realtime.
- Runnable example: `examples/tanstack-barbershop`, a TanStack Start app using realtime via the typed client and TanStack Query bindings.