Skip to content

Client Transports

The vgi-rpc client library provides three connect functions that all return the same RpcClient interface. Pick the transport that fits your deployment and use call(), stream(), describe(), and close() identically across all of them.

interface RpcClient {
call(method: string, params?: Record<string, any>): Promise<Record<string, any> | null>;
stream(method: string, params?: Record<string, any>): Promise<StreamSession>;
describe(): Promise<ServiceDescription>;
close(): void;
}

Use httpConnect to talk to a running HTTP server:

import { httpConnect } from "@query-farm/vgi-rpc";
const client = httpConnect("http://localhost:8080", {
prefix: "/vgi", // URL path prefix (default: "" — root)
compressionLevel: 3, // zstd compression (omit to disable)
authorization: "Bearer <token>", // sent as the Authorization header
onLog: (msg) => console.log(`[${msg.level}] ${msg.message}`),
});
const result = await client.call("add", { a: 2, b: 3 });
console.log(result); // { result: 5 }
client.close();

HTTP transport is stateless — stream continuity is managed via XChaCha20-Poly1305 AEAD-sealed state tokens exchanged in batch metadata.

The authorization option (HTTP only) sets the Authorization request header. All three connect functions also accept an externalLocation option for configuring out-of-band data transfer locations.

Use subprocessConnect to spawn a server process and communicate over its stdin/stdout pipes:

import { subprocessConnect } from "@query-farm/vgi-rpc";
const client = subprocessConnect(["bun", "run", "server.ts"], {
cwd: "./my-project", // working directory
env: { DEBUG: "1" }, // extra environment variables
stderr: "inherit", // "inherit" | "pipe" | "ignore" (default: "ignore")
onLog: (msg) => console.log(msg),
});
const result = await client.call("greet", { name: "World" });
console.log(result); // { result: "Hello, World!" }
client.close(); // kills the subprocess

Use pipeConnect for low-level control when you already have a ReadableStream and writable sink:

import { pipeConnect } from "@query-farm/vgi-rpc";
const client = pipeConnect(readable, writable, {
onLog: (msg) => console.log(msg),
});
const result = await client.call("echo", { text: "hello" });
client.close();

The pipe transport is single-threaded: only one call() or stream() operation can be in flight at a time. Attempting a concurrent operation throws an error.

All transports use the same call() method for unary requests:

const result = await client.call("add", { a: 2, b: 3 });
// result: { result: 5 }
  • Default parameter values from the method definition are applied automatically.
  • Void methods (empty result schema) return null.

For server-streaming methods, use stream() and iterate:

const session = await client.stream("count", { limit: 5 });
// Access the stream header (if the method defines headerSchema)
if (session.header) {
console.log("Header:", session.header);
}
// Iterate over output batches
for await (const rows of session) {
console.log(rows); // [{ n: 0, n_squared: 0 }, { n: 1, n_squared: 1 }, ...]
}

For exchange methods, use exchange() to send input batches and receive output:

const session = await client.stream("scale", { factor: 2 });
const output = await session.exchange([{ value: 10 }]);
console.log(output); // [{ value: 20 }]
const output2 = await session.exchange([{ value: 5 }]);
console.log(output2); // [{ value: 10 }]
session.close();
interface StreamSession {
readonly header: Record<string, any> | null;
exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>;
[Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>;
close(): void;
}

Every RpcClient has a describe() method:

const desc = await client.describe();
console.log(desc.protocolName); // "Calculator"
console.log(desc.protocolVersion); // server-declared protocol version
for (const method of desc.methods) {
console.log(`${method.name} (${method.type})`);
// method.paramsSchema, method.resultSchema, and (for stream methods)
// method.inputSchema / method.outputSchema / method.headerSchema
}

For standalone HTTP introspection without creating a full client:

import { httpIntrospect } from "@query-farm/vgi-rpc";
const desc = await httpIntrospect("http://localhost:8080", { prefix: "/vgi" });

For custom transports, parse raw describe batches:

import { parseDescribeResponse } from "@query-farm/vgi-rpc";
const desc = await parseDescribeResponse(batches, onLog);

All transports accept an onLog callback that receives log messages sent by the server during method execution:

import { httpConnect } from "@query-farm/vgi-rpc";
const client = httpConnect("http://localhost:8080", {
onLog: (msg) => {
console.log(`[${msg.level}] ${msg.message}`);
if (msg.extra) console.log(" extra:", msg.extra);
},
});
interface LogMessage {
level: string;
message: string;
extra?: Record<string, any>;
}