Skip to content

Add Listener, Web server close on header, use Pipe instead of Manager in eventing core #720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
comma
  • Loading branch information
abhinavsingh committed Nov 10, 2021
commit 8a0668f580d6504eaf852ac2afed00b745ed861c
3 changes: 2 additions & 1 deletion proxy/common/flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ def initialize(
opts.get('hostname', ipaddress.ip_address(args.hostname)),
)
args.unix_socket_path = opts.get(
'unix_socket_path', args.unix_socket_path)
'unix_socket_path', args.unix_socket_path,
)
# AF_UNIX is not available on Windows
# See https://bugs.python.org/issue33408
if os.name != 'nt':
Expand Down
3 changes: 2 additions & 1 deletion proxy/core/event/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def _broadcast(self, ev: Dict[str, Any]) -> None:
self.subscribers[sub_id].send(ev)
except BrokenPipeError:
logger.warning(
'Subscriber#%s broken pipe', sub_id)
'Subscriber#%s broken pipe', sub_id,
)
self.subscribers[sub_id].close()
broken_pipes.append(sub_id)
for sub_id in broken_pipes:
Expand Down
18 changes: 12 additions & 6 deletions proxy/core/event/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ def subscribe(self) -> None:
def unsubscribe(self) -> None:
if self.relay_sub_id is None:
logger.warning(
'Relay called unsubscribe without an active subscription')
'Relay called unsubscribe without an active subscription',
)
return
try:
self.event_queue.unsubscribe(self.relay_sub_id)
Expand All @@ -122,14 +123,17 @@ def relay(
ev = channel.recv()
if ev['event_name'] == eventNames.SUBSCRIBED:
logger.info(
'Subscriber#{0} subscribe ack received'.format(sub_id))
'Subscriber#{0} subscribe ack received'.format(sub_id),
)
elif ev['event_name'] == eventNames.UNSUBSCRIBED:
logger.info(
'Subscriber#{0} unsubscribe ack received'.format(sub_id))
'Subscriber#{0} unsubscribe ack received'.format(sub_id),
)
break
elif ev['event_name'] == eventNames.DISPATCHER_SHUTDOWN:
logger.info(
'Subscriber#{0} received dispatcher shutdown event'.format(sub_id))
'Subscriber#{0} received dispatcher shutdown event'.format(sub_id),
)
break
else:
callback(ev)
Expand All @@ -146,8 +150,10 @@ def _start_relay_thread(self, callback: Callable[[Dict[str, Any]], None]) -> Non
self.relay_recv, self.relay_send = multiprocessing.Pipe()
self.relay_thread = threading.Thread(
target=EventSubscriber.relay,
args=(self.relay_sub_id, self.relay_shutdown,
self.relay_recv, callback),
args=(
self.relay_sub_id, self.relay_shutdown,
self.relay_recv, callback,
),
)
self.relay_thread.daemon = True
self.relay_thread.start()
Expand Down
6 changes: 4 additions & 2 deletions proxy/dashboard/inspect_traffic.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ def handle_message(self, message: Dict[str, Any]) -> None:
WebsocketFrame.text(
bytes_(
json.dumps(
{'id': message['id'],
'response': 'not enabled'},
{
'id': message['id'],
'response': 'not enabled',
},
),
),
),
Expand Down
15 changes: 10 additions & 5 deletions tests/core/test_acceptor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,22 @@ def test_setup_and_shutdown(
self.assertEqual(mock_send_handle.call_count, num_acceptors)

self.assertEqual(
mock_acceptor.call_args_list[0][1]['idd'], 0)
mock_acceptor.call_args_list[0][1]['idd'], 0,
)
self.assertEqual(
mock_acceptor.call_args_list[0][1]['fd_queue'], mock_pipe.return_value[1])
mock_acceptor.call_args_list[0][1]['fd_queue'], mock_pipe.return_value[1],
)
self.assertEqual(
mock_acceptor.call_args_list[0][1]['flags'], flags)
mock_acceptor.call_args_list[0][1]['flags'], flags,
)
self.assertEqual(
mock_acceptor.call_args_list[0][1]['event_queue'], None)
mock_acceptor.call_args_list[0][1]['event_queue'], None,
)
# executor_queues=[],
# executor_pids=[]
self.assertEqual(
mock_acceptor.call_args_list[1][1]['idd'], 1)
mock_acceptor.call_args_list[1][1]['idd'], 1,
)

acceptor1.start.assert_called_once()
acceptor2.start.assert_called_once()
Expand Down
9 changes: 6 additions & 3 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def test_entry_point(
entry_point()
mock_event_manager.assert_not_called()
mock_listener.assert_called_once_with(
flags=mock_initialize.return_value)
flags=mock_initialize.return_value,
)
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=None,
Expand Down Expand Up @@ -139,7 +140,8 @@ def test_main_with_no_flags(
main()
mock_event_manager.assert_not_called()
mock_listener.assert_called_once_with(
flags=mock_initialize.return_value)
flags=mock_initialize.return_value,
)
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=None,
Expand Down Expand Up @@ -178,7 +180,8 @@ def test_enable_events(
mock_event_manager.return_value.setup.assert_called_once()
mock_event_manager.return_value.shutdown.assert_called_once()
mock_listener.assert_called_once_with(
flags=mock_initialize.return_value)
flags=mock_initialize.return_value,
)
mock_executor_pool.assert_called_once_with(
flags=mock_initialize.return_value,
event_queue=mock_event_manager.return_value.queue,
Expand Down