5 Commits

Author SHA1 Message Date
ce9db4c25d api10.31 2025-01-03 10:31:30 +08:00
e36c7f4810 feat:新增测试接口 轮询更新前端 2024-12-31 16:12:54 +08:00
7677852f2e update:为生产者添加UUID字段 2024-12-30 09:04:32 +08:00
0a6a51737c fix:修改了无法正常启动多线程的问题 2024-12-26 11:14:02 +08:00
samlyy
f219e3306f feat:添加Kafka Producer相关功能 2024-12-26 10:59:22 +08:00
12 changed files with 396 additions and 36 deletions

View File

@@ -4,13 +4,16 @@ port = 8080
debug = true
[database]
host = 172.16.5.2
host = 43.140.205.103
port = 3306
database = test
user = root
password = 123456
database = kaku
user = kaku
password = p4J7fY8mc6hcZfjG
[kafka]
bootstrap_servers = 172.16.5.2:9092
group_id = test
raw_topic = raw
[producer]
data_path = producer-data.json

25
dao/db/mysql.py Normal file
View File

@@ -0,0 +1,25 @@
from dao.db.util import *
def get_orders_count():
    """Read per-order-name counts from MySQL.

    Returns:
        list[dict]: one dict per row of ``order_name_yn`` (at most 100 rows)
        with keys ``order_name``, ``update_time`` and ``order_count``.
    """
    connect = get_connet()
    sql = "SELECT * FROM order_name_yn LIMIT 100"
    try:
        cursor = connect.cursor()
        try:
            cursor.execute(sql)
            result = cursor.fetchall()
        finally:
            # Release the cursor even if the query raises.
            cursor.close()
    finally:
        # Previously the connection leaked on any exception; always close it.
        connect.close()
    # NOTE(review): assumes column order (order_name, update_time,
    # order_count) — confirm against the table schema.
    return [
        {"order_name": row[0], "update_time": row[1], "order_count": row[2]}
        for row in result
    ]

View File

@@ -1,13 +1,70 @@
from conf.util import get_config_object
from kafka import KafkaConsumer
from kafka import TopicPartition
import kafka
import json, re
conf = get_config_object()
# Kafka
def get_KafkaConsumer() -> KafkaConsumer:
""" 返回KafkaConsumer对象 """
consumer = KafkaConsumer(
bootstrap_servers=conf.get("kafka", "bootstrap_servers"),
group_id=conf.get("kafka", "group_id")
def get_KafkaConsumer(topic: str) -> kafka.KafkaConsumer:
    """Build a KafkaConsumer subscribed to *topic*.

    Broker address and consumer group both come from the ``[kafka]`` section
    of the project config. The group id was hard-coded to 'test' here while
    the config already defines ``group_id = test``; reading it from config
    keeps behavior identical today and honors future config changes.
    """
    consumer = kafka.KafkaConsumer(
        topic,
        bootstrap_servers=conf.get("kafka","bootstrap_servers"),  # Kafka broker address(es)
        group_id=conf.get("kafka", "group_id"),  # consumer group from config
        auto_offset_reset='earliest',  # start from the earliest message
        enable_auto_commit=True,  # auto-commit consumed offsets
    )
    return consumer
def raw_Data_to_jsonstr(data: str) -> dict:
    """Split one raw tab-separated order record into a dict.

    Expected field order: order_id, order_category, order_name,
    order_quantity, date, is_valid.

    Note: despite the historical name and the old ``-> str`` annotation,
    this returns a dict (callers pass the result straight to ``jsonify``);
    the annotation is corrected here.

    Raises:
        IndexError: if the record has fewer than six tab-separated fields.
    """
    # Strip escape backslashes, then surrounding whitespace.
    data = re.sub(r"\\", "", data)
    data = data.strip()
    data_list = data.split("\t")
    return {
        "order_id": data_list[0],
        "order_category": data_list[1],
        "order_name": data_list[2],
        "order_quantity": data_list[3],
        "date": data_list[4],
        "is_valid": data_list[5],
    }
def get_offsets(topic_name: str):
    """Return committed and end offsets for every partition of *topic_name*.

    Returns:
        list[dict] | None: one dict per partition (keys: ``partition``,
        ``commit_offset``, ``end_offset``, ``lag``), or ``None`` if the
        topic does not exist.

    Fixes two defects of the previous version: the loop no longer overwrites
    its result, so multi-partition topics report every partition instead of
    only the last one; and the consumer is always closed. The unknown-topic
    path also returns ``None`` directly instead of ``return print(...)``.
    """
    consumer = get_KafkaConsumer(topic_name)
    try:
        partitions = consumer.partitions_for_topic(topic_name)
        if not partitions:
            # Unknown topic — let the caller turn this into a 404.
            return None
        offsets_data = []
        for partition in partitions:
            tp = TopicPartition(topic_name, partition)
            # Last committed offset for this group (None if never committed).
            commit_offset = consumer.committed(tp)
            # High watermark (end offset) for the partition.
            end_offset = next(iter(consumer.end_offsets([tp]).values()))
            offsets_data.append({
                "partition": partition,
                "commit_offset": commit_offset,
                "end_offset": end_offset,
                "lag": end_offset - commit_offset if commit_offset is not None else None,
            })
        return offsets_data
    finally:
        # Always release the consumer's sockets/group membership.
        consumer.close()

10
main.py
View File

@@ -1,7 +1,9 @@
from flask import *
import threading
import service.manager as manager
from init import init_all
from service.init import run_kafka_spark
from conf.util import *
# from web import app
@@ -14,7 +16,7 @@ app = Flask(__name__)
conf = get_config_object()
# 注册路由
app.register_blueprint(api_bp)
app.register_blueprint(api_bp,url_prefix='/api')
app.register_blueprint(page_bp)
# 启动
@@ -23,9 +25,11 @@ if __name__ == '__main__':
init_all()
# 运行Kafka与Spark相关
run_kafka_spark()
manager.start()
# 运行Web服务
app.run(host=conf.get('server', 'listen'), port=conf.get('server', 'port'), debug=conf.get('server', 'debug'))

7
producer-data.json Normal file
View File

@@ -0,0 +1,7 @@
{
"电器":["电脑", "手机", "电视", "手柄"],
"食物":["面包", "水果", "面条", "坚果"],
"衣服":["大衣", "短袖", "风衣", "卫衣"],
"家具":["椅子", "沙发", "茶几", "柜子"],
"程序猿":["陈楠", "郭子奇", "李尧宇", "郭志胜", "许家禾", "李烁升"]
}

View File

@@ -1,4 +1,101 @@
from flask import Blueprint
from flask import Blueprint, jsonify
from dao.kafka.util import *
from kafka import KafkaConsumer, TopicPartition
from dao.db.mysql import *
api_bp = Blueprint('api', __name__)
@api_bp.route('/rawdata', methods=['GET'])
def readKafka():
    """Poll up to 50 raw records from the 'orders' topic and return them as JSON."""
    consumer: KafkaConsumer = get_KafkaConsumer("orders")
    messages = []
    try:
        # One bounded poll; returns {TopicPartition: [records]}.
        batches = consumer.poll(timeout_ms=500, max_records=50)
        for batch in batches.values():
            messages.extend(
                raw_Data_to_jsonstr(record.value.decode('utf-8'))
                for record in batch
            )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # Close the consumer whether or not the poll succeeded.
        consumer.close()
    return jsonify(messages)
@api_bp.route('/stats/<topic>')
def stats(topic:str):
    """Return offset/lag information for *topic*, or 404 if it is unknown."""
    info = get_offsets(topic)
    if info is not None:
        return jsonify(info)
    return jsonify({"error": "Topic not found"}), 404
@api_bp.route('/orders-count')
def orders_count():
    """Expose the MySQL per-order-name counts as JSON."""
    counts = get_orders_count()
    return jsonify(counts)
@api_bp.route('/stream/ordersummary')
def orders_count_by_name():
    """Poll up to 50 JSON records from the 'eachOrders_summary' topic."""
    consumer: KafkaConsumer = get_KafkaConsumer("eachOrders_summary")
    messages = []
    try:
        # One bounded poll; returns {TopicPartition: [records]}.
        batches = consumer.poll(timeout_ms=500, max_records=50)
        for batch in batches.values():
            messages.extend(
                json.loads(record.value.decode('utf-8')) for record in batch
            )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # Close the consumer whether or not the poll succeeded.
        consumer.close()
    return jsonify(messages)
@api_bp.route('/stream/ordernamecount')
def order_name_count():
    """Poll up to 50 JSON records from the 'order_name_count' topic."""
    consumer: KafkaConsumer = get_KafkaConsumer("order_name_count")
    messages = []
    try:
        # One bounded poll; returns {TopicPartition: [records]}.
        batches = consumer.poll(timeout_ms=500, max_records=50)
        for batch in batches.values():
            messages.extend(
                json.loads(record.value.decode('utf-8')) for record in batch
            )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # Close the consumer whether or not the poll succeeded.
        consumer.close()
    return jsonify(messages)
@api_bp.route('/stream/summary')
def summary():
    """Poll up to 50 JSON records from the 'orders_summary' topic."""
    consumer: KafkaConsumer = get_KafkaConsumer("orders_summary")
    messages = []
    try:
        # One bounded poll; returns {TopicPartition: [records]}.
        batches = consumer.poll(timeout_ms=500, max_records=50)
        for batch in batches.values():
            messages.extend(
                json.loads(record.value.decode('utf-8')) for record in batch
            )
    except Exception as e:
        return jsonify({"error": str(e)}), 500
    finally:
        # Close the consumer whether or not the poll succeeded.
        consumer.close()
    return jsonify(messages)

View File

@@ -6,3 +6,7 @@ page_bp = Blueprint('page', __name__)
@page_bp.route('/')
def index():
    """Landing page: renders the dashboard shell (index.html)."""
    return render_template('index.html')
@page_bp.route('/show')
def test():
    """Render the live order table (show.html), embedded via an iframe.

    NOTE(review): the view function is named 'test', which sets the Flask
    endpoint name — renaming it would break any url_for('page.test') caller.
    """
    return render_template('show.html')

View File

@@ -1,7 +0,0 @@
import threading
from service.test import test
def run_kafka_spark():
print('run_kafka_spark')
t1 = threading.Thread(target=test)
t1.start()

9
service/manager.py Normal file
View File

@@ -0,0 +1,9 @@
import threading
from service.producer import run_kafka_producer
def start():
    """Launch the Kafka producer loop on a background daemon thread.

    ``run_kafka_producer`` loops forever, so the thread must be a daemon:
    the previous non-daemon thread kept the process alive after the Flask
    server shut down, preventing a clean exit.
    """
    kafka_producer_thread = threading.Thread(target=run_kafka_producer, daemon=True)
    kafka_producer_thread.start()

64
service/producer.py Normal file
View File

@@ -0,0 +1,64 @@
import random
import time
import json
import uuid
from datetime import datetime
from confluent_kafka import Producer
from conf.util import get_config_object
conf_obj = get_config_object()

# Load the category -> product-names mapping used to fabricate orders
# (path comes from [producer] data_path in the project config).
with open(conf_obj.get("producer","data_path"), 'r', encoding='utf-8') as f:
    data = json.load(f)

# confluent_kafka producer configuration.
conf = {
    'bootstrap.servers': conf_obj.get("kafka", "bootstrap_servers"),  # Kafka broker address(es)
    'client.id': 'python-producer'
}

# Module-level producer shared by run_kafka_producer().
producer = Producer(conf)
# 生成随机数据的函数
def generate_order_data():
    """Fabricate one random order record as a tab-separated string.

    Field order: UUID, category, name, quantity, date, is_valid — the
    layout the consumer side's raw_Data_to_jsonstr() expects.
    (A commented-out JSON variant of this record was dead code and has
    been removed.)
    """
    # Pick a random category, then a product within it.
    order_category = random.choice(list(data.keys()))
    order_name = random.choice(data[order_category])
    # Random quantity in [1, 100].
    order_quantity = random.randint(1, 100)
    date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Randomly flag the order valid or invalid.
    is_valid = random.choice(['Y', 'N'])
    # uuid1 embeds host/time — acceptable for synthetic test data.
    data_str = f"{uuid.uuid1()}\t{order_category}\t{order_name}\t{order_quantity}\t{date}\t{is_valid}"
    return data_str
# 发送消息的回调函数
def delivery_report(err, msg):
    """Kafka delivery callback: log failures, stay silent on success."""
    if err is None:
        # Successful deliveries are intentionally not logged to avoid noise.
        return
    print('Message delivery failed: {}'.format(err))
def run_kafka_producer():
    """Endless loop producing random orders to the 'orders' topic.

    Intended to run on a background thread (see service.manager.start);
    it never returns.
    """
    while True:
        order_data = generate_order_data()  # build one tab-separated record
        producer.produce('orders', order_data, callback=delivery_report)  # send to the 'orders' topic
        producer.poll(0)  # serve pending delivery callbacks without blocking
        # NOTE(review): random()*3 sleeps 0-3 seconds, not the "1-5 seconds"
        # the old comment claimed.
        time.sleep(random.random()*3)

View File

@@ -6,18 +6,10 @@
<title>哇哦哦哦哦哦哦哦哦哦哦哦哦哦哦哦哦</title>
</head>
<body>
Lorem ipsum dolor sit amet consectetur adipisicing elit. Culpa magnam dicta harum sit voluptas, explicabo sed cumque omnis. Culpa, reiciendis numquam atque quod id molestiae nobis similique placeat eos amet.
<ul>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>123</li>
<li>132</li>
</ul>
<div class="container">
<div>
<iframe src="/show" frameborder="0" width="500" height="600"></iframe>
</div>
</div>
</body>
</html>

105
templates/show.html Normal file
View File

@@ -0,0 +1,105 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>订单展示</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #ffffff;
}
h1 {
text-align: center;
color: #333;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 20px;
}
table {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
}
th, td {
padding: 12px;
text-align: left;
border: 1px solid #ddd;
}
th {
background-color: #f2f2f2;
}
tr:nth-child(even) {
background-color: #f9f9f9;
}
.valid {
color: green;
font-weight: bold;
}
.invalid {
color: red;
font-weight: bold;
}
</style>
</head>
<body>
<div class="container">
<h1>订单信息</h1>
<table id="orders-table">
<thead>
<tr>
<th>订单ID</th>
<th>订单分类</th>
<th>订单名称</th>
<th>数量</th>
<th>日期</th>
<th>是否有效</th>
</tr>
</thead>
<tbody>
<!-- 这里将通过 JavaScript 动态填充订单数据 -->
</tbody>
</table>
</div>
<script>
// 定义一个函数用于请求数据并更新表格
function fetchAndUpdateOrders() {
fetch('/api/rawdata') // Flask API 路径
.then(response => response.json())
.then(data => {
const tableBody = document.querySelector('#orders-table tbody');
// 清空现有的表格内容
tableBody.innerHTML = '';
// 填充新的数据
data.forEach(order => {
const row = document.createElement('tr');
row.innerHTML = `
<td>${order.order_id}</td>
<td>${order.order_category}</td>
<td>${order.order_name}</td>
<td>${order.order_quantity}</td>
<td>${order.date}</td>
<td class="${order.is_valid === 'Y' ? 'valid' : 'invalid'}">${order.is_valid === 'Y' ? '有效' : '无效'}</td>
`;
tableBody.appendChild(row);
});
})
.catch(error => {
console.error('获取订单数据失败:', error);
});
}
// 页面加载时,立即调用一次更新数据的函数
fetchAndUpdateOrders();
// 设置轮询:每 5 秒请求一次数据并更新表格
setInterval(fetchAndUpdateOrders, 5000); // 5000 毫秒 = 5 秒
</script>
</body>
</html>