Skip to content

Commit 3c12040

Browse files
author
Peng Gu
committed
Fix create and drop table
1 parent 2f81a22 commit 3c12040

File tree

11 files changed

+238
-121
lines changed

11 files changed

+238
-121
lines changed

libspectrum/include/spectrum.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ extern void spectrum_row_extract_fields(TABLE *table, ::spectrum::Row *spectrum_
4545
extern void spectrum_row_extract_fields(TABLE *table, uchar* record, ::spectrum::Row *spectrum_row);
4646
extern void spectrum_thread_fill(THD *thd, spectrum::Thread *spectrum_thread);
4747

48+
template<typename T>
49+
int spectrum_compute_update_metadata(THD *thd, const T *object);
50+
4851
extern int spectrum_compute_create_table(THD *thd, TABLE *table);
4952
extern int spectrum_compute_delete_table(THD *thd, const dd::Table *table_def, const char* table_path);
5053
extern int spectrum_compute_lock_table(THD *thd, TABLE *table);

libspectrum/src/protobuf/spectrum.proto

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ service StorageNode {
66
rpc BeginAttachableTransaction(BeginAttachableTransactionRequest) returns (BeginAttachableTransactionResponse) {}
77
rpc EndAttachableTransaction(EndAttachableTransactionRequest) returns (EndAttachableTransactionResponse) {}
88

9+
rpc UpdateMetadata(UpdateMetadataRequest) returns (UpdateMetadataResponse) {}
910
rpc AcquireMetadataLock(AcquireMetadataLockRequest) returns (AcquireMetadataLockResponse) {}
1011
rpc UpgradeMetadataLock(UpgradeMetadataLockRequest) returns (UpgradeMetadataLockResponse) {}
1112
rpc ReleaseMetadataLock(ReleaseMetadataLockRequest) returns (ReleaseMetadataLockResponse) {}
@@ -59,6 +60,16 @@ message Field {
5960
bool is_null = 3;
6061
}
6162

63+
message UpdateMetadataRequest {
64+
Thread thread = 1;
65+
string table = 2;
66+
uint64 object_id = 3;
67+
string object_name = 4;
68+
}
69+
70+
message UpdateMetadataResponse {
71+
}
72+
6273
message AcquireMetadataLockRequest {
6374
Thread thread = 1;
6475
int32 namespace = 2;

libspectrum/src/spectrum_common.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ void spectrum_print_row(char* method, TABLE* table, uchar* record) {
127127
row.pop_back();
128128
row.pop_back();
129129
}
130-
sql_print_information("%s[%s:%d]: %s", method, table->s->table_name.str, hander_id, row.c_str());
130+
sql_print_information("%s[%s:%s:%d]: %s", method, table->s->db.str, table->s->table_name.str, hander_id, row.c_str());
131131

132132
repoint_field_to_record(table, record, table->record[0]);
133133
table->read_set = temp_read_set;

libspectrum/src/spectrum_compute.cc

Lines changed: 108 additions & 34 deletions
Large diffs are not rendered by default.

mysql-test/r/spectrum.result

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
drop database if exists spectrum_test_45;
2-
create database spectrum_test_45;
3-
use spectrum_test_45;
1+
drop database if exists spectrum_test_66;
2+
create database spectrum_test_66;
3+
use spectrum_test_66;
44
create table employees (id INT PRIMARY KEY, first_name VARCHAR(50) NOT NULL, last_name VARCHAR(50) NOT NULL, hire_date DATE, salary DECIMAL(10, 2));
55
insert into employees (id, first_name, last_name, hire_date, salary) VALUES (2, 'Peng', 'Gu', '2024-12-01', 60000.00);
66
insert into employees (id, first_name, last_name, hire_date, salary) VALUES (1, 'John', 'Doe', '2024-12-01', 50000.00);
@@ -12,7 +12,7 @@ update employees set salary=555 where id=1;
1212
select * from employees where id=1;
1313
id first_name last_name hire_date salary
1414
1 John Doe 2024-12-01 555.00
15-
use spectrum_test_45;
15+
use spectrum_test_66;
1616
select * from employees;
1717
id first_name last_name hire_date salary
1818
1 John Doe 2024-12-01 555.00

mysql-test/t/spectrum.test

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
connect(compute0,localhost,root,,,3306,/tmp/mysql-spectrum-compute-0.sock);
22
connect(compute1,localhost,root,,,3306,/tmp/mysql-spectrum-compute-1.sock);
33

4-
--let $db=spectrum_test_45
4+
--let $db=spectrum_test_66
55

66
connection compute0;
77

plugin/spectrum_storage/spectrum_storage.cc

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
101101
thd->lex->sql_command = (enum_sql_command)spectrum_thread.sql_command();
102102
thd->tx_isolation = (enum_tx_isolation)spectrum_thread.tx_isolation();
103103
thd->variables.option_bits = spectrum_thread.system_variables().option_bits();
104+
105+
// This is needed because register_uncommitted_object() and register_dropped_object() require a non-default
106+
// auto releaser, even though it's not actually required because uncommitted/dropped objects will be
107+
// released in remove_uncommitted_objects() when transaction commits.
108+
new dd::cache::Dictionary_client::Auto_releaser(thd->dd_client());
109+
104110
return (thd);
105111
}
106112

@@ -190,21 +196,13 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
190196
const char* table_name = request->table().c_str();
191197
uint64 handler_id = request->handler();
192198

193-
sql_print_information("CreateTable[%s:%d]", table_name, handler_id);
199+
sql_print_information("CreateTable[%s:%s:%d]", db_name, table_name, handler_id);
194200

195201
thd = handler_create_thd(request->thread());
196202

197-
// Invalidate shared dd cache for db
198-
dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
199-
thd->dd_client()->acquire(db_name, &schema_def);
200-
if (schema_def == nullptr) {
201-
sql_print_error("CreateTable[%s:%s:%d]: can not find schema definition", db_name, table_name, handler_id);
202-
goto end;
203-
}
204-
thd->dd_client()->invalidate(schema_def);
205-
206203
// Retrive table definition
207-
thd->dd_client()->reload_uncommitted(db_name, table_name, (const dd::Abstract_table **)&table_def);
204+
dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
205+
thd->dd_client()->acquire(db_name, table_name, (const dd::Abstract_table **)&table_def);
208206
if (table_def == nullptr) {
209207
sql_print_error("CreateTable[%s:%s:%d]: can not find table definition", db_name, table_name, handler_id);
210208
goto end;
@@ -219,9 +217,6 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
219217
}
220218

221219
end:
222-
thd->lex->sql_command = SQLCOM_CREATE_TABLE;
223-
handlerton *db_type = get_viable_handlerton_for_create(thd, table_name, create_info);
224-
thd->m_transactional_ddl.init(db_name, table_name, db_type);
225220
return grpc::Status::OK;
226221
}
227222

@@ -283,7 +278,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
283278
thr_lock_type lock_type = (thr_lock_type)request->lock_type();
284279
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
285280

286-
sql_print_information("LockTable[%s:%d]: lock_type=%d", table_name, handler_id, lock_type);
281+
sql_print_information("LockTable[%s:%s:%d]: lock_type=%d", db_name, table_name, handler_id, lock_type);
287282

288283
thd = handler_create_thd(request->thread());
289284
table = handler_find_or_open_table(thd, db_name, table_name, handler_id, lock_type, lock_action);
@@ -304,7 +299,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
304299
thr_lock_type lock_type = (thr_lock_type)request->lock_type();
305300
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
306301

307-
sql_print_information("UnLockTable[%s:%d]", table_name, handler_id);
302+
sql_print_information("UnLockTable[%s:%s:%d]", db_name, table_name, handler_id);
308303

309304
thd = handler_create_thd(request->thread());
310305
table = handler_find_or_open_table(thd, db_name, table_name, handler_id, lock_type, lock_action);
@@ -321,7 +316,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
321316
const char* table_name = request->table().c_str();
322317
uint64 handler_id = request->handler();
323318

324-
sql_print_information("CloseTable[%s:%d]", table_name, handler_id);
319+
sql_print_information("CloseTable[%s:%s:%d]", db_name, table_name, handler_id);
325320

326321
thd = handler_create_thd(request->thread());
327322
handler_find_and_close_table(thd, db_name, table_name, handler_id);
@@ -335,7 +330,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
335330
thr_lock_type lock_type = (thr_lock_type)request->lock_type();
336331
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
337332

338-
sql_print_information("InitIndex[%s:%d]: index=%d", request->table().c_str(), request->handler(), request->index());
333+
sql_print_information("InitIndex[%s:%s:%d]: index=%d", request->database().c_str(), request->table().c_str(), request->handler(), request->index());
339334

340335
thd = handler_create_thd(request->thread());
341336
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -351,7 +346,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
351346
thr_lock_type lock_type = (thr_lock_type)request->lock_type();
352347
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
353348

354-
sql_print_information("InitRnd[%s:%d]: scan=%d", request->table().c_str(), request->handler(), request->scan());
349+
sql_print_information("InitRnd[%s:%s:%d]: scan=%d", request->database().c_str(), request->table().c_str(), request->handler(), request->scan());
355350

356351
thd = handler_create_thd(request->thread());
357352
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -367,7 +362,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
367362
thr_lock_type lock_type = (thr_lock_type)request->lock_type();
368363
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
369364

370-
sql_print_information("EndIndex[%s:%d]", request->table().c_str(), request->handler());
365+
sql_print_information("EndIndex[%s:%s:%d]", request->database().c_str(), request->table().c_str(), request->handler());
371366

372367
thd = handler_create_thd(request->thread());
373368
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -394,7 +389,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
394389
key = (const uchar *)request->key().data();
395390
}
396391

397-
sql_print_information("ReadRow[%s:%d]", request->table().c_str(), request->handler());
392+
sql_print_information("ReadRow[%s:%s:%d]", request->database().c_str(), request->table().c_str(), request->handler());
398393

399394
thd = handler_create_thd(request->thread());
400395
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -415,7 +410,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
415410
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
416411
int error;
417412

418-
sql_print_information("ReadNextRow[%s:%d]", request->table().c_str(), request->handler());
413+
sql_print_information("ReadNextRow[%s:%s:%d]", request->database().c_str(), request->table().c_str(), request->handler());
419414

420415
thd = handler_create_thd(request->thread());
421416
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -444,7 +439,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
444439
thr_locked_row_action lock_action = (thr_locked_row_action)request->lock_action();
445440
int error;
446441

447-
sql_print_information("ReadPrevRow[%s:%d]", request->table().c_str(), request->handler());
442+
sql_print_information("ReadPrevRow[%s:%s:%d]", request->database().c_str(), request->table().c_str(), request->handler());
448443

449444
thd = handler_create_thd(request->thread());
450445
table = handler_find_or_open_table(thd, request->database().c_str(), request->table().c_str(), request->handler(), lock_type, lock_action);
@@ -584,6 +579,27 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
584579
return grpc::Status::OK;
585580
}
586581

582+
::grpc::Status UpdateMetadata(::grpc::ServerContext* context, const ::spectrum::UpdateMetadataRequest* request, ::spectrum::UpdateMetadataResponse* response) {
583+
THD *thd;
584+
const std::string& table = request->table();
585+
const dd::Object_id object_id = request->object_id();
586+
const std::string& object_name = request->object_name();
587+
588+
sql_print_information("UpdateMetadata: table=%s, object_id=%d, object_name=%s",
589+
table.c_str(), object_id, object_name.c_str());
590+
591+
thd = handler_create_thd(request->thread());
592+
593+
if (!strcmp(table.c_str(), "schemata")) {
594+
const dd::Schema *object;
595+
thd->dd_client()->reload_uncommitted(object_id, &object);
596+
} else if (!strcmp(table.c_str(), "tables")) {
597+
const dd::Abstract_table *object;
598+
thd->dd_client()->reload_uncommitted(object_id, &object);
599+
}
600+
return grpc::Status::OK;
601+
}
602+
587603
::grpc::Status AcquireMetadataLock(::grpc::ServerContext* context, const ::spectrum::AcquireMetadataLockRequest* request, ::spectrum::AcquireMetadataLockResponse* response) {
588604
THD *thd;
589605
MDL_key::enum_mdl_namespace namespace_ = static_cast<MDL_key::enum_mdl_namespace>(request->namespace_());
@@ -706,7 +722,7 @@ class StorageNodeImpl final : public spectrum::StorageNode::Service {
706722
mysql_unlock_some_tables(thd, &table, 1);
707723

708724
if (err) {
709-
sql_print_error("ReplicateRow[%s:%d]: error=%d", request->table().c_str(), request->handler(), err);
725+
sql_print_error("ReplicateRow[%s:%s:%d]: error=%d", request->database().c_str(), request->table().c_str(), request->handler(), err);
710726
}
711727
return grpc::Status::OK;
712728
}

sql/dd/cache/dictionary_client.h

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -686,22 +686,20 @@ class Dictionary_client {
686686
template <typename T>
687687
[[nodiscard]] bool acquire(const String_type &schema_name,
688688
const String_type &object_name, const T **object);
689-
689+
690690
/**
691-
Reload an uncommitted object from storage and put it to uncommitted cache
691+
Reload an uncommitted object from storage and put it to uncommitted local cache
692692
693693
@tparam T Dictionary object type.
694-
@param schema_name Name of the schema containing the object.
695-
@param object_name Name of the object.
694+
@param id id of the object.
696695
@param [out] object Dictionary object, if present; otherwise NULL.
697696
698697
@retval false No error.
699698
@retval true Error
700699
*/
701-
702700
template <typename T>
703-
[[nodiscard]] bool reload_uncommitted(const String_type &schema_name,
704-
const String_type &object_name, const T **object);
701+
[[nodiscard]] bool reload_uncommitted(const Object_id id,
702+
const T **object);
705703

706704
/**
707705
Retrieve an object by its schema- and object name.

sql/dd/impl/cache/dictionary_client.cc

Lines changed: 30 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1397,54 +1397,41 @@ bool Dictionary_client::acquire(const String_type &schema_name,
13971397
}
13981398

13991399
template <typename T>
1400-
bool Dictionary_client::reload_uncommitted(const String_type &schema_name,
1401-
const String_type &object_name,
1400+
bool Dictionary_client::reload_uncommitted(Object_id id,
14021401
const T **object) {
1403-
// We must make sure the schema is released and unlocked in the right order.
1404-
Schema_MDL_locker mdl_locker(m_thd);
1405-
Auto_releaser releaser(this);
1406-
1407-
assert(object);
1408-
*object = nullptr;
1409-
1410-
// Get the schema object by name.
1411-
const Schema *schema = nullptr;
1412-
bool error = mdl_locker.ensure_locked(schema_name.c_str()) ||
1413-
acquire(schema_name, &schema);
1414-
1415-
// If there was an error, or if we found no valid schema, return here.
1416-
if (error) {
1417-
assert(m_thd->is_error() || m_thd->killed);
1418-
return true;
1419-
}
1420-
1421-
// A non existing schema is not reported as an error.
1422-
if (!schema) return false;
1423-
1424-
// Create the name key for the object.
1425-
typename T::Name_key key;
1426-
T::update_name_key(&key, schema->id(), object_name);
1427-
1428-
// Cache dictionary objects with UTC time
1429-
Timestamp_timezone_guard ts(m_thd);
1402+
bool error;
1403+
typename T::Id_key key(id);
14301404

14311405
// Read the uncached dictionary object using ISO_READ_UNCOMMITTED
14321406
// isolation level.
14331407
const typename T::Cache_partition *stored_object = nullptr;
14341408
error = Shared_dictionary_cache::instance()->get_uncached(
14351409
m_thd, key, ISO_READ_UNCOMMITTED, &stored_object);
1436-
if (!error) {
1437-
// Here, stored_object is a newly created instance, so we do not need to
1438-
// clone() it, but we must delete it if dynamic cast fails.
1439-
*object = const_cast<T *>(dynamic_cast<const T *>(stored_object));
1440-
if (stored_object && !*object) delete stored_object;
1441-
if (*object) {
1442-
register_uncommitted_object(*object);
1443-
}
1444-
} else
1410+
if (error) {
14451411
assert(m_thd->is_error() || m_thd->killed);
1412+
return true;
1413+
}
1414+
*object = const_cast<T *>(dynamic_cast<const T *>(stored_object));
1415+
if (stored_object && !*object) delete stored_object;
1416+
if (*object) {
1417+
register_uncommitted_object(*object);
1418+
return false;
1419+
}
14461420

1447-
return error;
1421+
// The object is dropped, invalidate the object from shared cache.
1422+
Cache_element<T> *element = nullptr;
1423+
error = Shared_dictionary_cache::instance()->get(m_thd, key, &element);
1424+
if (error) {
1425+
assert(m_thd->is_error() || m_thd->killed);
1426+
return true;
1427+
}
1428+
if (element) {
1429+
T *dropped_object = element->object()->clone_dropped_object_placeholder();
1430+
Shared_dictionary_cache::instance()->drop(element);
1431+
register_dropped_object(dropped_object);
1432+
*object = nullptr;
1433+
}
1434+
return false;
14481435
}
14491436

14501437
template <typename T>
@@ -3088,12 +3075,11 @@ template bool Dictionary_client::acquire_uncached(Object_id, Abstract_table **);
30883075
template bool Dictionary_client::acquire(const String_type &,
30893076
const String_type &,
30903077
const Abstract_table **);
3091-
template bool Dictionary_client::reload_uncommitted(const String_type &,
3092-
const String_type &,
3093-
const Abstract_table **);
30943078
template bool Dictionary_client::acquire_for_modification(const String_type &,
30953079
const String_type &,
30963080
Abstract_table **);
3081+
template bool Dictionary_client::reload_uncommitted(const Object_id,
3082+
const Abstract_table **);
30973083
template void Dictionary_client::remove_uncommitted_objects<Abstract_table>(
30983084
bool);
30993085
template bool Dictionary_client::drop(const Abstract_table *);
@@ -3145,6 +3131,8 @@ template bool Dictionary_client::drop(const Schema *);
31453131
template bool Dictionary_client::store(Schema *);
31463132
template bool Dictionary_client::update(Schema *);
31473133
template void Dictionary_client::dump<Schema>() const;
3134+
template bool Dictionary_client::reload_uncommitted(Object_id,
3135+
const Schema **);
31483136

31493137
template bool Dictionary_client::acquire(Object_id,
31503138
const Spatial_reference_system **);

0 commit comments

Comments
 (0)