Skip to content

Commit 62f17bf

Browse files
committed
Merge pull request #58 from jamesots/async
Async
2 parents 96408be + f850163 commit 62f17bf

28 files changed

+876
-1201
lines changed

example/example.dart

Lines changed: 46 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,17 @@ class Example {
1414

1515
Example(this.pool);
1616

17-
Future run() {
18-
var completer = new Completer();
17+
Future run() async {
1918
// drop the tables if they already exist
20-
dropTables().then((_) {
21-
print("dropped tables");
22-
// then recreate the tables
23-
return createTables();
24-
}).then((_) {
25-
print("created tables");
26-
// add some data
27-
return addData();
28-
}).then((_) {
29-
// and read it back out
30-
return readData();
31-
}).then((_) {
32-
completer.complete(null);
33-
});
34-
return completer.future;
19+
await dropTables();
20+
print("dropped tables");
21+
// then recreate the tables
22+
await createTables();
23+
print("created tables");
24+
// add some data
25+
await addData();
26+
// and read it back out
27+
await readData();
3528
}
3629

3730
Future dropTables() {
@@ -58,53 +51,47 @@ class Example {
5851
return querier.executeQueries();
5952
}
6053

61-
Future addData() {
62-
var completer = new Completer();
63-
pool.prepare("insert into people (name, age) values (?, ?)").then((query) {
64-
print("prepared query 1");
65-
var parameters = [
66-
["Dave", 15],
67-
["John", 16],
68-
["Mavis", 93]
69-
];
70-
return query.executeMulti(parameters);
71-
}).then((results) {
72-
print("executed query 1");
73-
return pool.prepare("insert into pets (name, species, owner_id) values (?, ?, ?)");
74-
}).then((query) {
75-
print("prepared query 2");
76-
var parameters = [
77-
["Rover", "Dog", 1],
78-
["Daisy", "Cow", 2],
79-
["Spot", "Dog", 2]];
54+
Future addData() async {
55+
var query = await pool.prepare("insert into people (name, age) values (?, ?)");
56+
print("prepared query 1");
57+
var parameters = [
58+
["Dave", 15],
59+
["John", 16],
60+
["Mavis", 93]
61+
];
62+
var results = await query.executeMulti(parameters);
63+
64+
print("executed query 1");
65+
query = await pool.prepare("insert into pets (name, species, owner_id) values (?, ?, ?)");
66+
67+
print("prepared query 2");
68+
parameters = [
69+
["Rover", "Dog", 1],
70+
["Daisy", "Cow", 2],
71+
["Spot", "Dog", 2]];
8072
// ["Spot", "D\u0000og", 2]];
81-
return query.executeMulti(parameters);
82-
}).then((results) {
83-
print("executed query 2");
84-
completer.complete(null);
85-
});
86-
return completer.future;
73+
results = await query.executeMulti(parameters);
74+
75+
print("executed query 2");
8776
}
8877

89-
Future readData() {
90-
var completer = new Completer();
78+
Future readData() async {
9179
print("querying");
92-
return pool.query('select p.id, p.name, p.age, t.name, t.species '
80+
var result = await pool.query('select p.id, p.name, p.age, t.name, t.species '
9381
'from people p '
94-
'left join pets t on t.owner_id = p.id').then((result) {
95-
print("got results");
96-
return result.forEach((row) {
97-
if (row[3] == null) {
98-
print("ID: ${row[0]}, Name: ${row[1]}, Age: ${row[2]}, No Pets");
99-
} else {
100-
print("ID: ${row[0]}, Name: ${row[1]}, Age: ${row[2]}, Pet Name: ${row[3]}, Pet Species ${row[4]}");
101-
}
102-
});
82+
'left join pets t on t.owner_id = p.id');
83+
print("got results");
84+
return result.forEach((row) {
85+
if (row[3] == null) {
86+
print("ID: ${row[0]}, Name: ${row[1]}, Age: ${row[2]}, No Pets");
87+
} else {
88+
print("ID: ${row[0]}, Name: ${row[1]}, Age: ${row[2]}, Pet Name: ${row[3]}, Pet Species ${row[4]}");
89+
}
10390
});
10491
}
10592
}
10693

107-
void main() {
94+
main() async {
10895
OptionsFile options = new OptionsFile('connection.options');
10996
String user = options.getString('user');
11097
String password = options.getString('password');
@@ -120,9 +107,8 @@ void main() {
120107
var example = new Example(pool);
121108
// run the example
122109
print("running example");
123-
example.run().then((_) {
124-
// finally, close the connection
125-
print("K THNX BYE!");
126-
pool.close();
127-
});
110+
await example.run();
111+
// finally, close the connection
112+
print("K THNX BYE!");
113+
pool.closeConnectionsNow();
128114
}

lib/src/buffered_socket.dart

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -53,31 +53,26 @@ class BufferedSocket {
5353
_closed = true;
5454
}
5555
}
56+
57+
static defaultSocketFactory(host, port) => RawSocket.connect(host, port);
5658

57-
/**
58-
* [socketFactory] is for unit testing.
59-
*/
6059
static Future<BufferedSocket> connect(String host, int port,
6160
{DataReadyHandler onDataReady,
6261
DoneHandler onDone,
6362
ErrorHandler onError,
64-
SocketFactory socketFactory,
65-
OnConnection onConnection}) {
66-
var c = new Completer<BufferedSocket>();
67-
var future;
68-
if (socketFactory != null) {
69-
future = socketFactory(host, port);
70-
} else {
71-
future = RawSocket.connect(host, port);
63+
SocketFactory socketFactory : defaultSocketFactory,
64+
OnConnection onConnection}) async {
65+
try {
66+
var socket;
67+
socket = await socketFactory(host, port);
68+
var bufferedSocket = new BufferedSocket._(socket, onDataReady, onDone, onError);
69+
if (onConnection != null) {
70+
onConnection(bufferedSocket);
71+
}
72+
return bufferedSocket;
73+
} catch (e) {
74+
onError(e);
7275
}
73-
future.then((socket) {
74-
var bufferedSocket = new BufferedSocket._(socket, onDataReady, onDone, onError);
75-
if (onConnection != null) {
76-
onConnection(bufferedSocket);
77-
}
78-
return c.complete(bufferedSocket);
79-
}, onError: onError);
80-
return c.future;
8176
}
8277

8378
void _onData(RawSocketEvent event) {
@@ -188,16 +183,15 @@ class BufferedSocket {
188183
_closed = true;
189184
}
190185

191-
Future startSSL() {
186+
Future startSSL() async {
192187
log.fine("Securing socket");
193-
return RawSecureSocket.secure(_socket, subscription: _subscription,
194-
onBadCertificate: (cert) => true).then((socket) {
195-
log.fine("Socket is secure");
196-
_socket = socket;
197-
_subscription = _socket.listen(_onData, onError: _onSocketError,
198-
onDone: _onSocketDone, cancelOnError: true);
199-
_socket.writeEventsEnabled = true;
200-
_socket.readEventsEnabled = true;
201-
});
188+
var socket = await RawSecureSocket.secure(_socket, subscription: _subscription,
189+
onBadCertificate: (cert) => true);
190+
log.fine("Socket is secure");
191+
_socket = socket;
192+
_subscription = _socket.listen(_onData, onError: _onSocketError,
193+
onDone: _onSocketDone, cancelOnError: true);
194+
_socket.writeEventsEnabled = true;
195+
_socket.readEventsEnabled = true;
202196
}
203197
}

lib/src/connection.dart

Lines changed: 36 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class _Connection {
8888
* is succesful.
8989
*/
9090
Future connect({String host, int port, String user,
91-
String password, String db, bool useCompression, bool useSSL}) {
91+
String password, String db, bool useCompression, bool useSSL}) async {
9292
if (_socket != null) {
9393
throw new MySqlClientError._("Cannot connect to server while a connection is already open");
9494
}
@@ -120,10 +120,8 @@ class _Connection {
120120
//TODO Only useDatabase if connection actually ended up as an SSL connection?
121121
//TODO On the other hand, it doesn't hurt to call useDatabase anyway.
122122
if (useSSL) {
123-
return _completer.future
124-
.then((_) {
125-
return _useDatabase(db);
126-
});
123+
await _completer.future;
124+
return _useDatabase(db);
127125
} else {
128126
return _completer.future;
129127
}
@@ -134,24 +132,27 @@ class _Connection {
134132
return processHandler(handler);
135133
}
136134

137-
void _readPacket() {
135+
_readPacket() async {
138136
log.fine("readPacket readyForHeader=${_readyForHeader}");
139137
if (_readyForHeader) {
140138
_readyForHeader = false;
141-
_socket.readBuffer(_headerBuffer).then(_handleHeader);
139+
var buffer = await _socket.readBuffer(_headerBuffer);
140+
_handleHeader(buffer);
142141
}
143142
}
144143

145-
void _handleHeader(buffer) {
144+
_handleHeader(buffer) async {
146145
_dataSize = buffer[0] + (buffer[1] << 8) + (buffer[2] << 16);
147146
_packetNumber = buffer[3];
148147
log.fine("about to read $_dataSize bytes for packet ${_packetNumber}");
149148
_dataBuffer = new Buffer(_dataSize);
150149
log.fine("buffer size=${_dataBuffer.length}");
151150
if (_dataSize == 0xffffff || _largePacketBuffers.length > 0) {
152-
_socket.readBuffer(_dataBuffer).then(_handleMoreData);
151+
var buffer = await _socket.readBuffer(_dataBuffer);
152+
_handleMoreData(buffer);
153153
} else {
154-
_socket.readBuffer(_dataBuffer).then(_dataHandler);
154+
var buffer = await _socket.readBuffer(_dataBuffer);
155+
_dataHandler(buffer);
155156
}
156157
}
157158

@@ -176,7 +177,7 @@ class _Connection {
176177
}
177178
}
178179

179-
void _handleData(buffer) {
180+
_handleData(buffer) async {
180181
_readyForHeader = true;
181182
//log.fine("read all data: ${_dataBuffer._list}");
182183
//log.fine("read all data: ${Buffer.listChars(_dataBuffer._list)}");
@@ -191,19 +192,16 @@ class _Connection {
191192
if (response.nextHandler != null) {
192193
// if handler.processResponse() returned a Handler, pass control to that handler now
193194
_handler = response.nextHandler;
194-
_sendBuffer(_handler.createRequest()).then((_) {
195-
if (_useSSL && _handler is _SSLHandler) {
196-
log.fine("Use SSL");
197-
_socket.startSSL().then((_) {
198-
_secure = true;
199-
_handler = (_handler as _SSLHandler).nextHandler;
200-
_sendBuffer(_handler.createRequest()).then((_) {
201-
log.fine("Sent buffer");
202-
});
203-
});
204-
return;
205-
}
206-
});
195+
await _sendBuffer(_handler.createRequest());
196+
if (_useSSL && _handler is _SSLHandler) {
197+
log.fine("Use SSL");
198+
await _socket.startSSL();
199+
_secure = true;
200+
_handler = (_handler as _SSLHandler).nextHandler;
201+
await _sendBuffer(_handler.createRequest());
202+
log.fine("Sent buffer");
203+
return;
204+
}
207205
}
208206
if (response.finished) {
209207
_finishAndReuse();
@@ -264,24 +262,22 @@ class _Connection {
264262
}
265263
}
266264

267-
Future<Buffer> _sendBufferPart(Buffer buffer, int start) {
265+
Future<Buffer> _sendBufferPart(Buffer buffer, int start) async {
268266
var len = math.min(buffer.length - start, 0xFFFFFF);
269267

270268
_headerBuffer[0] = len & 0xFF;
271269
_headerBuffer[1] = (len & 0xFF00) >> 8;
272270
_headerBuffer[2] = (len & 0xFF0000) >> 16;
273271
_headerBuffer[3] = ++_packetNumber;
274272
log.fine("sending header, packet $_packetNumber");
275-
return _socket.writeBuffer(_headerBuffer).then((_) {
276-
log.fine("sendBuffer body, buffer length=${buffer.length}, start=$start, len=$len");
277-
return _socket.writeBufferPart(buffer, start, len);
278-
}).then((_) {
279-
if (len == 0xFFFFFF) {
280-
return _sendBufferPart(buffer, start + len);
281-
} else {
282-
return buffer;
283-
}
284-
});
273+
await _socket.writeBuffer(_headerBuffer);
274+
log.fine("sendBuffer body, buffer length=${buffer.length}, start=$start, len=$len");
275+
await _socket.writeBufferPart(buffer, start, len);
276+
if (len == 0xFFFFFF) {
277+
return _sendBufferPart(buffer, start + len);
278+
} else {
279+
return buffer;
280+
}
285281
}
286282

287283
/**
@@ -290,7 +286,7 @@ class _Connection {
290286
*
291287
* Returns a future
292288
*/
293-
Future<dynamic> processHandler(_Handler handler, {bool noResponse:false}) {
289+
Future<dynamic> processHandler(_Handler handler, {bool noResponse:false}) async {
294290
if (_handler != null) {
295291
throw new MySqlClientError._("Connection #$number cannot process a request for $handler while a request is already in progress for $_handler");
296292
}
@@ -300,11 +296,10 @@ class _Connection {
300296
if (!noResponse) {
301297
_handler = handler;
302298
}
303-
_sendBuffer(handler.createRequest()).then((_) {
304-
if (noResponse) {
305-
_finishAndReuse();
306-
}
307-
});
299+
await _sendBuffer(handler.createRequest());
300+
if (noResponse) {
301+
_finishAndReuse();
302+
}
308303
return _completer.future;
309304
}
310305

0 commit comments

Comments
 (0)