Skip to content

Commit 6712596

Browse files
committed
add
1 parent 6ae6c6c commit 6712596

File tree

6 files changed

+180
-9
lines changed

6 files changed

+180
-9
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ thiserror = "1.0.49"
3030
include_dir = "0.7.3"
3131
inquire = "0.6.2"
3232

33-
degen-sql = "0.1.12"
33+
degen-sql = "0.1.13"
3434
#degen-sql={path="../degen-sql"}
3535

3636
rust_decimal = { version = "1.33.1", features = ["db-tokio-postgres"] }

src/db/postgres/migrations/0001_create_event_indexers.up.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ CREATE TABLE event_indexers (
66
contract_name VARCHAR(255) NOT NULL,
77
contract_address VARCHAR(255) NOT NULL,
88

9-
chain_id BIGINT ,
9+
chain_id BIGINT NOT NULL ,
1010

1111
start_block BIGINT NOT NULL ,
1212

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use tokio_postgres::Row;
2+
use ethers::types::H256;
3+
use ethers::types::{Address, U256, U64};
4+
5+
6+
7+
use rust_decimal::Decimal;
8+
use rust_decimal::prelude::ToPrimitive;
9+
use log::info;
10+
use serde_json;
11+
use std::str::FromStr;
12+
use tokio::time::timeout;
13+
use std::time::Duration;
14+
15+
use crate::event::ContractEvent;
16+
use degen_sql::db::postgres::models::model::PostgresModelError;
17+
use degen_sql::db::postgres::postgres_db::Database;
18+
19+
20+
21+
22+
pub struct EventIndexer {
23+
pub id: u64,
24+
pub contract_name: String,
25+
pub contract_address: Address,
26+
pub chain_id: u64,
27+
pub start_block: u64,
28+
pub current_indexing_block: Option<u64>,
29+
pub synced: bool,
30+
// pub created_at: DateTime<Utc>,
31+
}
32+
33+
impl EventIndexer {
34+
35+
36+
pub fn from_row(row: &Row) -> Result<Self, PostgresModelError>{
37+
38+
let contract_address = Address::from_str(&row.get::<_, String>("contract_address"))
39+
.map_err(|e| PostgresModelError::RowParseError(format!("Invalid contract address: {:?}", e).into()))?;
40+
41+
42+
43+
Ok( Self{
44+
id: (row.get::<_, i64>("id")) as u64 ,
45+
contract_name: row.get("contract_name"),
46+
contract_address ,
47+
48+
chain_id: (row.get::<_, i64>("chain_id")) as u64 ,
49+
start_block: (row.get::<_, i64>("start_block")) as u64 ,
50+
51+
52+
current_indexing_block: (row.try_get::<_, i64>("current_indexing_block")) .ok().map(|i| i as u64) ,
53+
synced: (row.get::<_, bool>("synced")) ,
54+
55+
56+
57+
58+
})
59+
60+
61+
}
62+
63+
64+
65+
}
66+
67+
pub struct EventIndexerModel {}
68+
69+
impl EventIndexerModel {
70+
71+
72+
pub async fn find_next_event_indexer(
73+
offset_indexer_id:i64,
74+
psql_db: &mut Database,
75+
) -> Result< EventIndexer, PostgresModelError> {
76+
77+
let query = "
78+
SELECT id, contract_name, contract_address, chain_id, start_block, current_indexing_block, synced, created_at
79+
FROM event_indexers
80+
WHERE id > $1
81+
ORDER BY id ASC
82+
LIMIT 1;
83+
";
84+
85+
let row = psql_db.query_one_with_reconnect(query, &[&offset_indexer_id]).await?;
86+
87+
88+
89+
EventIndexer::from_row(&row)
90+
91+
}
92+
93+
94+
95+
96+
pub async fn insert_event_indexer(
97+
event: &ContractEvent,
98+
psql_db: &mut Database,
99+
) -> Result<i32, PostgresModelError> {
100+
let contract_address = event.address.to_string();
101+
let contract_name = &event.name;
102+
103+
let chain_id = event.chain_id as i64 ;
104+
let start_block = event.block_number.map(|num| num.as_u64() as i64);
105+
106+
let result = psql_db.query_one_with_reconnect(
107+
"
108+
INSERT INTO event_indexers (
109+
contract_name,
110+
contract_address,
111+
chain_id,
112+
start_block
113+
) VALUES ($1, $2, $3, $4)
114+
RETURNING id;
115+
",
116+
&[
117+
&contract_name,
118+
&contract_address,
119+
&chain_id,
120+
&start_block,
121+
]
122+
123+
).await;
124+
125+
result.map(|row| row.get(0))
126+
}
127+
128+
pub async fn update_current_indexing_block(
129+
indexer_id: i32,
130+
current_block: i64,
131+
psql_db: &mut Database,
132+
) -> Result<(), PostgresModelError> {
133+
psql_db.execute_with_reconnect(
134+
"
135+
UPDATE event_indexers
136+
SET current_indexing_block = $1
137+
WHERE id = $2;
138+
",
139+
&[&current_block, &indexer_id]
140+
141+
).await?;
142+
143+
Ok(())
144+
}
145+
146+
pub async fn update_is_synced(
147+
indexer_id: i32,
148+
is_synced: bool,
149+
psql_db: &mut Database,
150+
) -> Result<(), PostgresModelError> {
151+
psql_db.execute_with_reconnect(
152+
"
153+
UPDATE event_indexers
154+
SET synced = $1
155+
WHERE id = $2;
156+
",
157+
&[&is_synced,&indexer_id]
158+
159+
).await?;
160+
161+
Ok(())
162+
}
163+
}
164+
165+
fn decimal_to_u64(input: &Decimal) -> Option<U64> {
166+
input.to_u128().and_then(|val| {
167+
if val > u64::MAX as u128 {
168+
None
169+
} else {
170+
Some(U64::from(val as u64))
171+
}
172+
})
173+
}

src/db/postgres/models/events_model.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ impl EventsModel {
105105
&block_hash,
106106
&log_index,
107107
&transaction_index
108-
],
109-
3
108+
]
110109

111110
).await;
112111

@@ -144,8 +143,7 @@ pub async fn find_most_recent_event(
144143
ORDER BY created_at DESC
145144
LIMIT 1;
146145
",
147-
&[&parsed_contract_address],
148-
3
146+
&[&parsed_contract_address]
149147
).await;
150148

151149
match row {

src/db/postgres/models/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
1+
pub mod event_indexer_model;
22

33
pub mod events_model;

0 commit comments

Comments
 (0)