# top/cf2.py — last modified 2025-06-17 20:02:26 +08:00
# (file-viewer header: 113 lines, 3.5 KiB, Python)
import json
import os
import time

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error
# =================== Kafka configuration ===================
# Broker address, consumer group and topic can be overridden through
# environment variables; the defaults are the previously hard-coded values.
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'niit-node3:9092'),
    'group.id': os.environ.get('KAFKA_GROUP_ID', 'recommendation-group'),
    # When this group has no committed offset, start from the oldest message.
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)
consumer.subscribe([os.environ.get('KAFKA_TOPIC', 'orders')])
# =================== MySQL configuration ===================
# Each setting can be overridden through an environment variable so hosts and
# credentials need not live in source; the defaults are the previously
# hard-coded values.
mysql_config = {
    'host': os.environ.get('MYSQL_HOST', '172.16.5.3'),
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD', '123456'),
    'database': os.environ.get('MYSQL_DATABASE', 'orders_db'),
}

# Open the connection and the shared cursor used by the insert helper.
# Any failure, including a connection that reports it is not connected, ends
# the script: without `cursor` the main loop could not run. The Kafka
# consumer created earlier is closed first so it leaves its group cleanly.
try:
    conn = mysql.connector.connect(**mysql_config)
    if not conn.is_connected():
        raise Error(msg="is_connected() returned False after connect()")
    print("✅ 成功连接到 MySQL 数据库")
    cursor = conn.cursor()
except Error as e:
    print("❌ 连接 MySQL 失败:", e)
    consumer.close()
    raise SystemExit(1)
# =================== State ===================
# In-memory rating totals: category -> product -> (rating_sum, rating_count).
product_ratings = {}


def update_product_rating(category, product, rating):
    """Record one *rating* for *product* within *category*.

    Creates the category bucket on first use. A product seen for the first
    time starts at (rating, 1); otherwise the rating is added to its running
    sum and its count is incremented.
    """
    bucket = product_ratings.setdefault(category, {})
    try:
        running_sum, seen = bucket[product]
    except KeyError:
        bucket[product] = (rating, 1)
    else:
        bucket[product] = (running_sum + rating, seen + 1)
def get_top_n_products(ratings_dict, n=3):
    """Return up to *n* product names from *ratings_dict*, best average first.

    *ratings_dict* maps product -> (rating_sum, rating_count). Products are
    ranked by rating_sum / rating_count in descending order; products with
    equal averages keep their relative order from *ratings_dict*.
    """
    averages = {
        name: rating_sum / rating_count
        for name, (rating_sum, rating_count) in ratings_dict.items()
    }
    ranked = sorted(averages, key=averages.__getitem__, reverse=True)
    return ranked[:n]
# =================== Insert into MySQL ===================
def insert_top_products_to_mysql(category, top_products_list):
    """Insert one row recording *category*'s current top products.

    The list is stored as a JSON array string in
    ``category_top_products.top_products``. Uses the module-level ``conn``
    and ``cursor``. Database errors are reported and the transaction is
    rolled back; ``mysql.connector.Error`` is never propagated to the caller,
    including when the rollback itself fails.
    """
    # Serialize the product list to JSON; ensure_ascii=False keeps non-ASCII
    # product names readable in the stored text.
    top_products_json = json.dumps(top_products_list, ensure_ascii=False)
    query = """
    INSERT INTO category_top_products (category, top_products)
    VALUES (%s, %s)
    """
    try:
        cursor.execute(query, (category, top_products_json))
        conn.commit()
    except Error as e:
        print(f"🔴 插入数据库失败 [{category}]:", e)
        try:
            conn.rollback()
        except Error as rollback_err:
            # Rollback can fail too (e.g. if the connection was lost); report
            # it instead of letting it escape this best-effort helper.
            print(f"🔴 回滚失败 [{category}]:", rollback_err)
    else:
        print(f"🟢 已插入数据库:[{category}] {top_products_json}")
# =================== Main loop ===================
# Poll Kafka until interrupted. Each message value is parsed as one UTF-8,
# tab-separated record with six fields:
#   category, product, quantity, date, rating, is_valid
# Only records whose is_valid field is "Y" are used. For each one, the
# product's running rating is updated, and the category's current top 3 is
# printed and inserted into MySQL. Ctrl+C stops the loop; the Kafka consumer,
# MySQL cursor and connection are closed in `finally`.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue  # poll timed out with no message
        err = msg.error()
        if err:
            # Error-code constants such as _PARTITION_EOF are defined on
            # KafkaError; KafkaException has no such attribute.
            if err.code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(err)
        try:
            raw = msg.value()
            if raw is None:
                continue  # message with a null value: nothing to parse
            line = raw.decode('utf-8').strip()
            parts = line.split('\t')
            if len(parts) != 6:
                continue  # malformed record: skip silently
            category, product, quantity, date, rating_str, is_valid = parts
            if is_valid != "Y":
                continue
            rating = int(rating_str)
            update_product_rating(category, product, rating)
            top3 = get_top_n_products(product_ratings[category])
            print(f"[{category}] Top 3: {', '.join(top3)}")
            # Insert into MySQL
            insert_top_products_to_mysql(category, top3)
        except Exception as e:
            # Per-record boundary: one bad record must not stop the consumer.
            print("⚠️ 解析消息错误:", str(e))
except KeyboardInterrupt:
    print("\n🛑 停止消费者...")
finally:
    # Close each resource even if closing an earlier one raises.
    try:
        consumer.close()
    finally:
        try:
            cursor.close()
        finally:
            conn.close()
    print("🔌 数据库和 Kafka 连接已关闭")