# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 """End-to-end tool calling tests against the Dynamo frontend. Validates: - Streaming protocol shape (chat.completion.chunk SSE) - Tool-call reconstruction from streamed deltas - Tool-call argument JSON validated against the declared JSON Schema - tool_choice variants: auto / required / none / named function - Multi-turn conversations carrying tool results - Multi-tool parallel calls """ from __future__ import annotations import json import logging import os import shutil import signal import time from dataclasses import dataclass from typing import Any, Generator import psutil import pytest from tests.conftest import EtcdServer, NatsServer from tests.utils.managed_process import ManagedProcess from tests.utils.payloads import check_models_api from tests.utils.port_utils import allocate_ports openai = pytest.importorskip("openai") OpenAI = openai.OpenAI jsonschema = pytest.importorskip("jsonschema") Draft7Validator = jsonschema.Draft7Validator logger = logging.getLogger(__name__) MODEL_NAME = "Qwen/Qwen3-0.6B" pytestmark = [ pytest.mark.sglang, pytest.mark.e2e, pytest.mark.gpu_1, pytest.mark.integration, pytest.mark.pre_merge, pytest.mark.model(MODEL_NAME), pytest.mark.timeout(300), ] # --------------------------------------------------------------------------- # Process management # --------------------------------------------------------------------------- def _check_ready(response) -> bool: try: return (response.json() or {}).get("status") == "ready" except ValueError: return False def _prepare_log_dir(request, suffix: str) -> str: log_dir = f"{request.node.name}_{suffix}" shutil.rmtree(log_dir, ignore_errors=True) return log_dir # Command-line patterns used to identify processes this fixture spawned. Each # pattern is distinctive enough that a targeted sweep between topology # switches will not touch unrelated processes in the system. _SGLANG_PROCESS_PATTERNS: tuple[str, ...] = ( "-m dynamo.sglang", "-m dynamo.frontend", "SGLANG:EngineCore", "sglang::scheduler", ) def _cleanup_sglang_stragglers(timeout: float = 10.0) -> None: """Force-kill any surviving SGLang / dynamo-frontend processes. ``ManagedProcess`` is configured with ``terminate_all_matching_process_names =False`` so its built-in straggler sweep is disabled (it does a system-wide pkill, which is not xdist-safe). But SGLang's ``EngineCore`` child processes can outlive the parent's SIGTERM and keep GPU memory pinned, which would prevent the next topology's worker from initializing. This helper performs a narrowly-scoped sweep using cmdline/name patterns that only match processes spawned by this fixture, and waits for them to exit before returning. """ procs: list[psutil.Process] = [] for proc in psutil.process_iter(["pid", "name", "cmdline"]): try: cmdline = " ".join(proc.info.get("cmdline") or []) name = proc.info.get("name") or "" if any(pat in cmdline or pat in name for pat in _SGLANG_PROCESS_PATTERNS): procs.append(proc) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue if not procs: return logger.info( "Post-teardown straggler sweep: sending SIGKILL to %d process(es): %s", len(procs), [(p.pid, (p.info.get("name") or "")[:40]) for p in procs], ) for proc in procs: try: proc.send_signal(signal.SIGKILL) except (psutil.NoSuchProcess, psutil.AccessDenied): continue # Wait for processes to actually exit before the next topology boots. gone, alive = psutil.wait_procs(procs, timeout=timeout) if alive: logger.warning( "Straggler(s) still alive after %ss: %s", timeout, [(p.pid, p.name()) for p in alive], ) # Topology identifiers. Both drive the exact same test bodies; they differ in # where the SGLang chat processor / tool-call parser / reasoning parser live: # # * ``chat_processor_frontend`` — Python SGLang chat processor on the # frontend (``--dyn-chat-processor sglang``) with parsers declared as # frontend flags. Worker is a plain ``dynamo.sglang`` engine. # # * ``rust_parsers`` — plain Rust frontend (``dynamo.frontend``) with no # chat processor or parser flags. Parsers are declared on the worker # (``--dyn-reasoning-parser`` / ``--dyn-tool-call-parser``) and # propagated to the frontend via the model runtime config registered at # discovery time. TOPOLOGIES = ("chat_processor_frontend", "rust_parsers") class WorkerProcess(ManagedProcess): """backend worker for the tool-calling tests.""" def __init__(self, request, *, system_port: int, topology: str): env = os.environ.copy() env["DYN_LOG"] = "info" env["DYN_SYSTEM_PORT"] = str(system_port) env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]' command = [ "python3", "-m", "dynamo.sglang", "--model-path", MODEL_NAME, "--served-model-name", MODEL_NAME, "--trust-remote-code", ] if topology == "rust_parsers": command += [ "--dyn-reasoning-parser", "qwen3", "--dyn-tool-call-parser", "qwen25", ] super().__init__( command=command, env=env, health_check_urls=[ (f"http://localhost:{system_port}/health", _check_ready), ], timeout=600, display_output=True, terminate_all_matching_process_names=False, stragglers=["SGLANG:EngineCore"], straggler_commands=["-m dynamo.sglang"], log_dir=_prepare_log_dir(request, f"sglang-worker-{topology}"), ) class ToolCallingFrontendProcess(ManagedProcess): """Frontend HTTP ingress. The chat processor + parser flags are attached only for the ``chat_processor_frontend`` topology. The ``rust_parsers`` topology uses a plain Rust frontend and relies on parsers declared on the worker side. """ def __init__(self, request, *, frontend_port: int, topology: str): env = os.environ.copy() env["DYN_LOG"] = "info" env.pop("DYN_SYSTEM_PORT", None) command = [ "python3", "-m", "dynamo.frontend", "--http-port", str(frontend_port), "--router-mode", "round-robin", ] if topology == "chat_processor_frontend": command += [ "--dyn-chat-processor", "sglang", "--tool-call-parser", "qwen25", "--reasoning-parser", "qwen3", "--trust-remote-code", ] super().__init__( command=command, env=env, health_check_urls=[ (f"http://localhost:{frontend_port}/v1/models", check_models_api), ], timeout=240, display_output=True, terminate_all_matching_process_names=False, straggler_commands=["-m dynamo.frontend"], log_dir=_prepare_log_dir(request, f"frontend-{topology}"), ) @pytest.fixture(scope="module") def runtime_services(request) -> Generator[None, None, None]: """Module-scoped NATS + Etcd for the tool calling stack. Inlined (rather than depending on the function-scoped ``runtime_services_dynamic_ports``) so the worker + frontend processes can be reused across all tests in this module. """ with NatsServer(request, port=0) as nats, EtcdServer(request, port=0) as etcd: orig_nats = os.environ.get("NATS_SERVER") orig_etcd = os.environ.get("ETCD_ENDPOINTS") os.environ["NATS_SERVER"] = f"nats://localhost:{nats.port}" os.environ["ETCD_ENDPOINTS"] = f"http://localhost:{etcd.port}" try: yield finally: if orig_nats is not None: os.environ["NATS_SERVER"] = orig_nats else: os.environ.pop("NATS_SERVER", None) if orig_etcd is not None: os.environ["ETCD_ENDPOINTS"] = orig_etcd else: os.environ.pop("ETCD_ENDPOINTS", None) @pytest.fixture(scope="module", params=TOPOLOGIES) def tool_calling_services( request, runtime_services, predownload_models ) -> Generator[int, None, None]: """Start the SGLang worker + frontend for the selected topology. Parameterized so every test runs once per entry in :data:`TOPOLOGIES`, giving side-by-side coverage of the Python chat-processor frontend and the plain Rust frontend with worker-declared parsers. Yields the frontend HTTP port. """ topology: str = request.param frontend_port, system_port = allocate_ports(count=2, start_port=10000) try: with WorkerProcess(request, system_port=system_port, topology=topology): # Allow worker to register with discovery. time.sleep(2) with ToolCallingFrontendProcess( request, frontend_port=frontend_port, topology=topology ): logger.info( "Tool calling stack ready (topology=%s frontend=%d worker_system=%d)", topology, frontend_port, system_port, ) yield frontend_port finally: # ManagedProcess.__exit__ has run for both context managers. Do a # narrowly-scoped straggler sweep so any surviving EngineCore / worker # / frontend process is gone before the next topology boots — otherwise # the next worker would race against pinned GPU memory or a stale # discovery registration. Followed by a brief settle delay so the OS # reclaims bound ports and the GPU frees its VRAM. _cleanup_sglang_stragglers() time.sleep(3) @pytest.fixture(scope="module") def client(tool_calling_services: int) -> OpenAI: return OpenAI( api_key="EMPTY", base_url=f"http://localhost:{tool_calling_services}/v1" ) @pytest.fixture(scope="module") def model() -> str: return MODEL_NAME # --------------------------------------------------------------------------- # Tool definitions # --------------------------------------------------------------------------- TOOLS_WEATHER = [ { "type": "function", "function": { "name": "get_weather", "description": "Get the current weather for a city", "parameters": { "type": "object", "properties": { "city": {"type": "string"}, "unit": { "type": "string", "enum": ["celsius", "fahrenheit"], }, }, "required": ["city"], "additionalProperties": True, }, }, } ] TOOLS_SEARCH = [ { "type": "function", "function": { "name": "search_web", "description": "Search the web for information", "parameters": { "type": "object", "properties": { "query": {"type": "string"}, "num_results": {"type": "integer"}, }, "required": ["query"], "additionalProperties": True, }, }, } ] TOOLS_CALCULATOR = [ { "type": "function", "function": { "name": "calculate", "description": "Evaluate a mathematical expression", "parameters": { "type": "object", "properties": { "expression": {"type": "string"}, }, "required": ["expression"], "additionalProperties": True, }, }, } ] TOOLS_COMPLEX_ARGS = [ { "type": "function", "function": { "name": "create_event", "description": "Create a calendar event with attendees and recurrence", "parameters": { "type": "object", "properties": { "title": {"type": "string"}, "start_time": {"type": "string"}, "end_time": {"type": "string"}, "attendees": { "type": "array", "items": { "type": "object", "properties": { "name": {"type": "string"}, "email": {"type": "string"}, "role": { "type": "string", "enum": ["required", "optional", "organizer"], }, }, "required": ["email"], "additionalProperties": True, }, }, "recurrence": { "type": "object", "properties": { "frequency": { "type": "string", "enum": ["daily", "weekly", "monthly"], }, "interval": {"type": "integer"}, "count": {"type": "integer"}, }, "additionalProperties": True, }, "location": {"type": "string"}, "description": {"type": "string"}, }, "required": ["title", "start_time", "end_time"], "additionalProperties": True, }, }, } ] TOOLS_DATABASE = [ { "type": "function", "function": { "name": "query_database", "description": "Execute a SQL query against the database", "parameters": { "type": "object", "properties": { "sql": {"type": "string"}, "params": { "type": "array", "items": {"type": "string"}, }, "database": { "type": "string", "enum": ["users", "orders", "products"], }, }, "required": ["sql", "database"], "additionalProperties": True, }, }, } ] TOOLS_GET_TIME = [ { "type": "function", "function": { "name": "get_time", "description": "Get the current time in a timezone", "parameters": { "type": "object", "properties": { "timezone": {"type": "string"}, }, "required": ["timezone"], "additionalProperties": True, }, }, } ] ALL_TOOLS = ( TOOLS_WEATHER + TOOLS_SEARCH + TOOLS_CALCULATOR + TOOLS_COMPLEX_ARGS + TOOLS_DATABASE ) # --------------------------------------------------------------------------- # Streaming helpers # --------------------------------------------------------------------------- def tool_schema_map(tools: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: out: dict[str, dict[str, Any]] = {} for tool in tools: fn = tool["function"] out[fn["name"]] = fn["parameters"] return out @dataclass class StreamResult: content: str reasoning_content: str tool_calls: list[dict[str, Any]] finish_reason: str | None model: str chunks: int ttft_ms: float raw_chunks: list[Any] def collect_stream(stream) -> StreamResult: content_parts: list[str] = [] reasoning_parts: list[str] = [] tool_calls_by_index: dict[int, dict[str, Any]] = {} finish_reason = None model = "" chunk_count = 0 raw_chunks: list[Any] = [] t0 = time.monotonic() ttft_ms = 0.0 for chunk in stream: raw_chunks.append(chunk) chunk_count += 1 if chunk_count == 1: ttft_ms = (time.monotonic() - t0) * 1000.0 model = chunk.model for choice in chunk.choices: delta = choice.delta if getattr(delta, "content", None): content_parts.append(delta.content) if getattr(delta, "reasoning_content", None): reasoning_parts.append(delta.reasoning_content) if getattr(delta, "tool_calls", None): for tc in delta.tool_calls: idx = tc.index entry = tool_calls_by_index.setdefault( idx, { "id": "", "type": "function", "function": {"name": "", "arguments": ""}, }, ) if tc.id: if entry["id"] and entry["id"] != tc.id: raise AssertionError( f"Tool call id changed within same index {idx}: " f"{entry['id']} -> {tc.id}" ) entry["id"] = tc.id if tc.type: entry["type"] = tc.type if tc.function: if tc.function.name: if ( entry["function"]["name"] and entry["function"]["name"] != tc.function.name ): raise AssertionError( f"Tool name changed within same index {idx}: " f"{entry['function']['name']} -> {tc.function.name}" ) entry["function"]["name"] = tc.function.name if tc.function.arguments: entry["function"]["arguments"] += tc.function.arguments if choice.finish_reason: finish_reason = choice.finish_reason ordered_tool_calls = [tool_calls_by_index[i] for i in sorted(tool_calls_by_index)] return StreamResult( content="".join(content_parts), reasoning_content="".join(reasoning_parts), tool_calls=ordered_tool_calls, finish_reason=finish_reason, model=model, chunks=chunk_count, ttft_ms=ttft_ms, raw_chunks=raw_chunks, ) def stream_chat( client: OpenAI, model: str, *, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, max_tokens: int = 4096, **kwargs, ) -> StreamResult: req: dict[str, Any] = { "model": model, "messages": messages, "stream": True, "max_tokens": max_tokens, } if tools is not None: req["tools"] = tools req.update(kwargs) stream = client.chat.completions.create(**req) return collect_stream(stream) def parse_and_validate_tool_call( tc: dict[str, Any], schema_by_name: dict[str, dict[str, Any]], *, expected_name: str | None = None, ) -> dict[str, Any]: assert tc["type"] == "function", f"unexpected tool type: {tc['type']!r}" assert tc["id"], "tool call id must be non-empty" fn_name = tc["function"]["name"] assert fn_name, "tool call function name must be non-empty" if expected_name is not None: assert fn_name == expected_name, f"expected {expected_name!r}, got {fn_name!r}" assert fn_name in schema_by_name, f"unknown tool name {fn_name!r}" args_str = tc["function"]["arguments"] assert args_str, "tool call arguments must be non-empty" try: args = json.loads(args_str) except json.JSONDecodeError as e: raise AssertionError(f"arguments are not valid JSON: {args_str!r}") from e assert isinstance(args, dict), f"arguments must decode to object, got {type(args)}" validator = Draft7Validator(schema_by_name[fn_name]) errors = sorted(validator.iter_errors(args), key=lambda e: list(e.path)) if errors: rendered = "; ".join( f"path={list(err.path)} message={err.message}" for err in errors ) raise AssertionError(f"arguments failed schema validation: {rendered}") return args def assert_finish_reason(result: StreamResult, allowed: set[str]) -> None: assert result.finish_reason in allowed, ( f"unexpected finish_reason={result.finish_reason!r}, " f"allowed={sorted(allowed)}" ) def assistant_tool_message_from_result(result: StreamResult) -> dict[str, Any]: return { "role": "assistant", "content": result.content or None, "tool_calls": result.tool_calls, } # --------------------------------------------------------------------------- # Protocol / contract tests # --------------------------------------------------------------------------- class TestToolCallingProtocol: def test_stream_has_required_chunk_shape(self, client: OpenAI, model: str): stream = client.chat.completions.create( model=model, messages=[{"role": "user", "content": "What's the weather in Berlin?"}], tools=TOOLS_WEATHER, stream=True, max_tokens=256, ) chunk_count = 0 saw_finish = False for chunk in stream: chunk_count += 1 assert chunk.id assert chunk.model == model or isinstance(chunk.model, str) assert chunk.object == "chat.completion.chunk" assert chunk.created > 0 assert len(chunk.choices) >= 1 for choice in chunk.choices: assert choice.index == 0 if choice.finish_reason is not None: saw_finish = True assert choice.finish_reason in {"stop", "tool_calls", "length"} assert chunk_count > 0 assert saw_finish, "stream never emitted a finish_reason" def test_single_tool_call_schema_valid(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[{"role": "user", "content": "What's the weather in Tokyo?"}], tools=TOOLS_WEATHER, ) assert_finish_reason(result, {"tool_calls"}) assert len(result.tool_calls) >= 1 schema = tool_schema_map(TOOLS_WEATHER) args = parse_and_validate_tool_call( result.tool_calls[0], schema, expected_name="get_weather" ) assert "city" in args assert isinstance(args["city"], str) assert args["city"] def test_tool_choice_required_forces_a_tool_call(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[{"role": "user", "content": "Hello there."}], tools=TOOLS_WEATHER, tool_choice="required", ) assert_finish_reason(result, {"tool_calls"}) assert len(result.tool_calls) >= 1 # Intent of this test: verify tool_choice=required forces a call. # The prompt doesn't warrant a tool call, so a small model may # hallucinate values for optional fields. Validate only that the # call is well-formed and the required fields are present; don't # enforce the full schema (e.g. enum values on optional fields). schema = tool_schema_map(TOOLS_WEATHER) for tc in result.tool_calls: assert tc["type"] == "function" assert tc["id"] fn_name = tc["function"]["name"] assert fn_name in schema, f"unknown tool name {fn_name!r}" args = json.loads(tc["function"]["arguments"]) assert isinstance(args, dict) for required_field in schema[fn_name].get("required", []): assert ( required_field in args ), f"{fn_name} missing required field {required_field!r}" def test_tool_choice_none_suppresses_tool_calls(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[{"role": "user", "content": "What's the weather in Paris?"}], tools=TOOLS_WEATHER, tool_choice="none", ) assert_finish_reason(result, {"stop"}) assert result.tool_calls == [] assert result.content.strip() def test_named_tool_choice_forces_specific_function( self, client: OpenAI, model: str ): result = stream_chat( client, model, messages=[{"role": "user", "content": "What's the weather in Paris?"}], tools=TOOLS_WEATHER, tool_choice={"type": "function", "function": {"name": "get_weather"}}, ) assert_finish_reason(result, {"tool_calls"}) assert len(result.tool_calls) >= 1 schema = tool_schema_map(TOOLS_WEATHER) for tc in result.tool_calls: parse_and_validate_tool_call(tc, schema, expected_name="get_weather") def test_parallel_multi_tool_request_includes_all_expected_tools( self, client: OpenAI, model: str ): result = stream_chat( client, model, messages=[ { "role": "user", "content": ( "Do all three of these with tools: " "1) weather in Paris, " "2) search the web for latest Python release, " "3) calculate 15 * 23 + 7." ), } ], tools=TOOLS_WEATHER + TOOLS_SEARCH + TOOLS_CALCULATOR, parallel_tool_calls=True, ) assert_finish_reason(result, {"tool_calls"}) # Models sometimes batch only a subset and emit follow-up calls in # later turns; require at least 2 distinct tools rather than all 3. schemas = tool_schema_map(TOOLS_WEATHER + TOOLS_SEARCH + TOOLS_CALCULATOR) names: set[str] = set() for tc in result.tool_calls: parse_and_validate_tool_call(tc, schemas) names.add(tc["function"]["name"]) assert len(names) >= 2, f"expected at least 2 distinct tools, got {names}" def test_array_argument_schema_valid(self, client: OpenAI, model: str): tools = [ { "type": "function", "function": { "name": "send_emails", "description": "Send emails", "parameters": { "type": "object", "properties": { "recipients": { "type": "array", "items": {"type": "string"}, }, "subject": {"type": "string"}, "body": {"type": "string"}, }, "required": ["recipients", "subject", "body"], }, }, } ] result = stream_chat( client, model, messages=[ { "role": "user", "content": ( "Send an email with subject 'Team Update' and body " "'Meeting at 3pm' to alice@example.com, bob@example.com, " "and carol@example.com." ), } ], tools=tools, tool_choice={"type": "function", "function": {"name": "send_emails"}}, ) assert_finish_reason(result, {"tool_calls"}) schema = tool_schema_map(tools) args = parse_and_validate_tool_call( result.tool_calls[0], schema, expected_name="send_emails" ) assert isinstance(args["recipients"], list) assert len(args["recipients"]) >= 3 def test_no_tools_is_plain_text(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[{"role": "user", "content": "What is the capital of France?"}], ) assert_finish_reason(result, {"stop"}) assert result.tool_calls == [] assert result.content.strip() # --------------------------------------------------------------------------- # Multi-turn contract tests # --------------------------------------------------------------------------- class TestToolCallingMultiTurn: def test_tool_result_is_consumed_and_final_answer_is_text( self, client: OpenAI, model: str ): schemas = tool_schema_map(TOOLS_WEATHER) first = stream_chat( client, model, messages=[{"role": "user", "content": "What is the weather in London?"}], tools=TOOLS_WEATHER, ) assert_finish_reason(first, {"tool_calls"}) assert len(first.tool_calls) >= 1 parse_and_validate_tool_call( first.tool_calls[0], schemas, expected_name="get_weather" ) second = stream_chat( client, model, messages=[ {"role": "user", "content": "What is the weather in London?"}, assistant_tool_message_from_result(first), { "role": "tool", "tool_call_id": first.tool_calls[0]["id"], "content": json.dumps( {"temperature": 15, "unit": "celsius", "condition": "cloudy"} ), }, ], tools=TOOLS_WEATHER, ) assert_finish_reason(second, {"stop"}) assert second.tool_calls == [] assert second.content.strip() assert "15" in second.content or "cloud" in second.content.lower() def test_chained_tool_use_search_then_calculate(self, client: OpenAI, model: str): schemas = tool_schema_map(TOOLS_SEARCH + TOOLS_CALCULATOR) messages: list[dict[str, Any]] = [ { "role": "user", "content": ( "Search the web for the population of Tokyo, " "then calculate what 10% of that number is." ), } ] step1 = stream_chat( client, model, messages=messages, tools=TOOLS_SEARCH + TOOLS_CALCULATOR ) assert_finish_reason(step1, {"tool_calls"}) assert len(step1.tool_calls) >= 1 parse_and_validate_tool_call(step1.tool_calls[0], schemas) messages.append(assistant_tool_message_from_result(step1)) messages.append( { "role": "tool", "tool_call_id": step1.tool_calls[0]["id"], "content": json.dumps( { "results": [ {"title": "Tokyo population", "snippet": "13,960,000"} ] } ), } ) step2 = stream_chat( client, model, messages=messages, tools=TOOLS_SEARCH + TOOLS_CALCULATOR ) # Small models sometimes short-circuit and compute the answer in # their reasoning instead of chaining a second tool call. Accept # either path: (a) another tool call to `calculate`, or (b) a # direct text answer containing the correct result. assert_finish_reason(step2, {"tool_calls", "stop"}) if step2.finish_reason == "tool_calls": assert len(step2.tool_calls) >= 1 args2 = parse_and_validate_tool_call(step2.tool_calls[0], schemas) assert step2.tool_calls[0]["function"]["name"] == "calculate" assert "13960000" in args2["expression"].replace( ",", "" ) or "1396000" in args2["expression"].replace(",", "") messages.append(assistant_tool_message_from_result(step2)) messages.append( { "role": "tool", "tool_call_id": step2.tool_calls[0]["id"], "content": "1396000", } ) step3 = stream_chat( client, model, messages=messages, tools=TOOLS_SEARCH + TOOLS_CALCULATOR ) assert_finish_reason(step3, {"stop"}) assert step3.tool_calls == [] assert "1396000" in step3.content.replace(",", "") else: # Short-circuit path: model did the math itself. Just verify # the final answer is present in the text. assert step2.tool_calls == [] assert "1396000" in step2.content.replace(",", "") def test_multiple_prior_tool_results_synthesize_to_text( self, client: OpenAI, model: str ): result = stream_chat( client, model, messages=[ {"role": "user", "content": "Get the weather in Tokyo and Paris."}, { "role": "assistant", "content": None, "tool_calls": [ { "id": "call_001", "type": "function", "function": { "name": "get_weather", "arguments": json.dumps({"city": "Tokyo"}), }, }, { "id": "call_002", "type": "function", "function": { "name": "get_weather", "arguments": json.dumps({"city": "Paris"}), }, }, ], }, { "role": "tool", "tool_call_id": "call_001", "content": json.dumps( {"temperature": 22, "unit": "celsius", "condition": "sunny"} ), }, { "role": "tool", "tool_call_id": "call_002", "content": json.dumps( {"temperature": 18, "unit": "celsius", "condition": "rainy"} ), }, ], tools=TOOLS_WEATHER, ) assert_finish_reason(result, {"stop"}) assert result.tool_calls == [] assert result.content.strip() lower = result.content.lower() assert "tokyo" in lower or "paris" in lower # --------------------------------------------------------------------------- # Model-behavior smoke tests # These are intentionally looser because the model may vary. # --------------------------------------------------------------------------- class TestToolCallingModelBehavior: def test_many_tools_prefers_calculator_for_math_question( self, client: OpenAI, model: str ): result = stream_chat( client, model, messages=[ {"role": "user", "content": "What is 2^10? Use a tool if helpful."} ], tools=ALL_TOOLS, ) assert result.finish_reason in {"stop", "tool_calls"} if result.finish_reason == "tool_calls": assert len(result.tool_calls) >= 1 assert result.tool_calls[0]["function"]["name"] == "calculate" def test_unicode_arguments_are_preserved(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[ { "role": "user", "content": "What's the weather in Zürich, Switzerland?", } ], tools=TOOLS_WEATHER, ) assert result.finish_reason in {"stop", "tool_calls"} if result.finish_reason == "tool_calls": schema = tool_schema_map(TOOLS_WEATHER) args = parse_and_validate_tool_call( result.tool_calls[0], schema, expected_name="get_weather" ) assert args["city"] def test_system_instruction_encourages_tool_use(self, client: OpenAI, model: str): result = stream_chat( client, model, messages=[ { "role": "system", "content": ( "You are a careful weather assistant. " "Always use the get_weather tool for weather questions." ), }, {"role": "user", "content": "How's the weather in Sydney?"}, ], tools=TOOLS_WEATHER, ) assert result.finish_reason in {"stop", "tool_calls"} if result.finish_reason == "tool_calls": schema = tool_schema_map(TOOLS_WEATHER) parse_and_validate_tool_call( result.tool_calls[0], schema, expected_name="get_weather" )