// Package kafka contains Kafka consumers that forward events to the
// WebSocket notification hub.
package kafka

import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log"

	"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
	"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
	"github.com/segmentio/kafka-go"
)

// WalletConsumer reads wallet events from a Kafka topic and pushes them onto
// the notification hub's Broadcast channel.
type WalletConsumer struct {
	reader  *kafka.Reader
	hub     *ws.NotificationHub
	topic   string
	groupID string
	brokers []string
}

// NewWalletConsumer builds a WalletConsumer whose reader is configured with
// the given brokers, topic and consumer group ID. Messages are not consumed
// until Start is called.
func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer {
	return &WalletConsumer{
		brokers: brokers,
		topic:   topic,
		groupID: groupID,
		hub:     hub,
		reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers: brokers,
			GroupID: groupID,
			Topic:   topic,
		}),
	}
}

// Start launches a background goroutine that reads messages from the reader,
// decodes each one as an event.WalletEvent and sends a notification payload
// to the hub's Broadcast channel.
//
// The goroutine exits when ctx is done or when the reader reports io.EOF
// (which kafka-go uses to signal a closed reader). Other read errors and
// malformed messages are logged and skipped.
func (c *WalletConsumer) Start(ctx context.Context) {
	go func() {
		for {
			m, err := c.reader.ReadMessage(ctx)
			if err != nil {
				// Once the context is cancelled or the reader is closed,
				// every subsequent ReadMessage fails immediately; retrying
				// would spin forever and flood the log, so stop instead.
				if ctx.Err() != nil || errors.Is(err, io.EOF) {
					return
				}
				log.Printf("Error reading wallet Kafka message: %v", err)
				continue
			}

			var evt event.WalletEvent
			if err := json.Unmarshal(m.Value, &evt); err != nil {
				log.Printf("Failed to unmarshal wallet event: %v", err)
				continue
			}

			// The event's user is also used as the notification recipient.
			payload := map[string]interface{}{
				"type":         evt.EventType,
				"wallet_id":    evt.WalletID,
				"user_id":      evt.UserID,
				"balance":      evt.Balance,
				"trigger":      evt.Trigger,
				"recipient_id": evt.UserID,
			}

			// Broadcast to appropriate WebSocket clients, but do not block
			// past cancellation if the hub is no longer receiving.
			select {
			case c.hub.Broadcast <- payload:
			case <-ctx.Done():
				return
			}
		}
	}()
}

// Disabled shutdown helpers, kept for reference.
// NOTE(review): Go does not allow declaring a method on ws.NotificationHub
// from this package; the hub's Shutdown would have to live in package ws.

// func (c *WalletConsumer) Shutdown() error {
// 	return c.reader.Close()
// }

// func (h *ws.NotificationHub) Shutdown() {
// 	close(h.Register)
// 	close(h.Unregister)
// 	close(h.Broadcast)

// 	h.mu.Lock()
// 	defer h.mu.Unlock()
// 	for client := range h.Clients {
// 		client.Conn.Close()
// 		delete(h.Clients, client)
// 	}
// }