feat:添加Kafka Producer相关功能
This commit is contained in:
parent
2e045fc0d8
commit
f219e3306f
@ -13,4 +13,7 @@ password = 123456
|
|||||||
[kafka]
|
[kafka]
|
||||||
bootstrap_servers = 172.16.5.2:9092
|
bootstrap_servers = 172.16.5.2:9092
|
||||||
group_id = test
|
group_id = test
|
||||||
raw_topic = raw
|
raw_topic = raw
|
||||||
|
|
||||||
|
[producer]
|
||||||
|
data_path = producer-data.json
|
14
main.py
14
main.py
@ -1,7 +1,9 @@
|
|||||||
from flask import *
|
from flask import *
|
||||||
|
|
||||||
|
import threading
|
||||||
|
|
||||||
from init import init_all
|
from init import init_all
|
||||||
from service.init import run_kafka_spark
|
from service.producer import run_kafka_producer
|
||||||
from conf.util import *
|
from conf.util import *
|
||||||
|
|
||||||
# from web import app
|
# from web import app
|
||||||
@ -23,9 +25,15 @@ if __name__ == '__main__':
|
|||||||
init_all()
|
init_all()
|
||||||
|
|
||||||
# 运行Kafka与Spark相关
|
# 运行Kafka与Spark相关
|
||||||
run_kafka_spark()
|
|
||||||
|
kafka_producer_t = threading.Thread(target=run_kafka_producer())
|
||||||
|
web_t = threading.Thread(target=app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug')))
|
||||||
|
kafka_producer_t.start()
|
||||||
|
web_t.start()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 运行Web服务
|
# 运行Web服务
|
||||||
app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))
|
|
||||||
|
|
||||||
|
|
||||||
|
7
producer-data.json
Normal file
7
producer-data.json
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
{
|
||||||
|
"电器":["电脑", "手机", "电视", "手柄"],
|
||||||
|
"食物":["面包", "水果", "面条", "坚果"],
|
||||||
|
"衣服":["大衣", "短袖", "风衣", "卫衣"],
|
||||||
|
"家具":["椅子", "沙发", "茶几", "柜子"],
|
||||||
|
"程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"]
|
||||||
|
}
|
@ -1,7 +0,0 @@
|
|||||||
import threading
|
|
||||||
from service.test import test
|
|
||||||
def run_kafka_spark():
|
|
||||||
print('run_kafka_spark')
|
|
||||||
t1 = threading.Thread(target=test)
|
|
||||||
t1.start()
|
|
||||||
|
|
59
service/producer.py
Normal file
59
service/producer.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import random
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
import threading
|
||||||
|
from datetime import datetime
|
||||||
|
from confluent_kafka import Producer
|
||||||
|
|
||||||
|
from conf.util import get_config_object
|
||||||
|
|
||||||
|
|
||||||
|
conf_obj = get_config_object()
|
||||||
|
|
||||||
|
# 读取 data.json 数据
|
||||||
|
with open(conf_obj.get("producer","data_path"), 'r', encoding='utf-8') as f:
|
||||||
|
data = json.load(f)
|
||||||
|
|
||||||
|
# 配置 Kafka 生产者
|
||||||
|
conf = {
|
||||||
|
'bootstrap.servers': conf_obj.get("kafka", "bootstrap_servers"), # Kafka 服务地址
|
||||||
|
'client.id': 'python-producer'
|
||||||
|
}
|
||||||
|
|
||||||
|
# 创建 Kafka 生产者
|
||||||
|
producer = Producer(conf)
|
||||||
|
|
||||||
|
# 生成随机数据的函数
|
||||||
|
def generate_order_data():
|
||||||
|
# 随机选择一个订单类别
|
||||||
|
order_category = random.choice(list(data.keys()))
|
||||||
|
# 根据订单类别随机选择商品
|
||||||
|
order_name = random.choice(data[order_category])
|
||||||
|
# 随机选择数量
|
||||||
|
order_quantity = random.randint(1, 100)
|
||||||
|
# 获取当前时间
|
||||||
|
date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||||||
|
# 随机选择是否有效
|
||||||
|
is_valid = random.choice(['Y', 'N'])
|
||||||
|
# 数据格式: 订单类别\t订单名字\t订单数量\t日期\t是否有效
|
||||||
|
data_str = f"{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}"
|
||||||
|
return data_str
|
||||||
|
|
||||||
|
|
||||||
|
# 发送消息的回调函数
|
||||||
|
def delivery_report(err, msg):
|
||||||
|
if err is not None:
|
||||||
|
print('Message delivery failed: {}'.format(err))
|
||||||
|
else:
|
||||||
|
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run_kafka_producer():
|
||||||
|
# while True:
|
||||||
|
# order_data = generate_order_data() # 生成数据
|
||||||
|
# producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题
|
||||||
|
# producer.poll(0) # 处理任何待处理的事件(如回调)
|
||||||
|
# time.sleep(5) # 每隔 5 秒发送一次
|
||||||
|
while True:
|
||||||
|
print("123")
|
||||||
|
time.sleep(1)
|
Loading…
x
Reference in New Issue
Block a user