Skip to content

Stream Headers

Producer and exchange methods can send a one-time header batch before the data stream begins. Headers are useful for sending metadata about the stream, such as total expected count or a description.

Defining a header

Add headerSchema and headerInit to a producer or exchange method:

import { Protocol, int, str } from "vgi-rpc-typescript";
const protocol = new Protocol("MyService");
protocol.producer<{ count: number; current: number }>("produce_with_header", {
params: { count: int },
outputSchema: { index: int, value: int },
headerSchema: { total_expected: int, description: str },
headerInit: (params) => ({
total_expected: params.count,
description: `producing ${params.count} batches`,
}),
init: ({ count }) => ({ count, current: 0 }),
produce: (state, out) => {
if (state.current >= state.count) {
out.finish();
return;
}
out.emitRow({ index: state.current, value: state.current * 10 });
state.current++;
},
});

Header init signature

type HeaderInit = (
params: Record<string, any>,
state: any,
ctx: LogContext,
) => Record<string, any>;

The headerInit function receives:

  • params — the parsed request parameters
  • state — the state object returned by init
  • ctx — logging context for clientLog()

It returns a record matching the headerSchema.

Wire behavior

The header batch is sent as the first batch in the response stream, before any data batches from produce or exchange. Clients can identify it by its distinct schema (different fields from the output schema).