chore: replace dcheck with check for partial sync #5277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 4 commits into base: main

19 changes: 10 additions & 9 deletions .github/actions/regression-tests/action.yml
@@ -49,15 +49,16 @@ runs:
export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
export UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1 # to crash on errors

if [[ "${{inputs.epoll}}" == 'epoll' ]]; then
export FILTER="${{inputs.filter}} and not exclude_epoll"
# Run only replication tests with epoll
timeout 60m pytest -m "$FILTER" --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --df force_epoll=true --log-cli-level=INFO || code=$?
else
export FILTER="${{inputs.filter}}"
# Run only replication tests with iouring
timeout 60m pytest -m "$FILTER" --durations=10 --timeout=300 --color=yes --json-report --json-report-file=report.json dragonfly --log-cli-level=INFO || code=$?
fi
for run in {1..1000}; do
#python3 -m pytest dragonfly/replication_test.py --log-cli-level=INFO -k "test_replication_timeout_on_full_sync"
#taskset --cpu-list 0-1 python3 -m pytest dragonfly/replication_test.py --log-cli-level=INFO -k "test_replication_all"
python3 -m pytest dragonfly/replication_test.py -k "test_partial_sync"
ret=$?
if [ $ret -ne 0 ]; then
echo "Error"
exit 1
fi
done

# timeout returns 124 if we exceeded the timeout duration
if [[ $code -eq 124 ]]; then
44 changes: 1 addition & 43 deletions .github/workflows/ci.yml
@@ -129,58 +129,16 @@ jobs:
- name: Build
run: |
cd ${GITHUB_WORKSPACE}/build
ninja search_family_test
df -h
echo "-----------------------------"
ninja src/all
ninja dragonfly

- name: PostFail
if: failure()
run: |
echo "disk space is:"
df -h

- name: C++ Unit Tests - IoUring
run: |
cd ${GITHUB_WORKSPACE}/build
echo Run ctest -V -L DFLY

GLOG_alsologtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1,op_manager=1,op_manager_test=1 \
FLAGS_fiber_safety_margin=4096 FLAGS_list_experimental_v2=true timeout 20m ctest -V -L DFLY -E allocation_tracker_test

# Run allocation tracker test separately without alsologtostderr because it generates a TON of logs.
FLAGS_fiber_safety_margin=4096 timeout 5m ./allocation_tracker_test

timeout 5m ./dragonfly_test
timeout 5m ./json_family_test --jsonpathv2=false
timeout 5m ./tiered_storage_test --vmodule=db_slice=2 --logtostderr


- name: C++ Unit Tests - Epoll
run: |
cd ${GITHUB_WORKSPACE}/build

# Create a rule that automatically prints stacktrace upon segfault
cat > ./init.gdb <<EOF
catch signal SIGSEGV
command
bt
end
EOF

gdb -ix ./init.gdb --batch -ex r --args ./dragonfly_test --force_epoll
FLAGS_fiber_safety_margin=4096 FLAGS_force_epoll=true GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 timeout 20m ctest -V -L DFLY -E allocation_tracker_test

FLAGS_fiber_safety_margin=4096 FLAGS_force_epoll=true timeout 5m ./allocation_tracker_test

- name: C++ Unit Tests - IoUring with cluster mode
run: |
FLAGS_fiber_safety_margin=4096 FLAGS_cluster_mode=emulated timeout 20m ctest -V -L DFLY

- name: C++ Unit Tests - IoUring with cluster mode and FLAGS_lock_on_hashtags
run: |
FLAGS_fiber_safety_margin=4096 FLAGS_cluster_mode=emulated FLAGS_lock_on_hashtags=true timeout 20m ctest -V -L DFLY

- name: Upload unit logs on failure
if: failure()
uses: actions/upload-artifact@v4
1 change: 0 additions & 1 deletion src/server/dflycmd.cc
@@ -336,7 +336,6 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
}
} else if (seqid.has_value()) {
if (sf_->journal()->IsLSNInBuffer(*seqid) || sf_->journal()->GetLsn() == *seqid) {
auto& flow = replica_ptr->flows[flow_id];
flow.start_partial_sync_at = *seqid;
VLOG(1) << "Partial sync requested from LSN=" << flow.start_partial_sync_at.value()
<< " and is available. (current_lsn=" << sf_->journal()->GetLsn() << ")";
4 changes: 2 additions & 2 deletions src/server/journal/journal_slice.cc
@@ -111,11 +111,11 @@ void JournalSlice::CallOnChange(JournalItem* item) {
// We preserve order here. After ConsumeJournalChange there can be reordering
if (ring_buffer_.size() == ring_buffer_.capacity()) {
const size_t bytes_removed = ring_buffer_.front().data.size() + sizeof(*item);
DCHECK_GE(ring_buffer_bytes, bytes_removed);
CHECK_GE(ring_buffer_bytes, bytes_removed);
ring_buffer_bytes -= bytes_removed;
}
if (!ring_buffer_.empty()) {
DCHECK(item->lsn == ring_buffer_.back().lsn + 1);
CHECK(item->lsn == ring_buffer_.back().lsn + 1);
}
ring_buffer_.push_back(std::move(*item));
ring_buffer_bytes += sizeof(*item) + ring_buffer_.back().data.size();
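
This hunk is the heart of the PR: the ring-buffer invariants are promoted from DCHECK to CHECK. With glog (which these macros come from), DCHECK_* is only active in debug builds, while CHECK_* is evaluated in every build and aborts with a stack trace on failure, so violations now surface in the optimized binaries used by CI. A small illustration, assuming the standard glog macros; `TrimRingBufferBytes` is a made-up helper, not Dragonfly code.

```cpp
// Illustration of why the hunk above matters: DCHECK_* only fires in debug
// builds, while CHECK_* verifies the invariant in every build.
#include <cstddef>
#include <glog/logging.h>

void TrimRingBufferBytes(size_t& ring_buffer_bytes, size_t bytes_removed) {
  // Skipped entirely in release builds, so an accounting bug could silently
  // wrap `ring_buffer_bytes` around to a huge value.
  DCHECK_GE(ring_buffer_bytes, bytes_removed);

  // Also runs in release/CI builds and aborts immediately, turning a silent
  // underflow into a reproducible crash at the point of corruption.
  CHECK_GE(ring_buffer_bytes, bytes_removed);

  ring_buffer_bytes -= bytes_removed;
}
```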
5 changes: 4 additions & 1 deletion src/server/replica.cc
@@ -912,6 +912,7 @@ void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
} else if (tx_data->opcode == journal::Op::PING) {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
LOG(INFO) << "PING";
} else {
ExecuteTx(std::move(*tx_data), cntx);
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
@@ -996,14 +997,16 @@ DflyShardReplica::~DflyShardReplica() {

void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
if (!cntx->IsRunning()) {
LOG(INFO) << "ExecuteTx became NO-OP";
return;
}

if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
LOG(INFO) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, tx_data.command);
return;
}
LOG(INFO) << "GLOBAL";

bool inserted_by_me =
multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, master_context_.num_flows);
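
The replica.cc changes look like temporary diagnostics: VLOG(3) messages are replaced or supplemented with LOG(INFO) so they appear without raising verbosity. With glog, VLOG(n) is gated at runtime by --v/--vmodule, whereas LOG(INFO) always emits. A brief illustration assuming standard glog behavior; `LogTransactionKind` is a hypothetical helper, not part of the PR.

```cpp
// Sketch of the glog behavior behind the temporary logging added above.
#include <cstdint>
#include <glog/logging.h>

void LogTransactionKind(bool is_global, uint64_t txid) {
  // Printed only when the process runs with e.g. --v=3 or --vmodule=replica=3.
  VLOG(3) << "Execute cmd without sync between shards. txid: " << txid;

  // Always printed at INFO severity, regardless of verbosity flags.
  LOG(INFO) << (is_global ? "GLOBAL" : "non-global") << " tx " << txid;
}
```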
17 changes: 15 additions & 2 deletions tests/dragonfly/replication_test.py
@@ -3197,7 +3197,7 @@ async def test_bug_5221(df_factory):


@pytest.mark.parametrize("proactors", [1, 4, 6])
@pytest.mark.parametrize("backlog_len", [1, 256, 1024, 1300])
@pytest.mark.parametrize("backlog_len", [1])
async def test_partial_sync(df_factory, df_seeder_factory, proactors, backlog_len):
keys = 5_000
if proactors > 1:
@@ -3216,7 +3216,8 @@ async def stream(client, total):
for i in range(0, total):
prefix = "{prefix}"
# Seed to one shard only. This will eventually cause one of the flows to become stale.
await client.execute_command(f"SET {prefix}foo{i} bar{i}")
res = await client.execute_command(f"SET {prefix}foo{i} bar{i}")
assert res == "OK"

async with replica.client() as c_replica, master.client() as c_master:
seeder = SeederV2(key_target=keys)
@@ -3241,6 +3242,18 @@ async def stream(client, total):
hash1, hash2 = await asyncio.gather(
*(SeederV2.capture(c) for c in (c_master, c_replica))
)
if hash1 != hash2:
res1 = await c_master.execute_command("dbsize")
res2 = await c_replica.execute_command("dbsize")
logging.info(f"master dbsize is {res1}")
logging.info(f"replica dbsize is {res2}")
for i in range(0, backlog_len):
prefix = "{prefix}"
res3 = await c_master.execute_command(f"GET {prefix}foo{i}")
res4 = await c_replica.execute_command(f"GET {prefix}foo{i}")
logging.info(f"called GET of {i} with value: {res3}")
logging.info(f"called GET of {i} with value: {res4}")

assert hash1 == hash2

await proxy.close()