Producer Streams
Producer streams let the server emit a sequence of output batches from a single request. Use them for large result sets, real-time data, or any pattern where the output length is not known upfront.
Basic usage
    import { Protocol, int32 } from "@query-farm/vgi-rpc";

    const protocol = new Protocol("MyService");

    protocol.producer<{ limit: number; current: number }>("count", {
      params: { limit: int32 },
      outputSchema: { n: int32, n_squared: int32 },
      init: async ({ limit }) => ({ limit, current: 0 }),
      produce: async (state, out) => {
        if (state.current >= state.limit) {
          out.finish();
          return;
        }
        out.emitRow({ n: state.current, n_squared: state.current ** 2 });
        state.current++;
      },
      doc: "Count from 0 to limit-1.",
    });

How it works
A producer method has two phases:
- init: receives the request parameters and returns the initial state object
- produce: called repeatedly with the current state and an OutputCollector
The produce function must either:
- Emit one data batch via out.emit() or out.emitRow(), or
- Call out.finish() to end the stream
Mutate state in place inside produce to track progress from one call to the next.
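The two phases can be pictured as a small driver loop. The sketch below is a simplified, self-contained model and not the library's implementation: SketchCollector and runProducerSketch are hypothetical names, and the check that each call emits exactly one batch simply mirrors the rule stated above rather than documented library behavior.

```typescript
// Hypothetical stand-in for the library's OutputCollector.
// Only emit(), emitRow(), and finish() are modeled.
type Columns = Record<string, number[]>;

class SketchCollector {
  readonly batches: Columns[] = [];
  finished = false;

  // One batch holding several rows, given as column arrays.
  emit(columns: Columns): void {
    this.batches.push(columns);
  }

  // One batch holding a single row.
  emitRow(row: Record<string, number>): void {
    const columns: Columns = {};
    for (const key of Object.keys(row)) {
      columns[key] = [row[key]];
    }
    this.emit(columns);
  }

  // Marks the end of the stream.
  finish(): void {
    this.finished = true;
  }
}

async function runProducerSketch<P, S>(
  params: P,
  init: (params: P) => Promise<S>,
  produce: (state: S, out: SketchCollector) => Promise<void>,
): Promise<Columns[]> {
  const state = await init(params); // phase 1: runs once
  const out = new SketchCollector();
  while (!out.finished) {
    // phase 2: runs until produce calls finish()
    const before = out.batches.length;
    await produce(state, out);
    if (!out.finished && out.batches.length !== before + 1) {
      throw new Error("produce must emit exactly one batch or call finish()");
    }
  }
  return out.batches;
}

// The count producer from "Basic usage", driven by the sketch loop.
const demo = runProducerSketch(
  { limit: 3 },
  async ({ limit }) => ({ limit, current: 0 }),
  async (state, out) => {
    if (state.current >= state.limit) {
      out.finish();
      return;
    }
    out.emitRow({ n: state.current, n_squared: state.current ** 2 });
    state.current++;
  },
);

demo.then((batches) => console.log(batches.length)); // prints 3
```

Because the same state object is passed to every produce call, the increment of state.current is what moves the stream forward and eventually triggers finish().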
Type-safe state
The generic <S> parameter on protocol.producer<S>() threads the state type from init to produce:
    interface CountState {
      limit: number;
      current: number;
      batchSize: number;
    }

    protocol.producer<CountState>("count", {
      params: { limit: int32, batch_size: int32 },
      outputSchema: { n: int32, n_squared: int32 },
      init: async ({ limit, batch_size }) => ({
        limit,
        current: 0,
        batchSize: batch_size,
      }),
      produce: async (state, out) => {
        // state is typed as CountState
        if (state.current >= state.limit) {
          out.finish();
          return;
        }
        out.emitRow({ n: state.current, n_squared: state.current ** 2 });
        state.current++;
      },
      defaults: { batch_size: 1 },
    });

Multi-row batches
For efficiency, emit multiple rows per batch instead of one at a time:
    produce: async (state, out) => {
      if (state.current >= state.limit) {
        out.finish();
        return;
      }

      const remaining = state.limit - state.current;
      const count = Math.min(state.batchSize, remaining);

      const nValues: number[] = [];
      const sqValues: number[] = [];
      for (let i = 0; i < count; i++) {
        const n = state.current + i;
        nValues.push(n);
        sqValues.push(n * n);
      }
      state.current += count;

      out.emit({ n: nValues, n_squared: sqValues });
    },

Cancellation
A client can cancel an in-flight producer stream by sending an input batch whose metadata carries the vgi_rpc.cancel key (the CANCEL_KEY constant). When the server sees this, it ends the stream cleanly without calling produce again.
If you need to release resources held in state (open files, connections, etc.), register an optional onCancel hook on the producer config. It runs exactly once on cancellation:
    protocol.producer<CountState>("count", {
      params: { limit: int32 },
      outputSchema: { n: int32, n_squared: int32 },
      // CountState also declares batchSize, so init has to supply it.
      init: async ({ limit }) => ({ limit, current: 0, batchSize: 1 }),
      produce: async (state, out) => {
        if (state.current >= state.limit) {
          out.finish();
          return;
        }
        out.emitRow({ n: state.current, n_squared: state.current ** 2 });
        state.current++;
      },
      onCancel: (state) => {
        // Cleanup; runs once when the client cancels.
        // Signature: (state: S) => void | Promise<void>
      },
    });

The TypeScript client does not expose a dedicated cancel() method. Cancellation is initiated by sending the cancel-metadata input batch, and onCancel is the server-side hook that responds to it.
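The cancellation flow can be modeled with a small, self-contained sketch. It is not the library's code: driveSketch and SketchHandlers are hypothetical names, metadata is modeled as a plain Map, the empty string stored under the key is a placeholder value, and the sketch assumes each client input batch either drives one produce call or carries the cancel key. CANCEL_KEY is redefined locally so the example runs on its own.

```typescript
// Local copy of the metadata key named above.
const CANCEL_KEY = "vgi_rpc.cancel";

// Assumption: batch metadata modeled as a simple string map.
type Metadata = Map<string, string>;

interface SketchHandlers<S> {
  // Hypothetical simplification: resolves to false once the stream is finished.
  produce: (state: S) => Promise<boolean>;
  onCancel?: (state: S) => void | Promise<void>;
}

async function driveSketch<S>(
  state: S,
  handlers: SketchHandlers<S>,
  inputs: Metadata[],
): Promise<"finished" | "cancelled" | "open"> {
  for (const metadata of inputs) {
    if (metadata.has(CANCEL_KEY)) {
      // Cancel seen: run the hook once and stop without calling produce again.
      await handlers.onCancel?.(state);
      return "cancelled";
    }
    const more = await handlers.produce(state);
    if (!more) return "finished";
  }
  return "open"; // inputs ran out before the stream ended
}

// Two ordinary input batches, then a cancel; the fourth batch is never read.
const sketchState = { current: 0, closed: 0 };
const outcome = driveSketch(
  sketchState,
  {
    produce: async (s) => {
      s.current++;
      return true;
    },
    onCancel: (s) => {
      s.closed++;
    },
  },
  [new Map(), new Map(), new Map([[CANCEL_KEY, ""]]), new Map()],
);

outcome.then((result) => console.log(result, sketchState));
// prints: cancelled { current: 2, closed: 1 }
```

In this model, onCancel is the only place where state is touched after the cancel arrives, which is why it is the natural spot to close files or connections held in state.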
Stream headers
Producer methods can optionally send a one-time header before the data stream. See Stream Headers.