import json
import os
import sys
import time

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error

# =================== Kafka configuration ===================
conf = {
    'bootstrap.servers': 'niit-node3:9092',
    'group.id': 'recommendation-group',
    # Start from the oldest available offset when the group has no committed offset.
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

# =================== MySQL configuration ===================
mysql_config = {
    'host': '172.16.5.3',
    'port': 3306,
    'user': 'root',
    # NOTE(review): avoid shipping real credentials in source. MYSQL_PASSWORD,
    # when set, overrides the hard-coded fallback.
    'password': os.environ.get('MYSQL_PASSWORD', '123456'),
    'database': 'orders_db',
}

# Open the database connection. On failure, close the already-subscribed Kafka
# consumer before exiting so it does not linger in the consumer group.
try:
    conn = mysql.connector.connect(**mysql_config)
    if not conn.is_connected():
        # Without this guard `cursor` would stay unbound and every later use
        # (including the cleanup in `finally`) would raise NameError.
        raise Error(msg="connect() returned a connection that is not connected")
    print("✅ 成功连接到 MySQL 数据库")
    cursor = conn.cursor()
except Error as e:
    print("❌ 连接 MySQL 失败:", e)
    consumer.close()
    sys.exit(1)

# =================== State store ===================
# category -> {product -> (sum_of_ratings, rating_count)}
product_ratings = {}


def update_product_rating(category, product, rating):
    """Add one *rating* for *product* in *category* to the running totals.

    Totals are stored as ``(sum, count)`` tuples in the module-level
    ``product_ratings`` dict so the average can be computed on demand.
    """
    ratings = product_ratings.setdefault(category, {})
    total, cnt = ratings.get(product, (0, 0))
    ratings[product] = (total + rating, cnt + 1)


def get_top_n_products(ratings_dict, n=3):
    """Return the names of the *n* products with the highest average rating.

    Args:
        ratings_dict: mapping of product -> ``(sum_of_ratings, rating_count)``.
        n: how many product names to return (default 3).

    Returns:
        A list of at most *n* product names, best average first. Ties keep the
        dict's insertion order because ``list.sort`` is stable.
    """
    rated_products = [
        (prod, total / cnt) for prod, (total, cnt) in ratings_dict.items()
    ]
    rated_products.sort(key=lambda x: x[1], reverse=True)
    return [p[0] for p in rated_products[:n]]


# =================== MySQL insert helper ===================
def insert_top_products_to_mysql(category, top_products_list):
    """Insert one ``(category, top_products)`` row into ``category_top_products``.

    The product list is stored as a JSON string. A new row is inserted on every
    call; there is no upsert. NOTE(review): confirm that an append-only history
    is intended.

    Database errors are printed and the transaction is rolled back. They are
    not re-raised, so one failed insert does not stop the consumer.
    """
    # Serialise the product list to JSON; ensure_ascii=False keeps non-ASCII names readable.
    top_products_json = json.dumps(top_products_list, ensure_ascii=False)
    query = """
        INSERT INTO category_top_products (category, top_products)
        VALUES (%s, %s)
    """
    try:
        cursor.execute(query, (category, top_products_json))
        conn.commit()
        print(f"🟢 已插入数据库:[{category}] {top_products_json}")
    except Error as e:
        print(f"🔴 插入数据库失败 [{category}]:", e)
        try:
            conn.rollback()
        except Error as rollback_err:
            # The connection may already be gone; keep a rollback failure from
            # escaping and being reported by the caller as a parse error.
            print(f"🔴 回滚失败 [{category}]:", rollback_err)


# =================== Main loop ===================
# Each message is parsed as a tab-separated line with exactly 6 fields:
# category, product, quantity, date, rating, is_valid.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # The partition-EOF code is defined on KafkaError, not KafkaException.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(msg.error())

        payload = msg.value()
        if payload is None:
            # Tombstone / empty payload: nothing to parse.
            continue

        try:
            line = payload.decode('utf-8').strip()
            parts = line.split('\t')
            if len(parts) != 6:
                continue
            category, product, quantity, date, rating_str, is_valid = parts
            # Only records flagged "Y" count toward the ratings.
            if is_valid != "Y":
                continue
            rating = int(rating_str)

            update_product_rating(category, product, rating)
            top3 = get_top_n_products(product_ratings[category])
            print(f"[{category}] Top 3: {', '.join(top3)}")

            # Persist the current top-3 for this category.
            insert_top_products_to_mysql(category, top3)
        except Exception as e:
            # Per-message boundary: log and move on so one bad record
            # does not stop the consumer.
            print("⚠️ 解析消息错误:", str(e))
            continue
except KeyboardInterrupt:
    print("\n🛑 停止消费者...")
finally:
    consumer.close()
    cursor.close()
    conn.close()
    print("🔌 数据库和 Kafka 连接已关闭")