# File listing metadata: 113 lines, 3.5 KiB, Python
import json
import os
import sys
import time

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error


# =================== Kafka configuration ===================
# Consumer group 'recommendation-group' reads the 'orders' topic.
# 'auto.offset.reset': 'earliest' means a group with no committed offset
# starts from the beginning of the topic (librdkafka semantics).
conf = {
    'bootstrap.servers': 'niit-node3:9092',
    'group.id': 'recommendation-group',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])


# =================== MySQL configuration ===================
# Connection settings for the results database. Every value can be
# overridden through an environment variable so credentials need not be
# kept in source; when a variable is unset the built-in default is used.
# NOTE(review): the built-in fallback is the root account with a weak
# password -- set ORDERS_DB_USER / ORDERS_DB_PASSWORD in real deployments.
mysql_config = {
    'host': os.environ.get('ORDERS_DB_HOST', '172.16.5.3'),
    # mysql.connector expects an int port; env values are strings.
    'port': int(os.environ.get('ORDERS_DB_PORT', '3306')),
    'user': os.environ.get('ORDERS_DB_USER', 'root'),
    'password': os.environ.get('ORDERS_DB_PASSWORD', '123456'),
    'database': os.environ.get('ORDERS_DB_NAME', 'orders_db'),
}


# Create the database connection and the single cursor shared by the rest of
# the script. Nothing downstream can work without them, so any failure here
# (including a connection object that reports it is not connected, which
# previously left `cursor` undefined) aborts the process with exit status 1.
try:
    conn = mysql.connector.connect(**mysql_config)
    if not conn.is_connected():
        raise Error("connection was not established")
    print("✅ 成功连接到 MySQL 数据库")
    cursor = conn.cursor()
except Error as e:
    print("❌ 连接 MySQL 失败:", e)
    # sys.exit is always available; the bare exit() builtin is injected by
    # the `site` module and may be missing (e.g. `python -S`, frozen apps).
    sys.exit(1)


# =================== In-memory rating state ===================
# category -> {product -> (rating_sum, rating_count)}. Held in a module-level
# dict, so it is not persisted across runs of this script.
product_ratings = {}


def update_product_rating(category, product, rating):
    """Fold one *rating* for *product* in *category* into ``product_ratings``.

    Each product keeps a ``(sum, count)`` pair so its mean can be derived
    later; unseen categories and products are created on first use.
    """
    bucket = product_ratings.setdefault(category, {})
    previous = bucket.get(product)
    if previous is None:
        bucket[product] = (rating, 1)
        return
    running_total, seen = previous
    bucket[product] = (running_total + rating, seen + 1)


def get_top_n_products(ratings_dict, n=3):
    """Return up to *n* product names ordered by mean rating, highest first.

    *ratings_dict* maps product -> ``(rating_sum, rating_count)``. Products
    with equal means keep the dictionary's iteration order because the sort
    is stable.
    """
    def mean_rating(entry):
        _, (total, count) = entry
        return total / count

    ranked = sorted(ratings_dict.items(), key=mean_rating, reverse=True)
    return [name for name, _ in ranked[:n]]


# =================== MySQL insert ===================
def insert_top_products_to_mysql(category, top_products_list):
    """Insert a row of (*category*, top products) into ``category_top_products``.

    The product list is stored as a JSON array string (non-ASCII characters
    kept as-is) and the insert is committed immediately. Database errors are
    reported and the transaction is rolled back; they are not re-raised.

    Args:
        category: Value written to the ``category`` column.
        top_products_list: JSON-serialisable list (e.g. product names)
            written to the ``top_products`` column.
    """
    # Serialise the product list to a JSON string for the text column.
    top_products_json = json.dumps(top_products_list, ensure_ascii=False)
    query = (
        "INSERT INTO category_top_products (category, top_products) "
        "VALUES (%s, %s)"
    )
    try:
        cursor.execute(query, (category, top_products_json))
        conn.commit()
    except Error as e:
        print(f"🔴 插入数据库失败 [{category}]:", e)
        # rollback() can itself fail (e.g. the connection was dropped); keep
        # that from escaping to the caller, where it would be misreported.
        try:
            conn.rollback()
        except Error as rollback_err:
            print(f"🔴 回滚失败 [{category}]:", rollback_err)
    else:
        print(f"🟢 已插入数据库:[{category}] {top_products_json}")


# =================== Main loop ===================
# Consume tab-separated order records from Kafka, update the running rating
# totals per (category, product), and after every accepted record print and
# store that category's current top 3 (get_top_n_products default n=3).
# Runs until Ctrl+C (KeyboardInterrupt) or an unhandled error such as a
# non-EOF Kafka error; Kafka and MySQL resources are closed in either case.
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        err = msg.error()
        if err:
            # Error-code constants such as _PARTITION_EOF are defined on
            # KafkaError; KafkaException has no such attribute.
            if err.code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(err)

        try:
            line = msg.value().decode('utf-8').strip()
            parts = line.split('\t')
            # Expected fields: category, product, quantity, date, rating, valid flag.
            if len(parts) != 6:
                continue
            category, product, quantity, date, rating_str, is_valid = parts
            if is_valid != "Y":
                continue
            rating = int(rating_str)

            update_product_rating(category, product, rating)

            top3 = get_top_n_products(product_ratings[category])
            print(f"[{category}] Top 3: {', '.join(top3)}")

            # Persist the current ranking to MySQL.
            insert_top_products_to_mysql(category, top3)
        except Exception as e:
            # Best-effort per message: a bad record (empty value, invalid
            # UTF-8, non-integer rating, ...) is reported and skipped so one
            # malformed message cannot stop the consumer.
            print("⚠️ 解析消息错误:", str(e))

except KeyboardInterrupt:
    print("\n🛑 停止消费者...")

finally:
    # Close Kafka first, but make sure the MySQL cursor/connection are
    # released even if consumer.close() raises.
    try:
        consumer.close()
    finally:
        cursor.close()
        conn.close()
    print("🔌 数据库和 Kafka 连接已关闭")