Streams

Streams are append-only event logs. Use them for activity feeds, audit trails, change data capture, or any scenario where you need an ordered sequence of events.

Append

ts
const eventId = await ctx.stream.append("todo.activity", "todo.created", {
  todoId: 42,
  title: "Ship new feature",
  assignee: "alice@example.com",
});
// eventId: 1 (sequential)

| Parameter | Type | Description |
| --- | --- | --- |
| stream | string | Stream name |
| event | string | Event type/name |
| payload | unknown | Event data (serialized as JSON) |

Returns the sequential event ID.
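
Because the payload is serialized as JSON, values that JSON cannot represent will not come back in their original form. The sketch below previews that effect with a plain `JSON.stringify`/`JSON.parse` round trip. It assumes the stream behaves like standard JSON serialization; `jsonRoundTrip` is a hypothetical helper, not part of the stream API.

```typescript
// Hypothetical helper: preview a payload after a JSON round trip.
// Assumption: stored payloads go through the equivalent of JSON.stringify.
function jsonRoundTrip<T>(payload: T): unknown {
  return JSON.parse(JSON.stringify(payload));
}

const stored = jsonRoundTrip({
  todoId: 42,
  dueAt: new Date("2024-01-01T00:00:00.000Z"), // Date becomes an ISO string
  note: undefined,                              // undefined keys are dropped
}) as Record<string, unknown>;

console.log(typeof stored.dueAt); // "string"
console.log("note" in stored);    // false
```

If readers of the stream need dates or optional fields, it may be simpler to store ISO strings and explicit `null` values in the payload from the start.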

Read

ts
const events = await ctx.stream.read("todo.activity");

With options

ts
// Read with limit
const recent = await ctx.stream.read("todo.activity", { limit: 50 });

// Read events after a specific ID (pagination)
const newer = await ctx.stream.read("todo.activity", {
  sinceId: lastSeenId,
  limit: 100,
});
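
The `sinceId` and `limit` options can be combined into a loop that reads a stream page by page. The following is a minimal sketch, not a documented helper. It assumes that `sinceId` is exclusive (only events with a larger ID are returned) and that events come back in ascending ID order. `readAll`, `StreamReader`, and `makeFakeReader` are hypothetical names, and the in-memory reader only stands in for `ctx.stream` so the example runs on its own.

```typescript
interface StreamEvent<T> {
  id: number;
  stream: string;
  event: string;
  payload: T;
  createdAt: string;
}

// Only the part of the stream API this sketch depends on.
interface StreamReader {
  read<T>(stream: string, opts?: { sinceId?: number; limit?: number }): Promise<StreamEvent<T>[]>;
}

// Hypothetical helper: keep reading until a page comes back shorter than pageSize.
// Assumes sinceId is exclusive and pages are in ascending ID order.
async function readAll<T>(reader: StreamReader, stream: string, pageSize = 100): Promise<StreamEvent<T>[]> {
  if (pageSize < 1) throw new RangeError("pageSize must be at least 1");
  const all: StreamEvent<T>[] = [];
  let sinceId: number | undefined;
  for (;;) {
    const page = await reader.read<T>(stream, { sinceId, limit: pageSize });
    all.push(...page);
    if (page.length < pageSize) return all; // short page: nothing left to read
    sinceId = page[page.length - 1]!.id;    // resume after the last event seen
  }
}

// In-memory stand-in for ctx.stream, for illustration only.
function makeFakeReader(count: number): StreamReader {
  const events: StreamEvent<unknown>[] = Array.from({ length: count }, (_, i) => ({
    id: i + 1,
    stream: "todo.activity",
    event: "todo.created",
    payload: { todoId: i + 1 },
    createdAt: new Date(0).toISOString(),
  }));
  return {
    async read<T>(_stream: string, opts: { sinceId?: number; limit?: number } = {}) {
      const after = opts.sinceId ?? 0;
      const limit = opts.limit ?? events.length;
      return events.filter((e) => e.id > after).slice(0, limit) as StreamEvent<T>[];
    },
  };
}
```

Inside a handler, the same loop would be called as `readAll(ctx.stream, "todo.activity")`, provided `ctx.stream.read` matches the assumed shape. For very long streams, processing each page as it arrives is likely a better fit than collecting everything in memory.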

Event shape

ts
interface ChimpbaseStreamEvent<TPayload> {
  id: number;          // sequential event ID
  stream: string;      // stream name
  event: string;       // event type
  payload: TPayload;   // event data
  createdAt: string;   // ISO 8601 timestamp
}
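
A stream can hold several event types, so a reader often has to narrow `payload` before using it. The sketch below shows one way to do that with a type guard keyed on the `event` field. It redeclares the interface locally to stay self-contained; `TodoCreatedPayload`, `isTodoCreated`, and `titlesByTodoId` are hypothetical names chosen for illustration.

```typescript
interface ChimpbaseStreamEvent<TPayload> {
  id: number;
  stream: string;
  event: string;
  payload: TPayload;
  createdAt: string;
}

// Payload shape written by the "todo.created" examples on this page.
interface TodoCreatedPayload {
  todoId: number;
  title: string;
}

// Type guard: checks the event name and the payload fields at runtime.
function isTodoCreated(
  e: ChimpbaseStreamEvent<unknown>,
): e is ChimpbaseStreamEvent<TodoCreatedPayload> {
  if (e.event !== "todo.created") return false;
  const p = e.payload as Partial<TodoCreatedPayload> | null | undefined;
  return typeof p?.todoId === "number" && typeof p?.title === "string";
}

// Build a todoId -> title lookup from a mixed list of events.
function titlesByTodoId(events: ChimpbaseStreamEvent<unknown>[]): Map<number, string> {
  const titles = new Map<number, string>();
  for (const e of events) {
    if (isTodoCreated(e)) titles.set(e.payload.todoId, e.payload.title);
  }
  return titles;
}
```

Checking the payload fields as well as the event name guards against older or malformed events that share the same event type.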

Common Patterns

Activity feed

ts
// Write
subscription("todo.created", async (ctx, todo) => {
  await ctx.stream.append("todo.activity", "todo.created", {
    todoId: todo.id,
    title: todo.title,
  });
}, { idempotent: true, name: "streamTodoCreated" });

// Read
const listActivity = action("listActivity", async (ctx, input: { sinceId?: number }) => {
  return await ctx.stream.read("todo.activity", {
    sinceId: input.sinceId,
    limit: 50,
  });
});

Audit trail

ts
await ctx.stream.append("audit", "user.login", {
  userId: user.id,
  ip: request.headers.get("x-forwarded-for"),
  timestamp: new Date().toISOString(),
});
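
In the audit example, `headers.get("x-forwarded-for")` returns `null` when the header is absent and can return a comma-separated list when the request passed through several proxies. A small normalizing step, sketched below, keeps the stored `ip` field consistent. `firstForwardedIp` is a hypothetical helper, not part of the framework.

```typescript
// Hypothetical helper: take the first (leftmost) entry of an
// X-Forwarded-For header value, or null if the header is missing or empty.
function firstForwardedIp(headerValue: string | null): string | null {
  if (headerValue === null) return null;
  const [first = ""] = headerValue.split(",");
  const ip = first.trim();
  return ip === "" ? null : ip;
}

console.log(firstForwardedIp("203.0.113.7, 10.0.0.1")); // "203.0.113.7"
console.log(firstForwardedIp(null));                    // null
```

The leftmost entry is supplied by the client side of the chain and can be spoofed unless a trusted proxy overwrites the header, so for an audit trail it may be worth storing the raw header value alongside the parsed address.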

Streams vs. Subscriptions

| | Streams | Subscriptions |
| --- | --- | --- |
| Purpose | Persistent event log | Reactive event handler |
| Storage | Events stored permanently | Events are transient |
| Reading | Pull-based (read on demand) | Push-based (handler invoked) |
| Use case | Audit trail, activity feed | Trigger side effects |