Skip to main content

Overview

The hook system lets you register async handlers that fire at specific points in a call’s lifecycle. There are two levels of hooks:
  • Runner-level - fire for every call the runner handles (e.g. logging, metrics)
  • Session-level - fire on events within a single call (e.g. per-turn logic, silence detection)

Registering Hooks

Decorator style

from unpod import AgentRunner, CallContext

async def handle_call(ctx: CallContext) -> None:
    """Entrypoint passed to the runner; registers a session hook and runs the session.

    Args:
        ctx: Context of the call being handled.
    """

    # Session-level hook - registered inside the entrypoint
    @ctx.session.on("user_turn")
    async def on_user_turn(text: str) -> None:
        """Print the text delivered with the ``user_turn`` event."""
        print(f"User said: {text}")

    await ctx.session.run()


# The runner is created only after handle_call is defined: the entrypoint
# argument is evaluated immediately, so referencing the function any earlier
# raises NameError when the module is executed top to bottom.
runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")


# Runner-level hook
@runner.on("call_start")
async def on_any_call_start(ctx: CallContext) -> None:
    """Print the ID of the call delivered with the runner-level ``call_start`` event."""
    print(f"Runner received call: {ctx.call_id}")

Runner-Level Events

These fire at the runner level - once per call, regardless of what happens inside the session.
| Event | Arguments | When |
| --- | --- | --- |
| call_start | ctx: CallContext | Call dispatched and bridge connected |
| call_end | ctx: CallContext, final_state: str | Call finished (any reason) |
final_state values: "ended", "failed"
@runner.on("call_start")
async def log_call_start(ctx: CallContext) -> None:
    """Insert a call record built from the context of the call that started."""
    record = {
        "call_id": ctx.call_id,
        "agent_id": ctx.agent_id,
        "caller": ctx.user_number,
        "direction": ctx.direction,
    }
    await db.insert_call_record(**record)

@runner.on("call_end")
async def log_call_end(ctx: CallContext, final_state: str) -> None:
    """Update the call record of the finished call with its final state."""
    finished_call_id = ctx.call_id
    await db.update_call_record(finished_call_id, final_state=final_state)

Session-Level Events

Registered with @ctx.session.on(event) inside your entrypoint. These fire during session.run().

call_start

Fires at the very beginning of the session loop, before any events are processed:
@ctx.session.on("call_start")
async def on_call_start() -> None:
    """Look up the caller in the CRM, store the result, and greet the caller.

    The lookup result is stored under ``ctx.session.data["customer"]`` whether
    or not a customer was found. The greeting is personalised only when the
    lookup returned a customer.
    """
    customer = await crm.lookup(ctx.user_number)
    ctx.session.data["customer"] = customer

    # NOTE(review): assumes crm.lookup returns a falsy value (e.g. None) for an
    # unknown caller - confirm. Without this guard, subscripting the result
    # would raise and the caller would hear no greeting at all.
    if customer:
        greeting = f"Hello {customer['first_name']}, how can I help?"
    else:
        greeting = "Hello, how can I help?"
    await ctx.session.say(greeting)

user_turn

Fires when the user has finished speaking and the transcription is ready:
@ctx.session.on("user_turn")
async def on_user_turn(text: str) -> None:
    """Log the user's utterance for this call to analytics."""
    utterance = {
        "call_id": ctx.call_id,
        "speaker": "user",
        "text": text,
    }
    await analytics.log_utterance(**utterance)

agent_turn

Fires after the agent’s full reply has been generated (post-streaming):
@ctx.session.on("agent_turn")
async def on_agent_turn(text: str) -> None:
    """Log the agent's reply for this call to analytics."""
    reply = {
        "call_id": ctx.call_id,
        "speaker": "agent",
        "text": text,
    }
    await analytics.log_utterance(**reply)

interruption

Fires when the user interrupts the agent mid-utterance:
@ctx.session.on("interruption")
async def on_interruption() -> None:
    """Log an ``interruption`` event for this call to analytics."""
    event_name = "interruption"
    await analytics.log_event(ctx.call_id, event_name)

call_end

Fires when the session loop exits (call hung up, error, or session.end() called):
@ctx.session.on("call_end")
async def on_call_end(final_state: str) -> None:
    """Save a call summary with the live turn count and the final state."""
    live_metrics = ctx.session.metrics.live()
    summary = {
        "call_id": ctx.call_id,
        "turns": live_metrics.turn_count,
        "final_state": final_state,
    }
    await db.save_call_summary(**summary)

Complete Hook Reference

| Scope | Event | Arguments | Description |
| --- | --- | --- | --- |
| Runner | call_start | ctx: CallContext | Call dispatched to this runner |
| Runner | call_end | ctx: CallContext, final_state: str | Call finished |
| Session | call_start | (none) | Session loop started |
| Session | user_turn | text: str | User utterance transcribed |
| Session | agent_turn | text: str | Full agent reply generated |
| Session | interruption | (none) | User interrupted agent |
| Session | call_end | final_state: str | Session loop exited |

Multiple Hooks for the Same Event

You can register multiple handlers for the same event - all are called in registration order:
@ctx.session.on("user_turn")
async def log_turn(text: str) -> None:
    """Pass the user's utterance to the logger."""
    utterance = text
    await logger.log(utterance)

@ctx.session.on("user_turn")
async def check_escalation(text: str) -> None:
    """Transfer the call to the support queue when the user asks for a human."""
    wants_human = "speak to a human" in text.lower()
    if not wants_human:
        return
    await ctx.session.transfer_to_human(queue="support")

Practical Patterns

Silence detection with timeout

import asyncio

async def handle_call(ctx: CallContext) -> None:
    """Run the session alongside a watchdog that reacts to prolonged silence.

    The watchdog checks every 5 seconds how long it has been since the last
    user or agent turn. After more than 30 seconds it asks
    "Are you still there?" once; after more than 60 seconds it ends the
    session with ``reason="no_response"``. The watchdog is stopped as soon as
    ``session.run()`` returns, so the entrypoint does not linger and nothing
    is said to a session that has already finished.

    Args:
        ctx: Context of the call being handled.
    """
    # get_running_loop() is the supported way to reach the loop from a coroutine.
    loop = asyncio.get_running_loop()
    last_activity = loop.time()
    prompted = False  # True once the caller was prompted during the current silence
    ending = False  # True once the watchdog has started ending the session

    @ctx.session.on("user_turn")
    async def on_user_turn(text: str) -> None:
        nonlocal last_activity, prompted
        last_activity = loop.time()
        # Only the user speaking re-arms the prompt, so the watchdog's own
        # prompt can never cause itself to be repeated.
        prompted = False

    @ctx.session.on("agent_turn")
    async def on_agent_turn(text: str) -> None:
        nonlocal last_activity
        last_activity = loop.time()

    async def silence_watchdog() -> None:
        nonlocal prompted, ending
        while True:
            await asyncio.sleep(5)
            idle = loop.time() - last_activity
            # Check the longer threshold first so the call is ended without
            # prompting the caller one more time.
            if idle > 60:
                ending = True
                await ctx.session.end(reason="no_response")
                break
            if idle > 30 and not prompted:
                prompted = True
                await ctx.session.say("Are you still there?")

    ctx.session.dialog_machine = my_machine
    watchdog = asyncio.create_task(silence_watchdog())
    try:
        await ctx.session.run()
    finally:
        # Let an in-flight session.end() finish; otherwise stop the watchdog.
        if not ending:
            watchdog.cancel()
        # Collect the task's outcome (including cancellation) without raising.
        await asyncio.gather(watchdog, return_exceptions=True)

CRM enrichment at call start

async def handle_call(ctx: CallContext) -> None:
    """Register a ``call_start`` hook that briefs the dialog machine, then run the session."""

    @ctx.session.on("call_start")
    async def enrich() -> None:
        """Store the caller's CRM account and pass a summary to the dialog machine."""
        account = await crm.get_by_phone(ctx.user_number)
        if not account:
            # No account for this number: nothing to store or pass on.
            return

        ctx.session.data["account"] = account
        briefing = (
            f"Caller is {account['name']}, a {account['tier']} customer. "
            f"Their open ticket count is {account['open_tickets']}."
        )
        ctx.session.dialog_machine.assist(briefing)

    ctx.session.dialog_machine = DialogMachine(flow=flow, llm="...")
    await ctx.session.run()

Escalation trigger

async def handle_call(ctx: CallContext) -> None:
    """Register a ``user_turn`` hook that escalates to a human, then run the session."""

    @ctx.session.on("user_turn")
    async def detect_escalation(text: str) -> None:
        """Announce and perform a transfer when the utterance contains a trigger phrase."""
        trigger_phrases = (
            "speak to a human",
            "real person",
            "your manager",
            "supervisor",
        )
        lowered = text.lower()
        if not any(phrase in lowered for phrase in trigger_phrases):
            return

        await ctx.session.say(
            "Of course, let me transfer you to one of our team members."
        )
        await ctx.session.transfer_to_human(queue="tier-1")

    ctx.session.dialog_machine = DialogMachine(flow=flow, llm="...")
    await ctx.session.run()

Full call logging

import time
from unpod import AgentRunner, CallContext

async def handle_call(ctx: CallContext) -> None:
    """Entrypoint: collect every turn and save the transcript when the session ends.

    Args:
        ctx: Context of the call being handled.
    """
    transcript: list[dict] = []

    @ctx.session.on("user_turn")
    async def capture_user(text: str) -> None:
        """Append the user's utterance to the transcript."""
        transcript.append({"speaker": "user", "text": text})

    @ctx.session.on("agent_turn")
    async def capture_agent(text: str) -> None:
        """Append the agent's reply to the transcript."""
        transcript.append({"speaker": "agent", "text": text})

    @ctx.session.on("call_end")
    async def save_transcript(state: str) -> None:
        """Save the collected transcript for this call."""
        await db.save_transcript(ctx.call_id, transcript)

    ctx.session.dialog_machine = DialogMachine(flow=flow, llm="...")
    await ctx.session.run()


# The runner is created only after handle_call is defined: the entrypoint
# argument is evaluated immediately, so referencing the function any earlier
# raises NameError when the module is executed top to bottom.
runner = AgentRunner(entrypoint=handle_call, agent_id="my-agent")


@runner.on("call_start")
async def runner_start(ctx: CallContext) -> None:
    """Create the call record, stamped with the current time, when a call starts."""
    await db.create_call(
        call_id=ctx.call_id,
        session_id=ctx.session_id,
        direction=ctx.direction,
        caller=ctx.user_number,
        started_at=time.time(),
    )


@runner.on("call_end")
async def runner_end(ctx: CallContext, final_state: str) -> None:
    """Update the call record with the final state and the session's live metrics."""
    m = ctx.session.metrics.live()
    await db.update_call(
        ctx.call_id,
        final_state=final_state,
        turns=m.turn_count,
        tokens_in=m.tokens_in,
        tokens_out=m.tokens_out,
    )

Next Steps

Outbound Calls

Initiate calls programmatically from your backend.

Session Controls

Use say, transfer, end, and recording controls.