Skip to content

Commit d555b68

Browse files
fix: race condition unified exec (#3644)
Fix race condition without storing an rx in the session
1 parent 9baa5c3 commit d555b68

File tree

3 files changed

+46
-55
lines changed

3 files changed

+46
-55
lines changed

codex-rs/core/src/exec_command/exec_command_session.rs

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,6 @@ pub(crate) struct ExecCommandSession {
1111
/// Broadcast stream of output chunks read from the PTY. New subscribers
1212
/// receive only chunks emitted after they subscribe.
1313
output_tx: broadcast::Sender<Vec<u8>>,
14-
/// Receiver subscribed before the child process starts emitting output so
15-
/// the first caller can consume any early data without races.
16-
initial_output_rx: StdMutex<Option<broadcast::Receiver<Vec<u8>>>>,
1714

1815
/// Child killer handle for termination on drop (can signal independently
1916
/// of a thread blocked in `.wait()`).
@@ -41,39 +38,28 @@ impl ExecCommandSession {
4138
writer_handle: JoinHandle<()>,
4239
wait_handle: JoinHandle<()>,
4340
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
44-
) -> Self {
45-
Self {
46-
writer_tx,
47-
output_tx,
48-
initial_output_rx: StdMutex::new(None),
49-
killer: StdMutex::new(Some(killer)),
50-
reader_handle: StdMutex::new(Some(reader_handle)),
51-
writer_handle: StdMutex::new(Some(writer_handle)),
52-
wait_handle: StdMutex::new(Some(wait_handle)),
53-
exit_status,
54-
}
55-
}
56-
57-
pub(crate) fn set_initial_output_receiver(&self, receiver: broadcast::Receiver<Vec<u8>>) {
58-
if let Ok(mut guard) = self.initial_output_rx.lock()
59-
&& guard.is_none()
60-
{
61-
*guard = Some(receiver);
62-
}
41+
) -> (Self, broadcast::Receiver<Vec<u8>>) {
42+
let initial_output_rx = output_tx.subscribe();
43+
(
44+
Self {
45+
writer_tx,
46+
output_tx,
47+
killer: StdMutex::new(Some(killer)),
48+
reader_handle: StdMutex::new(Some(reader_handle)),
49+
writer_handle: StdMutex::new(Some(writer_handle)),
50+
wait_handle: StdMutex::new(Some(wait_handle)),
51+
exit_status,
52+
},
53+
initial_output_rx,
54+
)
6355
}
6456

6557
pub(crate) fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
6658
self.writer_tx.clone()
6759
}
6860

6961
pub(crate) fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
70-
if let Ok(mut guard) = self.initial_output_rx.lock()
71-
&& let Some(receiver) = guard.take()
72-
{
73-
receiver
74-
} else {
75-
self.output_tx.subscribe()
76-
}
62+
self.output_tx.subscribe()
7763
}
7864

7965
pub(crate) fn has_exited(&self) -> bool {

codex-rs/core/src/exec_command/session_manager.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,16 @@ impl SessionManager {
9393
.fetch_add(1, std::sync::atomic::Ordering::SeqCst),
9494
);
9595

96-
let (session, mut exit_rx) =
97-
create_exec_command_session(params.clone())
98-
.await
99-
.map_err(|err| {
100-
format!(
101-
"failed to create exec command session for session id {}: {err}",
102-
session_id.0
103-
)
104-
})?;
96+
let (session, mut output_rx, mut exit_rx) = create_exec_command_session(params.clone())
97+
.await
98+
.map_err(|err| {
99+
format!(
100+
"failed to create exec command session for session id {}: {err}",
101+
session_id.0
102+
)
103+
})?;
105104

106105
// Insert into session map.
107-
let mut output_rx = session.output_receiver();
108106
self.sessions.lock().await.insert(session_id, session);
109107

110108
// Collect output until either timeout expires or process exits.
@@ -245,7 +243,11 @@ impl SessionManager {
245243
/// Spawn PTY and child process per spawn_exec_command_session logic.
246244
async fn create_exec_command_session(
247245
params: ExecCommandParams,
248-
) -> anyhow::Result<(ExecCommandSession, oneshot::Receiver<i32>)> {
246+
) -> anyhow::Result<(
247+
ExecCommandSession,
248+
tokio::sync::broadcast::Receiver<Vec<u8>>,
249+
oneshot::Receiver<i32>,
250+
)> {
249251
let ExecCommandParams {
250252
cmd,
251253
yield_time_ms: _,
@@ -279,8 +281,6 @@ async fn create_exec_command_session(
279281
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
280282
// Broadcast for streaming PTY output to readers: subscribers receive from subscription time.
281283
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
282-
let initial_output_rx = output_tx.subscribe();
283-
284284
// Reader task: drain PTY and forward chunks to output channel.
285285
let mut reader = pair.master.try_clone_reader()?;
286286
let output_tx_clone = output_tx.clone();
@@ -342,7 +342,7 @@ async fn create_exec_command_session(
342342
});
343343

344344
// Create and store the session with channels.
345-
let session = ExecCommandSession::new(
345+
let (session, initial_output_rx) = ExecCommandSession::new(
346346
writer_tx,
347347
output_tx,
348348
killer,
@@ -351,8 +351,7 @@ async fn create_exec_command_session(
351351
wait_handle,
352352
exit_status,
353353
);
354-
session.set_initial_output_receiver(initial_output_rx);
355-
Ok((session, exit_rx))
354+
Ok((session, initial_output_rx, exit_rx))
356355
}
357356

358357
#[cfg(test)]

codex-rs/core/src/unified_exec/mod.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,13 @@ type OutputBuffer = Arc<Mutex<OutputBufferState>>;
100100
type OutputHandles = (OutputBuffer, Arc<Notify>);
101101

102102
impl ManagedUnifiedExecSession {
103-
fn new(session: ExecCommandSession) -> Self {
103+
fn new(
104+
session: ExecCommandSession,
105+
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
106+
) -> Self {
104107
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
105108
let output_notify = Arc::new(Notify::new());
106-
let mut receiver = session.output_receiver();
109+
let mut receiver = initial_output_rx;
107110
let buffer_clone = Arc::clone(&output_buffer);
108111
let notify_clone = Arc::clone(&output_notify);
109112
let output_task = tokio::spawn(async move {
@@ -193,8 +196,8 @@ impl UnifiedExecSessionManager {
193196
} else {
194197
let command = request.input_chunks.to_vec();
195198
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
196-
let session = create_unified_exec_session(&command).await?;
197-
let managed_session = ManagedUnifiedExecSession::new(session);
199+
let (session, initial_output_rx) = create_unified_exec_session(&command).await?;
200+
let managed_session = ManagedUnifiedExecSession::new(session, initial_output_rx);
198201
let (buffer, notify) = managed_session.output_handles();
199202
writer_tx = managed_session.writer_sender();
200203
output_buffer = buffer;
@@ -297,7 +300,13 @@ impl UnifiedExecSessionManager {
297300

298301
async fn create_unified_exec_session(
299302
command: &[String],
300-
) -> Result<ExecCommandSession, UnifiedExecError> {
303+
) -> Result<
304+
(
305+
ExecCommandSession,
306+
tokio::sync::broadcast::Receiver<Vec<u8>>,
307+
),
308+
UnifiedExecError,
309+
> {
301310
if command.is_empty() {
302311
return Err(UnifiedExecError::MissingCommandLine);
303312
}
@@ -327,7 +336,6 @@ async fn create_unified_exec_session(
327336

328337
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
329338
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
330-
let initial_output_rx = output_tx.subscribe();
331339

332340
let mut reader = pair
333341
.master
@@ -381,7 +389,7 @@ async fn create_unified_exec_session(
381389
wait_exit_status.store(true, Ordering::SeqCst);
382390
});
383391

384-
let session = ExecCommandSession::new(
392+
let (session, initial_output_rx) = ExecCommandSession::new(
385393
writer_tx,
386394
output_tx,
387395
killer,
@@ -390,9 +398,7 @@ async fn create_unified_exec_session(
390398
wait_handle,
391399
exit_status,
392400
);
393-
session.set_initial_output_receiver(initial_output_rx);
394-
395-
Ok(session)
401+
Ok((session, initial_output_rx))
396402
}
397403

398404
#[cfg(test)]

0 commit comments

Comments
 (0)