Pub/Sub Adapters

JSandy’s WebSocket layer is provider‑agnostic. You can bring your own real‑time backend by implementing a tiny adapter that satisfies a simple interface and configuring it on the router.

This page shows:

  • The PubSubAdapter interface
  • How to configure the adapter on your router
  • Ready‑to‑use examples:
    • Upstash REST (Cloudflare Workers‑friendly)
    • In‑memory (local dev)
    • Cloudflare Pub/Sub (Kafka API) — conceptual outline
    • Native Redis (Node‑only)

JSandy’s WebSocket server is designed to run on Cloudflare Workers. An adapter that depends on a Node TCP client (e.g., native Redis) won’t run there: pick an HTTP/SSE‑compatible provider (e.g., Upstash REST) for Workers, or run your WebSocket server on a Node runtime.


The Adapter Interface

JSandy expects a minimal adapter with publish/subscribe:

export interface PubSubAdapter {
  /**
   * Publish a message to a topic/room.
   */
  publish(topic: string, payload: unknown): Promise<void>;
 
  /**
   * Subscribe to a topic/room. Call `onMessage` for every payload.
   * Must respect `options.signal` for cancellation.
   */
  subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: {
      signal?: AbortSignal;
      onOpen?: () => void;
      onError?: (error: unknown) => void;
    },
  ): Promise<void>;
}

Payload convention:

  • JSandy sends/receives ["eventName", data] tuples over pub/sub.
  • Your adapter should pass the parsed tuple back to JSandy’s socket.
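
As a rough illustration of the tuple convention above, the sketch below encodes and decodes `["eventName", data]` messages for a string-based transport. The names `EventTuple`, `isEventTuple`, `encodeEvent`, and `decodeEvent` are hypothetical helpers, not part of `@jsandy/rpc`; only the tuple shape comes from this page.

```typescript
// Hypothetical helpers: only the ["eventName", data] shape is taken from the docs.
type EventTuple = [event: string, data: unknown];

/** Narrow an unknown value to a two-element tuple whose first item is a string. */
function isEventTuple(value: unknown): value is EventTuple {
  return Array.isArray(value) && value.length === 2 && typeof value[0] === "string";
}

/** Serialize a tuple for transports that carry strings (Redis, SSE, and so on). */
function encodeEvent(event: string, data: unknown): string {
  return JSON.stringify([event, data]);
}

/** Parse a raw message; return null instead of throwing on malformed input. */
function decodeEvent(raw: string): EventTuple | null {
  try {
    const parsed: unknown = JSON.parse(raw);
    return isEventTuple(parsed) ? parsed : null;
  } catch {
    return null;
  }
}
```

An adapter built this way would call `onMessage` only when `decodeEvent` returns a tuple, so malformed messages never reach the socket.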

Wiring the Adapter into the Router

Set an adapter per request via router.config({ getPubSubAdapter }):

TypeScript

server/jsandy.ts

import { jsandy, type PubSubAdapter } from "@jsandy/rpc"
 
export const j = jsandy.init()
 
// Choose an adapter (see examples below)
function getPubSubAdapter(_c: unknown): PubSubAdapter {
  // return new MyAdapter(...)
  throw new Error("Implement getPubSubAdapter")
}
 
export const api = j.router().config({ getPubSubAdapter })

That’s it — all WebSocket procedures on this router will use your adapter.


Example 1: Upstash REST (Cloudflare Workers‑friendly)

Upstash exposes Redis pub/sub over HTTP REST + SSE, which works on Workers.

TypeScript

server/jsandy.ts

import { jsandy, UpstashRestPubSub } from "@jsandy/rpc"
import { env } from "hono/adapter"
 
export const j = jsandy.init()
 
export const api = j.router().config({
  getPubSubAdapter: (c) => {
    const { UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN } = env(c)
    return new UpstashRestPubSub(UPSTASH_REDIS_REST_URL, UPSTASH_REDIS_REST_TOKEN)
  },
})

Cloudflare secrets (if you choose Upstash):

wrangler secret put UPSTASH_REDIS_REST_URL
wrangler secret put UPSTASH_REDIS_REST_TOKEN

Example 2: In‑Memory Adapter (Local Dev)

Great for quick local testing — no external infra required.

TypeScript

server/in-memory-pubsub.ts

import type { PubSubAdapter } from "@jsandy/rpc"
 
export class InMemoryPubSub implements PubSubAdapter {
  private subs = new Map<string, Set<(p: unknown) => void>>()
 
  async publish(topic: string, payload: unknown) {
    const set = this.subs.get(topic)
    if (!set) return
    for (const cb of set) cb(payload)
  }
 
  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ) {
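    // Nothing to do if the caller cancelled before we subscribed
    if (options?.signal?.aborted) return

    let set = this.subs.get(topic)
    if (!set) {
      set = new Set()
      this.subs.set(topic, set)
    }
    const listeners = set
    listeners.add(onMessage)
    options?.onOpen?.()

    // Clean up when cancelled; drop the topic once its last listener is gone
    options?.signal?.addEventListener(
      "abort",
      () => {
        listeners.delete(onMessage)
        if (listeners.size === 0 && this.subs.get(topic) === listeners) {
          this.subs.delete(topic)
        }
      },
      { once: true },
    )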
  }
}

Wire it up:

TypeScript

server/jsandy.ts

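import { jsandy } from "@jsandy/rpc"
import { InMemoryPubSub } from "./in-memory-pubsub"

export const j = jsandy.init()

// Share one instance: subscribers and publishers must use the same in-memory map.
// Creating a new adapter per request would give each request its own empty map.
const pubsub = new InMemoryPubSub()

export const api = j.router().config({
  getPubSubAdapter: () => pubsub,
})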

Note: This is single‑process only; it won’t scale across multiple instances.


Example 3: Cloudflare Pub/Sub (Kafka API) — Conceptual Outline

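This outline assumes a Kafka‑style producer/consumer client. Cloudflare announced its Pub/Sub product as an MQTT‑based service, so confirm which protocol your account actually exposes before building on this shape. In Workers, you’d typically integrate through a client that supports the Workers runtime or a Cloudflare‑provided binding.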

This outline shows the shape — consult Cloudflare’s documentation for a production‑ready implementation and authentication details.

TypeScript

server/cf-pubsub.ts

import type { PubSubAdapter } from "@jsandy/rpc"
 
/**
 * Conceptual Cloudflare Pub/Sub adapter (Kafka API)
 * Note: This is illustrative — actual producer/consumer setup depends on the client you use
 * and how you configure bindings/credentials in `wrangler.toml`.
 */
export class CloudflareKafkaPubSub implements PubSubAdapter {
  constructor(
    private opts: {
      // e.g., SASL/SSL config, broker URLs, topic prefix, etc.
      bootstrapServers: string[]
      username: string
      password: string
      topicPrefix?: string
    },
  ) {}
 
  async publish(topic: string, payload: unknown): Promise<void> {
    const fullTopic = `${this.opts.topicPrefix || ""}${topic}`
    // 1) Create/Reuse a producer
    // 2) Serialize payload (JSON.stringify)
    // 3) send({ topic: fullTopic, value: JSON.stringify(payload) })
    // PSEUDOCODE:
    // await producer.send([{ topic: fullTopic, value: JSON.stringify(payload) }])
  }
 
  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ): Promise<void> {
    const fullTopic = `${this.opts.topicPrefix || ""}${topic}`
    // 1) Create/Reuse a consumer
    // 2) subscribe({ topic: fullTopic })
    // 3) on each message: JSON.parse and call onMessage(parsed)
    // 4) options?.onOpen?.() once connected
    // 5) Respect options.signal to close the consumer gracefully
 
    // PSEUDOCODE:
    // await consumer.subscribe({ topic: fullTopic })
    // options?.onOpen?.()
    // const abortHandler = () => consumer.close()
    // options?.signal?.addEventListener("abort", abortHandler, { once: true })
    // await consumer.run({
    //   eachMessage: async ({ message }) => {
    //     try { onMessage(JSON.parse(message.value.toString())) } catch {}
    //   },
    // })
  }
}

Wire it up (with your config/secrets):

TypeScript

server/jsandy.ts

import { jsandy } from "@jsandy/rpc"
import { CloudflareKafkaPubSub } from "./cf-pubsub"
 
export const j = jsandy.init()
 
export const api = j.router().config({
  getPubSubAdapter: () =>
    new CloudflareKafkaPubSub({
      bootstrapServers: ["<BROKER_1>", "<BROKER_2>"],
      username: "<SASL_USERNAME>",
      password: "<SASL_PASSWORD>",
      topicPrefix: "myapp.",
    }),
})

Important: The code above is intentionally a template. Use a Kafka client compatible with Workers and follow Cloudflare’s official guidance for bindings, credentials, and connectivity.


Example 4: Native Redis (Node‑only)

This approach uses a TCP Redis client (e.g., redis or ioredis) and thus requires a Node environment (not Cloudflare Workers).

TypeScript

server/native-redis-pubsub.ts

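import type { PubSubAdapter } from "@jsandy/rpc"
import { createClient } from "redis" // Node-only (TCP)

type RedisClient = ReturnType<typeof createClient>
type Clients = { pub: RedisClient; sub: RedisClient }

export class NativeRedisPubSub implements PubSubAdapter {
  private clients?: Promise<Clients>

  constructor(private url: string) {}

  // Redis needs a dedicated connection for subscriptions, hence two clients
  private async connect(): Promise<Clients> {
    const pub = createClient({ url: this.url })
    const sub = createClient({ url: this.url })
    pub.on("error", (e) => console.error("redis pub error", e))
    sub.on("error", (e) => console.error("redis sub error", e))
    await Promise.all([pub.connect(), sub.connect()])
    return { pub, sub }
  }

  // Connect once; concurrent callers await the same promise, and a failed
  // attempt is cleared so the next call can retry
  private ensure(): Promise<Clients> {
    this.clients ??= this.connect().catch((e) => {
      this.clients = undefined
      throw e
    })
    return this.clients
  }

  async publish(topic: string, payload: unknown): Promise<void> {
    const { pub } = await this.ensure()
    await pub.publish(topic, JSON.stringify(payload))
  }

  async subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: { signal?: AbortSignal; onOpen?: () => void; onError?: (e: unknown) => void },
  ): Promise<void> {
    const signal = options?.signal
    if (signal?.aborted) return

    const listener = (msg: string) => {
      let parsed: unknown
      try {
        parsed = JSON.parse(msg)
      } catch {
        return // ignore malformed messages
      }
      onMessage(parsed)
    }

    try {
      const { sub } = await this.ensure()
      await sub.subscribe(topic, listener)
      // Remove only this listener so other subscribers to the topic keep receiving
      const stop = () => sub.unsubscribe(topic, listener).catch((e) => options?.onError?.(e))
      if (signal?.aborted) {
        await stop() // cancelled while we were connecting
        return
      }
      signal?.addEventListener("abort", stop, { once: true })
      options?.onOpen?.()
    } catch (e) {
      options?.onError?.(e)
    }
  }
}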

Wire it up (Node server):

TypeScript

server/jsandy.ts

import { jsandy } from "@jsandy/rpc"
import { NativeRedisPubSub } from "./native-redis-pubsub"

export const j = jsandy.init()

// One adapter per process, so every request reuses the same Redis connections
const pubsub = new NativeRedisPubSub(process.env.REDIS_URL!)

export const api = j.router().config({
  getPubSubAdapter: () => pubsub,
})

Tips & Best Practices

  • Always honor options.signal in subscribe() to avoid leaking open streams.
  • Validate and JSON‑serialize payloads. JSandy expects ["event", data].
  • For Workers, choose providers with HTTP/SSE/Web APIs (e.g., Upstash REST).
  • For Node runtimes, native clients (Redis, NATS, Kafka) are fine — just ensure your hosting platform supports persistent connections.
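
One way to honor `options.signal` consistently is a small helper that also covers a signal that was aborted before `subscribe()` finished its setup. This is a minimal sketch; `onAbort` is a hypothetical name, not something exported by `@jsandy/rpc`.

```typescript
/**
 * Run `cleanup` once when `signal` aborts.
 * If the signal is already aborted, `cleanup` runs immediately and the
 * function returns true so the caller can stop early.
 */
function onAbort(signal: AbortSignal | undefined, cleanup: () => void): boolean {
  if (!signal) return false;
  if (signal.aborted) {
    cleanup();
    return true;
  }
  // `once: true` removes the listener after it fires, so nothing is retained
  signal.addEventListener("abort", cleanup, { once: true });
  return false;
}
```

Inside an adapter's `subscribe()`, you would register the listener or open the stream first, then call `onAbort(options?.signal, () => /* close it */)`.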

Next Steps

  • See the full WebSockets guide: /docs/backend/websockets
  • Build a simple chat with room broadcasts using your adapter
  • Add metrics/logging around publish/subscribe for observability
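
For the observability step, one possible approach is to wrap any adapter in a decorator that logs around `publish` and `subscribe`. The sketch below redeclares the interface from this page so it is self-contained; `withLogging`, `LogFn`, and the event names are hypothetical, and the default logger is simply `console.log`.

```typescript
// Local copy of the interface shown earlier on this page.
interface PubSubAdapter {
  publish(topic: string, payload: unknown): Promise<void>;
  subscribe(
    topic: string,
    onMessage: (payload: unknown) => void,
    options?: {
      signal?: AbortSignal;
      onOpen?: () => void;
      onError?: (error: unknown) => void;
    },
  ): Promise<void>;
}

type LogFn = (event: string, detail: Record<string, unknown>) => void;

/** Wrap an adapter so publish/subscribe activity is reported to `log`. */
function withLogging(inner: PubSubAdapter, log: LogFn = console.log): PubSubAdapter {
  return {
    async publish(topic, payload) {
      const started = Date.now();
      try {
        await inner.publish(topic, payload);
        log("pubsub.publish", { topic, ms: Date.now() - started });
      } catch (error) {
        log("pubsub.publish.error", { topic, error });
        throw error; // keep the original failure visible to the caller
      }
    },
    subscribe(topic, onMessage, options) {
      let received = 0;
      return inner.subscribe(
        topic,
        (payload) => {
          received++;
          onMessage(payload);
        },
        {
          ...options, // forwards `signal` unchanged
          onOpen: () => {
            log("pubsub.subscribe.open", { topic });
            options?.onOpen?.();
          },
          onError: (error) => {
            log("pubsub.subscribe.error", { topic, error, received });
            options?.onError?.(error);
          },
        },
      );
    },
  };
}
```

You would then return `withLogging(adapter)` from `getPubSubAdapter`, swapping `log` for your metrics client if you have one.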