from confluent_kafka import Consumer, KafkaError, KafkaException
import json
import time

# Kafka configuration
conf = {
    'bootstrap.servers': 'niit-node3:9092',
    'group.id': 'recommendation-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['orders'])

# In-memory state: category -> { product -> (total_rating, count) }
product_ratings = {}


def update_product_rating(category, product, rating):
    """Add one rating for *product* in *category* to the running totals.

    Creates the per-category dict on first use, then accumulates the
    rating sum and the number of ratings for the product.
    """
    if category not in product_ratings:
        product_ratings[category] = {}
    ratings = product_ratings[category]
    if product in ratings:
        total, cnt = ratings[product]
        ratings[product] = (total + rating, cnt + 1)
    else:
        ratings[product] = (rating, 1)


def get_top_n_products(ratings_dict, n=3):
    """Return up to *n* product names ordered by descending average rating.

    Args:
        ratings_dict: mapping of product -> (total_rating, count), with
            count >= 1 as maintained by update_product_rating.
        n: maximum number of products to return.

    Products with equal averages keep their insertion order, because
    list.sort is stable (also with reverse=True).
    """
    # Compute the average rating per product, then sort descending.
    rated_products = [
        (prod, total / cnt)
        for prod, (total, cnt) in ratings_dict.items()
    ]
    rated_products.sort(key=lambda x: x[1], reverse=True)
    return [p[0] for p in rated_products[:n]]


def _parse_order(line):
    """Parse one tab-separated order line into (category, product, rating).

    Returns None when the line does not have exactly 6 fields or when the
    validity flag is not "Y". Raises ValueError if the rating field is not
    an integer.
    """
    parts = line.split('\t')
    if len(parts) != 6:
        return None
    category, product, quantity, date, rating_str, is_valid = parts
    if is_valid != "Y":
        return None
    return category, product, int(rating_str)


try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            # The EOF constant lives on KafkaError, not KafkaException.
            # NOTE(review): librdkafka only emits partition-EOF events when
            # 'enable.partition.eof' is enabled, which conf above does not set.
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f"Reached end of partition: {msg.topic()} [{msg.partition()}]")
            else:
                raise KafkaException(msg.error())
            continue

        raw = msg.value()
        if raw is None:
            # Message without a payload (e.g. a tombstone): nothing to parse.
            continue

        # Note: the Java producer sends tab-separated strings, not JSON.
        try:
            record = _parse_order(raw.decode('utf-8').strip())
        except (UnicodeDecodeError, ValueError) as e:
            # Malformed payload or non-integer rating: skip this message.
            print("Error parsing message:", str(e))
            continue
        if record is None:
            continue

        category, product, rating = record
        update_product_rating(category, product, rating)

        # Report the current Top 3 for this category.
        top3 = get_top_n_products(product_ratings[category])
        print(f"[{category}] Top 3: {', '.join(top3)}")
except KeyboardInterrupt:
    print("Stopping consumer...")
finally:
    consumer.close()