package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "strings"
    "syscall"
    "time"

    "github.com/BurntSushi/toml"
    "github.com/Shopify/sarama"
)
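// Two external dependencies are used (module paths as imported above; with Go
// modules, `go mod tidy` fetches them):
//
//	github.com/BurntSushi/toml – parses config.toml
//	github.com/Shopify/sarama  – Kafka client (the project has since moved to github.com/IBM/sarama)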
// config mirrors the layout of config.toml.
type config struct {
    Kafka struct {
        Brokers        []string `toml:"brokers"`
        Topic          string   `toml:"topic"`
        SendInterval   string   `toml:"send_interval"`
        RetryMax       int      `toml:"retry_max"`
        RetryBackoffMs int      `toml:"retry_backoff_ms"`
        Version        string   `toml:"version"`
    } `toml:"kafka"`

    Order struct {
        Categories []struct {
            Name  string   `toml:"name"`
            Items []string `toml:"items"`
        } `toml:"categories"`
        Scores []int    `toml:"scores"`
        Valids []string `toml:"valids"`
    } `toml:"order"`
}
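// For reference, a config.toml matching the struct tags above could look like
// the following (illustrative values only, not shipped with this file):
//
//	[kafka]
//	brokers = ["localhost:9092"]
//	topic = "orders"
//	send_interval = "1s"
//	retry_max = 3
//	retry_backoff_ms = 100
//	version = "2.8.0"
//
//	[order]
//	scores = [80, 90, 100]
//	valids = ["Y", "N"]
//
//	[[order.categories]]
//	name = "fruit"
//	items = ["apple", "banana"]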
// loadConfig reads config.toml if it exists; otherwise it returns an empty
// config so that the environment-variable fallbacks in main apply.
func loadConfig() (config, error) {
    cfg := config{}
    if _, err := os.Stat("config.toml"); err == nil {
        if _, err := toml.DecodeFile("config.toml", &cfg); err != nil {
            return config{}, err
        }
    } else {
        log.Println("config.toml not found, using environment variables")
    }
    return cfg, nil
}
// randomOrder builds a single tab-separated order record:
// category, item name, quantity, date, score, valid flag.
func randomOrder(cfg *config) string {
    categoryToNames := make(map[string][]string)
    for _, c := range cfg.Order.Categories {
        categoryToNames[c.Name] = c.Items
    }

    scores := cfg.Order.Scores
    if len(scores) == 0 {
        scores = []int{80, 90, 100}
    }

    valids := cfg.Order.Valids
    if len(valids) == 0 {
        valids = []string{"Y", "N"}
    }

    categories := make([]string, 0, len(categoryToNames))
    for k := range categoryToNames {
        categories = append(categories, k)
    }
    if len(categories) == 0 {
        // Without at least one configured category, rand.Intn(0) below would panic.
        log.Fatal("no order categories configured (order.categories)")
    }

    category := categories[rand.Intn(len(categories))]
    names := categoryToNames[category]
    name := names[rand.Intn(len(names))]
    quantity := rand.Intn(10) + 1
    date := time.Now().Format("2006-01-02")
    score := strconv.Itoa(scores[rand.Intn(len(scores))])
    valid := valids[rand.Intn(len(valids))]

    fields := []string{category, name, fmt.Sprintf("%d", quantity), date, score, valid}
    return strings.Join(fields, "\t")
}
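// With a config like the one sketched above, a generated record is one
// tab-separated line, for example (hypothetical values):
//
//	fruit	apple	3	2024-05-01	90	Y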
func main() {
    // Load configuration (config.toml, if present).
    cfg, err := loadConfig()
    if err != nil {
        log.Fatalf("failed to load config: %v", err)
    }
    // Prefer values from the config file, then environment variables, then defaults.
    brokers := cfg.Kafka.Brokers
    if len(brokers) == 0 {
        brokerEnv := os.Getenv("KAFKA_BROKERS")
        if brokerEnv == "" {
            brokerEnv = "localhost:9092" // default
        }
        brokers = strings.Split(brokerEnv, ",")
    }

    topic := cfg.Kafka.Topic
    if topic == "" {
        topic = os.Getenv("KAFKA_TOPIC")
    }
    if topic == "" {
        topic = "orders" // default topic
    }
sendIntervalStr := os.Getenv("SEND_INTERVAL")
|
||
sendInterval := 1 * time.Second
|
||
if sendIntervalStr != "" {
|
||
if dur, err := time.ParseDuration(sendIntervalStr); err == nil {
|
||
sendInterval = dur
|
||
} else {
|
||
sendInterval := 1 * time.Second
|
||
if cfg.Kafka.SendInterval != "" {
|
||
var err error
|
||
sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval)
|
||
if err != nil {
|
||
log.Fatalf("无效的发送间隔: %v", err)
|
||
}
|
||
}
|
||
ticker := time.NewTicker(sendInterval)
|
||
defer ticker.Stop()
|
||
}
|
||
} else {
|
||
if cfg.Kafka.SendInterval != "" {
|
||
var err error
|
||
sendInterval, err = time.ParseDuration(cfg.Kafka.SendInterval)
|
||
if err != nil {
|
||
log.Fatalf("无效的发送间隔: %v", err)
|
||
}
|
||
}
|
||
}
|
||
|
||
log.Printf("启动订单生产者: brokers=%v, topic=%s, 发送间隔=%v", brokers, topic, sendInterval)
|
||
|
||
    // Kafka producer configuration.
    saramaConfig := sarama.NewConfig()
    saramaConfig.Producer.Return.Successes = true // required by SyncProducer

    // Kafka protocol version: config file first, then KAFKA_VERSION, then default.
    kafkaVersion := cfg.Kafka.Version
    if kafkaVersion == "" {
        kafkaVersion = os.Getenv("KAFKA_VERSION")
    }
    if kafkaVersion == "" {
        saramaConfig.Version = sarama.V0_10_0_0 // default version
    } else {
        version, err := sarama.ParseKafkaVersion(kafkaVersion)
        if err != nil {
            log.Fatalf("invalid Kafka version: %v", err)
        }
        saramaConfig.Version = version
    }
    // Retry policy: config file first, then environment variables, then defaults.
    retryMax := 3
    if cfg.Kafka.RetryMax > 0 {
        retryMax = cfg.Kafka.RetryMax
    } else if n, err := strconv.Atoi(os.Getenv("KAFKA_RETRY_MAX")); err == nil && n > 0 {
        retryMax = n
    }

    retryBackoff := 100 * time.Millisecond
    if cfg.Kafka.RetryBackoffMs > 0 {
        retryBackoff = time.Duration(cfg.Kafka.RetryBackoffMs) * time.Millisecond
    } else if ms, err := strconv.Atoi(os.Getenv("KAFKA_RETRY_BACKOFF_MS")); err == nil && ms > 0 {
        retryBackoff = time.Duration(ms) * time.Millisecond
    }

    saramaConfig.Producer.Retry.Max = retryMax
    saramaConfig.Producer.Retry.Backoff = retryBackoff
    producer, err := sarama.NewSyncProducer(brokers, saramaConfig)
    if err != nil {
        log.Fatalf("failed to create Kafka producer: %v", err)
    }
    defer producer.Close()

    // Shut down gracefully on SIGINT/SIGTERM.
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()
    // Main loop: produce one random order per interval until a signal arrives.
    for {
        select {
        case <-ctx.Done():
            log.Println("termination signal received, shutting down producer...")
            return
        default:
            order := randomOrder(&cfg)
            msg := &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.StringEncoder(order),
            }

            // Send the message (relies on sarama's built-in retry mechanism).
            partition, offset, err := producer.SendMessage(msg)
            if err != nil {
                log.Printf("send failed: topic=%s, data=%s, error=%v", topic, order, err)
            } else {
                log.Printf("send succeeded: partition=%d, offset=%d, data=%s", partition, offset, order)
            }

            time.Sleep(sendInterval)
        }
    }
}
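// Example invocation (illustrative; assumes a local broker and a config.toml
// that provides at least [[order.categories]] — environment variables fill in
// anything the file omits):
//
//	KAFKA_BROKERS=localhost:9092 KAFKA_TOPIC=orders SEND_INTERVAL=500ms go run main.go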