Compare commits

...

3 Commits

Author  SHA1        Message                                                           Date
        7677852f2e  update: add a UUID field to the producer records                  2024-12-30 09:04:32 +08:00
        0a6a51737c  fix: fix an issue where multithreading could not start properly   2024-12-26 11:14:02 +08:00
samlyy  f219e3306f  feat: add Kafka Producer functionality                            2024-12-26 10:59:22 +08:00
6 changed files with 90 additions and 10 deletions

View File

@@ -13,4 +13,7 @@ password = 123456
[kafka]
bootstrap_servers = 172.16.5.2:9092
group_id = test
raw_topic = raw
raw_topic = raw
[producer]
data_path = producer-data.json
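
The new [producer] section only adds the path of the sample-data file that the producer reads. As a rough sketch, assuming conf/util.get_config_object returns a standard configparser.ConfigParser (the wrapper itself is not part of this diff, and the file name config.ini is an assumption), the new key can be read like this:

from configparser import ConfigParser

config = ConfigParser()
config.read('config.ini', encoding='utf-8')       # assumed file name; not shown in the diff
data_path = config.get('producer', 'data_path')   # 'producer-data.json'
raw_topic = config.get('kafka', 'raw_topic')      # 'raw'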

View File

@@ -1,7 +1,9 @@
from flask import *
import threading
import service.manager as manager
from init import init_all
from service.init import run_kafka_spark
from conf.util import *
# from web import app
@@ -23,9 +25,11 @@ if __name__ == '__main__':
    init_all()
    # Run the Kafka and Spark components
    run_kafka_spark()
    manager.start()
    # Run the web service
    app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))

producer-data.json  Normal file  (7 additions)
View File

@@ -0,0 +1,7 @@
{
"电器":["电脑", "手机", "电视", "手柄"],
"食物":["面包", "水果", "面条", "坚果"],
"衣服":["大衣", "短袖", "风衣", "卫衣"],
"家具":["椅子", "沙发", "茶几", "柜子"],
"程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"]
}

View File

@@ -1,7 +0,0 @@
import threading
from service.test import test
def run_kafka_spark():
    print('run_kafka_spark')
    t1 = threading.Thread(target=test)
    t1.start()

service/manager.py  Normal file  (9 additions)
View File

@@ -0,0 +1,9 @@
import threading
from service.producer import run_kafka_producer
def start():
    kafka_producer_thread = threading.Thread(target=run_kafka_producer)
    kafka_producer_thread.start()
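
run_kafka_producer loops forever, so the non-daemon thread started here keeps the process alive even after the Flask server stops. A minimal variation, with daemon=True as an assumption rather than something in the commit, lets the interpreter exit cleanly:

import threading
from service.producer import run_kafka_producer

def start():
    # daemon=True is an assumed change: the process can exit while the producer loop is still running
    kafka_producer_thread = threading.Thread(target=run_kafka_producer, daemon=True)
    kafka_producer_thread.start()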

service/producer.py  Normal file  (64 additions)
View File

@@ -0,0 +1,64 @@
import random
import time
import json
import uuid
from datetime import datetime
from confluent_kafka import Producer
from conf.util import get_config_object
conf_obj = get_config_object()
# Load the producer data from the JSON file configured under [producer] data_path
with open(conf_obj.get("producer", "data_path"), 'r', encoding='utf-8') as f:
    data = json.load(f)
# Kafka producer configuration
conf = {
    'bootstrap.servers': conf_obj.get("kafka", "bootstrap_servers"),  # Kafka broker address
    'client.id': 'python-producer'
}
# Create the Kafka producer
producer = Producer(conf)

# Generate a random order record
def generate_order_data():
    # Randomly pick an order category
    order_category = random.choice(list(data.keys()))
    # Randomly pick an item from that category
    order_name = random.choice(data[order_category])
    # Random quantity
    order_quantity = random.randint(1, 100)
    # Current timestamp
    date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Randomly mark the order as valid or not
    is_valid = random.choice(['Y', 'N'])
    # Record format: UUID\torder category\torder name\torder quantity\tdate\tvalid flag
    data_str = f"{uuid.uuid1()}\t{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}"
    # data_str = json.dumps({
    #     "order_id": str(uuid.uuid1()),
    #     "order_category": order_category,
    #     "order_name": order_name,
    #     "order_quantity": order_quantity,
    #     "date": date,
    #     "is_valid": is_valid
    # })
    return data_str
# Delivery callback: report whether each message reached the broker
def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def run_kafka_producer():
    while True:
        order_data = generate_order_data()  # Generate a record
        producer.produce('orders', order_data, callback=delivery_report)  # Send to the Kafka 'orders' topic
        producer.poll(0)  # Serve delivery callbacks and other pending events
        time.sleep(5)  # Send one message every 5 seconds
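
For reference, the tab-separated records sent above can be consumed and split back into fields with confluent_kafka's Consumer. This is a hedged sketch, not part of the commits: the topic name 'orders' mirrors the produce() call, the broker and group id are taken from the [kafka] config section, and auto.offset.reset is an assumption.

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': '172.16.5.2:9092',  # same broker as the producer
    'group.id': 'test',                      # group_id from the [kafka] section
    'auto.offset.reset': 'earliest'          # assumption: read from the beginning
})
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # Fields written by generate_order_data(), in order
    order_id, category, name, quantity, date, is_valid = msg.value().decode('utf-8').split('\t')
    print(order_id, category, name, quantity, date, is_valid)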