producer_fisrt
This commit is contained in:
commit
466b49c147
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
.idea
|
7
data.json
Normal file
7
data.json
Normal file
@ -0,0 +1,7 @@
|
||||
{
|
||||
"电器": ["电脑", "手机", "电视", "手柄"],
|
||||
"食物":["面包", "水果", "面条", "坚果"],
|
||||
"衣服":["大衣", "短袖", "风衣", "卫衣"],
|
||||
"家具":["椅子", "沙发", "茶几", "柜子"],
|
||||
"程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"]
|
||||
}
|
51
main.py
Normal file
51
main.py
Normal file
@ -0,0 +1,51 @@
|
||||
import random
|
||||
import time
|
||||
import json
|
||||
from datetime import datetime
|
||||
import configparser
|
||||
from confluent_kafka import Producer
|
||||
|
||||
# 读取 data.json 数据
|
||||
with open('data.json', 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
# 配置 Kafka 生产者
|
||||
conf = {
|
||||
'bootstrap.servers': 'niit:9092', # Kafka 服务地址
|
||||
'client.id': 'python-producer'
|
||||
}
|
||||
|
||||
# 创建 Kafka 生产者
|
||||
producer = Producer(conf)
|
||||
|
||||
# 生成随机数据的函数
|
||||
def generate_order_data():
|
||||
# 随机选择一个订单类别
|
||||
order_category = random.choice(list(data.keys()))
|
||||
# 根据订单类别随机选择商品
|
||||
order_name = random.choice(data[order_category])
|
||||
# 随机选择数量
|
||||
order_quantity = random.randint(1, 100)
|
||||
# 获取当前时间
|
||||
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||
# 随机选择是否有效
|
||||
is_valid = random.choice(['Y', 'N'])
|
||||
# 数据格式: 订单类别\t订单名字\t订单数量\t日期\t是否有效
|
||||
data_str = f"{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}"
|
||||
return data_str
|
||||
|
||||
|
||||
# 发送消息的回调函数
|
||||
def delivery_report(err, msg):
|
||||
if err is not None:
|
||||
print('Message delivery failed: {}'.format(err))
|
||||
else:
|
||||
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
|
||||
|
||||
|
||||
# 每隔 5 秒推送一次数据
|
||||
while True:
|
||||
order_data = generate_order_data() # 生成数据
|
||||
producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题
|
||||
producer.poll(0) # 处理任何待处理的事件(如回调)
|
||||
time.sleep(0.5) # 每隔 5 秒发送一次
|
1
requirements.txt
Normal file
1
requirements.txt
Normal file
@ -0,0 +1 @@
|
||||
confluent_kafka == 2.6.2
|
Loading…
x
Reference in New Issue
Block a user