Skip to content

refactor: remove is_oom flag from ConnectionContext #5216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ namespace {

class OkService : public ServiceInterface {
public:
void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, ConnectionContext* cntx) final {
DispatchResult DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
ConnectionContext* cntx) final {
builder->SendOk();
return DispatchResult::OK;
}

size_t DispatchManyCommands(absl::Span<ArgSlice> args_lists, SinkReplyBuilder* builder,
Expand All @@ -47,9 +49,7 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
OkService service;

Connection::Init(pool->size());
pool->Await([](auto*) {
tl_facade_stats = new FacadeStats;
});
pool->Await([](auto*) { tl_facade_stats = new FacadeStats; });

acceptor->AddListener(GetFlag(FLAGS_port), new Listener{Protocol::REDIS, &service});

Expand Down
6 changes: 4 additions & 2 deletions src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ class Connection;
class SinkReplyBuilder;
class MCReplyBuilder;

enum class DispatchResult { OK, OOM, ERROR };

class ServiceInterface {
public:
virtual ~ServiceInterface() {
}

virtual void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;
virtual DispatchResult DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;

// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, SinkReplyBuilder* builder,
Expand Down
13 changes: 8 additions & 5 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class ClusterShardMigration {
if (tx_data->opcode == journal::Op::PING) {
// TODO check about ping logic
} else {
ExecuteTx(std::move(*tx_data), cntx);
auto err = ExecuteTx(std::move(*tx_data), cntx);
// Break incoming slot migration if command reported OOM
if (executor_.connection_context()->IsOOM()) {
if (err == std::errc::not_enough_memory) {
cntx->ReportError(std::string{kIncomingMigrationOOM});
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
break;
Expand Down Expand Up @@ -143,12 +143,13 @@ class ClusterShardMigration {
}

private:
void ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
std::error_code ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! We need the same for replication ;)

if (!cntx->IsRunning()) {
return;
return {};
}

if (!tx_data.IsGlobalCmd()) {
executor_.Execute(tx_data.dbid, tx_data.command);
return executor_.Execute(tx_data.dbid, tx_data.command);
} else {
// TODO check which global commands should be supported
std::string error =
Expand All @@ -158,6 +159,8 @@ class ClusterShardMigration {
cntx->ReportError(error);
in_migration_->ReportError(error);
}

return {};
}

uint32_t source_shard_id_;
Expand Down
7 changes: 0 additions & 7 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,6 @@ class ConnectionContext : public facade::ConnectionContext {
return conn_state.db_index;
}

bool IsOOM() {
return std::exchange(is_oom_, false);
}

void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args,
facade::RedisReplyBuilder* rb);

Expand Down Expand Up @@ -329,9 +325,6 @@ class ConnectionContext : public facade::ConnectionContext {
// The related connection is bound to main listener or serves the memcached protocol
bool has_main_or_memcache_listener = false;

// OOM reported while executing
bool is_oom_ = false;

private:
void EnableMonitoring(bool enable) {
subscriptions++; // required to support the monitoring
Expand Down
10 changes: 6 additions & 4 deletions src/server/journal/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ void JournalExecutor::Execute(DbIndex dbid, absl::Span<journal::ParsedEntry::Cmd
}
}

void JournalExecutor::Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd) {
std::error_code JournalExecutor::Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd) {
SelectDb(dbid);
Execute(cmd);
return Execute(cmd);
}

void JournalExecutor::FlushAll() {
Expand All @@ -70,9 +70,11 @@ void JournalExecutor::FlushSlots(const cluster::SlotRange& slot_range) {
Execute(cmd);
}

void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
std::error_code JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()};
service_->DispatchCommand(span, &reply_builder_, &conn_context_);
auto res = service_->DispatchCommand(span, &reply_builder_, &conn_context_);
return res == facade::DispatchResult::OOM ? make_error_code(errc::not_enough_memory)
: error_code();
}

void JournalExecutor::SelectDb(DbIndex dbid) {
Expand Down
4 changes: 2 additions & 2 deletions src/server/journal/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class JournalExecutor {
JournalExecutor(JournalExecutor&&) = delete;

void Execute(DbIndex dbid, absl::Span<journal::ParsedEntry::CmdData> cmds);
void Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd);
std::error_code Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd);

void FlushAll(); // Execute FLUSHALL.
void FlushSlots(const cluster::SlotRange& slot_range);
Expand All @@ -33,7 +33,7 @@ class JournalExecutor {
}

private:
void Execute(journal::ParsedEntry::CmdData& cmd);
std::error_code Execute(journal::ParsedEntry::CmdData& cmd);

// Select database. Ensure it exists if accessed for first time.
void SelectDb(DbIndex dbid);
Expand Down
58 changes: 37 additions & 21 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ optional<ErrorReply> Service::VerifyCommandExecution(const CommandId* cid,
const ConnectionContext* cntx,
CmdArgList tail_args) {
if (ShouldDenyOnOOM(cid)) {
return facade::ErrorReply{kOutOfMemory};
return facade::ErrorReply{OpStatus::OUT_OF_MEMORY};
}

return VerifyConnectionAclStatus(cid, cntx, "ACL rules changed between the MULTI and EXEC",
Expand Down Expand Up @@ -1184,8 +1184,8 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
return VerifyConnectionAclStatus(cid, &dfly_cntx, "has no ACL permissions", tail_args);
}

void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) {
DispatchResult Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) {
DCHECK(!args.empty());
DCHECK_NE(0u, shard_set->size()) << "Init was not called";

Expand All @@ -1196,7 +1196,8 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
const auto [cid, args_no_cmd] = registry_.FindExtended(cmd, args.subspan(1));

if (cid == nullptr) {
return builder->SendError(ReportUnknownCmd(cmd));
builder->SendError(ReportUnknownCmd(cmd));
return DispatchResult::ERROR;
}

ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
Expand Down Expand Up @@ -1229,10 +1230,10 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(args_no_cmd, 0), "ACK")) {
server_family_.GetDflyCmd()->OnClose(dfly_cntx->conn_state.replication_info.repl_session_id);
return;
return DispatchResult::ERROR;
}
builder->SendError(std::move(*err));
return;
return DispatchResult::ERROR;
}

VLOG_IF(1, cid->opt_mask() & CO::CommandOpt::DANGEROUS)
Expand All @@ -1246,7 +1247,8 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
if (cid->IsWriteOnly()) {
dfly_cntx->conn_state.exec_info.is_write = true;
}
return builder->SendSimpleString("QUEUED");
builder->SendSimpleString("QUEUED");
return DispatchResult::OK;
}

// Create command transaction
Expand All @@ -1259,8 +1261,10 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
OpStatus status = dfly_cntx->transaction->InitByArgs(
dfly_cntx->ns, dfly_cntx->conn_state.db_index, args_no_cmd);

if (status != OpStatus::OK)
return builder->SendError(status);
if (status != OpStatus::OK) {
builder->SendError(status);
return DispatchResult::ERROR;
}
}
} else {
DCHECK(dfly_cntx->transaction == nullptr);
Expand All @@ -1272,8 +1276,10 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
CHECK(dfly_cntx->ns != nullptr);
if (auto st =
dist_trans->InitByArgs(dfly_cntx->ns, dfly_cntx->conn_state.db_index, args_no_cmd);
st != OpStatus::OK)
return builder->SendError(st);
st != OpStatus::OK) {
builder->SendError(st);
return DispatchResult::ERROR;
}
}

dfly_cntx->transaction = dist_trans.get();
Expand All @@ -1285,14 +1291,17 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,

dfly_cntx->cid = cid;

if (!InvokeCmd(cid, args_no_cmd, CommandContext{dfly_cntx->transaction, builder, dfly_cntx})) {
auto res =
InvokeCmd(cid, args_no_cmd, CommandContext{dfly_cntx->transaction, builder, dfly_cntx});
if ((res != DispatchResult::OK) && (res != DispatchResult::OOM)) {
builder->SendError("Internal Error");
builder->CloseConnection();
}

if (!dispatching_in_multi) {
dfly_cntx->transaction = nullptr;
}
return res;
}

class ReplyGuard {
Expand Down Expand Up @@ -1345,8 +1354,8 @@ OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::We
return OpStatus::OK;
}

bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
const CommandContext& cmd_cntx) {
DispatchResult Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
const CommandContext& cmd_cntx) {
DCHECK(cid);
DCHECK(!cid->Validate(tail_args));

Expand All @@ -1361,11 +1370,16 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
// Bonus points because this allows to continue replication with ACL users who got
// their access revoked and reinstated
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(tail_args, 0), "ACK")) {
return true;
return DispatchResult::OK;
}
auto res = err->status;
builder->SendError(std::move(*err));
builder->ConsumeLastError();
return true; // return false only for internal error aborts
if (res == OpStatus::OUT_OF_MEMORY) {
return DispatchResult::OOM;
}

return DispatchResult::OK; // return ERROR only for internal error aborts
}

// We are not sending any admin command in the monitor, and we do not want to
Expand Down Expand Up @@ -1404,13 +1418,14 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
invoke_time_usec = cid->Invoke(tail_args, cmd_cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false;
return DispatchResult::ERROR;
}

DispatchResult res = DispatchResult::OK;
if (std::string reason = builder->ConsumeLastError(); !reason.empty()) {
// Set flag if OOM reported
if (reason == kOutOfMemory) {
cmd_cntx.conn_cntx->is_oom_ = true;
res = DispatchResult::OOM;
}
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
Expand Down Expand Up @@ -1454,7 +1469,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args,
cntx->last_command_debug.clock = tx->txid();
}

return true;
return res;
}

size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReplyBuilder* builder,
Expand Down Expand Up @@ -2295,8 +2310,9 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
}
}

bool ok = InvokeCmd(scmd.Cid(), args, cmd_cntx);
if (!ok || rb->GetError()) // checks for i/o error, not logical error.
auto invoke_res = InvokeCmd(scmd.Cid(), args, cmd_cntx);
if ((invoke_res != DispatchResult::OK) ||
rb->GetError()) // checks for i/o error, not logical error.
break;
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/server/main_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ class Service : public facade::ServiceInterface {
void Shutdown();

// Prepare command execution, verify and execute, reply to context
void DispatchCommand(ArgSlice args, facade::SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) final;
facade::DispatchResult DispatchCommand(ArgSlice args, facade::SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) final;

// Execute multiple consecutive commands, possibly in parallel by squashing
size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, facade::SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) final;

// Check VerifyCommandExecution and invoke command with args
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, const CommandContext& cmd_cntx);
facade::DispatchResult InvokeCmd(const CommandId* cid, CmdArgList tail_args,
const CommandContext& cmd_cntx);

// Verify command can be executed now (check out of memory), always called immediately before
// execution
Expand Down
Loading