@@ -18,12 +18,18 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
18
18
final int _maxPacketSize;
19
19
int _max;
20
20
21
- /*
21
+ /*
22
22
* The pool maintains a queue of connection requests. When a connection completes, if there
23
23
* is a connection in the queue then it is 'activated' - that is, the future returned
24
24
* by _getConnection() completes.
25
25
*/
26
26
final Queue <Completer <_Connection >> _pendingConnections;
27
+ /*
28
+ * If you need a particular connection, put an entry in _requestedConnections. As soon as
29
+ * that connection is free then the completer completes. _requestedConnections is
30
+ * checked before _pendingConnections.
31
+ */
32
+ final Map <_Connection , Queue <Completer >> _requestedConnections;
27
33
final List <_Connection > _pool;
28
34
29
35
/**
@@ -40,6 +46,7 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
40
46
// bool useCompression: false,
41
47
bool useSSL: false }) :
42
48
_pendingConnections = new Queue <Completer <_Connection >>(),
49
+ _requestedConnections = new Map <_Connection , Queue <Completer >>(),
43
50
_pool = new List <_Connection >(),
44
51
_host = host,
45
52
_port = port,
@@ -125,6 +132,14 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
125
132
return ;
126
133
}
127
134
135
+ if (_requestedConnections.containsKey (cnx) && _requestedConnections[cnx].length > 0 ) {
136
+ _log.finest ("Reusing cnx#${cnx .number } for a requested operation" );
137
+ var c = _requestedConnections[cnx].removeFirst ();
138
+ cnx.use ();
139
+ c.complete (cnx);
140
+ return ;
141
+ }
142
+
128
143
if (_pendingConnections.length > 0 ) {
129
144
_log.finest ("Reusing cnx#${cnx .number } for a queued operation" );
130
145
var c = _pendingConnections.removeFirst ();
@@ -217,13 +232,17 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
217
232
}
218
233
}
219
234
235
+ // Close a prepared query on all connections which have this query.
236
+ // This may take some time if it has to wait a long time for a
237
+ // connection to become free.
220
238
_closeQuery (Query q, bool retain) async {
221
239
_log.finest ("Closing query: ${q .sql }" );
222
240
var thePool = new List <_Connection >();
223
241
thePool.addAll (_pool); // prevent concurrent modification
224
242
for (var cnx in thePool) {
225
243
var preparedQuery = cnx.removePreparedQueryFromCache (q.sql);
226
244
if (preparedQuery != null ) {
245
+ _log.finest ("Connection not ready" );
227
246
await _waitUntilReady (cnx);
228
247
_log.finest ("Connection ready - closing query: ${q .sql }" );
229
248
var handler = new _CloseStatementHandler (preparedQuery.statementHandlerId);
@@ -233,17 +252,22 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
233
252
}
234
253
}
235
254
236
- /**
237
- * The future returned by [_waitUntilReady] fires when all queued operations in the pool
238
- * have completed, and the connection is free to be used again .
255
+ /**
256
+ * The future returned by [_waitUntilReady] fires when the connection is next available
257
+ * to be used.
239
258
*/
240
259
Future <_Connection > _waitUntilReady (_Connection cnx) {
241
260
var c = new Completer <_Connection >();
242
261
if (! cnx.inUse) {
262
+ // connection isn't in use, so use it straight away
243
263
cnx.use ();
244
264
c.complete (cnx);
245
265
} else {
246
- _pendingConnections.add (c);
266
+ // Connection is in use, so request we get it the next time it's available.
267
+ if (! _requestedConnections.containsKey (cnx)) {
268
+ _requestedConnections[cnx] = new Queue <Completer >();
269
+ }
270
+ _requestedConnections[cnx].add (c);
247
271
}
248
272
return c.future;
249
273
}
0 commit comments