from kyvvu import Kyvvu, KyvvuBlockedError, RiskClassification, StepType, Verb
import os, sys
# --- Constructor ---
# api_url: Kyvvu platform URL (defaults to https://platform.kyvvu.com)
# api_key: your API key from `kyvvu register` (reads KV_API_KEY env var)
# agent_key: stable identifier for this agent (used for policy matching)
# risk_classification: EU AI Act risk tier (MINIMAL, LIMITED, or HIGH)
kv = Kyvvu(
api_url=os.environ.get("KV_API_URL", "https://platform.kyvvu.com"),
api_key=os.environ["KV_API_KEY"],
agent_key="my-agent",
risk_classification=RiskClassification.MINIMAL,
)
# --- Registration ---
# Registers the agent with the platform. Results are cached locally
# at ~/.kyvvu/registrations/ so repeat calls skip the network request.
# By default, only the opaque agent ID and operational fields are stored
# server-side (name, purpose, owner are NOT persisted unless you pass
# store_metadata=True).
kv.register_agent(
name="Hello Kyvvu",
purpose="Demonstration agent — shows behavioral trace and policy evaluation",
declared_tools=["call_llm", "fetch_user_data", "run_script"],
)
# --- Decorated steps ---
# @kv.step wraps each function in evaluate → execute → record:
# step_model = LLM / model call
# step_resource = external data fetch
# step_exec = code execution (triggers OWASP gate requirement)
# Verb describes the HTTP-like action (POST, GET, etc.)
@kv.step(StepType.step_model, Verb.POST)
def call_llm(prompt: str) -> str:
"""Mocked model call — returns a canned response."""
return f"(mocked response to: {prompt!r})"
@kv.step(StepType.step_resource, Verb.GET)
def fetch_user_data(user_id: str) -> dict:
"""Mocked external resource call."""
return {"user_id": user_id, "name": "Alice", "tier": "free"}
@kv.step(StepType.step_exec)
def run_script(code: str) -> str:
"""Mocked code execution — intentionally blocked by OWASP policy."""
return f"(would execute: {code!r})"
# --- Task lifecycle ---
# start_task() begins a new auditable task run.
# end_task() flushes the audit trail. error_task() records a failure.
# KyvvuBlockedError is raised when a policy blocks a step — the agent
# should handle it gracefully (log, skip, or escalate).
if __name__ == "__main__":
try:
kv.start_task()
user = fetch_user_data("user_123")
answer = call_llm(f"User {user['name']} asks: What's the weather?")
run_script(f"save_response('{answer}')") # blocked by OWASP policy
except KyvvuBlockedError as exc:
print(f"\n⛔ Policy blocked this step: {exc}")
print(f" Risk score: {exc.result.risk_score:.2f}")
print(f" Action: {exc.result.action.value}")
for p in exc.result.policies:
if p.violated:
print(f" • {p.name} ({p.severity})")
try:
kv.error_task(error=exc)
except Exception:
pass
sys.exit(1)
except Exception as exc:
try:
kv.error_task(error=exc)
except Exception:
pass
raise
else:
kv.end_task()