# Source listing: 121 lines, 4.1 KiB, Python
import json
import os
import time
from collections import defaultdict

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error

# Kafka configuration
conf = {
    'bootstrap.servers': 'niit-node3:9092',
    'group.id': 'recommendation-group',
    # With no committed offset for this group, start from the oldest
    # retained message instead of only new ones.
    'auto.offset.reset': 'earliest',
}

# Module-level consumer shared by the main loop; it reads the 'orders' topic.
consumer = Consumer(conf)
consumer.subscribe(['orders'])

# MySQL configuration
# Each setting can be overridden through an ORDERS_MYSQL_* environment
# variable so credentials do not have to live in the source file; the
# fallbacks are the values this script has always used.
mysql_config = {
    'host': os.environ.get('ORDERS_MYSQL_HOST', '172.16.5.3'),
    # The connector expects an integer port, so convert the env string.
    'port': int(os.environ.get('ORDERS_MYSQL_PORT', '3306')),
    'user': os.environ.get('ORDERS_MYSQL_USER', 'root'),
    'password': os.environ.get('ORDERS_MYSQL_PASSWORD', '123456'),
    'database': os.environ.get('ORDERS_MYSQL_DATABASE', 'orders_db'),
}

# Open the database connection
# `conn` and `cursor` are module-level handles used by
# insert_top_products_to_mysql() and closed by the main loop's cleanup.
try:
    conn = mysql.connector.connect(**mysql_config)
    if conn.is_connected():
        print(" 成功连接到 MySQL 数据库")
    # Create the cursor unconditionally: on a dead connection this raises a
    # connector Error and we exit below, so `cursor` is never left undefined.
    cursor = conn.cursor()
except Error as e:
    print(" 连接 MySQL 失败:", e)
    # The Kafka consumer is already open at this point; release it before
    # terminating so the group does not wait for a session timeout.
    consumer.close()
    # SystemExit works without the `site` module, unlike the exit() helper.
    raise SystemExit(1)

# =================== Window state
window_duration = 300  # window length in seconds (5 minutes)
current_window_start = int(time.time())  # epoch seconds when the current window opened

# Ratings accumulated during the current window:
#   { category -> { product -> [total_rating, count] } }
# The innermost value is a two-item list (not a tuple) because the main loop
# updates both slots in place.
product_ratings_window = defaultdict(lambda: defaultdict(lambda: [0, 0]))

def get_top_n_products(ratings_dict, n=3):
    """Return the names of the ``n`` products with the highest average rating.

    Args:
        ratings_dict: Mapping of product name to a two-item sequence
            ``(total_rating, count)``.
        n: Maximum number of product names to return.

    Returns:
        Product names ordered from highest to lowest average rating. Products
        with equal averages keep their order from ``ratings_dict``. Products
        whose count is zero are skipped because they have no average. An
        empty list is returned when ``n`` is not positive.
    """
    if n <= 0:
        # A negative slice bound would return "all but the last |n|" items.
        return []
    averages = [
        (product, total / count)
        for product, (total, count) in ratings_dict.items()
        if count  # avoid ZeroDivisionError on entries without any rating
    ]
    # list.sort is stable, also with reverse=True, so ties keep input order.
    averages.sort(key=lambda item: item[1], reverse=True)
    return [product for product, _ in averages[:n]]

# =================== MySQL insert helper
def insert_top_products_to_mysql(category, top_products_list, window_end_time):
    """Store one category's top products for a finished window.

    Inserts a row into ``category_top_products`` using the module-level
    ``cursor`` / ``conn`` and commits it. Database errors are reported on
    stdout and the transaction is rolled back; they are not re-raised, so a
    failed insert does not stop the consumer loop.

    Args:
        category: Category name written to the ``category`` column.
        top_products_list: Product names; stored as a JSON array string.
        window_end_time: Window end as epoch seconds; formatted in local time
            as ``YYYY-MM-DD HH:MM:SS`` for the ``timestamp`` column.
    """
    # Top-product list as a JSON string; ensure_ascii=False keeps non-ASCII
    # product names readable in the stored value.
    top_products_json = json.dumps(top_products_list, ensure_ascii=False)
    # Convert the epoch value into the textual timestamp that is inserted.
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(window_end_time))
    query = """
        INSERT INTO category_top_products (category, top_products, timestamp)
        VALUES (%s, %s, %s)
    """
    try:
        cursor.execute(query, (category, top_products_json, timestamp))
        conn.commit()
    except Error as e:
        print(f" 插入数据库失败 [{category}]:", e)
        try:
            conn.rollback()
        except Error as rollback_error:
            # rollback() itself fails when the connection is gone; report it
            # instead of letting it escape the handler and kill the consumer.
            print(f" 插入数据库失败 [{category}]:", rollback_error)
        return
    print(f" 已插入数据库:[{category}] {top_products_json} @ {timestamp}")

# =================== Main loop
def _parse_order_record(payload):
    """Decode one Kafka payload into ``(category, product, rating)``.

    The payload is expected to be a UTF-8 line with six tab-separated fields:
    category, product, quantity, date, rating, validity flag.

    Returns:
        The ``(category, product, rating)`` tuple, or ``None`` when the record
        must be ignored: no payload, a field count other than six, or a
        validity flag other than ``"Y"``.

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
        ValueError: If the rating field is not an integer.
    """
    if payload is None:
        # Messages without a value (e.g. tombstones) carry nothing to parse.
        return None
    parts = payload.decode('utf-8').strip().split('\t')
    if len(parts) != 6:
        return None
    category, product, _quantity, _date, rating_str, is_valid = parts
    if is_valid != "Y":
        return None
    return category, product, int(rating_str)


try:
    while True:
        msg = consumer.poll(timeout=1.0)  # wait up to one second for a message
        current_time = int(time.time())

        # The window check runs on every poll, including idle ones (msg is
        # None), so a quiet topic still flushes on schedule.
        if current_time - current_window_start >= window_duration:
            print(f"\n 开始处理从 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_window_start))} 到现在的时间窗口...")

            # Write the top 3 products of every category seen in this window.
            for category, products in product_ratings_window.items():
                top3 = get_top_n_products(products)
                insert_top_products_to_mysql(category, top3, current_time)

            # Drop the window's data and start the next window now.
            product_ratings_window.clear()
            current_window_start = current_time
            print(" 窗口已重置\n")

        if msg is None:
            continue

        err = msg.error()
        if err is not None:
            # The end-of-partition code is defined on KafkaError, not on
            # KafkaException; it is informational rather than a failure.
            if err.code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(err)

        try:
            record = _parse_order_record(msg.value())
        except (UnicodeDecodeError, ValueError) as e:
            # A malformed record is reported and skipped, never fatal.
            print("⚠ 解析消息错误:", str(e))
            continue
        if record is None:
            continue

        # Add this rating to the current window's running totals.
        category, product, rating = record
        totals = product_ratings_window[category][product]
        totals[0] += rating
        totals[1] += 1

except KeyboardInterrupt:
    print("\n 停止消费者...")

finally:
    # Close the database handles even if closing the consumer raises.
    try:
        consumer.close()
    finally:
        cursor.close()
        conn.close()
        print(" 数据库和 Kafka 连接已关闭")