102 lines
3.1 KiB
Python
102 lines
3.1 KiB
Python
from flask import Blueprint, jsonify
|
||
from dao.kafka.util import *
|
||
from kafka import KafkaConsumer, TopicPartition
|
||
from dao.db.mysql import *
|
||
|
||
api_bp = Blueprint('api', __name__)
|
||
|
||
@api_bp.route('/rawdata', methods=['GET'])
|
||
def readKafka():
|
||
consumer: KafkaConsumer = get_KafkaConsumer("orders")
|
||
|
||
messages = []
|
||
try:
|
||
# 读取消息,最多读取10条消息
|
||
msg = consumer.poll(timeout_ms=500, max_records=50)
|
||
for partition, msgs in msg.items():
|
||
for message in msgs:
|
||
messages.append(raw_Data_to_jsonstr(message.value.decode('utf-8')))
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
finally:
|
||
# 取消订阅并关闭消费者
|
||
consumer.close()
|
||
|
||
return jsonify(messages)
|
||
|
||
@api_bp.route('/stats/<topic>')
|
||
def stats(topic:str):
|
||
# 获取Kafka Topic的offset信息
|
||
info = get_offsets(topic)
|
||
if info is None:
|
||
return jsonify({"error": "Topic not found"}), 404
|
||
return jsonify(info)
|
||
|
||
@api_bp.route('/orders-count')
|
||
def orders_count():
|
||
# 在mysql中读取统计订单数量
|
||
return jsonify(get_orders_count())
|
||
|
||
@api_bp.route('/stream/ordersummary')
|
||
def orders_count_by_name():
|
||
consumer: KafkaConsumer = get_KafkaConsumer("eachOrders_summary")
|
||
|
||
messages = []
|
||
try:
|
||
# 读取消息,最多读取10条消息
|
||
msg = consumer.poll(timeout_ms=500, max_records=50)
|
||
for partition, msgs in msg.items():
|
||
for message in msgs:
|
||
jsondata = json.loads(message.value.decode('utf-8'))
|
||
messages.append(jsondata)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
finally:
|
||
# 取消订阅并关闭消费者
|
||
consumer.close()
|
||
print(messages)
|
||
return jsonify(messages)
|
||
|
||
|
||
|
||
@api_bp.route('/stream/ordernamecount')
|
||
def order_name_count():
|
||
consumer: KafkaConsumer = get_KafkaConsumer("order_name_count")
|
||
|
||
messages = []
|
||
try:
|
||
# 读取消息,最多读取10条消息
|
||
msg = consumer.poll(timeout_ms=500, max_records=50)
|
||
for partition, msgs in msg.items():
|
||
for message in msgs:
|
||
jsondata = json.loads(message.value.decode('utf-8'))
|
||
messages.append(jsondata)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
finally:
|
||
# 取消订阅并关闭消费者
|
||
consumer.close()
|
||
|
||
return jsonify(messages)
|
||
|
||
|
||
|
||
@api_bp.route('/stream/summary')
|
||
def summary():
|
||
consumer: KafkaConsumer = get_KafkaConsumer("orders_summary")
|
||
|
||
messages = []
|
||
try:
|
||
# 读取消息,最多读取10条消息
|
||
msg = consumer.poll(timeout_ms=500, max_records=50)
|
||
for partition, msgs in msg.items():
|
||
for message in msgs:
|
||
jsondata = json.loads(message.value.decode('utf-8'))
|
||
messages.append(jsondata)
|
||
except Exception as e:
|
||
return jsonify({"error": str(e)}), 500
|
||
finally:
|
||
# 取消订阅并关闭消费者
|
||
consumer.close()
|
||
|
||
return jsonify(messages) |