
Flow construction

create_dialog_flow

async create_dialog_flow(prompt: str, llm: str, **kwargs) -> Flow
Bootstrap a flow graph from a prompt using a one-shot LLM call.
import asyncio
from superdialog import create_dialog_flow

# Inside async code
flow = await create_dialog_flow(
    prompt="Confirm appointment. Ask if Friday 4pm works; offer 5pm if not.",
    llm="openai/gpt-5.1",
)

# From a sync entry point
flow = asyncio.run(create_dialog_flow(prompt=..., llm=...))
The llm parameter is used only at construction. The runtime model is set on DialogMachine, not here.

Flow.save / Flow.load

flow.save("kyc.json")       # Serialize to JSON
flow = Flow.load("kyc.json") # Deserialize from JSON
Flows serialize to plain JSON, so they can be diffed and kept under version control.

FlowSet

Container for multiple named flows. Switch between them at runtime.
from superdialog import FlowSet

flowset = FlowSet({
    "main": main_flow,
    "escalation": escalation_flow,
    "billing": billing_flow,
})

DialogMachine

Construction

DialogMachine(
    flow: Flow | FlowSet,
    llm: str,                                    # model URI
    tools: list[Tool] | None = None,
    memory: Memory | None = None,                # default: in-memory
    config: dict | None = None,                  # max_tokens, temperature, etc.
    traversal_dir: str | Path | None = None,     # auto-save traversal JSON on completion
    adapter: str = "toolcall",                   # "toolcall" (1 LLM call/turn) or "llm" (2 calls/turn)
)
  • traversal_dir - Writes a timestamped JSON file on each completed session recording every node visited, every turn, and all slot values. Useful for debugging, building eval datasets, and auditing.
  • adapter - "toolcall" (default, production) uses one LLM call per turn. "llm" uses two calls per turn and can be useful for debugging flow traversal logic.
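For debugging, it can help to load the most recent traversal file from that directory. The sketch below is standalone and makes no assumptions about the file naming scheme or JSON schema: it picks the newest `*.json` file by modification time. `latest_traversal` is a hypothetical helper, and the demo files and their contents are invented for illustration.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Optional


def latest_traversal(traversal_dir: str) -> Optional[Any]:
    """Return the parsed contents of the newest *.json file, or None."""
    files = sorted(Path(traversal_dir).glob("*.json"), key=lambda p: p.stat().st_mtime)
    if not files:
        return None
    return json.loads(files[-1].read_text(encoding="utf-8"))


# Demo with invented files; real traversal files will have a different payload.
with tempfile.TemporaryDirectory() as tmp:
    for i, name in enumerate(["first.json", "second.json"]):
        path = Path(tmp) / name
        path.write_text(json.dumps({"session": name}), encoding="utf-8")
        # Set explicit modification times so the ordering is unambiguous.
        os.utime(path, (1_700_000_000 + i, 1_700_000_000 + i))
    newest = latest_traversal(tmp)
    empty = latest_traversal(os.path.join(tmp, "missing"))
```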

start

async start() -> Turn
Generate the agent’s opening message without any user input. Call this at the start of a voice call or chat session to have the agent greet first.
opening = await dialog_machine.start()
print(opening.text)   # "Hello! I'm here to help verify your KYC. Could you provide..."

turn

async turn(
    text: str,
    context: dict | None = None,
    stream: bool | Literal["text"] = False,
) -> Turn | AsyncIterator[StreamChunk]
The primary method. It is always async, whether or not streaming is requested.
# Non-streaming - returns complete Turn
reply = await dialog_machine.turn("hello")
print(reply.text)
print(reply.tool_calls)   # list[ToolCall]
print(reply.metadata)     # {"latency_ms": ..., "tokens": ..., "model": ...}

# Streaming - await coroutine first, then iterate
stream = await dialog_machine.turn("hello", stream=True)
async for chunk in stream:
    print(chunk.text, end="")
Streaming in v0.2: the response is resolved in one shot, then emitted as whitespace-delimited chunks. True provider-level streaming (first-token latency) is planned for v0.4. The StreamChunk(text, done, turn) shape is stable.
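The v0.2 behaviour described above can be pictured with a small standalone sketch. It does not import superdialog: the `StreamChunk` dataclass is a stand-in that only mirrors the documented `(text, done, turn)` field names, and `chunk_reply` is an illustrative helper, not a library function. How the library handles whitespace, and which chunk carries the final turn, are assumptions.

```python
import asyncio
import re
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


@dataclass
class StreamChunk:
    # Stand-in mirroring the documented field names; not the library class.
    text: str
    done: bool = False
    turn: Optional[Any] = None


async def chunk_reply(full_text: str, turn: Any = None) -> AsyncIterator[StreamChunk]:
    """Emit an already-complete reply as whitespace-delimited chunks."""
    # Keep trailing whitespace with each word so the pieces rejoin exactly.
    pieces = re.findall(r"\S+\s*", full_text)
    for i, piece in enumerate(pieces):
        last = i == len(pieces) - 1
        # Assumption: only the final chunk has done=True and carries the turn.
        yield StreamChunk(text=piece, done=last, turn=turn if last else None)


async def collect(full_text: str) -> list[StreamChunk]:
    return [chunk async for chunk in chunk_reply(full_text)]


chunks = asyncio.run(collect("Friday at 4pm works for me."))
print("".join(c.text for c in chunks))
```

Because the full reply exists before the first chunk is emitted, this style of streaming smooths display but does not reduce time to first token.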

reset

dialog_machine.reset()
Clear conversation memory and restart from the flow’s initial node. Useful between independent conversations on the same instance.

set_llm

dialog_machine.set_llm("anthropic/claude-haiku-4-5")
Hot-swap the model. Applies to the next turn; in-flight streaming continues on the old model.

switch_flow

dialog_machine.switch_flow("escalation")
dialog_machine.switch_flow("billing", preserve_memory=True)
Switch to a named flow within the FlowSet. State resets by default; pass preserve_memory=True to keep history.

assist

dialog_machine.assist("Customer is upset. Be especially empathetic.")
Push a system-level instruction that takes effect on the next turn. Used for mid-call context injection.
inject_system(...) is preserved as a deprecated alias and will emit a DeprecationWarning. It is slated for removal in v0.4.
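A deprecated alias of this kind is commonly written as a thin wrapper that warns and then delegates. The sketch below shows that general pattern with Python's standard `warnings` module; `MachineSketch` is a hypothetical stand-in, and nothing here is taken from the library's source.

```python
import warnings


class MachineSketch:
    """Hypothetical stand-in; only illustrates the alias pattern."""

    def __init__(self) -> None:
        self.pending: list[str] = []

    def assist(self, text: str) -> None:
        # Queue the instruction so the next turn can pick it up.
        self.pending.append(text)

    def inject_system(self, text: str) -> None:
        # Deprecated alias: warn, then delegate to assist().
        warnings.warn(
            "inject_system() is deprecated; use assist() instead",
            DeprecationWarning,
            stacklevel=2,  # attribute the warning to the caller
        )
        self.assist(text)


# Record warnings so the deprecation is visible regardless of default filters.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    machine = MachineSketch()
    machine.inject_system("Customer is upset.")
```

In a test suite, `warnings.simplefilter("error", DeprecationWarning)` turns remaining calls to the old name into failures, which makes them easy to find before the alias is removed.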

Sessions (v0.2)

Sessions add lifecycle and persistence on top of DialogMachine (and any Agent-protocol-compatible brain). Use them when conversations need to survive across process boundaries.

Agent Protocol

class Agent(Protocol):
    async def turn(self, text: str, *, stream: bool = False) -> TurnResult | AsyncIterator[StreamChunk]: ...
    def assist(self, text: str) -> None: ...
    @property
    def chat_ctx(self) -> ChatContext: ...
    def load_chat_ctx(self, ctx: ChatContext) -> None: ...
DialogMachine, LLMAgent, and LangChainAgent all implement this Protocol.

SessionWorker

from superdialog import DialogMachine, SessionWorker, InMemorySessionStore

worker = SessionWorker(
    agent_factory=lambda: DialogMachine(flow=flow, llm="openai/gpt-5.1", tools=tools),
    store=InMemorySessionStore(),
    lock_backend=None,        # default: AsyncioLockBackend
    max_sessions=1000,
)

async with worker.acquire("user-42") as h:
    result = await h.turn("hello")
    h.assist("Customer sounds upset; be empathetic.")
  • agent_factory is called once per new session.
  • acquire(session_id) is an async context manager: it loads or creates the session, locks it for the duration of the block, and persists state on exit.
  • Concurrent requests for different session_ids run in parallel; requests for the same id are serialized by a per-session lock.
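The locking behaviour in the last bullet can be illustrated with plain `asyncio`. This is a minimal standalone sketch of per-key locking, assuming one `asyncio.Lock` per session id; the `acquire` function, the `_locks` registry, and the `events` log are hypothetical and are not the SessionWorker implementation.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager

# One asyncio.Lock per session id (hypothetical registry, not library code).
_locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
events: list[str] = []


@asynccontextmanager
async def acquire(session_id: str):
    # Hold the session's lock for the duration of the caller's block.
    async with _locks[session_id]:
        yield session_id


async def handle(session_id: str, label: str) -> None:
    async with acquire(session_id):
        events.append(f"start {label}")
        await asyncio.sleep(0.01)  # simulate the work of one turn
        events.append(f"end {label}")


async def main() -> None:
    # a1 and a2 share a session id; b1 uses a different one.
    await asyncio.gather(
        handle("user-a", "a1"),
        handle("user-a", "a2"),
        handle("user-b", "b1"),
    )


asyncio.run(main())
print(events)
```

Here `a2` cannot start until `a1` has finished, while `b1` overlaps with `a1` because it holds a different lock.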

SessionHandle

Yielded inside the async with worker.acquire(...) block.
| Method / Property | Description |
| --- | --- |
| await h.turn(text, *, stream=False) | Execute one turn |
| h.assist(text) | Push system instruction |
| h.state | Current session state |

Session stores

| Store | Ships | Use case |
| --- | --- | --- |
| InMemorySessionStore | ✅ v0.2 | Single-process; state lives for process lifetime |
| NullSessionStore | ✅ v0.2 | Voice calls; no persistence wanted |
| RedisSessionStore | 🔜 v0.3 | Multi-process, distributed deployments |
| FileSessionStore | 🔜 v0.3 | Simple persistence without Redis |
| SQLiteSessionStore | 🔜 v0.3 | Local single-server persistence |

Lock backends

| Backend | Ships | Use case |
| --- | --- | --- |
| AsyncioLockBackend | ✅ v0.2 | Single-process |
| RedisLockBackend | 🔜 v0.3 | Multi-process / distributed |

Alternative agent brains

from superdialog import InMemorySessionStore, LLMAgent, LangChainAgent, SessionWorker

# Raw chat brain - no flow, no slots
worker = SessionWorker(
    agent_factory=lambda: LLMAgent(llm="openai/gpt-5.1", system_prompt="Be helpful."),
    store=InMemorySessionStore(),
)

# LangChain runnable (requires: pip install superdialog[langchain])
worker = SessionWorker(
    agent_factory=lambda: LangChainAgent(runnable=my_chain),
    store=InMemorySessionStore(),
)

Tools

@tool decorator

The quickest way to define a tool: decorate any function and it becomes a PythonTool, with its schema inferred from the type hints and docstring.
from superdialog import DialogMachine
from superdialog.tools import tool

@tool
def lookup_customer(customer_id: str) -> dict:
    """Look up customer record by ID."""
    return crm.get(customer_id)

# Use directly in DialogMachine
dialog_machine = DialogMachine(flow=flow, llm="...", tools=[lookup_customer])
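To make "schema inferred from type hints and docstring" concrete, here is a rough standalone sketch of how such inference can be done with the standard `inspect` and `typing` modules. It is not the library's implementation: `infer_tool_spec`, the `_JSON_TYPES` mapping, the output keys, and the extra `include_history` parameter are all assumptions made for illustration.

```python
import inspect
from typing import Any, get_type_hints

# Assumed mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {
    str: "string", int: "integer", float: "number",
    bool: "boolean", dict: "object", list: "array",
}


def infer_tool_spec(fn) -> dict[str, Any]:
    """Build a tool spec from a function's signature and docstring."""
    hints = get_type_hints(fn)
    properties: dict[str, Any] = {}
    required: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        # Unknown or missing annotations fall back to "string" in this sketch.
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        # Parameters without a default are treated as required.
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "input_schema": {"type": "object", "properties": properties, "required": required},
    }


def lookup_customer(customer_id: str, include_history: bool = False) -> dict:
    """Look up customer record by ID."""
    return {"id": customer_id, "history": [] if include_history else None}


spec = infer_tool_spec(lookup_customer)
print(spec["input_schema"])
```

One practical consequence of this style of inference is that precise type hints and a clear docstring directly shape what the model sees when deciding whether to call the tool.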

PythonTool

# Convenience constructor - infers id, name, description, schema
tool = PythonTool.of(my_function)

# Explicit constructor
tool = PythonTool(
    id="lookup",
    name="lookup",
    description="Look up customer by ID",
    fn=lookup_customer,
)

Tool.from_dict

Deserialize a tool spec from a plain dict (e.g. loaded from JSON config). Routes to PythonTool, HttpTool, or MCPTool based on the type field:
from superdialog import Tool

tool = Tool.from_dict(
    {"type": "http", "id": "lookup", "name": "lookup",
     "description": "...", "url": "https://api.company.io/lookup"},
    handler_registry={"my_fn": my_python_fn},  # optional, for type=python
)
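Routing on the type field amounts to a small dispatch function. The standalone sketch below shows the idea with stand-in classes; `HttpToolSketch`, `PythonToolSketch`, and `tool_from_dict` are hypothetical, and the `"handler"` key used to look up a callable in the registry is an assumption, since the key name is not stated here.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class HttpToolSketch:
    # Stand-in for an HTTP-backed tool; not the library's HttpTool.
    id: str
    url: str


@dataclass
class PythonToolSketch:
    # Stand-in for a Python-callable tool; not the library's PythonTool.
    id: str
    fn: Callable[..., Any]


def tool_from_dict(
    spec: dict[str, Any],
    handler_registry: Optional[dict[str, Callable[..., Any]]] = None,
):
    """Pick a tool class based on spec['type']."""
    kind = spec.get("type")
    if kind == "http":
        return HttpToolSketch(id=spec["id"], url=spec["url"])
    if kind == "python":
        # Assumption: a "handler" key names a callable in the registry.
        registry = handler_registry or {}
        return PythonToolSketch(id=spec["id"], fn=registry[spec["handler"]])
    raise ValueError(f"unknown tool type: {kind!r}")


http_tool = tool_from_dict(
    {"type": "http", "id": "lookup", "url": "https://api.company.io/lookup"}
)
py_tool = tool_from_dict(
    {"type": "python", "id": "double", "handler": "my_fn"},
    handler_registry={"my_fn": lambda x: x * 2},
)
```

Keeping callables in a registry rather than in the dict itself is what lets the spec stay JSON-serializable.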

HttpTool

import os

tool = HttpTool(
    id="lookup",
    name="lookup",
    description="Look up a customer by partial Aadhaar",
    url="https://api.company.io/customer/lookup",
    method="POST",                    # default: POST
    auth={"type": "bearer", "token": os.environ["KEY"]},
    input_schema=None,                # inferred if not provided
)
v0.2 supports one auth shape: {"type": "bearer", "token": "..."}. Additional shapes (basic, api_key, callable) are planned for v0.3.

MCPTool

tool = MCPTool(
    id="search",
    name="search",
    description="Search the knowledge base",
    server="https://mcp.company.io",
    input_schema=None,
)
In v0.2, MCPTool forwards execute(args) to session.call_tool(self.id, args) against the configured server. Auto-discovery of all tools on an MCP server is planned for a later release.

LLM provider registration

register_llm_provider

import os

register_llm_provider(
    name="internal",
    base_url="https://llm.company.io/v1",
    api_key=os.environ["INTERNAL_KEY"],
    api_style="openai",               # only "openai" supported in v0.2
)

# Use anywhere after registration
dialog_machine = DialogMachine(flow=flow, llm="custom/internal/llama-3-70b-tuned")
Process-global. Once registered, custom/<name>/<model> works in DialogMachine(llm=...), set_llm(), and create_dialog_flow(llm=...).
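The model URIs on this page follow two shapes: `<provider>/<model>` and `custom/<name>/<model>`. As a rough illustration of how such strings can be split, here is a standalone sketch; `parse_model_uri` is a hypothetical helper, and the library's actual parsing and validation rules may differ.

```python
def parse_model_uri(uri: str) -> tuple[str, str]:
    """Split a model URI into (provider, model).

    Custom providers keep their registered name in the provider part.
    """
    head, _, rest = uri.partition("/")
    if head == "custom":
        # custom/<name>/<model>: the name selects the registered provider.
        name, _, model = rest.partition("/")
        if not name or not model:
            raise ValueError(f"expected custom/<name>/<model>, got {uri!r}")
        return f"custom/{name}", model
    if not head or not rest:
        raise ValueError(f"expected <provider>/<model>, got {uri!r}")
    return head, rest


print(parse_model_uri("custom/internal/llama-3-70b-tuned"))
print(parse_model_uri("openai/gpt-5.1"))
```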

Adapters

| Import | Purpose |
| --- | --- |
| superdialog.adapters.livekit.DialogMachineLLM | LiveKit Agent(llm=...) plugin |
| superdialog.adapters.pipecat.make_processor | Factory for PipeCat FrameProcessor |
| superdialog.adapters.fastapi.FastAPIRouter | Mountable router: /turn, /stream, /reset |
| superdialog.adapters.websocket.WebSocketRunner | Standalone WSS server for Unpod Voice Infra |
See Embedding Guides for complete integration examples.

Eval (v0.3 - planned)

from superdialog.eval import Eval

# Named "evaluation" to avoid shadowing Python's built-in eval()
evaluation = Eval(
    flow=flow,
    corpus="tests/kyc-corpus.jsonl",
    llms=["openai/gpt-5.1", "anthropic/claude-haiku-4-5", "groq/llama-3.3-70b"],
    metrics=["accuracy", "latency_p95", "tool_call_correctness"],
)
report = evaluation.run()
report.save("reports/2026-05-19.md")
Corpus format: JSONL with {utterance, expected_response | expected_intent | expected_tool_call} records. Custom metrics are passable as callables.
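Since the Eval API is still planned, the following is only a sketch of what a corpus in that format might look like and how it could be sanity-checked before a run. It reads the `|` in the record shape as "exactly one of", which is an assumption; `parse_corpus` is a hypothetical helper, and the sample utterances and expected values are invented.

```python
import json

EXPECTED_KEYS = {"expected_response", "expected_intent", "expected_tool_call"}


def parse_corpus(lines) -> list[dict]:
    """Parse JSONL lines and check each record's shape."""
    records = []
    for lineno, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        present = record.keys() & EXPECTED_KEYS
        # Assumption: each record carries exactly one expected_* field.
        if "utterance" not in record or len(present) != 1:
            raise ValueError(
                f"line {lineno}: need 'utterance' and exactly one expected_* key"
            )
        records.append(record)
    return records


# Invented sample records, one JSON object per line.
sample = [
    '{"utterance": "Friday at 4 is fine", "expected_intent": "confirm"}',
    '{"utterance": "Who am I speaking to?", "expected_response": "This is the clinic."}',
    "",
    '{"utterance": "Look me up, ID 1234", "expected_tool_call": {"name": "lookup"}}',
]
records = parse_corpus(sample)
print(len(records))
```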