@@ -50,20 +50,23 @@ protected override int RawExecuteAffrows()
50
50
{
51
51
var affrows = 0 ;
52
52
Exception exception = null ;
53
- Aop . CurdBeforeEventArgs before = null ;
54
- if ( _source . Count > 1 )
53
+ Aop . CurdBeforeEventArgs before = null ;
54
+ if ( _source . Count > 1 )
55
55
{
56
56
try
57
57
{
58
58
before = new Aop . CurdBeforeEventArgs ( _table . Type , _table , Aop . CurdType . Insert , null , _params ) ;
59
59
_orm . Aop . CurdBeforeHandler ? . Invoke ( this , before ) ;
60
- using var bulkCopyInterface = new ClickHouseBulkCopy ( _orm . Ado . MasterPool . Get ( ) . Value as ClickHouseConnection )
60
+ using ( var conn = _orm . Ado . MasterPool . Get ( ) )
61
61
{
62
- DestinationTableName = _table . DbName ,
63
- BatchSize = _source . Count
64
- } ;
65
- var data = ToDataTable ( ) ;
66
- bulkCopyInterface . WriteToServerAsync ( data , default ) . Wait ( ) ;
62
+ using var bulkCopyInterface = new ClickHouseBulkCopy ( conn . Value as ClickHouseConnection )
63
+ {
64
+ DestinationTableName = _table . DbName ,
65
+ BatchSize = _source . Count
66
+ } ;
67
+ var data = ToDataTable ( ) ;
68
+ bulkCopyInterface . WriteToServerAsync ( data , default ) . Wait ( ) ;
69
+ }
67
70
return affrows ;
68
71
}
69
72
catch ( Exception ex )
@@ -173,6 +176,63 @@ protected override List<T1> RawExecuteInserted()
173
176
public override Task < long > ExecuteIdentityAsync ( CancellationToken cancellationToken = default ) => SplitExecuteIdentityAsync ( _batchValuesLimit > 0 ? _batchValuesLimit : int . MaxValue , _batchValuesLimit > 0 ? _batchValuesLimit : int . MaxValue , cancellationToken ) ;
174
177
public override Task < List < T1 > > ExecuteInsertedAsync ( CancellationToken cancellationToken = default ) => SplitExecuteInsertedAsync ( _batchValuesLimit > 0 ? _batchValuesLimit : int . MaxValue , _batchValuesLimit > 0 ? _batchValuesLimit : int . MaxValue , cancellationToken ) ;
175
178
179
+ async protected override Task < int > RawExecuteAffrowsAsync ( CancellationToken cancellationToken = default )
180
+ {
181
+ var affrows = 0 ;
182
+ Exception exception = null ;
183
+ Aop . CurdBeforeEventArgs before = null ;
184
+ if ( _source . Count > 1 )
185
+ {
186
+ try
187
+ {
188
+ before = new Aop . CurdBeforeEventArgs ( _table . Type , _table , Aop . CurdType . Insert , null , _params ) ;
189
+ _orm . Aop . CurdBeforeHandler ? . Invoke ( this , before ) ;
190
+ using ( var conn = await _orm . Ado . MasterPool . GetAsync ( ) )
191
+ {
192
+ using var bulkCopyInterface = new ClickHouseBulkCopy ( conn . Value as ClickHouseConnection )
193
+ {
194
+ DestinationTableName = _table . DbName ,
195
+ BatchSize = _source . Count
196
+ } ;
197
+ var data = ToDataTable ( ) ;
198
+ await bulkCopyInterface . WriteToServerAsync ( data , default ) ;
199
+ }
200
+ return affrows ;
201
+ }
202
+ catch ( Exception ex )
203
+ {
204
+ exception = ex ;
205
+ throw ex ;
206
+ }
207
+ finally
208
+ {
209
+ var after = new Aop . CurdAfterEventArgs ( before , exception , affrows ) ;
210
+ _orm . Aop . CurdAfterHandler ? . Invoke ( this , after ) ;
211
+ }
212
+ }
213
+ else
214
+ {
215
+ var sql = this . ToSql ( ) ;
216
+ before = new Aop . CurdBeforeEventArgs ( _table . Type , _table , Aop . CurdType . Insert , sql , _params ) ;
217
+ _orm . Aop . CurdBeforeHandler ? . Invoke ( this , before ) ;
218
+ try
219
+ {
220
+ affrows = await _orm . Ado . ExecuteNonQueryAsync ( _connection , null , CommandType . Text , sql , _commandTimeout , _params ) ;
221
+ }
222
+ catch ( Exception ex )
223
+ {
224
+ exception = ex ;
225
+ throw ex ;
226
+ }
227
+ finally
228
+ {
229
+ var after = new Aop . CurdAfterEventArgs ( before , exception , affrows ) ;
230
+ _orm . Aop . CurdAfterHandler ? . Invoke ( this , after ) ;
231
+ }
232
+ return affrows ;
233
+ }
234
+ }
235
+
176
236
async protected override Task < long > RawExecuteIdentityAsync ( CancellationToken cancellationToken = default )
177
237
{
178
238
//var sql = this.ToSql();
0 commit comments