Skip to content

Commit a8e8ecc

Browse files
authored
Track re-sync servers (#170)
* Track re-sync servers * dont change position * Interesting * ok pgbench tell me whats wrong * ok
1 parent 61eb7d7 commit a8e8ecc

File tree

12 files changed

+1265
-6
lines changed

12 files changed

+1265
-6
lines changed

integration/load_balancer/docker/primary.sh

100644100755
File mode changed.

integration/load_balancer/docker/replica.sh

100644100755
File mode changed.

integration/load_balancer/pgdog.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ done
1414

1515
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
1616
pushd ${SCRIPT_DIR}/../../
17-
RUST_LOG=debug cargo run -- --config ${SCRIPT_DIR}/pgdog.toml --users ${SCRIPT_DIR}/users.toml
17+
cargo run -- --config ${SCRIPT_DIR}/pgdog.toml --users ${SCRIPT_DIR}/users.toml

integration/load_balancer/test_lb_asyncpy.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import pytest_asyncio
44
from datetime import datetime
55
import json
6+
from time import sleep
7+
import random
8+
import asyncio
69

710
@pytest_asyncio.fixture
811
async def conn():
@@ -32,5 +35,48 @@ async def test_prepared_statements(conn):
3235
assert row[1] == "[email protected]"
3336
assert row[3] == json.dumps({"banned": False})
3437

35-
row = await conn.fetch("SELECT * FROM users WHERE id = $1", 1)
36-
assert row[0][1] == "[email protected]"
38+
for _ in range(3):
39+
try:
40+
row = await conn.fetch("SELECT * FROM users WHERE id = $1", 1)
41+
assert row[0][1] == "[email protected]"
42+
break
43+
except:
44+
# Replica lag
45+
sleep(1)
46+
47+
@pytest.mark.asyncio
48+
async def test_concurrent():
49+
pool = await asyncpg.create_pool("postgres://postgres:[email protected]:6432/postgres")
50+
tasks = []
51+
for _ in range(25):
52+
task = asyncio.create_task(concurrent(pool))
53+
tasks.append(task)
54+
for task in tasks:
55+
await task
56+
57+
async def concurrent(pool):
58+
for _ in range(25):
59+
async with pool.acquire() as conn:
60+
i = random.randint(1, 1_000_000_000)
61+
row = await conn.fetch("INSERT INTO users (id, created_at) VALUES ($1, NOW()) RETURNING id", i)
62+
assert row[0][0] == i
63+
64+
# Read from primary
65+
async with pool.acquire() as conn:
66+
async with conn.transaction():
67+
row = await conn.fetch("SELECT * FROM users WHERE id = $1", i)
68+
assert row[0][0] == i
69+
70+
async with pool.acquire() as conn:
71+
# Try read from replica
72+
for _ in range(3):
73+
try:
74+
row = await conn.fetch("SELECT * FROM users WHERE id = $1", i)
75+
assert row[0][0] == i
76+
break
77+
except Exception as e:
78+
assert "list index out of range" in str(e)
79+
sleep(1)
80+
81+
async with pool.acquire() as conn:
82+
await conn.execute("DELETE FROM users WHERE id = $1", i)

integration/python/test_sqlalchemy.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ async def test_reads_writes(engines):
141141

142142

143143
@pytest.mark.asyncio
144-
@pytest.mark.skip
145144
async def test_write_in_read(engines):
146145
normal = engines[0]
147146

0 commit comments

Comments
 (0)