fix: partial replication data loss #5297
Changes from 1 commit
@@ -7,6 +7,7 @@
 #include <absl/types/span.h>

 #include "facade/reply_capture.h"
 #include "facade/service_interface.h"
 #include "server/cluster/cluster_defs.h"
 #include "server/journal/types.h"

@@ -22,8 +23,8 @@ class JournalExecutor {
   JournalExecutor(JournalExecutor&&) = delete;

   void Execute(DbIndex dbid, absl::Span<journal::ParsedEntry::CmdData> cmds);
-  std::error_code Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd);
+  // Return true is command executed without error
+  facade::DispatchResult Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd);

Review comment: eeh, something is not right here.
Reply: typos, fixed

   void FlushAll();  // Execute FLUSHALL.
   void FlushSlots(const cluster::SlotRange& slot_range);

@@ -33,7 +34,7 @@ class JournalExecutor {
   }

 private:
-  std::error_code Execute(journal::ParsedEntry::CmdData& cmd);
+  facade::DispatchResult Execute(journal::ParsedEntry::CmdData& cmd);

   // Select database. Ensure it exists if accessed for first time.
   void SelectDb(DbIndex dbid);
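For readers unfamiliar with the facade types, here is a minimal, self-contained sketch of the calling pattern this signature change enables. The DispatchResult enum below is a stand-in (only the OK value is taken from this diff), and Execute is a toy function, not the real JournalExecutor; the real caller-side check appears in the replica changes further down.

```cpp
#include <iostream>

// Stand-in for facade::DispatchResult; only OK is assumed from this diff.
// The real enum lives in the facade library and may carry more states.
enum class DispatchResult { OK, ERROR };

// Hypothetical single-command executor mirroring the new signature above.
DispatchResult Execute(bool simulate_error) {
  return simulate_error ? DispatchResult::ERROR : DispatchResult::OK;
}

int main() {
  // Callers now branch on the result instead of discarding a void return,
  // just as the replica code below does with `== facade::DispatchResult::OK`.
  bool ok = Execute(/*simulate_error=*/false) == DispatchResult::OK;
  std::cout << (ok ? "executed" : "failed") << '\n';
  return ok ? 0 : 1;
}
```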
@@ -6,6 +6,7 @@
 #include <chrono>

 #include "absl/strings/match.h"
 #include "facade/service_interface.h"

 extern "C" {
 #include "redis/rdb.h"

@@ -913,8 +914,17 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
       force_ping_ = true;
       journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
     } else {
-      ExecuteTx(std::move(*tx_data), cntx);
-      journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
+      auto exe_res = ExecuteTx(std::move(*tx_data), cntx);
+      if (exe_res) {
+        // We only increment upon successful execution of the transaction.
+        // The reason for this is that during partial sync we sent this
+        // number as the lsn number to resume from. However, if for example
+        // we increment this when a command fails (because the context
+        // got cancelled, e.g, replication connection broke), we will get
+        // inconsistent data because the replica will resume from the next
+        // lsn of the master and this lsn entry will be lost.
+        journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
+      }
     }
     shard_replica_waker_.notifyAll();
   }

Review comment: I believe there is a general issue here because // TODO investigate this
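The comment in the hunk above is the core of the fix: journal_rec_executed_ doubles as the LSN the replica reports when it requests a partial sync, so counting a record that was never actually applied makes the master resume one entry too far. A self-contained toy model of that accounting (the names and journal layout are illustrative, not Dragonfly's real structures):

```cpp
#include <cstdint>
#include <iostream>
#include <vector>

// Toy model of the LSN accounting described above.
struct Entry {
  uint64_t lsn;
  bool will_fail;  // e.g. the replication link breaks while applying this entry
};

int main() {
  std::vector<Entry> journal = {{0, false}, {1, true}, {2, false}};

  uint64_t applied = 0;        // counterpart of journal_rec_executed_
  bool count_failures = true;  // old behavior: increment even when execution failed

  for (const Entry& e : journal) {
    bool ok = !e.will_fail;
    if (ok || count_failures)
      ++applied;
    if (!ok)
      break;  // connection dropped; the replica will reconnect and partial-sync
  }

  // On partial sync the replica asks the master to resume from `applied`.
  // Old behavior: applied == 2, so the entry with lsn 1 is never re-sent (data loss).
  // New behavior (count_failures = false): applied == 1, so entry 1 is replayed.
  std::cout << "resume from lsn " << applied << '\n';
  return 0;
}
```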
@@ -994,15 +1004,14 @@ DflyShardReplica::~DflyShardReplica() {
   JoinFlow();
 }

-void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
+bool DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
   if (!cntx->IsRunning()) {
-    return;
+    return false;
   }

   if (!tx_data.IsGlobalCmd()) {
     VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
-    executor_->Execute(tx_data.dbid, tx_data.command);
-    return;
+    return executor_->Execute(tx_data.dbid, tx_data.command) == facade::DispatchResult::OK;
   }

   bool inserted_by_me =

@@ -1017,26 +1026,27 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
   multi_shard_data.block->Wait();
   // Check if we woke up due to cancellation.
   if (!exec_st_.IsRunning())
-    return;
+    return false;
   VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";

   VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
   // Wait until all shards flows get to execution step of this transaction.
   multi_shard_data.barrier.Wait();
   // Check if we woke up due to cancellation.
   if (!exec_st_.IsRunning())
-    return;
+    return false;
   // Global command will be executed only from one flow fiber. This ensure corectness of data in
   // replica.
+  bool execution_res = false;
   if (inserted_by_me) {
-    executor_->Execute(tx_data.dbid, tx_data.command);
+    execution_res = executor_->Execute(tx_data.dbid, tx_data.command) == facade::DispatchResult::OK;
   }
   // Wait until exection is done, to make sure we done execute next commands while the global is
   // executed.
   multi_shard_data.barrier.Wait();
   // Check if we woke up due to cancellation.
   if (!exec_st_.IsRunning())
-    return;
+    return false;

   // Erase from map can be done only after all flow fibers executed the transaction commands.
   // The last fiber which will decrease the counter to 0 will be the one to erase the data from

@@ -1046,6 +1056,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx
   if (val == 1) {
     multi_shard_exe_->Erase(tx_data.txid);
   }
+  return inserted_by_me ? execution_res : true;
 }

 error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest) {
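To make the return value at the end of ExecuteTx easier to follow, here is a self-contained sketch of the global-command path (C++20, with std::thread and std::barrier standing in for Dragonfly's flow fibers and multi_shard_data.barrier): exactly one flow executes the command, every flow waits for it to finish, and flows that did not execute anything report success, which is what `return inserted_by_me ? execution_res : true;` encodes.

```cpp
#include <atomic>
#include <barrier>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  const int kFlows = 3;
  std::barrier sync(kFlows);
  std::atomic<bool> execution_res{false};
  std::vector<std::thread> flows;

  for (int i = 0; i < kFlows; ++i) {
    flows.emplace_back([&, i] {
      bool inserted_by_me = (i == 0);  // the flow that registered the txid first
      sync.arrive_and_wait();          // all flows reach the execution step
      if (inserted_by_me)
        execution_res = true;          // stand-in for executor_->Execute(...) == OK
      sync.arrive_and_wait();          // no flow proceeds until the global command is done
      // Flows that did not run the command have nothing to report a failure about.
      bool result = inserted_by_me ? execution_res.load() : true;
      std::cout << "flow " << i << " -> " << std::boolalpha << result << '\n';
    });
  }
  for (auto& t : flows) t.join();
  return 0;
}
```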