Skip to content

Unix Socket Launcher

The launcher serves a vgi-rpc protocol over an AF_UNIX domain socket instead of stdin/stdout or HTTP. It is built for the spawn-or-reuse pattern: a single long-running worker process binds one socket, and any number of callers connect to that socket path. A coordination layer (launch) makes sure exactly one worker exists per worker command tuple, reusing a warm worker when one is already running.

The on-disk layout and tuple hash match the Python vgi_rpc.launcher byte-for-byte, so a TypeScript worker and a Python launcher (or vice versa) resolve to the same socket.

  • One server process per socket. serveUnix binds an AF_UNIX socket, accepts connections one at a time (sequential listen, like Python’s serve_unix), and dispatches each connection through the standard unary / stream / __describe__ machinery.
  • Self-termination. A worker shuts itself down after idleTimeout seconds with zero connected clients, after an initial startupGrace window so a slow first caller doesn’t see the socket vanish.
  • Spawn-or-reuse. launch derives a deterministic socket path from a hash of the worker command tuple, serializes concurrent first-callers on a per-hash lockfile, probes for an existing worker, and spawns one only if none is serving.
  • Worker contract. A worker accepts --unix PATH and --idle-timeout SEC on its command line, and prints exactly one UNIX:<absolute-path> line to stdout once bind + listen succeed.

serveUnix(protocol, options) binds a Unix socket and serves a Protocol. It returns a ServeUnixHandle.

import { Protocol, float, serveUnix } from "@query-farm/vgi-rpc";
const protocol = new Protocol("Calculator");
protocol.unary("add", {
params: { a: float, b: float },
result: { result: float },
handler: async ({ a, b }) => ({ result: a + b }),
});
const handle = await serveUnix(protocol, {
unixPath: "/tmp/calculator.sock",
idleTimeout: 300, // self-terminate after 300s idle (0 disables)
});
console.log("serving on", handle.socketPath);
// Block until the server stops (idle timeout, stop(), or fatal error).
await handle.done;

Once bind + listen succeed, serveUnix writes UNIX:<absolute-path>\n to stdout (this is the cross-language launcher contract) and tightens the socket file to mode 0600.

OptionTypeDefaultDescription
unixPathstring(required)Path to the Unix socket to bind. Resolved to an absolute path.
idleTimeoutnumber300Self-terminate after this many seconds with zero connected clients. 0 disables the timer.
startupGraceSecondsnumber5Grace period after listen() succeeds before the idle timer starts ticking.
protocolVersionstring""Optional logical-service / protocol-contract version label.
serverIdstringrandomCustom server identifier (random 12-char id if omitted).
enableDescribebooleantrueEnable the __describe__ introspection method.
dispatchHookDispatchHookOptional dispatch hook for observability.
externalLocationExternalLocationConfigOptional external-storage config for large-batch externalization.
onServeStartServeStartHookLifecycle hook fired once before the first dispatched request.
backlognumber16Maximum listen backlog.
onBound(sockPath: string) => voidCalled after listen() returns but before UNIX:<path> is printed.
announcementSinkNodeJS.WritableStreamprocess.stdoutStream used for the UNIX:<path> announcement line.
interface ServeUnixHandle {
readonly socketPath: string;
/** Shut down the listener and unlink the socket file. */
stop(): Promise<void>;
/** Resolves when the server has stopped (idle timeout, stop(), or fatal error). */
readonly done: Promise<void>;
}

launch(config) ensures a worker is running for a given command and returns its socket path. The flow:

  1. Resolve the state directory and the per-tuple lock / sock / meta paths (or use an explicit socketPath).
  2. Acquire the per-hash lockfile (blocking up to connectTimeout).
  3. Probe the socket — if a worker is already serving, reuse it and return its path.
  4. Otherwise clean up any stale socket, write launch metadata, and spawn the worker, waiting for its UNIX:<path> announcement.
  5. Release the lock, then run a bounded opportunistic GC of stale entries.
import { launch, pipeConnect } from "@query-farm/vgi-rpc";
import { createConnection } from "node:net";
const socketPath = await launch({
workerArgv: ["bun", "run", "examples/calculator-unix.ts"],
idleTimeout: 300, // forwarded as --idle-timeout
connectTimeout: 30, // max seconds to wait for the lock
workerStartupTimeout: 60, // max seconds to wait for UNIX:<path>
});
// Connect to the worker over the socket (a Node socket is a duplex stream).
const sock = createConnection({ path: socketPath });
const client = pipeConnect(sock, sock);
const result = await client.call("add", { a: 2, b: 3 });

launch appends --unix <socketPath> and --idle-timeout <seconds> to workerArgv when spawning, so the worker must implement the serveUnix CLI contract (parse those flags, print UNIX:<path>).

FieldTypeDefaultDescription
workerArgvreadonly string[](required)The worker command and arguments. Must be non-empty.
socketPathstringderivedExplicit socket path. When omitted, derived from the tuple hash. Explicit paths get a sibling .lock, no .meta, and are skipped by status / GC.
idleTimeoutnumber300Worker self-shutdown after this many seconds idle. Forwarded as --idle-timeout.
connectTimeoutnumber30Max seconds to block waiting for the per-hash file lock.
workerStartupTimeoutnumber60Max seconds to wait for the worker to print UNIX:<path>.
workerStderrstringWhen set, worker stderr is appended to this file; otherwise discarded.
stateDirstringdefaultStateDir()Override the default state directory.

These lower-level exports back the coordination layer. Most callers only need launch and serveUnix; reach for these when building custom tooling (status displays, cleanup jobs, alternative coordinators).

defaultStateDir() returns the per-user directory used for lockfiles and sockets, creating it mode 0700 if missing:

  • Linux: $XDG_RUNTIME_DIR/vgi-rpc/ when set, otherwise $TMPDIR/vgi-rpc-$UID/.
  • macOS / BSD: $TMPDIR/vgi-rpc-$UID/.
  • Windows: $TMP/vgi-rpc/.

On POSIX it refuses to operate on a directory not owned by the current user.

socketPaths(stateDir, hashId) returns the <hash>.lock / <hash>.sock / <hash>.meta triple for one worker tuple:

import { defaultStateDir, socketPaths, launcherComputeHash } from "@query-farm/vgi-rpc";
const stateDir = defaultStateDir();
const hashId = await launcherComputeHash(["bun", "run", "worker.ts"]);
const { lockPath, sockPath, metaPath } = socketPaths(stateDir, hashId);
interface SocketPaths {
lockPath: string;
sockPath: string;
metaPath: string;
}

launcherComputeHash(workerArgv, cwd?, env?) computes the 16-hex-char tuple hash. Only VGI_RPC_* environment keys participate, so workers that differ only in unrelated env (PATH, HOME, …) intentionally share a worker. The hash matches Python’s compute_hash byte-for-byte.

probeSocket(sockPath, timeoutMs?) (default 2000 ms) resolves true if something is currently accepting connections on the socket, false otherwise:

import { probeSocket } from "@query-farm/vgi-rpc";
if (await probeSocket("/tmp/calculator.sock")) {
console.log("a worker is live");
}

launch serializes first-callers on a per-hash lockfile. The lock uses a persistent PID-stamp protocol (the lockfile is truncated to zero bytes on release rather than unlinked, so it remains a slot marker for scanners).

  • tryAcquireLock(lockPath) — non-blocking; returns a FileLockHandle or null if held by a live process.
  • acquireLock(lockPath, timeoutMs) — polls until acquired or the timeout fires (throws on timeout).
import { acquireLock, tryAcquireLock, socketPaths, defaultStateDir, launcherComputeHash } from "@query-farm/vgi-rpc";
const hashId = await launcherComputeHash(["bun", "run", "worker.ts"]);
const { lockPath } = socketPaths(defaultStateDir(), hashId);
const handle = await acquireLock(lockPath, 30_000);
try {
// critical section: probe + spawn
} finally {
handle.release();
}
// Or non-blocking:
const maybe = tryAcquireLock(lockPath);
if (maybe) maybe.release();
interface FileLockHandle {
readonly path: string;
/** Truncate the file to zero bytes; the file persists as a slot marker. Idempotent. */
release(): void;
}

statusRows(stateDir) lists one row per <hash>.lock in the state directory. It is read-only and takes no locks, reading the best-effort .meta file and probing each socket for liveness:

import { statusRows, defaultStateDir } from "@query-farm/vgi-rpc";
for (const row of await statusRows(defaultStateDir())) {
console.log(row.hashId, row.alive ? "alive" : "dead", row.cmd.join(" "));
}
interface StatusRow {
hashId: string;
cmd: string[];
cwd: string;
socket: string;
startedAt: number | null;
alive: boolean;
}

gcStateDir(stateDir, tryAcquire, options?) removes the .lock / .sock / .meta triples whose worker is no longer accepting connections. It only cleans entries whose lock it can acquire (so it never touches a live worker or an in-flight launch). The tryAcquire callback returns a release function when the lock is free, or null when it is held — pass one backed by tryAcquireLock:

import { gcStateDir, tryAcquireLock, defaultStateDir } from "@query-farm/vgi-rpc";
const result = await gcStateDir(
defaultStateDir(),
async (lockPath) => {
const h = tryAcquireLock(lockPath);
return h ? () => h.release() : null;
},
{ limit: 16 }, // optional: bound the scan; excludeHash skips a specific entry
);
console.log("cleaned:", result.cleaned);
console.log("skipped (in use):", result.skippedInUse);
interface GcResult {
/** Hash IDs of stale entries that were removed. */
cleaned: string[];
/** Hash IDs whose lockfile is currently held (a launch is in flight or the worker is alive). */
skippedInUse: string[];
}

launch already runs a bounded GC (limit 16) opportunistically after releasing its lock, so you rarely need to call gcStateDir directly.