import random import time import json import uuid from datetime import datetime from confluent_kafka import Producer from conf.util import get_config_object conf_obj = get_config_object() # 读取 data.json 数据 with open(conf_obj.get("producer","data_path"), 'r', encoding='utf-8') as f: data = json.load(f) # 配置 Kafka 生产者 conf = { 'bootstrap.servers': conf_obj.get("kafka", "bootstrap_servers"), # Kafka 服务地址 'client.id': 'python-producer' } # 创建 Kafka 生产者 producer = Producer(conf) # 生成随机数据的函数 def generate_order_data(): # 随机选择一个订单类别 order_category = random.choice(list(data.keys())) # 根据订单类别随机选择商品 order_name = random.choice(data[order_category]) # 随机选择数量 order_quantity = random.randint(1, 100) # 获取当前时间 date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 随机选择是否有效 is_valid = random.choice(['Y', 'N']) # 数据格式: UUID\t订单类别\t订单名字\t订单数量\t日期\t是否有效 data_str = f"{uuid.uuid1()}\t{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}" # data_str = json.dumps({ # "order_id": str(uuid.uuid1()), # "order_category": order_category, # "order_name": order_name, # "order_quantity": order_quantity, # "date": date, # "is_valid": is_valid # }) return data_str # 发送消息的回调函数 def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) pass def run_kafka_producer(): while True: order_data = generate_order_data() # 生成数据 producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 producer.poll(0) # 处理任何待处理的事件(如回调) time.sleep(5) # 每隔 5 秒发送一次