Skip to content

Commit 7bf2cc4

Browse files
SubCoder1Oscar Pacheco
authored andcommitted
BUG#36733242: Contribution: Add Connection Pooling Support for AsyncIO Connector
This enhancement patch introduces connection pooling support for asynchronous type connector. Change-Id: I37aa396c2580138eb12b327efe9ac5ffc3951e3d
1 parent 43263cd commit 7bf2cc4

File tree

10 files changed

+1460
-259
lines changed

10 files changed

+1460
-259
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ v9.4.0
2424
- BUG#37642447: The license type is missing from RPM package
2525
- BUG#37627508: mysql/connector python fetchmany() has an off by one bug when argument given as 1
2626
- BUG#37047789: Python connector does not support Django enum
27+
- BUG#36733242: Contribution: Add Connection Pooling Support for AsyncIO Connector
2728
- BUG#36452514: Missing version info resources
2829
- BUG#34950958: MySQL Python Connector doesn't work with ssh in the same process
2930
- BUG#34844347: Freezes on connection via sshtunnel

mysql-connector-python/lib/mysql/connector/abstracts.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,6 @@
9090
Error,
9191
InterfaceError,
9292
NotSupportedError,
93-
OperationalError,
9493
ProgrammingError,
9594
)
9695
from .opentelemetry.constants import (
@@ -1893,6 +1892,7 @@ def start_transaction(
18931892
query += " WITH CONSISTENT SNAPSHOT"
18941893
self.cmd_query(query)
18951894

1895+
@abstractmethod
18961896
def reset_session(
18971897
self,
18981898
user_variables: Optional[Dict[str, Any]] = None,
@@ -1923,33 +1923,6 @@ def reset_session(
19231923
>>> cnx.reset_session(user_variables, session_variables)
19241924
```
19251925
"""
1926-
if not self.is_connected():
1927-
raise OperationalError("MySQL Connection not available")
1928-
1929-
try:
1930-
self.cmd_reset_connection()
1931-
except (NotSupportedError, NotImplementedError):
1932-
if self._compress:
1933-
raise NotSupportedError(
1934-
"Reset session is not supported with compression for "
1935-
"MySQL server version 5.7.2 or earlier"
1936-
) from None
1937-
self.cmd_change_user(
1938-
self._user,
1939-
self._password,
1940-
self._database,
1941-
self._charset_id,
1942-
)
1943-
1944-
if user_variables or session_variables:
1945-
cur = self.cursor()
1946-
if user_variables:
1947-
for key, value in user_variables.items():
1948-
cur.execute(f"SET @`{key}` = {value}")
1949-
if session_variables:
1950-
for key, value in session_variables.items():
1951-
cur.execute(f"SET SESSION `{key}` = {value}")
1952-
cur.close()
19531926

19541927
@deprecated(DEPRECATED_METHOD_WARNING.format(property_name="converter_class"))
19551928
def set_converter_class(self, convclass: Optional[Type[MySQLConverter]]) -> None:
Lines changed: 10 additions & 225 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2023, 2024, Oracle and/or its affiliates.
1+
# Copyright (c) 2023, 2025, Oracle and/or its affiliates.
22
#
33
# This program is free software; you can redistribute it and/or modify
44
# it under the terms of the GNU General Public License, version 2.0, as
@@ -28,228 +28,13 @@
2828

2929
"""MySQL Connector/Python - MySQL driver written in Python."""
3030

31-
__all__ = ["CMySQLConnection", "MySQLConnection", "connect"]
31+
from .connection import MySQLConnection, MySQLConnectionAbstract
32+
from .pooling import MySQLConnectionPool, PooledMySQLConnection, connect
3233

33-
import random
34-
35-
from typing import Any
36-
37-
from ..constants import DEFAULT_CONFIGURATION
38-
from ..errors import Error, InterfaceError, ProgrammingError
39-
from ..pooling import ERROR_NO_CEXT
40-
from .abstracts import MySQLConnectionAbstract
41-
from .connection import MySQLConnection
42-
43-
try:
44-
import dns.exception
45-
import dns.resolver
46-
except ImportError:
47-
HAVE_DNSPYTHON = False
48-
else:
49-
HAVE_DNSPYTHON = True
50-
51-
52-
try:
53-
from .connection_cext import CMySQLConnection
54-
except ImportError:
55-
CMySQLConnection = None
56-
57-
58-
async def connect(*args: Any, **kwargs: Any) -> MySQLConnectionAbstract:
59-
"""Creates or gets a MySQL connection object.
60-
61-
In its simpliest form, `connect()` will open a connection to a
62-
MySQL server and return a `MySQLConnectionAbstract` subclass
63-
object such as `MySQLConnection` or `CMySQLConnection`.
64-
65-
When any connection pooling arguments are given, for example `pool_name`
66-
or `pool_size`, a pool is created or a previously one is used to return
67-
a `PooledMySQLConnection`.
68-
69-
Args:
70-
*args: N/A.
71-
**kwargs: For a complete list of possible arguments, see [1]. If no arguments
72-
are given, it uses the already configured or default values.
73-
74-
Returns:
75-
A `MySQLConnectionAbstract` subclass instance (such as `MySQLConnection` or
76-
a `CMySQLConnection`) instance.
77-
78-
Examples:
79-
A connection with the MySQL server can be established using either the
80-
`mysql.connector.connect()` method or a `MySQLConnectionAbstract` subclass:
81-
```
82-
>>> from mysql.connector.aio import MySQLConnection, HAVE_CEXT
83-
>>>
84-
>>> cnx1 = await mysql.connector.aio.connect(user='joe', database='test')
85-
>>> cnx2 = MySQLConnection(user='joe', database='test')
86-
>>> await cnx2.connect()
87-
>>>
88-
>>> cnx3 = None
89-
>>> if HAVE_CEXT:
90-
>>> from mysql.connector.aio import CMySQLConnection
91-
>>> cnx3 = CMySQLConnection(user='joe', database='test')
92-
```
93-
94-
References:
95-
[1]: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
96-
"""
97-
# DNS SRV
98-
dns_srv = kwargs.pop("dns_srv") if "dns_srv" in kwargs else False
99-
100-
if not isinstance(dns_srv, bool):
101-
raise InterfaceError("The value of 'dns-srv' must be a boolean")
102-
103-
if dns_srv:
104-
if not HAVE_DNSPYTHON:
105-
raise InterfaceError(
106-
"MySQL host configuration requested DNS "
107-
"SRV. This requires the Python dnspython "
108-
"module. Please refer to documentation"
109-
)
110-
if "unix_socket" in kwargs:
111-
raise InterfaceError(
112-
"Using Unix domain sockets with DNS SRV lookup is not allowed"
113-
)
114-
if "port" in kwargs:
115-
raise InterfaceError(
116-
"Specifying a port number with DNS SRV lookup is not allowed"
117-
)
118-
if "failover" in kwargs:
119-
raise InterfaceError(
120-
"Specifying multiple hostnames with DNS SRV look up is not allowed"
121-
)
122-
if "host" not in kwargs:
123-
kwargs["host"] = DEFAULT_CONFIGURATION["host"]
124-
125-
try:
126-
srv_records = dns.resolver.query(kwargs["host"], "SRV")
127-
except dns.exception.DNSException:
128-
raise InterfaceError(
129-
f"Unable to locate any hosts for '{kwargs['host']}'"
130-
) from None
131-
132-
failover = []
133-
for srv in srv_records:
134-
failover.append(
135-
{
136-
"host": srv.target.to_text(omit_final_dot=True),
137-
"port": srv.port,
138-
"priority": srv.priority,
139-
"weight": srv.weight,
140-
}
141-
)
142-
143-
failover.sort(key=lambda x: (x["priority"], -x["weight"]))
144-
kwargs["failover"] = [
145-
{"host": srv["host"], "port": srv["port"]} for srv in failover
146-
]
147-
148-
# Failover
149-
if "failover" in kwargs:
150-
return await _get_failover_connection(**kwargs)
151-
152-
# Use C Extension by default
153-
use_pure = kwargs.get("use_pure", False)
154-
if "use_pure" in kwargs:
155-
del kwargs["use_pure"] # Remove 'use_pure' from kwargs
156-
if not use_pure and CMySQLConnection is None:
157-
raise ImportError(ERROR_NO_CEXT)
158-
159-
if CMySQLConnection and not use_pure:
160-
cnx = CMySQLConnection(*args, **kwargs)
161-
else:
162-
cnx = MySQLConnection(*args, **kwargs)
163-
await cnx.connect()
164-
return cnx
165-
166-
167-
async def _get_failover_connection(**kwargs: Any) -> MySQLConnectionAbstract:
168-
"""Return a MySQL connection and try to failover if needed.
169-
170-
An InterfaceError is raise when no MySQL is available. ValueError is
171-
raised when the failover server configuration contains an illegal
172-
connection argument. Supported arguments are user, password, host, port,
173-
unix_socket and database. ValueError is also raised when the failover
174-
argument was not provided.
175-
176-
Returns MySQLConnection instance.
177-
"""
178-
config = kwargs.copy()
179-
try:
180-
failover = config["failover"]
181-
except KeyError:
182-
raise ValueError("failover argument not provided") from None
183-
del config["failover"]
184-
185-
support_cnx_args = set(
186-
[
187-
"user",
188-
"password",
189-
"host",
190-
"port",
191-
"unix_socket",
192-
"database",
193-
"pool_name",
194-
"pool_size",
195-
"priority",
196-
]
197-
)
198-
199-
# First check if we can add all use the configuration
200-
priority_count = 0
201-
for server in failover:
202-
diff = set(server.keys()) - support_cnx_args
203-
if diff:
204-
arg = "s" if len(diff) > 1 else ""
205-
lst = ", ".join(diff)
206-
raise ValueError(
207-
f"Unsupported connection argument {arg} in failover: {lst}"
208-
)
209-
if hasattr(server, "priority"):
210-
priority_count += 1
211-
212-
server["priority"] = server.get("priority", 100)
213-
if server["priority"] < 0 or server["priority"] > 100:
214-
raise InterfaceError(
215-
"Priority value should be in the range of 0 to 100, "
216-
f"got : {server['priority']}"
217-
)
218-
if not isinstance(server["priority"], int):
219-
raise InterfaceError(
220-
"Priority value should be an integer in the range of 0 to "
221-
f"100, got : {server['priority']}"
222-
)
223-
224-
if 0 < priority_count < len(failover):
225-
raise ProgrammingError(
226-
"You must either assign no priority to any "
227-
"of the routers or give a priority for "
228-
"every router"
229-
)
230-
231-
server_directory = {}
232-
server_priority_list = []
233-
for server in sorted(failover, key=lambda x: x["priority"], reverse=True):
234-
if server["priority"] not in server_directory:
235-
server_directory[server["priority"]] = [server]
236-
server_priority_list.append(server["priority"])
237-
else:
238-
server_directory[server["priority"]].append(server)
239-
240-
for priority in server_priority_list:
241-
failover_list = server_directory[priority]
242-
for _ in range(len(failover_list)):
243-
last = len(failover_list) - 1
244-
index = random.randint(0, last)
245-
server = failover_list.pop(index)
246-
new_config = config.copy()
247-
new_config.update(server)
248-
new_config.pop("priority", None)
249-
try:
250-
return await connect(**new_config)
251-
except Error:
252-
# If we failed to connect, we try the next server
253-
pass
254-
255-
raise InterfaceError("Unable to connect to any of the target hosts")
34+
__all__ = [
35+
"MySQLConnection",
36+
"connect",
37+
"MySQLConnectionAbstract",
38+
"MySQLConnectionPool",
39+
"PooledMySQLConnection",
40+
]

0 commit comments

Comments
 (0)