diff --git a/main.py b/main.py index 3ffb369..221d403 100644 --- a/main.py +++ b/main.py @@ -2,8 +2,8 @@ from flask import * import threading +import service.manager as manager from init import init_all -from service.producer import run_kafka_producer from conf.util import * # from web import app @@ -25,15 +25,11 @@ if __name__ == '__main__': init_all() # 运行Kafka与Spark相关 - - kafka_producer_t = threading.Thread(target=run_kafka_producer()) - web_t = threading.Thread(target=app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))) - kafka_producer_t.start() - web_t.start() - + manager.start() # 运行Web服务 + app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug')) diff --git a/service/manager.py b/service/manager.py new file mode 100644 index 0000000..1a42e3a --- /dev/null +++ b/service/manager.py @@ -0,0 +1,9 @@ +import threading + +from service.producer import run_kafka_producer + +def start(): + kafka_producer_thread = threading.Thread(target=run_kafka_producer) + + kafka_producer_thread.start() + diff --git a/service/producer.py b/service/producer.py index 6552bf0..271f079 100644 --- a/service/producer.py +++ b/service/producer.py @@ -49,11 +49,8 @@ def delivery_report(err, msg): pass def run_kafka_producer(): - # while True: - # order_data = generate_order_data() # 生成数据 - # producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 - # producer.poll(0) # 处理任何待处理的事件(如回调) - # time.sleep(5) # 每隔 5 秒发送一次 while True: - print("123") - time.sleep(1) \ No newline at end of file + order_data = generate_order_data() # 生成数据 + producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 + producer.poll(0) # 处理任何待处理的事件(如回调) + time.sleep(5) # 每隔 5 秒发送一次 \ No newline at end of file