2025-01-03 10:31:30 +08:00

70 lines
2.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from conf.util import get_config_object
from kafka import TopicPartition
import kafka
import json, re
# Project configuration object, created once when this module is imported;
# the Kafka helpers in this file read their connection settings from it.
conf = get_config_object()
# Kafka
def get_KafkaConsumer(topic: str, group_id: str = "test") -> kafka.KafkaConsumer:
    """Create a KafkaConsumer subscribed to ``topic``.

    Args:
        topic: Name of the topic to subscribe to.
        group_id: Consumer group to join. Defaults to ``"test"``, the value
            that was previously hard-coded, so existing callers are unaffected.

    Returns:
        A ``kafka.KafkaConsumer`` whose bootstrap servers come from
        ``conf.get("kafka", "bootstrap_servers")``.
    """
    return kafka.KafkaConsumer(
        topic,
        # Kafka server address(es), taken from the project configuration.
        bootstrap_servers=conf.get("kafka", "bootstrap_servers"),
        # Consumer group.
        group_id=group_id,
        # Start consuming from the earliest message.
        auto_offset_reset="earliest",
        # Commit consumed offsets automatically.
        enable_auto_commit=True,
    )
def raw_Data_to_jsonstr(data: str) -> dict:
    """Split one tab-separated raw order record into a dict of named fields.

    Despite the function's name, the result is a ``dict`` (suitable for
    ``json.dumps``), not a JSON string; the return annotation now says so.

    Args:
        data: Raw record text holding at least six tab-separated fields, in
            the order order_id, order_category, order_name, order_quantity,
            date, is_valid.

    Returns:
        A dict mapping those six field names to their string values. Fields
        beyond the sixth are ignored.

    Raises:
        IndexError: If the record contains fewer than six fields.
    """
    field_names = (
        "order_id",
        "order_category",
        "order_name",
        "order_quantity",
        "date",
        "is_valid",
    )
    # Remove every backslash (escape residue); a plain str.replace is enough
    # for a literal character, no regex needed.
    cleaned = data.replace("\\", "")
    # Trim surrounding whitespace and newlines before splitting on tabs.
    values = cleaned.strip().split("\t")
    if len(values) < len(field_names):
        # Same exception type the bare index lookup used to raise, but with a
        # message that identifies the offending record.
        raise IndexError(
            f"expected at least {len(field_names)} tab-separated fields, "
            f"got {len(values)}: {data!r}"
        )
    # zip stops at the six field names, so any extra trailing fields are dropped.
    return dict(zip(field_names, values))
def get_offsets(topic_name: str):
    """Report the committed offset, end offset and lag for a Kafka topic.

    A consumer is created through ``get_KafkaConsumer`` just for this lookup
    and is always closed before returning, so repeated calls no longer leave
    consumers open.

    Args:
        topic_name: Name of the topic to inspect.

    Returns:
        A dict with the keys ``partition``, ``commit_offset``, ``end_offset``
        and ``lag`` (``lag`` is ``None`` when there is no committed offset),
        or ``None`` - after printing an error dict - when no partitions are
        found for the topic.

    NOTE(review): only one partition's entry is returned (the last one
    iterated); for a multi-partition topic the other entries are discarded.
    Returning all of them would change the return shape, so confirm with the
    callers before changing it.
    """
    consumer = get_KafkaConsumer(topic_name)
    try:
        # All partitions of the topic.
        partitions = consumer.partitions_for_topic(topic_name)
        if not partitions:
            print({"error": f"Topic {topic_name} not found"})
            return None

        topic_partitions = [TopicPartition(topic_name, p) for p in partitions]
        # Fetch every partition's end offset in one request instead of one
        # round trip per partition.
        end_offsets = consumer.end_offsets(topic_partitions)

        offsets_data = None
        for tp in topic_partitions:
            # Committed offset for this partition (None if never committed).
            commit_offset = consumer.committed(tp)
            end_offset = end_offsets[tp]
            offsets_data = {
                "partition": tp.partition,
                "commit_offset": commit_offset,
                "end_offset": end_offset,
                "lag": end_offset - commit_offset if commit_offset is not None else None,
            }
        return offsets_data
    finally:
        # This is a read-only inspection: release the consumer without
        # committing any offsets on the way out.
        consumer.close(autocommit=False)