Fix connection reuse for file-like data payloads #10915
Conversation
test case

import asyncio
import io

import aiohttp
import aiohttp.web


async def hello(_request: aiohttp.web.Request) -> aiohttp.web.Response:
    return aiohttp.web.Response(body=b"")


async def main():
    app = aiohttp.web.Application()
    app.router.add_post("/hello", hello)

    # Properly set up a runner and a site
    runner = aiohttp.web.AppRunner(app)
    await runner.setup()
    site = aiohttp.web.TCPSite(runner, "127.0.0.1", 3003)
    await site.start()

    for _ in range(300):
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "http://127.0.0.1:3003/hello",
                data=b"x",
                headers={"Content-Length": "1"},
            ) as response:
                response.raise_for_status()
            assert len(session._connector._conns) == 1, session._connector._conns

        async with aiohttp.ClientSession() as session:
            x = io.BytesIO(b"x")
            async with session.post(
                "http://127.0.0.1:3003/hello",
                data=x,
                headers={"Content-Length": "1"},
            ) as response:
                response.raise_for_status()
            assert (
                len(session._connector._conns) == 1
            ), session._connector._conns

    print("All 300 iterations passed successfully!")

    # Properly clean up the site and runner
    await runner.cleanup()


if __name__ == "__main__":
    asyncio.run(main())
CodSpeed Performance Report: Merging #10915 will not alter performance.
Codecov Report: All modified and coverable lines are covered by tests ✅
✅ All tests successful. No failed tests found.

@@            Coverage Diff             @@
##           master   #10915      +/-   ##
==========================================
+ Coverage   98.74%   98.77%   +0.02%
==========================================
  Files         129      129
  Lines       39095    39455     +360
  Branches     2166     2184      +18
==========================================
+ Hits        38606    38970     +364
+ Misses        340      337       -3
+ Partials      149      148       -1

Flags with carried forward coverage won't be shown. ☔ View full report in Codecov by Sentry.
diff --git a/aiohttp/payload.py b/aiohttp/payload.py
index 55a7a677f..41c6ae9dd 100644
--- a/aiohttp/payload.py
+++ b/aiohttp/payload.py
@@ -14,6 +14,7 @@ from typing import (
     Any,
     Dict,
     Final,
+    Set,
     Iterable,
     Optional,
     TextIO,
@@ -53,6 +54,8 @@ __all__ = (
 )
 TOO_LARGE_BYTES_BODY: Final[int] = 2**20  # 1 MB
+_CLOSE_FUTURES: Set[asyncio.Future[None]] = set()
+
 if TYPE_CHECKING:
     from typing import List
@@ -334,9 +337,14 @@ class IOBasePayload(Payload):
             chunk = await loop.run_in_executor(None, self._value.read, 2**16)
             while chunk:
                 await writer.write(chunk)
+                break
                 chunk = await loop.run_in_executor(None, self._value.read, 2**16)
         finally:
-            await loop.run_in_executor(None, self._value.close)
+            close_future = loop.run_in_executor(None, self._value.close)
+            # Hold a strong reference to the future to prevent it from being
+            # garbage collected before it completes.
+            _CLOSE_FUTURES.add(close_future)
+            close_future.add_done_callback(_CLOSE_FUTURES.remove)

     def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
         return "".join(r.decode(encoding, errors) for r in self._value.readlines())
This fixes it as well, but it's not a good solution because we still need to read.
The problem we have is that the writer could still be reading forever, even though we've satisfied the content length, so we still have to cancel. Combining the above change with a single yield to the event loop (via sleep(0)) to let the executor job finish, if we are already at EOF, is likely all we can do safely before cancelling to avoid a deadlock. Maybe we can yield twice at most.
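A rough sketch of that bounded yield-then-cancel idea (the helper name, the writer task handle, and the at-EOF flag are hypothetical stand-ins, not code from this PR):

import asyncio


async def finish_or_cancel(writer_task: "asyncio.Task[None]", at_eof: bool, max_yields: int = 2) -> None:
    # If the declared content length is already satisfied (at EOF), give any
    # in-flight executor read a bounded number of chances to finish by
    # yielding control to the event loop, then cancel if it still hasn't.
    if at_eof:
        for _ in range(max_yields):
            if writer_task.done():
                return
            await asyncio.sleep(0)  # one yield to the event loop
    if not writer_task.done():
        # Cancelling avoids waiting forever on a writer that keeps reading.
        writer_task.cancel()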
The complexity gets really high since we can't always guarantee we can read the file size, and if it's chunked, we don't have a limit on how much we can read. However, without the content-length size guard, we end up writing the part of the payload body that goes beyond the content length into the next request, so we do need to check it.
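The kind of content-length guard being described might look roughly like this (illustrative names only, not the PR's actual implementation in write_bytes or the payload classes):

async def write_limited(writer, read_chunk, content_length: int, chunk_size: int = 2**16) -> None:
    # Write at most `content_length` bytes so surplus payload bytes never
    # spill into the next request on a reused (keep-alive) connection.
    remaining = content_length
    while remaining > 0:
        # Never request more than the bytes still owed to this request.
        chunk = await read_chunk(min(chunk_size, remaining))
        if not chunk:
            break  # EOF before the declared length was reached
        await writer.write(chunk)
        remaining -= len(chunk)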
Once I have it all working, I'm going to refactor it into something that's a lot more readable, as the branching here is so complex.
This reverts commit 7e08cd6.
I have no idea which lines Codecov thinks are missing, since coverage was 100% before the last change and I removed some lines. When I click the link it shows 100% coverage, so I think something is out of sync.
This reverts commit 8654141.
Thanks. I forgot to revert the testing I was doing, and I had two copies checked out, one for PyPy and one for CPython, so I was looking at the wrong one.
Backport to 3.12: 💔 cherry-picking failed — conflicts found
❌ Failed to cleanly apply 545783b on top of patchback/backports/3.12/545783b9e91c69d48ffd3b85c7eb13d1b19eb55e/pr-10915
Backporting merged PR #10915 into master
🤖 @patchback
(cherry picked from commit 545783b)
        try:
            chunk = await loop.run_in_executor(None, self._value.read, 2**16)
            # Get initial data and available length
            available_len, chunk = await loop.run_in_executor(
Should we just use asyncio.to_thread() now?
| Method            | Mean Time (seconds) | Std Dev (seconds) |
|-------------------|---------------------|-------------------|
| run_in_executor   | 0.0984              | 0.0119            |
| asyncio.to_thread | 0.0985              | 0.0120            |
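These timings could be reproduced with a micro-benchmark along these lines (a sketch; the exact workload behind the table above isn't shown in the thread):

import asyncio
import time


def read_chunk(f):
    # Stand-in blocking workload: a 64 KiB read, as in IOBasePayload.write.
    f.seek(0)
    return f.read(2**16)


async def bench(n: int = 1000) -> None:
    loop = asyncio.get_running_loop()
    with open(__file__, "rb") as f:
        start = time.perf_counter()
        for _ in range(n):
            await loop.run_in_executor(None, read_chunk, f)
        print(f"run_in_executor:   {time.perf_counter() - start:.4f}s")

        start = time.perf_counter()
        for _ in range(n):
            await asyncio.to_thread(read_chunk, f)
        print(f"asyncio.to_thread: {time.perf_counter() - start:.4f}s")


if __name__ == "__main__":
    asyncio.run(bench())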
yeah I think so
async def to_thread(func, /, *args, **kwargs):
    """Asynchronously run function *func* in a separate thread.

    Any *args and **kwargs supplied for this function are directly passed
    to *func*. Also, the current :class:`contextvars.Context` is propagated,
    allowing context variables from the main thread to be accessed in the
    separate thread.

    Return a coroutine that can be awaited to get the eventual result of *func*.
    """
    loop = events.get_running_loop()
    ctx = contextvars.copy_context()
    func_call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, func_call)
Well, we still need loop.run_in_executor for the _schedule_file_close, so it probably makes sense to leave it as-is for now.
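The practical difference for a fire-and-forget close is that loop.run_in_executor submits the job immediately and returns a Future, while asyncio.to_thread is a coroutine that does nothing until awaited. A small sketch (hypothetical helper, not aiohttp's _schedule_file_close):

import asyncio
from typing import IO


async def schedule_close(fileobj: IO[bytes]) -> None:
    loop = asyncio.get_running_loop()

    # Submitted right away; we get a Future back and can return without
    # awaiting it (fire and forget), keeping a reference elsewhere if needed.
    close_future = loop.run_in_executor(None, fileobj.close)
    close_future.add_done_callback(lambda fut: None)

    # By contrast, asyncio.to_thread(fileobj.close) only builds a coroutine;
    # nothing runs until it is awaited or wrapped in a task, so it fits a
    # fire-and-forget helper less naturally.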
            # Hold a strong reference to the future to prevent it from being
            # garbage collected before it completes.
            _CLOSE_FUTURES.add(close_future)
            close_future.add_done_callback(_CLOSE_FUTURES.remove)
There is a risk here that we lose exceptions, as we never await it.
I think it's an acceptable risk, as .close() is very unlikely to fail. It's the same pattern we have in web_fileresponse, which has worked well.
Since we are reading, there isn't any buffer to flush, and likely the only reason it would fail is if file descriptors somehow got mixed up in Python, at which point everything is likely broken anyway.
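If surfacing those rare close() failures ever becomes worthwhile, one low-cost option (not what this PR does) is to log the exception from the done callback instead of dropping it:

import asyncio
import logging
from typing import Set

logger = logging.getLogger(__name__)

_CLOSE_FUTURES: Set["asyncio.Future[None]"] = set()


def _on_close_done(fut: "asyncio.Future[None]") -> None:
    _CLOSE_FUTURES.discard(fut)
    if not fut.cancelled() and fut.exception() is not None:
        # The request has already moved on, so log rather than raise.
        logger.warning("Closing file object failed", exc_info=fut.exception())


def schedule_file_close(loop: asyncio.AbstractEventLoop, fileobj) -> None:
    close_future = loop.run_in_executor(None, fileobj.close)
    _CLOSE_FUTURES.add(close_future)  # strong reference until completion
    close_future.add_done_callback(_on_close_done)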
@@ -608,16 +653,30 @@ async def write_bytes(
        assert protocol is not None
        try:
            if isinstance(self.body, payload.Payload):
                await self.body.write(writer)
                # Specialized handling for Payload objects that know how to write themselves
I'm refactoring the ClientRequest class, and I've just realised this is not specialized handling; every single body goes through here. The else branch is only used when the body is empty.
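A possible shape for that refactor, sketched only to illustrate the observation (hypothetical structure, not the actual ClientRequest code): make the empty-body case an early return so the Payload write is the single real path.

# Hypothetical sketch: if every non-empty body is (or is wrapped as) a
# Payload, the branching collapses to an early return plus one write path.
async def write_body(body, writer) -> None:
    if body is None:
        return  # empty body: nothing to send (the current "else" branch)
    # All real bodies are Payload objects and know how to write themselves.
    await body.write(writer)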
Summary
This PR fixes two critical issues in aiohttp's HTTP payload handling that prevented connection reuse:
Buffer truncation: The incoming payload buffer for a ClientRequest wasn't being truncated when it exceeded the specified content length.
Note: Lack of buffer truncation is not considered a security issue, since it requires the developer to explicitly pass a content length shorter than the payload size. This may be the result of a mistake in calculating the payload size or an expectation that only the requested amount would be sent (a concrete example follows below).
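One concrete way such a miscalculation can happen (an illustrative example, not taken from the linked issue) is counting characters instead of encoded bytes:

# Illustrative only: a Content-Length derived from len(text) understates the
# payload once the text is UTF-8 encoded, leaving surplus bytes in the buffer.
text = "café"
body = text.encode("utf-8")

assert len(text) == 4  # characters
assert len(body) == 5  # bytes actually sent ("é" encodes to two bytes)

headers = {"Content-Length": str(len(body))}  # correct: size of the encoded body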
Timing issue: The write_bytes function had a race where it would sometimes finish after the request had already completed, causing connections to be closed instead of reused.

Solution Implemented
The PR fixes both issues:
https://www.rfc-editor.org/rfc/rfc9112.html#section-6.3-8 doesn't have specific guidance on this, but it implies we should truncate the incoming buffer if we get more data than we expect.
write_bytes completes in time by:

Impact
Before:
After:
write_bytes completes in time to allow connection reuse

fixes #10325