Skip to content

Commit 01cf81b

Browse files
author
no5ix
committed
rudp server basically fin
1 parent ad8c034 commit 01cf81b

27 files changed

+96
-66
lines changed

pycharm2020.1.3/.idea/pycharm2020.1.3.iml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pycharm2020.1.3/script/ConnBase.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(
5454
transport: transports.BaseTransport = None
5555
):
5656

57-
self._conn_type = None
57+
self._proto_type = None
5858

5959
self._try_connect_times = 0
6060

@@ -96,6 +96,9 @@ def __init__(
9696
else:
9797
self._conn_state = CONN_STATE_CONNECTED
9898

99+
def get_proto_type(self):
100+
return self._proto_type
101+
99102
# @wait_or_not()
100103
async def try_connect(self) -> bool:
101104
raise NotImplementedError

pycharm2020.1.3/script/ConnMgr.py

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@
1313
from core.util.TimerHub import TimerHub
1414
from core.util.UtilApi import Singleton
1515

16-
from core.common.sanic_jwt_extended import JWT, refresh_jwt_required, jwt_required, jwt_optional
17-
from core.common.sanic_jwt_extended.tokens import Token
16+
from sanic_jwt_extended import JWT
17+
from sanic_jwt_extended.tokens import Token
1818

1919
if typing.TYPE_CHECKING:
20-
from RpcHandler import RpcHandler, RPC_TYPE_REQUEST
20+
from RpcHandler import RpcHandler
2121
import TcpConn
2222

2323

24-
CONN_TYPE_TCP = 0
25-
CONN_TYPE_RUDP = 1
24+
PROTO_TYPE_TCP = 0
25+
PROTO_TYPE_RUDP = 1
2626

2727
ROLE_TYPE_ACTIVE = 0
2828
ROLE_TYPE_PASSIVE = 1
@@ -43,13 +43,13 @@ def connection_made(self, transport: transports.BaseTransport) -> None:
4343
addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
4444
if self._role_type == ROLE_TYPE_PASSIVE:
4545
self._conn = ConnMgr.instance().add_incoming_conn(
46-
CONN_TYPE_TCP, transport, addr
46+
PROTO_TYPE_TCP, transport, addr
4747
# self._rpc_handler
4848
)
4949
else:
50-
self._conn = ConnMgr.instance().get_conn(addr, CONN_TYPE_TCP)
50+
self._conn = ConnMgr.instance().get_conn(addr, PROTO_TYPE_TCP)
5151
assert self._conn
52-
LogManager.get_logger().info(f"{addr!r} is connected !!!!")
52+
LogManager.get_logger().info(f"TCP {addr!r} is connected !!!!")
5353

5454
def data_received(self, data: bytes) -> None:
5555
self._conn.handle_read(data)
@@ -70,6 +70,9 @@ class RudpProtocol(asyncio.DatagramProtocol):
7070
# def __init__(self, create_kcp_conn_cb):
7171
def __init__(self):
7272
# self._create_kcp_conn_cb = create_kcp_conn_cb
73+
with JWT.initialize() as manager:
74+
manager.config.secret_key = "new_byte"
75+
7376
self.transport = None
7477

7578
def connection_made(self, transport):
@@ -82,6 +85,7 @@ def datagram_received(self, data: bytes, addr):
8285
global RUDP_HANDSHAKE_ACK_PREFIX
8386
# assert callable(self._create_kcp_conn_cb)
8487
# self._create_kcp_conn_cb(addr)
88+
# print(f'datagram_received: {data=}, {addr=}')
8589
if data == RUDP_HANDSHAKE_SYN:
8690
RUDP_CONV += 1
8791
access_token_jwt: bytes = JWT.create_access_token(
@@ -96,14 +100,14 @@ def datagram_received(self, data: bytes, addr):
96100
token_obj = Token(raw_jwt)
97101
if token_obj.type != "access":
98102
raise Exception("Only access tokens are allowed")
99-
conv = token_obj.identity
103+
conv = int(token_obj.identity)
100104

101105
ConnMgr.instance().add_incoming_conn(
102-
CONN_TYPE_RUDP, self.transport, addr, rudp_conv=conv
106+
PROTO_TYPE_RUDP, self.transport, addr, rudp_conv=conv
103107
# self._rpc_handler
104108
)
105109
# addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
106-
LogManager.get_logger().info(f"{addr!r} is connected !!!!")
110+
LogManager.get_logger().info(f"RUDP {addr!r} is connected !!!!")
107111
elif data.startswith(RUDP_HANDSHAKE_SYN_ACK_PREFIX):
108112
parts = data.split(RUDP_HANDSHAKE_SYN_ACK_PREFIX, 2)
109113
if len(parts) != 2:
@@ -114,22 +118,18 @@ def datagram_received(self, data: bytes, addr):
114118
if token_obj.type != "access":
115119
raise Exception("Only access tokens are allowed")
116120
self.transport.sendto(RUDP_HANDSHAKE_ACK_PREFIX + raw_jwt.encode(), addr)
117-
conv = token_obj.identity
118-
try:
119-
ConnMgr.instance().set_fut_result(addr, conv)
120-
except asyncio.InvalidStateError:
121-
# 并发情况即使`fut`已经done了也无所谓, 不处理即可
122-
pass
121+
conv = int(token_obj.identity)
122+
ConnMgr.instance().set_fut_result(addr, conv)
123123

124124
# ConnMgr.instance().add_incoming_conn(
125125
# ROLE_TYPE_ACTIVE, CONN_TYPE_RUDP, self.transport,
126126
# rudp_conv=conv, rudp_peer_addr=addr
127127
# # self._rpc_handler
128128
# )
129129
# addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
130-
LogManager.get_logger().info(f"{addr!r} is connected !!!!")
130+
LogManager.get_logger().info(f"RUDP {addr!r} is connected !!!!")
131131
else:
132-
_cur_conn = ConnMgr.instance().get_conn(addr, CONN_TYPE_RUDP)
132+
_cur_conn = ConnMgr.instance().get_conn(addr, PROTO_TYPE_RUDP)
133133
assert _cur_conn
134134
_cur_conn.handle_read(data)
135135

@@ -141,7 +141,7 @@ def __init__(self):
141141
self._timer_hub = TimerHub()
142142
self._logger = LogManager.get_logger()
143143
self._is_proxy = False
144-
self._conn_type_2_addr_2_conn = {CONN_TYPE_TCP: {}, CONN_TYPE_RUDP: {}}
144+
self._proto_type_2_addr_2_conn = {PROTO_TYPE_TCP: {}, PROTO_TYPE_RUDP: {}}
145145
self._addr_2_try_connect_times = defaultdict(int)
146146
self._addr_2_rudp_conned_fut = {}
147147

@@ -156,58 +156,68 @@ def final_fut_cb(fut, _addr=addr):
156156
_fut.add_done_callback(final_fut_cb)
157157
return _fut
158158

159+
def set_fut_result(self, addr, conv):
160+
_fut = self._addr_2_rudp_conned_fut.get(addr, None)
161+
if _fut is None:
162+
return
163+
try:
164+
_fut.set_result(conv)
165+
except asyncio.InvalidStateError:
166+
# 并发情况即使`fut`已经done了也无所谓, 不处理即可
167+
pass
168+
159169
def set_is_proxy(self, is_proxy):
160170
self._is_proxy = is_proxy
161171

162-
def get_conn(self, addr, conn_type=None) -> ConnBase:
163-
if conn_type is not None:
164-
return self._conn_type_2_addr_2_conn[conn_type].get(addr, None)
165-
_conn = self._conn_type_2_addr_2_conn[CONN_TYPE_RUDP].get(addr, None)
172+
def get_conn(self, addr, proto_type=None) -> ConnBase:
173+
if proto_type is not None:
174+
return self._proto_type_2_addr_2_conn[proto_type].get(addr, None)
175+
_conn = self._proto_type_2_addr_2_conn[PROTO_TYPE_RUDP].get(addr, None)
166176
if _conn is None:
167-
_conn = self._conn_type_2_addr_2_conn[CONN_TYPE_TCP].get(addr, None)
177+
_conn = self._proto_type_2_addr_2_conn[PROTO_TYPE_TCP].get(addr, None)
168178
return _conn
169179

170180
def add_incoming_conn(
171-
self, conn_type, transport: transports.BaseTransport,
181+
self, proto_type, transport: transports.BaseTransport,
172182
peer_addr: Optional[Tuple[str, int]] = None,
173183
rudp_conv: int = None
174184
# rpc_handler: Optional[RpcHandler] = None
175185
) -> ConnBase:
176-
if conn_type == CONN_TYPE_TCP:
186+
if proto_type == PROTO_TYPE_TCP:
177187
from TcpConn import TcpConn
178188
_conn = TcpConn(
179189
ROLE_TYPE_PASSIVE, peer_addr,
180-
close_cb=lambda ct=conn_type, a=peer_addr: self._remove_conn(ct, a),
190+
close_cb=lambda ct=proto_type, a=peer_addr: self._remove_conn(ct, a),
181191
is_proxy=self._is_proxy,
182192
transport=transport)
183193
else:
184194
from RudpConn import RudpConn
185195
assert rudp_conv > 0
186196
_conn = RudpConn(
187197
ROLE_TYPE_PASSIVE, peer_addr,
188-
close_cb=lambda ct=conn_type, a=peer_addr: self._remove_conn(ct, a),
198+
close_cb=lambda ct=proto_type, a=peer_addr: self._remove_conn(ct, a),
189199
is_proxy=self._is_proxy,
190200
transport=transport,
191201
conv=rudp_conv)
192202

193-
self._conn_type_2_addr_2_conn[conn_type][peer_addr] = _conn
203+
self._proto_type_2_addr_2_conn[proto_type][peer_addr] = _conn
194204
return _conn
195205

196206
async def open_conn_by_addr(
197-
self, conn_type, addr: typing.Tuple[str, int],
207+
self, proto_type, addr: typing.Tuple[str, int],
198208
rpc_handler: RpcHandler = None
199209
) -> (ConnBase, bool):
200210

201-
_conn = self._conn_type_2_addr_2_conn[conn_type].get(addr, None)
211+
_conn = self._proto_type_2_addr_2_conn[proto_type].get(addr, None)
202212
is_conned = True
203213
if _conn is None:
204-
if conn_type == CONN_TYPE_TCP:
214+
if proto_type == PROTO_TYPE_TCP:
205215
from TcpConn import TcpConn
206216
conn_cls = TcpConn
207217
# addr = transport.get_extra_info('peername') # type: typing.Tuple[str, int]
208218
# _conn = TcpConn.TcpConn(
209219
# ROLE_TYPE_ACTIVE, addr,
210-
# close_cb=lambda ct=conn_type, a=addr: self._remove_conn(ct, a),
220+
# close_cb=lambda ct=proto_type, a=addr: self._remove_conn(ct, a),
211221
# is_proxy=self._is_proxy,
212222
# # transport=transport
213223
# )
@@ -221,19 +231,19 @@ async def open_conn_by_addr(
221231
_conn = conn_cls(
222232
ROLE_TYPE_ACTIVE, addr,
223233
rpc_handler=rpc_handler,
224-
close_cb=lambda ct=conn_type, a=addr: self._remove_conn(ct, a),
234+
close_cb=lambda ct=proto_type, a=addr: self._remove_conn(ct, a),
225235
is_proxy=self._is_proxy,
226236
# transport=transport
227237
)
228238

229-
self._conn_type_2_addr_2_conn[conn_type][addr] = _conn
239+
self._proto_type_2_addr_2_conn[proto_type][addr] = _conn
230240
# if rpc_handler is not None:
231241
# _conn.add_rpc_handler(rpc_handler)
232242
is_conned = await _conn.try_connect()
233243
return _conn, is_conned
234244

235-
def _remove_conn(self, conn_type, addr: typing.Tuple[str, int]):
236-
self._conn_type_2_addr_2_conn[conn_type].pop(addr, None)
245+
def _remove_conn(self, proto_type, addr: typing.Tuple[str, int]):
246+
self._proto_type_2_addr_2_conn[proto_type].pop(addr, None)
237247

238248
# def add_incoming_conn(
239249
# self,

pycharm2020.1.3/script/RpcHandler.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
import core.util.UtilApi
1515
from ConnBase import ConnBase
16-
from ConnMgr import ConnMgr, CONN_TYPE_TCP, CONN_TYPE_RUDP
16+
from ConnMgr import ConnMgr, PROTO_TYPE_TCP, PROTO_TYPE_RUDP
1717
from core.common.IdManager import IdManager
1818
from core.mobilelog.LogManager import AsyncLogger
1919
from core.util.TimerHub import TimerHub
@@ -60,11 +60,15 @@ class RpcHandler:
6060

6161
def __init__(
6262
self, rpc_handler_id: bytes, conn: ConnBase = None,
63-
entity: ServerEntity = None):
63+
entity: ServerEntity = None, conn_proto_type: int = PROTO_TYPE_TCP):
6464
self.rpc_handler_id = rpc_handler_id
6565
self._logger = LogManager.get_logger() # type: AsyncLogger
6666
# self._logger.info(f'!!! xx {rpc_handler_id=}')
67-
self._conn = conn # type: TcpConn
67+
self._conn = conn # type: ConnBase
68+
if conn:
69+
self._conn_proto_type = conn.get_proto_type()
70+
else:
71+
self._conn_proto_type = conn_proto_type
6872
self._entity = entity # type: ServerEntity
6973
self._next_reply_id = 0
7074
self._pending_requests = {} # type: typing.Dict[int, RpcReplyFuture]
@@ -210,8 +214,9 @@ async def _handle_create_conn(self, addr: typing.Tuple[str, int] = None):
210214
return
211215
addr = self._conn.get_addr()
212216
# self._try_connect_times += 1
213-
self._conn, is_conned = await ConnMgr.instance().open_conn_by_addr(CONN_TYPE_TCP, addr, self)
214-
# self._conn, is_conned = await ConnMgr.instance().open_conn_by_addr(CONN_TYPE_RUDP, addr, self)
217+
# self._conn, is_conned = await ConnMgr.instance().open_conn_by_addr(CONN_TYPE_TCP, addr, self)
218+
self._conn, is_conned = await ConnMgr.instance().open_conn_by_addr(
219+
self._conn_proto_type, addr, self)
215220
if not is_conned:
216221
for _msg in self._msg_buffer:
217222
if _msg[0] == RPC_TYPE_REQUEST:

pycharm2020.1.3/script/RudpConn.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from ConnBase import HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, ConnBase, RECONNECT_MAX_TIMES, RECONNECT_INTERVAL, \
1111
CONN_STATE_CONNECTING, CONN_STATE_CONNECTED, STRUCT_PACK_FORMAT
12-
from ConnMgr import CONN_TYPE_TCP, CONN_TYPE_RUDP, RudpProtocol, RUDP_HANDSHAKE_SYN, ConnMgr, ROLE_TYPE_PASSIVE
12+
from ConnMgr import PROTO_TYPE_TCP, PROTO_TYPE_RUDP, RudpProtocol, RUDP_HANDSHAKE_SYN, ConnMgr, ROLE_TYPE_PASSIVE
1313
from common import gv
1414
from core.common import rudp
1515
from core.common.IdManager import IdManager
@@ -39,9 +39,8 @@ def __init__(
3939
transport: transports.BaseTransport = None,
4040
conv: int = None,
4141
):
42-
self._conn_type = CONN_TYPE_RUDP
43-
4442
super(RudpConn, self).__init__(role_type, addr, rpc_handler, close_cb, is_proxy, transport)
43+
self._proto_type = PROTO_TYPE_RUDP
4544
self._conv = conv
4645
if self._role_type == ROLE_TYPE_PASSIVE:
4746
self._init_kcp()
@@ -53,14 +52,12 @@ def _init_kcp(self):
5352

5453
async def try_connect(self) -> bool:
5554
self._conn_state = CONN_STATE_CONNECTING
56-
5755
while self._try_connect_times < RECONNECT_MAX_TIMES:
5856
self._try_connect_times += 1
59-
transport, protocol = await gv.get_ev_loop().create_datagram_endpoint(
57+
self._transport, protocol = await gv.get_ev_loop().create_datagram_endpoint(
6058
lambda: RudpProtocol(),
6159
remote_addr=self._addr)
6260
self._transport.sendto(RUDP_HANDSHAKE_SYN)
63-
self._transport = transport
6461
fut = ConnMgr.instance().create_rudp_conned_fut(self._addr)
6562
# self._logger.error(str(e))
6663
# await asyncio.sleep(RECONNECT_INTERVAL)
@@ -69,7 +66,7 @@ async def try_connect(self) -> bool:
6966
conv = await asyncio.wait_for(
7067
asyncio.shield(fut), timeout=RECONNECT_INTERVAL)
7168
except asyncio.exceptions.TimeoutError:
72-
pass
69+
self._logger.warning(f"try reconnect rudp: {str(self._addr)} ... {self._try_connect_times}")
7370
else:
7471
self._conv = conv
7572
self.set_connection_state(CONN_STATE_CONNECTED)
@@ -78,7 +75,7 @@ async def try_connect(self) -> bool:
7875
return True
7976
else:
8077
pass # todo
81-
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect remote addr: {addr}")
78+
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect rudp remote addr: {self._addr}")
8279
self._try_connect_times = 0
8380
return False
8481

pycharm2020.1.3/script/RudpServer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import typing
1919

2020
# from core.util.performance.cpu_load_handler import CpuLoad
21+
from sanic_jwt_extended.jwt_manager import JWT
22+
2123
import ConnBase
2224
# from ConnBase import ROLE_TYPE_PASSIVE
23-
from ConnMgr import ConnMgr, CONN_TYPE_RUDP, RudpProtocol
25+
from ConnMgr import ConnMgr, PROTO_TYPE_RUDP, RudpProtocol
2426
from ProxyRpcHandler import ProxyCliRpcHandler
2527
from core.util import UtilApi
2628
from core.util.UtilApi import wait_or_not, Singleton

pycharm2020.1.3/script/TcpConn.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
from ConnBase import HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, ConnBase, RPC_HANDLER_ID_LEN, RECONNECT_MAX_TIMES, \
1111
RECONNECT_INTERVAL, CONN_STATE_CONNECTED, CONN_STATE_CONNECTING
12-
from ConnMgr import CONN_TYPE_TCP, ROLE_TYPE_ACTIVE
12+
from ConnMgr import PROTO_TYPE_TCP, ROLE_TYPE_ACTIVE
1313
from common import gv
1414
from core.common.IdManager import IdManager
1515
from core.util.TimerHub import TimerHub
@@ -38,7 +38,7 @@ def __init__(
3838
transport: transports.BaseTransport = None
3939
):
4040
super(TcpConn, self).__init__(role_type, addr, rpc_handler, close_cb, is_proxy, transport)
41-
self._conn_type = CONN_TYPE_TCP
41+
self._proto_type = PROTO_TYPE_TCP
4242

4343
# @wait_or_not()
4444
async def try_connect(self) -> bool:
@@ -62,7 +62,7 @@ async def try_connect(self) -> bool:
6262
return True
6363
else:
6464
pass # todo
65-
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect remote addr: {addr}")
65+
self._logger.error(f"try {RECONNECT_MAX_TIMES} times , still can't connect tcp remote addr: {self._addr}")
6666
self._try_connect_times = 0
6767
return False
6868

pycharm2020.1.3/script/TcpServer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import ConnBase
2323
# from ConnBase import ROLE_TYPE_PASSIVE, ROLE_TYPE_ACTIVE
24-
from ConnMgr import ConnMgr, CONN_TYPE_TCP, TcpProtocol, ROLE_TYPE_PASSIVE
24+
from ConnMgr import ConnMgr, PROTO_TYPE_TCP, TcpProtocol, ROLE_TYPE_PASSIVE
2525
from ProxyRpcHandler import ProxyCliRpcHandler
2626
from ServerBase import ServerBase
2727
from core.util import UtilApi

0 commit comments

Comments
 (0)