Open
Description
When spawning a background task to keep a subscription alive, after the task is dropped, the application will panic with "INTERNAL BUG: Subscriber ID already taken." from "convex-0.9.0/src/base_client/mod.rs:122:13"
I'm not sure what the core issue is, or if there's a different way this should be handled, but it is reproducible.
My goal is to always have at least one subscription cached in the background for 10 minutes after the last query, lowered to five seconds for the example.
Reproduction code:
// convex/test.ts
import { v } from "convex/values";
import { query } from "./_generated/server";
export const ping = query({
args: {},
returns: v.string(),
handler: async (ctx) => {
return "pong";
},
});
// Cargo.toml
[package]
name = "convex_internal_bug"
version = "0.1.0"
edition = "2024"
[dependencies]
anyhow = "1.0.98"
convex = "0.9.0"
tokio = "1.46.0"
futures-util = "0.3"
// src/main.rs
use convex::ConvexClient;
use std::{collections::BTreeMap, env, time::Duration};
use futures_util::stream::StreamExt;
use tokio::time::timeout;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let convex = ConvexClient::new(&env::var("CONVEX_URL").unwrap()).await?;
loop {
let mut subscription = convex
.clone()
.subscribe("test:ping", BTreeMap::new())
.await?;
let Some(result) = subscription.next().await else {
panic!("Failed to subscribe to query");
};
// ? Spawn a background task to keep the subscription alive
tokio::spawn(async move {
// ? run until timeout, or subscription ends
while let Ok(Some(_)) = timeout(
Duration::from_secs(5),
subscription.next()
).await { }
println!("subscription ended");
});
println!("result: {:?}", result);
tokio::time::sleep(Duration::from_secs(1)).await;
// ? After the first subscription task ends (5 seconds)
// ? program panics with INTERNAL BUG: Subscriber ID already taken.
}
}