diff --git a/dao/kafka/util.py b/dao/kafka/util.py index 4432b59..7671916 100644 --- a/dao/kafka/util.py +++ b/dao/kafka/util.py @@ -1,13 +1,38 @@ from conf.util import get_config_object -from kafka import KafkaConsumer +from confluent_kafka import Consumer +import json, re conf = get_config_object() + # Kafka -def get_KafkaConsumer() -> KafkaConsumer: - """ 返回KafkaConsumer对象 """ - consumer = KafkaConsumer( - bootstrap_servers=conf.get("kafka", "bootstrap_servers"), - group_id=conf.get("kafka", "group_id") - ) - return consumer \ No newline at end of file +def get_KafkaConsumer() -> Consumer: + """返回KafkaConsumer对象""" + kafka_conf = { + "bootstrap.servers": conf.get("kafka", "bootstrap_servers"), + "group.id": conf.get("kafka", "group_id"), + "auto.offset.reset": "earliest", + } + consumer = Consumer(kafka_conf) + return consumer + + +def raw_Data_to_jsonstr(data: str) -> str: + """ + 将原始数据切分转换为json字符串 + """ + # 清理转义字符 + data = re.sub(r'\\', '', data) + + # 去除多余的空格和换行符 + data = data.strip() + data_list = data.split("\t") + + return { + "order_id": data_list[0], + "order_category": data_list[1], + "order_name": data_list[2], + "order_quantity": data_list[3], + "date": data_list[4], + "is_valid": data_list[5], + } diff --git a/router/api.py b/router/api.py index ca111ca..55dd10c 100644 --- a/router/api.py +++ b/router/api.py @@ -1,4 +1,39 @@ -from flask import Blueprint +from flask import Blueprint, jsonify +from dao.kafka.util import get_KafkaConsumer, raw_Data_to_jsonstr +from confluent_kafka import KafkaError api_bp = Blueprint('api', __name__) +@api_bp.route('/readkafka', methods=['GET']) +def readKafka(): + consumer = get_KafkaConsumer() + # 订阅Kafka主题 + consumer.subscribe(["orders"]) + + # 读取Kafka消息 + messages = [] + try: + while True: + msg = consumer.poll(timeout=1.0) + if msg is None: + break + if msg.error(): + if msg.error().code() == KafkaError._PARTITION_EOF: + continue + else: + print(msg.error()) + break + messages.append(raw_Data_to_jsonstr(msg.value().decode('utf-8'))) + # 这里只读取最新的10条消息 + if len(messages) >= 100: + break + finally: + # 取消订阅并关闭消费者 + consumer.unsubscribe() + consumer.close() + + return jsonify(messages) + +@api_bp.route('/test', methods=['GET']) +def test(): + return jsonify({"message": "Hello, World!"}) diff --git a/router/page.py b/router/page.py index 13b0e16..91ea9b5 100644 --- a/router/page.py +++ b/router/page.py @@ -6,3 +6,7 @@ page_bp = Blueprint('page', __name__) @page_bp.route('/') def index(): return render_template('index.html') + +@page_bp.route('/show') +def test(): + return render_template('show.html') diff --git a/templates/show.html b/templates/show.html new file mode 100644 index 0000000..da7a392 --- /dev/null +++ b/templates/show.html @@ -0,0 +1,105 @@ + + +
+ + +订单ID | +订单分类 | +订单名称 | +数量 | +日期 | +是否有效 | +
---|