From 0a6a51737cec8b5086b474589096e8383e08d3b3 Mon Sep 17 00:00:00 2001 From: Kakune55 Date: Thu, 26 Dec 2024 11:14:02 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BF=AE=E6=94=B9=E4=BA=86=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E6=AD=A3=E5=B8=B8=E5=90=AF=E5=8A=A8=E5=A4=9A=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 10 +++------- service/manager.py | 9 +++++++++ service/producer.py | 11 ++++------- 3 files changed, 16 insertions(+), 14 deletions(-) create mode 100644 service/manager.py diff --git a/main.py b/main.py index 3ffb369..221d403 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,8 @@ from flask import * import threading +import service.manager as manager from init import init_all -from service.producer import run_kafka_producer from conf.util import * # from web import app @@ -25,15 +25,11 @@ if __name__ == '__main__': init_all() # 运行Kafka与Spark相关 - - kafka_producer_t = threading.Thread(target=run_kafka_producer()) - web_t = threading.Thread(target=app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))) - kafka_producer_t.start() - web_t.start() - + manager.start() # 运行Web服务 + app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug')) diff --git a/service/manager.py b/service/manager.py new file mode 100644 index 0000000..1a42e3a --- /dev/null +++ b/service/manager.py @@ -0,0 +1,9 @@ +import threading + +from service.producer import run_kafka_producer + +def start(): + kafka_producer_thread = threading.Thread(target=run_kafka_producer) + + kafka_producer_thread.start() + diff --git a/service/producer.py b/service/producer.py index 6552bf0..271f079 100644 --- a/service/producer.py +++ b/service/producer.py @@ -49,11 +49,8 @@ def delivery_report(err, msg): pass def run_kafka_producer(): - # while True: - # order_data = generate_order_data() # 生成数据 - # producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 - # producer.poll(0) # 处理任何待处理的事件(如回调) - # time.sleep(5) # 每隔 5 秒发送一次 while True: - print("123") - time.sleep(1) \ No newline at end of file + order_data = generate_order_data() # 生成数据 + producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 + producer.poll(0) # 处理任何待处理的事件(如回调) + time.sleep(5) # 每隔 5 秒发送一次 \ No newline at end of file