2025-06-17 11:17:17 +08:00

207 lines
5.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/BurntSushi/toml"
"github.com/Shopify/sarama"
)
// 配置结构体
type config struct {
Kafka struct {
Brokers []string `toml:"brokers"`
Topic string `toml:"topic"`
SendInterval string `toml:"send_interval"`
RetryMax int `toml:"retry_max"`
RetryBackoffMs int `toml:"retry_backoff_ms"`
Version string `toml:"version"`
} `toml:"kafka"`
Order struct {
Categories []struct {
Name string `toml:"name"`
Items []string `toml:"items"`
} `toml:"categories"`
Scores []int `toml:"scores"`
Valids []string `toml:"valids"`
} `toml:"order"`
}
func loadConfig() (config, error) {
cfg := config{}
if _, err := os.Stat("config.toml"); err == nil {
if _, err := toml.DecodeFile("config.toml", &cfg); err != nil {
return config{}, err
}
} else {
log.Println("未找到配置文件,使用环境变量")
}
return cfg, nil
}
func randomOrder(cfg *config) string {
categoryToNames := make(map[string][]string)
for _, c := range cfg.Order.Categories {
categoryToNames[c.Name] = c.Items
}
scores := cfg.Order.Scores
if len(scores) == 0 {
scores = []int{80, 90, 100}
}
valids := cfg.Order.Valids
if len(valids) == 0 {
valids = []string{"Y", "N"}
}
categories := make([]string, 0, len(categoryToNames))
for k := range categoryToNames {
categories = append(categories, k)
}
category := categories[rand.Intn(len(categories))]
names := categoryToNames[category]
name := names[rand.Intn(len(names))]
quantity := rand.Intn(10) + 1
date := time.Now().Format("2006-01-02")
score := strconv.Itoa(scores[rand.Intn(len(scores))])
valid := valids[rand.Intn(len(valids))]
fields := []string{category, name, fmt.Sprintf("%d", quantity), date, score, valid}
return strings.Join(fields, "\t")
}
func main() {
// 加载配置
cfg, err := loadConfig()
if err != nil {
log.Fatalf("加载配置失败: %v", err)
}
// 优先使用配置文件中的值,否则使用环境变量
brokerEnv := os.Getenv("KAFKA_BROKERS")
if brokerEnv == "" {
brokerEnv = "localhost:9092" // 默认值
} else {
brokerEnv = strings.Join(cfg.Kafka.Brokers, ",")
}
brokers := strings.Split(brokerEnv, ",")
topic := os.Getenv("KAFKA_TOPIC")
if topic == "" {
topic = "orders" // 默认topic
} else {
topic = cfg.Kafka.Topic
}
sendIntervalStr := os.Getenv("SEND_INTERVAL")
sendInterval := 1 * time.Second
if sendIntervalStr != "" {
if dur, err := time.ParseDuration(sendIntervalStr); err == nil {
sendInterval = dur
} else {
sendInterval := 1 * time.Second
if cfg.Kafka.SendInterval != "" {
var err error
sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval)
if err != nil {
log.Fatalf("无效的发送间隔: %v", err)
}
}
ticker := time.NewTicker(sendInterval)
defer ticker.Stop()
}
} else {
if cfg.Kafka.SendInterval != "" {
var err error
sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval)
if err != nil {
log.Fatalf("无效的发送间隔: %v", err)
}
}
}
log.Printf("启动订单生产者: brokers=%v, topic=%s, 发送间隔=%v", brokers, topic, sendInterval)
// Kafka生产者配置
// Kafka高级配置
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// 从环境变量获取Kafka版本
kafkaVersion := os.Getenv("KAFKA_VERSION")
if kafkaVersion == "" {
config.Version = sarama.V0_10_0_0 // 默认版本
} else {
version, err := sarama.ParseKafkaVersion(kafkaVersion)
if err != nil {
log.Fatalf("无效的Kafka版本: %v", err)
}
config.Version = version
}
// 配置重试策略
retryMaxStr := os.Getenv("KAFKA_RETRY_MAX")
retryMax := 3
if n, err := strconv.Atoi(retryMaxStr); err == nil && n > 0 {
retryMax = n
} else {
retryMax = cfg.Kafka.RetryMax
}
retryBackoffStr := os.Getenv("KAFKA_RETRY_BACKOFF_MS")
retryBackoff := 100 * time.Millisecond
if ms, err := strconv.Atoi(retryBackoffStr); err == nil && ms > 0 {
retryBackoff = time.Duration(ms) * time.Millisecond
} else {
retryBackoff = time.Duration(cfg.Kafka.RetryBackoffMs) * time.Millisecond
}
config.Producer.Retry.Max = retryMax
config.Producer.Retry.Backoff = retryBackoff
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatalf("创建Kafka生产者失败: %v", err)
}
defer producer.Close()
// 优雅终止处理
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// 主循环
for {
select {
case <-ctx.Done():
log.Println("接收到终止信号,关闭生产者...")
return
default:
order := randomOrder(&cfg)
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(order),
}
// 发送消息依赖sarama内置重试机制
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("发送失败: topic=%s, 数据=%s, 错误=%v", topic, order, err)
} else {
log.Printf("发送成功: 分区=%d, 偏移量=%d, 数据=%s", partition, offset, order)
}
time.Sleep(sendInterval)
}
}
}