Skip to content

Commit 7e5aaae

Browse files
author
no5ix
committed
rudp server ing...
1 parent adbd2dd commit 7e5aaae

File tree

12 files changed

+401
-178
lines changed

12 files changed

+401
-178
lines changed

pycharm2020.1.3/script/ConnBase.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from asyncio.exceptions import CancelledError
99

1010
# from ConnBase import HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, ROLE_TYPE_ACTIVE, ConnBase
11+
from ConnMgr import ROLE_TYPE_ACTIVE
1112
from common import gv
1213
from core.common.IdManager import IdManager
1314
from core.util.TimerHub import TimerHub
@@ -24,9 +25,6 @@
2425
HEARTBEAT_TIMEOUT = 8
2526
HEARTBEAT_INTERVAL = 6
2627

27-
ROLE_TYPE_ACTIVE = 0
28-
ROLE_TYPE_PASSIVE = 1
29-
3028
CONN_STATE_CONNECTING = 0
3129
CONN_STATE_CONNECTED = 1
3230
CONN_STATE_DISCONNECTING = 2
@@ -53,6 +51,9 @@ def __init__(
5351
):
5452
if role_type == ROLE_TYPE_ACTIVE:
5553
assert (rpc_handler is not None)
54+
55+
self._conn_type = None
56+
5657
self._is_connected = True
5758

5859
self._role_type = role_type

pycharm2020.1.3/script/ConnMgr.py

Lines changed: 153 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,205 @@
11
from __future__ import annotations
2+
3+
import datetime
24
import typing
35
import asyncio
46
from asyncio import transports
7+
from collections import defaultdict
58
from typing import Optional, Tuple
69

710
import ConnBase
8-
from ConnBase import ROLE_TYPE_ACTIVE
9-
from RudpConn import RudpConn
1011
from common import gv
11-
import TcpConn
1212
from core.mobilelog.LogManager import LogManager
13+
from core.util.TimerHub import TimerHub
1314
from core.util.UtilApi import Singleton
1415

16+
from core.common.sanic_jwt_extended import JWT, refresh_jwt_required, jwt_required, jwt_optional
17+
from core.common.sanic_jwt_extended.tokens import Token
18+
1519
if typing.TYPE_CHECKING:
16-
from RpcHandler import RpcHandler
20+
from RpcHandler import RpcHandler, RPC_TYPE_REQUEST
21+
import TcpConn
1722

1823

1924
CONN_TYPE_TCP = 0
2025
CONN_TYPE_RUDP = 1
2126

27+
RECONNECT_MAX_TIMES = 6
28+
RECONNECT_INTERVAL = 0.6 # sec
29+
30+
ROLE_TYPE_ACTIVE = 0
31+
ROLE_TYPE_PASSIVE = 1
32+
33+
34+
class TcpServerProtocol(asyncio.Protocol):
35+
# def __init__(self, role_type, create_tcp_conn_cb):
36+
def __init__(self):
37+
# def __init__(self, rpc_handler=None):
38+
# self._role_type = role_type
39+
# self._create_tcp_conn_cb = create_tcp_conn_cb
40+
self._conn = None # type: Optional[TcpConn.TcpConn]
41+
# self._rpc_handler = rpc_handler # type: Optional[RpcHandler]
42+
43+
def connection_made(self, transport: transports.BaseTransport) -> None:
44+
# assert callable(self._create_tcp_conn_cb)
45+
# self._conn = self._create_tcp_conn_cb(self._role_type, transport)
46+
self._conn = ConnMgr.instance().add_conn(
47+
ROLE_TYPE_PASSIVE, CONN_TYPE_TCP, transport,
48+
# self._rpc_handler
49+
)
50+
51+
addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
52+
LogManager.get_logger().info(f"{addr!r} is connected !!!!")
53+
54+
def data_received(self, data: bytes) -> None:
55+
self._conn.handle_read(data)
56+
57+
def connection_lost(self, exc: Optional[Exception]) -> None:
58+
self._conn.handle_close(str(exc))
59+
60+
61+
# {kSyn = 66, kAck, kPsh, kRst};
62+
RUDP_HANDSHAKE_SYN = b'new_byte_hello_its_me'
63+
RUDP_HANDSHAKE_SYN_ACK_PREFIX = b'new_byte_welcome:'
64+
RUDP_HANDSHAKE_ACK_PREFIX = b'new_byte_ack:'
65+
66+
RUDP_CONV = 0
67+
68+
69+
class RudpServerProtocol(asyncio.DatagramProtocol):
70+
# def __init__(self, create_kcp_conn_cb):
71+
def __init__(self):
72+
# self._create_kcp_conn_cb = create_kcp_conn_cb
73+
self.transport = None
74+
75+
def connection_made(self, transport):
76+
self.transport = transport
77+
78+
def datagram_received(self, data: bytes, addr):
79+
global RUDP_CONV
80+
global RUDP_HANDSHAKE_SYN
81+
global RUDP_HANDSHAKE_SYN_ACK_PREFIX
82+
global RUDP_HANDSHAKE_ACK_PREFIX
83+
# assert callable(self._create_kcp_conn_cb)
84+
# self._create_kcp_conn_cb(addr)
85+
if data == RUDP_HANDSHAKE_SYN:
86+
RUDP_CONV += 1
87+
access_token_jwt: bytes = JWT.create_access_token(
88+
identity=str(RUDP_CONV), expires_delta=datetime.timedelta(seconds=6)).encode()
89+
self.transport.sendto(RUDP_HANDSHAKE_SYN_ACK_PREFIX + access_token_jwt, addr)
90+
elif data.startswith(RUDP_HANDSHAKE_ACK_PREFIX):
91+
parts = data.split(RUDP_HANDSHAKE_ACK_PREFIX, 2)
92+
if len(parts) != 2:
93+
raise Exception(f"Expected value '{RUDP_HANDSHAKE_ACK_PREFIX}<JWT>'")
94+
raw_jwt = parts[1].decode()
95+
token_obj = Token(raw_jwt)
96+
if token_obj.type != "access":
97+
raise Exception("Only access tokens are allowed")
98+
conv = token_obj.identity
99+
100+
ConnMgr.instance().add_conn(
101+
ROLE_TYPE_PASSIVE, CONN_TYPE_RUDP, self.transport,
102+
rudp_conv=conv, rudp_peer_addr=addr
103+
# self._rpc_handler
104+
)
105+
# addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
106+
LogManager.get_logger().info(f"{addr!r} is connected !!!!")
107+
else:
108+
_cur_conn = ConnMgr.instance().get_conn(addr, CONN_TYPE_RUDP)
109+
if _cur_conn:
110+
_cur_conn.handle_read(data)
111+
22112

23113
@Singleton
24114
class ConnMgr:
25115

26116
def __init__(self):
27-
self._addr_2_conn_map = {} # type: typing.Dict[typing.Tuple[str, int], TcpConn.TcpConn]
117+
self._timer_hub = TimerHub()
28118
self._logger = LogManager.get_logger()
29119
self._is_proxy = False
120+
self._conn_type_2_addr_2_conn = {CONN_TYPE_TCP: {}, CONN_TYPE_RUDP: {}}
121+
self._addr_2_try_connect_times = defaultdict(int)
30122

31123
def set_is_proxy(self, is_proxy):
32124
self._is_proxy = is_proxy
33125

34-
def get_conn(self, addr, conn_type) -> ConnBase:
35-
return self._addr_2_conn_map.get(addr, None)
126+
def get_conn(self, addr, conn_type=None) -> ConnBase:
127+
if conn_type is not None:
128+
return self._conn_type_2_addr_2_conn[conn_type].get(addr, None)
129+
_conn = self._conn_type_2_addr_2_conn[CONN_TYPE_RUDP].get(addr, None)
130+
if _conn is None:
131+
_conn = self._conn_type_2_addr_2_conn[CONN_TYPE_TCP].get(addr, None)
132+
return _conn
36133

37-
def create_conn(
134+
def add_conn(
38135
self, role_type, conn_type, transport: transports.BaseTransport,
39136
rudp_conv: int = 0, rudp_peer_addr: Optional[Tuple[str, int]] = None
40137
# rpc_handler: Optional[RpcHandler] = None
41-
) -> TcpConn.TcpConn:
42-
addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
43-
138+
) -> ConnBase:
44139
if conn_type == CONN_TYPE_TCP:
140+
import TcpConn
141+
addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
45142
_conn = TcpConn.TcpConn(
46-
role_type, addr, close_cb=self._remove_conn, is_proxy=self._is_proxy,
143+
role_type, addr,
144+
close_cb=lambda ct=conn_type, a=addr: self._remove_conn(ct, a),
145+
is_proxy=self._is_proxy,
47146
transport=transport)
48147
else:
49148
pass # todo: rudp
149+
from RudpConn import RudpConn
50150
assert rudp_conv > 0
151+
addr = rudp_peer_addr
51152
_conn = RudpConn(
52-
rudp_conv, role_type, rudp_peer_addr, close_cb=self._remove_conn, is_proxy=self._is_proxy,
153+
rudp_conv, role_type, addr,
154+
close_cb=lambda ct=conn_type, a=addr: self._remove_conn(ct, a),
155+
is_proxy=self._is_proxy,
53156
transport=transport)
54157

55-
self._addr_2_conn_map[addr] = _conn
158+
self._conn_type_2_addr_2_conn[conn_type][addr] = _conn
56159
return _conn
57160

58-
async def get_conn_by_addr(
161+
async def create_conn_by_addr(
59162
self, conn_type, addr: typing.Tuple[str, int],
60163
rpc_handler: RpcHandler = None
61-
) -> TcpConn:
62-
from TcpServer import TcpServerProtocol
63-
_conn = self._addr_2_conn_map.get(addr, None)
164+
) -> ConnBase:
165+
_conn = self._conn_type_2_addr_2_conn[conn_type].get(addr, None)
64166
if _conn is None:
65-
if conn_type == CONN_TYPE_TCP:
66-
# reader, writer = await asyncio.open_connection(addr[0], addr[1])
67-
transport, protocol = await gv.EV_LOOP.create_connection(
167+
if conn_type == CONN_TYPE_RUDP:
168+
_conn = await self._try_create_rudp_conn(addr)
169+
if _conn is None:
170+
_conn = await self._try_create_tcp_conn(addr)
171+
if _conn is not None and rpc_handler is not None:
172+
_conn.add_rpc_handler(rpc_handler)
173+
return _conn
174+
175+
async def _try_create_rudp_conn(self, addr):
176+
pass
177+
178+
async def _try_create_tcp_conn(self, addr):
179+
while self._addr_2_try_connect_times[addr] < RECONNECT_MAX_TIMES:
180+
try:
181+
from TcpServer import TcpServerProtocol
182+
self._addr_2_try_connect_times[addr] += 1
183+
transport, protocol = await gv.get_ev_loop().create_connection(
68184
lambda: TcpServerProtocol(),
69-
# lambda: TcpServerProtocol(rpc_handler),
70185
addr[0], addr[1])
71-
_conn = self._addr_2_conn_map[addr]
186+
except Exception as e:
187+
self._logger.error(str(e))
188+
await asyncio.sleep(RECONNECT_INTERVAL)
189+
# if self._addr_2_try_connect_times[addr] < RECONNECT_MAX_TIMES:
190+
self._logger.warning(f"try reconnect tcp: {str(addr)} ... {self._addr_2_try_connect_times[addr]}")
191+
await self._try_create_tcp_conn(addr)
72192
else:
73-
pass # todo: rudp
74-
if rpc_handler is not None:
75-
_conn.add_rpc_handler(rpc_handler)
76-
return _conn
193+
self._addr_2_try_connect_times[addr] = 0
194+
return self._conn_type_2_addr_2_conn[CONN_TYPE_TCP][addr]
195+
else:
196+
pass # todo
197+
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect remote addr: {addr}")
198+
self._addr_2_try_connect_times[addr] = 0
199+
return None
77200

78-
def _remove_conn(self, addr: typing.Tuple[str, int]):
79-
self._addr_2_conn_map.pop(addr, None)
201+
def _remove_conn(self, conn_type, addr: typing.Tuple[str, int]):
202+
self._conn_type_2_addr_2_conn[conn_type].pop(addr, None)
80203

81204
# def add_conn(
82205
# self,
@@ -95,7 +218,7 @@ def _remove_conn(self, addr: typing.Tuple[str, int]):
95218
# self._addr_2_conn_map[addr] = conn
96219
# return conn
97220
#
98-
# async def get_conn_by_addr(
221+
# async def create_conn_by_addr(
99222
# self, addr: typing.Tuple[str, int], rpc_handler: RpcHandler = None) -> TcpConn:
100223
# _conn = self._addr_2_conn_map.get(addr, None)
101224
# if _conn is None:

pycharm2020.1.3/script/RpcHandler.py

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
RPC_TYPE_REPLY = 2
3535
RPC_TYPE_HEARTBEAT = 3
3636

37-
RECONNECT_MAX_TIMES = 6
38-
RECONNECT_INTERVAL = 0.6 # sec
39-
4037
REQUEST_RPC_TIMEOUT = 3 # sec
4138

4239

@@ -73,7 +70,7 @@ def __init__(
7370
self._pending_requests = {} # type: typing.Dict[int, RpcReplyFuture]
7471
self._msg_buffer = [] # type: typing.List[typing.Tuple]
7572
self._timer_hub = TimerHub()
76-
self._try_connect_times = 0
73+
# self._try_connect_times = 0
7774

7875
self._is_destroyed = False
7976
self._create_conn_tasks = [] # type: typing.List[asyncio.Task]
@@ -212,19 +209,9 @@ async def _handle_create_conn(self, addr: typing.Tuple[str, int] = None):
212209
if not self._conn.is_active_role():
213210
return
214211
addr = self._conn.get_addr()
215-
self._try_connect_times += 1
216-
self._conn = await ConnMgr.instance().get_conn_by_addr(CONN_TYPE_TCP, addr, self)
217-
218-
self._logger.warning(f"{self._try_connect_times=}, {id(self._conn)=}")
219-
except asyncio.CancelledError:
220-
raise
221-
except Exception as e:
222-
self._logger.error(str(e))
223-
if self._try_connect_times < RECONNECT_MAX_TIMES:
224-
self._logger.warning(f"try reconnect {str(addr)} ... {self._try_connect_times}")
225-
self._timer_hub.call_later(RECONNECT_INTERVAL, lambda: self._handle_create_conn(addr))
226-
else:
227-
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect remote addr: {addr}")
212+
# self._try_connect_times += 1
213+
self._conn = await ConnMgr.instance().create_conn_by_addr(CONN_TYPE_TCP, addr, self)
214+
if self._conn is None:
228215
for _msg in self._msg_buffer:
229216
if _msg[0] == RPC_TYPE_REQUEST:
230217
_reply_id = _msg[1]
@@ -234,15 +221,39 @@ async def _handle_create_conn(self, addr: typing.Tuple[str, int] = None):
234221
f"fire pending requests future, rpc func: {_msg[3]}, reply_id: {_reply_id}")
235222
self._pending_requests[_reply_id].set_error_and_result(
236223
f"cant connect remote addr: {addr}", None)
237-
self._try_connect_times = 0
238-
return
239-
else:
240-
if self._try_connect_times > 1:
241-
self._logger.warning(f"sum {self._try_connect_times=}, {id(self._conn)=}")
242-
self._try_connect_times = 0
243-
for _msg in self._msg_buffer:
244-
self._conn.send_data_and_count(self.rpc_handler_id, _msg)
245-
self._msg_buffer.clear()
224+
else:
225+
for _msg in self._msg_buffer:
226+
self._conn.send_data_and_count(self.rpc_handler_id, _msg)
227+
self._msg_buffer.clear()
228+
229+
# self._logger.warning(f"{self._try_connect_times=}, {id(self._conn)=}")
230+
except asyncio.CancelledError:
231+
raise
232+
# except Exception as e:
233+
# self._logger.error(str(e))
234+
# if self._try_connect_times < RECONNECT_MAX_TIMES:
235+
# self._logger.warning(f"try reconnect {str(addr)} ... {self._try_connect_times}")
236+
# self._timer_hub.call_later(RECONNECT_INTERVAL, lambda: self._handle_create_conn(addr))
237+
# else:
238+
# self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect remote addr: {addr}")
239+
# for _msg in self._msg_buffer:
240+
# if _msg[0] == RPC_TYPE_REQUEST:
241+
# _reply_id = _msg[1]
242+
# if _reply_id not in self._pending_requests:
243+
# continue
244+
# self._logger.warning(
245+
# f"fire pending requests future, rpc func: {_msg[3]}, reply_id: {_reply_id}")
246+
# self._pending_requests[_reply_id].set_error_and_result(
247+
# f"cant connect remote addr: {addr}", None)
248+
# self._try_connect_times = 0
249+
# return
250+
# else:
251+
# if self._try_connect_times > 1:
252+
# self._logger.warning(f"sum {self._try_connect_times=}, {id(self._conn)=}")
253+
# self._try_connect_times = 0
254+
# for _msg in self._msg_buffer:
255+
# self._conn.send_data_and_count(self.rpc_handler_id, _msg)
256+
# self._msg_buffer.clear()
246257

247258
@staticmethod
248259
async def handle_request_notify_rpc(_method, _method_name, _method_args, _method_kwargs):

pycharm2020.1.3/script/RudpConn.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
from asyncio import transports
88
from asyncio.exceptions import CancelledError
99

10-
from ConnBase import HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, ROLE_TYPE_ACTIVE, ConnBase
10+
from ConnBase import HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, ConnBase
11+
from ConnMgr import CONN_TYPE_TCP, CONN_TYPE_RUDP
1112
from common import gv
1213
from core.common import rudp
1314
from core.common.IdManager import IdManager
@@ -39,6 +40,7 @@ def __init__(
3940
):
4041
super(RudpConn, self).__init__(role_type, addr, rpc_handler, close_cb, is_proxy, transport)
4142
self._conv = conv
43+
self._conn_type = CONN_TYPE_RUDP
4244

4345
self._kcp = rudp.Kcp(conv, self.send_data_internal)
4446
self._kcp.set_nodelay(nodelay=True, interval=10, resend=2, nocwnd=True)

0 commit comments

Comments
 (0)