80 lines
1.8 KiB
Go
80 lines
1.8 KiB
Go
package kafka
|
|
|
|
// import (
|
|
// "context"
|
|
// "encoding/json"
|
|
// "log"
|
|
|
|
// "github.com/segmentio/kafka-go"
|
|
// )
|
|
|
|
// type WalletConsumer struct {
|
|
// reader *kafka.Reader
|
|
// hub *ws.NotificationHub
|
|
// topic string
|
|
// groupID string
|
|
// brokers []string
|
|
// }
|
|
|
|
// func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer {
|
|
// return &WalletConsumer{
|
|
// brokers: brokers,
|
|
// topic: topic,
|
|
// groupID: groupID,
|
|
// hub: hub,
|
|
// reader: kafka.NewReader(kafka.ReaderConfig{
|
|
// Brokers: brokers,
|
|
// GroupID: groupID,
|
|
// Topic: topic,
|
|
// }),
|
|
// }
|
|
// }
|
|
|
|
// func (c *WalletConsumer) Start(ctx context.Context) {
|
|
// go func() {
|
|
// for {
|
|
// m, err := c.reader.ReadMessage(ctx)
|
|
// if err != nil {
|
|
// log.Printf("Error reading wallet Kafka message: %v", err)
|
|
// continue
|
|
// }
|
|
|
|
// var evt event.WalletEvent
|
|
// if err := json.Unmarshal(m.Value, &evt); err != nil {
|
|
// log.Printf("Failed to unmarshal wallet event: %v", err)
|
|
// continue
|
|
// }
|
|
|
|
// payload := map[string]interface{}{
|
|
// "type": evt.EventType,
|
|
// "wallet_id": evt.WalletID,
|
|
// "user_id": evt.UserID,
|
|
// "balance": evt.Balance,
|
|
// "wallet_type": evt.WalletType,
|
|
// "trigger": evt.Trigger,
|
|
// "recipient_id": evt.UserID,
|
|
// }
|
|
|
|
// // Broadcast to appropriate WebSocket clients
|
|
// c.hub.Broadcast <- payload
|
|
// }
|
|
// }()
|
|
// }
|
|
|
|
// func (c *WalletConsumer) Shutdown() error {
|
|
// return c.reader.Close()
|
|
// }
|
|
|
|
// func (h *ws.NotificationHub) Shutdown() {
|
|
// close(h.Register)
|
|
// close(h.Unregister)
|
|
// close(h.Broadcast)
|
|
|
|
// h.mu.Lock()
|
|
// defer h.mu.Unlock()
|
|
// for client := range h.Clients {
|
|
// client.Conn.Close()
|
|
// delete(h.Clients, client)
|
|
// }
|
|
// }
|