From c6ea6ca4ef5b2594c6ba941f3f36417b14090874 Mon Sep 17 00:00:00 2001 From: samlyy <1751589035@qq .com> Date: Tue, 17 Jun 2025 20:02:26 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=8F=90=E4=BA=A4=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + .idea/.gitignore | 3 + .idea/inspectionProfiles/Project_Default.xml | 16 +++ .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 ++ .idea/pythonProject.iml | 10 ++ .idea/vcs.xml | 6 + cf.py | 74 +++++++++++ cf2.py | 113 ++++++++++++++++ cf3.py | 121 ++++++++++++++++++ cf_test.py | 77 +++++++++++ 12 files changed, 442 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/pythonProject.iml create mode 100644 .idea/vcs.xml create mode 100644 cf.py create mode 100644 cf2.py create mode 100644 cf3.py create mode 100644 cf_test.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ef81b1e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.venv/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..8d15c6d --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,16 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..5db334c --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..e15ec35 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/pythonProject.iml b/.idea/pythonProject.iml new file mode 100644 index 0000000..6cb8b9a --- /dev/null +++ b/.idea/pythonProject.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/cf.py b/cf.py new file mode 100644 index 0000000..b93892a --- /dev/null +++ b/cf.py @@ -0,0 +1,74 @@ +from confluent_kafka import Consumer, KafkaException +import json +import time + +# Kafka 配置 +conf = { + 'bootstrap.servers': 'niit-node3:9092', + 'group.id': 'recommendation-group', + 'auto.offset.reset': 'earliest' +} + +consumer = Consumer(conf) +consumer.subscribe(['orders']) + +# 状态存储:category -> { product -> (total_rating, count) } +product_ratings = {} + +def update_product_rating(category, product, rating): + if category not in product_ratings: + product_ratings[category] = {} + ratings = product_ratings[category] + if product in ratings: + total, cnt = ratings[product] + ratings[product] = (total + rating, cnt + 1) + else: + ratings[product] = (rating, 1) + +def get_top_n_products(ratings_dict, n=3): + # 计算平均评分并排序 + rated_products = [ + (prod, total / cnt) for prod, (total, cnt) in ratings_dict.items() + ] + rated_products.sort(key=lambda x: x[1], reverse=True) + return [p[0] for p in rated_products[:n]] + +try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]") + else: + raise KafkaException(msg.error()) + else: + # 解析消息内容 + try: + # 注意:Java 生产者发送的是 tab 分隔的字符串,不是 JSON + line = msg.value().decode('utf-8').strip() + parts = line.split('\t') + if len(parts) != 6: + continue + category, product, quantity, date, rating_str, is_valid = parts + if is_valid != "Y": + continue + rating = int(rating_str) + + # 更新评分 + update_product_rating(category, product, rating) + + # 获取当前类别下的 Top 3 + top3 = get_top_n_products(product_ratings[category]) + print(f"[{category}] Top 3: {', '.join(top3)}") + + except Exception as e: + print("Error parsing message:", str(e)) + continue + +except KeyboardInterrupt: + print("Stopping consumer...") + +finally: + consumer.close() \ No newline at end of file diff --git a/cf2.py b/cf2.py new file mode 100644 index 0000000..2e35e0e --- /dev/null +++ b/cf2.py @@ -0,0 +1,113 @@ +from confluent_kafka import Consumer, KafkaException +import json +import time +import mysql.connector +from mysql.connector import Error + +# =================== Kafka 配置 =================== +conf = { + 'bootstrap.servers': 'niit-node3:9092', + 'group.id': 'recommendation-group', + 'auto.offset.reset': 'earliest' +} + +consumer = Consumer(conf) +consumer.subscribe(['orders']) + +# =================== MySQL 配置 =================== +mysql_config = { + 'host': '172.16.5.3', + 'port': 3306, + 'user': 'root', + 'password': '123456', + 'database': 'orders_db' +} + +# 创建数据库连接 +try: + conn = mysql.connector.connect(**mysql_config) + if conn.is_connected(): + print("✅ 成功连接到 MySQL 数据库") + cursor = conn.cursor() +except Error as e: + print("❌ 连接 MySQL 失败:", e) + exit(1) + +# =================== 状态存储 =================== +product_ratings = {} + +def update_product_rating(category, product, rating): + if category not in product_ratings: + product_ratings[category] = {} + ratings = product_ratings[category] + if product in ratings: + total, cnt = ratings[product] + ratings[product] = (total + rating, cnt + 1) + else: + ratings[product] = (rating, 1) + +def get_top_n_products(ratings_dict, n=3): + rated_products = [ + (prod, total / cnt) for prod, (total, cnt) in ratings_dict.items() + ] + rated_products.sort(key=lambda x: x[1], reverse=True) + return [p[0] for p in rated_products[:n]] + +# =================== 插入 MySQL 函数 =================== +def insert_top_products_to_mysql(category, top_products_list): + try: + # 将商品列表转为 JSON 字符串 + top_products_json = json.dumps(top_products_list, ensure_ascii=False) + query = """ + INSERT INTO category_top_products (category, top_products) + VALUES (%s, %s) + """ + cursor.execute(query, (category, top_products_json)) + conn.commit() + print(f"🟢 已插入数据库:[{category}] {top_products_json}") + except Error as e: + print(f"🔴 插入数据库失败 [{category}]:", e) + conn.rollback() + +# =================== 主循环 =================== +try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]") + else: + raise KafkaException(msg.error()) + else: + try: + line = msg.value().decode('utf-8').strip() + parts = line.split('\t') + if len(parts) != 6: + continue + category, product, quantity, date, rating_str, is_valid = parts + if is_valid != "Y": + continue + rating = int(rating_str) + + update_product_rating(category, product, rating) + + top3 = get_top_n_products(product_ratings[category]) + print(f"[{category}] Top 3: {', '.join(top3)}") + + # 插入 MySQL + insert_top_products_to_mysql(category, top3) + + except Exception as e: + print("⚠️ 解析消息错误:", str(e)) + continue + +except KeyboardInterrupt: + print("\n🛑 停止消费者...") + +finally: + consumer.close() + cursor.close() + conn.close() + print("🔌 数据库和 Kafka 连接已关闭") \ No newline at end of file diff --git a/cf3.py b/cf3.py new file mode 100644 index 0000000..a90bf3a --- /dev/null +++ b/cf3.py @@ -0,0 +1,121 @@ +from confluent_kafka import Consumer, KafkaException +import json +import time +import mysql.connector +from mysql.connector import Error +from collections import defaultdict + +# Kafka 配置 +conf = { + 'bootstrap.servers': 'niit-node3:9092', + 'group.id': 'recommendation-group', + 'auto.offset.reset': 'earliest' +} + +consumer = Consumer(conf) +consumer.subscribe(['orders']) + +# MySQL 配置 +mysql_config = { + 'host': '172.16.5.3', + 'port': 3306, + 'user': 'root', + 'password': '123456', + 'database': 'orders_db' +} + +# 创建数据库连接 +try: + conn = mysql.connector.connect(**mysql_config) + if conn.is_connected(): + print(" 成功连接到 MySQL 数据库") + cursor = conn.cursor() +except Error as e: + print(" 连接 MySQL 失败:", e) + exit(1) + +# =================== 状态存储 +window_duration = 300 # 5 分钟 +current_window_start = int(time.time()) + +# 缓存当前窗口的商品评分:{ category -> { product -> (total_rating, count) } } +product_ratings_window = defaultdict(lambda: defaultdict(lambda: [0, 0])) + +def get_top_n_products(ratings_dict, n=3): + rated_products = [ + (prod, total / cnt) for prod, (total, cnt) in ratings_dict.items() + ] + rated_products.sort(key=lambda x: x[1], reverse=True) + return [p[0] for p in rated_products[:n]] + +# =================== 插入 MySQL 函数 +def insert_top_products_to_mysql(category, top_products_list, window_end_time): + try: + top_products_json = json.dumps(top_products_list, ensure_ascii=False) + timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(window_end_time)) + query = """ + INSERT INTO category_top_products (category, top_products, timestamp) + VALUES (%s, %s, %s) + """ + cursor.execute(query, (category, top_products_json, timestamp)) + conn.commit() + print(f" 已插入数据库:[{category}] {top_products_json} @ {timestamp}") + except Error as e: + print(f" 插入数据库失败 [{category}]:", e) + conn.rollback() + +# =================== 主循环 +try: + while True: + msg = consumer.poll(timeout=1.0) + current_time = int(time.time()) + + # 判断是否到了窗口结束时间 + if current_time - current_window_start >= window_duration: + print(f"\n 开始处理从 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(current_window_start))} 到现在的时间窗口...") + + # 为每个类别生成 Top 3 推荐 + for category, products in product_ratings_window.items(): + top3 = get_top_n_products(products) + insert_top_products_to_mysql(category, top3, current_time) + + # 清空窗口缓存,更新窗口起始时间 + product_ratings_window.clear() + current_window_start = current_time + print(" 窗口已重置\n") + + # 如果有消息,则继续处理 + if msg is None: + continue + if msg.error(): + if msg.error().code() == KafkaException._PARTITION_EOF: + print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]") + else: + raise KafkaException(msg.error()) + else: + try: + line = msg.value().decode('utf-8').strip() + parts = line.split('\t') + if len(parts) != 6: + continue + category, product, quantity, date, rating_str, is_valid = parts + if is_valid != "Y": + continue + rating = int(rating_str) + + # 更新当前窗口评分 + product_ratings_window[category][product][0] += rating + product_ratings_window[category][product][1] += 1 + + except Exception as e: + print("⚠ 解析消息错误:", str(e)) + continue + +except KeyboardInterrupt: + print("\n 停止消费者...") + +finally: + consumer.close() + cursor.close() + conn.close() + print(" 数据库和 Kafka 连接已关闭") \ No newline at end of file diff --git a/cf_test.py b/cf_test.py new file mode 100644 index 0000000..34eb1f9 --- /dev/null +++ b/cf_test.py @@ -0,0 +1,77 @@ +from collections import defaultdict +import random + +# 商品类别和商品名称映射 +CATEGORIES = ["电器", "服饰", "食品", "玩具", "手机"] + +PRODUCT_NAMES = { + "电器": ["电视", "冰箱", "洗衣机", "空调", "吸尘器", "电饭煲", "微波炉", "电磁炉", "热水器", "空气净化器"], + "服饰": ["T恤", "牛仔裤", "羽绒服", "衬衫", "运动鞋", "夹克", "卫衣", "连衣裙", "短裤", "风衣"], + "食品": ["巧克力", "饼干", "方便面", "牛奶", "饮料", "面包", "糖果", "果冻", "薯片", "蛋挞"], + "玩具": ["积木", "拼图", "玩偶", "遥控车", "毛绒玩具", "魔方", "乐高", "变形金刚", "洋娃娃", "电子琴"], + "手机": ["华为", "苹果", "小米", "OPPO", "vivo", "荣耀", "三星", "魅族", "联想", "努比亚"] +} + +# 用户评分 (0, 50, 100) -> 不喜欢、还行、很喜欢 +RATINGS = [0, 50, 100] +IS_VALID = ["Y", "N"] + + +# ================== 生成假数据 ================== +def generate_fake_orders(n=50): + orders = [] + for _ in range(n): + category = random.choice(CATEGORIES) + product = random.choice(PRODUCT_NAMES[category]) + rating = random.choice(RATINGS) + is_valid = random.choice(IS_VALID) + + orders.append({ + 'category': category, + 'product': product, + 'rating': rating, + 'isValid': is_valid + }) + return orders + + +# ================== 推荐逻辑 ================== +def recommend_top_n(orders, n=3): + # 存储:{ category: { product: (total_rating, count) } } + product_ratings = defaultdict(lambda: defaultdict(lambda: [0, 0])) # [总评分, 数量] + + for order in orders: + if order['isValid'] != 'Y': + continue + category = order['category'] + product = order['product'] + rating = order['rating'] + + product_ratings[category][product][0] += rating + product_ratings[category][product][1] += 1 + + # 计算平均评分并排序 + recommendations = {} + for category, products in product_ratings.items(): + rated_products = [ + (product, total / count) for product, (total, count) in products.items() + ] + rated_products.sort(key=lambda x: x[1], reverse=True) + recommendations[category] = [p[0] for p in rated_products[:n]] + + return recommendations + + +# ================== 主程序入口 ================== +if __name__ == '__main__': + fake_data = generate_fake_orders(100) + + print("=== 假数据样本 ===") + for d in fake_data[:10]: # 打印前10条数据看看 + print(d) + + print("\n=== 开始推荐 Top 3 商品 ===") + result = recommend_top_n(fake_data, n=3) + + for category, top_products in result.items(): + print(f"[{category}] 推荐商品:{', '.join(top_products)}") \ No newline at end of file