Skip to main content

Installation

pip install unpod
With uv (recommended):
uv add unpod
Optional extras:
uv add unpod[dialog]      # Adds superdialog for structured dialog flows
uv add unpod[langchain]   # Adds LangChain adapter
Set your API key:
export UNPOD_API_KEY="sk-..."
export UNPOD_ORCHESTRATOR_URL="wss://api.unpod.io"  # optional override

Quick Start

from unpod import AgentRunner, CallContext

async def handle_call(ctx: CallContext) -> None:
    await ctx.session.say("Hello, thanks for calling!")
    await ctx.session.run()  # blocks until the call ends

runner = AgentRunner(
    entrypoint=handle_call,
    agent_id="my-agent",  # the agent_id from your Speech Pipe settings
)
runner.start()  # blocking
agent_id is the runner agent ID (a short string name) from your Speech Pipe’s settings, not the UUID shown in the Speech Pipe list.

AgentRunner

AgentRunner is a long-lived process that holds a WebSocket connection to the Unpod orchestrator. When a call is dispatched to your agent, the runner invokes your entrypoint with a CallContext.
Unpod Orchestrator
      | WebSocket (wss)
      v
AgentRunner  --- heartbeat every N seconds
      |
      | on dispatch
      v
entrypoint(CallContext)  --- your code

Constructor

AgentRunner(
    entrypoint: Callable[[CallContext], Awaitable[None]],
    agent_id: str,
    api_key: str | None = None,          # falls back to UNPOD_API_KEY
    max_sessions: int = 50,               # max concurrent sessions
    permits_per_minute: int = 120,       # rate limit hint
    drain_timeout_s: int = 60,           # graceful shutdown window
    dev_mode: bool = False,              # skip TLS, local orchestrator
    base_url: str | None = None,         # override orchestrator URL
)

Configuration options

ParameterDescription
max_sessionsMax simultaneous sessions this runner accepts. The orchestrator will not dispatch beyond this.
permits_per_minuteRate of new call acceptance. Set lower to protect downstream systems.
drain_timeout_sOn shutdown, wait this many seconds for active calls to finish before force-exiting.
dev_modeSkip TLS for local development against a local orchestrator.

Lifecycle hooks

React to runner-level events (not individual session events):
from unpod import AgentRunner, CallContext

runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")

@runner.on("call_start")
async def on_call_start(ctx: CallContext) -> None:
    print(f"New call: {ctx.call_id}")

@runner.on("call_end")
async def on_call_end(ctx: CallContext, final_state: str) -> None:
    print(f"Call {ctx.call_id} ended: {final_state}")
EventArgumentsWhen
call_startctx: CallContextCall is dispatched and connected
call_endctx: CallContext, final_state: strCall completed or failed

CallContext

Every call to your entrypoint receives a CallContext:
async def handle_call(ctx: CallContext) -> None:
    ctx.call_id       # str: unique call ID
    ctx.session_id    # str: unique session ID
    ctx.agent_id      # str: agent that received this call
    ctx.direction     # str: "inbound" or "outbound"
    ctx.user_number   # str: caller's E.164 number
    ctx.instructions  # str | None: per-call override instructions
    ctx.data          # dict: metadata from dispatch (e.g. CRM data)
    ctx.session       # Session: call control object

Management Client

AsyncClient / Client provide access to the Unpod management API - Speech Pipes, calls, numbers, voice profiles, and more:
import asyncio
from unpod import AsyncClient

async def main():
    async with AsyncClient(api_key="sk-...") as client:
        pipes = await client.pipes.list()
        for p in pipes:
            print(p.pipe_id, p.name)

asyncio.run(main())
For scripts and non-async contexts use the sync Client:
from unpod import Client

with Client(api_key="sk-...") as client:
    profiles = client.voice_profiles.list(language="en")
    for p in profiles:
        print(p.profile_id, p.name)

Available resources

AttributeResource
client.pipesCreate, list, get, update, delete Speech Pipes
client.callsInitiate, list, get, hangup calls
client.sessionsList and get call sessions
client.voice_profilesList and get voice profiles
client.numbersList, sync, attach, detach numbers
client.recordingsAccess call recordings
client.transcriptsAccess call transcripts

Advanced

Monitoring runner health

import asyncio
from unpod import AgentRunner, CallContext

runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")

async def log_stats() -> None:
    while True:
        s = runner.stats()
        print(
            f"in_flight={s.in_flight} "
            f"capacity={s.capacity} "
            f"completed={s.completed_last_hour} "
            f"failed={s.failed_last_hour} "
            f"mean_duration={s.mean_call_duration_s:.1f}s"
        )
        await asyncio.sleep(30)
RunnerStats fields:
FieldTypeDescription
in_flightintCurrent active calls
capacityintmax_sessions setting
completed_last_hourintTotal completed since startup
failed_last_hourintTotal failed since startup
mean_call_duration_sfloatAverage call length in seconds

Graceful shutdown

Send SIGTERM to your process (standard for containers and systemd). The runner stops accepting new dispatches, waits up to drain_timeout_s for active calls to finish, then exits.
import signal
import asyncio
from unpod import AgentRunner, CallContext

runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")

def handle_sigterm(signum, frame):
    asyncio.create_task(runner.shutdown())

signal.signal(signal.SIGTERM, handle_sigterm)
runner.start()

Multiple runners

Run multiple AgentRunner instances for the same agent across different machines or processes. The orchestrator load-balances across all connected runners using their reported capacity - no shared state required.
# Machine 1
UNPOD_API_KEY=sk-... python agent.py

# Machine 2
UNPOD_API_KEY=sk-... python agent.py

Next Steps

Session Controls

Use say(), transfer(), end(), and recording controls.

SuperDialog Integration

Attach a dialog flow to your session.