Installation
With uv (recommended):
uv add unpod
Optional extras:
uv add "unpod[dialog]"     # Adds superdialog for structured dialog flows
uv add "unpod[langchain]"  # Adds LangChain adapter
Set your API key:
export UNPOD_API_KEY="sk-..."
export UNPOD_ORCHESTRATOR_URL="wss://api.unpod.io"  # optional override
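A missing key is easier to diagnose at startup than on the first dispatch. The following is a minimal sketch of a fail-fast check, not part of the SDK: `load_unpod_env` and `UnpodEnv` are hypothetical names, and the fallback URL is an assumption that mirrors the value shown in the export example.

```python
import os
from typing import Mapping, NamedTuple, Optional


class UnpodEnv(NamedTuple):
    api_key: str
    orchestrator_url: str


# Assumption: mirrors the URL in the export example; the SDK's real default may differ.
ASSUMED_ORCHESTRATOR_URL = "wss://api.unpod.io"


def load_unpod_env(env: Optional[Mapping[str, str]] = None) -> UnpodEnv:
    """Read Unpod settings from the environment (hypothetical helper, not an SDK call)."""
    env = os.environ if env is None else env
    api_key = env.get("UNPOD_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("UNPOD_API_KEY is not set")
    # An empty or missing override falls back to the assumed default.
    url = env.get("UNPOD_ORCHESTRATOR_URL", "").strip() or ASSUMED_ORCHESTRATOR_URL
    return UnpodEnv(api_key=api_key, orchestrator_url=url)
```

Calling `load_unpod_env()` with no argument reads `os.environ`; passing a dict keeps the helper easy to test.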
Quick Start
from unpod import AgentRunner, CallContext
async def handle_call(ctx: CallContext) -> None:
    await ctx.session.say("Hello, thanks for calling!")
    await ctx.session.run()  # blocks until the call ends
runner = AgentRunner(
    entrypoint=handle_call,
    agent_id="my-agent",  # the agent_id from your Speech Pipe settings
)
runner.start()  # blocking
agent_id is the runner agent ID (a short string name) from your Speech Pipe’s settings, not the UUID shown in the Speech Pipe list.
AgentRunner
AgentRunner is a long-lived process that holds a WebSocket connection to the Unpod orchestrator. When a call is dispatched to your agent, the runner invokes your entrypoint with a CallContext.
Unpod Orchestrator
        |  WebSocket (wss)
        v
   AgentRunner  --- heartbeat every N seconds
        |
        |  on dispatch
        v
entrypoint(CallContext)  --- your code
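The shape of the diagram can be modeled with plain asyncio. This is a toy sketch only: it uses an in-memory queue in place of the WebSocket, a string in place of CallContext, and the name `toy_runner` is made up for illustration. It says nothing about how AgentRunner is implemented internally.

```python
import asyncio
from typing import Awaitable, Callable, List


async def toy_runner(
    dispatches: asyncio.Queue,
    entrypoint: Callable[[str], Awaitable[None]],
    heartbeat_interval_s: float,
    log: List[str],
) -> None:
    """Toy model: heartbeat in the background, one task per dispatched call."""

    async def heartbeat() -> None:
        while True:
            log.append("heartbeat")
            await asyncio.sleep(heartbeat_interval_s)

    hb = asyncio.create_task(heartbeat())
    calls = []
    try:
        while True:
            call_id = await dispatches.get()
            if call_id is None:  # sentinel standing in for "connection closed"
                break
            # Each dispatch runs concurrently in its own task.
            calls.append(asyncio.create_task(entrypoint(call_id)))
        await asyncio.gather(*calls)
    finally:
        hb.cancel()
```

Running each dispatch in its own task is what lets one long call coexist with new arrivals; the heartbeat task is independent of both.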
Constructor
AgentRunner(
    entrypoint: Callable[[CallContext], Awaitable[None]],
    agent_id: str,
    api_key: str | None = None,     # falls back to UNPOD_API_KEY
    max_sessions: int = 50,         # max concurrent sessions
    permits_per_minute: int = 120,  # rate limit hint
    drain_timeout_s: int = 60,      # graceful shutdown window
    dev_mode: bool = False,         # skip TLS, local orchestrator
    base_url: str | None = None,    # override orchestrator URL
)
Configuration options
| Parameter | Description |
| --- | --- |
| max_sessions | Max simultaneous sessions this runner accepts. The orchestrator will not dispatch beyond this. |
| permits_per_minute | Rate of new call acceptance. Set lower to protect downstream systems. |
| drain_timeout_s | On shutdown, wait this many seconds for active calls to finish before force-exiting. |
| dev_mode | Skip TLS for local development against a local orchestrator. |
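To see how a concurrency cap and a per-minute rate interact, here is a small self-contained model. It is a sketch under assumptions: `AdmissionGate` is a hypothetical class, the sliding 60-second window is one possible reading of "permits per minute", and the SDK's actual accounting is not documented here.

```python
import time
from collections import deque
from typing import Callable, Deque


class AdmissionGate:
    """Toy admission check: a concurrency cap plus a sliding per-minute limit."""

    def __init__(
        self,
        max_sessions: int,
        permits_per_minute: int,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_sessions = max_sessions
        self.permits_per_minute = permits_per_minute
        self._clock = clock
        self._in_flight = 0
        self._accepted_at: Deque[float] = deque()

    def try_accept(self) -> bool:
        now = self._clock()
        # Forget acceptances that are 60 seconds old or older.
        while self._accepted_at and now - self._accepted_at[0] >= 60.0:
            self._accepted_at.popleft()
        if self._in_flight >= self.max_sessions:
            return False  # concurrency cap reached
        if len(self._accepted_at) >= self.permits_per_minute:
            return False  # rate limit reached
        self._in_flight += 1
        self._accepted_at.append(now)
        return True

    def release(self) -> None:
        """Mark one session as finished."""
        self._in_flight = max(0, self._in_flight - 1)
```

The two limits protect different things: max_sessions bounds load at any instant, while permits_per_minute bounds how quickly load can ramp up even when slots are free.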
Lifecycle hooks
React to runner-level events (not individual session events):
from unpod import AgentRunner, CallContext
runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")
@runner.on("call_start")
async def on_call_start(ctx: CallContext) -> None:
    print(f"New call: {ctx.call_id}")
@runner.on("call_end")
async def on_call_end(ctx: CallContext, final_state: str) -> None:
    print(f"Call {ctx.call_id} ended: {final_state}")
| Event | Arguments | When |
| --- | --- | --- |
| call_start | ctx: CallContext | Call is dispatched and connected |
| call_end | ctx: CallContext, final_state: str | Call completed or failed |
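The decorator style used by `runner.on(...)` is a common registry pattern. The sketch below shows that pattern in isolation; `HookRegistry` and `emit` are illustrative names, not SDK APIs, and the event arguments are plain strings rather than a CallContext.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, List

Handler = Callable[..., Awaitable[None]]


class HookRegistry:
    """Minimal event registry with a decorator-based on() method."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def on(self, event: str) -> Callable[[Handler], Handler]:
        def register(handler: Handler) -> Handler:
            self._handlers[event].append(handler)
            return handler  # return the function unchanged so it stays callable

        return register

    async def emit(self, event: str, *args: Any) -> None:
        # Handlers run in registration order; unknown events are a no-op.
        for handler in self._handlers.get(event, []):
            await handler(*args)
```

Because `on()` returns the handler unchanged, the same function can be registered for several events or called directly in tests.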
CallContext
Every call to your entrypoint receives a CallContext:
async def handle_call(ctx: CallContext) -> None:
    ctx.call_id       # str: unique call ID
    ctx.session_id    # str: unique session ID
    ctx.agent_id      # str: agent that received this call
    ctx.direction     # str: "inbound" or "outbound"
    ctx.user_number   # str: caller's E.164 number
    ctx.instructions  # str | None: per-call override instructions
    ctx.data          # dict: metadata from dispatch (e.g. CRM data)
    ctx.session       # Session: call control object
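Since an entrypoint only touches the attributes listed above, it can be exercised without a live call by passing a stand-in object. This is a sketch under assumptions: `FakeSession` and `FakeCallContext` are hypothetical test doubles, `greet_caller` is an example entrypoint, and the `"name"` key in `ctx.data` is invented for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class FakeSession:
    """Test double: records say() calls; run() returns immediately."""
    spoken: List[str] = field(default_factory=list)

    async def say(self, text: str) -> None:
        self.spoken.append(text)

    async def run(self) -> None:
        return None


@dataclass
class FakeCallContext:
    """Stand-in exposing the same attribute names as CallContext."""
    call_id: str = "call-test"
    session_id: str = "session-test"
    agent_id: str = "my-agent"
    direction: str = "inbound"
    user_number: str = "+15550100"
    instructions: Optional[str] = None
    data: Dict[str, Any] = field(default_factory=dict)
    session: FakeSession = field(default_factory=FakeSession)


async def greet_caller(ctx) -> None:
    # "name" is a made-up metadata key; real dispatch data depends on your setup.
    name = ctx.data.get("name")
    greeting = f"Hello {name}, thanks for calling!" if name else "Hello, thanks for calling!"
    await ctx.session.say(greeting)
    await ctx.session.run()
```

A test can then run `asyncio.run(greet_caller(FakeCallContext(data={"name": "Ada"})))` and inspect `ctx.session.spoken`.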
Management Client
AsyncClient and its synchronous counterpart Client provide access to the Unpod management API, covering Speech Pipes, calls, numbers, voice profiles, and more:
import asyncio
from unpod import AsyncClient
async def main():
    async with AsyncClient(api_key="sk-...") as client:
        pipes = await client.pipes.list()
        for p in pipes:
            print(p.pipe_id, p.name)
asyncio.run(main())
For scripts and other non-async contexts, use the sync Client:
from unpod import Client
with Client(api_key="sk-...") as client:
    profiles = client.voice_profiles.list(language="en")
    for p in profiles:
        print(p.profile_id, p.name)
Available resources
| Attribute | Resource |
| --- | --- |
| client.pipes | Create, list, get, update, delete Speech Pipes |
| client.calls | Initiate, list, get, hangup calls |
| client.sessions | List and get call sessions |
| client.voice_profiles | List and get voice profiles |
| client.numbers | List, sync, attach, detach numbers |
| client.recordings | Access call recordings |
| client.transcripts | Access call transcripts |
Advanced
Monitoring runner health
import asyncio
from unpod import AgentRunner, CallContext
runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")
async def log_stats() -> None:
    # Print a one-line health summary every 30 seconds.
    while True:
        s = runner.stats()
        print(
            f"in_flight={s.in_flight} "
            f"capacity={s.capacity} "
            f"completed={s.completed_last_hour} "
            f"failed={s.failed_last_hour} "
            f"mean_duration={s.mean_call_duration_s:.1f}s"
        )
        await asyncio.sleep(30)
RunnerStats fields:
| Field | Type | Description |
| --- | --- | --- |
| in_flight | int | Current active calls |
| capacity | int | max_sessions setting |
| completed_last_hour | int | Total completed since startup |
| failed_last_hour | int | Total failed since startup |
| mean_call_duration_s | float | Average call length in seconds |
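Two derived numbers are often more useful for alerting than the raw fields: how full the runner is, and what share of finished calls failed. The sketch below computes both from a stand-in dataclass; `StatsSnapshot`, `utilization`, and `failure_rate` are hypothetical names, and the failure rate assumes the completed and failed counters are disjoint.

```python
from dataclasses import dataclass


@dataclass
class StatsSnapshot:
    """Stand-in carrying the same field names as RunnerStats."""
    in_flight: int
    capacity: int
    completed_last_hour: int
    failed_last_hour: int
    mean_call_duration_s: float


def utilization(s: StatsSnapshot) -> float:
    """Fraction of capacity in use; 0.0 when capacity is zero."""
    return s.in_flight / s.capacity if s.capacity > 0 else 0.0


def failure_rate(s: StatsSnapshot) -> float:
    """Failed calls as a fraction of finished calls (assumes disjoint counters)."""
    finished = s.completed_last_hour + s.failed_last_hour
    return s.failed_last_hour / finished if finished else 0.0
```

The same functions work on a real stats object as long as it exposes these attribute names.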
Graceful shutdown
Send SIGTERM to your process (standard for containers and systemd). The runner stops accepting new dispatches, waits up to drain_timeout_s for active calls to finish, then exits.
import signal
import asyncio
from unpod import AgentRunner, CallContext
runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")
def handle_sigterm(signum, frame):
    # Request a graceful shutdown when SIGTERM arrives.
    asyncio.create_task(runner.shutdown())
signal.signal(signal.SIGTERM, handle_sigterm)
runner.start()
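The drain behavior described above (wait up to a deadline, then force the rest) maps onto `asyncio.wait` with a timeout. The following is a generic sketch of that pattern, not the runner's actual shutdown code; `drain` is a hypothetical helper operating on ordinary asyncio tasks.

```python
import asyncio
from typing import Iterable, Tuple


async def drain(tasks: Iterable["asyncio.Task"], drain_timeout_s: float) -> Tuple[int, int]:
    """Wait up to drain_timeout_s for tasks to finish, then cancel the rest.

    Returns (finished, cancelled) counts.
    """
    tasks = list(tasks)
    if not tasks:
        return (0, 0)  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=drain_timeout_s)
    for task in pending:
        task.cancel()
    # Let cancelled tasks unwind before returning.
    await asyncio.gather(*pending, return_exceptions=True)
    return (len(done), len(pending))
```

Choosing drain_timeout_s is a trade-off: it should exceed a typical call length so most calls finish naturally, yet stay below the kill grace period of whatever supervises the process.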
Multiple runners
Run multiple AgentRunner instances for the same agent across different machines or processes. The orchestrator load-balances across all connected runners using their reported capacity, so the runners need no shared state.
# Machine 1
UNPOD_API_KEY=sk-... python agent.py
# Machine 2
UNPOD_API_KEY=sk-... python agent.py
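One simple way to balance on reported capacity is to send each call to the runner with the most free slots. The sketch below illustrates that idea only; the orchestrator's real selection policy is not described here, and `RunnerReport` and `pick_runner` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class RunnerReport:
    """What a runner might report about itself (illustrative fields)."""
    runner_id: str
    in_flight: int
    capacity: int

    @property
    def free(self) -> int:
        return max(0, self.capacity - self.in_flight)


def pick_runner(reports: Sequence[RunnerReport]) -> Optional[RunnerReport]:
    """Return the runner with the most free slots, or None if all are full."""
    candidates = [r for r in reports if r.free > 0]
    if not candidates:
        return None
    return max(candidates, key=lambda r: r.free)
```

Because the decision depends only on each runner's own report, adding a machine requires no coordination between runners.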
Next Steps
Session Controls: use say(), transfer(), end(), and recording controls.
SuperDialog Integration: attach a dialog flow to your session.