@@ -7,6 +7,7 @@ use std::io;
77use std:: io:: Error ;
88use std:: sync:: Arc ;
99use tokio:: spawn;
10+ use tokio:: sync:: Mutex ; // Already present, but ensure it's used for cache
1011
1112pub mod config;
1213pub mod raw_udp;
@@ -16,20 +17,16 @@ pub mod udp;
1617pub struct Object {
1718 config : Arc < ObjectConfig > ,
1819 router : Arc < dyn RouterSet > ,
20+ connector_cache : Arc < Mutex < HashMap < String , Arc < Box < dyn RunConnector > > > > > , // New field
1921}
2022
2123impl Object {
2224 pub fn new ( config : Arc < ObjectConfig > , router : Arc < dyn RouterSet > ) -> Self {
23- Self { config, router }
24- }
25-
26- async fn init_connectors ( & self ) -> io:: Result < HashMap < String , Arc < Box < dyn RunConnector > > > > {
27- let mut cache = HashMap :: with_capacity ( self . config . connector . len ( ) ) ;
28- for ( name, conn_conf) in & self . config . connector {
29- let connector = connector:: create ( conn_conf) . await ?;
30- cache. insert ( name. clone ( ) , Arc :: new ( connector) ) ;
25+ Self {
26+ config,
27+ router,
28+ connector_cache : Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) , // Initialize cache
3129 }
32- Ok ( cache)
3330 }
3431
3532 pub async fn start ( & self ) -> io:: Result < ( ) > {
@@ -42,7 +39,8 @@ impl Object {
4239 e
4340 } ) ?;
4441 let main_acceptor = Arc :: new ( acc) ;
45- let connector_cache_outer = Arc :: new ( self . init_connectors ( ) . await ?) ;
42+ let connector_cache_outer = self . connector_cache . clone ( ) ; // Clone cache Arc for the loop
43+
4644 loop {
4745 let ( mut acc_stream, _) = main_acceptor. accept ( ) . await . map_err ( |e| {
4846 error ! ( "Failed to accept connection: {}" , e) ;
@@ -51,7 +49,7 @@ impl Object {
5149 let main_acceptor_clone = Arc :: clone ( & main_acceptor) ;
5250 let router_clone = Arc :: clone ( & router_outer) ;
5351 let config_clone = Arc :: clone ( & config_outer) ;
54- let connector_cache_clone = Arc :: clone ( & connector_cache_outer) ;
52+ let connector_cache_clone = Arc :: clone ( & connector_cache_outer) ; // Clone cache Arc for the spawned task
5553
5654 spawn ( async move {
5755 match acc_stream {
@@ -85,20 +83,53 @@ impl Object {
8583 addr_ref,
8684 )
8785 . await ;
88- let connector_obj =
89- match connector_cache_clone. get ( client_name. as_str ( ) ) {
90- Some ( cached_connector) => {
91- debug ! (
92- "Reusing cached connector for client: {}" ,
93- client_name
94- ) ;
95- Arc :: clone ( cached_connector)
96- }
97- None => {
98- error ! ( "Connector '{}' not preloaded" , client_name) ;
99- return Ok ( ( ) ) ;
100- }
101- } ;
86+ let conn_conf = match config_clone
87+ . connector
88+ . get ( client_name. as_str ( ) )
89+ {
90+ Some ( c) => c,
91+ None => {
92+ error ! ( "Connector config '{}' not found" , client_name) ;
93+ return Ok ( ( ) ) ; // Exit the task for this connection
94+ }
95+ } ;
96+
97+ let connector_obj: Arc < Box < dyn RunConnector > > ;
98+ {
99+ let mut connector_cache_guard =
100+ connector_cache_clone. lock ( ) . await ;
101+ if let Some ( cached_connector) =
102+ connector_cache_guard. get ( client_name. as_str ( ) )
103+ {
104+ connector_obj = Arc :: clone ( cached_connector) ;
105+ debug ! (
106+ "Reusing cached connector for client: {}" ,
107+ client_name
108+ ) ;
109+ } else {
110+ debug ! (
111+ "Creating new connector for client: {}" ,
112+ client_name
113+ ) ;
114+ let new_connector =
115+ match connector:: create ( conn_conf) . await {
116+ Ok ( c) => c,
117+ Err ( e) => {
118+ error ! (
119+ "Failed to create connector '{}': {}" ,
120+ client_name, e
121+ ) ;
122+ return Ok ( ( ) ) ;
123+ }
124+ } ;
125+ let new_connector_arc = Arc :: new ( new_connector) ;
126+ connector_cache_guard. insert (
127+ client_name. clone ( ) ,
128+ Arc :: clone ( & new_connector_arc) ,
129+ ) ;
130+ connector_obj = new_connector_arc;
131+ }
132+ }
102133
103134 debug ! ( "Handshake successful {:?}" , addr_ref) ;
104135 let client_stream_res = Arc :: clone ( & connector_obj)
0 commit comments