diff --git a/codex-rs/mcp-server/src/codex_message_processor.rs b/codex-rs/mcp-server/src/codex_message_processor.rs index 9174403e52..e74ff04d34 100644 --- a/codex-rs/mcp-server/src/codex_message_processor.rs +++ b/codex-rs/mcp-server/src/codex_message_processor.rs @@ -11,7 +11,9 @@ use crate::outgoing_message::OutgoingNotification; use codex_core::AuthManager; use codex_core::CodexConversation; use codex_core::ConversationManager; +use codex_core::Cursor as RolloutCursor; use codex_core::NewConversation; +use codex_core::RolloutRecorder; use codex_core::auth::CLIENT_ID; use codex_core::config::Config; use codex_core::config::ConfigOverrides; @@ -40,6 +42,7 @@ use codex_protocol::mcp_protocol::AuthMode; use codex_protocol::mcp_protocol::AuthStatusChangeNotification; use codex_protocol::mcp_protocol::ClientRequest; use codex_protocol::mcp_protocol::ConversationId; +use codex_protocol::mcp_protocol::ConversationSummary; use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD; use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse; use codex_protocol::mcp_protocol::ExecCommandApprovalParams; @@ -50,12 +53,15 @@ use codex_protocol::mcp_protocol::GitDiffToRemoteResponse; use codex_protocol::mcp_protocol::InputItem as WireInputItem; use codex_protocol::mcp_protocol::InterruptConversationParams; use codex_protocol::mcp_protocol::InterruptConversationResponse; +use codex_protocol::mcp_protocol::ListConversationsParams; +use codex_protocol::mcp_protocol::ListConversationsResponse; use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification; use codex_protocol::mcp_protocol::LoginChatGptResponse; use codex_protocol::mcp_protocol::NewConversationParams; use codex_protocol::mcp_protocol::NewConversationResponse; use codex_protocol::mcp_protocol::RemoveConversationListenerParams; use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse; +use codex_protocol::mcp_protocol::ResumeConversationParams; use codex_protocol::mcp_protocol::SendUserMessageParams; use codex_protocol::mcp_protocol::SendUserMessageResponse; use codex_protocol::mcp_protocol::SendUserTurnParams; @@ -123,6 +129,12 @@ impl CodexMessageProcessor { // created before processing any subsequent messages. self.process_new_conversation(request_id, params).await; } + ClientRequest::ListConversations { request_id, params } => { + self.handle_list_conversations(request_id, params).await; + } + ClientRequest::ResumeConversation { request_id, params } => { + self.handle_resume_conversation(request_id, params).await; + } ClientRequest::SendUserMessage { request_id, params } => { self.send_user_message(request_id, params).await; } @@ -535,6 +547,128 @@ impl CodexMessageProcessor { } } + async fn handle_list_conversations( + &self, + request_id: RequestId, + params: ListConversationsParams, + ) { + let page_size = params.page_size.unwrap_or(25); + // Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string) + let cursor_obj: Option = match params.cursor { + Some(s) => serde_json::from_str::(&format!("\"{s}\"")).ok(), + None => None, + }; + let cursor_ref = cursor_obj.as_ref(); + + let page = match RolloutRecorder::list_conversations( + &self.config.codex_home, + page_size, + cursor_ref, + ) + .await + { + Ok(p) => p, + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("failed to list conversations: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + // Build summaries + let mut items: Vec = Vec::new(); + for it in page.items.into_iter() { + let (timestamp, preview) = extract_ts_and_preview(&it.head); + items.push(ConversationSummary { + path: it.path, + preview, + timestamp, + }); + } + + // Encode next_cursor as a plain string + let next_cursor = match page.next_cursor { + Some(c) => match serde_json::to_value(&c) { + Ok(serde_json::Value::String(s)) => Some(s), + _ => None, + }, + None => None, + }; + + let response = ListConversationsResponse { items, next_cursor }; + self.outgoing.send_response(request_id, response).await; + } + + async fn handle_resume_conversation( + &self, + request_id: RequestId, + params: ResumeConversationParams, + ) { + // Derive a Config using the same logic as new conversation, honoring overrides if provided. + let config = match params.overrides { + Some(overrides) => { + derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone()) + } + None => Ok(self.config.as_ref().clone()), + }; + let config = match config { + Ok(cfg) => cfg, + Err(err) => { + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("error deriving config: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match self + .conversation_manager + .resume_conversation_from_rollout( + config, + params.path.clone(), + self.auth_manager.clone(), + ) + .await + { + Ok(NewConversation { + conversation_id, + session_configured, + .. + }) => { + let event = codex_core::protocol::Event { + id: "".to_string(), + msg: codex_core::protocol::EventMsg::SessionConfigured( + session_configured.clone(), + ), + }; + self.outgoing.send_event_as_notification(&event, None).await; + + // Reply with conversation id + model and initial messages (when present) + let response = codex_protocol::mcp_protocol::ResumeConversationResponse { + conversation_id: ConversationId(conversation_id), + model: session_configured.model.clone(), + initial_messages: session_configured.initial_messages.clone(), + }; + self.outgoing.send_response(request_id, response).await; + } + Err(err) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!("error resuming conversation: {err}"), + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + } + } + async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) { let SendUserMessageParams { conversation_id, @@ -971,3 +1105,38 @@ async fn on_exec_approval_response( error!("failed to submit ExecApproval: {err}"); } } + +fn extract_ts_and_preview(head: &[serde_json::Value]) -> (Option, String) { + let ts = head + .first() + .and_then(|v| v.get("timestamp")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let preview = find_first_user_text(head).unwrap_or_default(); + (ts, preview) +} + +fn find_first_user_text(head: &[serde_json::Value]) -> Option { + use codex_core::protocol::InputMessageKind; + for v in head.iter() { + let t = v.get("type").and_then(|x| x.as_str()).unwrap_or(""); + if t != "message" { + continue; + } + if v.get("role").and_then(|x| x.as_str()) != Some("user") { + continue; + } + if let Some(arr) = v.get("content").and_then(|c| c.as_array()) { + for c in arr.iter() { + if let (Some("input_text"), Some(txt)) = + (c.get("type").and_then(|t| t.as_str()), c.get("text")) + && let Some(s) = txt.as_str() + && matches!(InputMessageKind::from(("user", s)), InputMessageKind::Plain) + { + return Some(s.to_string()); + } + } + } + } + None +} diff --git a/codex-rs/mcp-server/tests/common/mcp_process.rs b/codex-rs/mcp-server/tests/common/mcp_process.rs index 14939156bb..ca41858e6f 100644 --- a/codex-rs/mcp-server/tests/common/mcp_process.rs +++ b/codex-rs/mcp-server/tests/common/mcp_process.rs @@ -16,8 +16,10 @@ use codex_protocol::mcp_protocol::AddConversationListenerParams; use codex_protocol::mcp_protocol::CancelLoginChatGptParams; use codex_protocol::mcp_protocol::GetAuthStatusParams; use codex_protocol::mcp_protocol::InterruptConversationParams; +use codex_protocol::mcp_protocol::ListConversationsParams; use codex_protocol::mcp_protocol::NewConversationParams; use codex_protocol::mcp_protocol::RemoveConversationListenerParams; +use codex_protocol::mcp_protocol::ResumeConversationParams; use codex_protocol::mcp_protocol::SendUserMessageParams; use codex_protocol::mcp_protocol::SendUserTurnParams; @@ -245,6 +247,24 @@ impl McpProcess { self.send_request("getConfigToml", None).await } + /// Send a `listConversations` JSON-RPC request. + pub async fn send_list_conversations_request( + &mut self, + params: ListConversationsParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("listConversations", params).await + } + + /// Send a `resumeConversation` JSON-RPC request. + pub async fn send_resume_conversation_request( + &mut self, + params: ResumeConversationParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("resumeConversation", params).await + } + /// Send a `loginChatGpt` JSON-RPC request. pub async fn send_login_chat_gpt_request(&mut self) -> anyhow::Result { self.send_request("loginChatGpt", None).await diff --git a/codex-rs/mcp-server/tests/suite/list_resume.rs b/codex-rs/mcp-server/tests/suite/list_resume.rs new file mode 100644 index 0000000000..d835e5ca77 --- /dev/null +++ b/codex-rs/mcp-server/tests/suite/list_resume.rs @@ -0,0 +1,172 @@ +use std::fs; +use std::path::Path; + +use codex_protocol::mcp_protocol::ListConversationsParams; +use codex_protocol::mcp_protocol::ListConversationsResponse; +use codex_protocol::mcp_protocol::NewConversationParams; // reused for overrides shape +use codex_protocol::mcp_protocol::ResumeConversationParams; +use codex_protocol::mcp_protocol::ResumeConversationResponse; +use mcp_test_support::McpProcess; +use mcp_test_support::to_response; +use mcp_types::JSONRPCNotification; +use mcp_types::JSONRPCResponse; +use mcp_types::RequestId; +use pretty_assertions::assert_eq; +use serde_json::json; +use tempfile::TempDir; +use tokio::time::timeout; +use uuid::Uuid; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_list_and_resume_conversations() { + // Prepare a temporary CODEX_HOME with a few fake rollout files. + let codex_home = TempDir::new().expect("create temp dir"); + create_fake_rollout( + codex_home.path(), + "2025-01-02T12-00-00", + "2025-01-02T12:00:00Z", + "Hello A", + ); + create_fake_rollout( + codex_home.path(), + "2025-01-01T13-00-00", + "2025-01-01T13:00:00Z", + "Hello B", + ); + create_fake_rollout( + codex_home.path(), + "2025-01-01T12-00-00", + "2025-01-01T12:00:00Z", + "Hello C", + ); + + let mut mcp = McpProcess::new(codex_home.path()) + .await + .expect("spawn mcp process"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init failed"); + + // Request first page with size 2 + let req_id = mcp + .send_list_conversations_request(ListConversationsParams { + page_size: Some(2), + cursor: None, + }) + .await + .expect("send listConversations"); + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await + .expect("listConversations timeout") + .expect("listConversations resp"); + let ListConversationsResponse { items, next_cursor } = + to_response::(resp).expect("deserialize response"); + + assert_eq!(items.len(), 2); + // Newest first; preview text should match + assert_eq!(items[0].preview, "Hello A"); + assert_eq!(items[1].preview, "Hello B"); + assert!(items[0].path.is_absolute()); + assert!(next_cursor.is_some()); + + // Request the next page using the cursor + let req_id2 = mcp + .send_list_conversations_request(ListConversationsParams { + page_size: Some(2), + cursor: next_cursor, + }) + .await + .expect("send listConversations page 2"); + let resp2: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id2)), + ) + .await + .expect("listConversations page 2 timeout") + .expect("listConversations page 2 resp"); + let ListConversationsResponse { + items: items2, + next_cursor: next2, + .. + } = to_response::(resp2).expect("deserialize response"); + assert_eq!(items2.len(), 1); + assert_eq!(items2[0].preview, "Hello C"); + assert!(next2.is_some()); + + // Now resume one of the sessions and expect a SessionConfigured notification and response. + let resume_req_id = mcp + .send_resume_conversation_request(ResumeConversationParams { + path: items[0].path.clone(), + overrides: Some(NewConversationParams { + model: Some("o3".to_string()), + ..Default::default() + }), + }) + .await + .expect("send resumeConversation"); + + // Expect a codex/event notification with msg.type == session_configured + let notification: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event"), + ) + .await + .expect("session_configured notification timeout") + .expect("session_configured notification"); + // Basic shape assertion: ensure event type is session_configured + let msg_type = notification + .params + .as_ref() + .and_then(|p| p.get("msg")) + .and_then(|m| m.get("type")) + .and_then(|t| t.as_str()) + .unwrap_or(""); + assert_eq!(msg_type, "session_configured"); + + // Then the response for resumeConversation + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_req_id)), + ) + .await + .expect("resumeConversation timeout") + .expect("resumeConversation resp"); + let ResumeConversationResponse { + conversation_id, .. + } = to_response::(resume_resp) + .expect("deserialize resumeConversation response"); + // conversation id should be a valid UUID + let _ = uuid::Uuid::from_bytes(conversation_id.0.into_bytes()); +} + +fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str, preview: &str) { + let uuid = Uuid::new_v4(); + // sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss) + let year = &filename_ts[0..4]; + let month = &filename_ts[5..7]; + let day = &filename_ts[8..10]; + let dir = codex_home.join("sessions").join(year).join(month).join(day); + fs::create_dir_all(&dir).unwrap_or_else(|e| panic!("create sessions dir: {e}")); + + let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); + let mut lines = Vec::new(); + // Meta line with timestamp + lines.push(json!({"timestamp": meta_rfc3339}).to_string()); + // Minimal user message entry as a persisted response item + lines.push( + json!({ + "type":"message", + "role":"user", + "content":[{"type":"input_text","text": preview}] + }) + .to_string(), + ); + fs::write(file_path, lines.join("\n") + "\n") + .unwrap_or_else(|e| panic!("write rollout file: {e}")); +} diff --git a/codex-rs/mcp-server/tests/suite/mod.rs b/codex-rs/mcp-server/tests/suite/mod.rs index fb6d1cefac..138e11a4af 100644 --- a/codex-rs/mcp-server/tests/suite/mod.rs +++ b/codex-rs/mcp-server/tests/suite/mod.rs @@ -5,5 +5,6 @@ mod codex_tool; mod config; mod create_conversation; mod interrupt; +mod list_resume; mod login; mod send_message; diff --git a/codex-rs/protocol/src/mcp_protocol.rs b/codex-rs/protocol/src/mcp_protocol.rs index e1797b91ad..c46cafde5f 100644 --- a/codex-rs/protocol/src/mcp_protocol.rs +++ b/codex-rs/protocol/src/mcp_protocol.rs @@ -7,6 +7,7 @@ use crate::config_types::ReasoningEffort; use crate::config_types::ReasoningSummary; use crate::config_types::SandboxMode; use crate::protocol::AskForApproval; +use crate::protocol::EventMsg; use crate::protocol::FileChange; use crate::protocol::ReviewDecision; use crate::protocol::SandboxPolicy; @@ -54,6 +55,18 @@ pub enum ClientRequest { request_id: RequestId, params: NewConversationParams, }, + /// List recorded Codex conversations (rollouts) with optional pagination and search. + ListConversations { + #[serde(rename = "id")] + request_id: RequestId, + params: ListConversationsParams, + }, + /// Resume a recorded Codex conversation from a rollout file. + ResumeConversation { + #[serde(rename = "id")] + request_id: RequestId, + params: ResumeConversationParams, + }, SendUserMessage { #[serde(rename = "id")] request_id: RequestId, @@ -164,6 +177,56 @@ pub struct NewConversationResponse { pub model: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct ResumeConversationResponse { + pub conversation_id: ConversationId, + pub model: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub initial_messages: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, TS)] +#[serde(rename_all = "camelCase")] +pub struct ListConversationsParams { + /// Optional page size; defaults to a reasonable server-side value. + #[serde(skip_serializing_if = "Option::is_none")] + pub page_size: Option, + /// Opaque pagination cursor returned by a previous call. + #[serde(skip_serializing_if = "Option::is_none")] + pub cursor: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +pub struct ConversationSummary { + pub path: PathBuf, + pub preview: String, + /// RFC3339 timestamp string for the session start, if available. + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +pub struct ListConversationsResponse { + pub items: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// if None, there are no more items to return. + #[serde(skip_serializing_if = "Option::is_none")] + pub next_cursor: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] +#[serde(rename_all = "camelCase")] +pub struct ResumeConversationParams { + /// Absolute path to the rollout JSONL file. + pub path: PathBuf, + /// Optional overrides to apply when spawning the resumed session. + #[serde(skip_serializing_if = "Option::is_none")] + pub overrides: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)] #[serde(rename_all = "camelCase")] pub struct AddConversationSubscriptionResponse {