Skip to content

Commit f4256dc

Browse files
committed
block_info
1 parent f60d84c commit f4256dc

File tree

3 files changed

+218
-7
lines changed

3 files changed

+218
-7
lines changed

CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ add_executable(fill-postgresql src/fill-postgresql.cpp)
1313
target_include_directories(fill-postgresql PUBLIC external/abieos/src external/abieos/external/rapidjson/include PRIVATE ${Boost_INCLUDE_DIR})
1414
target_link_libraries(fill-postgresql Boost::date_time Boost::system Boost::iostreams Boost::program_options pqxx pq)
1515

16+
add_executable(fill-postgresql-sanitize src/fill-postgresql.cpp)
17+
target_include_directories(fill-postgresql-sanitize PUBLIC external/abieos/src external/abieos/external/rapidjson/include PRIVATE ${Boost_INCLUDE_DIR})
18+
target_link_libraries(fill-postgresql-sanitize Boost::date_time Boost::system Boost::iostreams Boost::program_options pqxx pq -fsanitize=address,undefined)
19+
target_compile_options(fill-postgresql-sanitize PUBLIC -fno-omit-frame-pointer -fsanitize=address,undefined)
20+
1621
if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
1722
target_compile_options(fill-postgresql PRIVATE -Wall -Wextra -Wno-unused-parameter -fcolor-diagnostics)
23+
target_compile_options(fill-postgresql-sanitize PRIVATE -Wall -Wextra -Wno-unused-parameter -fcolor-diagnostics)
1824
endif()

external/abieos

src/fill-postgresql.cpp

Lines changed: 211 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ using std::string;
3737
using std::string_view;
3838
using std::to_string;
3939
using std::unique_ptr;
40+
using std::variant;
4041
using std::vector;
4142

4243
namespace asio = boost::asio;
@@ -509,6 +510,126 @@ bool json_to_native(recurse_transaction_trace& obj, json_to_native_state& state,
509510
return json_to_native(o, state, event, start);
510511
}
511512

513+
struct producer_key {
514+
name producer_name;
515+
public_key block_signing_key;
516+
};
517+
518+
template <typename F>
519+
constexpr void for_each_field(producer_key*, F f) {
520+
f("producer_name", member_ptr<&producer_key::producer_name>{});
521+
f("block_signing_key", member_ptr<&producer_key::block_signing_key>{});
522+
}
523+
524+
struct extension {
525+
uint16_t type;
526+
bytes data;
527+
};
528+
529+
template <typename F>
530+
constexpr void for_each_field(extension*, F f) {
531+
f("type", member_ptr<&extension::type>{});
532+
f("data", member_ptr<&extension::data>{});
533+
}
534+
535+
struct producer_schedule {
536+
uint32_t version;
537+
vector<producer_key> producers;
538+
};
539+
540+
template <typename F>
541+
constexpr void for_each_field(producer_schedule*, F f) {
542+
f("version", member_ptr<&producer_schedule::version>{});
543+
f("producers", member_ptr<&producer_schedule::producers>{});
544+
}
545+
546+
struct transaction_receipt_header {
547+
uint8_t status;
548+
uint32_t cpu_usage_us;
549+
varuint32 net_usage_words;
550+
};
551+
552+
template <typename F>
553+
constexpr void for_each_field(transaction_receipt_header*, F f) {
554+
f("status", member_ptr<&transaction_receipt_header::status>{});
555+
f("cpu_usage_us", member_ptr<&transaction_receipt_header::cpu_usage_us>{});
556+
f("net_usage_words", member_ptr<&transaction_receipt_header::net_usage_words>{});
557+
}
558+
559+
struct packed_transaction {
560+
vector<signature> signatures;
561+
uint8_t compression;
562+
bytes packed_context_free_data;
563+
bytes packed_trx;
564+
};
565+
566+
template <typename F>
567+
constexpr void for_each_field(packed_transaction*, F f) {
568+
f("signatures", member_ptr<&packed_transaction::signatures>{});
569+
f("compression", member_ptr<&packed_transaction::compression>{});
570+
f("packed_context_free_data", member_ptr<&packed_transaction::packed_context_free_data>{});
571+
f("packed_trx", member_ptr<&packed_transaction::packed_trx>{});
572+
}
573+
574+
using transaction_variant = variant<checksum256, packed_transaction>;
575+
576+
struct transaction_receipt : transaction_receipt_header {
577+
transaction_variant trx;
578+
};
579+
580+
template <typename F>
581+
constexpr void for_each_field(transaction_receipt*, F f) {
582+
for_each_field((transaction_receipt_header*)nullptr, f);
583+
f("trx", member_ptr<&transaction_receipt::trx>{});
584+
}
585+
586+
struct block_header {
587+
block_timestamp timestamp;
588+
name producer;
589+
uint16_t confirmed;
590+
checksum256 previous;
591+
checksum256 transaction_mroot;
592+
checksum256 action_mroot;
593+
uint32_t schedule_version;
594+
optional<producer_schedule> new_producers;
595+
vector<extension> header_extensions;
596+
};
597+
598+
template <typename F>
599+
constexpr void for_each_field(block_header*, F f) {
600+
f("timestamp", member_ptr<&block_header::timestamp>{});
601+
f("producer", member_ptr<&block_header::producer>{});
602+
f("confirmed", member_ptr<&block_header::confirmed>{});
603+
f("previous", member_ptr<&block_header::previous>{});
604+
f("transaction_mroot", member_ptr<&block_header::transaction_mroot>{});
605+
f("action_mroot", member_ptr<&block_header::action_mroot>{});
606+
f("schedule_version", member_ptr<&block_header::schedule_version>{});
607+
f("new_producers", member_ptr<&block_header::new_producers>{});
608+
f("header_extensions", member_ptr<&block_header::header_extensions>{});
609+
}
610+
611+
struct signed_block_header : block_header {
612+
signature producer_signature;
613+
};
614+
615+
template <typename F>
616+
constexpr void for_each_field(signed_block_header*, F f) {
617+
for_each_field((block_header*)nullptr, f);
618+
f("producer_signature", member_ptr<&signed_block_header::producer_signature>{});
619+
}
620+
621+
struct signed_block : signed_block_header {
622+
vector<transaction_receipt> transactions;
623+
vector<extension> block_extensions;
624+
};
625+
626+
template <typename F>
627+
constexpr void for_each_field(signed_block*, F f) {
628+
for_each_field((signed_block_header*)nullptr, f);
629+
f("transactions", member_ptr<&signed_block::transactions>{});
630+
f("block_extensions", member_ptr<&signed_block::block_extensions>{});
631+
}
632+
512633
std::vector<char> zlib_decompress(input_buffer data) {
513634
std::vector<char> out;
514635
bio::filtering_ostream decomp;
@@ -683,7 +804,7 @@ struct session : enable_shared_from_this<session> {
683804
".transaction_status_type as enum('executed', 'soft_fail', 'hard_fail', 'delayed', 'expired')");
684805
t.exec(
685806
"create table " + t.quote_name(schema) +
686-
R"(.received_blocks ("block_index" bigint, "block_id" varchar(64), primary key("block_index")))");
807+
R"(.received_block ("block_index" bigint, "block_id" varchar(64), primary key("block_index")))");
687808
t.exec(
688809
"create table " + t.quote_name(schema) +
689810
R"(.fill_status ("head" bigint, "head_id" varchar(64), "irreversible" bigint, "irreversible_id" varchar(64)))");
@@ -713,6 +834,23 @@ struct session : enable_shared_from_this<session> {
713834
t.exec(query);
714835
}
715836

837+
t.exec(
838+
"create table " + t.quote_name(schema) +
839+
R"(.block_info(
840+
"block_index" bigint,
841+
"block_id" varchar(64),
842+
"timestamp" timestamp,
843+
"producer" varchar(13),
844+
"confirmed" integer,
845+
"previous" varchar(64),
846+
"transaction_mroot" varchar(64),
847+
"action_mroot" varchar(64),
848+
"schedule_version" bigint,
849+
"new_producers_version" bigint,
850+
"new_producers" )" +
851+
t.quote_name(schema) + R"(.producer_key[],
852+
primary key("block_index")))");
853+
716854
t.commit();
717855
} // create_tables()
718856

@@ -727,7 +865,7 @@ struct session : enable_shared_from_this<session> {
727865
jarray get_positions(pqxx::work& t) {
728866
jarray result;
729867
auto rows = t.exec(
730-
"select block_index, block_id from " + t.quote_name(schema) + ".received_blocks where block_index >= " +
868+
"select block_index, block_id from " + t.quote_name(schema) + ".received_block where block_index >= " +
731869
to_string(irreversible) + " and block_index <= " + to_string(head) + " order by block_index");
732870
for (auto row : rows) {
733871
result.push_back(jvalue{jobject{
@@ -751,17 +889,18 @@ struct session : enable_shared_from_this<session> {
751889
auto trunc = [&](const std::string& name) {
752890
pipeline.insert("delete from " + t.quote_name(schema) + "." + t.quote_name(name) + " where block_index >= " + to_string(block));
753891
};
754-
trunc("received_blocks");
892+
trunc("received_block");
755893
trunc("action_trace_authorization");
756894
trunc("action_trace_auth_sequence");
757895
trunc("action_trace_ram_delta");
758896
trunc("action_trace");
759897
trunc("transaction_trace");
898+
trunc("block_info");
760899
for (auto& table : abi.tables)
761900
trunc(table.type);
762901

763902
auto result = pipeline.retrieve(
764-
pipeline.insert("select block_id from " + t.quote_name(schema) + ".received_blocks where block_index=" + to_string(block - 1)));
903+
pipeline.insert("select block_id from " + t.quote_name(schema) + ".received_block where block_index=" + to_string(block - 1)));
765904
if (result.empty()) {
766905
head = 0;
767906
head_id = "";
@@ -811,6 +950,8 @@ struct session : enable_shared_from_this<session> {
811950
truncate(t, pipeline, result.this_block->block_num);
812951
if (!head_id.empty() && (!result.prev_block || (string)result.prev_block->block_id != head_id))
813952
throw runtime_error("prev_block does not match");
953+
if (result.block)
954+
receive_block(result.this_block->block_num, result.this_block->block_id, *result.block, bulk, t, pipeline);
814955
if (result.deltas)
815956
receive_deltas(result.this_block->block_num, *result.deltas, bulk, t, pipeline);
816957
if (result.traces)
@@ -823,7 +964,7 @@ struct session : enable_shared_from_this<session> {
823964
if (!bulk)
824965
write_fill_status(t, pipeline);
825966
pipeline.insert(
826-
"insert into " + t.quote_name(schema) + ".received_blocks (block_index, block_id) values (" +
967+
"insert into " + t.quote_name(schema) + ".received_block (block_index, block_id) values (" +
827968
to_string(result.this_block->block_num) + ", '" + string(result.this_block->block_id) + "')");
828969

829970
pipeline.complete();
@@ -933,6 +1074,70 @@ struct session : enable_shared_from_this<session> {
9331074
}
9341075
} // fill_value
9351076

1077+
void
1078+
receive_block(uint32_t block_index, const checksum256& block_id, input_buffer bin, bool bulk, pqxx::work& t, pqxx::pipeline& pipeline) {
1079+
signed_block block;
1080+
if (!bin_to_native(block, bin))
1081+
throw runtime_error("block conversion error");
1082+
1083+
string fields = "block_index, block_id, timestamp, producer, confirmed, previous, transaction_mroot, action_mroot, "
1084+
"schedule_version, new_producers_version, new_producers";
1085+
string values;
1086+
if (bulk)
1087+
values = to_string(block_index) + "\t" + //
1088+
(string)(block_id) + "\t" + //
1089+
(string)(block.timestamp) + "\t" + //
1090+
(string)(block.producer) + "\t" + //
1091+
to_string(block.confirmed) + "\t" + //
1092+
(string)(block.previous) + "\t" + //
1093+
(string)(block.transaction_mroot) + "\t" + //
1094+
(string)(block.action_mroot) + "\t" + //
1095+
to_string(block.schedule_version) + "\t" + //
1096+
to_string(block.new_producers ? block.new_producers->version : 0); //
1097+
else
1098+
values = to_string(block_index) + ", '" + //
1099+
(string)(block_id) + "', '" + //
1100+
(string)(block.timestamp) + "', '" + //
1101+
(string)(block.producer) + "', " + //
1102+
to_string(block.confirmed) + ", '" + //
1103+
(string)(block.previous) + "', '" + //
1104+
(string)(block.transaction_mroot) + "', '" + //
1105+
(string)(block.action_mroot) + "', " + //
1106+
to_string(block.schedule_version) + ", " + //
1107+
to_string(block.new_producers ? block.new_producers->version : 0); //
1108+
1109+
if (block.new_producers) {
1110+
if (bulk)
1111+
values += "\t{";
1112+
else
1113+
values += ", array[";
1114+
for (auto& x : block.new_producers->producers) {
1115+
if (&x != &block.new_producers->producers[0])
1116+
values += ",";
1117+
if (bulk)
1118+
values += "\"(" + (string)x.producer_name + "," + public_key_to_string(x.block_signing_key) + ")\"";
1119+
else
1120+
values += "('" + (string)x.producer_name + "','" + public_key_to_string(x.block_signing_key) + "')";
1121+
}
1122+
if (bulk)
1123+
values += "}";
1124+
else
1125+
values += "]::" + t.quote_name(schema) + ".producer_key[]";
1126+
} else {
1127+
if (bulk)
1128+
values += "\t\\N";
1129+
else
1130+
values += ", null";
1131+
}
1132+
1133+
if (bulk) {
1134+
write_stream(block_index, t, "block_info", values);
1135+
} else {
1136+
string query = "insert into " + t.quote_name(schema) + ".block_info(" + fields + ") values (" + values + ")";
1137+
pipeline.insert(query);
1138+
}
1139+
} // receive_block
1140+
9361141
void receive_deltas(uint32_t block_num, input_buffer buf, bool bulk, pqxx::work& t, pqxx::pipeline& pipeline) {
9371142
auto data = zlib_decompress(buf);
9381143
input_buffer bin{data.data(), data.data() + data.size()};
@@ -1095,7 +1300,7 @@ struct session : enable_shared_from_this<session> {
10951300
{{"max_messages_in_flight"s}, {"4294967295"s}},
10961301
{{"have_positions"s}, {positions}},
10971302
{{"irreversible_only"s}, {false}},
1098-
{{"fetch_block"s}, {false}},
1303+
{{"fetch_block"s}, {true}},
10991304
{{"fetch_traces"s}, {true}},
11001305
{{"fetch_deltas"s}, {true}},
11011306
}}}});

0 commit comments

Comments
 (0)