Yimaru-BackEnd/internal/services/kafka/consumer.go
Samuel Tariku e49ff366d5 feat: Implement wallet notification system and refactor related services
- Added new notification handling in the wallet service to notify admins when wallet balances are low or insufficient.
- Created a new file for wallet notifications and moved relevant functions from the wallet service to this new file.
- Updated the wallet service to publish wallet events including wallet type.
- Refactored the client code to improve readability and maintainability.
- Enhanced the bet handler to support pagination and status filtering for bets.
- Updated routes and handlers for user search functionality to improve clarity and organization.
- Modified cron job scheduling to comment out unused jobs for clarity.
- Updated the WebSocket broadcast to include wallet type in notifications.
- Adjusted the makefile to include Kafka in the docker-compose setup for local development.
2025-09-25 21:26:24 +03:00

82 lines
1.7 KiB
Go

package kafka
import (
"context"
"encoding/json"
"log"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
"github.com/segmentio/kafka-go"
)
type WalletConsumer struct {
reader *kafka.Reader
hub *ws.NotificationHub
topic string
groupID string
brokers []string
}
func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer {
return &WalletConsumer{
brokers: brokers,
topic: topic,
groupID: groupID,
hub: hub,
reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
}),
}
}
func (c *WalletConsumer) Start(ctx context.Context) {
go func() {
for {
m, err := c.reader.ReadMessage(ctx)
if err != nil {
log.Printf("Error reading wallet Kafka message: %v", err)
continue
}
var evt event.WalletEvent
if err := json.Unmarshal(m.Value, &evt); err != nil {
log.Printf("Failed to unmarshal wallet event: %v", err)
continue
}
payload := map[string]interface{}{
"type": evt.EventType,
"wallet_id": evt.WalletID,
"user_id": evt.UserID,
"balance": evt.Balance,
"wallet_type": evt.WalletType,
"trigger": evt.Trigger,
"recipient_id": evt.UserID,
}
// Broadcast to appropriate WebSocket clients
c.hub.Broadcast <- payload
}
}()
}
// func (c *WalletConsumer) Shutdown() error {
// return c.reader.Close()
// }
// func (h *ws.NotificationHub) Shutdown() {
// close(h.Register)
// close(h.Unregister)
// close(h.Broadcast)
// h.mu.Lock()
// defer h.mu.Unlock()
// for client := range h.Clients {
// client.Conn.Close()
// delete(h.Clients, client)
// }
// }