from sqlmodel import create_engine, SQLModel from sqlalchemy.pool import QueuePool from sqlalchemy import text from app.models import UserRole, OrderStatus import time import redis from redis import RedisError def wait_for_db(max_retries=5, delay=5): """等待数据库连接可用""" for i in range(max_retries): try: test_engine = create_engine( "mysql+pymysql://root:123456@niit-node3/orders_db", poolclass=QueuePool, pool_size=10, max_overflow=20, pool_timeout=30, pool_recycle=3600, echo=False ) with test_engine.connect() as conn: conn.execute(text("SELECT 1")) print(f"数据库连接测试成功 (尝试 {i+1}/{max_retries})") return test_engine except Exception as e: if i == max_retries - 1: raise time.sleep(delay) return None def init_redis(max_retries=3, delay=1): """初始化Redis连接池""" redis_pool = redis.ConnectionPool( host='localhost', port=6379, db=0, max_connections=10, decode_responses=True ) for i in range(max_retries): try: r = redis.Redis(connection_pool=redis_pool) if r.ping(): print(f"Redis连接成功 (尝试 {i+1}/{max_retries})") return redis_pool except RedisError as e: if i == max_retries - 1: raise time.sleep(delay) return None # 使用连接池的数据库引擎 engine = wait_for_db() redis_pool = init_redis() def get_redis(): """获取Redis连接""" return redis.Redis(connection_pool=redis_pool) # 创建数据库表 def init_db(): SQLModel.metadata.create_all(engine)