662 lines
21 KiB
Go
662 lines
21 KiB
Go
package notificationservice
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
// "errors"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
|
|
"go.uber.org/zap"
|
|
|
|
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
|
|
// afro "github.com/amanuelabay/afrosms-go"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type Service struct {
|
|
repo repository.NotificationRepository
|
|
Hub *ws.NotificationHub
|
|
notificationStore NotificationStore
|
|
connections sync.Map
|
|
notificationCh chan *domain.Notification
|
|
stopCh chan struct{}
|
|
config *config.Config
|
|
userSvc *user.Service
|
|
messengerSvc *messenger.Service
|
|
mongoLogger *zap.Logger
|
|
logger *slog.Logger
|
|
redisClient *redis.Client
|
|
}
|
|
|
|
func New(repo repository.NotificationRepository,
|
|
mongoLogger *zap.Logger,
|
|
logger *slog.Logger,
|
|
cfg *config.Config,
|
|
messengerSvc *messenger.Service,
|
|
userSvc *user.Service,
|
|
) *Service {
|
|
hub := ws.NewNotificationHub()
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: cfg.RedisAddr, // e.g., "redis:6379"
|
|
})
|
|
|
|
svc := &Service{
|
|
repo: repo,
|
|
Hub: hub,
|
|
mongoLogger: mongoLogger,
|
|
logger: logger,
|
|
connections: sync.Map{},
|
|
notificationCh: make(chan *domain.Notification, 1000),
|
|
stopCh: make(chan struct{}),
|
|
messengerSvc: messengerSvc,
|
|
userSvc: userSvc,
|
|
config: cfg,
|
|
redisClient: rdb,
|
|
}
|
|
|
|
go hub.Run()
|
|
go svc.startWorker()
|
|
go svc.startRetryWorker()
|
|
go svc.RunRedisSubscriber(context.Background())
|
|
|
|
return svc
|
|
}
|
|
|
|
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error {
|
|
if c == nil {
|
|
s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fmt.Errorf("Invalid Websocket Connection")
|
|
}
|
|
|
|
s.connections.Store(recipientID, c)
|
|
s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
|
|
|
|
notification.ID = helpers.GenerateID()
|
|
notification.Timestamp = time.Now()
|
|
notification.DeliveryStatus = domain.DeliveryStatusPending
|
|
|
|
created, err := s.repo.CreateNotification(ctx, notification)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
notification = created
|
|
|
|
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
|
|
s.Hub.Broadcast <- map[string]interface{}{
|
|
"type": "CREATED_NOTIFICATION",
|
|
"recipient_id": notification.RecipientID,
|
|
"payload": notification,
|
|
}
|
|
}
|
|
|
|
select {
|
|
case s.notificationCh <- notification:
|
|
default:
|
|
s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
|
|
for _, notificationID := range notificationIDs {
|
|
_, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read",
|
|
zap.String("notificationID", notificationID),
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
|
|
// if err != nil {
|
|
// s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
|
|
// return err
|
|
// }
|
|
|
|
// s.Hub.Broadcast <- map[string]interface{}{
|
|
// "type": "COUNT_NOT_OPENED_NOTIFICATION",
|
|
// "recipient_id": recipientID,
|
|
// "payload": map[string]int{
|
|
// "not_opened_notifications_count": int(count),
|
|
// },
|
|
// }
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read",
|
|
zap.String("notificationID", notificationID),
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
|
|
notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to list notifications",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Int("limit", limit),
|
|
zap.Int("offset", offset),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Int("count", len(notifications)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
|
|
notifications, err := s.repo.GetAllNotifications(ctx, limit, offset)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications",
|
|
zap.Int("limit", limit),
|
|
zap.Int("offset", offset),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications",
|
|
zap.Int("count", len(notifications)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
|
|
err := s.addConnection(recipientID, c)
|
|
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DisconnectWebSocket(recipientID int64) {
|
|
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
|
|
conn.(*websocket.Conn).Close()
|
|
// s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
|
|
s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
}
|
|
|
|
// func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
|
|
// s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
|
|
|
|
// apiKey := s.config.AFRO_SMS_API_KEY
|
|
// senderName := s.config.AFRO_SMS_SENDER_NAME
|
|
// receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
|
|
// hostURL := s.config.ADRO_SMS_HOST_URL
|
|
// endpoint := "/api/send"
|
|
|
|
// request := afro.GetRequest(apiKey, endpoint, hostURL)
|
|
// request.Method = "GET"
|
|
// request.Sender(senderName)
|
|
// request.To(receiverPhone, message)
|
|
|
|
// response, err := afro.MakeRequestWithContext(ctx, request)
|
|
// if err != nil {
|
|
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
|
|
// return err
|
|
// }
|
|
|
|
// if response["acknowledge"] == "success" {
|
|
// s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
|
|
// } else {
|
|
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
|
|
// return errors.New("SMS delivery failed: " + response["response"].(string))
|
|
// }
|
|
|
|
// return nil
|
|
// }
|
|
|
|
// func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
|
|
// s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
|
|
// return nil
|
|
// }
|
|
|
|
func (s *Service) startWorker() {
|
|
for {
|
|
select {
|
|
case notification := <-s.notificationCh:
|
|
s.handleNotification(notification)
|
|
case <-s.stopCh:
|
|
s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped",
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
|
return s.repo.ListRecipientIDs(ctx, receiver)
|
|
}
|
|
|
|
func (s *Service) handleNotification(notification *domain.Notification) {
|
|
ctx := context.Background()
|
|
|
|
switch notification.DeliveryChannel {
|
|
case domain.DeliveryChannelSMS:
|
|
err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
|
|
case domain.DeliveryChannelEmail:
|
|
err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
default:
|
|
if notification.DeliveryChannel != domain.DeliveryChannelInApp {
|
|
s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel",
|
|
zap.String("channel", string(notification.DeliveryChannel)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
}
|
|
}
|
|
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
}
|
|
|
|
func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error {
|
|
// Get User Phone Number
|
|
user, err := s.userSvc.GetUserByID(ctx, recipientID)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !user.PhoneVerified {
|
|
return fmt.Errorf("cannot send notification to unverified phone number")
|
|
}
|
|
|
|
if user.PhoneNumber == "" {
|
|
return fmt.Errorf("phone Number is invalid")
|
|
}
|
|
err = s.messengerSvc.SendSMS(ctx, user.PhoneNumber, message, user.CompanyID)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS",
|
|
zap.Int64("recipient_id", recipientID),
|
|
zap.String("user_phone_number", user.PhoneNumber),
|
|
zap.String("message", message),
|
|
zap.Int64("company_id", user.CompanyID.Value),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error {
|
|
// Get User Phone Number
|
|
user, err := s.userSvc.GetUserByID(ctx, recipientID)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !user.EmailVerified {
|
|
return fmt.Errorf("cannot send notification to unverified email")
|
|
}
|
|
|
|
if user.Email == "" {
|
|
return fmt.Errorf("email is invalid")
|
|
}
|
|
err = s.messengerSvc.SendEmail(ctx, user.Email, message, subject)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS",
|
|
zap.Int64("recipient_id", recipientID),
|
|
zap.String("user_email", user.Email),
|
|
zap.String("message", message),
|
|
zap.Int64("company_id", user.CompanyID.Value),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) startRetryWorker() {
|
|
ticker := time.NewTicker(1 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.retryFailedNotifications()
|
|
case <-s.stopCh:
|
|
s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped",
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) retryFailedNotifications() {
|
|
ctx := context.Background()
|
|
failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications",
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return
|
|
}
|
|
|
|
for _, n := range failedNotifications {
|
|
notification := &n
|
|
go func(notification *domain.Notification) {
|
|
for attempt := 0; attempt < 3; attempt++ {
|
|
time.Sleep(time.Duration(attempt) * time.Second)
|
|
switch notification.DeliveryChannel {
|
|
case domain.DeliveryChannelSMS:
|
|
if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
} else {
|
|
s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return
|
|
}
|
|
case domain.DeliveryChannelEmail:
|
|
if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
} else {
|
|
s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}(notification)
|
|
}
|
|
}
|
|
|
|
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
|
|
return s.repo.CountUnreadNotifications(ctx, recipient_id)
|
|
}
|
|
|
|
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
|
|
// return s.repo.Get(ctx, filter)
|
|
// }
|
|
|
|
func (s *Service) RunRedisSubscriber(ctx context.Context) {
|
|
pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
|
|
defer pubsub.Close()
|
|
|
|
ch := pubsub.Channel()
|
|
for msg := range ch {
|
|
var parsed map[string]interface{}
|
|
if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
|
|
// s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
|
|
s.mongoLogger.Error("invalid Redis message format",
|
|
zap.String("payload", msg.Payload),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
continue
|
|
}
|
|
|
|
eventType, _ := parsed["type"].(string)
|
|
payload := parsed["payload"]
|
|
recipientID, hasRecipient := parsed["recipient_id"]
|
|
recipientType, _ := parsed["recipient_type"].(string)
|
|
|
|
message := map[string]interface{}{
|
|
"type": eventType,
|
|
"payload": payload,
|
|
}
|
|
|
|
if hasRecipient {
|
|
message["recipient_id"] = recipientID
|
|
message["recipient_type"] = recipientType
|
|
}
|
|
|
|
s.Hub.Broadcast <- message
|
|
}
|
|
}
|
|
|
|
func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
|
|
const key = "live_metrics"
|
|
|
|
companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
|
|
for _, c := range companies {
|
|
companyBalances = append(companyBalances, domain.CompanyWalletBalance{
|
|
CompanyID: c.ID,
|
|
CompanyName: c.Name,
|
|
Balance: float64(c.WalletBalance.Float32()),
|
|
})
|
|
}
|
|
|
|
branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
|
|
for _, b := range branches {
|
|
branchBalances = append(branchBalances, domain.BranchWalletBalance{
|
|
BranchID: b.ID,
|
|
BranchName: b.Name,
|
|
CompanyID: b.CompanyID,
|
|
Balance: float64(b.Balance.Float32()),
|
|
})
|
|
}
|
|
|
|
payload := domain.LiveWalletMetrics{
|
|
Timestamp: time.Now(),
|
|
CompanyBalances: companyBalances,
|
|
BranchBalances: branchBalances,
|
|
}
|
|
|
|
updatedData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
|
|
const key = "live_metrics"
|
|
var metric domain.LiveMetric
|
|
|
|
val, err := s.redisClient.Get(ctx, key).Result()
|
|
if err == redis.Nil {
|
|
// Key does not exist yet, return zero-valued struct
|
|
return domain.LiveMetric{}, nil
|
|
} else if err != nil {
|
|
return domain.LiveMetric{}, err
|
|
}
|
|
|
|
if err := json.Unmarshal([]byte(val), &metric); err != nil {
|
|
return domain.LiveMetric{}, err
|
|
}
|
|
|
|
return metric, nil
|
|
}
|
|
|
|
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
|
|
// var (
|
|
// payload domain.LiveWalletMetrics
|
|
// event map[string]interface{}
|
|
// key = "live_metrics"
|
|
// )
|
|
|
|
// // Try company first
|
|
// company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
|
|
// if companyErr == nil {
|
|
// payload = domain.LiveWalletMetrics{
|
|
// Timestamp: time.Now(),
|
|
// CompanyBalances: []domain.CompanyWalletBalance{{
|
|
// CompanyID: company.ID,
|
|
// CompanyName: company.Name,
|
|
// Balance: float64(wallet.Balance),
|
|
// }},
|
|
// BranchBalances: []domain.BranchWalletBalance{},
|
|
// }
|
|
|
|
// event = map[string]interface{}{
|
|
// "type": "LIVE_WALLET_METRICS_UPDATE",
|
|
// "recipient_id": company.ID,
|
|
// "recipient_type": "company",
|
|
// "payload": payload,
|
|
// }
|
|
// } else {
|
|
// // Try branch next
|
|
// branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
|
|
// if branchErr == nil {
|
|
// payload = domain.LiveWalletMetrics{
|
|
// Timestamp: time.Now(),
|
|
// CompanyBalances: []domain.CompanyWalletBalance{},
|
|
// BranchBalances: []domain.BranchWalletBalance{{
|
|
// BranchID: branch.ID,
|
|
// BranchName: branch.Name,
|
|
// CompanyID: branch.CompanyID,
|
|
// Balance: float64(wallet.Balance),
|
|
// }},
|
|
// }
|
|
|
|
// event = map[string]interface{}{
|
|
// "type": "LIVE_WALLET_METRICS_UPDATE",
|
|
// "recipient_id": branch.ID,
|
|
// "recipient_type": "branch",
|
|
// "payload": payload,
|
|
// }
|
|
// } else {
|
|
// // Neither company nor branch matched this wallet
|
|
// // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
|
|
// s.mongoLogger.Warn("wallet not linked to any company or branch",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return
|
|
// }
|
|
// }
|
|
|
|
// // Save latest metric to Redis
|
|
// if jsonBytes, err := json.Marshal(payload); err == nil {
|
|
// s.redisClient.Set(ctx, key, jsonBytes, 0)
|
|
// } else {
|
|
// // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
|
|
// s.mongoLogger.Error("failed to marshal wallet metrics payload",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// // Publish via Redis
|
|
// if jsonEvent, err := json.Marshal(event); err == nil {
|
|
// s.redisClient.Publish(ctx, key, jsonEvent)
|
|
// } else {
|
|
// // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
|
|
// s.mongoLogger.Error("failed to marshal event payload",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// // Broadcast over WebSocket
|
|
// s.Hub.Broadcast <- event
|
|
// }
|