Skip to content

Commit 8d1bf5e

Browse files
author
Peng Gu
committed
Add Rollback
1 parent 804850e commit 8d1bf5e

File tree

8 files changed

+152
-9
lines changed

8 files changed

+152
-9
lines changed

libspectrum/include/spectrum.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ enum event_type_enum {
4040
UPDATE_ROW,
4141
DELETE_ROW,
4242
PREPARE,
43-
COMMIT
43+
COMMIT,
44+
ROLLBACK
4445
};
4546
}
4647

@@ -188,6 +189,7 @@ extern int spectrum_compute_update_row(THD *thd, TABLE *table, const uchar *old_
188189
extern int spectrum_compute_delete_row(THD *thd, TABLE *table, const uchar *record);
189190
extern int spectrum_compute_prepare(THD *thd, bool all);
190191
extern int spectrum_compute_commit(THD *thd, bool all);
192+
extern int spectrum_compute_rollback(THD *thd, bool all);
191193
extern int spectrum_compute_begin_attachable_transaction(THD *thd, bool readonly);
192194
extern int spectrum_compute_end_attachable_transaction(THD *thd);
193195
extern int spectrum_compute_acquire_mdl(THD *thd, MDL_ticket *ticket);
@@ -209,6 +211,7 @@ extern int spectrum_log_update_row(THD *thd, TABLE *table, uchar *new_row, uchar
209211
extern int spectrum_log_delete_row(THD *thd, TABLE *table, uchar *row);
210212
extern int spectrum_log_prepare(THD *thd, bool all, bool real_trans);
211213
extern int spectrum_log_commit(THD *thd, bool all, bool real_trans);
214+
extern int spectrum_log_rollback(THD *thd, bool all, bool real_trans);
212215
extern int spectrum_log_write_commit(THD *thd, commit_id_t commit_id, my_xid xid);
213216
extern int spectrum_log_write_event(THD *thd, spectrum::Event *event);
214217

libspectrum/src/protobuf/spectrum.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ service StorageNode {
3434

3535
rpc Prepare(PrepareRequest) returns (PrepareResponse) {}
3636
rpc Commit(CommitRequest) returns (CommitResponse) {}
37+
rpc Rollback(RollbackRequest) returns (RollbackResponse) {}
3738
}
3839

3940
service StorageReplicaNode {
@@ -342,6 +343,15 @@ message CommitRequest {
342343
message CommitResponse {
343344
}
344345

346+
message RollbackRequest {
347+
Thread thread = 1;
348+
bool all = 2;
349+
uint64 commit_id = 3;
350+
}
351+
352+
message RollbackResponse {
353+
}
354+
345355
message BeginAttachableTransactionRequest {
346356
Thread thread = 1;
347357
bool readonly = 2;

libspectrum/src/spectrum_compute.cc

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,30 @@ int spectrum_compute_commit(THD *thd, bool all) {
644644
return 0;
645645
}
646646

647+
int spectrum_compute_rollback(THD *thd, bool all) {
648+
spectrum::RollbackRequest request;
649+
spectrum::RollbackResponse response;
650+
651+
if (thd->spectrum_compute_disabled) {
652+
return 0;
653+
}
654+
655+
sql_print_information("spectrum_rollback[%d]: all=%d", thd->spectrum_thread_id, all);
656+
657+
spectrum::Thread *spectrum_thread = request.mutable_thread();
658+
spectrum_thread_fill(thd, spectrum_thread);
659+
request.set_all(all);
660+
661+
grpc::ClientContext context;
662+
grpc::Status status = get_storage_primary_client()->Rollback(&context, request, &response);
663+
if (!status.ok()) {
664+
sql_print_error("spectrum_rollback[%d]: error=%s", thd->spectrum_thread_id, status.error_message().c_str());
665+
assert(false);
666+
}
667+
668+
return 0;
669+
}
670+
647671
int spectrum_compute_begin_attachable_transaction(THD *thd, bool readonly) {
648672
spectrum::BeginAttachableTransactionRequest request;
649673
spectrum::BeginAttachableTransactionResponse response;

libspectrum/src/spectrum_log.cc

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -499,8 +499,6 @@ class ReplicationStream {
499499
commit_id_t max_commit_id;
500500
spectrum::CommitList commits;
501501
std::set<commit_id_t> prepared_commit_ids_copy;
502-
TABLE *commit_table;
503-
TABLE *event_table;
504502

505503
spectrum::InitReplicationStreamRequest request;
506504
spectrum::InitReplicationStreamResponse response;
@@ -761,9 +759,13 @@ int spectrum_log_prepare(THD *thd, bool all, bool real_trans) {
761759
mysql_mutex_lock(&prepare_lock);
762760
prepare_locked = true;
763761
commit_id = next_commit_id();
764-
spectrum_log_write_commit(thd, commit_id, xid);
762+
763+
mysql_mutex_lock(&commit_lock);
765764
prepared_commit_ids.insert(commit_id);
765+
mysql_mutex_unlock(&commit_lock);
766+
766767
storage_thd_context->set_commit_id(commit_id);
768+
spectrum_log_write_commit(thd, commit_id, xid);
767769
}
768770

769771
spectrum::PrepareRequest event_body;
@@ -795,8 +797,9 @@ int spectrum_log_commit(THD *thd, bool all, bool real_trans) {
795797

796798
if (real_trans) {
797799
mysql_mutex_lock(&commit_lock);
798-
commit_locked = true;
799800
prepared_commit_ids.erase(commit_id);
801+
mysql_mutex_unlock(&commit_lock);
802+
800803
storage_thd_context->clear_commit_id();
801804
}
802805

@@ -810,9 +813,35 @@ int spectrum_log_commit(THD *thd, bool all, bool real_trans) {
810813
if (replication_stream->write(thd, &event, false)) {
811814
sql_print_error("spectrum_log_commit: stream write error");
812815
}
816+
return 0;
817+
}
818+
819+
int spectrum_log_rollback(THD *thd, bool all, bool real_trans) {
820+
spectrum_storage::THD_context *storage_thd_context = thd->spectrum_storage_context();
821+
my_xid xid = storage_thd_context->xid();
822+
event_id_t event_id = storage_thd_context->next_event_id();
823+
commit_id_t commit_id = storage_thd_context->commit_id();
824+
bool commit_locked = false;
825+
826+
sql_print_information("spectrum_log_rollback: all=%d, real_trans=%d, xid=%d, commit_id=%d", all, real_trans, xid, commit_id);
813827

814-
if (commit_locked) {
828+
if (real_trans) {
829+
mysql_mutex_lock(&commit_lock);
830+
prepared_commit_ids.erase(commit_id);
815831
mysql_mutex_unlock(&commit_lock);
832+
833+
storage_thd_context->clear_commit_id();
834+
}
835+
836+
spectrum::RollbackRequest event_body;
837+
spectrum_thread_fill(thd, event_body.mutable_thread());
838+
event_body.set_all(all);
839+
event_body.set_commit_id(commit_id);
840+
841+
spectrum::Event event;
842+
spectrum_log_build_event(xid, event_id, spectrum::event_type_enum::ROLLBACK, event_body, &event);
843+
if (replication_stream->write(thd, &event, false)) {
844+
sql_print_error("spectrum_log_rollback: stream write error");
816845
}
817846
return 0;
818847
}

libspectrum/src/spectrum_storage.cc

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,15 @@ THD *create_thd(const spectrum::Thread &spectrum_thread)
101101
} else {
102102
thd->query_id = (query_id_t)spectrum_thread.query_id();
103103
}
104-
105104
return (thd);
106105
}
107106

107+
void release_thd(THD *thd) {
108+
thd->release_resources();
109+
Global_THD_manager::get_instance()->remove_thd(thd);
110+
delete thd;
111+
}
112+
108113
bool check_and_coalesce_trx_read_write(THD *thd, bool all) {
109114
Transaction_ctx::enum_trx_scope trx_scope =
110115
all ? Transaction_ctx::SESSION : Transaction_ctx::STMT;
@@ -603,6 +608,34 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
603608
return grpc::Status::OK;
604609
}
605610

611+
::grpc::Status Rollback(::grpc::ServerContext* context, const ::spectrum::RollbackRequest* request, ::spectrum::RollbackResponse* response) {
612+
THD *thd = create_thd(request->thread());
613+
Transaction_ctx *trn_ctx = thd->get_transaction();
614+
bool all = request->all();
615+
bool real_trans = (all || !trn_ctx->is_active(Transaction_ctx::SESSION));
616+
spectrum_storage::THD_context *thd_storage_context = thd->spectrum_storage_context();
617+
618+
sql_print_information("Rollback[%d]: all=%d, real_trans=%d", thd->spectrum_thread_id, all, real_trans);
619+
620+
if (check_and_coalesce_trx_read_write(thd, all)) {
621+
spectrum_log_open(thd);
622+
spectrum_log_rollback(thd, all, real_trans);
623+
} else {
624+
sql_print_information("Rollback[%d]: skip spectrum log rollback for readonly transaction", thd->spectrum_thread_id);
625+
}
626+
627+
ha_rollback_low(thd, all);
628+
thd_storage_context->set_prepared(false);
629+
630+
spectrum_log_close(thd);
631+
632+
if (real_trans) {
633+
trn_ctx->cleanup();
634+
thd->tx_priority = 0;
635+
}
636+
return grpc::Status::OK;
637+
}
638+
606639
::grpc::Status BeginAttachableTransaction(::grpc::ServerContext* context, const ::spectrum::BeginAttachableTransactionRequest* request, ::spectrum::BeginAttachableTransactionResponse* response) {
607640
THD *thd;
608641

@@ -932,7 +965,6 @@ class StorageReplicaNodeImpl final : public spectrum::StorageReplicaNode::Servic
932965
ha_prepare_low(thd, false);
933966
ha_commit_low(thd, false);
934967
}
935-
936968
ha_prepare_low(thd, all);
937969
thd_storage_context->set_prepared(true);
938970
return 0;
@@ -972,6 +1004,30 @@ class StorageReplicaNodeImpl final : public spectrum::StorageReplicaNode::Servic
9721004
return 0;
9731005
}
9741006

1007+
int Rollback(spectrum::Event &event) {
1008+
spectrum::RollbackRequest request;
1009+
google::protobuf::TextFormat::ParseFromString(event.body(), &request);
1010+
1011+
THD *thd = create_thd(request.thread());
1012+
spectrum_storage::THD_context *thd_storage_context = thd->spectrum_storage_context();
1013+
bool all = request.all();
1014+
commit_id_t commit_id = request.commit_id();
1015+
1016+
sql_print_information("Rollback: all=%d, commit_id=%d", all, commit_id);
1017+
1018+
close_thread_tables(thd);
1019+
1020+
ha_rollback_low(thd, all);
1021+
thd_storage_context->set_prepared(false);
1022+
1023+
spectrum_log_close(thd);
1024+
1025+
if (all) {
1026+
thd->mdl_context.release_transactional_locks();
1027+
}
1028+
return 0;
1029+
}
1030+
9751031
::grpc::Status InitReplicationStream(::grpc::ServerContext* context, const ::spectrum::InitReplicationStreamRequest* request, ::spectrum::InitReplicationStreamResponse* response) {
9761032
mysql_mutex_lock(&replication_lock);
9771033

@@ -1019,6 +1075,10 @@ class StorageReplicaNodeImpl final : public spectrum::StorageReplicaNode::Servic
10191075
Commit(event);
10201076
response.set_event_id(event.id());
10211077
stream->Write(response);
1078+
} else if (event_type == spectrum::event_type_enum::ROLLBACK) {
1079+
Rollback(event);
1080+
response.set_event_id(event.id());
1081+
stream->Write(response);
10221082
}
10231083

10241084
mysql_mutex_unlock(&replication_lock);

mysql-test/r/spectrum.result

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,11 @@ select * from employees where id=1;
2424
id first_name last_name hire_date salary
2525
1 John Doe 2024-12-01 555.00
2626
commit;
27+
update employees set salary=999 where id=1;
28+
select * from employees where id=1;
29+
id first_name last_name hire_date salary
30+
1 John Doe 2024-12-01 999.00
31+
rollback;
32+
select * from employees where id=1;
33+
id first_name last_name hire_date salary
34+
1 John Doe 2024-12-01 888.00

mysql-test/t/spectrum.test

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ set session transaction isolation level READ COMMITTED;
3232
select * from employees where id=1;
3333

3434
connection compute0;
35-
commit;
35+
commit;
36+
37+
update employees set salary=999 where id=1;
38+
select * from employees where id=1;
39+
rollback;
40+
select * from employees where id=1;

storage/innobase/handler/ha_innodb.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5942,6 +5942,10 @@ static int innobase_rollback(handlerton *hton, /*!< in: InnoDB handlerton */
59425942
assert(hton == innodb_hton_ptr);
59435943
DBUG_PRINT("trans", ("aborting transaction"));
59445944

5945+
if (is_spectrum_compute()) {
5946+
return spectrum_compute_rollback(thd, rollback_trx);
5947+
}
5948+
59455949
trx_t *trx = check_trx_exists(thd);
59465950

59475951
TrxInInnoDB trx_in_innodb(trx);

0 commit comments

Comments
 (0)