top/cf3.py
2025-06-17 20:38:56 +08:00

121 lines
4.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import time
from collections import defaultdict

import mysql.connector
from confluent_kafka import Consumer, KafkaError, KafkaException
from mysql.connector import Error
# Kafka configuration.
# The broker list can be overridden with the KAFKA_BOOTSTRAP_SERVERS
# environment variable; the default is the broker this script previously
# hard-coded, so existing deployments behave the same.
conf = {
    'bootstrap.servers': os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'niit-node3:9092'),
    'group.id': 'recommendation-group',
    # Where to start when this group has no committed offset yet.
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(conf)
# Order events are read from the 'orders' topic.
consumer.subscribe(['orders'])
# MySQL connection settings.
# Every value can be overridden through an environment variable, so the
# credentials do not have to live in source control. The defaults are the
# values this script previously hard-coded, which keeps existing runs unchanged.
mysql_config = {
    'host': os.environ.get('MYSQL_HOST', '172.16.5.3'),
    # mysql.connector expects an int port; environment values are strings.
    'port': int(os.environ.get('MYSQL_PORT', '3306')),
    'user': os.environ.get('MYSQL_USER', 'root'),
    'password': os.environ.get('MYSQL_PASSWORD', '123456'),
    'database': os.environ.get('MYSQL_DATABASE', 'orders_db'),
}
# Open the MySQL connection and the cursor shared by all inserts below.
# If connecting fails, report the error and stop the script.
try:
    conn = mysql.connector.connect(**mysql_config)
    if conn.is_connected():
        print(" 成功连接到 MySQL 数据库")
    cursor = conn.cursor()
except Error as e:
    print(" 连接 MySQL 失败:", e)
    # The Kafka consumer was already created and subscribed above; close it
    # before exiting so it leaves its consumer group cleanly instead of
    # being abandoned.
    consumer.close()
    # exit() is a convenience injected by the `site` module for interactive
    # use (absent under `python -S`); SystemExit is what it raises anyway.
    raise SystemExit(1)
# =================== State storage
# Length of one aggregation window, in seconds (5 minutes).
window_duration = 5 * 60
# Unix time (whole seconds) at which the currently open window began.
current_window_start = int(time.time())
# Ratings accumulated during the open window:
#     category -> product -> [total_rating, count]
# Both levels are created on first access, so callers can do
# product_ratings_window[cat][prod][0] += rating with no setup.
product_ratings_window = defaultdict(
    lambda: defaultdict(lambda: [0, 0])
)
def get_top_n_products(ratings_dict, n=3):
    """Return the names of the ``n`` products with the highest average rating.

    Args:
        ratings_dict: Maps product name -> ``(total_rating, count)`` pair;
            a product's average is ``total_rating / count``.
        n: How many names to return (the ranking is sliced with ``[:n]``).

    Returns:
        Product names ordered best average first. Products with equal
        averages keep their iteration order from ``ratings_dict``.
    """
    averages = {prod: total / cnt for prod, (total, cnt) in ratings_dict.items()}
    # sorted() is stable even with reverse=True, so ties keep dict order.
    ranking = sorted(averages, key=averages.__getitem__, reverse=True)
    return ranking[:n]
# =================== Insert into MySQL
def insert_top_products_to_mysql(category, top_products_list, window_end_time):
    """Store one category's ranked product list for a finished window.

    Inserts a row into ``category_top_products`` through the module-level
    ``cursor`` and commits it on ``conn``. Database errors are printed and the
    transaction is rolled back. They are not re-raised, so one failed insert
    does not stop the consumer loop.

    Args:
        category: Value for the ``category`` column.
        top_products_list: Product names, best first. Stored as a JSON string
            with non-ASCII characters kept as-is.
        window_end_time: Unix timestamp in seconds. Stored as local time
            formatted ``'%Y-%m-%d %H:%M:%S'``.
    """
    top_products_json = json.dumps(top_products_list, ensure_ascii=False)
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(window_end_time))
    query = """
    INSERT INTO category_top_products (category, top_products, timestamp)
    VALUES (%s, %s, %s)
    """
    try:
        cursor.execute(query, (category, top_products_json, timestamp))
        conn.commit()
        print(f" 已插入数据库:[{category}] {top_products_json} @ {timestamp}")
    except Error as e:
        print(f" 插入数据库失败 [{category}]:", e)
        try:
            conn.rollback()
        except Error as rollback_err:
            # If the connection itself is broken, rollback() raises as well.
            # Report it instead of letting it escape this best-effort helper
            # and abort the whole consumer loop.
            print(f" 回滚失败 [{category}]:", rollback_err)
# =================== Main loop


def _flush_window(window_start, window_end):
    """Write each category's Top-3 products for the finished window, then empty it."""
    print(f"\n 开始处理从 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(window_start))} 到现在的时间窗口...")
    for category, products in product_ratings_window.items():
        top3 = get_top_n_products(products)
        insert_top_products_to_mysql(category, top3, window_end)
    product_ratings_window.clear()
    print(" 窗口已重置\n")


def _record_message(msg):
    """Parse one order record and add its rating to the current window.

    The payload is decoded as UTF-8 and split on tabs into six fields:
    category, product, quantity, date, rating, is_valid. Lines without exactly
    six fields, or whose is_valid field is not "Y", are skipped silently. Any
    other failure (e.g. a non-integer rating) is printed and the message is
    skipped.
    """
    try:
        line = msg.value().decode('utf-8').strip()
        parts = line.split('\t')
        if len(parts) != 6:
            return
        category, product, _quantity, _date, rating_str, is_valid = parts
        if is_valid != "Y":
            return
        rating = int(rating_str)
        stats = product_ratings_window[category][product]
        stats[0] += rating  # total rating
        stats[1] += 1       # number of ratings
    except Exception as e:
        print("⚠ 解析消息错误:", str(e))


try:
    while True:
        # A bounded poll keeps the loop turning when no messages arrive, so
        # the window check below still fires on an idle topic.
        msg = consumer.poll(timeout=1.0)
        current_time = int(time.time())
        if current_time - current_window_start >= window_duration:
            _flush_window(current_window_start, current_time)
            # The next window starts now (not at start + duration), so window
            # boundaries can drift by up to the poll timeout.
            current_window_start = current_time
        if msg is None:
            continue
        err = msg.error()
        if err is not None:
            # The partition-EOF code is defined on KafkaError, not on
            # KafkaException; the old lookup raised AttributeError here.
            # NOTE(review): librdkafka reports this event only when
            # 'enable.partition.eof' is enabled, which `conf` does not set.
            if err.code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
                continue
            raise KafkaException(err)
        _record_message(msg)
except KeyboardInterrupt:
    # NOTE: ratings collected in the still-open window are not written to
    # MySQL on shutdown. If offsets are auto-committed (the default unless
    # 'enable.auto.commit' is disabled — verify), those events are not re-read.
    print("\n 停止消费者...")
finally:
    consumer.close()
    cursor.close()
    conn.close()
    print(" 数据库和 Kafka 连接已关闭")