2025-01-03 10:31:30 +08:00

101 lines
3.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, jsonify
from dao.kafka.util import *
from kafka import KafkaConsumer, TopicPartition
from dao.db.mysql import *
api_bp = Blueprint('api', __name__)
@api_bp.route('/rawdata', methods=['GET'])
def readKafka():
consumer: KafkaConsumer = get_KafkaConsumer("orders")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
messages.append(raw_Data_to_jsonstr(message.value.decode('utf-8')))
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)
@api_bp.route('/stats/<topic>')
def stats(topic:str):
# 获取Kafka Topic的offset信息
info = get_offsets(topic)
if info is None:
return jsonify({"error": "Topic not found"}), 404
return jsonify(info)
@api_bp.route('/orders-count')
def orders_count():
# 在mysql中读取统计订单数量
return jsonify(get_orders_count())
@api_bp.route('/stream/ordersummary')
def orders_count_by_name():
consumer: KafkaConsumer = get_KafkaConsumer("eachOrders_summary")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)
@api_bp.route('/stream/ordernamecount')
def order_name_count():
consumer: KafkaConsumer = get_KafkaConsumer("order_name_count")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)
@api_bp.route('/stream/summary')
def summary():
consumer: KafkaConsumer = get_KafkaConsumer("orders_summary")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)