Skip to content

Commit e29e478

Browse files
committed
feat: Add distributed deployment support for Snowflake
Auto-assign node IDs via Redis to support multi-instance deployment and avoid ID conflicts
1 parent 636d867 commit e29e478

File tree

3 files changed

+203
-9
lines changed

3 files changed

+203
-9
lines changed

backend/core/conf.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ class Settings(BaseSettings):
5252
# Redis
5353
REDIS_TIMEOUT: int = 5
5454

55+
# .env Snowflake
56+
SNOWFLAKE_CLUSTER_ID: int | None = None
57+
SNOWFLAKE_NODE_ID: int | None = None
58+
59+
# Snowflake
60+
SNOWFLAKE_REDIS_PREFIX: str = 'fba:snowflake'
61+
SNOWFLAKE_HEARTBEAT_INTERVAL: int = 30 # 心跳间隔(秒)
62+
SNOWFLAKE_NODE_TTL: int = 60 # 节点存活时间(秒)
63+
5564
# .env Token
5665
TOKEN_SECRET_KEY: str # 密钥 secrets.token_urlsafe(32)
5766

backend/core/registrar.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from backend.utils.health_check import ensure_unique_route_names, http_limit_callback
3535
from backend.utils.openapi import simplify_operation_ids
3636
from backend.utils.serializers import MsgSpecJSONResponse
37+
from backend.utils.snowflake import snowflake
3738

3839

3940
@asynccontextmanager
@@ -57,11 +58,17 @@ async def register_init(app: FastAPI) -> AsyncGenerator[None, None]:
5758
http_callback=http_limit_callback,
5859
)
5960

61+
# 初始化 snowflake 节点
62+
await snowflake.initialize()
63+
6064
# 创建操作日志任务
6165
create_task(OperaLogMiddleware.consumer())
6266

6367
yield
6468

69+
# 释放 snowflake 节点
70+
await snowflake.shutdown()
71+
6572
# 关闭 redis 连接
6673
await redis_client.aclose()
6774

backend/utils/snowflake.py

Lines changed: 187 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1+
import asyncio
2+
import os
13
import time
24

35
from dataclasses import dataclass
46

57
from backend.common.dataclasses import SnowflakeInfo
68
from backend.common.exception import errors
9+
from backend.common.log import log
710
from backend.core.conf import settings
11+
from backend.database.redis import RedisCli
812

913

1014
@dataclass(frozen=True)
@@ -35,31 +39,199 @@ class SnowflakeConfig:
3539
DEFAULT_SEQUENCE: int = 0
3640

3741

42+
class SnowflakeNodeManager:
    """
    Snowflake node manager: claims a (cluster_id, node_id) pair in Redis and keeps it alive.

    A claimed pair is represented by the key ``{prefix}:nodes:{cluster_id}:{node_id}``,
    created with ``SET NX EX`` and refreshed by a background heartbeat task.
    The stored value is a per-manager owner token, which lets the heartbeat and
    the release step tell this instance's claim apart from another instance's.
    """

    def __init__(self, redis_client: RedisCli) -> None:
        """
        Initialize the node manager.

        :param redis_client: Redis client instance
        """
        self.redis_client = redis_client
        self.prefix = settings.SNOWFLAKE_REDIS_PREFIX
        self.cluster_id: int | None = None
        self.node_id: int | None = None
        self.heartbeat_task: asyncio.Task | None = None
        # A bare pid repeats across hosts/containers, so a random suffix is
        # appended to make the stored value unique to this manager instance.
        self._owner = f'pid:{os.getpid()}:{os.urandom(8).hex()}'

    def _node_key(self, cluster_id: int, node_id: int) -> str:
        """Build the Redis key that represents a claimed (cluster_id, node_id) pair."""
        return f'{self.prefix}:nodes:{cluster_id}:{node_id}'

    async def _current_owner(self, key: str) -> str | None:
        """Return the owner token stored under ``key``, or None when the key does not exist."""
        owner = await self.redis_client.get(key)
        # NOTE(review): the client appears to return str (keys are split with a
        # str separator during scanning); bytes are decoded defensively.
        return owner.decode() if isinstance(owner, bytes) else owner

    async def acquire_node_id(self) -> tuple[int, int]:
        """
        Claim the lowest free cluster_id / node_id pair in Redis.

        :return: (cluster_id, node_id)
        :raises errors.ServerError: if every pair is already taken
        """
        key_prefix = f'{self.prefix}:nodes:'

        # Collect pairs that are already claimed. This scan only saves round
        # trips: the SET NX inside register_node() is what actually decides.
        occupied_nodes: set[tuple[int, int]] = set()
        async for key in self.redis_client.scan_iter(match=f'{key_prefix}*'):
            if isinstance(key, bytes):
                key = key.decode()
            # Strip the known prefix instead of counting ':' separated parts,
            # so parsing does not depend on how many ':' the prefix contains.
            cluster_part, sep, node_part = key.removeprefix(key_prefix).partition(':')
            if not sep:
                continue
            try:
                occupied_nodes.add((int(cluster_part), int(node_part)))
            except ValueError:
                continue

        # Try candidates in ascending order and return the first one we win.
        for cluster_id in range(SnowflakeConfig.MAX_DATACENTER_ID + 1):
            for node_id in range(SnowflakeConfig.MAX_WORKER_ID + 1):
                if (cluster_id, node_id) in occupied_nodes:
                    continue
                if await self.register_node(cluster_id, node_id):
                    return cluster_id, node_id

        raise errors.ServerError(msg='无可用的雪花算法节点 ID,所有 ID 已被占用')

    async def register_node(self, cluster_id: int, node_id: int) -> bool:
        """
        Register a node and set its TTL.

        :param cluster_id: cluster ID
        :param node_id: node ID
        :return: True if the key was created by this call, False if it already existed
        """
        # SET with NX only writes when the key is absent, so at most one
        # caller can win a given pair; EX makes a dead instance's claim expire.
        success = await self.redis_client.set(
            self._node_key(cluster_id, node_id),
            self._owner,
            nx=True,
            ex=settings.SNOWFLAKE_NODE_TTL,
        )
        return bool(success)

    async def release_node(self, cluster_id: int, node_id: int) -> None:
        """
        Release a node ID, but only if the claim still belongs to this manager.

        :param cluster_id: cluster ID
        :param node_id: node ID
        """
        key = self._node_key(cluster_id, node_id)
        owner = await self._current_owner(key)
        if owner is None:
            # Already expired or released; nothing to delete.
            return
        if owner != self._owner:
            # Our claim lapsed and someone else re-claimed the pair; deleting
            # the key now would free an ID that is actively in use.
            log.warning(f'雪花算法节点已被其他实例占用,跳过释放: cluster_id={cluster_id}, node_id={node_id}')
            return
        # GET + DEL is not atomic; the remaining window is far smaller than
        # with an unconditional delete.
        await self.redis_client.delete(key)

    async def heartbeat(self, cluster_id: int, node_id: int) -> None:
        """
        Heartbeat loop that keeps the node's claim alive until cancelled.

        :param cluster_id: cluster ID
        :param node_id: node ID
        """
        key = self._node_key(cluster_id, node_id)
        try:
            while True:
                await asyncio.sleep(settings.SNOWFLAKE_HEARTBEAT_INTERVAL)
                try:
                    owner = await self._current_owner(key)
                    if owner == self._owner:
                        # Still ours: extend the TTL.
                        await self.redis_client.expire(key, settings.SNOWFLAKE_NODE_TTL)
                        log.debug(f'雪花算法节点心跳: cluster_id={cluster_id}, node_id={node_id}')
                    elif owner is None and await self.register_node(cluster_id, node_id):
                        # The key expired (e.g. missed heartbeats) but nobody
                        # took the pair yet, so claim it again.
                        log.warning(f'雪花算法节点已过期,已重新注册: cluster_id={cluster_id}, node_id={node_id}')
                    else:
                        # Another instance holds the pair; both may now emit
                        # identical IDs, so make this loudly visible.
                        log.error(
                            f'雪花算法节点已被其他实例占用,可能产生重复 ID: cluster_id={cluster_id}, node_id={node_id}'
                        )
                except Exception as e:
                    # Keep the loop alive across transient Redis failures.
                    log.error(f'雪花算法节点心跳失败: {e}')
        except asyncio.CancelledError:
            log.info(f'雪花算法节点心跳任务取消: cluster_id={cluster_id}, node_id={node_id}')
            # Propagate so the task ends in the cancelled state;
            # stop_heartbeat() absorbs it.
            raise

    async def start_heartbeat(self, cluster_id: int, node_id: int) -> None:
        """
        Start the heartbeat task for the given node.

        :param cluster_id: cluster ID
        :param node_id: node ID
        """
        # Stop any previous task first so it is not left running unreferenced.
        await self.stop_heartbeat()
        self.cluster_id = cluster_id
        self.node_id = node_id
        self.heartbeat_task = asyncio.create_task(self.heartbeat(cluster_id, node_id))

    async def stop_heartbeat(self) -> None:
        """Cancel the heartbeat task, if any, and wait for it to finish."""
        if self.heartbeat_task:
            self.heartbeat_task.cancel()
            try:
                await self.heartbeat_task
            except asyncio.CancelledError:
                pass
            self.heartbeat_task = None
151+
152+
38153
class Snowflake:
39154
"""雪花算法类"""
40155

41156
def __init__(
42157
self,
43-
cluster_id: int = SnowflakeConfig.DEFAULT_DATACENTER_ID,
44-
node_id: int = SnowflakeConfig.DEFAULT_WORKER_ID,
158+
cluster_id: int | None = None,
159+
node_id: int | None = None,
45160
sequence: int = SnowflakeConfig.DEFAULT_SEQUENCE,
46161
) -> None:
47162
"""
48163
初始化雪花算法生成器
49164
50-
:param cluster_id: 集群 ID (0-31)
51-
:param node_id: 节点 ID (0-31)
165+
:param cluster_id: 集群 ID (0-31),None 表示自动分配
166+
:param node_id: 节点 ID (0-31),None 表示自动分配
52167
:param sequence: 起始序列号
53168
"""
54-
if cluster_id < 0 or cluster_id > SnowflakeConfig.MAX_DATACENTER_ID:
55-
raise errors.RequestError(msg=f'集群编号必须在 0-{SnowflakeConfig.MAX_DATACENTER_ID} 之间')
56-
if node_id < 0 or node_id > SnowflakeConfig.MAX_WORKER_ID:
57-
raise errors.RequestError(msg=f'节点编号必须在 0-{SnowflakeConfig.MAX_WORKER_ID} 之间')
58-
59169
self.node_id = node_id
60170
self.cluster_id = cluster_id
61171
self.sequence = sequence
62172
self.last_timestamp = -1
173+
self.node_manager: SnowflakeNodeManager | None = None
174+
self._initialized = False
175+
self._auto_allocated = False # 标记是否是自动分配的 ID
176+
177+
async def initialize(self) -> None:
    """
    Resolve the cluster_id / node_id of this generator and mark it initialized.

    Resolution order:

    1. ``SNOWFLAKE_CLUSTER_ID`` and ``SNOWFLAKE_NODE_ID`` from settings (both must be set)
    2. the values passed to the constructor
    3. automatic allocation through Redis, which also starts a heartbeat task

    Calling this while already initialized is a no-op.

    :raises errors.RequestError: if a configured ID is outside the allowed range
    :raises errors.ServerError: if automatic allocation finds no free ID
    """
    if self._initialized:
        return

    env_cluster_id = settings.SNOWFLAKE_CLUSTER_ID
    env_node_id = settings.SNOWFLAKE_NODE_ID
    use_env = env_cluster_id is not None and env_node_id is not None
    # IDs left on the instance by an earlier automatic allocation are not a
    # manual configuration; reusing them without a Redis claim could collide
    # with whichever instance holds that pair now.
    use_manual = not self._auto_allocated and self.cluster_id is not None and self.node_id is not None

    if not use_env and (env_cluster_id is not None or env_node_id is not None):
        # Only one of the two settings is present; say so instead of silently
        # falling through to another source.
        log.warning('雪花算法环境变量配置不完整(需同时设置 SNOWFLAKE_CLUSTER_ID 和 SNOWFLAKE_NODE_ID),已忽略')

    if use_env or use_manual:
        cluster_id, node_id = (env_cluster_id, env_node_id) if use_env else (self.cluster_id, self.node_id)

        # Validate before assigning or logging, so a bad configuration is
        # never reported as successfully applied.
        if cluster_id < 0 or cluster_id > SnowflakeConfig.MAX_DATACENTER_ID:
            raise errors.RequestError(msg=f'集群编号必须在 0-{SnowflakeConfig.MAX_DATACENTER_ID} 之间')
        if node_id < 0 or node_id > SnowflakeConfig.MAX_WORKER_ID:
            raise errors.RequestError(msg=f'节点编号必须在 0-{SnowflakeConfig.MAX_WORKER_ID} 之间')

        self.cluster_id = cluster_id
        self.node_id = node_id
        # These IDs are not backed by a Redis claim, so shutdown() must not
        # try to release them.
        self._auto_allocated = False
        source = '环境变量配置' if use_env else '手动配置'
        log.info(f'✅ 雪花算法使用{source}: cluster_id={self.cluster_id}, node_id={self.node_id}')
    else:
        # Allocate through Redis. Imported here rather than at module level;
        # presumably to avoid an import cycle - TODO confirm.
        from backend.database.redis import redis_client

        self.node_manager = SnowflakeNodeManager(redis_client)
        # acquire_node_id() only yields pairs from the valid ranges, so no
        # extra range check is needed on this path.
        self.cluster_id, self.node_id = await self.node_manager.acquire_node_id()
        self._auto_allocated = True
        log.info(
            f'✅ 雪花算法从 Redis 自动分配: cluster_id={self.cluster_id}, node_id={self.node_id}, pid={os.getpid()}'
        )

        # Keep the claim alive for as long as this process runs.
        await self.node_manager.start_heartbeat(self.cluster_id, self.node_id)

    self._initialized = True
217+
218+
async def shutdown(self) -> None:
    """
    Shut the generator down and give back an automatically allocated node ID.

    Does nothing if the generator was never initialized. IDs that came from
    settings or from the constructor are left as they are.
    """
    if not self._initialized:
        return

    if self.node_manager and self._auto_allocated:
        # Stop refreshing the claim before removing it.
        await self.node_manager.stop_heartbeat()

        cluster_id, node_id = self.cluster_id, self.node_id
        if cluster_id is not None and node_id is not None:
            try:
                await self.node_manager.release_node(cluster_id, node_id)
            except Exception as e:
                # Best effort: the key was registered with a TTL and expires
                # on its own, so a Redis failure here should not abort the
                # rest of the application shutdown.
                log.error(f'雪花算法节点释放失败: {e}')
            else:
                log.info(f'✅ 雪花算法节点已释放: cluster_id={cluster_id}, node_id={node_id}')

        # Forget the released IDs so a later initialize() allocates a fresh
        # pair instead of treating them as manually configured values.
        self.cluster_id = None
        self.node_id = None
        self.node_manager = None
        self._auto_allocated = False

    self._initialized = False
63235

64236
@staticmethod
65237
def _current_millis() -> int:
@@ -81,6 +253,12 @@ def _next_millis(self, last_timestamp: int) -> int:
81253

82254
def generate(self) -> int:
83255
"""生成雪花 ID"""
256+
if not self._initialized:
257+
raise errors.ServerError(msg='雪花算法未初始化,请先调用 initialize() 方法')
258+
259+
if self.cluster_id is None or self.node_id is None:
260+
raise errors.ServerError(msg='雪花算法节点 ID 未设置')
261+
84262
timestamp = self._current_millis()
85263

86264
if timestamp < self.last_timestamp:

0 commit comments

Comments
 (0)