"""Flask blueprint exposing Kafka topic snapshots and order statistics."""
import json
import logging

from flask import Blueprint, jsonify
# NOTE: the relative order of the star imports below is preserved from the
# original file, because a later star import may shadow names from an earlier one.
from dao.kafka.util import *
from kafka import KafkaConsumer, TopicPartition
from dao.db.mysql import *

api_bp = Blueprint('api', __name__)

_logger = logging.getLogger(__name__)

# Upper bounds for a single poll; every snapshot endpoint shares them.
_POLL_TIMEOUT_MS = 1000
_MAX_RECORDS = 50


def _poll_topic(topic, parse):
    """Poll ``topic`` once and return the parsed record payloads.

    Args:
        topic: Topic name passed to ``get_KafkaConsumer``.
        parse: Callable applied to each record value after UTF-8 decoding.

    Returns:
        A list with one parsed item per record returned by the poll
        (at most ``_MAX_RECORDS``).

    Raises:
        Any exception raised while creating the consumer, polling, decoding,
        parsing, or closing the consumer.
    """
    consumer: KafkaConsumer = get_KafkaConsumer(topic)
    try:
        batches = consumer.poll(
            timeout_ms=_POLL_TIMEOUT_MS, max_records=_MAX_RECORDS
        )
        return [
            parse(record.value.decode('utf-8'))
            for records in batches.values()
            for record in records
        ]
    finally:
        # Always release the consumer, even when polling or parsing fails.
        consumer.close()


def _topic_snapshot_response(topic, parse=json.loads):
    """Build the JSON response for a one-shot poll of ``topic``.

    Returns the parsed messages as a JSON array, or
    ``({"error": <message>}, 500)`` when anything goes wrong, including
    failure to create the consumer.
    """
    try:
        messages = _poll_topic(topic, parse)
    except Exception as e:  # request boundary: report the failure as JSON
        _logger.exception("Failed to poll Kafka topic %s", topic)
        return jsonify({"error": str(e)}), 500
    _logger.debug("Polled %d record(s) from %s: %s", len(messages), topic, messages)
    return jsonify(messages)


@api_bp.route('/rawdata', methods=['GET'])
def readKafka():
    """Return up to 50 records from the "orders" topic.

    Each record value is UTF-8 decoded and converted with
    ``raw_Data_to_jsonstr`` before being placed in the JSON array.
    """
    return _topic_snapshot_response("orders", raw_Data_to_jsonstr)


@api_bp.route('/stats/<topic>')
def stats(topic: str):
    """Return the offset information for a Kafka topic.

    The topic name is taken from the URL path. Responds with 404 and an
    error payload when ``get_offsets`` returns ``None``.
    """
    info = get_offsets(topic)
    if info is None:
        return jsonify({"error": "Topic not found"}), 404
    return jsonify(info)


@api_bp.route('/orders-count')
def orders_count():
    """Return the result of ``get_orders_count()`` as JSON.

    Per the original comment, this is the order count read from MySQL.
    """
    return jsonify(get_orders_count())


@api_bp.route('/stream/ordersummary')
def orders_count_by_name():
    """Return up to 50 JSON-decoded records from "eachOrders_summary"."""
    return _topic_snapshot_response("eachOrders_summary")


@api_bp.route('/stream/ordernamecount')
def order_name_count():
    """Return up to 50 JSON-decoded records from "order_name_count"."""
    return _topic_snapshot_response("order_name_count")


@api_bp.route('/stream/summary')
def summary():
    """Return up to 50 JSON-decoded records from "orders_summary"."""
    return _topic_snapshot_response("orders_summary")