
When using Chat Memory in streaming mode, the error "No StreamAdvisors available to execute" occurs. In non-streaming mode, it runs normally. #3344


Closed
yangbuyiya opened this issue May 27, 2025 · 7 comments

Comments

@yangbuyiya
Contributor

yangbuyiya commented May 27, 2025


Bug description

When using Chat Memory in streaming mode, the error "No StreamAdvisors available to execute" occurs. In non-streaming mode, it runs normally.

java.lang.IllegalStateException: No StreamAdvisors available to execute

	at org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextStream$6(DefaultAroundAdvisorChain.java:119)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
Assembly trace from producer [reactor.core.publisher.FluxError] :
	reactor.core.publisher.Flux.error(Flux.java:858)
	org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextStream$6(DefaultAroundAdvisorChain.java:119)
Error has been observed at the following site(s):
	*____________Flux.errorat org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.lambda$nextStream$6(DefaultAroundAdvisorChain.java:119)
	*__Flux.deferContextualat org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain.nextStream(DefaultAroundAdvisorChain.java:117)
	|_       Flux.doOnErrorat org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:528)
	|_       Flux.doFinallyat org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:529)
	|_    Flux.contextWriteat org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.lambda$doGetObservableFluxChatResponse$3(DefaultChatClient.java:530)
	*__Flux.deferContextualat org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.doGetObservableFluxChatResponse(DefaultChatClient.java:510)
	|_      Flux.mapNotNullat org.springframework.ai.chat.client.DefaultChatClient$DefaultStreamResponseSpec.chatResponse(DefaultChatClient.java:542)

Environment

Spring AI 1.0.0, JDK 17

Build Advisors

.defaultAdvisors(
                        PromptChatMemoryAdvisor.builder(chatMemory)
                                .build())

Memory storage

    @Bean
    public ChatMemory chatMemory() {
        return MessageWindowChatMemory.builder()
                .chatMemoryRepository(new InMemoryChatMemoryRepository())
                .build();
    }
Unit testing

    @Test
    public void test_tool() {
        String userInput = "What tools can be used?";
        System.out.println("\n>>> QUESTION: " + userInput);

        ChatClient.ChatClientRequestSpec prompt = openaiChatClientBuilder.prompt(userInput);
        

        Flux<ChatResponse> chatResponseFlux = prompt.stream().chatResponse();


        chatResponseFlux.subscribe(
                chatResponse -> {
                    AssistantMessage assistantMessage = chatResponse.getResult().getOutput();
                    String text = assistantMessage.getText();
                    System.out.print(text);
                },
                error -> System.err.println("error : " + error.getMessage()),
                () -> System.out.println("\n>>> over")
        );
     

        chatResponseFlux.blockLast();
        
    }

Expected behavior

I hope it can support streaming. Currently, the non-streaming mode can run normally.

@luguangdong
Oh, my brother, you can try it this way.

<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-bom</artifactId>
    <version>1.0.0</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

@Resource
private ChatMemory chatMemory;

@GetMapping("/stream")
public void stream(@RequestParam String message) {
    Flux<ChatResponse> chattedResponse = ChatClient.create(openAiChatModel)
            .prompt(Prompt.builder().messages(UserMessage.builder().text(message).build()).build())
            .advisors(
                    PromptChatMemoryAdvisor.builder(chatMemory).build()
            )
            .stream()
            .chatResponse();

    System.out.println("\n>>> QUESTION: " + message);
    chattedResponse.subscribe(chatResponse -> {
        AssistantMessage output = chatResponse.getResult().getOutput();
        String text = output.getText();
        System.out.print(text);
    });
}

@yangbuyiya
Contributor Author

> Oh, my brother, you can try it this way. (code quoted above)

Thank you, bro. I will use it in MCP. Your approach doesn't have much to do with the current problem.

@luguangdong

Take a look at this; it supports MCP, and the conversation memory is stream-based.

@yangbuyiya
Contributor Author

> Take a look at this; it supports MCP, and the conversation memory is stream-based.

Thank you very much. I tried it the same way as you, but I still can't make it run properly. The error is still: "No StreamAdvisors available to execute".

Code:

    @Bean(name = "openaiChatClientBuilder")
    public ChatClient chatClient(OpenAiChatModel openAiChatModel,
                                 @Qualifier("syncMcpToolCallbackProvider") ToolCallbackProvider syncMcpToolCallbackProvider,
                                 ChatMemory chatMemory) {
        DefaultChatClientBuilder defaultChatClientBuilder = new DefaultChatClientBuilder(openAiChatModel,
                ObservationRegistry.NOOP, (ChatClientObservationConvention) null);
        return defaultChatClientBuilder
                .defaultToolCallbacks(syncMcpToolCallbackProvider.getToolCallbacks())
                .defaultAdvisors(
                        PromptChatMemoryAdvisor.builder(chatMemory).build())
                .build();
    }
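For comparison, a sketch of the same bean built through the public `ChatClient.builder(...)` factory rather than instantiating `DefaultChatClientBuilder` directly (assuming the Spring AI 1.0.0 API; bean and qualifier names are carried over from the snippet above):

```java
@Bean(name = "openaiChatClientBuilder")
public ChatClient chatClient(OpenAiChatModel openAiChatModel,
                             @Qualifier("syncMcpToolCallbackProvider") ToolCallbackProvider syncMcpToolCallbackProvider,
                             ChatMemory chatMemory) {
    // ChatClient.builder(...) is the supported entry point and avoids
    // depending on the Default* implementation classes.
    return ChatClient.builder(openAiChatModel)
            .defaultToolCallbacks(syncMcpToolCallbackProvider.getToolCallbacks())
            .defaultAdvisors(PromptChatMemoryAdvisor.builder(chatMemory).build())
            .build();
}
```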

@sunyuhan1998
Contributor

Hi @yangbuyiya

The issue you reported is actually caused by your test code: you first call chatResponseFlux.subscribe, and then call chatResponseFlux.blockLast. This subscribes to the same Flux twice, which in turn causes org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain#nextStream to be executed extra times unnecessarily.

As a result, after PromptChatMemoryAdvisor and ChatModelStreamAdvisor have already completed their normal execution of adviseStream, org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain#nextStream is incorrectly entered again.

In short, do not subscribe to the same Flux multiple times. You can write it like this instead:

...
chatResponseFlux
    .doOnNext(chatResponse -> {
        AssistantMessage assistantMessage = chatResponse.getResult().getOutput();
        String text = assistantMessage.getText();
        System.out.print(text);
    })
    .doOnError(error -> System.err.println("error: " + error.getMessage()))
    .doOnComplete(() -> System.out.println("\n>>> over"))
    .blockLast();
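The failure mode above can be reproduced with plain Reactor, independent of Spring AI. A minimal sketch (requires reactor-core on the classpath; ColdFluxDemo and countSubscriptions are illustrative names) showing that a cold Flux re-runs its whole pipeline for every subscriber:

```java
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

public class ColdFluxDemo {

    // Counts how many times the cold source is assembled and run.
    static int countSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> cold = Flux.defer(() -> {
            // Runs once PER subscriber: a cold Flux repeats its work
            // for every subscribe() call.
            subscriptions.incrementAndGet();
            return Flux.just(1, 2, 3);
        });

        cold.subscribe();  // first subscription
        cold.blockLast();  // blockLast() subscribes AGAIN

        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("subscriptions = " + countSubscriptions()); // prints 2
    }
}
```

In the Spring AI case, each extra subscription walks the advisor chain again, which is what eventually throws "No StreamAdvisors available to execute".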

@sunyuhan1998
Contributor

sunyuhan1998 commented May 27, 2025

@yangbuyiya By the way, one more note. As I mentioned earlier, the message "No StreamAdvisors available to execute" appears because org.springframework.ai.chat.client.advisor.DefaultAroundAdvisorChain#nextStream is incorrectly re-entered after all Advisors have already executed, so the chain keeps trying to pull the next Advisor from an already-exhausted queue. Put simply, a Flux<ChatResponse> is like many other streams: once it has been consumed to completion, you cannot iterate over it again by resubscribing.

Therefore, in conclusion, I don't think this is an issue with Spring AI itself.
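If two consumers of the same stream are genuinely needed, Reactor's cache() operator records the first subscription's signals and replays them to later subscribers, so a second subscriber does not re-run the upstream (and would not re-enter the advisor chain). A minimal sketch, with CacheDemo and countRuns as illustrative names:

```java
import reactor.core.publisher.Flux;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheDemo {

    // Counts how many times the upstream source actually runs.
    static int countRuns() {
        AtomicInteger runs = new AtomicInteger();
        Flux<Integer> source = Flux.defer(() -> {
            runs.incrementAndGet();  // one run per upstream subscription
            return Flux.just(1, 2, 3);
        });

        // cache() records the first subscription's emissions and replays
        // them to later subscribers without re-running the source.
        Flux<Integer> cached = source.cache();

        cached.subscribe();  // triggers the single upstream run
        cached.blockLast();  // served from the cache; no second run

        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRuns()); // prints 1
    }
}
```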

@yangbuyiya
Contributor Author

@sunyuhan1998 Thank you so much. I just gave it a try and the streaming output works properly now. I still have a lot to learn!
