Open
Description
Bug Description
When I try to make a query within a thread I get the following message DB ERROR: pool timed out while waiting for an open connection
, it doesn't matter if I increase the waiting time or the connection pool it always does the same
Minimal Reproduction
pub fn process_tracker_entities(
db_pool: DbPool,
rx_db: std::sync::mpsc::Receiver<TestMsg>,
) -> UseCaseResult<()> {
thread::Builder::new().spawn(move || -> UseCaseResult<()> {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(async move {
let od_tracker_db_model = OdTracksDbModel::init(&db_pool);
// test conections
if let Err(err) = &od_tracker_db_model.find(1).await {
eprintln!("Error in database: {}", err);
}
for fwo in rx_db.iter() {}
Ok(())
})
})?;
Ok(())
}
This thread is invoked from another thread that is in an actor
spawn(async move {
let mut actor_state = actor_state.lock().await;
let result = run_video_processing_engine(&ctx_pool, msg.0).await;
if let Err(err) = result {
println!("Error engine: {}", err);
actor_addr.do_send(StopProcessing);
}
actor_state.is_running = false;
});
This in turn invokes the other thread
pub async fn run_video_processing_engine<'a>(
db_pool: &'a DbPool,
stream_entity_id: StreamEntityId,
) -> UseCaseResult<()> {
let (tx_capture, rx_capture) = new_capture_channel();
let (tx_db, rx_db) = sync_channel::<TestMsg>(1);
// code
process_tracker_entities(db_pool.clone(), rx_db)?;
// code
Ok(())
}
Here I create my connection
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let ctx_pool = DBConnection::create_pool()
.await
.expect("Failed to connect to Postgres.");
let engine_actor = EngineActor::new(ctx_pool.clone()).start();
HttpServer::new(move || {
// .. code
})
.bind(("127.0.0.1", 3383))?
.run()
.await
}
This is how I define my connection
pub struct DBConnection;
impl DBConnection {
pub async fn create_pool() -> InfraResult<Pool<Postgres>> {
let config = DBConfig::new();
let config_clone = config.clone();
let max_connections = if cfg!(test) {
5
} else {
config.pool_max_connections
};
PgPoolOptions::new()
.after_connect(move |conn, _| {
let query = format!("SET search_path = '{}';", &config.search_path.as_str());
Box::pin(async move {
conn.execute(&*query).await?;
Ok(())
})
})
.acquire_timeout(std::time::Duration::from_secs(config.timeout))
.max_connections(max_connections)
.connect_with(Self::db_options(&config_clone))
.await
.map_err(InfraError::from)
}
fn db_options(config: &DBConfig) -> PgConnectOptions {
let ssl_mode = match config.require_ssl {
true => PgSslMode::Require,
_ => PgSslMode::Prefer,
};
PgConnectOptions::new()
.host(&config.host)
.username(&config.user)
.password(&config.pass)
.port(config.port)
.ssl_mode(ssl_mode)
.database(&config.db_name)
}
}
Info
- SQLx version: 0.7.4
- SQLx features enabled:
[ "runtime-tokio-rustls", "bigdecimal", "macros", "postgres","uuid", "chrono", "migrate", "json" ]
- Database server and version: Postgres 16.2
- Operating system: Ubuntu 24.04
rustc --version
: rustc 1.78.0 (9b00956e5 2024-04-29)