Streaming Converter Implementation for LLM JSONL Processing #89
Here are my thoughts: By extending More in general, I can see how the If we look at streaming from the perspective of the offering us a Flux of response chunks. So in short, I don't think we can merge this PR until we have an idea of what the integration with PromptRunner and ChatClient will look like.
Thanks for the thorough review, Arjen. The converter in here is, schematically, a chain of invocations in the agent repo:
This PR is WIP and cannot be committed prior to merging the artifacts in common, as it would break the GitHub build. Here is a usage example:
As for the missing filtering capabilities, I will address this gap accordingly.
Regarding formatting: the prompt follows the JSONL (JSON Lines) format (compare RFC 7464, JSON Text Sequences); see the test below on multi-object processing.
Added filtering support to the streaming converter per Arjen's recommendation. Please refer to the comprehensive test:
poutsma
left a comment
I have left several comments that aim to simplify this PR by reducing its public interface. When we have resolved those, I think it's ready to be merged.
Resolved review threads:
...rc/main/kotlin/com/embabel/common/ai/converters/streaming/StreamingJacksonOutputConverter.kt (5 threads, 4 outdated)
embabel-common-core/src/main/kotlin/com/embabel/common/core/streaming/StreamingUtils.kt (1 thread)
 * val runner: PromptRunner = context.ai().autoLlm()
 * if (runner.supportsStreaming()) {
 *     val capability: StreamingCapability = runner.stream()
 *     val operations = capability as StreamingPromptRunnerOperations (or use asStreaming extension function)
 *     // Use streaming operations...
I am not sure what the point of StreamingCapability is.
Looking at this example, it appears to me that runner.stream() could return StreamingPromptRunnerOperations directly, to save us the extra cast in line 36.
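The suggestion can be sketched roughly as follows. This only illustrates the API shape: the interface bodies, the `generate` method, and `DemoRunner` are hypothetical stand-ins rather than the actual Embabel types, and a plain `Sequence` is used where the real code would presumably return a `Flux`.

```kotlin
// Hypothetical stand-ins for the real interfaces. Only the names PromptRunner,
// supportsStreaming, stream and StreamingPromptRunnerOperations come from the
// discussion; everything else is assumed for illustration.
interface StreamingPromptRunnerOperations {
    // Assumed method, not part of the reviewed code.
    fun generate(prompt: String): Sequence<String>
}

interface PromptRunner {
    fun supportsStreaming(): Boolean

    // Returning the operations type directly removes the need for a cast.
    fun stream(): StreamingPromptRunnerOperations
}

// Toy implementation that "streams" the prompt back word by word.
class DemoRunner : PromptRunner {
    override fun supportsStreaming(): Boolean = true

    override fun stream(): StreamingPromptRunnerOperations =
        object : StreamingPromptRunnerOperations {
            override fun generate(prompt: String): Sequence<String> =
                prompt.splitToSequence(" ")
        }
}

fun main() {
    val runner: PromptRunner = DemoRunner()
    if (runner.supportsStreaming()) {
        val operations = runner.stream() // no cast required
        operations.generate("hello streaming world").forEach { println(it) }
    }
}
```

With this shape the intermediate StreamingCapability type is no longer needed at the call site.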
As far as I can tell, the only benefit of the StreamingEvent abstraction is so that we can distinguish between objects and thinking blocks. Elsewhere in Embabel, in the SuppressThinkingConverter, we filter out the latter completely, so what's the reason we don't do that here for streaming? Is there a case where the user is interested in thinking blocks?
If there is not, then I would suggest dropping StreamingEvent as well as convertStreamWithThinking in StreamingJacksonOutputConverter.
Yes, I'm aware of suppressing thinking, although the format of the thinking marker(s) appears to vary, which gets addressed in the streaming utils.
I thought that in the streaming use case, especially keeping potential UI integration in mind, it would be nice to see the intermediate response from the LLM. It adds some flexibility. I'm not marking this as resolved in case my thought still requires validation on your side.
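To make the two positions in this thread concrete, here is a minimal sketch of an event type that separates objects from thinking content. The shape of `StreamingEvent` shown here (the `Item` and `Thinking` cases and their fields) is an assumption for illustration and may differ from the type in the PR; `objectsOnly` and `render` are hypothetical helpers.

```kotlin
// Hypothetical shape of the event type; the real StreamingEvent may differ.
sealed interface StreamingEvent<out T> {
    data class Item<out T>(val item: T) : StreamingEvent<T>
    data class Thinking(val content: String) : StreamingEvent<Nothing>
}

// Option A: drop thinking blocks so callers only ever see converted objects.
fun <T : Any> objectsOnly(events: Sequence<StreamingEvent<T>>): Sequence<T> =
    events.mapNotNull { event ->
        when (event) {
            is StreamingEvent.Item -> event.item
            is StreamingEvent.Thinking -> null
        }
    }

// Option B: let the consumer (for example a UI) handle both kinds of event.
fun <T> render(event: StreamingEvent<T>): String = when (event) {
    is StreamingEvent.Item -> "object: ${event.item}"
    is StreamingEvent.Thinking -> "thinking: ${event.content}"
}

fun main() {
    val events: Sequence<StreamingEvent<String>> = sequenceOf(
        StreamingEvent.Thinking("considering options"),
        StreamingEvent.Item("first"),
        StreamingEvent.Item("second"),
    )
    println(objectsOnly(events).toList())
    events.map { render(it) }.forEach { println(it) }
}
```

Option A corresponds to dropping the event abstraction from the public interface; Option B keeps it so that intermediate thinking output can be shown to the user.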
 * Supports both object lines and thinking blocks.
 * Uses resilient error handling - logs warnings for individual line failures but continues processing.
 */
fun convertStreamWithThinking(text: String): Flux<StreamingEvent<T>> {
I have my doubts about the usability of this method, see my comments on StreamingEvent below.
Replied above, thanks.
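The resilient error handling described in the doc comment for convertStreamWithThinking (warn on an individual line failure, then continue) might look roughly like the sketch below. It is not the PR's implementation: it uses a Kotlin `Sequence` instead of a Reactor `Flux` so that it runs without dependencies, and `convertLines`, `parse`, and `onError` are hypothetical names, with `parse` standing in for the actual JSON deserialization.

```kotlin
// Converts each non-blank line independently. A line that fails to parse is
// reported and skipped, so one bad line does not end the whole stream.
// `parse` is a stand-in for the real deserialization step (an assumption).
fun <T : Any> convertLines(
    lines: Sequence<String>,
    parse: (String) -> T,
    onError: (String, Exception) -> Unit = { line, e ->
        System.err.println("Skipping line '$line': ${e.message}")
    },
): Sequence<T> = lines
    .map { it.trim() }
    .filter { it.isNotEmpty() }
    .mapNotNull { line ->
        try {
            parse(line)
        } catch (e: Exception) {
            onError(line, e)
            null
        }
    }

fun main() {
    val lines = sequenceOf("1", "not a number", "", "3")
    val numbers = convertLines(lines, parse = { it.toInt() }).toList()
    println(numbers) // prints [1, 3]
}
```

Passing `onError` as a parameter keeps the logging policy out of the conversion itself, which is one way to avoid unnecessary logging inside the converter.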
...rc/main/kotlin/com/embabel/common/ai/converters/streaming/StreamingJacksonOutputConverter.kt (resolved thread, outdated)
Simplified error handling.
Conversion to an object stream is reimplemented through a more generic method that creates StreamingEvent.
Removed unnecessary logging.
Updated tests.

Streaming Converter Implementation for LLM JSON Processing
Overview
This PR introduces a reactive streaming converter for processing JSONL
(JSON Lines) responses from LLMs, enabling real-time streaming of
structured objects and thinking content.
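One practical detail of real-time JSONL processing is that response chunks do not necessarily end on line boundaries, so complete lines have to be reassembled before they can be converted. The sketch below shows one way to do that; `LineBuffer` is a hypothetical helper and is not claimed to be part of this PR.

```kotlin
// Accumulates text chunks and emits each line as soon as its newline arrives.
class LineBuffer {
    private val pending = StringBuilder()

    // Returns the lines completed by this chunk; a trailing partial line is kept.
    fun append(chunk: String): List<String> {
        pending.append(chunk)
        val completed = mutableListOf<String>()
        var newline = pending.indexOf("\n")
        while (newline >= 0) {
            completed += pending.substring(0, newline)
            pending.delete(0, newline + 1)
            newline = pending.indexOf("\n")
        }
        return completed
    }

    // Call once the stream ends to obtain a final line without a trailing newline.
    fun flush(): String? {
        val rest = pending.toString()
        pending.setLength(0)
        return rest.ifEmpty { null }
    }
}

fun main() {
    val buffer = LineBuffer()
    // Chunks deliberately split JSON objects in the middle.
    val chunks = listOf("{\"name\":\"a", "\"}\n{\"na", "me\":\"b\"}\n", "{\"name\":\"c\"}")
    for (chunk in chunks) {
        buffer.append(chunk).forEach { println(it) }
    }
    buffer.flush()?.let { println(it) }
}
```

Each completed line can then be handed to the converter on its own, which is what allows objects to be emitted before the full response has arrived.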
Key Components
🔧 Core Implementation
StreamingJacksonOutputConverter
StreamingEvent
StreamingEvent
🛡️ Error Handling & Resilience
failures
exceptions
Configuration Framework
StreamingConfigProperties
Package Structure
embabel-common-core/
├── com/embabel/common/core/streaming/
│   ├── StreamingEvent.kt                   # Core event types
│   └── config/
│       └── StreamingConfigProperties.kt    # Configuration properties
embabel-common-ai/
└── com/embabel/common/ai/converters/streaming/
    └── StreamingJacksonOutputConverter.kt  # JSONL converter implementation
Usage Examples
Benefits
waiting for complete response
outputs
formatting
environments
Next Steps
This foundation enables the next phase of LLM streaming implementation: