Realtime adapter

The server-side transport that powers live queries. Turn on realtime in your config and changes are recorded to an outbox, then fanned out across every app instance over pg_notify, Redis Streams, or Cloudflare Durable Objects.

The realtime adapter is the server-side transport behind QUESTPIE's live queries. Set realtime in your runtime config and every write is recorded to a change outbox table; the adapter then signals each app instance to re-run the affected subscriptions and push fresh snapshots over SSE. With no adapter and no Postgres connection to auto-wire, the service polls that outbox on a timer, which is fine for a single process. Add pgNotifyAdapter() or redisStreamsAdapter({ client }) and changes propagate the instant they happen, across every horizontally scaled instance.

This page covers the server side: the realtime config, the outbox/poll model, and each transport adapter's options. The client API (live(), liveIter(), and the TanStack Query { realtime: true } flag) is covered on the Realtime page.

Prerequisites

Read Configuration (where adapters are wired in questpie.config.ts) and Realtime (the client live() API this transport serves).

What it does

  • Records every change to an outbox. Each create/update/delete (and bulk variant) appends a row to the questpie_realtime_log table, a durable, ordered change log keyed by an auto-incrementing seq.
  • Fans out across instances. The adapter broadcasts a tiny notice ("seq N on collection posts changed") to every process, which then drains the outbox and pushes snapshots to its own SSE subscribers.
  • Zero-config default. Turn realtime on with true and you get a working transport: polling on a non-Postgres setup, or an auto-wired pg_notify listener when a Postgres connection is available.
  • Push transports for scale. pgNotifyAdapter rides the database you already run; redisStreamsAdapter uses a Redis consumer group; cloudflareRealtimeAdapter is backed by a Durable Object.
  • Privacy-safe by design. Only a RealtimeNotice crosses the wire, never row data. Each subscriber re-runs its own access-controlled query to build its snapshot, so one user's data can't leak through the transport.
  • Swap with one config line. The transport is a single realtime.adapter value, so moving from polling to Redis needs no app-code change.

Quick start

Realtime is off until you turn it on in the server config. Set realtime in runtimeConfig and the app starts recording changes and serving the POST /realtime SSE stream:

src/questpie/server/questpie.config.ts
import { runtimeConfig } from "questpie/app";

import env from "./env"; // declared + validated in env.ts, see Environment

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: true,
});

That's the whole server requirement. Subscribe from the client with client.collections.posts.live(...); see Realtime for the client API.

No `realtime` config = no live mode

live() / liveIter() / { realtime: true } all need realtime set in the server config. The client always wires client.realtime, but without server realtime the POST /realtime endpoint isn't served and subscriptions error out (via onError / a thrown generator) rather than silently degrading. Turn it on before relying on live queries.

How the transport works

The flow, from a write to a pushed snapshot:

| Step | What happens | Source |
| --- | --- | --- |
| 1. Write commits | A create/update/delete appends one row to questpie_realtime_log (appendChange), { seq, resourceType, resource, operation, recordId?, locale?, payload }. | realtime/service.ts:217; realtime/collection.ts:9 |
| 2. Signal | The adapter notify(event) broadcasts a minimal RealtimeNotice ({ seq, resourceType, resource, operation }) to every instance. | realtime/adapter.ts:9; realtime/service.ts:249 |
| 3. Drain | Each instance reacts to the notice (or a poll tick) by drain()ing new outbox rows past its watermark. | realtime/service.ts:418-426 |
| 4. Re-run + push | For each affected subscription, the server re-runs the query under the subscriber's session and pushes the fresh snapshot over its POST /realtime SSE stream. | realtime/service.ts (drain → listeners) |

The adapter is only the signal layer (steps 2-3). The outbox (step 1) and the access-controlled re-run (step 4) happen regardless of which adapter you pick. This is why the notice carries no payload: the snapshot is always rebuilt server-side per subscriber.
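The four steps can be sketched in memory to make the watermark idea concrete. This is an illustration only: InMemoryOutbox and Instance are hypothetical stand-ins (not QUESTPIE classes), the types are local mirrors of the documented shapes, and seq is assumed to be a plain number.

```typescript
// Local stand-ins for the documented shapes; nothing here is imported from questpie.
type RealtimeOperation = "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
type RealtimeResourceType = "collection" | "global";

interface RealtimeChangeEvent {
  seq: number;
  resourceType: RealtimeResourceType;
  resource: string;
  operation: RealtimeOperation;
  recordId?: string;
  locale?: string;
  payload: Record<string, unknown>;
}

type RealtimeNotice = Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">;

// Hypothetical in-memory outbox standing in for the questpie_realtime_log table.
class InMemoryOutbox {
  private rows: RealtimeChangeEvent[] = [];
  private nextSeq = 1;

  // Step 1: a committed write appends one row and receives the next seq.
  appendChange(change: Omit<RealtimeChangeEvent, "seq">): RealtimeChangeEvent {
    const row: RealtimeChangeEvent = { ...change, seq: this.nextSeq++ };
    this.rows.push(row);
    return row;
  }

  // Step 3: read rows past a watermark, oldest first, capped at batchSize.
  readAfter(watermark: number, batchSize: number): RealtimeChangeEvent[] {
    return this.rows.filter((row) => row.seq > watermark).slice(0, batchSize);
  }
}

// Hypothetical app instance: tracks its own watermark and what needs a re-run.
class Instance {
  watermark = 0;
  rerun: string[] = [];
  private outbox: InMemoryOutbox;
  private batchSize: number;

  constructor(outbox: InMemoryOutbox, batchSize = 500) {
    this.outbox = outbox;
    this.batchSize = batchSize;
  }

  // Called on a notice or a poll tick. Step 4 (re-run + push) would start here.
  drain(): number {
    const rows = this.outbox.readAfter(this.watermark, this.batchSize);
    for (const row of rows) {
      this.rerun.push(`${row.resourceType}:${row.resource}`);
      this.watermark = row.seq;
    }
    return rows.length;
  }
}

const outbox = new InMemoryOutbox();
const instanceA = new Instance(outbox);
const instanceB = new Instance(outbox);

const written = outbox.appendChange({
  resourceType: "collection",
  resource: "posts",
  operation: "create",
  payload: { title: "Hello" },
});

// Step 2: the notice carries only the four cue fields, never the payload.
const notice: RealtimeNotice = {
  seq: written.seq,
  resourceType: written.resourceType,
  resource: written.resource,
  operation: written.operation,
};

// Every instance reacts to the same notice by draining from its own watermark.
for (const instance of [instanceA, instanceB]) instance.drain();
```

Because each instance keeps its own watermark, a second drain() with no new writes reads nothing, which is what makes a duplicate notice or a spare poll tick harmless in this sketch.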

The notice is a cue, not the data

A RealtimeNotice is Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">, just enough to say something matching changed. The full event (with payload) lives in the outbox; subscribers never receive another user's rows over the transport.
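One detail worth spelling out: Pick narrows the type, but TypeScript's structural typing still lets a full event be assigned where a notice is expected, so a transport has to copy the four fields explicitly to keep the payload off the wire. The sketch below uses local mirrors of the documented types; toNotice is a hypothetical helper, not a QUESTPIE export.

```typescript
// Local mirrors of the documented types; seq is assumed to be a number.
interface RealtimeChangeEvent {
  seq: number;
  resourceType: "collection" | "global";
  resource: string;
  operation: "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
  recordId?: string;
  locale?: string;
  payload: Record<string, unknown>;
}

type RealtimeNotice = Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">;

// Hypothetical helper: copy only the cue fields so nothing else can be serialized.
function toNotice(event: RealtimeChangeEvent): RealtimeNotice {
  const { seq, resourceType, resource, operation } = event;
  return { seq, resourceType, resource, operation };
}

const event: RealtimeChangeEvent = {
  seq: 12,
  resourceType: "collection",
  resource: "users",
  operation: "update",
  recordId: "u_1",
  payload: { email: "alice@example.com" },
};

// What would cross the transport: four fields, no row data.
const wire = JSON.stringify(toNotice(event));
```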

RealtimeConfig, the options

Use realtime: true to turn on the default transport. Use an object when you want to configure the transport or tune the SSE/outbox behavior:

interface RealtimeConfig {
  adapter?: RealtimeAdapter;       // transport (pg_notify, redis, …). Omit → poll / auto pg_notify
  pollIntervalMs?: number;         // default 2000, drain interval when polling (no adapter)
  batchSize?: number;              // default 500, max outbox rows read per drain
  retentionDays?: number;          // optional time-based outbox safety window
  keepAliveIntervalMs?: number;    // default 8000, SSE keep-alive ping interval
}

| Option | Default | What it controls |
| --- | --- | --- |
| adapter | (none) | The transport that broadcasts change notices. Omit it for the poll / auto-pg_notify default (below). |
| pollIntervalMs | 2000 | How often the service drains the outbox when there is no adapter (defaults to 0 if you omit it while an adapter is set). With an adapter, drains are notice-driven, so this is ignored; the poll timer only runs on the no-adapter path. |
| batchSize | 500 | Max outbox rows pulled per drain. Raise it for very high write volume. |
| retentionDays | (none) | Adds a time-based cleanup window. The service always does watermark cleanup (it deletes rows below the min consumed seq across active subscribers); retentionDays is an extra safety net for rows no subscriber will ever read. |
| keepAliveIntervalMs | 8000 | Interval between ping keep-alive events on each POST /realtime SSE stream. |
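To make the defaults concrete, here is a small sketch of how the documented values combine with a partial config on the no-adapter path. RealtimeTuning, ResolvedTuning, and withDefaults are hypothetical names for illustration, not the library's resolution code, and the adapter option is left out for brevity.

```typescript
// Local mirror of the documented tuning options (adapter omitted).
interface RealtimeTuning {
  pollIntervalMs?: number;
  batchSize?: number;
  retentionDays?: number;
  keepAliveIntervalMs?: number;
}

interface ResolvedTuning {
  pollIntervalMs: number;
  batchSize: number;
  retentionDays: number | undefined; // no default: time-based cleanup is opt-in
  keepAliveIntervalMs: number;
}

// Hypothetical helper applying the documented defaults for the no-adapter path.
function withDefaults(config: true | RealtimeTuning): ResolvedTuning {
  const tuning: RealtimeTuning = config === true ? {} : config;
  return {
    pollIntervalMs: tuning.pollIntervalMs ?? 2000,
    batchSize: tuning.batchSize ?? 500,
    retentionDays: tuning.retentionDays,
    keepAliveIntervalMs: tuning.keepAliveIntervalMs ?? 8000,
  };
}

// realtime: true resolves to all defaults.
const zeroConfig = withDefaults(true);

// A write-heavy app might raise batchSize and add a one-week safety window.
const tuned = withDefaults({ batchSize: 2000, retentionDays: 7 });
```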

Keep `keepAliveIntervalMs` under your idle timeout

The default 8000 ms is deliberately under Bun's default 10s idleTimeout, so SSE streams survive on an untuned Bun.serve. If a proxy in front of the app has a read timeout shorter than the keep-alive interval (proxy defaults are often 30-60s, but custom values can be lower), drop keepAliveIntervalMs further so the connection never goes idle long enough to be culled.
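A rule-of-thumb sketch for choosing the value: take the shortest idle timeout anywhere between the browser and the app, and ping at a fraction of it. keepAliveFor and the 80% margin are assumptions made for illustration, not part of QUESTPIE.

```typescript
// Hypothetical helper: pick a keep-alive interval below the shortest idle timeout in the path.
function keepAliveFor(idleTimeoutsMs: number[], safetyPercent = 80): number {
  if (idleTimeoutsMs.length === 0) {
    throw new Error("Provide at least one idle timeout");
  }
  const shortest = Math.min(...idleTimeoutsMs);
  // Integer arithmetic avoids floating-point surprises.
  return Math.floor((shortest * safetyPercent) / 100);
}

// Bun's 10s default alone reproduces the documented 8000 ms.
const bunOnly = keepAliveFor([10000]);

// A proxy with a 5s read timeout in front of Bun calls for a shorter interval.
const behindStrictProxy = keepAliveFor([10000, 5000]);
```

You would then pass the result as realtime.keepAliveIntervalMs.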

The default: poll vs auto pg_notify

Omitting adapter does not always mean naive polling. The service resolves the transport lazily on the first subscription (in ensureStarted(), triggered from subscribe(), not at process boot), like this:

  1. Explicit adapter set → use it.
  2. No adapter, but a Postgres connection string is available → the service lazily constructs a pgNotifyAdapter on the questpie_realtime channel for you. Most Postgres apps get push delivery from realtime: true with no extra config.
  3. No adapter and no Postgres connection → fall back to polling the outbox every pollIntervalMs (default 2s).
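The three-way resolution above can be written as a small decision function. This is a sketch of the documented order only; resolveTransport, TransportChoice, and the postgresUrl field are hypothetical names, and the real service performs this lazily inside ensureStarted().

```typescript
type TransportChoice =
  | { kind: "explicit" }
  | { kind: "auto-pg-notify"; channel: string }
  | { kind: "poll"; pollIntervalMs: number };

interface ResolveInput {
  adapter?: object;        // stands in for a RealtimeAdapter instance
  pollIntervalMs?: number;
  postgresUrl?: string;    // assumed: the connection string the app already has
}

function resolveTransport(input: ResolveInput): TransportChoice {
  // 1. An explicit adapter always wins.
  if (input.adapter) return { kind: "explicit" };
  // 2. Otherwise a Postgres connection gets pg_notify on the default channel.
  if (input.postgresUrl) return { kind: "auto-pg-notify", channel: "questpie_realtime" };
  // 3. Otherwise poll the outbox on a timer.
  return { kind: "poll", pollIntervalMs: input.pollIntervalMs ?? 2000 };
}

const explicit = resolveTransport({ adapter: {}, postgresUrl: "postgres://localhost/app" });
const auto = resolveTransport({ postgresUrl: "postgres://localhost/app" });
const polling = resolveTransport({});
```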

Polling is real, just laggy

The poll path is correct, not a downgrade in behavior: it drains the same outbox and pushes the same access-controlled snapshots, just on a 2s timer instead of on-change. It's a fine default for a single instance and local dev. For multiple processes you want a push adapter (next section) so a write on instance A reaches a subscriber on instance B without waiting for the next poll tick.

Multiple processes need a shared transport

The outbox is shared (it's a DB table), but the poll timer and the auto-pg_notify listener are per process. Polling means each instance independently notices changes on its own timer, workable but laggy and DB-chatty at scale. An explicit pgNotifyAdapter / redisStreamsAdapter fans a single notice out to all instances the moment a change lands. Reach for one once you run more than one process.

pgNotifyAdapter, Postgres LISTEN/NOTIFY

The simplest push transport: it broadcasts change notices over a Postgres channel with pg_notify, and every instance LISTENs. No infrastructure beyond the database you already run.

src/questpie/server/questpie.config.ts
import { runtimeConfig } from "questpie/app";
import { pgNotifyAdapter } from "questpie/adapters/pg-notify";

import env from "./env";

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: {
    adapter: pgNotifyAdapter({
      connectionString: env.DATABASE_URL,
      // channel: "questpie_realtime", // default
    }),
  },
});

PgNotifyAdapterOptions is { channel?, client?, connection?, connectionString? }. You must supply one connection source.

| Option | Type | Default | Notes |
| --- | --- | --- | --- |
| channel | string | "questpie_realtime" | Postgres channel name. Validated against /^[a-zA-Z0-9_]+$/, an invalid name throws at construction. |
| client | pg.Client | none | An existing pg client to reuse. Not .end()-ed by the adapter on stop (you own its lifecycle). |
| connection | pg.ClientConfig | none | A pg connection config; the adapter constructs and owns the client. |
| connectionString | string | none | A connection URL; the adapter constructs and owns the client. |

Behavior:

  • Needs the pg package. It's an optional peer dependency (pg ^8.13.1) and is not bundled; the adapter loads it lazily with await import("pg"). Install pg if it isn't already in your app.
  • One connection source required. Pass exactly one of client / connection / connectionString, or the adapter throws "PgNotifyAdapter requires a pg Client or connection config" on start.
  • Owned vs borrowed client. A connection/connectionString client is owned and closed on stop(); a client you pass is left open (ownsClient = false).
  • Uses LISTEN/UNLISTEN + pg_notify($1, $2). The notice JSON ({ seq, resourceType, resource, operation }) is the channel payload.
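The option rules above can be expressed as a pre-flight check. This is a sketch, not the adapter's code: checkPgNotifyOptions and the invalid-channel message are hypothetical, and it runs both checks together even though the adapter validates the channel at construction and the connection source on start. Validating the channel matters because LISTEN takes an identifier, which cannot be passed as a bound parameter the way the pg_notify($1, $2) arguments can.

```typescript
const CHANNEL_PATTERN = /^[a-zA-Z0-9_]+$/;

// Local mirror of the documented option names; the pg types are replaced by `object`.
interface PgNotifyOptionsSketch {
  channel?: string;
  client?: object;
  connection?: object;
  connectionString?: string;
}

// Hypothetical check mirroring the documented rules.
function checkPgNotifyOptions(options: PgNotifyOptionsSketch): { channel: string; ownsClient: boolean } {
  const channel = options.channel ?? "questpie_realtime";
  if (!CHANNEL_PATTERN.test(channel)) {
    throw new Error(`Invalid channel name: ${channel}`); // message is illustrative
  }
  const sources = [options.client, options.connection, options.connectionString]
    .filter((source) => source !== undefined);
  if (sources.length !== 1) {
    throw new Error("PgNotifyAdapter requires a pg Client or connection config");
  }
  // A client you pass in is borrowed; anything the adapter builds itself is owned.
  return { channel, ownsClient: options.client === undefined };
}

const owned = checkPgNotifyOptions({ connectionString: "postgres://localhost/app" });
const borrowed = checkPgNotifyOptions({ client: {}, channel: "tenant_events" });
```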

You usually don't need to write this

On a Postgres app, realtime: true already auto-wires pg_notify on the default channel (see the default above). Configure pgNotifyAdapter explicitly only when you want a non-default channel, want to reuse an existing pg client, or are pointing realtime at a different database than your main db.

redisStreamsAdapter, Redis Streams

For Redis-backed fan-out across many instances, use Redis Streams with a consumer group. The client is required: QUESTPIE doesn't ship a Redis driver, so you bring your own (a small command-shaped object modeled on node-redis):

src/questpie/server/questpie.config.ts
import { runtimeConfig } from "questpie/app";
import { redisStreamsAdapter } from "questpie/adapters/redis-streams";
import { createClient } from "redis";

import env from "./env";

const redis = createClient({ url: env.REDIS_URL });
await redis.connect();

export default runtimeConfig({
  db: { url: env.DATABASE_URL },
  realtime: {
    adapter: redisStreamsAdapter({
      client: redis,
      // stream: "questpie:realtime",   // default
      // group: "questpie-realtime",    // default
      // consumer: "consumer-<uuid>",   // default: random per process
      // blockMs: 5000, batchSize: 100, // defaults
    }),
  },
});

RedisStreamsAdapterOptions is { client (required), stream?, group?, consumer?, blockMs?, batchSize? }.

| Option | Type | Default | Notes |
| --- | --- | --- | --- |
| client | RedisStreamsClient | none (required) | Your Redis client. Must implement xAdd + xReadGroup; xGroupCreate / xAck / quit / disconnect are optional. |
| stream | string | "questpie:realtime" | The Redis stream key notices are XADD-ed to. |
| group | string | "questpie-realtime" | Consumer group name. Created with MKSTREAM; a BUSYGROUP (group already exists) is swallowed. |
| consumer | string | consumer-<random uuid> | Per-process consumer id. The random default lets every instance read the stream without stealing each other's messages. |
| blockMs | number | 5000 | How long XREADGROUP blocks waiting for new messages. |
| batchSize | number | 100 | COUNT per XREADGROUP call. |

Behavior:

  • Uses XADD / XREADGROUP / XACK. notify() XADDs the notice fields (seq, resourceType, resource, operation) as strings; the read loop acknowledges each delivered message.
  • Resilient read loop. On a read error it backs off 500ms and retries while running; stop() ends the loop.
  • Driver-agnostic. Any client matching the RedisStreamsClient shape works; the type is modeled on node-redis command names but isn't tied to it.
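Since stream entries are flat string fields, the notice has to be stringified on XADD and parsed back on read. The sketch below shows one way that round trip could look; noticeToFields and fieldsToNotice are hypothetical helpers, the types are local mirrors, and seq is assumed to fit in a JavaScript number.

```typescript
type RealtimeOperation = "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
type RealtimeResourceType = "collection" | "global";

interface RealtimeNotice {
  seq: number;
  resourceType: RealtimeResourceType;
  resource: string;
  operation: RealtimeOperation;
}

const OPERATIONS: readonly string[] = ["create", "update", "delete", "bulk_update", "bulk_delete"];
const RESOURCE_TYPES: readonly string[] = ["collection", "global"];

// Outbound: every value becomes a string, as stream fields require.
function noticeToFields(notice: RealtimeNotice): Record<string, string> {
  return {
    seq: String(notice.seq),
    resourceType: notice.resourceType,
    resource: notice.resource,
    operation: notice.operation,
  };
}

// Inbound: validate before trusting anything read off the stream.
function fieldsToNotice(fields: Record<string, string | undefined>): RealtimeNotice | null {
  const seq = Number(fields["seq"]);
  const resourceType = fields["resourceType"];
  const resource = fields["resource"];
  const operation = fields["operation"];
  if (!Number.isSafeInteger(seq) || seq < 1) return null;
  if (resourceType === undefined || RESOURCE_TYPES.indexOf(resourceType) === -1) return null;
  if (resource === undefined || resource === "") return null;
  if (operation === undefined || OPERATIONS.indexOf(operation) === -1) return null;
  return {
    seq,
    resourceType: resourceType as RealtimeResourceType,
    resource,
    operation: operation as RealtimeOperation,
  };
}

const original: RealtimeNotice = { seq: 42, resourceType: "collection", resource: "posts", operation: "bulk_update" };
const roundTripped = fieldsToNotice(noticeToFields(original));
const rejected = fieldsToNotice({ seq: "not-a-number", resourceType: "collection", resource: "posts", operation: "create" });
```

Returning null for a malformed entry lets a read loop skip it instead of crashing, which is a design choice of this sketch rather than documented adapter behavior.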

`RedisStreamsClient` is a structural contract

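// Parameter types are filled in here as an assumption (the original listing
// abbreviates them); check the exported type for the exact signatures.
type RedisStreamsClient = {
  xAdd(stream: string, id: string, fields: Record<string, string>): Promise<string>;
  xGroupCreate?(stream: string, group: string, id: string, options?: { MKSTREAM?: boolean }): Promise<unknown>;
  xReadGroup(...args: any[]): Promise<unknown>;
  xAck?(stream: string, group: string, id: string | string[]): Promise<unknown>;
  quit?(): Promise<void>;
  disconnect?(): void;
};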

Only xAdd and xReadGroup are mandatory.

cloudflareRealtimeAdapter, Durable Objects

On Cloudflare Workers, realtime is backed by a Durable Object: there's no long-lived process to hold subscriptions or a poll timer, so the DO is the fan-out point. cloudflareRealtimeAdapter({ namespace }) proxies subscribe/notify through it.

src/questpie/server/questpie.config.ts
import { runtimeConfig } from "questpie/app";
import { cloudflareRealtimeAdapter } from "questpie/adapters/cloudflare-realtime";

export default runtimeConfig({
  realtime: {
    adapter: cloudflareRealtimeAdapter({
      // A Durable Object namespace binding, or a zero-arg function that
      // returns one (so you can resolve it lazily from your Worker env).
      // getEnv() is a placeholder for however your app reaches its Worker env.
      namespace: () => getEnv().QUESTPIE_REALTIME,
      // objectName: "questpie-realtime",  // default DO instance name
      // hubPath: "/__questpie/realtime",   // default internal hub path
    }),
  },
});

CloudflareRealtimeAdapterOptions is { namespace (required), objectName?, hubPath? }. The namespace is a Durable Object namespace binding, or a zero-argument function that returns one (resolved lazily when the adapter first needs it, so you can defer reading it out of your Worker env).

| Option | Type | Default | Notes |
| --- | --- | --- | --- |
| namespace | CloudflareDurableObjectNamespaceInput | none (required) | The DO namespace binding, or () => namespace. |
| objectName | string | "questpie-realtime" | Name of the DO instance (idFromName) all notices route through. |
| hubPath | string | "/__questpie/realtime" | Internal hub path; the adapter derives ${hubPath}/subscribe and ${hubPath}/notify. |
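Two of these options are easy to picture in code: the namespace that may be a value or a zero-argument function, and the two routes derived from hubPath. The sketch below is illustrative; resolveNamespace, hubRoutes, and DurableObjectNamespaceLike are hypothetical names, and the fake binding only imitates the idFromName method of a real namespace.

```typescript
// Minimal stand-in for a Durable Object namespace binding.
interface DurableObjectNamespaceLike {
  idFromName(name: string): unknown;
}

type NamespaceInput = DurableObjectNamespaceLike | (() => DurableObjectNamespaceLike);

// Accept either form; call the function only when the namespace is actually needed.
function resolveNamespace(input: NamespaceInput): DurableObjectNamespaceLike {
  return typeof input === "function" ? input() : input;
}

// Derive the internal routes from the hub path, as the options table describes.
function hubRoutes(hubPath = "/__questpie/realtime"): { subscribe: string; notify: string } {
  return { subscribe: `${hubPath}/subscribe`, notify: `${hubPath}/notify` };
}

// Fake binding plus a counter to show the lazy form is not invoked up front.
let lookups = 0;
const fakeBinding: DurableObjectNamespaceLike = { idFromName: (name) => `id:${name}` };
const lazyInput: NamespaceInput = () => {
  lookups += 1;
  return fakeBinding;
};

const lookupsBeforeUse = lookups; // still 0: defining the function reads nothing
const resolved = resolveNamespace(lazyInput);
const routes = hubRoutes();
```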

This adapter carries a runtime: "cloudflare" discriminator so the Cloudflare fetch/queue handlers detect it, and it's wired together with the createCloudflareRealtimeDurableObjectHandler export (from questpie/adapters/cloudflare) that implements the DO itself.

Cloudflare realtime is a deployment-specific setup

Unlike pg_notify / Redis, this adapter requires a Durable Object class bound in your wrangler config plus the DO handler. It's part of the Cloudflare Workers deployment story rather than a drop-in line. The other two adapters need no Workers-specific wiring.

The outbox table

The change log lives in one Drizzle table, questpie_realtime_log, created by codegen alongside your schema (it's part of the core module). You rarely touch it directly, but it's worth knowing it exists:

| Column | Notes |
| --- | --- |
| seq | bigserial primary key, the monotonic ordering every subscriber watermarks against. |
| resource_type | "collection" or "global". |
| resource | The collection/global name. |
| operation | create / update / delete / bulk_update / bulk_delete. |
| record_id, locale | Nullable; the specific record + locale when applicable. |
| payload | jsonb (default {}), the change detail, kept server-side (never broadcast). |
| created_at | Insert timestamp; powers retentionDays cleanup. |
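The seq and created_at columns drive the two cleanup rules described under retentionDays. A minimal sketch of how they might combine follows; rowsToDelete is a hypothetical function, and two behaviors are assumptions rather than documented facts: watermark cleanup is skipped when there are no active subscribers, and a row past the retention window is removed even if some subscriber has not consumed it.

```typescript
interface OutboxRow {
  seq: number;
  createdAt: Date;
}

const DAY_MS = 24 * 60 * 60 * 1000;

// Returns the seqs that cleanup would delete under the assumptions above.
function rowsToDelete(
  rows: OutboxRow[],
  activeWatermarks: number[],
  now: Date,
  retentionDays?: number,
): number[] {
  const minConsumed = activeWatermarks.length > 0 ? Math.min(...activeWatermarks) : undefined;
  const cutoff = retentionDays === undefined ? undefined : now.getTime() - retentionDays * DAY_MS;
  return rows
    .filter((row) => {
      // Watermark rule: rows below the minimum consumed seq are deletable.
      const belowWatermark = minConsumed !== undefined && row.seq < minConsumed;
      // Retention rule: rows older than the window are deletable regardless.
      const expired = cutoff !== undefined && row.createdAt.getTime() < cutoff;
      return belowWatermark || expired;
    })
    .map((row) => row.seq);
}

const now = new Date("2024-01-31T00:00:00Z");
const rows: OutboxRow[] = [
  { seq: 1, createdAt: new Date("2024-01-01T00:00:00Z") },
  { seq: 2, createdAt: new Date("2024-01-20T00:00:00Z") },
  { seq: 3, createdAt: new Date("2024-01-30T00:00:00Z") },
];

const byWatermark = rowsToDelete(rows, [3, 2], now);        // subscribers at seq 3 and 2
const withRetention = rowsToDelete(rows, [3, 2], now, 7);   // plus a 7-day window
const idle = rowsToDelete(rows, [], now);                   // no subscribers, no window
```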

TypeScript

The transport contract and config types are exported from questpie (and questpie/realtime):

import type {
  RealtimeAdapter,        // the transport contract you implement for a custom adapter
  RealtimeConfig,         // the `realtime` config shape
  RealtimeChangeEvent,    // a full outbox event
  RealtimeNotice,         // the minimal broadcast notice
  RealtimeOperation,      // "create" | "update" | "delete" | "bulk_update" | "bulk_delete"
  RealtimeResourceType,   // "collection" | "global"
} from "questpie/realtime";

Each adapter's option type ships from its own entry: PgNotifyAdapterOptions from questpie/adapters/pg-notify, RedisStreamsAdapterOptions + RedisStreamsClient from questpie/adapters/redis-streams, and CloudflareRealtimeAdapterOptions from questpie/adapters/cloudflare-realtime.

Build your own transport

RealtimeAdapter is a four-method contract. Implement it to back realtime on any pub/sub system (NATS, Kafka, a cloud queue):

interface RealtimeAdapter {
  start(): Promise<void>;
  stop(): Promise<void>;
  notify(event: RealtimeChangeEvent): Promise<void>;          // broadcast a change
  subscribe(handler: (notice: RealtimeNotice) => void): () => void; // receive changes; returns unsubscribe
}

notify publishes a change to all instances; subscribe registers a handler that fires on every received notice (the service uses it to trigger a drain()) and returns an unsubscribe function. You only need to move the four notice fields (seq, resourceType, resource, operation), never the payload. Wire your instance as realtime.adapter.
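As a starting point, here is a minimal single-process implementation of the contract. It is a sketch for tests or local experiments, not a production transport: InMemoryAdapter is a hypothetical class, the interfaces are local mirrors of the documented ones, and dropping notices while stopped is a choice made here rather than a documented requirement.

```typescript
// Local mirrors of the documented types; in an app you would import them from questpie/realtime.
interface RealtimeChangeEvent {
  seq: number;
  resourceType: "collection" | "global";
  resource: string;
  operation: "create" | "update" | "delete" | "bulk_update" | "bulk_delete";
  recordId?: string;
  locale?: string;
  payload: Record<string, unknown>;
}

type RealtimeNotice = Pick<RealtimeChangeEvent, "seq" | "resourceType" | "resource" | "operation">;

interface RealtimeAdapter {
  start(): Promise<void>;
  stop(): Promise<void>;
  notify(event: RealtimeChangeEvent): Promise<void>;
  subscribe(handler: (notice: RealtimeNotice) => void): () => void;
}

// Hypothetical adapter: fans notices out to handlers inside one process only.
class InMemoryAdapter implements RealtimeAdapter {
  private handlers: Array<(notice: RealtimeNotice) => void> = [];
  private running = false;

  async start(): Promise<void> {
    this.running = true;
  }

  async stop(): Promise<void> {
    this.running = false;
  }

  async notify(event: RealtimeChangeEvent): Promise<void> {
    if (!this.running) return; // sketch choice: drop notices while stopped
    // Forward only the four cue fields, never the payload.
    const notice: RealtimeNotice = {
      seq: event.seq,
      resourceType: event.resourceType,
      resource: event.resource,
      operation: event.operation,
    };
    this.handlers.slice().forEach((handler) => handler(notice));
  }

  subscribe(handler: (notice: RealtimeNotice) => void): () => void {
    this.handlers.push(handler);
    // The returned function removes exactly this handler.
    return () => {
      this.handlers = this.handlers.filter((registered) => registered !== handler);
    };
  }
}
```

A networked transport would replace the forEach with a publish call and invoke the registered handlers from its receive loop; the shape of the four methods stays the same.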

Related

  • Realtime: the client API this transport powers (live(), liveIter(), and the TanStack Query { realtime: true } flag).
  • Configuration: where realtime lives in QuestpieConfig, alongside the other adapters.
  • Adapters: the hub listing every adapter kind and the swap-the-backend model.
  • Building a plugin: implement RealtimeAdapter (or any adapter contract) yourself.
  • Multi-tenancy: how access rules re-run per subscriber keep tenant data isolated over realtime.
  • Runnable example: examples/tanstack-barbershop, a TanStack Start app using realtime via the typed client and TanStack Query bindings.