From 466b49c14703b03cab3b4f6cebd676ae94a40d2a Mon Sep 17 00:00:00 2001 From: samlyy <1751589035@qq .com> Date: Thu, 19 Dec 2024 15:27:13 +0800 Subject: [PATCH] producer_fisrt --- .gitignore | 1 + data.json | 7 +++++++ main.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 60 insertions(+) create mode 100644 .gitignore create mode 100644 data.json create mode 100644 main.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..723ef36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.idea \ No newline at end of file diff --git a/data.json b/data.json new file mode 100644 index 0000000..456aa78 --- /dev/null +++ b/data.json @@ -0,0 +1,7 @@ +{ + "电器": ["电脑", "手机", "电视", "手柄"], + "食物":["面包", "水果", "面条", "坚果"], + "衣服":["大衣", "短袖", "风衣", "卫衣"], + "家具":["椅子", "沙发", "茶几", "柜子"], + "程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"] +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3aeb4bc --- /dev/null +++ b/main.py @@ -0,0 +1,51 @@ +import random +import time +import json +from datetime import datetime +import configparser +from confluent_kafka import Producer + +# 读取 data.json 数据 +with open('data.json', 'r', encoding='utf-8') as f: + data = json.load(f) + +# 配置 Kafka 生产者 +conf = { + 'bootstrap.servers': 'niit:9092', # Kafka 服务地址 + 'client.id': 'python-producer' +} + +# 创建 Kafka 生产者 +producer = Producer(conf) + +# 生成随机数据的函数 +def generate_order_data(): + # 随机选择一个订单类别 + order_category = random.choice(list(data.keys())) + # 根据订单类别随机选择商品 + order_name = random.choice(data[order_category]) + # 随机选择数量 + order_quantity = random.randint(1, 100) + # 获取当前时间 + date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + # 随机选择是否有效 + is_valid = random.choice(['Y', 'N']) + # 数据格式: 订单类别\t订单名字\t订单数量\t日期\t是否有效 + data_str = f"{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}" + return data_str + + +# 发送消息的回调函数 +def delivery_report(err, msg): + if err is not None: + print('Message delivery failed: {}'.format(err)) + else: + print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + + +# 每隔 5 秒推送一次数据 +while True: + order_data = generate_order_data() # 生成数据 + producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 + producer.poll(0) # 处理任何待处理的事件(如回调) + time.sleep(0.5) # 每隔 5 秒发送一次 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..dfacc83 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +confluent_kafka == 2.6.2