Add model_request_stream_sync to direct API #2116


Merged: 19 commits on Jul 4, 2025
1 change: 1 addition & 0 deletions docs/direct.md
@@ -9,6 +9,7 @@ The following functions are available:
- [`model_request`][pydantic_ai.direct.model_request]: Make a non-streamed async request to a model
- [`model_request_sync`][pydantic_ai.direct.model_request_sync]: Make a non-streamed synchronous request to a model
- [`model_request_stream`][pydantic_ai.direct.model_request_stream]: Make a streamed async request to a model
- [`model_request_stream_sync`][pydantic_ai.direct.model_request_stream_sync]: Make a streamed synchronous request to a model

## Basic Example

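As a quick orientation before the implementation diff, here is a minimal sketch of the new call from synchronous code. It uses the built-in `test` model that this PR's own tests rely on; `stream.get()` and `stream.usage()` are the partial-result accessors added by the PR.

```py
from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest

messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
with model_request_stream_sync('test', messages) as stream:
    for event in stream:  # plain synchronous iteration over stream events
        print(event)
    print(stream.get())    # ModelResponse built from the events received so far
    print(stream.usage())  # usage accumulated so far
```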
194 changes: 191 additions & 3 deletions pydantic_ai_slim/pydantic_ai/direct.py
@@ -8,14 +8,29 @@

from __future__ import annotations as _annotations

import queue
import threading
from collections.abc import Iterator
from contextlib import AbstractAsyncContextManager
from dataclasses import dataclass, field
from datetime import datetime
from types import TracebackType

from pydantic_ai.usage import Usage
from pydantic_graph._utils import get_event_loop as _get_event_loop

from . import agent, messages, models, settings
from .models import instrumented as instrumented_models
from .models import StreamedResponse, instrumented as instrumented_models

__all__ = 'model_request', 'model_request_sync', 'model_request_stream'
__all__ = (
    'model_request',
    'model_request_sync',
    'model_request_stream',
    'model_request_stream_sync',
    'StreamedResponseSync',
)

STREAM_INITIALIZATION_TIMEOUT = 30


async def model_request(
@@ -144,7 +159,7 @@ def model_request_stream(

    async def main():
        messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]  # (1)!
        async with model_request_stream( 'openai:gpt-4.1-mini', messages) as stream:
        async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
            chunks = []
            async for chunk in stream:
                chunks.append(chunk)
Expand Down Expand Up @@ -181,6 +196,63 @@ async def main():
    )


def model_request_stream_sync(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
) -> StreamedResponseSync:
    """Make a streamed synchronous request to a model.

    This is the synchronous version of [`model_request_stream`][pydantic_ai.direct.model_request_stream].
    It uses threading to run the asynchronous stream in the background while providing a synchronous iterator interface.

    ```py {title="model_request_stream_sync_example.py"}
    from pydantic_ai.direct import model_request_stream_sync
    from pydantic_ai.messages import ModelRequest

    messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
    with model_request_stream_sync('openai:gpt-4.1-mini', messages) as stream:
        chunks = []
        for chunk in stream:
            chunks.append(chunk)
        print(chunks)
        '''
        [
            PartStartEvent(index=0, part=TextPart(content='Albert Einstein was ')),
            PartDeltaEvent(
                index=0, delta=TextPartDelta(content_delta='a German-born theoretical ')
            ),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='physicist.')),
        ]
        '''
    ```

    Args:
        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
        messages: Messages to send to the model.
        model_settings: Optional model settings.
        model_request_parameters: Optional model request parameters.
        instrument: Whether to instrument the request with OpenTelemetry/Logfire; if `None`, the value from
            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.

    Returns:
        A [sync stream response][pydantic_ai.direct.StreamedResponseSync] context manager.
    """
    async_stream_cm = model_request_stream(
        model=model,
        messages=messages,
        model_settings=model_settings,
        model_request_parameters=model_request_parameters,
        instrument=instrument,
    )

    return StreamedResponseSync(async_stream_cm)


def _prepare_model(
    model: models.Model | models.KnownModelName | str,
    instrument: instrumented_models.InstrumentationSettings | bool | None,
@@ -191,3 +263,119 @@ def _prepare_model(
        instrument = agent.Agent._instrument_default  # pyright: ignore[reportPrivateUsage]

    return instrumented_models.instrument_model(model_instance, instrument)


@dataclass
class StreamedResponseSync:
    """Synchronous wrapper around an async streaming response: it runs the async producer in a background thread and exposes a synchronous iterator.

    This class must be used as a context manager with the `with` statement.
    """

    _async_stream_cm: AbstractAsyncContextManager[StreamedResponse]
    _queue: queue.Queue[messages.ModelResponseStreamEvent | Exception | None] = field(
        default_factory=queue.Queue, init=False
    )
    _thread: threading.Thread | None = field(default=None, init=False)
    _stream_response: StreamedResponse | None = field(default=None, init=False)
    _exception: Exception | None = field(default=None, init=False)
    _context_entered: bool = field(default=False, init=False)
    _stream_ready: threading.Event = field(default_factory=threading.Event, init=False)

    def __enter__(self) -> StreamedResponseSync:
        self._context_entered = True
        self._start_producer()
        return self

    def __exit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> None:
        self._cleanup()

    def __iter__(self) -> Iterator[messages.ModelResponseStreamEvent]:
        """Stream the response as an iterable of [`ModelResponseStreamEvent`][pydantic_ai.messages.ModelResponseStreamEvent]s."""
        self._check_context_manager_usage()

        while True:
            item = self._queue.get()
            if item is None:  # End of stream
                break
            elif isinstance(item, Exception):
                raise item
            else:
                yield item

    def __repr__(self) -> str:
        if self._stream_response:
            return repr(self._stream_response)
        else:
            return f'{self.__class__.__name__}(context_entered={self._context_entered})'

    __str__ = __repr__

    def _check_context_manager_usage(self) -> None:
        if not self._context_entered:
            raise RuntimeError(
                'StreamedResponseSync must be used as a context manager. '
                'Use: `with model_request_stream_sync(...) as stream:`'
            )

    def _ensure_stream_ready(self) -> StreamedResponse:
        self._check_context_manager_usage()

        if self._stream_response is None:
            # Wait for the background thread to signal that the stream is ready
            if not self._stream_ready.wait(timeout=STREAM_INITIALIZATION_TIMEOUT):
                raise RuntimeError('Stream failed to initialize within timeout')

            if self._stream_response is None:  # pragma: no cover
                raise RuntimeError('Stream failed to initialize')

        return self._stream_response

    def _start_producer(self):
        self._thread = threading.Thread(target=self._async_producer, daemon=True)
        self._thread.start()

    def _async_producer(self):
        async def _consume_async_stream():
            try:
                async with self._async_stream_cm as stream:
                    self._stream_response = stream
                    # Signal that the stream is ready
                    self._stream_ready.set()
                    async for event in stream:
                        self._queue.put(event)
            except Exception as e:
                # Signal ready even on error so waiting threads don't hang
                self._stream_ready.set()
                self._queue.put(e)
            finally:
                self._queue.put(None)  # Signal end

        _get_event_loop().run_until_complete(_consume_async_stream())

    def _cleanup(self):
        if self._thread and self._thread.is_alive():
            self._thread.join()

    def get(self) -> messages.ModelResponse:
        """Build a ModelResponse from the data received from the stream so far."""
        return self._ensure_stream_ready().get()

    def usage(self) -> Usage:
        """Get the usage of the response so far."""
        return self._ensure_stream_ready().usage()

    @property
    def model_name(self) -> str:
        """Get the model name of the response."""
        return self._ensure_stream_ready().model_name

    @property
    def timestamp(self) -> datetime:
        """Get the timestamp of the response."""
        return self._ensure_stream_ready().timestamp
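The bridge pattern above (a daemon thread drives the async context manager and feeds a `queue.Queue`, with `None` as an end-of-stream sentinel and exceptions forwarded as queue items) is independent of pydantic-ai. Below is a simplified standalone sketch of the same idea, not part of this PR, using `asyncio.run` in place of the `pydantic_graph` event-loop helper:

```py
import asyncio
import queue
import threading
from collections.abc import AsyncIterator, Iterator


def sync_iter(agen: AsyncIterator[str]) -> Iterator[str]:
    """Yield items from an async iterator synchronously via a background thread."""
    q: queue.Queue[str | Exception | None] = queue.Queue()

    def producer() -> None:
        async def consume() -> None:
            try:
                async for item in agen:
                    q.put(item)
            except Exception as e:
                q.put(e)  # forward the error to the consuming thread
            finally:
                q.put(None)  # end-of-stream sentinel

        asyncio.run(consume())

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is None:
            break
        if isinstance(item, Exception):
            raise item
        yield item


async def chunks() -> AsyncIterator[str]:
    for i in range(3):
        await asyncio.sleep(0)
        yield f'chunk {i}'


for chunk in sync_iter(chunks()):
    print(chunk)
```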
99 changes: 99 additions & 0 deletions tests/test_direct.py
@@ -1,17 +1,23 @@
import asyncio
import re
from contextlib import contextmanager
from datetime import timezone
from unittest.mock import AsyncMock, patch

import pytest
from inline_snapshot import snapshot

from pydantic_ai import Agent
from pydantic_ai.direct import (
    StreamedResponseSync,
    _prepare_model,  # pyright: ignore[reportPrivateUsage]
    model_request,
    model_request_stream,
    model_request_stream_sync,
    model_request_sync,
)
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    PartDeltaEvent,
@@ -76,6 +82,24 @@ def test_model_request_sync():
    )


def test_model_request_stream_sync():
    with model_request_stream_sync('test', [ModelRequest.user_text_prompt('x')]) as stream:
        chunks = list(stream)
        assert chunks == snapshot(
            [
                PartStartEvent(index=0, part=TextPart(content='')),
                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='success ')),
                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='(no ')),
                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='tool ')),
                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='calls)')),
            ]
        )

        repr_str = repr(stream)
        assert 'TestStreamedResponse' in repr_str
        assert 'test' in repr_str


async def test_model_request_stream():
    async with model_request_stream('test', [ModelRequest.user_text_prompt('x')]) as stream:
        chunks = [chunk async for chunk in stream]
@@ -90,6 +114,81 @@ async def test_model_request_stream():
    )


def test_model_request_stream_sync_without_context_manager():
    """Test that accessing properties or iterating without context manager raises RuntimeError."""
    messages: list[ModelMessage] = [ModelRequest.user_text_prompt('x')]

    expected_error_msg = re.escape(
        'StreamedResponseSync must be used as a context manager. Use: `with model_request_stream_sync(...) as stream:`'
    )

    stream_cm = model_request_stream_sync('test', messages)

    stream_repr = repr(stream_cm)
    assert 'StreamedResponseSync' in stream_repr
    assert 'context_entered=False' in stream_repr

    with pytest.raises(RuntimeError, match=expected_error_msg):
        _ = stream_cm.model_name

    with pytest.raises(RuntimeError, match=expected_error_msg):
        _ = stream_cm.timestamp

    with pytest.raises(RuntimeError, match=expected_error_msg):
        stream_cm.get()

    with pytest.raises(RuntimeError, match=expected_error_msg):
        stream_cm.usage()

    with pytest.raises(RuntimeError, match=expected_error_msg):
        list(stream_cm)

    with pytest.raises(RuntimeError, match=expected_error_msg):
        for _ in stream_cm:
            break


def test_model_request_stream_sync_exception_in_stream():
    """Test handling of exceptions raised during streaming."""
    async_stream_mock = AsyncMock()
    async_stream_mock.__aenter__ = AsyncMock(side_effect=ValueError('Stream error'))

    stream_sync = StreamedResponseSync(_async_stream_cm=async_stream_mock)

    with stream_sync:
        with pytest.raises(ValueError, match='Stream error'):
            list(stream_sync)


def test_model_request_stream_sync_timeout():
    """Test timeout when stream fails to initialize."""
    async_stream_mock = AsyncMock()

    async def slow_init():
        await asyncio.sleep(0.1)

    async_stream_mock.__aenter__ = AsyncMock(side_effect=slow_init)

    stream_sync = StreamedResponseSync(_async_stream_cm=async_stream_mock)

    with patch('pydantic_ai.direct.STREAM_INITIALIZATION_TIMEOUT', 0.01):
        with stream_sync:
            with pytest.raises(RuntimeError, match='Stream failed to initialize within timeout'):
                stream_sync.get()


def test_model_request_stream_sync_intermediate_get():
    """Test getting properties of StreamedResponse before consuming all events."""
    messages: list[ModelMessage] = [ModelRequest.user_text_prompt('x')]

    with model_request_stream_sync('test', messages) as stream:
        response = stream.get()
        assert response is not None

        usage = stream.usage()
        assert usage is not None


@contextmanager
def set_instrument_default(value: bool):
"""Context manager to temporarily set the default instrumentation value."""