Skip to content

Commit 59344d6

Browse files
committed
large deltas mode
1 parent 6bcd4fd commit 59344d6

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

src/fill-postgresql.cpp

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,7 @@ struct session : enable_shared_from_this<session> {
732732
, create_schema(create_schema) {
733733

734734
stream.binary(true);
735+
stream.read_message_max(1024 * 1024 * 1024);
735736
if (drop_schema) {
736737
pqxx::work t(sql_connection);
737738
t.exec("drop schema if exists " + t.quote_name(this->schema) + " cascade");
@@ -969,7 +970,13 @@ struct session : enable_shared_from_this<session> {
969970
if (!result.this_block)
970971
return true;
971972

972-
bool bulk = result.this_block->block_num + 4 < result.last_irreversible.block_num;
973+
bool bulk = result.this_block->block_num + 4 < result.last_irreversible.block_num;
974+
bool large_deltas = false;
975+
if (!bulk && result.deltas && result.deltas->end - result.deltas->pos >= 10 * 1024 * 1024) {
976+
cerr << "large deltas size: " << (result.deltas->end - result.deltas->pos) << "\n";
977+
bulk = true;
978+
large_deltas = true;
979+
}
973980

974981
if (stop_before && result.this_block->block_num >= stop_before) {
975982
close_streams();
@@ -984,7 +991,7 @@ struct session : enable_shared_from_this<session> {
984991
bulk = false;
985992
}
986993

987-
if (!bulk || !(result.this_block->block_num % 200))
994+
if (!bulk || large_deltas || !(result.this_block->block_num % 200))
988995
close_streams();
989996
if (!bulk) {
990997
auto n = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
@@ -1016,6 +1023,8 @@ struct session : enable_shared_from_this<session> {
10161023

10171024
pipeline.complete();
10181025
t.commit();
1026+
if (large_deltas)
1027+
close_streams();
10191028
return true;
10201029
} // receive_result()
10211030

@@ -1161,13 +1170,17 @@ struct session : enable_shared_from_this<session> {
11611170
throw std::runtime_error("don't know how to proccess " + variant_type.name);
11621171
auto& type = *variant_type.fields[0].type;
11631172

1173+
size_t num_processed = 0;
11641174
for (auto& row : table_delta.rows) {
1175+
if (table_delta.rows.size() > 1000 && !(num_processed % 10000))
1176+
cerr << table_delta.name << " row " << num_processed << " of " << table_delta.rows.size() << " bulk=" << bulk << "\n";
11651177
check_variant(row.data, variant_type, 0u);
11661178
string fields = "block_index, present";
11671179
string values = to_string(block_num) + sep(bulk) + sql_str(bulk, row.present);
11681180
for (auto& field : type.fields)
11691181
fill_value(bulk, false, t, "", fields, values, row.data, field);
11701182
write(block_num, t, pipeline, bulk, table_delta.name, fields, values);
1183+
++num_processed;
11711184
}
11721185
numRows += table_delta.rows.size();
11731186
}
@@ -1377,6 +1390,7 @@ int main(int argc, char** argv) {
13771390
ioc, vm["host"].as<string>(), vm["port"].as<string>(), vm["schema"].as<string>(),
13781391
vm.count("skip-to") ? vm["skip-to"].as<uint32_t>() : 0, vm.count("stop") ? vm["stop"].as<uint32_t>() : 0, vm.count("drop"),
13791392
vm.count("create"));
1393+
cerr.imbue(std::locale(""));
13801394
s->start();
13811395
ioc.run();
13821396
}

0 commit comments

Comments
 (0)