"""Consume order records from Kafka and persist per-category top products.

Messages from the ``orders`` topic are aggregated in a fixed-length time
window.  Each time the window elapses, the highest-rated products of every
category seen in that window are written to the ``category_top_products``
table in MySQL and the window is reset.
"""

import json
import sys
import time
from collections import defaultdict

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error

# Kafka configuration
conf = {
    'bootstrap.servers': 'niit-node3:9092',
    'group.id': 'recommendation-group',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

# MySQL configuration
# NOTE(review): credentials are hard-coded here; consider loading them from
# the environment or a config file.
mysql_config = {
    'host': '172.16.5.3',
    'port': 3306,
    'user': 'root',
    'password': '123456',
    'database': 'orders_db',
}

# Create the database connection
try:
    conn = mysql.connector.connect(**mysql_config)
    if conn.is_connected():
        print(" 成功连接到 MySQL 数据库")
    # Always bind the cursor so later code never hits a NameError.
    cursor = conn.cursor()
except Error as e:
    print(" 连接 MySQL 失败:", e)
    # The consumer was already created above; release it before exiting.
    consumer.close()
    sys.exit(1)

# =================== State
window_duration = 300  # window length in seconds (5 minutes)
current_window_start = int(time.time())

# Ratings accumulated in the current window:
# { category -> { product -> [total_rating, count] } }
product_ratings_window = defaultdict(lambda: defaultdict(lambda: [0, 0]))


def get_top_n_products(ratings_dict, n=3):
    """Return the names of the ``n`` products with the highest mean rating.

    Args:
        ratings_dict: Mapping of product name to a ``(total_rating, count)``
            pair.
        n: Maximum number of product names to return.

    Returns:
        Product names ordered by descending average rating.  Products with
        equal averages keep the order in which they appear in
        ``ratings_dict``.  Entries whose count is zero are ignored.
    """
    rated_products = [
        (prod, total / cnt)
        for prod, (total, cnt) in ratings_dict.items()
        if cnt > 0  # guard against division by zero
    ]
    rated_products.sort(key=lambda x: x[1], reverse=True)
    return [p[0] for p in rated_products[:n]]


# =================== MySQL insert
def insert_top_products_to_mysql(category, top_products_list, window_end_time):
    """Insert one category's top-product list into ``category_top_products``.

    Args:
        category: Category name stored in the ``category`` column.
        top_products_list: Product names; stored as a JSON array string.
        window_end_time: Epoch seconds, formatted as local time for the
            ``timestamp`` column.

    A failed insert is reported on stdout and rolled back; the error is not
    re-raised.
    """
    try:
        top_products_json = json.dumps(top_products_list, ensure_ascii=False)
        timestamp = time.strftime('%Y-%m-%d %H:%M:%S',
                                  time.localtime(window_end_time))

        query = """
            INSERT INTO category_top_products (category, top_products, timestamp)
            VALUES (%s, %s, %s)
        """
        cursor.execute(query, (category, top_products_json, timestamp))
        conn.commit()
        print(f" 已插入数据库:[{category}] {top_products_json} @ {timestamp}")
    except Error as e:
        print(f" 插入数据库失败 [{category}]:", e)
        conn.rollback()


def _parse_order_line(raw):
    """Decode one message payload into ``(category, product, rating)``.

    The payload is expected to be UTF-8 text with six tab-separated fields:
    category, product, quantity, date, rating, is_valid.

    Returns:
        ``(category, product, rating)`` with ``rating`` as an ``int``, or
        ``None`` when the record does not have six fields or its validity
        flag is not ``"Y"``.

    Raises:
        ValueError: The payload is missing or the rating is not an integer.
        UnicodeDecodeError: The payload is not valid UTF-8.
    """
    if raw is None:
        raise ValueError("message has no payload")
    parts = raw.decode('utf-8').strip().split('\t')
    if len(parts) != 6:
        return None
    category, product, _quantity, _date, rating_str, is_valid = parts
    if is_valid != "Y":
        return None
    return category, product, int(rating_str)


# =================== Main loop
# NOTE: ratings accumulated in a window that has not yet elapsed are not
# written out when the loop stops.
try:
    while True:
        msg = consumer.poll(timeout=1.0)

        current_time = int(time.time())

        # Check whether the current window has elapsed
        if current_time - current_window_start >= window_duration:
            print(f"\n 开始处理从 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_window_start))} 到现在的时间窗口...")

            # Produce the top-3 list for every category
            for category, products in product_ratings_window.items():
                top3 = get_top_n_products(products)
                insert_top_products_to_mysql(category, top3, current_time)

            # Clear the window cache and start a new window
            product_ratings_window.clear()
            current_window_start = current_time
            print(" 窗口已重置\n")

        # Nothing was polled in this iteration
        if msg is None:
            continue

        if msg.error():
            # The error-code constants live on KafkaError, not KafkaException.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(msg.error())

        try:
            record = _parse_order_line(msg.value())
        except (UnicodeDecodeError, ValueError) as e:
            print("⚠ 解析消息错误:", str(e))
            continue

        if record is None:
            continue

        # Update the running total and count for this product
        category, product, rating = record
        bucket = product_ratings_window[category][product]
        bucket[0] += rating
        bucket[1] += 1

except KeyboardInterrupt:
    print("\n 停止消费者...")
finally:
    # Close the DB handles even if closing the consumer fails.
    try:
        consumer.close()
    finally:
        cursor.close()
        conn.close()
        print(" 数据库和 Kafka 连接已关闭")