"""Flask blueprint exposing a Kafka read endpoint and a health-style test route."""

import logging

from confluent_kafka import KafkaError
from flask import Blueprint, jsonify

from dao.kafka.util import get_KafkaConsumer, raw_Data_to_jsonstr

logger = logging.getLogger(__name__)

api_bp = Blueprint('api', __name__)

# Topic consumed by the /readkafka endpoint.
_TOPIC = "orders"
# Upper bound on the number of messages returned by a single request.
_MAX_MESSAGES = 100
# Seconds to wait for a message on each poll before giving up.
_POLL_TIMEOUT_S = 1.0


@api_bp.route('/readkafka', methods=['GET'])
def readKafka():
    """Read up to ``_MAX_MESSAGES`` messages from the Kafka topic as JSON.

    A consumer is obtained from ``get_KafkaConsumer()``, subscribed to the
    topic, and polled until one of the following happens:

    * a poll returns no message within ``_POLL_TIMEOUT_S`` seconds,
    * a Kafka error other than end-of-partition is reported, or
    * ``_MAX_MESSAGES`` messages have been collected.

    Each message value is decoded as UTF-8 and passed through
    ``raw_Data_to_jsonstr`` before being added to the result.

    Returns:
        A Flask JSON response containing the list of converted messages
        (possibly empty).
    """
    consumer = get_KafkaConsumer()
    messages = []
    try:
        # Subscribe inside the try so the consumer is closed even if
        # subscribing fails.
        consumer.subscribe([_TOPIC])

        # NOTE(review): the loop stops on the first empty poll; if partition
        # assignment takes longer than the poll timeout this may return an
        # empty list even though messages exist -- confirm this is intended.
        while len(messages) < _MAX_MESSAGES:
            msg = consumer.poll(timeout=_POLL_TIMEOUT_S)
            if msg is None:
                break

            error = msg.error()
            if error:
                if error.code() == KafkaError._PARTITION_EOF:
                    # Reaching the end of one partition is not a failure;
                    # keep polling.
                    continue
                logger.error("Kafka consumer error: %s", error)
                break

            payload = msg.value()
            if payload is None:
                # Messages without a value (e.g. tombstones) have nothing to
                # decode; skip them instead of raising AttributeError.
                continue

            messages.append(raw_Data_to_jsonstr(payload.decode('utf-8')))
    finally:
        # Unsubscribe and close the consumer; close() must run even if
        # unsubscribe() raises.
        try:
            consumer.unsubscribe()
        finally:
            consumer.close()

    return jsonify(messages)


@api_bp.route('/test', methods=['GET'])
def test():
    """Return a fixed greeting payload as JSON."""
    return jsonify({"message": "Hello, World!"})