Skip to content

Commit 968f085

Browse files
committed
del streamreader patch
1 parent a733ac4 commit 968f085

File tree

2 files changed

+21
-15
lines changed

2 files changed

+21
-15
lines changed

pproxy/proto.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def get_protos(rawprotos):
553553
def sslwrap(reader, writer, sslcontext, server_side=False, server_hostname=None, verbose=None):
554554
if sslcontext is None:
555555
return reader, writer
556-
ssl_reader = asyncio.StreamReader()
556+
ssl_reader = type(reader)()
557557
class Protocol(asyncio.Protocol):
558558
def data_received(self, data):
559559
ssl_reader.feed_data(data)

pproxy/server.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,22 @@
22
from . import proto
33
from .__doc__ import *
44

5-
SOCKET_TIMEOUT = 300
5+
SOCKET_TIMEOUT = 60
66
UDP_LIMIT = 30
77
DUMMY = lambda s: s
88

9-
asyncio.StreamReader.read_w = lambda self, n: asyncio.wait_for(self.read(n), timeout=SOCKET_TIMEOUT)
10-
asyncio.StreamReader.read_n = lambda self, n: asyncio.wait_for(self.readexactly(n), timeout=SOCKET_TIMEOUT)
11-
asyncio.StreamReader.read_until = lambda self, s: asyncio.wait_for(self.readuntil(s), timeout=SOCKET_TIMEOUT)
12-
asyncio.StreamReader.rollback = lambda self, s: self._buffer.__setitem__(slice(0, 0), s)
9+
class ProxyReader(asyncio.StreamReader):
10+
def __init__(self, o=None):
11+
if o:
12+
self.__dict__ = o.__dict__
13+
def read_w(self, n):
14+
return asyncio.wait_for(self.read(n), timeout=SOCKET_TIMEOUT)
15+
def read_n(self, n):
16+
return asyncio.wait_for(self.readexactly(n), timeout=SOCKET_TIMEOUT)
17+
def read_until(self, s):
18+
return asyncio.wait_for(self.readuntil(s), timeout=SOCKET_TIMEOUT)
19+
def rollback(self, s):
20+
self._buffer.__setitem__(slice(0, 0), s)
1321

1422
class AuthTable(object):
1523
_auth = {}
@@ -56,7 +64,7 @@ def schedule(rserver, salgorithm, host_name, port):
5664

5765
async def stream_handler(reader, writer, unix, lbind, protos, rserver, cipher, sslserver, debug=0, authtime=86400*30, block=None, salgorithm='fa', verbose=DUMMY, modstat=lambda u,r,h:lambda i:DUMMY, **kwargs):
5866
try:
59-
reader, writer = proto.sslwrap(reader, writer, sslserver, True, None, verbose)
67+
reader, writer = proto.sslwrap(ProxyReader(reader), writer, sslserver, True, None, verbose)
6068
if unix:
6169
remote_ip, server_ip, remote_text = 'local', None, 'unix_local'
6270
else:
@@ -219,10 +227,8 @@ async def open_connection(self, host, port, local_addr, lbind, timeout=SOCKET_TI
219227
reader, writer = await asyncio.wait_for(wait, timeout=timeout)
220228
except Exception as ex:
221229
raise
222-
return reader, writer
223-
def prepare_connection(self, reader_remote, writer_remote, host, port):
224-
return self.prepare_ciphers_and_headers(reader_remote, writer_remote, host, port)
225-
async def prepare_ciphers_and_headers(self, reader_remote, writer_remote, host, port):
230+
return ProxyReader(reader), writer
231+
async def prepare_connection(self, reader_remote, writer_remote, host, port):
226232
return reader_remote, writer_remote
227233
async def tcp_connect(self, host, port, local_addr=None, lbind=None):
228234
reader, writer = await self.open_connection(host, port, local_addr, lbind)
@@ -288,12 +294,12 @@ def wait_open_connection(self, host, port, local_addr, family):
288294
return asyncio.open_unix_connection(path=self.bind)
289295
else:
290296
return asyncio.open_connection(host=self.host_name, port=self.port, local_addr=local_addr, family=family)
291-
async def prepare_ciphers_and_headers(self, reader_remote, writer_remote, host, port):
297+
async def prepare_connection(self, reader_remote, writer_remote, host, port):
292298
reader_remote, writer_remote = proto.sslwrap(reader_remote, writer_remote, self.sslclient, False, self.host_name)
293299
_, writer_cipher_r = await prepare_ciphers(self.cipher, reader_remote, writer_remote, self.bind)
294300
whost, wport = self.jump.destination(host, port)
295301
await self.rproto.connect(reader_remote=reader_remote, writer_remote=writer_remote, rauth=self.auth, host_name=whost, port=wport, writer_cipher_r=writer_cipher_r, myhost=self.host_name, sock=writer_remote.get_extra_info('socket'))
296-
return await self.jump.prepare_ciphers_and_headers(reader_remote, writer_remote, host, port)
302+
return await self.jump.prepare_connection(reader_remote, writer_remote, host, port)
297303
def start_server(self, args, stream_handler=stream_handler):
298304
handler = functools.partial(stream_handler, **vars(self), **args)
299305
if self.unix:
@@ -470,7 +476,7 @@ def __init__(self, backward, backward_num, **kw):
470476
async def wait_open_connection(self, *args):
471477
while True:
472478
reader, writer = await self.conn.get()
473-
if not writer.is_closing():
479+
if not writer.is_closing() and not reader.at_eof():
474480
return reader, writer
475481
def close(self):
476482
self.closed = True
@@ -524,7 +530,7 @@ async def handler(reader, writer, **kw):
524530
auth = b'\x01'+auth
525531
if auth:
526532
try:
527-
assert auth == (await reader.read_n(len(auth)))
533+
assert auth == (await asyncio.wait_for(reader.readexactly(len(auth)), timeout=SOCKET_TIMEOUT))
528534
except Exception:
529535
return
530536
await self.conn.put((reader, writer))

0 commit comments

Comments
 (0)