核心改造思路抽象通用消息结构体定义基础消息结构支持不同业务类型如上传、Prompt 任务、SQL 任务多 Topic 支持允许动态指定 Topic而非固定使用配置文件的 Topic通用发送方法剥离具体业务如 UploadMsg提供通用的SendMessage方法适配任意消息类型保留原有能力兼容你现有的SendUploadMsg方法确保老功能不中断可扩展设计新增消息类型时只需定义结构体无需修改核心发送逻辑。改造后的完整代码go运行package kafka import ( context encoding/json os os/signal strconv strings sync syscall time hooyee/app/services/util github.com/goravel/framework/facades github.com/segmentio/kafka-go ) // -------------------------- 1. 抽象通用消息结构 -------------------------- // BaseMessage 所有Kafka消息的基础结构体可扩展 type BaseMessage struct { TaskID string json:task_id // 全局唯一任务ID MsgType string json:msg_type // 消息类型upload/prompt/sql等 BusinessID string json:business_id // 业务ID如文件ID/供应商ID Data interface{} json:data // 具体业务数据不同类型消息的自定义内容 CreateTime string json:create_time // 创建时间 Ext map[string]interface{} json:ext // 扩展字段预留 } // UploadMessage 原有文件上传消息继承BaseMessage的Data字段 type UploadMessage struct { FileName string json:file_name // 文件名 FileSize int64 json:file_size // 文件大小字节 FileType string json:mime_type // 文件MIME类型 FilePath string json:file_path // 文件路径 BusType int json:bus_type // 业务类型 } // PromptTaskMessage Prompt任务消息新增 type PromptTaskMessage struct { Config []map[string]interface{} json:配置 // 配置数组 TestTimes int json:测试次数 // 测试次数 Prompt string json:prompt // Prompt内容 } // -------------------------- 2. 核心KafkaService改造 -------------------------- // KafkaService Kafka生产/消费服务单例支持多类型消息 type KafkaService struct { writer *kafka.Writer // 基础writer默认Topic reader *kafka.Reader // 基础reader默认Topic defaultTopic string // 默认Topic broker string // Broker地址 partitions int // 默认分区数 replication int // 默认副本数 writerMap map[string]*kafka.Writer // 多Topic的writer缓存避免重复创建 mu sync.RWMutex // 读写锁保护writerMap } // 包级单例 var ( instance *KafkaService once sync.Once ) // NewKafkaService 获取KafkaService单例 func NewKafkaService() *KafkaService { once.Do(func() { instance KafkaService{ writerMap: make(map[string]*kafka.Writer), // 初始化多Topic writer缓存 } // 初始化配置包含兜底和合法性校验 instance.initConfig() // 优先创建默认Topic logUtil : util.NewLogUtil() if success, msg : instance.CreateTopic(instance.defaultTopic, 0, 0); !success { logUtil.Error(应用启动时创建默认Topic失败, map[string]any{error: msg}, nil, 1) } else { logUtil.Info(应用启动时默认Topic检查完成, map[string]any{message: msg, topic: instance.defaultTopic}) time.Sleep(1 * time.Second) // 等待元数据同步 } // 初始化默认生产者和消费者 instance.initDefaultWriter() instance.initDefaultReader() // 注册优雅关闭钩子 instance.registerShutdownHook() }) return instance } // initConfig 初始化基础配置兼容原有逻辑 func (s *KafkaService) initConfig() { // 读取基础配置 s.broker facades.Config().GetString(kafka.brokers) s.defaultTopic facades.Config().GetString(kafka.topic) // 读取Topic配置 s.partitions facades.Config().GetInt(kafka.topic_partitions, 3) s.replication facades.Config().GetInt(kafka.topic_replication, 1) // 安全校验 if s.partitions 0 { s.partitions 3 } if s.replication 0 { s.replication 1 } // 兜底配置 if s.broker { s.broker 127.0.0.1:9092 } if s.defaultTopic { s.defaultTopic default_file_upload_topic } // 检查__consumer_offsets主题 s.checkAndCreateConsumerOffsetsTopic() } // -------------------------- 3. 多Topic Writer管理 -------------------------- // getWriter 获取指定Topic的Writer缓存机制避免重复创建 func (s *KafkaService) getWriter(topic string) *kafka.Writer { // 空值则使用默认Topic if topic { topic s.defaultTopic } // 读锁检查缓存 s.mu.RLock() if writer, exists : s.writerMap[topic]; exists { s.mu.RUnlock() return writer } s.mu.RUnlock() // 写锁创建新writer s.mu.Lock() defer s.mu.Unlock() // 双重检查防止并发创建 if writer, exists : s.writerMap[topic]; exists { return writer } // 拆分broker列表 var brokers []string if s.broker ! { brokers strings.Split(s.broker, ,) } else { brokers []string{127.0.0.1:9092} } // 创建新writer writer : kafka.Writer{ Addr: kafka.TCP(brokers...), Topic: topic, Balancer: kafka.LeastBytes{}, WriteTimeout: 5 * time.Second, ReadTimeout: 5 * time.Second, BatchTimeout: 1 * time.Second, RequiredAcks: kafka.RequireOne, } // 加入缓存 s.writerMap[topic] writer logUtil : util.NewLogUtil() logUtil.Info(创建新的Kafka Writer, map[string]any{topic: topic, brokers: brokers}) return writer } // initDefaultWriter 初始化默认Topic的Writer兼容原有逻辑 func (s *KafkaService) initDefaultWriter() { s.writer s.getWriter(s.defaultTopic) } // initDefaultReader 初始化默认Topic的Reader兼容原有逻辑 func (s *KafkaService) initDefaultReader() { logUtil : util.NewLogUtil() groupID : facades.Config().GetString(kafka.group_id) if groupID { groupID default_file_consumer_group } minBytes : facades.Config().GetInt(kafka.min_bytes, 1024) maxBytes : facades.Config().GetInt(kafka.max_bytes, 10240) maxWait : time.Duration(facades.Config().GetInt(kafka.max_wait_seconds, 5)) * time.Second readBackoffMin : time.Duration(facades.Config().GetInt(kafka.read_backoff_min_seconds, 1)) * time.Second readBackoffMax : time.Duration(facades.Config().GetInt(kafka.read_backoff_max_seconds, 10)) * time.Second var brokers []string if s.broker ! { brokers strings.Split(s.broker, ,) } else { brokers []string{127.0.0.1:9092} } s.reader kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: s.defaultTopic, GroupID: groupID, MinBytes: int(minBytes), MaxBytes: int(maxBytes), MaxWait: maxWait, ReadBackoffMin: readBackoffMin, ReadBackoffMax: readBackoffMax, CommitInterval: 2 * time.Second, StartOffset: kafka.FirstOffset, }) logUtil.Info( Kafka Reader 初始化成功, map[string]any{brokers: brokers, groupID: groupID, topic: s.defaultTopic}, ) } // -------------------------- 4. 通用消息发送方法核心 -------------------------- // SendMessage 通用消息发送方法支持任意类型消息、任意Topic // 参数说明 // - msgType: 消息类型upload/prompt/sql等 // - taskID: 全局唯一任务ID // - businessID: 业务ID如文件ID/供应商ID // - data: 具体业务数据UploadMessage/PromptTaskMessage等 // - topic: 目标Topic空则使用默认Topic // - msgKey: 消息Key空则自动生成 func (s *KafkaService) SendMessage(msgType, taskID, businessID string, data interface{}, topic, msgKey string) bool { logUtil : util.NewLogUtil() // 1. 参数校验 if msgType || taskID || data nil { logUtil.Error(发送消息失败必填参数为空, map[string]any{ msgType: msgType, taskID: taskID, businessID: businessID, topic: topic, }, nil, 1) return false } // 2. 构建基础消息结构 baseMsg : BaseMessage{ TaskID: taskID, MsgType: msgType, BusinessID: businessID, Data: data, CreateTime: time.Now().Format(2006-01-02 15:04:05), Ext: make(map[string]interface{}), } // 3. 序列化消息 msgBytes, err : json.Marshal(baseMsg) if err ! nil { logUtil.Error(序列化消息失败, map[string]any{ taskID: taskID, err: err.Error(), data: data, }, err, 1) return false } // 4. 自动生成消息Key如果为空 if msgKey { msgKey msgType _ taskID } // 5. 确保Topic存在 targetTopic : topic if targetTopic { targetTopic s.defaultTopic } if success, _ : s.CreateTopic(targetTopic, 0, 0); !success { logUtil.Error(创建Topic失败无法发送消息, map[string]any{topic: targetTopic}, nil, 1) return false } // 6. 获取对应Topic的Writer writer : s.getWriter(targetTopic) if writer nil { logUtil.Error(获取Kafka Writer失败, map[string]any{topic: targetTopic}, nil, 1) return false } // 7. 发送消息带重试逻辑 err s.writeWithRetry(writer, msgKey, msgBytes, targetTopic) if err nil { logUtil.Info(Kafka消息发送成功, map[string]any{ taskID: taskID, msgType: msgType, topic: targetTopic, key: msgKey, }) return true } // 8. 最终失败记录 logUtil.Error(发送Kafka消息失败最终, map[string]any{ taskID: taskID, msgType: msgType, topic: targetTopic, key: msgKey, err: err.Error(), }, err, 1) return false } // writeWithRetry 消息发送重试逻辑抽离复用 func (s *KafkaService) writeWithRetry(writer *kafka.Writer, msgKey string, msgBytes []byte, topic string) error { logUtil : util.NewLogUtil() // 第一次发送 err : writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(msgKey), Value: msgBytes, }) if err nil { return nil } // 打印原始错误 logUtil.Warning(消息发送失败原始错误, map[string]any{err: err.Error(), key: msgKey, topic: topic}, nil) // 判断是否需要重试Topic不存在/协调器不可用 if strings.Contains(strings.ToLower(err.Error()), unknown topic or partition) || strings.Contains(err.Error(), [3]) || strings.Contains(err.Error(), Group Coordinator Not Available) { logUtil.Warning(尝试重新创建Topic并重试, map[string]any{topic: topic}, nil) // 重新创建Topic if success, _ : s.CreateTopic(topic, 0, 0); success { time.Sleep(2 * time.Second) // 等待元数据同步 // 重试发送 err writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(msgKey), Value: msgBytes, }) if err nil { return nil } logUtil.Error(Topic创建成功但重试发送失败, map[string]any{topic: topic, err: err.Error()}, err, 1) } else { logUtil.Error(创建Topic失败无法重试, map[string]any{topic: topic}, nil, 1) } } return err } // -------------------------- 5. 兼容原有方法 新增Prompt任务发送方法 -------------------------- // SendUploadMsg 兼容原有文件上传消息发送不修改老逻辑 func (s *KafkaService) SendUploadMsg(msg UploadMessage) bool { logUtil : util.NewLogUtil() if s.writer nil { logUtil.Error(Kafka Writer 未初始化, nil, nil, 1) return false } // 序列化原有消息结构 msgBytes, err : json.Marshal(msg) if err ! nil { logUtil.Error( 序列化上传消息失败, map[string]any{msg: msg, err: err.Error()}, err, 1, ) return false } // 生成原有Key规则 msgKey : msg.FileName _ strconv.Itoa(msg.BusType) // 调用通用重试逻辑 err s.writeWithRetry(s.writer, msgKey, msgBytes, s.defaultTopic) if err nil { logUtil.Info(Kafka上传消息发送成功, map[string]any{msg: msg, key: msgKey}) return true } logUtil.Error(发送上传消息失败, map[string]any{msg: msg, err: err.Error(), key: msgKey}, err, 1) return false } // SendPromptTaskMsg 新增Prompt任务消息发送示例适配你的业务 // 参数taskID-任务IDbusinessID-供应商ID/业务IDpromptMsg-Prompt任务数据topic-目标Topic空则用默认 func (s *KafkaService) SendPromptTaskMsg(taskID, businessID string, promptMsg PromptTaskMessage, topic string) bool { return s.SendMessage(prompt, taskID, businessID, promptMsg, topic, ) } // -------------------------- 6. 原有核心方法保留仅微调 -------------------------- // checkAndCreateConsumerOffsetsTopic 检查并创建__consumer_offsets核心主题保留原有逻辑 func (s *KafkaService) checkAndCreateConsumerOffsetsTopic() { logUtil : util.NewLogUtil() brokerList : strings.Split(s.broker, ,) if len(brokerList) 0 { logUtil.Warning(broker地址为空跳过__consumer_offsets检查, nil, nil) return } conn, err : kafka.DialContext(context.Background(), tcp, brokerList[0]) if err ! nil { logUtil.Warning(连接Kafka失败跳过__consumer_offsets检查, nil, err) return } defer conn.Close() topics, err : conn.ReadPartitions() if err ! nil { logUtil.Warning(读取Topic列表失败跳过__consumer_offsets检查, nil, err) return } for _, p : range topics { if p.Topic __consumer_offsets { return } } logUtil.Info(__consumer_offsets主题不存在开始创建, nil) topicConfig : kafka.TopicConfig{ Topic: __consumer_offsets, NumPartitions: 50, ReplicationFactor: 1, ConfigEntries: []kafka.ConfigEntry{ {ConfigName: cleanup.policy, ConfigValue: compact}, {ConfigName: segment.bytes, ConfigValue: 104857600}, }, } ctx, cancel : context.WithTimeout(context.Background(), 15*time.Second) defer cancel() deadline, _ : ctx.Deadline() _ conn.SetDeadline(deadline) if err : conn.CreateTopics(topicConfig); err ! nil { logUtil.Warning(创建__consumer_offsets主题失败非致命, nil, err) } else { logUtil.Info(__consumer_offsets主题创建成功, nil) } } // CreateTopic 创建Topic保留原有逻辑 func (s *KafkaService) CreateTopic(topicName string, partitions int, replication int) (bool, string) { logUtil : util.NewLogUtil() if topicName { topicName s.defaultTopic } if partitions 0 { partitions s.partitions } if replication 0 { replication s.replication } defer func() { if r : recover(); r ! nil { logUtil.Error(创建Topic时发生panic, map[string]any{panic: r, topic: topicName}, nil, 1) } }() if topicName || s.broker { errMsg : 创建Topic失败topic或broker配置为空 logUtil.Error(errMsg, map[string]any{topic: topicName, broker: s.broker}, nil, 1) return false, errMsg } brokerList : strings.Split(s.broker, ,) if len(brokerList) 0 { errMsg : 创建Topic失败broker地址解析为空 logUtil.Error(errMsg, map[string]any{broker: s.broker}, nil, 1) return false, errMsg } conn, err : kafka.DialContext(context.Background(), tcp, brokerList[0]) if err ! nil { errMsg : 连接Kafka Broker失败 err.Error() logUtil.Error(errMsg, map[string]any{broker: brokerList[0], err: err.Error()}, err, 1) return false, errMsg } defer conn.Close() topics, err : conn.ReadPartitions() if err ! nil { errMsg : 读取Topic列表失败 err.Error() logUtil.Error(errMsg, map[string]any{err: err.Error()}, err, 1) return false, errMsg } for _, p : range topics { if p.Topic topicName { successMsg : Topic已存在无需创建 topicName logUtil.Info(successMsg, map[string]any{topic: topicName}) return true, successMsg } } topicConfig : kafka.TopicConfig{ Topic: topicName, NumPartitions: partitions, ReplicationFactor: replication, } ctx, cancel : context.WithTimeout(context.Background(), 10*time.Second) defer cancel() deadline, ok : ctx.Deadline() if ok { _ conn.SetDeadline(deadline) } err conn.CreateTopics(topicConfig) if err ! nil { if strings.Contains(err.Error(), replication factor) || strings.Contains(err.Error(), RF) { logUtil.Warning(副本数超限自动降级为1, map[string]any{ topic: topicName, requested_replication: replication, }, err) topicConfig.ReplicationFactor 1 err conn.CreateTopics(topicConfig) if err ! nil { errMsg : 降级后创建Topic仍失败 err.Error() logUtil.Error(errMsg, map[string]any{topic: topicName, err: err.Error()}, err, 1) return false, errMsg } } else if strings.Contains(err.Error(), already exists) { successMsg : Topic已存在并发创建 topicName logUtil.Info(successMsg, map[string]any{topic: topicName}) return true, successMsg } else { errMsg : 创建Topic失败 err.Error() logUtil.Error(errMsg, map[string]any{topic: topicName, err: err.Error()}, err, 1) return false, errMsg } } successMsg : Topic创建成功 topicName logUtil.Info(successMsg, map[string]any{ topic: topicName, partitions: partitions, replication: topicConfig.ReplicationFactor, }) return true, successMsg } // registerShutdownHook 注册优雅关闭钩子增强关闭所有writer func (s *KafkaService) registerShutdownHook() { logUtil : util.NewLogUtil() ch : make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) go func() { -ch logUtil.Info(收到应用退出信号开始关闭Kafka组件..., nil) // 关闭默认writer if err : s.CloseWriter(); err ! nil { logUtil.Error(Kafka默认Writer关闭失败, map[string]any{err: err.Error()}, err, 1) } else { logUtil.Info(Kafka默认Writer关闭成功, nil) } // 关闭所有缓存的writer s.mu.Lock() for topic, writer : range s.writerMap { if err : writer.Close(); err ! nil { logUtil.Error(Kafka Writer关闭失败, map[string]any{topic: topic, err: err.Error()}, err, 1) } else { logUtil.Info(Kafka Writer关闭成功, map[string]any{topic: topic}) } } s.writerMap make(map[string]*kafka.Writer) // 清空缓存 s.mu.Unlock() // 关闭reader if err : s.CloseReader(); err ! nil { logUtil.Error(Kafka Reader关闭失败, map[string]any{err: err.Error()}, err, 1) } else { logUtil.Info(Kafka Reader关闭成功, nil) } os.Exit(0) }() } // -------------------------- 7. 关闭方法增强 -------------------------- // CloseWriter 关闭默认Writer func (s *KafkaService) CloseWriter() error { if s.writer ! nil { return s.writer.Close() } return nil } // CloseReader 关闭Reader func (s *KafkaService) CloseReader() error { if s.reader ! nil { return s.reader.Close() } return nil } // CloseAllWriters 关闭所有Writer新增 func (s *KafkaService) CloseAllWriters() { s.mu.Lock() defer s.mu.Unlock() for _, writer : range s.writerMap { _ writer.Close() } s.writerMap make(map[string]*kafka.Writer) } // GetReader 获取默认Reader func (s *KafkaService) GetReader() *kafka.Reader { return s.reader } // GetReaderByTopic 获取指定Topic的Reader新增支持多Topic消费 func (s *KafkaService) GetReaderByTopic(topic, groupID string) *kafka.Reader { if topic { topic s.defaultTopic } if groupID { groupID facades.Config().GetString(kafka.group_id, default_consumer_group) } var brokers []string if s.broker ! { brokers strings.Split(s.broker, ,) } else { brokers []string{127.0.0.1:9092} } return kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, GroupID: groupID, MinBytes: 1024, MaxBytes: 10240, MaxWait: 5 * time.Second, ReadBackoffMin: 1 * time.Second, ReadBackoffMax: 10 * time.Second, CommitInterval: 2 * time.Second, StartOffset: kafka.FirstOffset, }) }核心改造亮点1. 通用化消息处理定义BaseMessage作为所有消息的基类包含MsgType区分业务类型Data字段承载具体业务数据新增SendMessage通用方法支持任意结构体作为Data无需为每种消息写单独的发送逻辑。2. 多 Topic 支持新增writerMap缓存不同 Topic 的 Writer避免重复创建提供getWriter方法自动获取 / 创建指定 Topic 的 Writer支持动态指定 Topic也可使用默认 Topic。3. 兼容性保障完全保留原有SendUploadMsg方法老代码无需修改原有CreateTopic、checkAndCreateConsumerOffsetsTopic等核心逻辑不变。4. 可扩展性新增消息类型只需两步go运行// 1. 定义新消息结构体 type SQLTaskMessage struct { SQL string json:sql DBName string json:db_name Timeout int json:timeout } // 2. 调用通用方法发送 kafkaService.SendMessage( sql, // 消息类型 task_123456, // 任务ID db_789, // 业务ID sqlMsg, // 具体业务数据 sql_task_topic, // 目标Topic , // 自动生成Key )5. 消费端适配示例消费时可根据MsgType解析不同的Datago运行// 消费消息示例 func ConsumeMsg(msg []byte) { var baseMsg BaseMessage _ json.Unmarshal(msg, baseMsg) switch baseMsg.MsgType { case upload: var uploadMsg UploadMessage _ json.Unmarshal([]byte(baseMsg.Data.(string)), uploadMsg) // 处理上传消息 case prompt: var promptMsg PromptTaskMessage _ json.Unmarshal([]byte(baseMsg.Data.(string)), promptMsg) // 处理Prompt任务 case sql: var sqlMsg SQLTaskMessage _ json.Unmarshal([]byte(baseMsg.Data.(string)), sqlMsg) // 处理SQL任务 } }总结核心变化从「仅支持上传消息」改为「支持任意类型消息」通过抽象基类 通用方法实现性能优化多 Topic Writer 缓存避免重复创建连接兼容性原有方法完全保留无破坏性修改可扩展新增消息类型只需定义结构体调用通用发送方法即可稳定性保留原有自动创建 Topic、重试、优雅关闭等核心能力。你可以直接替换原有代码老业务文件上传无需改动新业务如 Prompt 任务调用SendPromptTaskMsg或通用SendMessage方法即可。