Skip to content

Streaming error messages duplicates error message and loses stacktrace? with MVP #1216

Open
@Sixzero

Description

@Sixzero

I wrote an MVP to show the issue, basically throwing an error while streaming with HTTP.jl:

# streaming_mvp.jl
using HTTP, JSON3, Sockets

"""
Minimal Viable Product (MVP) for HTTP streaming
Demonstrates a server that streams responses and a client that consumes them
"""

# Define a simple callback type to handle streaming responses
mutable struct SimpleStreamCallback
    SimpleStreamCallback() = new()
end
error_function(data) = if occursin("3", data) error("Error: The simulated error is here, we need readable stacktrace") end
# Simple callback function that just prints the data
function process_chunk(callback::SimpleStreamCallback, data)
    error_function(data)
    print(data)
end

# Server implementation
function create_streaming_server(host="127.0.0.1", port=8080)
    router = HTTP.Router()
    
    # Streaming endpoint
    function stream_handler(stream::HTTP.Stream)
        HTTP.setheader(stream, "Content-Type" => "text/event-stream")
        HTTP.setheader(stream, "Cache-Control" => "no-cache")
        HTTP.setheader(stream, "Connection" => "keep-alive")
        
        # Process the request body if needed
        body = String(read(stream))
        
        # Stream response data
        for i in 1:10
            # Send a data message in SSE format
            write(stream, "data: Chunk $i of data\n\n")
            sleep(0.5)
        end
        
        # Send final message
        write(stream, "data: Streaming completed\n\n")
        
        return nothing
    end
    
    # Register the streaming endpoint
    HTTP.register!(router, "POST", "/stream", stream_handler)
    
    # Start the server
    server = HTTP.serve!(router, host, port; stream=true)
    
    println("Server running at http://$host:$port")
    return server
end

# Simplified client implementation
function streamed_request!(callback::SimpleStreamCallback, url, headers, input)
    resp = HTTP.open("POST", url, headers) do stream
        # Send request body
        write(stream, String(take!(input)))
        HTTP.closewrite(stream)
        
        # Start reading the response
        response = HTTP.startread(stream)
        
        # Verify content type
        content_type = [header[2]
                        for header in response.headers
                        if lowercase(header[1]) == "content-type"]
        @assert length(content_type)==1 "Content-Type header must be present and unique"
        @assert occursin("text/event-stream", lowercase(content_type[1])) "Content-Type must be text/event-stream"
        
        # Process the stream
        buffer = ""
        while !eof(stream)
            # Read available data
            chunk = String(readavailable(stream))
            buffer *= chunk
            
            # Process complete SSE messages (format: "message\n\n")
            while occursin("\n\n", buffer)
                parts = split(buffer, "\n\n", limit=2)
                message = parts[1]
                buffer = length(parts) > 1 ? parts[2] : ""
                
                # Extract data from SSE format
                if startswith(message, "")
                    data = replace(message, "data: " => "")
                    # Call the callback with the data
                    process_chunk(callback, data)
                end
            end
        end
        
        HTTP.closeread(stream)
        return response
    end
    
    # Add a newline for better formatting
    println()
    
    return resp
end

# Example usage
function run_example()
    # Start the server
    server = create_streaming_server()
    
    try
        # Create a simple callback
        callback = SimpleStreamCallback()
        
        # Prepare request
        url = "http://127.0.0.1:8080/stream"
        headers = ["Content-Type" => "application/json"]
        input = IOBuffer(JSON3.write(Dict(:prompt => "Test streaming")))
        
        # Make the streaming request
        println("Making streaming request...")
        resp = streamed_request!(callback, url, headers, input)
        
        # Show the final response status
        println("\nRequest completed with status: $(resp.status)")
    finally
        # Shutdown the server
        close(server)
        @assert istaskdone(server.task)
    end
end

# Run the example
run_example()

Output:

Server running at http://127.0.0.1:8080
Making streaming request...
Chunk 1 of dataChunk 2 of data[ Info: Server on 127.0.0.1:8080 closing
ERROR: HTTP.RequestError:
HTTP.Request:
HTTP.Messages.Request:
"""
POST /stream HTTP/1.1
Content-Type: application/json
Host: 127.0.0.1:8080
Accept: */*
User-Agent: HTTP.jl/1.11.3
Accept-Encoding: gzip
Transfer-Encoding: chunked

[Message Body was streamed]"""Underlying error:
Error: The simulated error is here, we need readable stacktrace
Stacktrace:
  [1] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:143
  [2] connections
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
  [3] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:300
  [4] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
  [5] manageretries
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
  [6] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
  [7] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
  [8] defaultheaders
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
  [9] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
 [10] redirects
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [11] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
 [12] makerequest
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
 [13] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
    @ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
 [14] #request#20
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
 [15] request
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
 [16] open
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
 [17] streamed_request!
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
 [18] run_example()
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
 [19] top-level scope
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132

caused by: Error: The simulated error is here, we need readable stacktrace
Stacktrace:
  [1] error(s::String)
    @ Base ./error.jl:35
  [2] error_function
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:12 [inlined]
  [3] process_chunk
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:15 [inlined]
  [4] (::var"#29#32"{…})(stream::HTTP.Streams.Stream{…})
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:89
  [5] macro expansion
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:65 [inlined]
  [6] macro expansion
    @ ./task.jl:498 [inlined]
  [7] streamlayer(stream::HTTP.Streams.Stream{…}; iofunction::var"#29#32"{…}, decompress::Nothing, logerrors::Bool, logtag::Nothing, timedout::Nothing, kw::@Kwargs{…})
    @ HTTP.StreamRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:35
  [8] streamlayer
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:21 [inlined]
  [9] (::HTTP.ExceptionRequest.var"#exceptions#2"{…})(stream::HTTP.Streams.Stream{…}; status_exception::Bool, timedout::Nothing, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.ExceptionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:14
 [10] exceptions
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:13 [inlined]
 [11] (::HTTP.TimeoutRequest.var"#timeouts#3"{…})(stream::HTTP.Streams.Stream{…}; readtimeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
    @ HTTP.TimeoutRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/TimeoutRequest.jl:18
 [12] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
    @ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:122
 [13] connections
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
 [14] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
    @ Base ./error.jl:300
 [15] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
    @ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
 [16] manageretries
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
 [17] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
    @ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
 [18] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
    @ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
 [19] defaultheaders
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
 [20] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
    @ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
 [21] redirects
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
 [22] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
    @ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
 [23] makerequest
    @ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
 [24] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
    @ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
 [25] #request#20
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
 [26] request
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
 [27] open
    @ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
 [28] streamed_request!
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
 [29] run_example()
    @ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
 [30] top-level scope
    @ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132
Some type information was truncated. Use `show(err)` to see complete types.

What is cool is that I actually can see a stacktrace here, but only in the second message shows those 3 lines we need. I wonder if things are well written here, why would we get duplicated stacktrace lines ?

Also I feel the stacktrace is a little bit too deep, I wonder if we could make this a little bit simpler.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions