2025-06-17 21:04:43 +08:00

65 lines
1.8 KiB
Python

from sqlmodel import create_engine, SQLModel
from sqlalchemy.pool import QueuePool
from sqlalchemy import text
from app.models import UserRole, OrderStatus
import time
import redis
from redis import RedisError
def wait_for_db(max_retries=5, delay=5):
"""等待数据库连接可用"""
for i in range(max_retries):
try:
test_engine = create_engine(
"mysql+pymysql://root:123456@niit-node3/orders_db",
poolclass=QueuePool,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=3600,
echo=False
)
with test_engine.connect() as conn:
conn.execute(text("SELECT 1"))
print(f"数据库连接测试成功 (尝试 {i+1}/{max_retries})")
return test_engine
except Exception as e:
if i == max_retries - 1:
raise
time.sleep(delay)
return None
def init_redis(max_retries=3, delay=1):
"""初始化Redis连接池"""
redis_pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=10,
decode_responses=True
)
for i in range(max_retries):
try:
r = redis.Redis(connection_pool=redis_pool)
if r.ping():
print(f"Redis连接成功 (尝试 {i+1}/{max_retries})")
return redis_pool
except RedisError as e:
if i == max_retries - 1:
raise
time.sleep(delay)
return None
# 使用连接池的数据库引擎
engine = wait_for_db()
redis_pool = init_redis()
def get_redis():
"""获取Redis连接"""
return redis.Redis(connection_pool=redis_pool)
# 创建数据库表
def init_db():
SQLModel.metadata.create_all(engine)