From f219e3306f3bb1fc1b5885533a02086a49917f85 Mon Sep 17 00:00:00 2001 From: samlyy <1751589035@qq .com> Date: Thu, 26 Dec 2024 10:59:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0Kafka=20Producer?= =?UTF-8?q?=E7=9B=B8=E5=85=B3=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.ini | 5 +++- main.py | 14 ++++++++--- producer-data.json | 7 ++++++ service/init.py | 7 ------ service/producer.py | 59 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 81 insertions(+), 11 deletions(-) create mode 100644 producer-data.json delete mode 100644 service/init.py create mode 100644 service/producer.py diff --git a/config.ini b/config.ini index e3b9fa2..8454a1d 100644 --- a/config.ini +++ b/config.ini @@ -13,4 +13,7 @@ password = 123456 [kafka] bootstrap_servers = 172.16.5.2:9092 group_id = test -raw_topic = raw \ No newline at end of file +raw_topic = raw + +[producer] +data_path = producer-data.json \ No newline at end of file diff --git a/main.py b/main.py index 5bf4f2e..3ffb369 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,9 @@ from flask import * +import threading + from init import init_all -from service.init import run_kafka_spark +from service.producer import run_kafka_producer from conf.util import * # from web import app @@ -23,9 +25,15 @@ if __name__ == '__main__': init_all() # 运行Kafka与Spark相关 - run_kafka_spark() + + kafka_producer_t = threading.Thread(target=run_kafka_producer()) + web_t = threading.Thread(target=app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))) + kafka_producer_t.start() + web_t.start() + + # 运行Web服务 - app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug')) + diff --git a/producer-data.json b/producer-data.json new file mode 100644 index 0000000..b3d922b --- /dev/null +++ b/producer-data.json @@ -0,0 +1,7 @@ +{ + "电器":["电脑", "手机", "电视", "手柄"], + "食物":["面包", "水果", "面条", "坚果"], + "衣服":["大衣", "短袖", "风衣", "卫衣"], + "家具":["椅子", "沙发", "茶几", "柜子"], + "程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"] +} \ No newline at end of file diff --git a/service/init.py b/service/init.py deleted file mode 100644 index 78cbdc5..0000000 --- a/service/init.py +++ /dev/null @@ -1,7 +0,0 @@ -import threading -from service.test import test -def run_kafka_spark(): - print('run_kafka_spark') - t1 = threading.Thread(target=test) - t1.start() - diff --git a/service/producer.py b/service/producer.py new file mode 100644 index 0000000..6552bf0 --- /dev/null +++ b/service/producer.py @@ -0,0 +1,59 @@ +import random +import time +import json +import threading +from datetime import datetime +from confluent_kafka import Producer + +from conf.util import get_config_object + + +conf_obj = get_config_object() + +# 读取 data.json 数据 +with open(conf_obj.get("producer","data_path"), 'r', encoding='utf-8') as f: + data = json.load(f) + +# 配置 Kafka 生产者 +conf = { + 'bootstrap.servers': conf_obj.get("kafka", "bootstrap_servers"), # Kafka 服务地址 + 'client.id': 'python-producer' +} + +# 创建 Kafka 生产者 +producer = Producer(conf) + +# 生成随机数据的函数 +def generate_order_data(): + # 随机选择一个订单类别 + order_category = random.choice(list(data.keys())) + # 根据订单类别随机选择商品 + order_name = random.choice(data[order_category]) + # 随机选择数量 + order_quantity = random.randint(1, 100) + # 获取当前时间 + date = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + # 随机选择是否有效 + is_valid = random.choice(['Y', 'N']) + # 数据格式: 订单类别\t订单名字\t订单数量\t日期\t是否有效 + data_str = f"{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}" + return data_str + + +# 发送消息的回调函数 +def delivery_report(err, msg): + if err is not None: + print('Message delivery failed: {}'.format(err)) + else: + print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + pass + +def run_kafka_producer(): + # while True: + # order_data = generate_order_data() # 生成数据 + # producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 + # producer.poll(0) # 处理任何待处理的事件(如回调) + # time.sleep(5) # 每隔 5 秒发送一次 + while True: + print("123") + time.sleep(1) \ No newline at end of file