Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,14 @@ async def start_tls(self, transport, protocol, sslcontext, *,
ssl_shutdown_timeout=ssl_shutdown_timeout,
call_connection_made=False)

# gh-142352: move buffered StreamReader data to SSLProtocol
stream_reader = getattr(protocol, '_stream_reader', None)
if stream_reader is not None:
buffer = stream_reader._buffer
if buffer:
ssl_protocol._incoming.write(buffer)
buffer.clear()

# Pause early so that "ssl_protocol.data_received()" doesn't
# have a chance to get called before "ssl_protocol.connection_made()".
transport.pause_reading()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it make sense to transfer the buffer after reading is paused rather than before?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the buffer is memory-only work without await, so doing it before or after pause_reading() is practically equivalent, as long as it happens before transport.set_protocol() and before connection_made()/resume_reading(). So the current order is safe.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought, it might make sense semantically. As a casual reader, seeing a comment after the data manipulation, that says we're pausing before undesired data reads happen seemed confusing to me. Perhaps, others won't feel the same, though.

Expand Down
87 changes: 87 additions & 0 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,93 @@ async def client(addr):
self.assertEqual(msg1, b"hello world 1!\n")
self.assertEqual(msg2, b"hello world 2!\n")

def _test_start_tls_buffered_data(self, send_combined):
# gh-142352: test start_tls() with buffered data

PROXY_LINE = b"PROXY TCP4 127.0.0.1 127.0.0.1 54321 443\r\n"
TEST_MESSAGE = b"hello world\n"

async def pipe(src, dst):
try:
while data := await src.read(4096):
dst.write(data)
await dst.drain()
finally:
dst.close()
await dst.wait_closed()

async def proxy_handler(client_reader, client_writer, backend_addr):
backend_reader, backend_writer = await asyncio.open_connection(
*backend_addr)
try:
tls_data = await client_reader.read(4096)
if send_combined:
backend_writer.write(PROXY_LINE + tls_data)
else:
backend_writer.write(PROXY_LINE)
await backend_writer.drain()
await asyncio.sleep(0.01)
backend_writer.write(tls_data)
await backend_writer.drain()

await asyncio.gather(
pipe(client_reader, backend_writer),
pipe(backend_reader, client_writer),
)
finally:
client_writer.close()
backend_writer.close()
await asyncio.gather(
client_writer.wait_closed(),
backend_writer.wait_closed(),
return_exceptions=True
)

async def server_handler(client_reader, client_writer):
self.assertEqual(await client_reader.readline(), PROXY_LINE)
await client_writer.start_tls(test_utils.simple_server_sslcontext())
self.assertEqual(await client_reader.readline(), TEST_MESSAGE)
client_writer.close()
await client_writer.wait_closed()

async def client(addr):
_, writer = await asyncio.open_connection(*addr)
await writer.start_tls(test_utils.simple_client_sslcontext())
writer.write(TEST_MESSAGE)
await writer.drain()
writer.close()
await writer.wait_closed()

async def run_test():
server = await asyncio.start_server(
server_handler, socket_helper.HOSTv4, 0)
server_addr = server.sockets[0].getsockname()

proxy = await asyncio.start_server(
lambda r, w: proxy_handler(r, w, server_addr),
socket_helper.HOSTv4, 0)
proxy_addr = proxy.sockets[0].getsockname()

await asyncio.wait_for(client(proxy_addr), timeout=5.0)
proxy.close()
server.close()
await asyncio.gather(proxy.wait_closed(), server.wait_closed())

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))
self.loop.run_until_complete(run_test())
self.assertEqual(messages, [])

@unittest.skipIf(ssl is None, 'No ssl module')
def test_start_tls_buffered_data_combined(self):
# gh-142352: Test TLS data buffered before start_tls
self._test_start_tls_buffered_data(send_combined=True)

@unittest.skipIf(ssl is None, 'No ssl module')
def test_start_tls_buffered_data_separate(self):
# gh-142352: Test TLS data sent separately
self._test_start_tls_buffered_data(send_combined=False)

def test_streamreader_constructor_without_loop(self):
with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
asyncio.StreamReader()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fix :meth:`asyncio.StreamWriter.start_tls` to transfer buffered data from
:class:`~asyncio.StreamReader` to the SSL layer, preventing data loss when
upgrading a connection to TLS mid-stream (e.g., when implementing PROXY
protocol support).
Loading