70 lines
2.0 KiB
Python
70 lines
2.0 KiB
Python
from conf.util import get_config_object
|
||
from kafka import TopicPartition
|
||
import kafka
|
||
import json, re
|
||
|
||
# Configuration object, loaded once when this module is imported.
# get_KafkaConsumer reads the ("kafka", "bootstrap_servers") entry from it.
conf = get_config_object()
|
||
|
||
|
||
# Kafka
def get_KafkaConsumer(topic: str, group_id: str = "test") -> kafka.KafkaConsumer:
    """Create a ``kafka.KafkaConsumer`` subscribed to *topic*.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Consumer group used for committed offsets. Defaults to
            ``"test"``, the value this helper always used before it became
            a parameter.

    Returns:
        A new consumer. The caller owns it and should ``close()`` it when
        finished, to release its broker connections.
    """
    return kafka.KafkaConsumer(
        topic,
        # Kafka broker address(es), taken from the module-level config.
        bootstrap_servers=conf.get("kafka", "bootstrap_servers"),
        # Consumer group under which offsets are committed.
        group_id=group_id,
        # Start from the earliest available message when the group has no
        # usable committed offset.
        auto_offset_reset="earliest",
        # Commit consumed offsets automatically.
        enable_auto_commit=True,
    )
|
||
|
||
|
||
# Column names of a raw order record, in tab-separated field order.
_ORDER_FIELDS = (
    "order_id",
    "order_category",
    "order_name",
    "order_quantity",
    "date",
    "is_valid",
)


def raw_Data_to_jsonstr(data: str) -> dict:
    """Split one raw tab-separated order record into a field dict.

    Despite the function name, the return value is a ``dict``, not a JSON
    string. Pass it through ``json.dumps`` if a string is required.

    Args:
        data: One raw record whose fields are separated by tab characters.

    Returns:
        Mapping of the six order column names (``order_id``,
        ``order_category``, ``order_name``, ``order_quantity``, ``date``,
        ``is_valid``) to their raw string values. Any fields after the sixth
        are ignored.

    Raises:
        IndexError: If the record has fewer than six tab-separated fields.
    """
    # Remove escape characters: every backslash is dropped.
    cleaned = data.replace("\\", "")
    # Trim surrounding spaces and line breaks only. A bare strip() would also
    # remove tabs, silently deleting empty leading/trailing columns, which
    # shifts fields or loses the last one.
    cleaned = cleaned.strip(" \r\n")
    fields = cleaned.split("\t")

    if len(fields) < len(_ORDER_FIELDS):
        raise IndexError(
            f"expected at least {len(_ORDER_FIELDS)} tab-separated fields, "
            f"got {len(fields)}: {cleaned!r}"
        )
    return dict(zip(_ORDER_FIELDS, fields))
|
||
|
||
|
||
def get_offsets(topic_name: str, all_partitions: bool = False):
    """Report committed offset, end offset (high watermark) and lag for a topic.

    Committed offsets are those of the consumer group that
    ``get_KafkaConsumer`` uses. The temporary consumer is always closed
    before returning.

    Args:
        topic_name: Topic to inspect.
        all_partitions: If False (default), return only the record of the
            highest-numbered partition. This keeps the original single-dict
            return shape; previously the record returned was whichever
            partition happened to be iterated last. If True, return a list
            with one record per partition, ordered by partition number.

    Returns:
        A dict, or a list of dicts when ``all_partitions`` is True, with keys
        ``partition``, ``commit_offset``, ``end_offset`` and ``lag``.
        ``commit_offset`` and ``lag`` are None when the group has no
        committed offset for that partition. If the topic has no known
        partitions, an error dict is printed and None is returned.
    """
    consumer = get_KafkaConsumer(topic_name)
    try:
        # All partition ids of the topic; None/empty if the topic is unknown.
        partitions = consumer.partitions_for_topic(topic_name)
        if not partitions:
            print({"error": f"Topic {topic_name} not found"})
            return None

        tps = [TopicPartition(topic_name, p) for p in sorted(partitions)]
        # One request for every partition's end offset instead of one each.
        end_offsets = consumer.end_offsets(tps)

        records = []
        for tp in tps:
            commit_offset = consumer.committed(tp)
            end_offset = end_offsets[tp]
            records.append({
                "partition": tp.partition,
                "commit_offset": commit_offset,
                "end_offset": end_offset,
                "lag": end_offset - commit_offset if commit_offset is not None else None,
            })
    finally:
        # The consumer exists only for this lookup: release its connections
        # and do not commit anything on the way out.
        consumer.close(autocommit=False)

    return records if all_partitions else records[-1]