diff --git a/config.ini b/config.ini index 8454a1d..d91ec9e 100644 --- a/config.ini +++ b/config.ini @@ -4,11 +4,11 @@ port = 8080 debug = true [database] -host = 172.16.5.2 +host = 43.140.205.103 port = 3306 -database = test -user = root -password = 123456 +database = kaku +user = kaku +password = p4J7fY8mc6hcZfjG [kafka] bootstrap_servers = 172.16.5.2:9092 diff --git a/dao/db/mysql.py b/dao/db/mysql.py new file mode 100644 index 0000000..2ea5cc2 --- /dev/null +++ b/dao/db/mysql.py @@ -0,0 +1,25 @@ +from dao.db.util import * + +def get_orders_count(): + # 在mysql中读取统计订单数量 + connect = get_connet() + sql = "SELECT * FROM order_name_yn LIMIT 100" + cursor = connect.cursor() + cursor.execute(sql) + result = cursor.fetchall() + cursor.close() + connect.close() + + resu = [] + for row in result: + resu.append( + { + "order_name": row[0], + "update_time": row[1], + "order_count": row[2] + } + ) + + return resu + + diff --git a/dao/kafka/util.py b/dao/kafka/util.py index 7671916..371bb3a 100644 --- a/dao/kafka/util.py +++ b/dao/kafka/util.py @@ -1,19 +1,20 @@ from conf.util import get_config_object -from confluent_kafka import Consumer +from kafka import TopicPartition +import kafka import json, re conf = get_config_object() # Kafka -def get_KafkaConsumer() -> Consumer: - """返回KafkaConsumer对象""" - kafka_conf = { - "bootstrap.servers": conf.get("kafka", "bootstrap_servers"), - "group.id": conf.get("kafka", "group_id"), - "auto.offset.reset": "earliest", - } - consumer = Consumer(kafka_conf) +def get_KafkaConsumer(topic: str) -> kafka.KafkaConsumer: + consumer = kafka.KafkaConsumer( + topic, + bootstrap_servers=conf.get("kafka","bootstrap_servers"), # Kafka 服务器地址 + group_id='test', # 消费者组 + auto_offset_reset='earliest', # 从最早的消息开始消费 + enable_auto_commit=True, # 自动提交消费位移 + ) return consumer @@ -22,17 +23,48 @@ def raw_Data_to_jsonstr(data: str) -> str: 将原始数据切分转换为json字符串 """ # 清理转义字符 - data = re.sub(r'\\', '', data) - + data = re.sub(r"\\", "", data) + # 去除多余的空格和换行符 data = data.strip() data_list = data.split("\t") - + return { - "order_id": data_list[0], - "order_category": data_list[1], - "order_name": data_list[2], - "order_quantity": data_list[3], - "date": data_list[4], - "is_valid": data_list[5], + "order_id": data_list[0], + "order_category": data_list[1], + "order_name": data_list[2], + "order_quantity": data_list[3], + "date": data_list[4], + "is_valid": data_list[5], + } + + +def get_offsets(topic_name: str): + """获取 Kafka 主题的已提交位移和终末位移""" + consumer = get_KafkaConsumer(topic_name) + + offsets_data = None + + # 获取该主题的所有分区 + partitions = consumer.partitions_for_topic(topic_name) + if not partitions: + return print({"error": f"Topic {topic_name} not found"}) + + # 获取每个分区的已提交位移和终末位移 + for partition in partitions: + tp = TopicPartition(topic_name, partition) + + # 获取已提交的位移 + commit_offset = consumer.committed(tp) + + # 获取终末位移(high watermark) + end_offset = next(iter(consumer.end_offsets([tp]).values())) + + offsets_data = { + "partition": partition, + "commit_offset": commit_offset, + "end_offset": end_offset, + "lag": end_offset - commit_offset if commit_offset is not None else None, } + + return offsets_data \ No newline at end of file diff --git a/main.py b/main.py index 221d403..2e67275 100644 --- a/main.py +++ b/main.py @@ -16,7 +16,7 @@ app = Flask(__name__) conf = get_config_object() # 注册路由 -app.register_blueprint(api_bp) +app.register_blueprint(api_bp,url_prefix='/api') app.register_blueprint(page_bp) # 启动 diff --git a/router/api.py b/router/api.py index 55dd10c..8b849a3 100644 --- a/router/api.py +++ b/router/api.py @@ -1,39 +1,101 @@ from flask import Blueprint, jsonify -from dao.kafka.util import get_KafkaConsumer, raw_Data_to_jsonstr -from confluent_kafka import KafkaError +from dao.kafka.util import * +from kafka import KafkaConsumer, TopicPartition +from dao.db.mysql import * api_bp = Blueprint('api', __name__) -@api_bp.route('/readkafka', methods=['GET']) +@api_bp.route('/rawdata', methods=['GET']) def readKafka(): - consumer = get_KafkaConsumer() - # 订阅Kafka主题 - consumer.subscribe(["orders"]) + consumer: KafkaConsumer = get_KafkaConsumer("orders") - # 读取Kafka消息 messages = [] try: - while True: - msg = consumer.poll(timeout=1.0) - if msg is None: - break - if msg.error(): - if msg.error().code() == KafkaError._PARTITION_EOF: - continue - else: - print(msg.error()) - break - messages.append(raw_Data_to_jsonstr(msg.value().decode('utf-8'))) - # 这里只读取最新的10条消息 - if len(messages) >= 100: - break + # 读取消息,最多读取10条消息 + msg = consumer.poll(timeout_ms=500, max_records=50) + for partition, msgs in msg.items(): + for message in msgs: + messages.append(raw_Data_to_jsonstr(message.value.decode('utf-8'))) + except Exception as e: + return jsonify({"error": str(e)}), 500 finally: # 取消订阅并关闭消费者 - consumer.unsubscribe() consumer.close() - + return jsonify(messages) -@api_bp.route('/test', methods=['GET']) -def test(): - return jsonify({"message": "Hello, World!"}) +@api_bp.route('/stats/') +def stats(topic:str): + # 获取Kafka Topic的offset信息 + info = get_offsets(topic) + if info is None: + return jsonify({"error": "Topic not found"}), 404 + return jsonify(info) + +@api_bp.route('/orders-count') +def orders_count(): + # 在mysql中读取统计订单数量 + return jsonify(get_orders_count()) + +@api_bp.route('/stream/ordersummary') +def orders_count_by_name(): + consumer: KafkaConsumer = get_KafkaConsumer("eachOrders_summary") + + messages = [] + try: + # 读取消息,最多读取10条消息 + msg = consumer.poll(timeout_ms=500, max_records=50) + for partition, msgs in msg.items(): + for message in msgs: + jsondata = json.loads(message.value.decode('utf-8')) + messages.append(jsondata) + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + # 取消订阅并关闭消费者 + consumer.close() + + return jsonify(messages) + + +@api_bp.route('/stream/ordernamecount') +def order_name_count(): + consumer: KafkaConsumer = get_KafkaConsumer("order_name_count") + + messages = [] + try: + # 读取消息,最多读取10条消息 + msg = consumer.poll(timeout_ms=500, max_records=50) + for partition, msgs in msg.items(): + for message in msgs: + jsondata = json.loads(message.value.decode('utf-8')) + messages.append(jsondata) + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + # 取消订阅并关闭消费者 + consumer.close() + + return jsonify(messages) + + + +@api_bp.route('/stream/summary') +def summary(): + consumer: KafkaConsumer = get_KafkaConsumer("orders_summary") + + messages = [] + try: + # 读取消息,最多读取10条消息 + msg = consumer.poll(timeout_ms=500, max_records=50) + for partition, msgs in msg.items(): + for message in msgs: + jsondata = json.loads(message.value.decode('utf-8')) + messages.append(jsondata) + except Exception as e: + return jsonify({"error": str(e)}), 500 + finally: + # 取消订阅并关闭消费者 + consumer.close() + + return jsonify(messages) \ No newline at end of file diff --git a/service/producer.py b/service/producer.py index d56f8e3..df24298 100644 --- a/service/producer.py +++ b/service/producer.py @@ -53,7 +53,7 @@ def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: - print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) + # print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) pass def run_kafka_producer(): @@ -61,4 +61,4 @@ def run_kafka_producer(): order_data = generate_order_data() # 生成数据 producer.produce('orders', order_data, callback=delivery_report) # 发送到 Kafka 的 orders 主题 producer.poll(0) # 处理任何待处理的事件(如回调) - time.sleep(5) # 每隔 5 秒发送一次 \ No newline at end of file + time.sleep(random.random()*3) # 每隔 1-5 秒发送一次 \ No newline at end of file diff --git a/templates/index.html b/templates/index.html index bbd6d5f..77ecd81 100644 --- a/templates/index.html +++ b/templates/index.html @@ -6,18 +6,10 @@ 哇哦哦哦哦哦哦哦哦哦哦哦哦哦哦哦哦 - Lorem ipsum dolor sit amet consectetur adipisicing elit. Culpa magnam dicta harum sit voluptas, explicabo sed cumque omnis. Culpa, reiciendis numquam atque quod id molestiae nobis similique placeat eos amet. - +
+
+ +
+
\ No newline at end of file diff --git a/templates/show.html b/templates/show.html index da7a392..9a13301 100644 --- a/templates/show.html +++ b/templates/show.html @@ -9,7 +9,7 @@ font-family: Arial, sans-serif; margin: 0; padding: 20px; - background-color: #f4f4f9; + background-color: #ffffff; } h1 { text-align: center; @@ -70,7 +70,7 @@