@@ -307,11 +307,8 @@ async def UnSubscribeMarketData(self, inst_ids: list):
307
307
await sub_client .close ()
308
308
return None
309
309
310
- async def ReqOrderInsert (self , sig : Signal ):
311
- sub_client = None
312
- channel_rtn_odr , channel_rsp_err = None , None
310
+ def ReqOrderInsert (self , sig : Signal ):
313
311
try :
314
- sub_client = self .redis_client .pubsub (ignore_subscribe_messages = True )
315
312
request_id = get_next_id ()
316
313
autoid = Autonumber .objects .create ()
317
314
order_ref = f"{ autoid .id :07} { sig .id :05} "
@@ -355,33 +352,10 @@ async def ReqOrderInsert(self, sig: Signal):
355
352
close_time__isnull = True ).first ()
356
353
param_dict ['Direction' ] = ApiStruct .D_Buy if pos .direction == DirectionType .values [
357
354
DirectionType .LONG ] else ApiStruct .D_Sell
358
- if pos .open_time .astimezone ().date () == timezone .localtime ().date () \
359
- and pos .instrument .exchange == ExchangeType .SHFE :
360
- param_dict ['CombOffsetFlag' ] = ApiStruct .OF_CloseToday # 上期所区分平今和平昨
361
355
logger .info (f'{ pos .code } ->{ sig .code } { pos .direction } 头换月开新{ sig .volume } 手 价格: { sig .price } ' )
362
- channel_rtn_odr = self .__trade_response_format .format ('OnRtnOrder' , order_ref )
363
- channel_rsp_err = self .__trade_response_format .format ('OnRspError' , request_id )
364
- # 同时发送多个订单时,订阅 OnRspOrderInsert:0 会导致取消订阅时报超时错误
365
- # channel_rsp_odr = self.__trade_response_format.format('OnRspOrderInsert', 0)
366
- # await sub_client.psubscribe(channel_rtn_odr, channel_rsp_err, channel_rsp_odr)
367
- await sub_client .psubscribe (channel_rtn_odr , channel_rsp_err )
368
- task = asyncio .create_task (self .query_reader (sub_client ))
369
356
self .raw_redis .publish (self .__request_format .format ('ReqOrderInsert' ), json .dumps (param_dict ))
370
- await asyncio .wait_for (task , HANDLER_TIME_OUT )
371
- await sub_client .punsubscribe ()
372
- await sub_client .close ()
373
- result = task .result ()[0 ]
374
- if 'ErrorID' in result :
375
- logger .warning (f"提交订单出错: { ctp_errors [result ['ErrorID' ]]} " )
376
- return False
377
- logger .debug (f"ReqOrderInsert, rst: { result ['StatusMsg' ]} " )
378
- return result
379
357
except Exception as e :
380
358
logger .warning (f'ReqOrderInsert 发生错误: { repr (e )} ' , exc_info = True )
381
- if sub_client and sub_client .in_pubsub and channel_rtn_odr :
382
- await sub_client .unsubscribe ()
383
- await sub_client .close ()
384
- return False
385
359
386
360
async def cancel_order (self , order : dict ):
387
361
sub_client = None
@@ -586,7 +560,7 @@ async def OnRtnOrder(self, _: str, order: dict):
586
560
return
587
561
logger .info (f"{ inst } 以价格 { price } 开多{ volume } 手 重新报单..." )
588
562
signal .price = price
589
- self .io_loop .create_task (self .ReqOrderInsert ( signal ) )
563
+ self .io_loop .call_soon (self .ReqOrderInsert , signal )
590
564
else :
591
565
delta = (last_bar .settlement - price ) * Decimal (0.5 )
592
566
price = price_round (last_bar .settlement - delta , inst .price_tick )
@@ -595,7 +569,7 @@ async def OnRtnOrder(self, _: str, order: dict):
595
569
return
596
570
logger .info (f"{ inst } 以价格 { price } 开空{ volume } 手 重新报单..." )
597
571
signal .price = price
598
- self .io_loop .create_task (self .ReqOrderInsert ( signal ) )
572
+ self .io_loop .call_soon (self .ReqOrderInsert , signal )
599
573
else :
600
574
if order ['Direction' ] == DirectionType .LONG :
601
575
delta = (price - last_bar .settlement ) * Decimal (0.5 )
@@ -605,7 +579,7 @@ async def OnRtnOrder(self, _: str, order: dict):
605
579
return
606
580
logger .info (f"{ inst } 以价格 { price } 买平{ volume } 手 重新报单..." )
607
581
signal .price = price
608
- self .io_loop .create_task (self .ReqOrderInsert ( signal ) )
582
+ self .io_loop .call_soon (self .ReqOrderInsert , signal )
609
583
else :
610
584
delta = (last_bar .settlement - price ) * Decimal (0.5 )
611
585
price = price_round (last_bar .settlement - delta , inst .price_tick )
@@ -614,7 +588,7 @@ async def OnRtnOrder(self, _: str, order: dict):
614
588
return
615
589
logger .info (f"{ inst } 以价格 { price } 卖平{ volume } 手 重新报单..." )
616
590
signal .price = price
617
- self .io_loop .create_task (self .ReqOrderInsert ( signal ) )
591
+ self .io_loop .call_soon (self .ReqOrderInsert , signal )
618
592
except Exception as ee :
619
593
logger .warning (f'OnRtnOrder 发生错误: { repr (ee )} ' , exc_info = True )
620
594
@@ -633,7 +607,7 @@ async def processing_signal1(self):
633
607
~ Q (instrument__exchange = ExchangeType .CFFEX ), trigger_time__gte = self .__last_trading_day ,
634
608
strategy = self .__strategy , instrument__night_trade = False , processed = False ).order_by ('-priority' ):
635
609
logger .info (f'发现日盘信号: { sig } ' )
636
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
610
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
637
611
if (self .__trading_day - self .__last_trading_day ).days > 3 :
638
612
logger .info (f'假期后第一天,处理节前未成交夜盘信号.' )
639
613
self .io_loop .call_soon (asyncio .create_task , self .processing_signal3 (day ))
@@ -648,7 +622,7 @@ async def check_signal1_processed(self):
648
622
~ Q (instrument__exchange = ExchangeType .CFFEX ), trigger_time__gte = self .__last_trading_day ,
649
623
strategy = self .__strategy , instrument__night_trade = False , processed = False ).order_by ('-priority' ):
650
624
logger .info (f'发现遗漏信号: { sig } ' )
651
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
625
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
652
626
653
627
@RegisterCallback (crontab = '25 9 * * *' )
654
628
async def processing_signal2 (self ):
@@ -661,7 +635,7 @@ async def processing_signal2(self):
661
635
instrument__exchange = ExchangeType .CFFEX , trigger_time__gte = self .__last_trading_day ,
662
636
strategy = self .__strategy , instrument__night_trade = False , processed = False ).order_by ('-priority' ):
663
637
logger .info (f'发现股指和国债信号: { sig } ' )
664
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
638
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
665
639
666
640
@RegisterCallback (crontab = '31 9 * * *' )
667
641
async def check_signal2_processed (self ):
@@ -673,7 +647,7 @@ async def check_signal2_processed(self):
673
647
instrument__exchange = ExchangeType .CFFEX , trigger_time__gte = self .__last_trading_day ,
674
648
strategy = self .__strategy , instrument__night_trade = False , processed = False ).order_by ('-priority' ):
675
649
logger .info (f'发现遗漏的股指和国债信号: { sig } ' )
676
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
650
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
677
651
678
652
@RegisterCallback (crontab = '55 20 * * *' )
679
653
async def processing_signal3 (self ):
@@ -686,7 +660,7 @@ async def processing_signal3(self):
686
660
trigger_time__gte = self .__last_trading_day ,
687
661
strategy = self .__strategy , instrument__night_trade = True , processed = False ).order_by ('-priority' ):
688
662
logger .info (f'发现夜盘信号: { sig } ' )
689
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
663
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
690
664
691
665
@RegisterCallback (crontab = '1 21 * * *' )
692
666
async def check_signal3_processed (self ):
@@ -698,7 +672,7 @@ async def check_signal3_processed(self):
698
672
trigger_time__gte = self .__last_trading_day ,
699
673
strategy = self .__strategy , instrument__night_trade = True , processed = False ).order_by ('-priority' ):
700
674
logger .info (f'发现遗漏的夜盘信号: { sig } ' )
701
- self .io_loop .create_task (self .ReqOrderInsert ( sig ) )
675
+ self .io_loop .call_soon (self .ReqOrderInsert , sig )
702
676
703
677
@RegisterCallback (crontab = '20 15 * * *' )
704
678
async def refresh_all (self ):
0 commit comments