

@igordayen igordayen commented Nov 13, 2025

Streaming Converter Implementation for LLM JSONL Processing

Overview

This PR introduces a reactive streaming converter for processing JSONL
(JSON Lines) responses from LLMs, enabling real-time streaming of
structured objects and thinking content.

Key Components

🔧 Core Implementation

StreamingJacksonOutputConverter

  • Extends JacksonOutputConverter to inherit schema injection capabilities
  • Converts JSONL input to reactive Flux streams
  • Supports mixed content with both objects and thinking blocks via
    StreamingEvent
  • Provides resilient error handling with proper logging

StreamingEvent

  • Sealed class supporting Object and Thinking event types
  • Includes Either-like fold() method for functional composition
  • Located in embabel-common-core for cross-product reusability
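The bullets above can be sketched as a small sealed hierarchy along these lines; the subclass names (in particular `ObjectEvent`) and the `fold()` signature are illustrative assumptions based on this description, not the actual source:

```kotlin
// Hypothetical sketch of StreamingEvent; names and signatures are inferred
// from the PR description, not copied from the real implementation.
sealed class StreamingEvent<T> {
    // A thinking block emitted by the LLM (free-form reasoning text).
    data class Thinking<T>(val content: String) : StreamingEvent<T>()

    // A structured object parsed from a JSONL line.
    data class ObjectEvent<T>(val value: T) : StreamingEvent<T>()

    // Either-like fold: thinking maps to `left`, objects to `right`.
    fun <R> fold(left: (String) -> R, right: (T) -> R): R = when (this) {
        is Thinking -> left(content)
        is ObjectEvent -> right(value)
    }
}
```

`fold()` collapses both cases into a single result value, mirroring the `Either` convention from functional programming.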

🛡️ Error Handling & Resilience

  • Graceful degradation: Continues processing valid lines after individual
    failures
  • Fail-fast on critical errors: Terminates stream on JSON parsing
    exceptions
  • Comprehensive logging: DEBUG/WARN/ERROR levels with structured messages
  • Production-ready: Uses inherited logger from parent converter
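As an illustration of the graceful-degradation strategy described above (a sketch, not the converter's actual code), per-line parsing can skip malformed lines with a warning and continue with the rest:

```kotlin
// Illustrative sketch of resilient per-line JSONL parsing: each line is
// parsed independently; failures are logged and skipped rather than
// aborting the whole batch. The `parse` lambda stands in for something
// like { objectMapper.readValue(it, clazz) }.
fun <T> parseLinesResiliently(
    jsonl: String,
    parse: (String) -> T,
): List<T> =
    jsonl.lineSequence()
        .map { it.trim() }
        .filter { it.isNotEmpty() }
        .mapNotNull { line ->
            runCatching { parse(line) }
                .onFailure { e -> System.err.println("WARN skipping malformed line: ${e.message}") }
                .getOrNull()
        }
        .toList()
```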

Configuration Framework

StreamingConfigProperties

  • Spring Boot configuration properties with sensible defaults
  • Supports buffer, timeout, retry, and error handling configuration
  • Uses embabel.platform.streaming namespace for consistency
  • Ready for externalization when production tuning is needed
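A minimal sketch of what such a properties class might look like; the property names and defaults here are assumptions, and in the real class this would carry Spring Boot's `@ConfigurationProperties(prefix = "embabel.platform.streaming")` annotation:

```kotlin
// Hypothetical shape of StreamingConfigProperties. Property names and
// default values are illustrative assumptions, not the actual class.
data class StreamingConfigProperties(
    val bufferSize: Int = 256,                 // max lines buffered before backpressure
    val timeoutMillis: Long = 30_000,          // per-stream timeout
    val maxRetries: Int = 3,                   // retries on transient failures
    val failFastOnParseError: Boolean = false, // terminate stream on JSON errors
)
```

With Spring Boot's constructor binding, each default would be overridable via `embabel.platform.streaming.*` properties.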

Package Structure

embabel-common-core/
└── com/embabel/common/core/streaming/
    ├── StreamingEvent.kt                    # Core event types
    └── config/
        └── StreamingConfigProperties.kt     # Configuration properties

embabel-common-ai/
└── com/embabel/common/ai/converters/streaming/
    └── StreamingJacksonOutputConverter.kt   # JSONL converter implementation

Usage Examples

  Basic Object Streaming

  val converter = StreamingJacksonOutputConverter(MyObject::class.java, objectMapper)
  val objectStream: Flux<MyObject> = converter.convertStream(jsonlResponse)

  Mixed Content with Thinking

  val eventStream: Flux<StreamingEvent<MyObject>> =
      converter.convertStreamWithThinking(mixedResponse)

  eventStream.subscribe { event ->
      event.fold(
          left = { thinking -> logger.info("LLM reasoning: {}", thinking) },
          right = { obj -> processObject(obj) }
      )
  }

Benefits

  1. Real-time LLM Streaming - Process objects as they arrive instead of
    waiting for complete response
  2. Thinking Support - Capture LLM reasoning process alongside structured
    outputs
  3. Schema Validation - Inherits JSON schema injection for proper LLM
    formatting
  4. Production Ready - Robust error handling and logging for operational
    environments
  5. Extensible - Configuration framework ready for future tuning needs

Next Steps

This foundation enables the next phase of LLM streaming implementation:

  • SPI streaming interfaces
  • PromptRunner integration
  • ChatClient streaming support
  • End-to-end integration testing

@igordayen igordayen changed the title Streaming Converter Implementation for LLM JSONL Processing Streaming Converter Implementation for LLM JSON Processing Nov 13, 2025

poutsma commented Nov 13, 2025

Here are my thoughts:

By extending JacksonOutputConverter, the StreamingJacksonOutputConverter loses the filtering capabilities that the FilteringJacksonOutputConverter introduces. Unless this is by design, my suggestion would be to extend from FilteringJacksonOutputConverter instead.

More in general, I can see how the StreamingJacksonOutputConverter could potentially contribute to streaming support in Embabel. However, without having the complete picture, it is hard to determine the context in which the StreamingJacksonOutputConverter will be used, and therefore hard to determine which parts of the code are complete, which are lacking, and which are unnecessary.

If we look at streaming from the perspective of the embabel-agent module, I would expect that any streaming solution Embabel offers will ultimately rely on the streaming capabilities of Spring AI's ChatClient, i.e. ChatClient.StreamResponseSpec. This means that, somewhere in ChatClientLlmOperations, we would have to do something like:

chatClient
    .prompt(springAiPrompt)
    .toolCallbacks(interaction.toolCallbacks)
    .options(chatOptions)
    .stream()

offering us a Flux of response chunks.
It is hard to find definitive documentation on this, but AFAICT Spring AI simply surfaces whatever the underlying model streams, and does not "tokenize" or re-chunk the text itself. This means that we would have to tokenize the response chunks into lines ourselves, by buffering them until an EOL character appears.
However, the StreamingJacksonOutputConverter assumes that the input text is a multi-line string, since the first thing it does is split the string into lines. That seems odd to me, and potentially unnecessary.
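The buffering described here can be sketched as follows, shown over a plain list of chunks for clarity; a reactive version would perform the same accumulation over the Flux of response chunks:

```kotlin
// Sketch of chunk-to-line tokenization: model chunks arrive with no
// relation to line boundaries, so we buffer characters until an EOL
// appears and emit each completed line.
fun tokenizeChunksIntoLines(chunks: List<String>): List<String> {
    val lines = mutableListOf<String>()
    val buffer = StringBuilder()
    for (chunk in chunks) {
        for (ch in chunk) {
            if (ch == '\n') {
                lines += buffer.toString()
                buffer.setLength(0)
            } else {
                buffer.append(ch)
            }
        }
    }
    if (buffer.isNotEmpty()) lines += buffer.toString() // trailing partial line
    return lines
}
```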

So in short, I don't think we can merge this PR until we have an idea of what the integration with PromptRunner and ChatClient will look like.


igordayen commented Nov 13, 2025

Thanks for the thorough review, Arjen.

The converter in common represents the foundation that gets consumed (through a follow-up PR) in embabel-agent.

Here is, schematically, the chain of invocations in the agent repo:

OperationContextPromptRunner.stream() →
  StreamingPromptRunnerOperationsImpl.createObjectList() →
  ChatClientLlmOperations.streamingMethod() → SpringAI.ChatClient.stream()
  → StreamingJacksonOutputConverter.convertStream() → Flux<T>

This PR is WIP and cannot be committed prior to merging the artifacts in common, as it would break the GitHub build.

Here is a usage example:

context.ai()
      .withAutoLlm()
      .stream()
      .createObjectList(ItemClass::class.java)
      .subscribe { item -> println("Received: $item") }

As for the missing filtering capabilities, I will address this gap accordingly.

Regarding formatting: the prompt follows the JSON Lines (JSONL) convention (cf. RFC 7464, JSON Text Sequences).

See the test `convertStream should delegate to parent convert method for each line` for multi-object processing.

@igordayen igordayen changed the title Streaming Converter Implementation for LLM JSON Processing Streaming Converter Implementation for LLM JSONL Processing Nov 13, 2025
@igordayen
Contributor Author

Added filtering support to the streaming converter per Arjen's recommendation.

Please refer to the comprehensive test `streaming converter should handle filtering with actual streaming for multiple objects`.

@poutsma poutsma left a comment


I have left several comments that aim to simplify this PR by reducing its public interface. When we have resolved those, I think it's ready to be merged.

Comment on lines +33 to +37
* val runner: PromptRunner = context.ai().autoLlm()
* if (runner.supportsStreaming()) {
* val capability: StreamingCapability = runner.stream()
* val operations = capability as StreamingPromptRunnerOperations (or use asStreaming extension function)
* // Use streaming operations...

I am not sure what the point of StreamingCapability is.
Looking at this example, it appears to me that runner.stream() could return StreamingPromptRunnerOperations directly, saving us the extra cast on line 36.
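A sketch of the suggested simplification, with illustrative signatures (the real `createObjectList` would return a `Flux<T>` rather than a `List<T>`):

```kotlin
// Sketch of the suggested API: stream() returns the operations type
// directly, so no intermediate StreamingCapability marker or cast is
// needed. Signatures are illustrative assumptions based on the discussion.
interface StreamingPromptRunnerOperations {
    fun <T> createObjectList(clazz: Class<T>): List<T> // Flux<T> in the real API
}

interface PromptRunner {
    fun supportsStreaming(): Boolean
    // Returns the operations directly, avoiding the cast from a capability type.
    fun stream(): StreamingPromptRunnerOperations
}
```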

Contributor Author

This is the part I was not happy with either.

PromptRunner::stream() throws an UnsupportedOperationException, while
StreamingPromptRunnerOperations provides the actual streaming.


As far as I can tell, the only benefit of the StreamingEvent abstraction is so that we can distinguish between objects and thinking blocks. Elsewhere in Embabel, in the SuppressThinkingConverter, we filter out the latter completely, so what's the reason we don't do that here for streaming? Is there a case where the user is interested in thinking blocks?
If there is not, then I would suggest dropping StreamingEvent as well as convertStreamWithThinking in StreamingJacksonOutputConverter.

Contributor Author

Yes, I'm aware of suppressing thinking, although the thinking marker format appears to vary, which is addressed in the streaming utils.
I thought that in the streaming use case, especially keeping potential UI integration in mind, it would be nice to see the intermediate response from the LLM; it adds some flexibility. I'm not marking this as resolved since my thought still requires validation on your side.

* Supports both object lines and thinking blocks.
* Uses resilient error handling - logs warnings for individual line failures but continues processing.
*/
fun convertStreamWithThinking(text: String): Flux<StreamingEvent<T>> {

I have my doubts about the usability of this method, see my comments on StreamingEvent below.

Contributor Author

Replied above, thanks.

simplified error handling
conversion to object stream reimplemented through a more generic method that creates StreamingEvent
removed unnecessary logging
updated tests