Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions codex-rs/mcp-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::outgoing_message::OutgoingNotification;
use codex_core::AuthManager;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::Cursor as RolloutCursor;
use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::auth::CLIENT_ID;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
Expand Down Expand Up @@ -40,6 +42,7 @@ use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
use codex_protocol::mcp_protocol::ClientRequest;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationSummary;
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
Expand All @@ -50,12 +53,15 @@ use codex_protocol::mcp_protocol::GitDiffToRemoteResponse;
use codex_protocol::mcp_protocol::InputItem as WireInputItem;
use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::InterruptConversationResponse;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::ListConversationsResponse;
use codex_protocol::mcp_protocol::LoginChatGptCompleteNotification;
use codex_protocol::mcp_protocol::LoginChatGptResponse;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::NewConversationResponse;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::RemoveConversationSubscriptionResponse;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::SendUserTurnParams;
Expand Down Expand Up @@ -123,6 +129,12 @@ impl CodexMessageProcessor {
// created before processing any subsequent messages.
self.process_new_conversation(request_id, params).await;
}
ClientRequest::ListConversations { request_id, params } => {
self.handle_list_conversations(request_id, params).await;
}
ClientRequest::ResumeConversation { request_id, params } => {
self.handle_resume_conversation(request_id, params).await;
}
ClientRequest::SendUserMessage { request_id, params } => {
self.send_user_message(request_id, params).await;
}
Expand Down Expand Up @@ -535,6 +547,128 @@ impl CodexMessageProcessor {
}
}

async fn handle_list_conversations(
&self,
request_id: RequestId,
params: ListConversationsParams,
) {
let page_size = params.page_size.unwrap_or(25);
// Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string)
let cursor_obj: Option<RolloutCursor> = match params.cursor {
Some(s) => serde_json::from_str::<RolloutCursor>(&format!("\"{s}\"")).ok(),
None => None,
};
let cursor_ref = cursor_obj.as_ref();

let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
)
.await
{
Ok(p) => p,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to list conversations: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

// Build summaries
let mut items: Vec<ConversationSummary> = Vec::new();
for it in page.items.into_iter() {
let (timestamp, preview) = extract_ts_and_preview(&it.head);
items.push(ConversationSummary {
path: it.path,
preview,
timestamp,
});
}

// Encode next_cursor as a plain string
let next_cursor = match page.next_cursor {
Some(c) => match serde_json::to_value(&c) {
Ok(serde_json::Value::String(s)) => Some(s),
_ => None,
},
None => None,
};

let response = ListConversationsResponse { items, next_cursor };
self.outgoing.send_response(request_id, response).await;
}

async fn handle_resume_conversation(
&self,
request_id: RequestId,
params: ResumeConversationParams,
) {
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
let config = match params.overrides {
Some(overrides) => {
derive_config_from_params(overrides, self.codex_linux_sandbox_exe.clone())
}
None => Ok(self.config.as_ref().clone()),
};
let config = match config {
Ok(cfg) => cfg,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};

match self
.conversation_manager
.resume_conversation_from_rollout(
config,
params.path.clone(),
self.auth_manager.clone(),
)
.await
{
Ok(NewConversation {
conversation_id,
session_configured,
..
}) => {
let event = codex_core::protocol::Event {
id: "".to_string(),
msg: codex_core::protocol::EventMsg::SessionConfigured(
session_configured.clone(),
),
};
self.outgoing.send_event_as_notification(&event, None).await;

// Reply with conversation id + model and initial messages (when present)
let response = codex_protocol::mcp_protocol::ResumeConversationResponse {
conversation_id: ConversationId(conversation_id),
model: session_configured.model.clone(),
initial_messages: session_configured.initial_messages.clone(),
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming conversation: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}

async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
let SendUserMessageParams {
conversation_id,
Expand Down Expand Up @@ -971,3 +1105,38 @@ async fn on_exec_approval_response(
error!("failed to submit ExecApproval: {err}");
}
}

fn extract_ts_and_preview(head: &[serde_json::Value]) -> (Option<String>, String) {
let ts = head
.first()
.and_then(|v| v.get("timestamp"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let preview = find_first_user_text(head).unwrap_or_default();
(ts, preview)
}

fn find_first_user_text(head: &[serde_json::Value]) -> Option<String> {
use codex_core::protocol::InputMessageKind;
for v in head.iter() {
let t = v.get("type").and_then(|x| x.as_str()).unwrap_or("");
if t != "message" {
continue;
}
if v.get("role").and_then(|x| x.as_str()) != Some("user") {
continue;
}
if let Some(arr) = v.get("content").and_then(|c| c.as_array()) {
for c in arr.iter() {
if let (Some("input_text"), Some(txt)) =
(c.get("type").and_then(|t| t.as_str()), c.get("text"))
&& let Some(s) = txt.as_str()
&& matches!(InputMessageKind::from(("user", s)), InputMessageKind::Plain)
{
return Some(s.to_string());
}
}
}
}
None
}
20 changes: 20 additions & 0 deletions codex-rs/mcp-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use codex_protocol::mcp_protocol::AddConversationListenerParams;
use codex_protocol::mcp_protocol::CancelLoginChatGptParams;
use codex_protocol::mcp_protocol::GetAuthStatusParams;
use codex_protocol::mcp_protocol::InterruptConversationParams;
use codex_protocol::mcp_protocol::ListConversationsParams;
use codex_protocol::mcp_protocol::NewConversationParams;
use codex_protocol::mcp_protocol::RemoveConversationListenerParams;
use codex_protocol::mcp_protocol::ResumeConversationParams;
use codex_protocol::mcp_protocol::SendUserMessageParams;
use codex_protocol::mcp_protocol::SendUserTurnParams;

Expand Down Expand Up @@ -245,6 +247,24 @@ impl McpProcess {
self.send_request("getConfigToml", None).await
}

/// Send a `listConversations` JSON-RPC request.
pub async fn send_list_conversations_request(
&mut self,
params: ListConversationsParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("listConversations", params).await
}

/// Send a `resumeConversation` JSON-RPC request.
pub async fn send_resume_conversation_request(
&mut self,
params: ResumeConversationParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("resumeConversation", params).await
}

/// Send a `loginChatGpt` JSON-RPC request.
pub async fn send_login_chat_gpt_request(&mut self) -> anyhow::Result<i64> {
self.send_request("loginChatGpt", None).await
Expand Down
Loading