Flow construction
create_dialog_flow
async create_dialog_flow(prompt: str, llm: str, **kwargs) -> Flow
Bootstrap a flow graph from a prompt using a one-shot LLM call.
import asyncio
from superdialog import create_dialog_flow
# Inside async code
flow = await create_dialog_flow(
    prompt="Confirm appointment. Ask if Friday 4pm works; offer 5pm if not.",
    llm="openai/gpt-5.1",
)
# From a sync entry point
flow = asyncio.run(create_dialog_flow(prompt=..., llm=...))
The llm parameter is used only at construction. The runtime model is set on DialogMachine, not here.
Flow.save / Flow.load
flow.save("kyc.json") # Serialize to JSON
flow = Flow.load("kyc.json") # Deserialize from JSON
Flows are JSON-serializable, so a saved flow can be kept under version control.
FlowSet
Container for multiple named flows. Switch between them at runtime.
from superdialog import FlowSet
flowset = FlowSet({
    "main": main_flow,
    "escalation": escalation_flow,
    "billing": billing_flow,
})
DialogMachine
Construction
DialogMachine(
    flow: Flow | FlowSet,
    llm: str,  # model URI
    tools: list[Tool] | None = None,
    memory: Memory | None = None,  # default: in-memory
    config: dict | None = None,  # max_tokens, temperature, etc.
    traversal_dir: str | Path | None = None,  # auto-save traversal JSON on completion
    adapter: str = "toolcall",  # "toolcall" (1 LLM call/turn) or "llm" (2 calls/turn)
)
traversal_dir - Writes a timestamped JSON file on each completed session recording every node visited, every turn, and all slot values. Useful for debugging, building eval datasets, and auditing.
adapter - "toolcall" (default, production) uses one LLM call per turn. "llm" uses two calls per turn and can be useful for debugging flow traversal logic.
start
Generate the agent’s opening message without any user input. Call this at the start of a voice call or chat session to have the agent greet first.
opening = await dialog_machine.start()
print(opening.text) # "Hello! I'm here to help verify your KYC. Could you provide..."
turn
async turn(
    text: str,
    context: dict | None = None,
    stream: bool | Literal["text"] = False,
) -> Turn | AsyncIterator[StreamChunk]
The primary method. Always async.
# Non-streaming - returns complete Turn
reply = await dialog_machine.turn("hello")
print(reply.text)
print(reply.tool_calls) # list[ToolCall]
print(reply.metadata) # {"latency_ms": ..., "tokens": ..., "model": ...}
# Streaming - await coroutine first, then iterate
stream = await dialog_machine.turn("hello", stream=True)
async for chunk in stream:
    print(chunk.text, end="")
Streaming in v0.2: the response is resolved in one shot, then emitted as whitespace-delimited chunks. True provider-level streaming (first-token latency) is planned for v0.4. The StreamChunk(text, done, turn) shape is stable.
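The v0.2 behaviour described above can be approximated with a small async generator. This is a sketch, not superdialog's implementation. The StreamChunk dataclass is a local stand-in that borrows the (text, done, turn) field names from the text. The emit_chunks and collect helpers, the regex split, and attaching the turn only to the final chunk are assumptions made for illustration.

```python
import asyncio
import re
from dataclasses import dataclass
from typing import Any, AsyncIterator, List, Optional


@dataclass
class StreamChunk:
    """Local stand-in mirroring the documented (text, done, turn) shape."""
    text: str
    done: bool
    turn: Optional[Any] = None


async def emit_chunks(full_text: str, turn: Any = None) -> AsyncIterator[StreamChunk]:
    """Yield an already-resolved reply as whitespace-delimited chunks."""
    # Each piece keeps its trailing whitespace, so text without leading
    # whitespace rejoins to the original string.
    pieces = re.findall(r"\S+\s*", full_text) or [""]
    for index, piece in enumerate(pieces):
        last = index == len(pieces) - 1
        # Assumption: only the final chunk carries the completed turn.
        yield StreamChunk(text=piece, done=last, turn=turn if last else None)


async def collect(full_text: str) -> List[StreamChunk]:
    """Drain the generator into a list (handy for tests)."""
    return [chunk async for chunk in emit_chunks(full_text)]
```

Because the reply is fully resolved before the first chunk is emitted, a consumer written against this shape sees no first-token latency benefit; it should, however, keep working unchanged if provider-level streaming later replaces the splitter.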
reset
Clear conversation memory and restart from the flow’s initial node. Useful between independent conversations on the same instance.
set_llm
dialog_machine.set_llm("anthropic/claude-haiku-4-5")
Hot-swap the model. Applies to the next turn; in-flight streaming continues on the old model.
switch_flow
dialog_machine.switch_flow("escalation")
dialog_machine.switch_flow("billing", preserve_memory=True)
Switch to a named flow within the FlowSet. State resets by default; pass preserve_memory=True to keep history.
assist
dialog_machine.assist("Customer is upset. Be especially empathetic.")
Push a system-level instruction that takes effect on the next turn. Used for mid-call context injection.
inject_system(...) is preserved as a deprecated alias and will emit a DeprecationWarning. It is slated for removal in v0.4.
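A deprecated alias of this kind is commonly written as a thin wrapper that warns and then delegates. The sketch below shows that general pattern using only the standard warnings module; MachineSketch and its pending_instructions list are hypothetical stand-ins, and nothing here is taken from superdialog's source.

```python
import warnings
from typing import List


class MachineSketch:
    """Hypothetical stand-in, not the real DialogMachine."""

    def __init__(self) -> None:
        self.pending_instructions: List[str] = []

    def assist(self, text: str) -> None:
        # Queue the instruction so the next turn can pick it up.
        self.pending_instructions.append(text)

    def inject_system(self, text: str) -> None:
        # Deprecated alias: warn, then delegate to assist().
        warnings.warn(
            "inject_system() is deprecated; use assist() instead",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not this wrapper
        )
        self.assist(text)
```

Running a test suite with `python -W error::DeprecationWarning` turns such warnings into errors, which is one way to find remaining call sites before the alias is removed.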
Sessions (v0.2)
Sessions add lifecycle and persistence on top of DialogMachine (and any Agent-protocol-compatible brain). Use them when conversations need to survive across process boundaries.
Agent Protocol
class Agent(Protocol):
    async def turn(self, text: str, *, stream: bool = False) -> TurnResult | AsyncIterator[StreamChunk]: ...
    def assist(self, text: str) -> None: ...
    @property
    def chat_ctx(self) -> ChatContext: ...
    def load_chat_ctx(self, ctx: ChatContext) -> None: ...
DialogMachine, LLMAgent, and LangChainAgent all implement this Protocol.
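To illustrate what implementing the Protocol involves, here is a minimal sketch of a custom brain. It is self-contained and does not import superdialog: ChatContext and TurnResult are simplified local stand-ins, the Protocol is restated without the streaming return type, and EchoAgent is a hypothetical example class.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Protocol, runtime_checkable


@dataclass
class ChatContext:
    """Stand-in for the real ChatContext: just a list of messages."""
    messages: List[dict] = field(default_factory=list)


@dataclass
class TurnResult:
    """Stand-in for the real turn result."""
    text: str


@runtime_checkable
class Agent(Protocol):
    async def turn(self, text: str, *, stream: bool = False) -> TurnResult: ...
    def assist(self, text: str) -> None: ...
    @property
    def chat_ctx(self) -> ChatContext: ...
    def load_chat_ctx(self, ctx: ChatContext) -> None: ...


class EchoAgent:
    """Hypothetical brain that echoes the user and records history."""

    def __init__(self) -> None:
        self._ctx = ChatContext()

    async def turn(self, text: str, *, stream: bool = False) -> TurnResult:
        reply = f"You said: {text}"
        self._ctx.messages.append({"role": "user", "content": text})
        self._ctx.messages.append({"role": "assistant", "content": reply})
        return TurnResult(text=reply)

    def assist(self, text: str) -> None:
        self._ctx.messages.append({"role": "system", "content": text})

    @property
    def chat_ctx(self) -> ChatContext:
        return self._ctx

    def load_chat_ctx(self, ctx: ChatContext) -> None:
        self._ctx = ctx


async def demo() -> int:
    """Move history from one agent instance to a fresh one."""
    first = EchoAgent()
    await first.turn("hello")
    second = EchoAgent()
    second.load_chat_ctx(first.chat_ctx)
    return len(second.chat_ctx.messages)
```

The chat_ctx / load_chat_ctx pair is what would let a session layer snapshot one instance and restore the history into another, which is presumably how conversations survive across process boundaries.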
SessionWorker
from superdialog import DialogMachine, SessionWorker, InMemorySessionStore
worker = SessionWorker(
    agent_factory=lambda: DialogMachine(flow=flow, llm="openai/gpt-5.1", tools=tools),
    store=InMemorySessionStore(),
    lock_backend=None,  # default: AsyncioLockBackend
    max_sessions=1000,
)
# Inside async code
async with worker.acquire("user-42") as h:
    result = await h.turn("hello")
    h.assist("Customer sounds upset; be empathetic.")
- agent_factory is called once per new session.
- acquire(session_id) is an async context manager: it loads or creates the session, locks it for the duration of the block, and persists state on exit.
- Concurrent requests for different session_ids run in parallel; requests for the same id are serialised by a per-session lock.
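The locking behaviour in the list above can be sketched with one asyncio.Lock per session id. This is an illustration of the general technique, not SessionWorker's code: PerSessionLocks, handle, and demo are hypothetical names, and loading and persisting session state are left out.

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager
from typing import List, Tuple


class PerSessionLocks:
    """Hypothetical registry holding one asyncio.Lock per session id."""

    def __init__(self) -> None:
        self._locks = defaultdict(asyncio.Lock)

    @asynccontextmanager
    async def acquire(self, session_id: str):
        # Same id -> same lock -> callers queue up. Different ids never contend.
        async with self._locks[session_id]:
            yield session_id


async def demo() -> List[Tuple[str, str]]:
    locks = PerSessionLocks()
    events: List[Tuple[str, str]] = []

    async def handle(session_id: str, label: str) -> None:
        async with locks.acquire(session_id):
            events.append(("start", label))
            await asyncio.sleep(0.01)  # simulate one turn of work
            events.append(("end", label))

    # "a" and "b" share a session id; "c" uses a different one.
    await asyncio.gather(
        handle("user-42", "a"),
        handle("user-42", "b"),
        handle("user-7", "c"),
    )
    return events
```

In the returned event list, "b" starts only after "a" has ended, while "c" starts before "a" ends. Note that this registry never evicts locks; a real worker would need some bound, which may be part of what max_sessions is for.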
SessionHandle
Yielded inside the async with worker.acquire(...) block.
| Method / Property | Description |
|---|---|
| await h.turn(text, *, stream=False) | Execute one turn |
| h.assist(text) | Push system instruction |
| h.state | Current session state |
Session stores
| Store | Ships | Use case |
|---|---|---|
| InMemorySessionStore | ✅ v0.2 | Single-process; state lives for process lifetime |
| NullSessionStore | ✅ v0.2 | Voice calls; no persistence wanted |
| RedisSessionStore | 🔜 v0.3 | Multi-process, distributed deployments |
| FileSessionStore | 🔜 v0.3 | Simple persistence without Redis |
| SQLiteSessionStore | 🔜 v0.3 | Local single-server persistence |
Lock backends
| Backend | Ships | Use case |
|---|---|---|
| AsyncioLockBackend | ✅ v0.2 | Single-process |
| RedisLockBackend | 🔜 v0.3 | Multi-process / distributed |
Alternative agent brains
from superdialog import InMemorySessionStore, LLMAgent, LangChainAgent, SessionWorker
# Raw chat brain - no flow, no slots
worker = SessionWorker(
    agent_factory=lambda: LLMAgent(llm="openai/gpt-5.1", system_prompt="Be helpful."),
    store=InMemorySessionStore(),
)
# LangChain runnable (requires: pip install superdialog[langchain])
worker = SessionWorker(
    agent_factory=lambda: LangChainAgent(runnable=my_chain),
    store=InMemorySessionStore(),
)
Tools
The quickest way to define a tool: decorate any function and it becomes a PythonTool, with its schema inferred from the type hints and docstring:
from superdialog.tools import tool
@tool
def lookup_customer(customer_id: str) -> dict:
    """Look up customer record by ID."""
    return crm.get(customer_id)
# Use directly in DialogMachine
dialog_machine = DialogMachine(flow=flow, llm="...", tools=[lookup_customer])
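How a schema can be inferred from type hints and a docstring is sketched below using only the standard library. This shows the general technique rather than what the @tool decorator actually does: infer_schema, the type mapping, the output layout, and the extra include_history parameter are all assumptions for illustration.

```python
import inspect
from typing import get_type_hints

# Assumed mapping from Python annotations to JSON Schema type names.
_JSON_TYPES = {
    str: "string",
    int: "integer",
    float: "number",
    bool: "boolean",
    dict: "object",
    list: "array",
}


def infer_schema(fn) -> dict:
    """Derive a name, description, and input schema from a function."""
    hints = get_type_hints(fn)
    hints.pop("return", None)  # only parameters belong in the input schema
    properties = {}
    required = []
    for name, param in inspect.signature(fn).parameters.items():
        # Unannotated or unmapped parameters fall back to "string".
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": fn.__name__,
        "description": inspect.getdoc(fn) or "",
        "input_schema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


def lookup_customer(customer_id: str, include_history: bool = False) -> dict:
    """Look up customer record by ID."""
    return {"id": customer_id, "history": [] if include_history else None}
```

Calling infer_schema(lookup_customer) yields a description taken from the docstring and marks only customer_id as required, since include_history has a default. This is why precise type hints and a clear docstring matter for decorated tools: they are the only source of information the model sees.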
PythonTool
# Convenience constructor - infers id, name, description, schema
tool = PythonTool.of(my_function)
# Explicit constructor
tool = PythonTool(
    id="lookup",
    name="lookup",
    description="Look up customer by ID",
    fn=lookup_customer,
)
Tool.from_dict
Deserialize a tool spec from a plain dict (e.g. loaded from JSON config). Routes to PythonTool, HttpTool, or MCPTool based on the type field:
from superdialog import Tool
tool = Tool.from_dict(
    {"type": "http", "id": "lookup", "name": "lookup",
     "description": "...", "url": "https://api.company.io/lookup"},
    handler_registry={"my_fn": my_python_fn},  # optional, for type=python
)
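Routing on the type field can be pictured as a small dispatch function. The sketch below is not the library's Tool.from_dict: the dataclasses are local stand-ins, the MCP branch is omitted, and the "handler" key used to look up a Python function in handler_registry is an assumption, since the spec format for type=python is not shown here.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class HttpToolSketch:
    id: str
    name: str
    description: str
    url: str


@dataclass
class PythonToolSketch:
    id: str
    name: str
    description: str
    fn: Callable


def tool_from_dict(spec: dict, handler_registry: Optional[dict] = None):
    """Build a tool object by dispatching on spec["type"]."""
    fields = {key: value for key, value in spec.items() if key != "type"}
    kind = spec.get("type")
    if kind == "http":
        return HttpToolSketch(**fields)
    if kind == "python":
        # Assumption: the spec names its handler under a "handler" key.
        handler = (handler_registry or {})[fields.pop("handler")]
        return PythonToolSketch(fn=handler, **fields)
    raise ValueError(f"unknown tool type: {kind!r}")
```

Keeping callables in a registry rather than in the spec is what allows the spec itself to stay plain JSON.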
HttpTool
import os
tool = HttpTool(
    id="lookup",
    name="lookup",
    description="Look up a customer by partial Aadhaar",
    url="https://api.company.io/customer/lookup",
    method="POST",  # default: POST
    auth={"type": "bearer", "token": os.environ["KEY"]},
    input_schema=None,  # inferred if not provided
)
Auth shapes in v0.2: {"type": "bearer", "token": "..."}. Additional shapes (basic, api_key, callable) are planned for v0.3.
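How HttpTool applies the auth dict is not spelled out here. Presumably the bearer shape becomes a standard Authorization header, which the following sketch illustrates. auth_headers is a hypothetical helper, not part of superdialog.

```python
from typing import Optional


def auth_headers(auth: Optional[dict]) -> dict:
    """Translate an auth dict into HTTP request headers."""
    if auth is None:
        return {}
    if auth.get("type") == "bearer":
        # Standard bearer scheme: "Authorization: Bearer <token>".
        return {"Authorization": f"Bearer {auth['token']}"}
    # Other shapes (basic, api_key, callable) are not handled in this sketch.
    raise ValueError(f"unsupported auth type: {auth.get('type')!r}")
```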
MCPTool
tool = MCPTool(
    id="search",
    name="search",
    description="Search the knowledge base",
    server="https://mcp.company.io",
    input_schema=None,
)
In v0.2, MCPTool forwards execute(args) to session.call_tool(self.id, args) against the configured server. Auto-discovery of all tools on an MCP server is planned for a follow-up.
LLM provider registration
register_llm_provider
import os
register_llm_provider(
    name="internal",
    base_url="https://llm.company.io/v1",
    api_key=os.environ["INTERNAL_KEY"],
    api_style="openai",  # only "openai" supported in v0.2
)
# Use anywhere after registration
dialog_machine = DialogMachine(flow=flow, llm="custom/internal/llama-3-70b-tuned")
Registration is process-global. Once registered, custom/<name>/<model> works in DialogMachine(llm=...), set_llm(), and create_dialog_flow(llm=...).
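One way to picture a process-global registry and the custom/<name>/<model> URI scheme is a module-level dict plus a resolver. The sketch below is illustrative only: register_provider_sketch, resolve_model_uri, and the returned dict layout are hypothetical and do not describe superdialog's internals.

```python
# Module-level dict: shared by every caller in the process.
_PROVIDERS: dict = {}


def register_provider_sketch(name: str, base_url: str, api_key: str) -> None:
    """Record connection settings under a provider name."""
    _PROVIDERS[name] = {"base_url": base_url, "api_key": api_key}


def resolve_model_uri(uri: str) -> dict:
    """Split a model URI into provider settings and a model name."""
    head, _, rest = uri.partition("/")
    if head == "custom":
        # custom/<name>/<model>: look the provider up in the registry.
        name, _, model = rest.partition("/")
        if name not in _PROVIDERS:
            raise KeyError(f"provider {name!r} is not registered")
        return {"provider": name, "model": model, **_PROVIDERS[name]}
    # Built-in style URI such as "openai/gpt-5.1".
    return {"provider": head, "model": rest}
```

A consequence of process-global state is that registration must happen before the first URI lookup in every process, including each worker of a multi-process deployment.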
Adapters
| Import | Purpose |
|---|---|
| superdialog.adapters.livekit.DialogMachineLLM | LiveKit Agent(llm=...) plugin |
| superdialog.adapters.pipecat.make_processor | Factory for PipeCat FrameProcessor |
| superdialog.adapters.fastapi.FastAPIRouter | Mountable router: /turn, /stream, /reset |
| superdialog.adapters.websocket.WebSocketRunner | Standalone WSS server for Unpod Voice Infra |
See Embedding Guides for complete integration examples.
Eval (v0.3 - planned)
from superdialog.eval import Eval
# Named "evaluation" to avoid shadowing the built-in eval()
evaluation = Eval(
    flow=flow,
    corpus="tests/kyc-corpus.jsonl",
    llms=["openai/gpt-5.1", "anthropic/claude-haiku-4-5", "groq/llama-3.3-70b"],
    metrics=["accuracy", "latency_p95", "tool_call_correctness"],
)
report = evaluation.run()
report.save("reports/2026-05-19.md")
Corpus format: JSONL with {utterance, expected_response | expected_intent | expected_tool_call} records. Custom metrics are passable as callables.
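Since the Eval API is still planned, the following is only a sketch of how a corpus in the described shape might be read and checked. parse_corpus is a hypothetical helper, the rule that each record carries exactly one expected_* key is an interpretation of the format line above, and the sample records are invented for illustration.

```python
import json

EXPECTED_KEYS = ("expected_response", "expected_intent", "expected_tool_call")


def parse_corpus(lines):
    """Parse JSONL lines, requiring an utterance and exactly one expected_* key."""
    records = []
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # tolerate blank lines
        record = json.loads(line)
        present = [key for key in EXPECTED_KEYS if key in record]
        if "utterance" not in record or len(present) != 1:
            raise ValueError(f"line {number}: need utterance and one expected_* key")
        records.append(record)
    return records


# Invented sample records, one JSON object per line.
SAMPLE = [
    '{"utterance": "Does Friday 4pm work?", "expected_intent": "confirm_slot"}',
    '{"utterance": "Find customer 1234", "expected_tool_call": {"name": "lookup_customer"}}',
]
```

Passing an open file object to parse_corpus works the same way as passing a list, because both iterate line by line.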