Yimaru-BackEnd/internal/services/kafka/consumer.go

82 lines
1.9 KiB
Go

package kafka
// import (
// "context"
// "encoding/json"
// "log"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/event"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
// "github.com/segmentio/kafka-go"
// )
// type WalletConsumer struct {
// reader *kafka.Reader
// hub *ws.NotificationHub
// topic string
// groupID string
// brokers []string
// }
// func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer {
// return &WalletConsumer{
// brokers: brokers,
// topic: topic,
// groupID: groupID,
// hub: hub,
// reader: kafka.NewReader(kafka.ReaderConfig{
// Brokers: brokers,
// GroupID: groupID,
// Topic: topic,
// }),
// }
// }
// func (c *WalletConsumer) Start(ctx context.Context) {
// go func() {
// for {
// m, err := c.reader.ReadMessage(ctx)
// if err != nil {
// log.Printf("Error reading wallet Kafka message: %v", err)
// continue
// }
// var evt event.WalletEvent
// if err := json.Unmarshal(m.Value, &evt); err != nil {
// log.Printf("Failed to unmarshal wallet event: %v", err)
// continue
// }
// payload := map[string]interface{}{
// "type": evt.EventType,
// "wallet_id": evt.WalletID,
// "user_id": evt.UserID,
// "balance": evt.Balance,
// "wallet_type": evt.WalletType,
// "trigger": evt.Trigger,
// "recipient_id": evt.UserID,
// }
// // Broadcast to appropriate WebSocket clients
// c.hub.Broadcast <- payload
// }
// }()
// }
// func (c *WalletConsumer) Shutdown() error {
// return c.reader.Close()
// }
// func (h *ws.NotificationHub) Shutdown() {
// close(h.Register)
// close(h.Unregister)
// close(h.Broadcast)
// h.mu.Lock()
// defer h.mu.Unlock()
// for client := range h.Clients {
// client.Conn.Close()
// delete(h.Clients, client)
// }
// }