@@ -15,12 +15,14 @@ const process = require('process');
15
15
16
16
const argparse = require ( 'argparse' ) ;
17
17
const pg = require ( 'pg' ) ;
18
+ const pgcopy = require ( 'pg-copy-streams' ) . from ;
19
+ const csvwriter = require ( 'csv-write-stream' )
18
20
19
21
20
22
function _connect ( driverName , args , callback ) {
21
23
let driver = null ;
22
24
23
- if ( driverName == 'pg' ) {
25
+ if ( driverName == 'pg-js ' ) {
24
26
driver = pg ;
25
27
} else if ( driverName == 'pg-native' ) {
26
28
driver = pg . native ;
@@ -111,84 +113,195 @@ function runner(args, querydata) {
111
113
var run_start = _now ( ) ;
112
114
var complete = 0 ;
113
115
var stmt = { text : query , values : query_args } ;
116
+ var copy = null ;
117
+
118
+ if ( query . startsWith ( 'COPY ' ) ) {
119
+ var m = / C O P Y ( \w + ) \s * \( \s * ( (?: \w + ) (?: , \s * \w + ) * ) \s * \) / . exec ( query )
120
+ if ( m == null ) {
121
+ throw "Could not parse COPY query" ;
122
+ }
123
+
124
+ var rowcount = query_args [ 0 ] [ "count" ] ;
125
+ var row = query_args [ 0 ] [ "row" ] ;
126
+ var rows = Array ( rowcount ) ;
127
+ for ( var i = 0 ; i < rowcount ; i += 1 ) {
128
+ rows [ i ] = row ;
129
+ }
130
+
131
+ copy = {
132
+ "table" : m [ 1 ] ,
133
+ "columns" : m [ 2 ] . split ( "," ) . map ( v => v . trim ( ) ) ,
134
+ "rows" : rows
135
+ } ;
136
+ }
137
+
138
+ if ( copy != null && driver == 'pg-native' ) {
139
+ cb ( { code : 3 , msg : "pg-native does not support COPY" } ) ;
140
+ }
114
141
115
142
if ( use_prepared_stmt ) {
116
143
stmt . name = '_pgbench_query' ;
117
144
}
118
145
119
- for ( var i = 0 ; i < concurrency ; i += 1 ) {
120
- _connect ( driver , args , function ( err , conn ) {
146
+ var query_runner = function ( err , conn ) {
147
+ if ( err ) {
148
+ throw err ;
149
+ }
150
+
151
+ var queries = 0 ;
152
+ var rows = 0 ;
153
+ var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
154
+ var min_latency = Infinity ;
155
+ var max_latency = 0.0 ;
156
+ var duration_in_us = run_duration * 1000000 ;
157
+ var req_start ;
158
+ var req_time ;
159
+
160
+ var _cb = function ( err , result ) {
121
161
if ( err ) {
122
162
throw err ;
123
163
}
124
164
125
- var queries = 0 ;
126
- var rows = 0 ;
127
- var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
128
- var min_latency = Infinity ;
129
- var max_latency = 0.0 ;
130
- var duration_in_us = run_duration * 1000000 ;
131
- var req_start ;
132
- var req_time ;
133
-
134
- var _cb = function ( err , result ) {
135
- if ( err ) {
136
- throw err ;
137
- }
165
+ // Request time in tens of microseconds
166
+ req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
138
167
139
- // Request time in tens of microseconds
140
- req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
168
+ if ( req_time > max_latency ) {
169
+ max_latency = req_time ;
170
+ }
141
171
142
- if ( req_time > max_latency ) {
143
- max_latency = req_time ;
144
- }
172
+ if ( req_time < min_latency ) {
173
+ min_latency = req_time ;
174
+ }
175
+
176
+ latency_stats [ req_time ] += 1 ;
177
+ queries += 1 ;
178
+ if ( Array . isArray ( result ) ) {
179
+ result = result [ result . length - 1 ] ;
180
+ }
145
181
146
- if ( req_time < min_latency ) {
147
- min_latency = req_time ;
182
+ rows += result . rows . length ;
183
+
184
+ if ( _now ( ) - run_start < duration_in_us ) {
185
+ req_start = _now ( ) ;
186
+ conn . query ( stmt , _cb ) ;
187
+ } else {
188
+ conn . end ( ) ;
189
+ if ( report ) {
190
+ _report_results ( queries , rows , latency_stats ,
191
+ min_latency , max_latency ,
192
+ run_start ) ;
148
193
}
149
194
150
- latency_stats [ req_time ] += 1 ;
151
- queries += 1 ;
152
- rows += result . rows . length ;
153
-
154
- if ( _now ( ) - run_start < duration_in_us ) {
155
- req_start = _now ( ) ;
156
- conn . query ( stmt , _cb ) ;
157
- } else {
158
- conn . end ( ) ;
159
- if ( report ) {
160
- _report_results ( queries , rows , latency_stats ,
161
- min_latency , max_latency ,
162
- run_start ) ;
163
- }
164
-
165
- complete += 1 ;
166
- if ( complete == concurrency && cb ) {
167
- cb ( ) ;
168
- }
195
+ complete += 1 ;
196
+ if ( complete == concurrency && cb ) {
197
+ cb ( ) ;
169
198
}
170
- } ;
199
+ }
200
+ } ;
201
+
202
+ req_start = _now ( ) ;
203
+ conn . query ( stmt , _cb ) ;
204
+ } ;
205
+
206
+ var copy_runner = function ( err , conn ) {
207
+ if ( err ) {
208
+ throw err ;
209
+ }
210
+
211
+ var queries = 0 ;
212
+ var rows = 0 ;
213
+ var latency_stats = new Float64Array ( timeout_in_us / 10 ) ;
214
+ var min_latency = Infinity ;
215
+ var max_latency = 0.0 ;
216
+ var duration_in_us = run_duration * 1000000 ;
217
+ var req_start ;
218
+ var req_time ;
171
219
220
+ var _start_copy = function ( _cb ) {
172
221
req_start = _now ( ) ;
173
- conn . query ( stmt , _cb ) ;
174
- } ) ;
222
+ var csvstream = csvwriter ( {
223
+ sendHeaders : false ,
224
+ separator : '\t' ,
225
+ headers : copy . columns
226
+ } ) ;
227
+ var copystream = conn . query ( pgcopy ( stmt . text ) ) ;
228
+ csvstream . pipe ( copystream ) ;
229
+ copystream . on ( 'end' , _cb ) ;
230
+ copystream . on ( 'error' , _cb ) ;
231
+ for ( var row of copy . rows ) {
232
+ csvstream . write ( row ) ;
233
+ }
234
+ csvstream . end ( ) ;
235
+ }
236
+
237
+ var _cb = function ( err , result ) {
238
+ if ( err ) {
239
+ throw err ;
240
+ }
241
+
242
+ // Request time in tens of microseconds
243
+ req_time = Math . round ( ( _now ( ) - req_start ) / 10 ) ;
244
+
245
+ if ( req_time > max_latency ) {
246
+ max_latency = req_time ;
247
+ }
248
+
249
+ if ( req_time < min_latency ) {
250
+ min_latency = req_time ;
251
+ }
252
+
253
+ latency_stats [ req_time ] += 1 ;
254
+ queries += 1 ;
255
+ rows += copy . rows . length ;
256
+
257
+ if ( _now ( ) - run_start < duration_in_us ) {
258
+ _start_copy ( _cb ) ;
259
+ } else {
260
+ conn . end ( ) ;
261
+ if ( report ) {
262
+ _report_results ( queries , rows , latency_stats ,
263
+ min_latency , max_latency ,
264
+ run_start ) ;
265
+ }
266
+
267
+ complete += 1 ;
268
+ if ( complete == concurrency && cb ) {
269
+ cb ( ) ;
270
+ }
271
+ }
272
+ } ;
273
+
274
+ _start_copy ( _cb ) ;
275
+ } ;
276
+
277
+ var runner = copy != null ? copy_runner : query_runner ;
278
+
279
+ for ( var i = 0 ; i < concurrency ; i += 1 ) {
280
+ _connect ( driver , args , runner ) ;
175
281
}
176
282
}
177
283
178
284
function _setup ( cb ) {
179
285
if ( setup_query ) {
180
286
// pg-native does not like multiple statements in queries
181
- _do_run ( 'pg' , setup_query , [ ] , 1 , 0 , false , false , cb ) ;
287
+ _do_run ( 'pg-js ' , setup_query , [ ] , 1 , 0 , false , false , cb ) ;
182
288
} else {
183
289
if ( cb ) {
184
290
cb ( ) ;
185
291
}
186
292
}
187
293
}
188
294
295
+ function _exit ( err ) {
296
+ if ( err ) {
297
+ console . error ( err . msg ) ;
298
+ process . exit ( err . code ) ;
299
+ }
300
+ }
301
+
189
302
function _teardown ( cb ) {
190
303
if ( teardown_query ) {
191
- _do_run ( 'pg' , teardown_query , [ ] , 1 , 0 , false , false , cb ) ;
304
+ _do_run ( 'pg-js ' , teardown_query , [ ] , 1 , 0 , false , false , cb ) ;
192
305
} else {
193
306
if ( cb ) {
194
307
cb ( ) ;
@@ -198,7 +311,7 @@ function runner(args, querydata) {
198
311
199
312
function _run ( ) {
200
313
_do_run ( args . driver , query , query_args , args . concurrency , duration ,
201
- true , true , _teardown ) ;
314
+ true , true , ( err ) => _teardown ( ( ) => _exit ( err ) ) ) ;
202
315
}
203
316
204
317
function _warmup_and_run ( ) {
@@ -258,7 +371,7 @@ function main() {
258
371
parser . addArgument (
259
372
'driver' ,
260
373
{ type : String , help : 'driver implementation to use' ,
261
- choices : [ 'pg' , 'pg-native' ] } )
374
+ choices : [ 'pg-js ' , 'pg-native' ] } )
262
375
parser . addArgument (
263
376
'queryfile' ,
264
377
{ type : String ,
0 commit comments