Producer Streams

Producer streams let the server emit a sequence of output batches in response to a single request. Use them for large result sets, real-time data, or any case where the number of output rows is not known in advance.

Basic usage

import { Protocol, int32 } from "vgi-rpc-typescript";

const protocol = new Protocol("MyService");

protocol.producer<{ limit: number; current: number }>("count", {
  params: { limit: int32 },
  outputSchema: { n: int32, n_squared: int32 },
  // Runs once per request and returns the initial state.
  init: async ({ limit }) => ({ limit, current: 0 }),
  // Runs repeatedly; each call emits one batch or ends the stream.
  produce: async (state, out) => {
    if (state.current >= state.limit) {
      out.finish();
      return;
    }
    out.emitRow({ n: state.current, n_squared: state.current ** 2 });
    state.current++;
  },
  doc: "Count from 0 to limit-1.",
});

How it works

A producer method has two phases:

  1. init — receives the request parameters, returns the initial state object
  2. produce — called repeatedly with the current state and an OutputCollector

The produce function must either:

  • Emit one data batch via out.emit() or out.emitRow(), or
  • Call out.finish() to end the stream

Mutate the state object in place inside produce to track progress. The same object is passed to the next call, so changes carry over.
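The two phases can be pictured as a small driver loop. The sketch below is not the library's implementation: SketchCollector, SketchProducer, and runProducer are hypothetical names introduced here, and the collector models only the emit, emitRow, and finish methods mentioned on this page. It assumes produce always either emits or finishes; a produce that did neither would make this loop spin forever.

```typescript
// Hypothetical stand-ins for illustration only; not part of vgi-rpc-typescript.
type Row = Record<string, number>;
type Columns = Record<string, number[]>;

class SketchCollector {
  batches: Columns[] = [];
  finished = false;

  // Emit a batch given as equal-length column arrays.
  emit(columns: Columns): void {
    this.batches.push(columns);
  }

  // Emit a single-row batch by wrapping each value in a one-element column.
  emitRow(row: Row): void {
    const columns: Columns = {};
    for (const name of Object.keys(row)) {
      columns[name] = [row[name]];
    }
    this.batches.push(columns);
  }

  // Mark the stream as complete.
  finish(): void {
    this.finished = true;
  }
}

interface SketchProducer<P, S> {
  init: (params: P) => Promise<S>;
  produce: (state: S, out: SketchCollector) => Promise<void>;
}

// Phase 1: call init once. Phase 2: call produce until it finishes the stream.
async function runProducer<P, S>(
  producer: SketchProducer<P, S>,
  params: P,
): Promise<Columns[]> {
  const state = await producer.init(params);
  const out = new SketchCollector();
  while (!out.finished) {
    await producer.produce(state, out); // the same state object every time
  }
  return out.batches;
}

// Same logic as the "count" producer in Basic usage.
const countProducer: SketchProducer<
  { limit: number },
  { limit: number; current: number }
> = {
  init: async ({ limit }) => ({ limit, current: 0 }),
  produce: async (state, out) => {
    if (state.current >= state.limit) {
      out.finish();
      return;
    }
    out.emitRow({ n: state.current, n_squared: state.current ** 2 });
    state.current++;
  },
};
```

In this sketch, runProducer(countProducer, { limit: 3 }) resolves to three single-row batches, and produce is called a fourth time only to signal the end of the stream. The real server presumably also handles schemas, serialization, and transport, none of which is modeled here.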

Type-safe state

The generic <S> parameter on protocol.producer<S>() threads the state type from init to produce:

interface CountState {
  limit: number;
  current: number;
  batchSize: number;
}

protocol.producer<CountState>("count", {
  params: { limit: int32, batch_size: int32 },
  outputSchema: { n: int32, n_squared: int32 },
  init: async ({ limit, batch_size }) => ({
    limit,
    current: 0,
    batchSize: batch_size,
  }),
  produce: async (state, out) => {
    // state is typed as CountState
    if (state.current >= state.limit) {
      out.finish();
      return;
    }
    out.emitRow({ n: state.current, n_squared: state.current ** 2 });
    state.current++;
  },
  defaults: { batch_size: 1 },
});

Multi-row batches

For efficiency, emit multiple rows per batch instead of one at a time:

// Drop-in replacement for the produce function in the CountState example.
produce: async (state, out) => {
  if (state.current >= state.limit) {
    out.finish();
    return;
  }
  // Emit at most batchSize rows, and never go past limit.
  const remaining = state.limit - state.current;
  const count = Math.min(state.batchSize, remaining);
  const nValues: number[] = [];
  const sqValues: number[] = [];
  for (let i = 0; i < count; i++) {
    const n = state.current + i;
    nValues.push(n);
    sqValues.push(n * n);
  }
  state.current += count;
  // One array per output column; all arrays have the same length.
  out.emit({ n: nValues, n_squared: sqValues });
},
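One way to keep the batching arithmetic testable without the library is to move it into a pure function that produce then calls. The sketch below does that; buildCountBatch and CountColumns are hypothetical names introduced here, and the function is assumed to return columns in the shape passed to out.emit() above.

```typescript
// Hypothetical helper for illustration; not part of vgi-rpc-typescript.
interface CountColumns {
  n: number[];
  n_squared: number[];
}

// Build the columns for the next batch: at most batchSize rows,
// starting at `start` and stopping before `limit`.
function buildCountBatch(
  start: number,
  limit: number,
  batchSize: number,
): CountColumns {
  // Clamp to zero so a bad batch size or start >= limit yields an empty batch.
  const count = Math.max(0, Math.min(batchSize, limit - start));
  const columns: CountColumns = { n: [], n_squared: [] };
  for (let i = 0; i < count; i++) {
    const n = start + i;
    columns.n.push(n);
    columns.n_squared.push(n * n);
  }
  return columns;
}

// With limit = 5 and batchSize = 2, the batches hold 2, 2, and 1 rows.
const firstBatch = buildCountBatch(0, 5, 2); // n: [0, 1], n_squared: [0, 1]
const lastBatch = buildCountBatch(4, 5, 2); // n: [4], n_squared: [16]
```

Inside produce, the call would then be out.emit(buildCountBatch(state.current, state.limit, state.batchSize)), followed by advancing state.current by the number of rows returned. Note that a batch size of zero or less never advances state.current, so the stream would never reach limit; validating batch_size in init is worth considering.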

Stream headers

Producer methods can optionally send a one-time header before the data stream. See Stream Headers.