package kafka // import ( // "context" // "encoding/json" // "log" // "github.com/segmentio/kafka-go" // ) // type WalletConsumer struct { // reader *kafka.Reader // hub *ws.NotificationHub // topic string // groupID string // brokers []string // } // func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer { // return &WalletConsumer{ // brokers: brokers, // topic: topic, // groupID: groupID, // hub: hub, // reader: kafka.NewReader(kafka.ReaderConfig{ // Brokers: brokers, // GroupID: groupID, // Topic: topic, // }), // } // } // func (c *WalletConsumer) Start(ctx context.Context) { // go func() { // for { // m, err := c.reader.ReadMessage(ctx) // if err != nil { // log.Printf("Error reading wallet Kafka message: %v", err) // continue // } // var evt event.WalletEvent // if err := json.Unmarshal(m.Value, &evt); err != nil { // log.Printf("Failed to unmarshal wallet event: %v", err) // continue // } // payload := map[string]interface{}{ // "type": evt.EventType, // "wallet_id": evt.WalletID, // "user_id": evt.UserID, // "balance": evt.Balance, // "wallet_type": evt.WalletType, // "trigger": evt.Trigger, // "recipient_id": evt.UserID, // } // // Broadcast to appropriate WebSocket clients // c.hub.Broadcast <- payload // } // }() // } // func (c *WalletConsumer) Shutdown() error { // return c.reader.Close() // } // func (h *ws.NotificationHub) Shutdown() { // close(h.Register) // close(h.Unregister) // close(h.Broadcast) // h.mu.Lock() // defer h.mu.Unlock() // for client := range h.Clients { // client.Conn.Close() // delete(h.Clients, client) // } // }