import random import time import json from datetime import datetime import configparser from confluent_kafka import Producer # 读取 data.json 数据 with open('data.json', 'r', encoding='utf-8') as f: data = json.load(f) # 配置 Kafka 生产者 conf = { 'bootstrap.servers': 'niit:9092', # Kafka 服务地址 'client.id': 'python-producer' } # 创建 Kafka 生产者 producer = Producer(conf) # 生成随机数据的函数 def generate_order_data(): # 随机选择一个订单类别 order_category = random.choice(list(data.keys())) # 根据订单类别随机选择商品 order_name = random.choice(data[order_category]) # 随机选择数量 order_quantity = random.randint(1, 100) # 获取当前时间 date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 随机选择是否有效 is_valid = random.choice(['Y', 'N']) # 数据格式: 订单类别\t订单名字\t订单数量\t日期\t是否有效 data_str = f"{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}" return data_str # 发送消息的回调函数 def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) # 每隔 5 秒推送一次数据 while True: order_data = generate_order_data() # 生成数据 producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 producer.poll(0) # 处理任何待处理的事件(如回调) time.sleep(0.5) # 每隔 5 秒发送一次