Skip to content

ModelRetry not support stream output #1663

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
tianshangwuyun opened this issue May 8, 2025 · 1 comment
Open

ModelRetry not support stream output #1663

tianshangwuyun opened this issue May 8, 2025 · 1 comment
Assignees
Labels
question Further information is requested

Comments

@tianshangwuyun
Copy link

tianshangwuyun commented May 8, 2025

Question

Qwen3 thinking mode only support stream model

I find ModelRetry not support stream output
Is there any other way to support both stream output and ModelDelay simultaneously

from typing import Union

from pydantic import BaseModel

from pydantic_ai import Agent, RunContext, ModelRetry
from jsform_ai.llm import base_model




class InvalidRequest(BaseModel):
    error_message: str


Output = Union[str, InvalidRequest]
agent: Agent[dict,Output] = Agent(
    base_model,
    # output_type=str,  # type: ignore
    deps_type=dict,
    system_prompt='you are a helpful assistant.',
    retries=3
)


@agent.output_validator
async def validate_res(ctx: RunContext[dict], output: str) -> Output:
    if isinstance(output, InvalidRequest):
        return output
    print(f"output:{output}")
    num = ctx.deps["num"]
    if str(num) not in output:
        print(f"num:{num} not in output:{output}")
        ctx.deps["num"] += 1
        raise ModelRetry(f'error')
    return output

def main1():
    result = agent.run_sync(
        '猜我手里有几个苹果,1-10,猜错了就继续猜,直到猜对为止', deps={"num":1},
       
    )
    print(result.output)

async def main_stream():
    async with agent.run_stream(
        '猜我手里有几个苹果,1-10,猜错了就继续猜,直到猜对为止', deps={"num":1},
    ) as result:
        async for message in result.stream_text():  
            print(message)



async def main():
    result = await agent.run(
        '猜我手里有几个苹果,1-10,猜错了就继续猜,直到猜对为止', deps={"num":1},
    )
    print(result.output)
    

if __name__ == "__main__":
    import asyncio
    asyncio.run(main_stream())
    # main1()
@tianshangwuyun tianshangwuyun added the question Further information is requested label May 8, 2025
@DouweM
Copy link
Contributor

DouweM commented May 8, 2025

@tianshangwuyun Can you please try the iter-based streaming approach in https://ai.pydantic.dev/agents/#streaming and see if it works as expected there?

If not, are you saying that the output validator is not being called at all, or that the ModelRetry error is raised but not being handled correctly by PydanticAI to trigger the model to retry?

@DouweM DouweM self-assigned this May 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants