Yimaru-BackEnd/internal/services/kafka/producer.go

37 lines
624 B
Go

package kafka
import (
"context"
"encoding/json"
"time"
"github.com/segmentio/kafka-go"
)
type Producer struct {
writer *kafka.Writer
}
func NewProducer(brokers []string, topic string) *Producer {
return &Producer{
writer: &kafka.Writer{
Addr: kafka.TCP(brokers...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
},
}
}
func (p *Producer) Publish(ctx context.Context, key string, event any) error {
msgBytes, err := json.Marshal(event)
if err != nil {
return err
}
return p.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(key),
Value: msgBytes,
Time: time.Now(),
})
}