Skip to content

Commit 3da21f0

Browse files
committed
More async/await code
1 parent 344dd5d commit 3da21f0

File tree

2 files changed

+86
-98
lines changed

2 files changed

+86
-98
lines changed

lib/src/connection_pool.dart

Lines changed: 81 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -76,27 +76,26 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
7676
return c.future;
7777
}
7878

79-
_createConnection(Completer c) {
79+
_createConnection(Completer c) async {
8080
var cnx = new _Connection(this, _pool.length, _maxPacketSize);
8181
cnx.use();
8282
cnx.autoRelease = false;
8383
_pool.add(cnx);
84-
cnx.connect(
85-
host: _host,
86-
port: _port,
87-
user: _user,
88-
password: _password,
89-
db: _db,
90-
useCompression: _useCompression,
91-
useSSL: _useSSL)
92-
.then((_) {
84+
try {
85+
await cnx.connect(
86+
host: _host,
87+
port: _port,
88+
user: _user,
89+
password: _password,
90+
db: _db,
91+
useCompression: _useCompression,
92+
useSSL: _useSSL);
9393
cnx.autoRelease = true;
9494
_log.finest("Logged in on cnx#${cnx.number}");
9595
c.complete(cnx);
96-
})
97-
.catchError((e) {
96+
} catch (e) {
9897
_releaseReuseCompleteError(cnx, c, e);
99-
});
98+
}
10099
}
101100

102101
_removeConnection(_Connection cnx) {
@@ -172,74 +171,62 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
172171
}
173172
}
174173

175-
Future<Results> query(String sql) {
174+
Future<Results> query(String sql) async {
176175
_log.info("Running query: ${sql}");
177176

178-
return _getConnection()
179-
.then((cnx) {
180-
var c = new Completer<Results>();
181-
_log.fine("Got cnx#${cnx.number} for query");
182-
cnx.processHandler(new _QueryStreamHandler(sql))
183-
.then((results) {
184-
_log.fine("Got query results on #${cnx.number} for: ${sql}");
185-
c.complete(results);
186-
})
187-
.catchError((e) {
188-
_releaseReuseCompleteError(cnx, c, e);
189-
});
190-
return c.future;
191-
});
177+
var cnx = await _getConnection();
178+
var c = new Completer<Results>();
179+
_log.fine("Got cnx#${cnx.number} for query");
180+
try {
181+
var results = await cnx.processHandler(new _QueryStreamHandler(sql));
182+
_log.fine("Got query results on #${cnx.number} for: ${sql}");
183+
return results;
184+
} catch (e) {
185+
_releaseReuseThrow(cnx, e);
186+
}
192187
}
193188

194189
/**
195190
* Pings the server. Returns a [Future] that completes when the server replies.
196191
*/
197-
Future ping() {
192+
Future ping() async {
198193
_log.info("Pinging server");
199194

200-
return _getConnection()
201-
.then((cnx) {
202-
return cnx.processHandler(new _PingHandler())
203-
.then((x) {
204-
_log.fine("Pinged");
205-
return x;
206-
});
207-
});
195+
var cnx = await _getConnection();
196+
var x = await cnx.processHandler(new _PingHandler());
197+
_log.fine("Pinged");
198+
return x;
208199
}
209200

210201
/**
211202
* Sends a debug message to the server. Returns a [Future] that completes
212203
* when the server replies.
213204
*/
214-
Future debug() {
205+
Future debug() async {
215206
_log.info("Sending debug message");
216207

217-
return _getConnection()
218-
.then((cnx) {
219-
var c = new Completer();
220-
cnx.processHandler(new _DebugHandler())
221-
.then((x) {
222-
_log.fine("Message sent");
223-
return x;
224-
})
225-
.catchError((e) {
226-
_releaseReuseCompleteError(cnx, c, e);
227-
});
228-
return c.future;
229-
});
208+
var cnx = await _getConnection();
209+
try {
210+
var x = await cnx.processHandler(new _DebugHandler());
211+
_log.fine("Message sent");
212+
return x;
213+
} catch (e) {
214+
_releaseReuseThrow(cnx, e);
215+
}
230216
}
231217

232-
void _closeQuery(Query q, bool retain) {
218+
_closeQuery(Query q, bool retain) async {
233219
_log.finest("Closing query: ${q.sql}");
234-
for (var cnx in _pool) {
220+
var thePool = new List<_Connection>();
221+
thePool.addAll(_pool); // prevent concurrent modification
222+
for (var cnx in thePool) {
235223
var preparedQuery = cnx.removePreparedQueryFromCache(q.sql);
236224
if (preparedQuery != null) {
237-
_waitUntilReady(cnx).then((_) {
238-
_log.finest("Connection ready - closing query: ${q.sql}");
239-
var handler = new _CloseStatementHandler(preparedQuery.statementHandlerId);
240-
cnx.autoRelease = !retain;
241-
cnx.processHandler(handler, noResponse: true);
242-
});
225+
await _waitUntilReady(cnx);
226+
_log.finest("Connection ready - closing query: ${q.sql}");
227+
var handler = new _CloseStatementHandler(preparedQuery.statementHandlerId);
228+
cnx.autoRelease = !retain;
229+
cnx.processHandler(handler, noResponse: true);
243230
}
244231
}
245232
}
@@ -259,13 +246,11 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
259246
return c.future;
260247
}
261248

262-
Future<Query> prepare(String sql) {
249+
Future<Query> prepare(String sql) async {
263250
var query = new Query._internal(this, sql);
264-
return query._prepare(false)
265-
.then((preparedQuery) {
266-
_log.info("Got prepared query");
267-
return query;
268-
});
251+
await query._prepare(false);
252+
_log.info("Got prepared query");
253+
return query;
269254
}
270255

271256
/**
@@ -279,30 +264,25 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
279264
* and [Transaction.rollback] methods to commit and roll back, otherwise
280265
* the connection will not be released.
281266
*/
282-
Future<Transaction> startTransaction({bool consistent: false}) {
267+
Future<Transaction> startTransaction({bool consistent: false}) async {
283268
_log.info("Starting transaction");
284269

285-
return _getConnection()
286-
.then((cnx) {
287-
cnx.inTransaction = true;
288-
var c = new Completer<Transaction>();
289-
var sql;
290-
if (consistent) {
291-
sql = "start transaction with consistent snapshot";
292-
} else {
293-
sql = "start transaction";
294-
}
295-
cnx.processHandler(new _QueryStreamHandler(sql))
296-
.then((results) {
297-
_log.fine("Transaction started on cnx#${cnx.number}");
298-
var transaction = new _TransactionImpl._(cnx, this);
299-
c.complete(transaction);
300-
})
301-
.catchError((e) {
302-
_releaseReuseCompleteError(cnx, c, e);
303-
});
304-
return c.future;
305-
});
270+
var cnx = await _getConnection();
271+
cnx.inTransaction = true;
272+
var c = new Completer<Transaction>();
273+
var sql;
274+
if (consistent) {
275+
sql = "start transaction with consistent snapshot";
276+
} else {
277+
sql = "start transaction";
278+
}
279+
try {
280+
var results = await cnx.processHandler(new _QueryStreamHandler(sql));
281+
_log.fine("Transaction started on cnx#${cnx.number}");
282+
return new _TransactionImpl._(cnx, this);
283+
} catch (e) {
284+
_releaseReuseThrow(cnx, e);
285+
}
306286
}
307287

308288
/**
@@ -318,20 +298,17 @@ class ConnectionPool extends Object with _ConnectionHelpers implements Queriable
318298
* You must use [RetainedConnection.release] when you have finished with the
319299
* connection, otherwise it will not be available in the pool again.
320300
*/
321-
Future<RetainedConnection> getConnection() {
301+
Future<RetainedConnection> getConnection() async {
322302
_log.info("Retaining connection");
323303

324-
return _getConnection()
325-
.then((cnx) {
326-
cnx.inTransaction = true;
327-
return new _RetainedConnectionImpl._(cnx, this);
328-
});
304+
var cnx = await _getConnection();
305+
cnx.inTransaction = true;
306+
return new _RetainedConnectionImpl._(cnx, this);
329307
}
330308

331-
Future<Results> prepareExecute(String sql, List parameters) {
332-
return prepare(sql).then((query) {
333-
return query.execute(parameters);
334-
});
309+
Future<Results> prepareExecute(String sql, List parameters) async {
310+
var query = await prepare(sql);
311+
return query.execute(parameters);
335312
}
336313

337314
// dynamic fieldList(String table, [String column]);
@@ -358,6 +335,13 @@ abstract class _ConnectionHelpers {
358335
c.completeError(e);
359336
}
360337

338+
_releaseReuseThrow(_Connection cnx, dynamic e) {
339+
if (!(e is MySqlException)) {
340+
_removeConnection(cnx);
341+
}
342+
throw e;
343+
}
344+
361345
_removeConnection(cnx);
362346
}
363347

test/integration/two.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ void runIntTests2(String user, String password, String db, int port, String host
4747
expect(finished, equals([]));
4848

4949
return Future.wait(futures).then((_) {
50-
expect(finished, equals([1, 2, 3, 4]));
50+
expect(finished, contains(1));
51+
expect(finished, contains(2));
52+
expect(finished, contains(3));
53+
expect(finished, contains(4));
54+
expect(finished, hasLength(4));
5155
});
5256
});
5357

0 commit comments

Comments
 (0)