Skip to content

Commit 882339d

Browse files
committed
move reconnect to degensql
1 parent cf26ae0 commit 882339d

File tree

5 files changed

+28
-83
lines changed

5 files changed

+28
-83
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "vibegraph"
3-
version = "0.3.11"
3+
version = "0.3.12"
44
edition = "2021"
55
default-run = "vibegraph"
66
description = "Reads ethereum contract events from a lightweight RPC and caches them to a database"
@@ -30,7 +30,7 @@ thiserror = "1.0.49"
3030
include_dir = "0.7.3"
3131
inquire = "0.6.2"
3232

33-
degen-sql = "0.1.10"
33+
degen-sql = "0.1.12"
3434
#degen-sql={path="../degen-sql"}
3535

3636
rust_decimal = { version = "1.33.1", features = ["db-tokio-postgres"] }

src/db/postgres/models/events_model.rs

Lines changed: 11 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl EventsModel {
3030
pub async fn insert_one(
3131
event: &ContractEvent ,
3232

33-
psql_db: &Database,
33+
psql_db: &mut Database,
3434
) -> Result<i32, PostgresModelError> {
3535

3636

@@ -73,9 +73,8 @@ impl EventsModel {
7373

7474

7575
// Set a timeout (e.g., 5 seconds)
76-
let insert_result = timeout(
77-
Duration::from_secs(5), // Set timeout duration
78-
psql_db.query_one(
76+
let insert_result =
77+
psql_db.query_one_with_reconnect(
7978
"
8079
INSERT INTO events
8180
(
@@ -107,79 +106,25 @@ impl EventsModel {
107106
&log_index,
108107
&transaction_index
109108
],
110-
),
109+
3
110+
111111
).await;
112112

113-
match insert_result {
114-
Ok(Ok(row)) => Ok(row.get(0)), // Successfully inserted and retrieved ID
115-
Ok(Err(e)) => {
116-
eprintln!("Database error: {:?}", e);
117-
Err(PostgresModelError::Postgres(e))
118-
}
119-
Err(_) => {
120-
eprintln!("Database timeout occurred.");
121-
Err(PostgresModelError::Timeout) // You may need to define a Timeout variant in PostgresModelError
122-
}
123-
}
124-
/*
125-
126-
let insert_result = psql_db.query_one(
127-
"
128-
INSERT INTO events
129-
(
130-
contract_address,
131-
name,
132-
signature,
133-
args,
134-
data,
135-
chain_id,
136-
transaction_hash,
137-
block_number,
138-
block_hash,
139-
log_index,
140-
transaction_index
141-
)
142-
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
143-
RETURNING id;
144-
",
145-
&[
146-
&contract_address,
147-
&name,
148-
&signature,
149-
&args,
150-
&data,
151-
&chain_id,
152-
&transaction_hash,
153-
&block_number,
154-
&block_hash,
155-
&log_index ,
156-
&transaction_index
157-
],
158-
).await;
159-
160-
match insert_result {
161-
Ok(row) => Ok(row.get(0)), // Successfully inserted new row and got its ID.
162-
Err(e) => {
163-
eprintln!("Database error: Event {:?}", e);
164-
165-
Err( PostgresModelError::Postgres(e) )
166-
167-
168-
}
169-
}*/
113+
insert_result.map(|r| r.get(0))
114+
170115
}
171116

172117

173118
pub async fn find_most_recent_event(
174119
contract_address: Address,
175-
psql_db: &Database,
120+
psql_db: &mut Database,
176121
) -> Result< ContractEvent, PostgresModelError> {
177122

178123

179124
let parsed_contract_address = to_checksum(&contract_address, None).to_string();
180125

181126

182-
let row = psql_db.query_one(
127+
let row = psql_db.query_one_with_reconnect(
183128
"
184129
SELECT
185130
contract_address,
@@ -200,6 +145,7 @@ pub async fn find_most_recent_event(
200145
LIMIT 1;
201146
",
202147
&[&parsed_contract_address],
148+
3
203149
).await;
204150

205151
match row {
@@ -237,7 +183,7 @@ pub async fn find_most_recent_event(
237183
},
238184
Err(e) => {
239185
eprintln!("Database error: Recent Event {:?}", e);
240-
Err(PostgresModelError::Postgres(e))
186+
Err( e)
241187
}
242188
}
243189
}

src/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ pub fn try_identify_event_for_log(
194194

195195
pub async fn find_most_recent_event_blocknumber(
196196
contract_address: Address,
197-
psql_db: &Database
197+
psql_db: &mut Database
198198
) -> Option<U64> {
199199

200200

src/lib.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -309,44 +309,43 @@ async fn collect_events(
309309

310310

311311

312-
let mut encountered_insertion_timeout = false;
313312

314313
for event_log in event_logs {
315314

316315
info!("decoded event log {:?}", event_log);
317316

318317
// let psql_db = &app_state.database;
319318

320-
let psql_db = app_state.database.lock().await; // Lock database correctly
319+
let mut psql_db = app_state.database.lock().await; // Lock database correctly
321320

322-
let inserted = EventsModel::insert_one(&event_log, &*psql_db ).await;
321+
let inserted = EventsModel::insert_one(&event_log, &mut *psql_db ).await;
323322

324323
info!("inserted {:?}", inserted);
325324

326-
if inserted.is_err_and( |e| e == PostgresModelError::Timeout ) {
327-
encountered_insertion_timeout = true ;
328-
}
325+
// if inserted.is_err_and( |e| e == PostgresModelError::Timeout ) {
326+
// encountered_insertion_timeout = true ;
327+
// }
329328

330329

331330
}
332331

333-
if !encountered_insertion_timeout {
332+
// if !encountered_insertion_timeout {
334333
//progress the current indexing block
335334
app_state.indexing_state.current_indexing_block = end_block + 1;
336-
}else {
335+
/* }else {
337336
warn!("Encountered a timeout with the database- retrying");
338337
339338
// Unlock database first before reconnecting
340339
// drop(psql_db);
341340
342341
// Reconnect and update `app_state.database`
343342
let mut db_lock = app_state.database.lock().await;
344-
if let Err(e) = db_lock.reconnect( app_config.db_conn_url.clone() ).await {
343+
if let Err(e) = db_lock.reconnect( ).await {
345344
eprintln!("Database reconnection failed: {:?}", e);
346345
} else {
347346
info!("Database reconnected successfully.");
348347
}
349-
}
348+
}*/
350349
// there there was an error, we are going to cycle this period again .
351350

352351

@@ -390,8 +389,8 @@ async fn initialize(
390389

391390

392391
let most_recent_event_blocknumber = {
393-
let psql_db = app_state.database.lock().await;
394-
find_most_recent_event_blocknumber(contract_address, &psql_db).await
392+
let mut psql_db = app_state.database.lock().await;
393+
find_most_recent_event_blocknumber(contract_address, &mut psql_db).await
395394
}; // `psql_db` is dropped here to avoid deadlock later
396395

397396

0 commit comments

Comments
 (0)