Skip to content

Commit 2d9c1c4

Browse files
author
Peng Gu
committed
Add commits in spectrum log
1 parent a9bc2a8 commit 2d9c1c4

File tree

8 files changed

+277
-35
lines changed

8 files changed

+277
-35
lines changed

libspectrum/include/spectrum.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ extern bool is_spectrum_storage_replica();
5353
extern void disable_spectrum_compute(THD *thd);
5454
extern void enable_spectrum_compute(THD *thd);
5555

56+
extern spectrum::StorageNode::Stub* get_storage_primary_client();
57+
5658
extern void spectrum_print_row(char* method, TABLE* table);
5759
extern void spectrum_print_row(char* method, TABLE* table, uchar* record);
5860
extern void spectrum_row_fill_fields(TABLE* table, ::spectrum::Row *spectrum_row);
@@ -94,12 +96,15 @@ extern int spectrum_compute_post_ddl(THD *thd);
9496

9597
extern int spectrum_storage_init();
9698

99+
extern int spectrum_log_init();
97100
extern int spectrum_log_create_table(THD *thd, const char* db_name, const char* table_name, uint64 handler_id);
98101
extern int spectrum_log_delete_table(THD *thd, const char* db_name, const char* table_name, const char* table_path);
99102
extern int spectrum_log_post_ddl(THD *thd);
100103
extern int spectrum_log_update_metadata(THD *thd, const char* table, dd::Object_id object_id, const char* object_name);
101104
extern int spectrum_log_add_row(THD *thd, TABLE *table, uchar *new_row, uchar *old_row);
102105
extern int spectrum_log_prepare(THD *thd, bool all);
103106
extern int spectrum_log_commit(THD *thd, bool all);
107+
extern int spectrum_log_read_commit(THD *thd, uint64 start_id_exclusive, spectrum::Commit *commit);
108+
extern int spectrum_log_read_events_by_xid(THD *thd, uint64 xid, spectrum::EventList *events);
104109

105110
#endif

libspectrum/src/protobuf/spectrum.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,4 +383,18 @@ message Event {
383383
uint64 id = 2;
384384
uint64 type = 3;
385385
string body = 4;
386+
}
387+
388+
message EventList {
389+
repeated Event event = 1;
390+
}
391+
392+
message Commit {
393+
uint64 id = 1;
394+
uint64 xid = 2;
395+
EventList events = 3;
396+
}
397+
398+
message CommitList {
399+
repeated Commit commit = 1;
386400
}

libspectrum/src/spectrum_common.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ this program; if not, write to the Free Software Foundation, Inc.,
5353
#include <mysql/plugin.h>
5454

5555
#include <grpc/grpc.h>
56+
#include <grpcpp/create_channel.h>
5657
#include "spectrum.h"
5758
#include "spectrum_config.h"
5859

@@ -82,6 +83,16 @@ void enable_spectrum_compute(THD *thd) {
8283
thd->spectrum_compute_disabled = false;
8384
}
8485

86+
std::unique_ptr<spectrum::StorageNode::Stub> storage_primary_client;
87+
spectrum::StorageNode::Stub* get_storage_primary_client() {
88+
if (!storage_primary_client) {
89+
node_config_t* node_config = find_storage_primary_node_config();
90+
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(node_config->address, grpc::InsecureChannelCredentials());
91+
storage_primary_client = spectrum::StorageNode::NewStub(channel);
92+
}
93+
return storage_primary_client.get();
94+
}
95+
8596
void spectrum_thread_fill_system_variables(THD *thd, spectrum::Thread *spectrum_thread) {
8697
spectrum_thread->mutable_system_variables()->set_option_bits(thd->variables.option_bits);
8798
}

libspectrum/src/spectrum_compute.cc

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,21 +110,17 @@ this program; if not, write to the Free Software Foundation, Inc.,
110110
#include <vector>
111111

112112
#include <grpc/grpc.h>
113-
#include <grpcpp/create_channel.h>
114113
#include "spectrum.h"
115114
#include "spectrum_config.h"
116115
#include "spectrum.grpc.pb.h"
117116

118117
bool spectrum_debug = false;
119118

120-
std::unique_ptr<spectrum::StorageNode::Stub> storage_primary_client;
121-
spectrum::StorageNode::Stub* get_storage_primary_client() {
122-
if (!storage_primary_client) {
123-
node_config_t* node_config = find_storage_primary_node_config();
124-
std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(node_config->address, grpc::InsecureChannelCredentials());
125-
storage_primary_client = spectrum::StorageNode::NewStub(channel);
126-
}
127-
return storage_primary_client.get();
119+
std::time_t spectrum_compute_init_time;
120+
int spectrum_compute_init() {
121+
auto now = std::chrono::system_clock::now();
122+
spectrum_compute_init_time = std::chrono::system_clock::to_time_t(now);
123+
return 0;
128124
}
129125

130126
int spectrum_compute_create_table(THD *thd, TABLE *table) {

0 commit comments

Comments
 (0)