Skip to content

Commit d3cee32

Browse files
Pool (#694)
* Refactor pool * mypy fixes * Fix import (relative) * Add WebScraper example skeleton & ConnectionPool skeleton * Add ConnectionPool class * Integrate ConnectionPool with proxy server (experimental) * Lint fixes * Remove unused imports. TODO: Put pool behind a flag. Default to false for now * Make ConnectionPool multiprocess safe. Later we want to make it safe but without using locks * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Remove unused imports * Return created flag from acquire * Guard connection pool behind --enable-conn-pool flag * Flag belongs within connection pool class * spelling * self.upstream = None only for pool config Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent da23c7f commit d3cee32

File tree

13 files changed

+414
-76
lines changed

13 files changed

+414
-76
lines changed

examples/README.md

+18-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Looking for `proxy.py` plugin examples? Check [proxy/plugin](https://github.com
66

77
Table of Contents
88
=================
9+
* [Generic Work Acceptor and Executor](#generic-work-acceptor-and-executor)
910
* [WebSocket Client](#websocket-client)
1011
* [TCP Echo Server](#tcp-echo-server)
1112
* [TCP Echo Client](#tcp-echo-client)
@@ -14,6 +15,17 @@ Table of Contents
1415
* [PubSub Eventing](#pubsub-eventing)
1516
* [Https Connect Tunnel](#https-connect-tunnel)
1617

18+
## Generic Work Acceptor and Executor
19+
20+
1. Makes use of `proxy.core.AcceptorPool` and `proxy.core.Work`
21+
2. Demonstrates how to perform generic work using `proxy.py` core.
22+
23+
Start `web_scraper.py` as:
24+
25+
```console
26+
PYTHONPATH=. python examples/web_scraper.py
27+
```
28+
1729
## WebSocket Client
1830

1931
1. Makes use of `proxy.http.websocket.WebsocketClient` which is built on top of `asyncio`
@@ -22,7 +34,7 @@ Table of Contents
2234

2335
Start `websocket_client.py` as:
2436

25-
```bash
37+
```console
2638
PYTHONPATH=. python examples/websocket_client.py
2739
Received b'hello' after 306 millisec
2840
Received b'hello' after 308 millisec
@@ -44,7 +56,7 @@ Received b'hello' after 309 millisec
4456

4557
Start `tcp_echo_server.py` as:
4658

47-
```bash
59+
```console
4860
PYTHONPATH=. python examples/tcp_echo_server.py
4961
Connection accepted from ('::1', 53285, 0, 0)
5062
Connection closed by client ('::1', 53285, 0, 0)
@@ -57,7 +69,7 @@ Connection closed by client ('::1', 53285, 0, 0)
5769

5870
Start `tcp_echo_client.py` as:
5971

60-
```bash
72+
```console
6173
PYTHONPATH=. python examples/tcp_echo_client.py
6274
b'hello'
6375
b'hello'
@@ -81,7 +93,7 @@ KeyboardInterrupt
8193

8294
Start `ssl_echo_server.py` as:
8395

84-
```bash
96+
```console
8597
PYTHONPATH=. python examples/ssl_echo_server.py
8698
```
8799

@@ -92,7 +104,7 @@ Start `ssl_echo_server.py` as:
92104

93105
Start `ssl_echo_client.py` as:
94106

95-
```bash
107+
```console
96108
PYTHONPATH=. python examples/ssl_echo_client.py
97109
```
98110

@@ -107,7 +119,7 @@ Start `ssl_echo_client.py` as:
107119

108120
Start `pubsub_eventing.py` as:
109121

110-
```bash
122+
```console
111123
PYTHONPATH=. python examples/pubsub_eventing.py
112124
DEBUG:proxy.core.event.subscriber:Subscribed relay sub id 5eb22010764f4d44900f41e2fb408ca6 from core events
113125
publisher starting

examples/web_scraper.py

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
proxy.py
4+
~~~~~~~~
5+
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
6+
Network monitoring, controls & Application development, testing, debugging.
7+
8+
:copyright: (c) 2013-present by Abhinav Singh and contributors.
9+
:license: BSD, see LICENSE for more details.
10+
"""
11+
import time
12+
import socket
13+
14+
from typing import Dict
15+
16+
from proxy.proxy import Proxy
17+
from proxy.core.acceptor import Work, AcceptorPool
18+
from proxy.common.types import Readables, Writables
19+
20+
21+
class WebScraper(Work):
22+
"""Demonstrates how to orchestrate a generic work acceptors and executors
23+
workflow using proxy.py core.
24+
25+
By default, `WebScraper` expects to receive work from a file on disk.
26+
Each line in the file must be a URL to scrape. Received URL is scraped
27+
by the implementation in this class.
28+
29+
After scraping, results are published to the eventing core. One or several
30+
result subscribers can then handle the result as necessary. Currently, result
31+
subscribers consume the scraped response and write discovered URLs to the
32+
file on the disk. This creates a feedback loop, allowing WebScraper to
33+
continue endlessly.
34+
35+
NOTE: No loop detection is performed currently.
36+
37+
NOTE: File descriptor need not point to a file on disk.
38+
For example, the file descriptor can be a database connection.
39+
For simplicity, imagine a Redis server connection handling
40+
only PUBSUB protocol.
41+
"""
42+
43+
def get_events(self) -> Dict[socket.socket, int]:
44+
"""Return sockets and events (read or write) that we are interested in."""
45+
return {}
46+
47+
def handle_events(
48+
self,
49+
readables: Readables,
50+
writables: Writables,
51+
) -> bool:
52+
"""Handle readable and writable sockets.
53+
54+
Return True to shut down work."""
55+
return False
56+
57+
58+
if __name__ == '__main__':
59+
with AcceptorPool(
60+
flags=Proxy.initialize(
61+
port=12345,
62+
num_workers=1,
63+
threadless=True,
64+
keyfile='https-key.pem',
65+
certfile='https-signed-cert.pem',
66+
),
67+
work_klass=WebScraper,
68+
) as pool:
69+
while True:
70+
time.sleep(1)

proxy/core/acceptor/acceptor.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,17 @@ def __init__(
7474
) -> None:
7575
super().__init__()
7676
self.flags = flags
77+
# Eventing core queue
78+
self.event_queue = event_queue
79+
# Index assigned by `AcceptorPool`
80+
self.idd = idd
7781
# Lock shared by all acceptor processes
7882
# to avoid concurrent accept over server socket
7983
self.lock = lock
80-
# Index assigned by `AcceptorPool`
81-
self.idd = idd
8284
# Queue over which server socket fd is received on start-up
8385
self.work_queue: connection.Connection = work_queue
8486
# Worker class
8587
self.work_klass = work_klass
86-
# Eventing core queue
87-
self.event_queue = event_queue
8888
# Selector & threadless states
8989
self.running = multiprocessing.Event()
9090
self.selector: Optional[selectors.DefaultSelector] = None

proxy/core/acceptor/pool.py

+48-42
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
from ..event import EventQueue
2626

2727
from ...common.flag import flags
28-
from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME, DEFAULT_NUM_WORKERS, DEFAULT_PORT
28+
from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME
29+
from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_PORT
2930

3031
logger = logging.getLogger(__name__)
3132

32-
# Lock shared by worker processes
33+
# Lock shared by acceptors for
34+
# sequential acceptance of work.
3335
LOCK = multiprocessing.Lock()
3436

3537

@@ -61,20 +63,18 @@
6163

6264

6365
class AcceptorPool:
64-
"""AcceptorPool pre-spawns worker processes to utilize all cores available on the system.
65-
A server socket is initialized and dispatched over a pipe to these workers.
66-
Each worker process then concurrently accepts new client connection over
67-
the initialized server socket.
66+
"""AcceptorPool is a helper class which pre-spawns `Acceptor` processes
67+
to utilize all available CPU cores for accepting new work.
68+
69+
A file descriptor to consume work from is shared with `Acceptor` processes
70+
over a pipe. Each `Acceptor` process then concurrently accepts new work over
71+
the shared file descriptor.
6872
6973
Example usage:
7074
71-
pool = AcceptorPool(flags=..., work_klass=...)
72-
try:
73-
pool.setup()
75+
with AcceptorPool(flags=..., work_klass=...) as pool:
7476
while True:
7577
time.sleep(1)
76-
finally:
77-
pool.shutdown()
7878
7979
`work_klass` must implement `work.Work` class.
8080
"""
@@ -84,11 +84,16 @@ def __init__(
8484
work_klass: Type[Work], event_queue: Optional[EventQueue] = None,
8585
) -> None:
8686
self.flags = flags
87+
# Eventing core queue
88+
self.event_queue: Optional[EventQueue] = event_queue
89+
# File descriptor to use for accepting new work
8790
self.socket: Optional[socket.socket] = None
91+
# Acceptor process instances
8892
self.acceptors: List[Acceptor] = []
93+
# Work queue used to share file descriptor with acceptor processes
8994
self.work_queues: List[connection.Connection] = []
95+
# Work class implementation
9096
self.work_klass = work_klass
91-
self.event_queue: Optional[EventQueue] = event_queue
9297

9398
def __enter__(self) -> 'AcceptorPool':
9499
self.setup()
@@ -102,19 +107,43 @@ def __exit__(
102107
) -> None:
103108
self.shutdown()
104109

105-
def listen(self) -> None:
110+
def setup(self) -> None:
111+
"""Listen on port and setup acceptors."""
112+
self._listen()
113+
# Override flags.port to match the actual port
114+
# we are listening upon. This is necessary to preserve
115+
# the server port when `--port=0` is used.
116+
assert self.socket
117+
self.flags.port = self.socket.getsockname()[1]
118+
self._start_acceptors()
119+
# Send file descriptor to all acceptor processes.
120+
assert self.socket is not None
121+
for index in range(self.flags.num_workers):
122+
send_handle(
123+
self.work_queues[index],
124+
self.socket.fileno(),
125+
self.acceptors[index].pid,
126+
)
127+
self.work_queues[index].close()
128+
self.socket.close()
129+
130+
def shutdown(self) -> None:
131+
logger.info('Shutting down %d workers' % self.flags.num_workers)
132+
for acceptor in self.acceptors:
133+
acceptor.running.set()
134+
for acceptor in self.acceptors:
135+
acceptor.join()
136+
logger.debug('Acceptors shutdown')
137+
138+
def _listen(self) -> None:
106139
self.socket = socket.socket(self.flags.family, socket.SOCK_STREAM)
107140
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
108141
self.socket.bind((str(self.flags.hostname), self.flags.port))
109142
self.socket.listen(self.flags.backlog)
110143
self.socket.setblocking(False)
111-
# Override flags.port to match the actual port
112-
# we are listening upon. This is necessary to preserve
113-
# the server port when `--port=0` is used.
114-
self.flags.port = self.socket.getsockname()[1]
115144

116-
def start_workers(self) -> None:
117-
"""Start worker processes."""
145+
def _start_acceptors(self) -> None:
146+
"""Start acceptor processes."""
118147
for acceptor_id in range(self.flags.num_workers):
119148
work_queue = multiprocessing.Pipe()
120149
acceptor = Acceptor(
@@ -134,26 +163,3 @@ def start_workers(self) -> None:
134163
self.acceptors.append(acceptor)
135164
self.work_queues.append(work_queue[0])
136165
logger.info('Started %d workers' % self.flags.num_workers)
137-
138-
def shutdown(self) -> None:
139-
logger.info('Shutting down %d workers' % self.flags.num_workers)
140-
for acceptor in self.acceptors:
141-
acceptor.running.set()
142-
for acceptor in self.acceptors:
143-
acceptor.join()
144-
logger.debug('Acceptors shutdown')
145-
146-
def setup(self) -> None:
147-
"""Listen on port, setup workers and pass server socket to workers."""
148-
self.listen()
149-
self.start_workers()
150-
# Send server socket to all acceptor processes.
151-
assert self.socket is not None
152-
for index in range(self.flags.num_workers):
153-
send_handle(
154-
self.work_queues[index],
155-
self.socket.fileno(),
156-
self.acceptors[index].pid,
157-
)
158-
self.work_queues[index].close()
159-
self.socket.close()

proxy/core/base/tcp_server.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
from abc import abstractmethod
1616
from typing import Dict, Any, Optional
1717

18-
from proxy.core.acceptor import Work
19-
from proxy.common.types import Readables, Writables
18+
from ...core.acceptor import Work
19+
from ...common.types import Readables, Writables
2020

2121
logger = logging.getLogger(__name__)
2222

proxy/core/base/tcp_tunnel.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
:copyright: (c) 2013-present by Abhinav Singh and contributors.
99
:license: BSD, see LICENSE for more details.
1010
"""
11-
from abc import abstractmethod
1211
import socket
1312
import selectors
13+
14+
from abc import abstractmethod
1415
from typing import Any, Optional, Dict
1516

1617
from ...http.parser import HttpParser, httpParserTypes

proxy/core/connection/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
from .connection import TcpConnection, TcpConnectionUninitializedException, tcpConnectionTypes
1212
from .client import TcpClientConnection
1313
from .server import TcpServerConnection
14+
from .pool import ConnectionPool
1415

1516
__all__ = [
1617
'TcpConnection',
1718
'TcpConnectionUninitializedException',
1819
'TcpServerConnection',
1920
'TcpClientConnection',
2021
'tcpConnectionTypes',
22+
'ConnectionPool',
2123
]

proxy/core/connection/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def __init__(
2222
self,
2323
conn: Union[ssl.SSLSocket, socket.socket],
2424
addr: Tuple[str, int],
25-
):
25+
) -> None:
2626
super().__init__(tcpConnectionTypes.CLIENT)
2727
self._conn: Optional[Union[ssl.SSLSocket, socket.socket]] = conn
2828
self.addr: Tuple[str, int] = addr

proxy/core/connection/connection.py

+16-3
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@ class TcpConnection(ABC):
3939
when reading and writing into the socket.
4040
4141
Implement the connection property abstract method to return
42-
a socket connection object."""
42+
a socket connection object.
43+
"""
4344

44-
def __init__(self, tag: int):
45+
def __init__(self, tag: int) -> None:
46+
self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client'
4547
self.buffer: List[memoryview] = []
4648
self.closed: bool = False
47-
self.tag: str = 'server' if tag == tcpConnectionTypes.SERVER else 'client'
49+
self._reusable: bool = False
4850

4951
@property
5052
@abstractmethod
@@ -95,3 +97,14 @@ def flush(self) -> int:
9597
del mv
9698
logger.debug('flushed %d bytes to %s' % (sent, self.tag))
9799
return sent
100+
101+
def is_reusable(self) -> bool:
102+
return self._reusable
103+
104+
def mark_inuse(self) -> None:
105+
self._reusable = False
106+
107+
def reset(self) -> None:
108+
assert not self.closed
109+
self._reusable = True
110+
self.buffer = []

0 commit comments

Comments
 (0)