This commit is contained in:
2025-01-03 10:31:30 +08:00
parent e36c7f4810
commit ce9db4c25d
8 changed files with 177 additions and 66 deletions

View File

@@ -1,39 +1,101 @@
from flask import Blueprint, jsonify
from dao.kafka.util import get_KafkaConsumer, raw_Data_to_jsonstr
from confluent_kafka import KafkaError
from dao.kafka.util import *
from kafka import KafkaConsumer, TopicPartition
from dao.db.mysql import *
api_bp = Blueprint('api', __name__)
@api_bp.route('/readkafka', methods=['GET'])
@api_bp.route('/rawdata', methods=['GET'])
def readKafka():
consumer = get_KafkaConsumer()
# 订阅Kafka主题
consumer.subscribe(["orders"])
consumer: KafkaConsumer = get_KafkaConsumer("orders")
# 读取Kafka消息
messages = []
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
break
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
messages.append(raw_Data_to_jsonstr(msg.value().decode('utf-8')))
# 这里只读取最新的10条消息
if len(messages) >= 100:
break
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
messages.append(raw_Data_to_jsonstr(message.value.decode('utf-8')))
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.unsubscribe()
consumer.close()
return jsonify(messages)
@api_bp.route('/test', methods=['GET'])
def test():
return jsonify({"message": "Hello, World!"})
@api_bp.route('/stats/<topic>')
def stats(topic:str):
# 获取Kafka Topic的offset信息
info = get_offsets(topic)
if info is None:
return jsonify({"error": "Topic not found"}), 404
return jsonify(info)
@api_bp.route('/orders-count')
def orders_count():
# 在mysql中读取统计订单数量
return jsonify(get_orders_count())
@api_bp.route('/stream/ordersummary')
def orders_count_by_name():
consumer: KafkaConsumer = get_KafkaConsumer("eachOrders_summary")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)
@api_bp.route('/stream/ordernamecount')
def order_name_count():
consumer: KafkaConsumer = get_KafkaConsumer("order_name_count")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)
@api_bp.route('/stream/summary')
def summary():
consumer: KafkaConsumer = get_KafkaConsumer("orders_summary")
messages = []
try:
# 读取消息最多读取10条消息
msg = consumer.poll(timeout_ms=500, max_records=50)
for partition, msgs in msg.items():
for message in msgs:
jsondata = json.loads(message.value.decode('utf-8'))
messages.append(jsondata)
except Exception as e:
return jsonify({"error": str(e)}), 500
finally:
# 取消订阅并关闭消费者
consumer.close()
return jsonify(messages)