Skip to content

[BUG] Subscriber ID already taken after background task is dropped #8

Open
@tyrantlink

Description

@tyrantlink

When spawning a background task to keep a subscription alive, after the task is dropped, the application will panic with "INTERNAL BUG: Subscriber ID already taken." from "convex-0.9.0/src/base_client/mod.rs:122:13"

I'm not sure what the core issue is, or if there's a different way this should be handled, but it is reproducible.

My goal is to always have at least one subscription cached in the background for 10 minutes after the last query, lowered to five seconds for the example.

Reproduction code:

// convex/test.ts
import { v } from "convex/values";
import { query } from "./_generated/server";

export const ping = query({
  args: {},
  returns: v.string(),
  handler: async (ctx) => {
    return "pong";
  },
});
// Cargo.toml
[package]
name = "convex_internal_bug"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1.0.98"
convex = "0.9.0"
tokio = "1.46.0"
futures-util = "0.3"
// src/main.rs
use convex::ConvexClient;
use std::{collections::BTreeMap, env, time::Duration};
use futures_util::stream::StreamExt;
use tokio::time::timeout;


#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let convex = ConvexClient::new(&env::var("CONVEX_URL").unwrap()).await?;

    loop {
        let mut subscription = convex
            .clone()
            .subscribe("test:ping", BTreeMap::new())
            .await?;

        let Some(result) = subscription.next().await else {
            panic!("Failed to subscribe to query");
        };

        // ? Spawn a background task to keep the subscription alive
        tokio::spawn(async move {
            // ? run until timeout, or subscription ends
            while let Ok(Some(_)) = timeout(
                Duration::from_secs(5),
                subscription.next()
            ).await { }
            println!("subscription ended");
        });

        println!("result: {:?}", result);
        tokio::time::sleep(Duration::from_secs(1)).await;

        // ? After the first subscription task ends (5 seconds)
        // ? program panics with INTERNAL BUG: Subscriber ID already taken.
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions