Skip to content

Producer Streams

Producer streams let the server emit a sequence of output batches from a single request. Use them for large result sets, real-time data, or any pattern where the output length is not known upfront.

import { Protocol, int32 } from "@query-farm/vgi-rpc";
const protocol = new Protocol("MyService");
protocol.producer<{ limit: number; current: number }>("count", {
params: { limit: int32 },
outputSchema: { n: int32, n_squared: int32 },
init: async ({ limit }) => ({ limit, current: 0 }),
produce: async (state, out) => {
if (state.current >= state.limit) {
out.finish();
return;
}
out.emitRow({ n: state.current, n_squared: state.current ** 2 });
state.current++;
},
doc: "Count from 0 to limit-1.",
});

A producer method has two phases:

  1. init — receives the request parameters, returns the initial state object
  2. produce — called repeatedly with the current state and an OutputCollector

The produce function must either:

  • Emit one data batch via out.emit() or out.emitRow(), or
  • Call out.finish() to end the stream

Mutate state in-place between calls to track progress.

The generic <S> parameter on protocol.producer<S>() threads the state type from init to produce:

interface CountState {
limit: number;
current: number;
batchSize: number;
}
protocol.producer<CountState>("count", {
params: { limit: int32, batch_size: int32 },
outputSchema: { n: int32, n_squared: int32 },
init: async ({ limit, batch_size }) => ({
limit,
current: 0,
batchSize: batch_size,
}),
produce: async (state, out) => {
// state is typed as CountState
if (state.current >= state.limit) {
out.finish();
return;
}
out.emitRow({ n: state.current, n_squared: state.current ** 2 });
state.current++;
},
defaults: { batch_size: 1 },
});

For efficiency, emit multiple rows per batch instead of one at a time:

produce: async (state, out) => {
if (state.current >= state.limit) {
out.finish();
return;
}
const remaining = state.limit - state.current;
const count = Math.min(state.batchSize, remaining);
const nValues: number[] = [];
const sqValues: number[] = [];
for (let i = 0; i < count; i++) {
const n = state.current + i;
nValues.push(n);
sqValues.push(n * n);
}
state.current += count;
out.emit({ n: nValues, n_squared: sqValues });
},

A client can cancel an in-flight producer stream by sending an input batch whose metadata carries the vgi_rpc.cancel key (the CANCEL_KEY constant). When the server sees this, it ends the stream cleanly without calling produce again.

If you need to release resources held in state (open files, connections, etc.), register an optional onCancel hook on the producer config. It runs exactly once on cancellation:

protocol.producer<CountState>("count", {
params: { limit: int32 },
outputSchema: { n: int32, n_squared: int32 },
init: async ({ limit }) => ({ limit, current: 0 }),
produce: async (state, out) => {
if (state.current >= state.limit) {
out.finish();
return;
}
out.emitRow({ n: state.current, n_squared: state.current ** 2 });
state.current++;
},
onCancel: (state) => {
// Cleanup; runs once when the client cancels.
// Signature: (state: S) => void | Promise<void>
},
});

The TypeScript client does not expose a dedicated cancel() method — cancellation is initiated by sending the cancel-metadata input batch, and onCancel is the server-side hook that responds to it.

Producer methods can optionally send a one-time header before the data stream. See Stream Headers.