Open
Description
I wrote a minimal example (MVP) that reproduces the issue: an error is thrown inside the chunk-processing callback while streaming a response with HTTP.jl:
# streaming_mvp.jl
using HTTP, JSON3, Sockets
"""
Minimal Viable Product (MVP) for HTTP streaming
Demonstrates a server that streams responses and a client that consumes them
"""
# Define a simple callback type to handle streaming responses
# Fieldless marker type: it carries no state and only gives `process_chunk`
# a callback argument to dispatch on.
mutable struct SimpleStreamCallback
    # Explicit zero-argument inner constructor.
    function SimpleStreamCallback()
        return new()
    end
end
"""
    error_function(data)

Simulate a failure inside the chunk-processing path: throw an `ErrorException`
when `data` contains the character `3`, otherwise return `nothing`.
"""
function error_function(data)
    if occursin("3", data)
        error("Error: The simulated error is here, we need readable stacktrace")
    end
    return nothing
end
# Simple callback function that just prints the data
"""
    process_chunk(callback::SimpleStreamCallback, data)

Handle one decoded message: run `error_function` on `data` (which may throw),
then print `data` to standard output without a trailing newline.
"""
function process_chunk(callback::SimpleStreamCallback, data)
    # Runs first so that a simulated failure prevents the chunk from being printed.
    error_function(data)
    return print(stdout, data)
end
# Server implementation
"""
    create_streaming_server(host="127.0.0.1", port=8080)

Start an HTTP server on `host`:`port` with a single `POST /stream` endpoint that
emits ten `data: Chunk i of data` messages (half a second apart) followed by a
final `data: Streaming completed` message. Return the server object produced by
`HTTP.serve!`.
"""
function create_streaming_server(host="127.0.0.1", port=8080)
    # Handler for the streaming endpoint; it writes directly to the HTTP stream.
    function sse_handler(http::HTTP.Stream)
        # Response headers announcing an event stream.
        for header in (
            "Content-Type" => "text/event-stream",
            "Cache-Control" => "no-cache",
            "Connection" => "keep-alive",
        )
            HTTP.setheader(http, header)
        end

        # Read the request body before writing; its content is not used.
        String(read(http))

        # Emit the numbered chunks, pausing between them.
        foreach(1:10) do i
            write(http, "data: Chunk $i of data\n\n")
            sleep(0.5)
        end

        # Closing message of the stream.
        write(http, "data: Streaming completed\n\n")
        return nothing
    end

    routes = HTTP.Router()
    HTTP.register!(routes, "POST", "/stream", sse_handler)

    # `stream=true` makes the server hand the raw stream to the handler.
    srv = HTTP.serve!(routes, host, port; stream=true)
    println("Server running at http://$host:$port")
    return srv
end
# Simplified client implementation
"""
    streamed_request!(callback::SimpleStreamCallback, url, headers, input)

POST the contents of `input` (an `IOBuffer`, emptied by this call) to `url` and
consume the response as a `text/event-stream`. Every complete message (terminated
by a blank line) that starts with the `data: ` prefix is passed, without that
prefix, to `process_chunk(callback, data)`. Messages without the prefix are skipped.

Returns the value of `HTTP.open`. An `AssertionError` is raised inside the
`HTTP.open` block when the response does not carry exactly one `Content-Type`
header or when that header is not `text/event-stream`; any exception raised by
`process_chunk` is raised inside that block as well.
"""
function streamed_request!(callback::SimpleStreamCallback, url, headers, input)
    resp = HTTP.open("POST", url, headers) do stream
        # Send the request body, then signal that nothing more will be written.
        write(stream, String(take!(input)))
        HTTP.closewrite(stream)

        # Read the response head so the headers can be validated before the body.
        response = HTTP.startread(stream)

        content_type = [
            header[2] for
            header in response.headers if lowercase(header[1]) == "content-type"
        ]
        # Explicit throws instead of `@assert`: these validate external input and must
        # not be compiled out. The exception type and messages are unchanged.
        length(content_type) == 1 ||
            throw(AssertionError("Content-Type header must be present and unique"))
        occursin("text/event-stream", lowercase(content_type[1])) ||
            throw(AssertionError("Content-Type must be text/event-stream"))

        # Bytes received so far that do not yet form a complete message.
        buffer = ""
        while !eof(stream)
            buffer *= String(readavailable(stream))

            # A message is complete once its terminating blank line has arrived.
            while occursin("\n\n", buffer)
                # `limit=2` together with the `occursin` guard yields exactly two parts.
                message, buffer = split(buffer, "\n\n"; limit=2)

                # Only `data: ` messages carry a payload; strip just the leading prefix
                # so a payload that itself contains "data: " is left intact.
                if startswith(message, "data: ")
                    data = replace(message, "data: " => ""; count=1)
                    process_chunk(callback, data)
                end
            end
        end
        HTTP.closeread(stream)
        return response
    end

    # Chunks are printed without newlines; terminate the output line.
    println()
    return resp
end
# Example usage
"""
    run_example()

Start the streaming server, issue one streaming POST request against it, print
the resulting status, and always close the server afterwards (even when the
request throws).
"""
function run_example()
    srv = create_streaming_server()
    try
        cb = SimpleStreamCallback()

        # JSON request payload wrapped in an IOBuffer, as `streamed_request!` expects.
        payload = JSON3.write(Dict(:prompt => "Test streaming"))
        request_body = IOBuffer(payload)

        println("Making streaming request...")
        response = streamed_request!(
            cb,
            "http://127.0.0.1:8080/stream",
            ["Content-Type" => "application/json"],
            request_body,
        )

        println("\nRequest completed with status: $(response.status)")
    finally
        # Shut the server down and check that its task has finished.
        close(srv)
        @assert istaskdone(srv.task)
    end
    return nothing
end
# Run the example when the script is loaded: start the server, make one
# streaming request, then shut the server down.
run_example()
Output:
Server running at http://127.0.0.1:8080
Making streaming request...
Chunk 1 of dataChunk 2 of data[ Info: Server on 127.0.0.1:8080 closing
ERROR: HTTP.RequestError:
HTTP.Request:
HTTP.Messages.Request:
"""
POST /stream HTTP/1.1
Content-Type: application/json
Host: 127.0.0.1:8080
Accept: */*
User-Agent: HTTP.jl/1.11.3
Accept-Encoding: gzip
Transfer-Encoding: chunked
[Message Body was streamed]"""Underlying error:
Error: The simulated error is here, we need readable stacktrace
Stacktrace:
[1] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
@ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:143
[2] connections
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
[3] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
@ Base ./error.jl:300
[4] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
@ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
[5] manageretries
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
[6] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
@ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
[7] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
@ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
[8] defaultheaders
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
[9] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
@ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
[10] redirects
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
[11] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
@ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
[12] makerequest
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
[13] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
@ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
[14] #request#20
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
[15] request
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
[16] open
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
[17] streamed_request!
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
[18] run_example()
@ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
[19] top-level scope
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132
caused by: Error: The simulated error is here, we need readable stacktrace
Stacktrace:
[1] error(s::String)
@ Base ./error.jl:35
[2] error_function
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:12 [inlined]
[3] process_chunk
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:15 [inlined]
[4] (::var"#29#32"{…})(stream::HTTP.Streams.Stream{…})
@ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:89
[5] macro expansion
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:65 [inlined]
[6] macro expansion
@ ./task.jl:498 [inlined]
[7] streamlayer(stream::HTTP.Streams.Stream{…}; iofunction::var"#29#32"{…}, decompress::Nothing, logerrors::Bool, logtag::Nothing, timedout::Nothing, kw::@Kwargs{…})
@ HTTP.StreamRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:35
[8] streamlayer
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/StreamRequest.jl:21 [inlined]
[9] (::HTTP.ExceptionRequest.var"#exceptions#2"{…})(stream::HTTP.Streams.Stream{…}; status_exception::Bool, timedout::Nothing, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
@ HTTP.ExceptionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:14
[10] exceptions
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ExceptionRequest.jl:13 [inlined]
[11] (::HTTP.TimeoutRequest.var"#timeouts#3"{…})(stream::HTTP.Streams.Stream{…}; readtimeout::Int64, logerrors::Bool, logtag::Nothing, kw::@Kwargs{…})
@ HTTP.TimeoutRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/TimeoutRequest.jl:18
[12] (::HTTP.ConnectionRequest.var"#connections#4"{…})(req::HTTP.Messages.Request; proxy::Nothing, socket_type::Type, socket_type_tls::Nothing, readtimeout::Int64, connect_timeout::Int64, logerrors::Bool, logtag::Nothing, closeimmediately::Bool, kw::@Kwargs{…})
@ HTTP.ConnectionRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:122
[13] connections
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/ConnectionRequest.jl:60 [inlined]
[14] (::Base.var"#106#108"{…})(args::HTTP.Messages.Request; kwargs::@Kwargs{…})
@ Base ./error.jl:300
[15] (::HTTP.RetryRequest.var"#manageretries#3"{…})(req::HTTP.Messages.Request; retry::Bool, retries::Int64, retry_delays::ExponentialBackOff, retry_check::Function, retry_non_idempotent::Bool, kw::@Kwargs{…})
@ HTTP.RetryRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:75
[16] manageretries
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RetryRequest.jl:30 [inlined]
[17] (::HTTP.CookieRequest.var"#managecookies#4"{…})(req::HTTP.Messages.Request; cookies::Bool, cookiejar::HTTP.Cookies.CookieJar, kw::@Kwargs{…})
@ HTTP.CookieRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/CookieRequest.jl:42
[18] (::HTTP.HeadersRequest.var"#defaultheaders#2"{…})(req::HTTP.Messages.Request; iofunction::Function, decompress::Nothing, basicauth::Bool, detect_content_type::Bool, canonicalize_headers::Bool, kw::@Kwargs{…})
@ HTTP.HeadersRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:71
[19] defaultheaders
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/HeadersRequest.jl:14 [inlined]
[20] (::HTTP.RedirectRequest.var"#redirects#3"{…})(req::HTTP.Messages.Request; redirect::Bool, redirect_limit::Int64, redirect_method::Nothing, forwardheaders::Bool, response_stream::Nothing, kw::@Kwargs{…})
@ HTTP.RedirectRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:25
[21] redirects
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/RedirectRequest.jl:14 [inlined]
[22] (::HTTP.MessageRequest.var"#makerequest#3"{…})(method::String, url::URIs.URI, headers::Vector{…}, body::Nothing; copyheaders::Bool, response_stream::Nothing, http_version::HTTP.Strings.HTTPVersion, verbose::Int64, kw::@Kwargs{…})
@ HTTP.MessageRequest ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:35
[23] makerequest
@ ~/.julia/packages/HTTP/4AUPl/src/clientlayers/MessageRequest.jl:24 [inlined]
[24] request(stack::HTTP.MessageRequest.var"#makerequest#3"{…}, method::String, url::String, h::Vector{…}, b::Nothing, q::Nothing; headers::Vector{…}, body::Nothing, query::Nothing, kw::@Kwargs{…})
@ HTTP ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:457
[25] #request#20
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:315 [inlined]
[26] request
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:313 [inlined]
[27] open
@ ~/.julia/packages/HTTP/4AUPl/src/HTTP.jl:579 [inlined]
[28] streamed_request!
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:57 [inlined]
[29] run_example()
@ Main ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:120
[30] top-level scope
@ ~/repo/jl_pkgs/HTTP.jl/docs/examples/streaming_mvp.jl:132
Some type information was truncated. Use `show(err)` to see complete types.
What is cool is that I can actually see a stacktrace here, but only the second stacktrace (the one after `caused by:`) shows the 3 lines we need. If everything is written correctly here, why do we get duplicated stacktrace lines?
Also, I feel the stacktrace is a little too deep. I wonder if we could make it simpler.
Metadata
Metadata
Assignees
Labels
No labels