Stream Headers
Producer and exchange methods can send a one-time header batch before the data stream begins. Headers are useful for sending metadata about the stream, such as total expected count or a description.
Defining a header
Add headerSchema and headerInit to a producer or exchange method:
import { Protocol, int, str } from "@query-farm/vgi-rpc";

const protocol = new Protocol("MyService");

protocol.producer<{ count: number; current: number }>("produce_with_header", {
  params: { count: int },
  outputSchema: { index: int, value: int },
  headerSchema: { total_expected: int, description: str },
  headerInit: (params) => ({
    total_expected: params.count,
    description: `producing ${params.count} batches`,
  }),
  init: ({ count }) => ({ count, current: 0 }),
  produce: (state, out) => {
    if (state.current >= state.count) {
      out.finish();
      return;
    }
    out.emitRow({ index: state.current, value: state.current * 10 });
    state.current++;
  },
});

Header init signature
type HeaderInit = (
  params: Record<string, any>,
  state: any,
  ctx: LogContext,
) => Record<string, any>;

The headerInit function receives:
- params: the parsed request parameters
- state: the state object returned by init
- ctx: logging context for clientLog()
It returns a record matching the headerSchema.
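As a rough sketch of how the three arguments might be used together, the function below builds a header from both params and the state returned by init, and logs through ctx. The LogContext interface here is a local stand-in: the (level, message) signature of clientLog is an assumption made for illustration, not taken from the library, and the names ProducerState and describeStream are hypothetical.

```typescript
// Local stand-in for the library's LogContext. The (level, message)
// signature of clientLog is an assumption made for this sketch.
interface LogContext {
  clientLog(level: string, message: string): void;
}

// Hypothetical state shape, mirroring what init returns in the producer example.
interface ProducerState {
  count: number;
  current: number;
}

// A headerInit-style function that uses params, state, and ctx.
// The returned record carries the fields named in headerSchema:
// total_expected (int) and description (str).
function describeStream(
  params: Record<string, any>,
  state: ProducerState,
  ctx: LogContext,
): Record<string, any> {
  // Derive the expected total from state rather than from params alone.
  const remaining = state.count - state.current;
  ctx.clientLog("INFO", `sending header for ${params.count} batches`);
  return {
    total_expected: remaining,
    description: `producing ${params.count} batches`,
  };
}

// Exercise the function with a fake context that records log calls.
const logged: string[] = [];
const fakeCtx: LogContext = {
  clientLog: (level, message) => {
    logged.push(`${level}: ${message}`);
  },
};
const header = describeStream({ count: 3 }, { count: 3, current: 0 }, fakeCtx);
```

Keeping the function free of side effects other than logging makes it easy to test in isolation, as the fake context above suggests.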
Wire behavior
The header batch is sent as the first batch in the response stream, before any data batches from produce or exchange. Clients can identify it by its distinct schema (different fields from the output schema).
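One way a client could apply this rule is sketched below. It does not use the library's client API: DecodedBatch is a hypothetical, simplified view of a batch (schema field names plus rows), and hasFields and splitHeader are illustrative helpers. The sketch only checks the first batch, since the header is sent before any data.

```typescript
// Hypothetical, simplified view of a decoded batch: the field names from
// its schema plus its rows. A real client would read these from the wire format.
interface DecodedBatch {
  fields: string[];
  rows: Record<string, unknown>[];
}

// True when the batch's field names are exactly the expected set.
function hasFields(batch: DecodedBatch, expected: string[]): boolean {
  return (
    batch.fields.length === expected.length &&
    expected.every((name) => batch.fields.includes(name))
  );
}

// Split a response stream into an optional header and the data batches.
// Only the first batch is inspected, because the header is sent first.
function splitHeader(
  batches: DecodedBatch[],
  headerFields: string[],
): { header: DecodedBatch | null; data: DecodedBatch[] } {
  const first = batches[0];
  if (first !== undefined && hasFields(first, headerFields)) {
    return { header: first, data: batches.slice(1) };
  }
  return { header: null, data: batches };
}

// Example stream shaped like the produce_with_header response.
const stream: DecodedBatch[] = [
  {
    fields: ["total_expected", "description"],
    rows: [{ total_expected: 2, description: "producing 2 batches" }],
  },
  { fields: ["index", "value"], rows: [{ index: 0, value: 0 }] },
  { fields: ["index", "value"], rows: [{ index: 1, value: 10 }] },
];
const result = splitHeader(stream, ["total_expected", "description"]);
```

Matching on field names only works when the header schema and output schema differ, which is the condition the paragraph above describes.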