package main import ( "context" "fmt" "log" "math/rand" "os" "os/signal" "strconv" "strings" "syscall" "time" "github.com/BurntSushi/toml" "github.com/Shopify/sarama" ) // 配置结构体 type config struct { Kafka struct { Brokers []string `toml:"brokers"` Topic string `toml:"topic"` SendInterval string `toml:"send_interval"` RetryMax int `toml:"retry_max"` RetryBackoffMs int `toml:"retry_backoff_ms"` Version string `toml:"version"` } `toml:"kafka"` Order struct { Categories []struct { Name string `toml:"name"` Items []string `toml:"items"` } `toml:"categories"` Scores []int `toml:"scores"` Valids []string `toml:"valids"` } `toml:"order"` } func loadConfig() (config, error) { cfg := config{} if _, err := os.Stat("config.toml"); err == nil { if _, err := toml.DecodeFile("config.toml", &cfg); err != nil { return config{}, err } } else { log.Println("未找到配置文件,使用环境变量") } return cfg, nil } func randomOrder(cfg *config) string { categoryToNames := make(map[string][]string) for _, c := range cfg.Order.Categories { categoryToNames[c.Name] = c.Items } scores := cfg.Order.Scores if len(scores) == 0 { scores = []int{80, 90, 100} } valids := cfg.Order.Valids if len(valids) == 0 { valids = []string{"Y", "N"} } categories := make([]string, 0, len(categoryToNames)) for k := range categoryToNames { categories = append(categories, k) } category := categories[rand.Intn(len(categories))] names := categoryToNames[category] name := names[rand.Intn(len(names))] quantity := rand.Intn(10) + 1 date := time.Now().Format("2006-01-02") score := strconv.Itoa(scores[rand.Intn(len(scores))]) valid := valids[rand.Intn(len(valids))] fields := []string{category, name, fmt.Sprintf("%d", quantity), date, score, valid} return strings.Join(fields, "\t") } func main() { // 加载配置 cfg, err := loadConfig() if err != nil { log.Fatalf("加载配置失败: %v", err) } // 优先使用配置文件中的值,否则使用环境变量 brokerEnv := os.Getenv("KAFKA_BROKERS") if brokerEnv == "" { brokerEnv = "localhost:9092" // 默认值 } else { brokerEnv = strings.Join(cfg.Kafka.Brokers, ",") } brokers := strings.Split(brokerEnv, ",") topic := os.Getenv("KAFKA_TOPIC") if topic == "" { topic = "orders" // 默认topic } else { topic = cfg.Kafka.Topic } sendIntervalStr := os.Getenv("SEND_INTERVAL") sendInterval := 1 * time.Second if sendIntervalStr != "" { if dur, err := time.ParseDuration(sendIntervalStr); err == nil { sendInterval = dur } else { sendInterval := 1 * time.Second if cfg.Kafka.SendInterval != "" { var err error sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval) if err != nil { log.Fatalf("无效的发送间隔: %v", err) } } ticker := time.NewTicker(sendInterval) defer ticker.Stop() } } else { if cfg.Kafka.SendInterval != "" { var err error sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval) if err != nil { log.Fatalf("无效的发送间隔: %v", err) } } } log.Printf("启动订单生产者: brokers=%v, topic=%s, 发送间隔=%v", brokers, topic, sendInterval) // Kafka生产者配置 // Kafka高级配置 config := sarama.NewConfig() config.Producer.Return.Successes = true // 从环境变量获取Kafka版本 kafkaVersion := os.Getenv("KAFKA_VERSION") if kafkaVersion == "" { config.Version = sarama.V0_10_0_0 // 默认版本 } else { version, err := sarama.ParseKafkaVersion(kafkaVersion) if err != nil { log.Fatalf("无效的Kafka版本: %v", err) } config.Version = version } // 配置重试策略 retryMaxStr := os.Getenv("KAFKA_RETRY_MAX") retryMax := 3 if n, err := strconv.Atoi(retryMaxStr); err == nil && n > 0 { retryMax = n } else { retryMax = cfg.Kafka.RetryMax } retryBackoffStr := os.Getenv("KAFKA_RETRY_BACKOFF_MS") retryBackoff := 100 * time.Millisecond if ms, err := strconv.Atoi(retryBackoffStr); err == nil && ms > 0 { retryBackoff = time.Duration(ms) * time.Millisecond } else { retryBackoff = time.Duration(cfg.Kafka.RetryBackoffMs) * time.Millisecond } config.Producer.Retry.Max = retryMax config.Producer.Retry.Backoff = retryBackoff producer, err := sarama.NewSyncProducer(brokers, config) if err != nil { log.Fatalf("创建Kafka生产者失败: %v", err) } defer producer.Close() // 优雅终止处理 ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() // 主循环 for { select { case <-ctx.Done(): log.Println("接收到终止信号,关闭生产者...") return default: order := randomOrder(&cfg) msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(order), } // 发送消息(依赖sarama内置重试机制) partition, offset, err := producer.SendMessage(msg) if err != nil { log.Printf("发送失败: topic=%s, 数据=%s, 错误=%v", topic, order, err) } else { log.Printf("发送成功: 分区=%d, 偏移量=%d, 数据=%s", partition, offset, order) } time.Sleep(sendInterval) } } }