Transport Layer

Shape’s unified transport layer lives in the shape-wire crate. It provides a trait-based transport abstraction with TCP and QUIC implementations, plus a memoization wrapper that caches results for content-addressed function calls.

Transport and wire encoding are unified in shape-wire:

  • Transport (shape_wire::transport): network connections and framed byte I/O.
  • Encoding (shape_wire::codec + shape_wire::value + shape_wire::envelope): canonical MessagePack codec and typed wire values (JSON helpers are debug/output adapters).

The VM side (transport_builtins.rs) is a thin adapter that exposes these primitives to Shape code.

shape-wire/src
├── transport/
│ ├── mod.rs # Transport + Connection traits, TransportError
│ ├── factory.rs # TransportKind + WireTransportProvider + QUIC config
│ ├── framing.rs # zstd compression (encode_framed / decode_framed)
│ ├── tcp.rs # TcpTransport (always available)
│ ├── quic.rs # QuicTransport (feature = "quic")
│ └── memoized.rs # MemoizedTransport<T> wrapper
├── codec.rs # MessagePack encode/decode + JSON debug adapters
├── value.rs # WireValue model
├── envelope.rs # ValueEnvelope
└── metadata.rs # Type metadata for values
shape-vm/src/executor/builtins
├── transport_builtins.rs # Shape-facing std:transport exports
└── transport_provider.rs # provider boundary + default shape-wire provider

Both TCP and QUIC transports use length-prefixed frames with transparent compression:

[4-byte big-endian payload length] [flags: u8] [body...]

The flags byte controls compression:

  • 0x00 — body is uncompressed
  • 0x01 — body is zstd-compressed

Payloads under 256 bytes are always stored uncompressed. Larger payloads are compressed with zstd level 3, but only if the compressed form is actually smaller. A 256 MB decompression limit prevents decompression bombs. See Wire Protocol & Optimization for full details on compression ratios and configuration.
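The framing rules above can be sketched in a few lines of Rust. This is an illustrative sketch only, not the shape-wire implementation: the names `encode_frame` and `split_frame` are hypothetical, the compressor is passed in as a closure so the example needs no zstd dependency, and it assumes the 4-byte length counts the body only (not the flags byte), which the layout above does not state explicitly.

```rust
/// Flag values from the frame layout described above.
const FLAG_RAW: u8 = 0x00;
const FLAG_ZSTD: u8 = 0x01;
/// Payloads below this size are never compressed.
const COMPRESS_THRESHOLD: usize = 256;

/// Build one frame: [4-byte big-endian length][flags][body].
/// ASSUMPTION: the length counts the body only, not the flags byte.
/// `compress` is a stand-in for a zstd level-3 encoder.
fn encode_frame(payload: &[u8], compress: impl Fn(&[u8]) -> Vec<u8>) -> Vec<u8> {
    let (flags, body) = if payload.len() < COMPRESS_THRESHOLD {
        (FLAG_RAW, payload.to_vec())
    } else {
        let packed = compress(payload);
        // Keep the compressed form only when it is actually smaller.
        if packed.len() < payload.len() {
            (FLAG_ZSTD, packed)
        } else {
            (FLAG_RAW, payload.to_vec())
        }
    };
    let mut frame = Vec::with_capacity(5 + body.len());
    frame.extend_from_slice(&(body.len() as u32).to_be_bytes());
    frame.push(flags);
    frame.extend_from_slice(&body);
    frame
}

/// Split one complete frame into (flags, body), rejecting malformed input.
fn split_frame(frame: &[u8]) -> Result<(u8, &[u8]), String> {
    if frame.len() < 5 {
        return Err("frame shorter than the 5-byte header".to_string());
    }
    let len = u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let flags = frame[4];
    let body = &frame[5..];
    if body.len() != len {
        return Err(format!("declared {} bytes, found {}", len, body.len()));
    }
    match flags {
        FLAG_RAW | FLAG_ZSTD => Ok((flags, body)),
        other => Err(format!("unknown flags byte {:#04x}", other)),
    }
}
```

A receiver would decompress the body when the flags byte is 0x01 and enforce the decompression limit at that point; that step is left out here because it depends on the zstd binding in use.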

shape-wire transports treat payload bytes as opaque. Content-addressed metadata (for example function hashes in CallPayload) lives in the serialized message body, not in a transport-specific header.

For distributed execution, a common pattern is:

  1. Build a value (CallPayload, VmState, Delta) with std::core::state.
  2. Serialize to bytes (state::serialize(...)).
  3. Send bytes over std::core::transport (automatically compressed on the wire).
  4. Deserialize on the receiver (automatically decompressed).

from std::core::state use { capture_call, serialize, deserialize }
from std::core::transport use { tcp, send, connect, connection_send, connection_recv, connection_close }

fn send_call(t: Any, dest: string, f: Any, args: Array<Any>) -> Any {
    // Build a CallPayload body
    let payload = state::capture_call(f, args)
    // Serialize to MessagePack bytes
    let bytes = state::serialize(payload)
    // Transport sends opaque framed bytes.
    let response_bytes = transport::send(t, dest, bytes)?
    // Deserialize the response
    state::deserialize(response_bytes)
}

// On the receiving side (typically handled by the shape-wire server):
fn handle_incoming(raw_bytes: Array<int>) -> Array<int> {
    let payload = state::deserialize(raw_bytes) // decode CallPayload
    // payload.function.hash → verify blob is available
    // payload.args → execute with these arguments
    let result = invoke(payload.function, payload.args)
    state::serialize(result) // encode response
}

All transports implement two traits: Transport for opening connections and one-shot sends, and Connection for persistent bidirectional communication.

pub trait Transport: Send + Sync {
    fn send(&self, destination: &str, payload: &[u8]) -> Result<Vec<u8>, TransportError>;
    fn connect(&self, destination: &str) -> Result<Box<dyn Connection>, TransportError>;
}

pub trait Connection: Send {
    fn send(&mut self, payload: &[u8]) -> Result<(), TransportError>;
    fn recv(&mut self, timeout: Option<Duration>) -> Result<Vec<u8>, TransportError>;
    fn close(&mut self) -> Result<(), TransportError>;

    /// Whether this connection supports parallel sidecar delivery.
    fn supports_sidecars(&self) -> bool { false }

    /// Send a sidecar payload (possibly on a separate parallel stream).
    fn send_sidecar(&mut self, payload: &[u8]) -> Result<(), TransportError> {
        self.send(payload)
    }

    /// Receive any incoming message (regular or sidecar).
    fn recv_any(&mut self, timeout: Option<Duration>) -> Result<Vec<u8>, TransportError> {
        self.recv(timeout)
    }
}

Transport::send is a one-shot request/response: send a payload to a destination, wait for a response, return it. Transport::connect opens a persistent connection for multiple exchanges.

The sidecar methods enable large payloads to be split into separate messages. By default, sidecars fall back to send()/recv(). QUIC overrides these to use separate unidirectional streams for parallel delivery. See Wire Protocol & Optimization for details on sidecar extraction and reassembly.
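To make the trait contract concrete, here is a minimal sketch of an in-memory implementation, which might be handy for tests. `EchoTransport` and `EchoConnection` are hypothetical names that do not exist in shape-wire. The traits are re-declared locally, with a cut-down two-variant error enum, purely so the example compiles on its own; the real definitions live in shape_wire::transport.

```rust
use std::collections::VecDeque;
use std::time::Duration;

// Local stand-ins so the sketch is self-contained (not the real shape-wire types).
#[derive(Debug, PartialEq)]
enum TransportError { Timeout, ConnectionClosed }

trait Transport: Send + Sync {
    fn send(&self, destination: &str, payload: &[u8]) -> Result<Vec<u8>, TransportError>;
    fn connect(&self, destination: &str) -> Result<Box<dyn Connection>, TransportError>;
}

trait Connection: Send {
    fn send(&mut self, payload: &[u8]) -> Result<(), TransportError>;
    fn recv(&mut self, timeout: Option<Duration>) -> Result<Vec<u8>, TransportError>;
    fn close(&mut self) -> Result<(), TransportError>;
    fn supports_sidecars(&self) -> bool { false }
    fn send_sidecar(&mut self, payload: &[u8]) -> Result<(), TransportError> { self.send(payload) }
    fn recv_any(&mut self, timeout: Option<Duration>) -> Result<Vec<u8>, TransportError> { self.recv(timeout) }
}

/// Hypothetical transport that echoes every payload back to the caller.
struct EchoTransport;

/// Hypothetical connection: whatever is sent can be received again, in order.
struct EchoConnection { queue: VecDeque<Vec<u8>>, open: bool }

impl Transport for EchoTransport {
    fn send(&self, _destination: &str, payload: &[u8]) -> Result<Vec<u8>, TransportError> {
        Ok(payload.to_vec())
    }
    fn connect(&self, _destination: &str) -> Result<Box<dyn Connection>, TransportError> {
        Ok(Box::new(EchoConnection { queue: VecDeque::new(), open: true }))
    }
}

impl Connection for EchoConnection {
    fn send(&mut self, payload: &[u8]) -> Result<(), TransportError> {
        if !self.open { return Err(TransportError::ConnectionClosed); }
        self.queue.push_back(payload.to_vec());
        Ok(())
    }
    fn recv(&mut self, _timeout: Option<Duration>) -> Result<Vec<u8>, TransportError> {
        if !self.open { return Err(TransportError::ConnectionClosed); }
        // Nothing queued: report a timeout immediately instead of blocking.
        self.queue.pop_front().ok_or(TransportError::Timeout)
    }
    fn close(&mut self) -> Result<(), TransportError> {
        self.open = false;
        Ok(())
    }
}
```

Because `EchoConnection` does not override the sidecar methods, `send_sidecar` and `recv_any` fall through to `send` and `recv`, which is the default behavior described above.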

All transport operations return Result<T, TransportError>. The error variants are:

| Variant | Description |
|---|---|
| ConnectionFailed | Could not establish a connection to the destination |
| SendFailed | Connection established but sending the payload failed |
| ReceiveFailed | Connection established but reading the response failed |
| Timeout | Operation exceeded the configured or requested timeout |
| ConnectionClosed | The remote end closed the connection |
| PayloadTooLarge { size, max } | Payload exceeds the maximum allowed size |
| Io | Underlying I/O error (wraps std::io::Error) |

In Shape code, transport errors surface as Result values and can be handled with the ? operator.
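On the Rust side, a host might match on these variants to decide how to react. The sketch below is illustrative only. The enum is a local copy whose field shapes are assumed (only `PayloadTooLarge { size, max }` and the `std::io::Error` wrapper are given above), and both `check_size` and the retry policy in `is_retryable` are hypothetical helpers, not part of shape-wire.

```rust
use std::io;

// Local copy of the variants listed above. ASSUMPTION: the String fields and
// unit variants are guesses; only PayloadTooLarge and Io are documented.
#[derive(Debug)]
enum TransportError {
    ConnectionFailed(String),
    SendFailed(String),
    ReceiveFailed(String),
    Timeout,
    ConnectionClosed,
    PayloadTooLarge { size: usize, max: usize },
    Io(io::Error),
}

/// 64 MB, matching the documented default maximum payload.
const MAX_PAYLOAD: usize = 64 * 1024 * 1024;

/// Hypothetical pre-send check that produces PayloadTooLarge.
fn check_size(payload: &[u8], max: usize) -> Result<(), TransportError> {
    if payload.len() > max {
        return Err(TransportError::PayloadTooLarge { size: payload.len(), max });
    }
    Ok(())
}

/// Hypothetical retry policy: retry only when no payload can have been
/// delivered or the failure is transient. Other policies are equally valid.
fn is_retryable(err: &TransportError) -> bool {
    match err {
        TransportError::ConnectionFailed(_)
        | TransportError::Timeout
        | TransportError::ConnectionClosed => true,
        TransportError::SendFailed(_) | TransportError::ReceiveFailed(_) => false,
        TransportError::PayloadTooLarge { .. } => false,
        TransportError::Io(e) => e.kind() == io::ErrorKind::Interrupted,
    }
}
```

Treating `SendFailed` and `ReceiveFailed` as non-retryable is a conservative choice for calls that may not be idempotent; for pure, content-addressed calls a retry would be harmless.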

Transport creation is abstracted through shape_wire::transport::factory:

pub enum TransportKind {
    Tcp,
    Quic,
}

pub trait WireTransportProvider: Send + Sync {
    fn create_transport(&self, kind: TransportKind) -> Result<Arc<dyn Transport>, String>;
}

shape-wire ships ShapeWireProvider as the default provider. The VM uses this provider boundary so embedders can swap transport-construction policy without changing VM execution code.

Host-side APIs exposed by shape-vm:

  • shape_vm::set_transport_provider(...)
  • shape_vm::reset_transport_provider()
  • shape_vm::configure_quic_transport(...) (feature quic)
  • shape_vm::clear_quic_transport_config() (feature quic)
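As an illustration of what swapping transport-construction policy could look like, the sketch below implements a provider that hands out TCP transports and refuses QUIC. `TcpOnlyProvider` and `NullTransport` are hypothetical, and the traits are simplified local copies so the example compiles by itself. Registering such a provider through shape_vm::set_transport_provider is not shown, because the function's signature is not given here.

```rust
use std::sync::Arc;

// Simplified local stand-ins for the shape-wire types (not the real definitions).
trait Transport: Send + Sync {
    fn send(&self, destination: &str, payload: &[u8]) -> Result<Vec<u8>, String>;
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum TransportKind { Tcp, Quic }

trait WireTransportProvider: Send + Sync {
    fn create_transport(&self, kind: TransportKind) -> Result<Arc<dyn Transport>, String>;
}

/// Hypothetical transport that answers every request with an empty response.
struct NullTransport;

impl Transport for NullTransport {
    fn send(&self, _destination: &str, _payload: &[u8]) -> Result<Vec<u8>, String> {
        Ok(Vec::new())
    }
}

/// Hypothetical host policy: TCP is allowed, QUIC is rejected.
struct TcpOnlyProvider;

impl WireTransportProvider for TcpOnlyProvider {
    fn create_transport(&self, kind: TransportKind) -> Result<Arc<dyn Transport>, String> {
        match kind {
            // A real provider would construct a TcpTransport here.
            TransportKind::Tcp => Ok(Arc::new(NullTransport)),
            TransportKind::Quic => Err("QUIC is disabled by host policy".to_string()),
        }
    }
}
```

Because the provider returns `Arc<dyn Transport>`, callers never see the concrete type, which is what lets an embedder substitute a different implementation without touching VM execution code.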

TcpTransport is the default transport. It is always available; no feature flag is required.

Wire format:

  • Length-prefixed framing: every message is preceded by a 4-byte big-endian length header followed by a flags byte and the payload body.
  • Transparent compression: payloads >= 256 bytes are zstd-compressed when beneficial. Decompression is automatic on receive.
  • Maximum payload: 64 MB by default (configurable).

The framing protocol is deliberately simple. Higher-level concerns (content hashing and call semantics) are handled by the payload format, not by the transport itself.

TCP transport is available in Shape code via std::core::transport:

from std::core::transport use { tcp, send, connect, connection_send, connection_recv, connection_close, quic, memoized, memo_stats, memo_invalidate }
let tcp = transport::tcp()
// One-shot: send payload and wait for response
let response = transport::send(tcp, "10.0.0.5:9000", payload)?
// Persistent connection
let conn = transport::connect(tcp, "10.0.0.5:9000")?
transport::connection_send(conn, data)?
let received = transport::connection_recv(conn, 5000)? // 5s timeout
transport::connection_close(conn)?

transport::tcp() returns a transport handle. transport::send performs a one-shot request/response. For multiple exchanges with the same node, open a persistent connection with transport::connect to avoid repeated TCP handshakes.

QuicTransport is available behind the quic feature flag. It uses the quinn crate for the underlying QUIC implementation.

Key properties:

  • Multiplexed streams: multiple logical streams over a single connection. Parallel blob fetches (e.g., pulling several FunctionBlobs from a remote store) each run on their own QUIC stream without head-of-line blocking.
  • 0-RTT connection establishment: repeated connections to the same node skip the full handshake, reducing latency for frequent inter-node calls.
  • Built-in TLS 1.3 encryption: all traffic is encrypted by default with no additional configuration.
  • Connection migration: the connection survives IP address changes, which is useful for mobile and edge scenarios where the network path may shift mid-session.
  • Parallel sidecar delivery: large payloads (> 1 MB) can be sent as sidecars on separate unidirectional streams, delivered in parallel with the main call message. See Wire Protocol & Optimization.

transport::quic() requires host-side QUIC configuration. If configuration is missing, transport::quic() returns an error.

#[cfg(feature = "quic")]
use shape_vm::{configure_quic_transport, clear_quic_transport_config};

#[cfg(feature = "quic")]
fn configure_quic() -> Result<(), Box<dyn std::error::Error>> {
    let root_ca_der = std::fs::read("certs/shape-ca.der")?;
    configure_quic_transport(
        "shape-node.internal".to_string(), // SNI / certificate server name
        vec![root_ca_der], // trust anchors (DER)
        Some(std::time::Duration::from_secs(5)),
    );
    Ok(())
}

The QUIC transport uses these settings to build a quinn::ClientConfig and validate peer certificates.

The Shape-level API is identical to TCP — only the transport constructor differs:

from std::core::transport use { tcp, send, connect, connection_send, connection_recv, connection_close, quic, memoized, memo_stats, memo_invalidate }
let quic = transport::quic()
// One-shot
let response = transport::send(quic, "10.0.0.5:9000", payload)?
// Persistent connection (multiplexed under the hood)
let conn = transport::connect(quic, "10.0.0.5:9000")?
transport::connection_send(conn, data)?
let received = transport::connection_recv(conn, 5000)?
transport::connection_close(conn)?

Because both TcpTransport and QuicTransport implement the same Transport trait, code that accepts a transport handle works with either implementation without modification.

Add the quic feature when building:

cargo build --features quic

If the quic feature is not enabled, calling transport::quic() in Shape code returns an error.

At runtime, the current implementation does:

  1. transport::quic() builds a QuicTransport via the provider factory.
  2. One-shot transport::send(...):
    • connect to peer
    • open a bidirectional stream
    • write a length-prefixed request
    • read a length-prefixed response
  3. Persistent transport::connect(...) returns a Connection wrapper:
    • connection_send opens a unidirectional stream and writes one frame
    • connection_recv(timeout?) accepts a unidirectional stream and reads one frame

This keeps the same Shape API as TCP while enabling QUIC stream multiplexing underneath.

MemoizedTransport<T: Transport> wraps any transport with result caching. It is designed for content-addressed function calls where the same function applied to the same arguments always produces the same result.

How it works:

  1. Cache key: SHA-256 of destination + payload. Because payloads for content-addressed calls contain function hashes and argument hashes, the cache key is deterministic.
  2. Lookup: before forwarding a request to the inner transport, the wrapper checks its cache for a matching key.
  3. Hit: return the cached result immediately — no network round-trip.
  4. Miss: forward the request to the inner transport, cache the response, return it.
  5. Eviction: LRU eviction with a configurable maximum number of entries.
from std::core::transport use { tcp, send, connect, connection_send, connection_recv, connection_close, quic, memoized, memo_stats, memo_invalidate }
// Create memoized transport (optional max entries; default 1024)
let memo = transport::memoized(10000)
// First call: goes over the network
let r1 = transport::send(memo, "10.0.0.5:9000", payload)?
// Second call with same destination + payload: returned from cache
let r2 = transport::send(memo, "10.0.0.5:9000", payload)?

transport::memoized(...) currently wraps a TCP transport and returns a handle compatible with transport::send and transport::connect.

transport::memo_stats(handle) returns:

[cache_hits, cache_misses, evictions, total_requests]

Use transport::memo_invalidate(handle) to clear the cache.

The memoized transport intercepts CallPayload requests before forwarding them to the remote node. For calls where the same function hash plus the same argument hashes produce the same result (pure functions), the cache eliminates all redundant network traffic.

This composes naturally with the content-addressed memoization annotation described in Content-Addressed Bytecode — local memoization avoids re-execution, transport memoization avoids re-transmission.
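The lookup, hit, miss, and eviction steps can be sketched as a small cache that sits in front of a forwarding function. This is not the MemoizedTransport source. `MemoCache`, `MemoStats`, and `cache_key` are hypothetical names, and the standard library's `DefaultHasher` stands in for SHA-256 to keep the example dependency-free, so unlike a real SHA-256 key it offers no meaningful collision resistance.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, VecDeque};
use std::hash::{Hash, Hasher};

/// Cache key over destination + payload.
/// ASSUMPTION: DefaultHasher replaces SHA-256 for illustration only.
fn cache_key(destination: &str, payload: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    destination.hash(&mut hasher);
    payload.hash(&mut hasher);
    hasher.finish()
}

#[derive(Debug, Default, PartialEq)]
struct MemoStats { hits: u64, misses: u64, evictions: u64, total: u64 }

struct MemoCache {
    max_entries: usize,
    entries: HashMap<u64, Vec<u8>>,
    order: VecDeque<u64>, // front = least recently used
    stats: MemoStats,
}

impl MemoCache {
    fn new(max_entries: usize) -> Self {
        MemoCache { max_entries, entries: HashMap::new(), order: VecDeque::new(), stats: MemoStats::default() }
    }

    /// Mark `key` as the most recently used entry.
    fn touch(&mut self, key: u64) {
        self.order.retain(|k| *k != key);
        self.order.push_back(key);
    }

    /// Return the cached response, or call `forward` (the inner transport) and cache its result.
    fn send<E>(
        &mut self,
        destination: &str,
        payload: &[u8],
        forward: impl FnOnce(&str, &[u8]) -> Result<Vec<u8>, E>,
    ) -> Result<Vec<u8>, E> {
        self.stats.total += 1;
        let key = cache_key(destination, payload);
        if let Some(hit) = self.entries.get(&key).cloned() {
            self.stats.hits += 1;
            self.touch(key);
            return Ok(hit);
        }
        self.stats.misses += 1;
        let response = forward(destination, payload)?; // errors are not cached
        if self.max_entries > 0 {
            if self.entries.len() >= self.max_entries {
                // Evict the least recently used entry to make room.
                if let Some(oldest) = self.order.pop_front() {
                    self.entries.remove(&oldest);
                    self.stats.evictions += 1;
                }
            }
            self.entries.insert(key, response.clone());
            self.touch(key);
        }
        Ok(response)
    }

    /// Drop all cached responses; counters are left untouched in this sketch.
    fn invalidate(&mut self) {
        self.entries.clear();
        self.order.clear();
    }
}
```

The `VecDeque` scan in `touch` is linear in the number of entries, which is fine for a sketch; a production LRU would typically use a linked structure for constant-time updates.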

Integration with Content-Addressed Bytecode

The transport layer and std::core::state primitives compose to enable distributed execution. The typical flow for dispatching a function call to a remote node is:

  1. Build the payload: state::capture_call(f, args) produces a CallPayload containing the function’s content hash and the argument values.
  2. Serialize: state::serialize(payload) encodes the CallPayload to MessagePack bytes.
  3. Send: transport::send(t, destination, bytes) transfers the serialized payload to a remote node over TCP or QUIC.
  4. Remote deserialize + execute: the remote node calls state::deserialize(bytes) to reconstruct the CallPayload, resolves the function hash to a local FunctionBlob (fetching it if missing), and executes the call.
  5. Return result: the remote node serializes the result and sends it back over the same transport.

from std::core::state use { capture_call, serialize, deserialize }
from std::core::transport use { tcp, send, connect, connection_send, connection_recv, connection_close }

let tcp = transport::tcp()

fn remote_call(dest: string, f: Any, arguments: Array<Any>) -> Any {
    // 1. Build payload
    let payload = state::capture_call(f, arguments)
    // 2. Serialize
    let bytes = state::serialize(payload)
    // 3. Send to remote node
    let response_bytes = transport::send(tcp, dest, bytes)?
    // 4-5. Deserialize the result
    state::deserialize(response_bytes)
}

Because function identity is a content hash, the remote node does not need the caller’s program. It only needs the referenced FunctionBlobs — which can come from the caller, from a shared blob store, or from any other node that has compiled the same function.

Composing Memoized Transport with Distribution

For pure functions, wrapping the transport with MemoizedTransport means:

  • First call with a given function + args: full round-trip (serialize, send, execute remotely, receive).
  • Subsequent calls with the same function + args: instant cache hit, zero network traffic.

let memo = transport::memoized(10000)

// The @faas annotation can use the memoized transport
@faas(cluster_with_memo_transport)
fn train(data: Array<Sample>) -> Model {
    heavy_computation(data)
}