Skip to content

Commit 059509a

Browse files
Natan16Natan Vianafelixwang9817
authored
fix: Implements connection pool for postgres online store (feast-dev#3633)
* Implements connection pool for postgres online store Signed-off-by: Natan Viana <[email protected]> * Connection type configurable Signed-off-by: Natan Viana <[email protected]> * Postgres online store with connection type configurable Signed-off-by: Natan Viana <[email protected]> * Fix imports Signed-off-by: Natan Viana <[email protected]> * Writes integration test to validate postgres connection pool Signed-off-by: Natan Viana <[email protected]> * Fix Signed-off-by: Felix Wang <[email protected]> * Fix Signed-off-by: Felix Wang <[email protected]> * Fix Signed-off-by: Felix Wang <[email protected]> --------- Signed-off-by: Natan Viana <[email protected]> Signed-off-by: Felix Wang <[email protected]> Co-authored-by: Natan Viana <[email protected]> Co-authored-by: Felix Wang <[email protected]>
1 parent 5dc9d9e commit 059509a

File tree

5 files changed

+99
-19
lines changed

5 files changed

+99
-19
lines changed

sdk/python/feast/infra/online_stores/contrib/postgres.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextlib
12
import logging
23
from collections import defaultdict
34
from datetime import datetime
@@ -7,14 +8,15 @@
78
import pytz
89
from psycopg2 import sql
910
from psycopg2.extras import execute_values
11+
from psycopg2.pool import SimpleConnectionPool
1012
from pydantic.schema import Literal
1113

1214
from feast import Entity
1315
from feast.feature_view import FeatureView
1416
from feast.infra.key_encoding_utils import serialize_entity_key
1517
from feast.infra.online_stores.online_store import OnlineStore
16-
from feast.infra.utils.postgres.connection_utils import _get_conn
17-
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
18+
from feast.infra.utils.postgres.connection_utils import _get_conn, _get_connection_pool
19+
from feast.infra.utils.postgres.postgres_config import ConnectionType, PostgreSQLConfig
1820
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
1921
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
2022
from feast.repo_config import RepoConfig
@@ -27,12 +29,21 @@ class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
2729

2830
class PostgreSQLOnlineStore(OnlineStore):
2931
_conn: Optional[psycopg2._psycopg.connection] = None
32+
_conn_pool: Optional[SimpleConnectionPool] = None
3033

34+
@contextlib.contextmanager
3135
def _get_conn(self, config: RepoConfig):
32-
if not self._conn:
33-
assert config.online_store.type == "postgres"
34-
self._conn = _get_conn(config.online_store)
35-
return self._conn
36+
assert config.online_store.type == "postgres"
37+
if config.online_store.conn_type == ConnectionType.pool:
38+
if not self._conn_pool:
39+
self._conn_pool = _get_connection_pool(config.online_store)
40+
connection = self._conn_pool.getconn()
41+
yield connection
42+
self._conn_pool.putconn(connection)
43+
else:
44+
if not self._conn:
45+
self._conn = _get_conn(config.online_store)
46+
yield self._conn
3647

3748
@log_exceptions_and_usage(online_store="postgres")
3849
def online_write_batch(

sdk/python/feast/infra/utils/postgres/connection_utils.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import psycopg2
66
import psycopg2.extras
77
import pyarrow as pa
8+
from psycopg2.pool import SimpleConnectionPool
89

910
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
1011
from feast.type_map import arrow_to_pg_type
@@ -22,10 +23,28 @@ def _get_conn(config: PostgreSQLConfig):
2223
sslcert=config.sslcert_path,
2324
sslrootcert=config.sslrootcert_path,
2425
options="-c search_path={}".format(config.db_schema or config.user),
26+
keepalives_idle=config.keepalives_idle,
2527
)
2628
return conn
2729

2830

31+
def _get_connection_pool(config: PostgreSQLConfig):
32+
return SimpleConnectionPool(
33+
config.min_conn,
34+
config.max_conn,
35+
dbname=config.database,
36+
host=config.host,
37+
port=int(config.port),
38+
user=config.user,
39+
password=config.password,
40+
sslmode=config.sslmode,
41+
sslkey=config.sslkey_path,
42+
sslcert=config.sslcert_path,
43+
sslrootcert=config.sslrootcert_path,
44+
options="-c search_path={}".format(config.db_schema or config.user),
45+
)
46+
47+
2948
def _df_to_create_table_sql(entity_df, table_name) -> str:
3049
pa_table = pa.Table.from_pandas(entity_df)
3150
columns = [

sdk/python/feast/infra/utils/postgres/postgres_config.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
from enum import Enum
12
from typing import Optional
23

34
from pydantic import StrictStr
45

56
from feast.repo_config import FeastConfigBaseModel
67

78

9+
class ConnectionType(Enum):
10+
singleton = "singleton"
11+
pool = "pool"
12+
13+
814
class PostgreSQLConfig(FeastConfigBaseModel):
15+
min_conn: int = 1
16+
max_conn: int = 10
17+
conn_type: ConnectionType = ConnectionType.singleton
918
host: StrictStr
1019
port: int = 5432
1120
database: StrictStr
@@ -16,3 +25,4 @@ class PostgreSQLConfig(FeastConfigBaseModel):
1625
sslkey_path: Optional[StrictStr] = None
1726
sslcert_path: Optional[StrictStr] = None
1827
sslrootcert_path: Optional[StrictStr] = None
28+
keepalives_idle: int = 0

sdk/python/tests/conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,3 +393,17 @@ def feature_store_for_online_retrieval(
393393
]
394394

395395
return fs, feature_refs, entity_rows
396+
397+
398+
@pytest.fixture
399+
def fake_ingest_data():
400+
"""Fake data to ingest into the feature store"""
401+
data = {
402+
"driver_id": [1],
403+
"conv_rate": [0.5],
404+
"acc_rate": [0.6],
405+
"avg_daily_trips": [4],
406+
"event_timestamp": [pd.Timestamp(datetime.utcnow()).round("ms")],
407+
"created": [pd.Timestamp(datetime.utcnow()).round("ms")],
408+
}
409+
return pd.DataFrame(data)

sdk/python/tests/integration/online_store/test_universal_online.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from feast.feature_service import FeatureService
1818
from feast.feature_view import FeatureView
1919
from feast.field import Field
20+
from feast.infra.utils.postgres.postgres_config import ConnectionType
2021
from feast.online_response import TIMESTAMP_POSTFIX
2122
from feast.types import Float32, Int32, String
2223
from feast.wait import wait_retry_backoff
@@ -32,9 +33,45 @@
3233
from tests.utils.data_source_test_creator import prep_file_source
3334

3435

36+
@pytest.mark.integration
37+
@pytest.mark.universal_online_stores(only=["postgres"])
38+
def test_connection_pool_online_stores(
39+
environment, universal_data_sources, fake_ingest_data
40+
):
41+
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
42+
return
43+
fs = environment.feature_store
44+
fs.config.online_store.conn_type = ConnectionType.pool
45+
fs.config.online_store.min_conn = 1
46+
fs.config.online_store.max_conn = 10
47+
48+
entities, datasets, data_sources = universal_data_sources
49+
driver_hourly_stats = create_driver_hourly_stats_feature_view(data_sources.driver)
50+
driver_entity = driver()
51+
52+
# Register Feature View and Entity
53+
fs.apply([driver_hourly_stats, driver_entity])
54+
55+
# directly ingest data into the Online Store
56+
fs.write_to_online_store("driver_stats", fake_ingest_data)
57+
58+
# assert the right data is in the Online Store
59+
df = fs.get_online_features(
60+
features=[
61+
"driver_stats:avg_daily_trips",
62+
"driver_stats:acc_rate",
63+
"driver_stats:conv_rate",
64+
],
65+
entity_rows=[{"driver_id": 1}],
66+
).to_df()
67+
assertpy.assert_that(df["avg_daily_trips"].iloc[0]).is_equal_to(4)
68+
assertpy.assert_that(df["acc_rate"].iloc[0]).is_close_to(0.6, 1e-6)
69+
assertpy.assert_that(df["conv_rate"].iloc[0]).is_close_to(0.5, 1e-6)
70+
71+
3572
@pytest.mark.integration
3673
@pytest.mark.universal_online_stores(only=["redis"])
37-
def test_entity_ttl_online_store(environment, universal_data_sources):
74+
def test_entity_ttl_online_store(environment, universal_data_sources, fake_ingest_data):
3875
if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True":
3976
return
4077
fs = environment.feature_store
@@ -47,19 +84,8 @@ def test_entity_ttl_online_store(environment, universal_data_sources):
4784
# Register Feature View and Entity
4885
fs.apply([driver_hourly_stats, driver_entity])
4986

50-
# fake data to ingest into Online Store
51-
data = {
52-
"driver_id": [1],
53-
"conv_rate": [0.5],
54-
"acc_rate": [0.6],
55-
"avg_daily_trips": [4],
56-
"event_timestamp": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
57-
"created": [pd.Timestamp(datetime.datetime.utcnow()).round("ms")],
58-
}
59-
df_ingest = pd.DataFrame(data)
60-
6187
# directly ingest data into the Online Store
62-
fs.write_to_online_store("driver_stats", df_ingest)
88+
fs.write_to_online_store("driver_stats", fake_ingest_data)
6389

6490
# assert the right data is in the Online Store
6591
df = fs.get_online_features(

0 commit comments

Comments
 (0)