"""Kafka helper utilities: consumer construction, record parsing, offset lookup."""

import json
import re
from typing import Any, Dict, List, Optional, Union

import kafka
from kafka import TopicPartition

from conf.util import get_config_object

conf = get_config_object()

# Field names of one tab-separated order record, in column order.
_ORDER_FIELDS = (
    "order_id",
    "order_category",
    "order_name",
    "order_quantity",
    "date",
    "is_valid",
)


# Kafka
def get_KafkaConsumer(topic: str) -> kafka.KafkaConsumer:
    """Create a KafkaConsumer subscribed to *topic*.

    The broker address is read from the ``bootstrap_servers`` option of the
    ``kafka`` section of the project configuration.
    """
    consumer = kafka.KafkaConsumer(
        topic,
        bootstrap_servers=conf.get("kafka","bootstrap_servers"),  # Kafka broker address
        group_id='test',  # consumer group
        auto_offset_reset='earliest',  # start from the earliest message
        enable_auto_commit=True,  # commit consumed offsets automatically
    )
    return consumer


def raw_Data_to_jsonstr(data: str) -> Dict[str, str]:
    """Split one raw tab-separated record into a dict of named fields.

    Despite the function name, the result is a plain ``dict`` (not a JSON
    string); all values are kept as the strings found in the record.

    Args:
        data: Raw record text with at least six tab-separated columns.

    Returns:
        A dict with the keys ``order_id``, ``order_category``,
        ``order_name``, ``order_quantity``, ``date`` and ``is_valid``.
        Columns beyond the sixth are ignored.

    Raises:
        IndexError: If the record has fewer than six columns.
    """
    # Drop every backslash (escape characters) before splitting.
    data = re.sub(r"\\", "", data)
    # Trim surrounding whitespace and newlines.
    fields = data.strip().split("\t")
    if len(fields) < len(_ORDER_FIELDS):
        # Same exception type as plain indexing would raise, with a clearer message.
        raise IndexError(
            f"expected at least {len(_ORDER_FIELDS)} tab-separated fields, "
            f"got {len(fields)}"
        )
    # zip() stops at the shorter input, so surplus columns are discarded.
    return dict(zip(_ORDER_FIELDS, fields))


def get_offsets(
    topic_name: str, *, all_partitions: bool = False
) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
    """Get the committed offset and end offset for a Kafka topic.

    Args:
        topic_name: Topic to inspect.
        all_partitions: When ``False`` (the default, kept for backward
            compatibility) a single dict is returned, describing the last
            partition in the order the client reports them. When ``True`` a
            list with one dict per partition, sorted by partition number,
            is returned.

    Returns:
        ``None`` (after printing an error dict) when the topic has no known
        partitions. Otherwise a dict, or list of dicts, with the keys
        ``partition``, ``commit_offset``, ``end_offset`` and ``lag``.
        ``lag`` is ``None`` when the group has no committed offset for the
        partition.
    """
    consumer = get_KafkaConsumer(topic_name)
    try:
        # All partitions of the topic.
        partitions = consumer.partitions_for_topic(topic_name)
        if not partitions:
            print({"error": f"Topic {topic_name} not found"})
            return None

        topic_partitions = [TopicPartition(topic_name, p) for p in partitions]
        # One request for the end offsets (high watermarks) of every partition.
        end_offsets = consumer.end_offsets(topic_partitions)

        offsets_data = []
        for tp in topic_partitions:
            commit_offset = consumer.committed(tp)
            end_offset = end_offsets[tp]
            offsets_data.append(
                {
                    "partition": tp.partition,
                    "commit_offset": commit_offset,
                    "end_offset": end_offset,
                    "lag": (
                        end_offset - commit_offset
                        if commit_offset is not None
                        else None
                    ),
                }
            )
    finally:
        # Always release the connection; nothing was consumed, so skip the
        # auto-commit that close() would otherwise attempt.
        consumer.close(autocommit=False)

    if all_partitions:
        return sorted(offsets_data, key=lambda entry: entry["partition"])
    # Legacy shape: only the last partition visited.
    return offsets_data[-1]