Yimaru-BackEnd/internal/services/notification/service.go

850 lines
26 KiB
Go

package notificationservice
import (
"Yimaru-Backend/internal/config"
"Yimaru-Backend/internal/domain"
"Yimaru-Backend/internal/pkgs/helpers"
"Yimaru-Backend/internal/ports"
"Yimaru-Backend/internal/services/messenger"
"Yimaru-Backend/internal/services/user"
"Yimaru-Backend/internal/web_server/ws"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
// "errors"
"log/slog"
"sync"
"time"
// "github.com/segmentio/kafka-go"
"go.uber.org/zap"
// afro "github.com/amanuelabay/afrosms-go"
afro "github.com/amanuelabay/afrosms-go"
"github.com/gorilla/websocket"
// "github.com/redis/go-redis/v9"
)
type Service struct {
store ports.NotificationStore
Hub *ws.NotificationHub
connections sync.Map
notificationCh chan *domain.Notification
stopCh chan struct{}
config *config.Config
userSvc *user.Service
messengerSvc *messenger.Service
mongoLogger *zap.Logger
logger *slog.Logger
}
// New builds a notification Service from its dependencies and starts its two
// background goroutines: the WebSocket hub loop and the delivery worker.
//
// The notification queue is buffered to 1000 entries. The retry worker, Redis
// subscriber and Kafka consumer are currently disabled (see the commented-out
// calls below).
func New(
	store ports.NotificationStore,
	mongoLogger *zap.Logger,
	logger *slog.Logger,
	cfg *config.Config,
	messengerSvc *messenger.Service,
	userSvc *user.Service,
) *Service {
	// connections is left at its zero value, which is a ready-to-use sync.Map.
	svc := &Service{
		config:         cfg,
		store:          store,
		userSvc:        userSvc,
		messengerSvc:   messengerSvc,
		mongoLogger:    mongoLogger,
		logger:         logger,
		Hub:            ws.NewNotificationHub(),
		notificationCh: make(chan *domain.Notification, 1000),
		stopCh:         make(chan struct{}),
	}

	go svc.Hub.Run()
	go svc.startWorker()
	// go svc.startRetryWorker()
	// go svc.RunRedisSubscriber(context.Background())
	// go svc.StartKafkaConsumer(context.Background())

	return svc
}
// SendAfroMessageSMS sends message to receiverPhone through the AfroMessage
// API using the afrosms-go client, authenticating with the configured API key
// and sender name.
//
// It returns the client's error when the request fails, and a descriptive
// error when the response has no string "acknowledge" field or when that
// field is anything other than "success".
func (s *Service) SendAfroMessageSMS(ctx context.Context, receiverPhone, message string) error {
	const (
		baseURL  = "https://api.afromessage.com"
		endpoint = "/api/send"
	)

	req := afro.GetRequest(s.config.AFRO_SMS_API_KEY, endpoint, baseURL)
	req.Method = "POST" // the send endpoint MUST be called with POST
	req.Sender(s.config.AFRO_SMS_SENDER_NAME)
	req.To(receiverPhone, message)

	resp, err := afro.MakeRequestWithContext(ctx, req)
	if err != nil {
		return err
	}

	switch ack, isString := resp["acknowledge"].(string); {
	case !isString:
		return fmt.Errorf("unexpected SMS response format: %v", resp)
	case ack != "success":
		return fmt.Errorf("SMS delivery failed: %v", resp)
	}
	return nil
}
// SendAfroMessageSMSTemp sends message to receiverPhone by issuing an HTTP GET
// against the configured AfroMessage base URL, with the SMS details passed as
// query parameters and the API key sent as a Bearer token.
//
// Parameters:
//   - ctx: bounds the HTTP request; the client also enforces a 10s timeout.
//   - receiverPhone: sent as the "to" query parameter.
//   - message: sent as the "message" query parameter.
//   - callbackURL: optional; when non-nil it is sent as "callback".
//
// The configured sender name is always sent as "sender"; the identifier ID is
// sent as "from" only when it is non-empty.
//
// It returns nil only when the response status is 200 and the JSON body has
// "acknowledge" equal to "success". Every underlying failure is wrapped with
// %w and a short description of the stage that failed, so callers can still
// inspect the cause with errors.Is / errors.As.
func (s *Service) SendAfroMessageSMSTemp(
	ctx context.Context,
	receiverPhone string,
	message string,
	callbackURL *string, // optional
) error {
	// Build query parameters explicitly.
	query := url.Values{}
	query.Set("to", receiverPhone)
	query.Set("message", message)
	query.Set("sender", s.config.AFRO_SMS_SENDER_NAME)

	// Optional parameters.
	if id := s.config.AFROSMSConfig.AfroSMSIdentifierID; id != "" {
		query.Set("from", id)
	}
	if callbackURL != nil {
		query.Set("callback", *callbackURL)
	}

	reqURL := fmt.Sprintf("%s?%s", s.config.AFROSMSConfig.AfroSMSBaseURL, query.Encode())
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
	if err != nil {
		return fmt.Errorf("building afromessage sms request: %w", err)
	}

	// AfroMessage authentication (API key).
	req.Header.Set("Authorization", "Bearer "+s.config.AFRO_SMS_API_KEY)
	req.Header.Set("Accept", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("sending afromessage sms request: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("reading afromessage sms response: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf(
			"afromessage sms failed: status=%d response=%s",
			resp.StatusCode,
			string(body),
		)
	}

	// Parse the response and require an explicit success acknowledgement.
	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return fmt.Errorf("decoding afromessage sms response: %w", err)
	}

	if ack, ok := result["acknowledge"].(string); !ok || ack != "success" {
		return fmt.Errorf("sms delivery failed: %v", result)
	}
	return nil
}
// addConnection registers conn as the WebSocket connection for recipientID.
//
// A nil connection is rejected with an error and a warning log entry.
// NOTE(review): an existing entry for the same recipient is replaced without
// being closed here — confirm the previous connection is cleaned up elsewhere.
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error {
	if c == nil {
		s.mongoLogger.Warn(
			"[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection",
			zap.Int64("recipientID", recipientID),
			zap.Time("timestamp", time.Now()),
		)
		return fmt.Errorf("Invalid Websocket Connection")
	}

	s.connections.Store(recipientID, c)

	s.mongoLogger.Info(
		"[NotificationSvc.AddConnection] Added WebSocket connection",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
	return nil
}
// SendNotification persists notification and schedules it for delivery.
//
// The notification is stamped in place with a generated ID, the current time
// and a pending delivery status before it is stored. After a successful
// store:
//   - in-app notifications are pushed to the hub as a "CREATED_NOTIFICATION"
//     event;
//   - the stored notification is queued for the background worker. If the
//     queue is full the notification is dropped from the queue with a warning
//     and nil is still returned.
//
// The only error returned is the one from the store.
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
	notification.ID = helpers.GenerateID()
	notification.Timestamp = time.Now()
	notification.DeliveryStatus = domain.DeliveryStatusPending

	stored, err := s.store.CreateNotification(ctx, notification)
	if err != nil {
		s.mongoLogger.Error(
			"[NotificationSvc.SendNotification] Failed to create notification",
			zap.String("id", notification.ID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}

	if stored.DeliveryChannel == domain.DeliveryChannelInApp {
		s.Hub.Broadcast <- map[string]interface{}{
			"type":         "CREATED_NOTIFICATION",
			"recipient_id": stored.RecipientID,
			"payload":      stored,
		}
	}

	// Non-blocking enqueue: never stall the caller on a saturated worker.
	select {
	case s.notificationCh <- stored:
	default:
		s.mongoLogger.Warn(
			"[NotificationSvc.SendNotification] Notification channel full, dropping notification",
			zap.String("id", stored.ID),
			zap.Time("timestamp", time.Now()),
		)
	}
	return nil
}
// func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
// for _, notificationID := range notificationIDs {
// _, err := s.store.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
// if err != nil {
// s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read",
// zap.String("notificationID", notificationID),
// zap.Int64("recipientID", recipientID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// return err
// }
// // count, err := s.store.CountUnreadNotifications(ctx, recipientID)
// // if err != nil {
// // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
// // return err
// // }
// // s.Hub.Broadcast <- map[string]interface{}{
// // "type": "COUNT_NOT_OPENED_NOTIFICATION",
// // "recipient_id": recipientID,
// // "payload": map[string]int{
// // "not_opened_notifications_count": int(count),
// // },
// // }
// s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read",
// zap.String("notificationID", notificationID),
// zap.Int64("recipientID", recipientID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return nil
// }
// GetUserNotifications fetches notifications for recipientID from the store,
// forwarding limit and offset unchanged, and returns them together with the
// total reported by the store.
//
// On a store error it logs the failure and returns (nil, 0, err).
func (s *Service) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) {
	items, totalCount, err := s.store.GetUserNotifications(ctx, recipientID, limit, offset)
	if err != nil {
		s.mongoLogger.Error(
			"[NotificationSvc.GetUserNotifications] Failed to list notifications",
			zap.Int64("recipientID", recipientID),
			zap.Int("limit", limit),
			zap.Int("offset", offset),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return nil, 0, err
	}

	s.mongoLogger.Info(
		"[NotificationSvc.GetUserNotifications] Successfully listed notifications",
		zap.Int64("recipientID", recipientID),
		zap.Int("count", len(items)),
		zap.Int64("total", totalCount),
		zap.Time("timestamp", time.Now()),
	)
	return items, totalCount, nil
}
// GetAllNotifications fetches notifications from the store without filtering
// by recipient, forwarding limit and offset unchanged.
//
// On a store error it logs the failure and returns (nil, err).
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
	items, err := s.store.GetAllNotifications(ctx, limit, offset)
	if err != nil {
		s.mongoLogger.Error(
			"[NotificationSvc.ListNotifications] Failed to get all notifications",
			zap.Int("limit", limit),
			zap.Int("offset", offset),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return nil, err
	}

	s.mongoLogger.Info(
		"[NotificationSvc.ListNotifications] Successfully retrieved all notifications",
		zap.Int("count", len(items)),
		zap.Time("timestamp", time.Now()),
	)
	return items, nil
}
// ConnectWebSocket registers c as the WebSocket connection for recipientID
// and logs the outcome. It returns the registration error, if any.
//
// ctx is accepted for interface symmetry but is not used by this method.
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
	if err := s.addConnection(recipientID, c); err != nil {
		s.mongoLogger.Error(
			"[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection",
			zap.Int64("recipientID", recipientID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}

	s.mongoLogger.Info(
		"[NotificationSvc.ConnectWebSocket] WebSocket connection established",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
	return nil
}
// DisconnectWebSocket removes the registered connection for recipientID, if
// there is one, closes it and logs the disconnect. It is a no-op when no
// connection is registered. The error from Close is not inspected.
func (s *Service) DisconnectWebSocket(recipientID int64) {
	stored, found := s.connections.LoadAndDelete(recipientID)
	if !found {
		return
	}

	stored.(*websocket.Conn).Close()
	s.mongoLogger.Info(
		"[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
}
// func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
// s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
// apiKey := s.config.AFRO_SMS_API_KEY
// senderName := s.config.AFRO_SMS_SENDER_NAME
// receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
// hostURL := s.config.ADRO_SMS_HOST_URL
// endpoint := "/api/send"
// request := afro.GetRequest(apiKey, endpoint, hostURL)
// request.Method = "GET"
// request.Sender(senderName)
// request.To(receiverPhone, message)
// response, err := afro.MakeRequestWithContext(ctx, request)
// if err != nil {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
// return err
// }
// if response["acknowledge"] == "success" {
// s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
// } else {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
// return errors.New("SMS delivery failed: " + response["response"].(string))
// }
// return nil
// }
// func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
// s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
// return nil
// }
// startWorker is the delivery loop run on its own goroutine. It handles
// queued notifications one at a time until a value arrives on (or a close is
// observed on) stopCh, at which point it logs and returns.
func (s *Service) startWorker() {
	for {
		select {
		case <-s.stopCh:
			s.mongoLogger.Info(
				"[NotificationSvc.StartWorker] Worker stopped",
				zap.Time("timestamp", time.Now()),
			)
			return
		case queued := <-s.notificationCh:
			s.handleNotification(queued)
		}
	}
}
// func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
// return s.store.ListRecipientIDs(ctx, receiver)
// }
// handleNotification delivers a queued notification over its delivery channel
// and records the outcome on notification.DeliveryStatus:
//   - SMS and email are sent through SendNotificationSMS / SendNotificationEmail;
//     the status becomes sent on success and failed on error.
//   - In-app notifications need no work here and keep their current status.
//   - Any other channel is logged as unsupported and marked failed.
//
// Delivery errors are logged here together with the notification ID. The
// senders only log messenger failures, so without this log a failed user
// lookup or an unverified/empty contact would disappear silently.
//
// NOTE: the status is only updated on the in-memory value; persisting it is
// currently disabled (see the commented-out store call at the end).
func (s *Service) handleNotification(notification *domain.Notification) {
	// The worker has no caller context to inherit.
	ctx := context.Background()

	// record translates a delivery result into a status and logs failures.
	record := func(err error) {
		if err == nil {
			notification.DeliveryStatus = domain.DeliveryStatusSent
			return
		}
		notification.DeliveryStatus = domain.DeliveryStatusFailed
		s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to deliver notification",
			zap.String("id", notification.ID),
			zap.Int64("recipient_id", notification.RecipientID),
			zap.String("channel", string(notification.DeliveryChannel)),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
	}

	switch notification.DeliveryChannel {
	case domain.DeliveryChannelSMS:
		record(s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message))
	case domain.DeliveryChannelEmail:
		record(s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline))
	case domain.DeliveryChannelInApp:
		// Nothing to deliver from the worker for in-app notifications.
	default:
		s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel",
			zap.String("channel", string(notification.DeliveryChannel)),
			zap.Time("timestamp", time.Now()),
		)
		notification.DeliveryStatus = domain.DeliveryStatusFailed
	}

	// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
	// 	s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status",
	// 		zap.String("id", notification.ID),
	// 		zap.Error(err),
	// 		zap.Time("timestamp", time.Now()),
	// 	)
	// }
}
// SendNotificationSMS looks up the user identified by recipientID and sends
// message to that user's phone number through the messenger service.
//
// It returns an error when the user lookup fails, when the phone number is
// not verified, when the phone number is empty, or when the messenger fails
// (the latter is also logged).
func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error {
	// Resolve the recipient's phone number.
	recipient, err := s.userSvc.GetUserByID(ctx, recipientID)
	if err != nil {
		return err
	}

	switch {
	case !recipient.PhoneVerified:
		return fmt.Errorf("cannot send notification to unverified phone number")
	case recipient.PhoneNumber == "":
		return fmt.Errorf("phone Number is invalid")
	}

	if err := s.messengerSvc.SendSMS(ctx, recipient.PhoneNumber, message); err != nil {
		s.mongoLogger.Error(
			"[NotificationSvc.HandleNotification] Failed to send notification SMS",
			zap.Int64("recipient_id", recipientID),
			zap.String("user_phone_number", recipient.PhoneNumber),
			zap.String("message", message),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// SendNotificationEmail looks up the user identified by recipientID and sends
// message with the given subject to that user's email address through the
// messenger service.
//
// It returns an error when the user lookup fails, when the email address is
// not verified, when the email address is empty, or when the messenger fails
// (the latter is also logged).
func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error {
	// Resolve the recipient's email address.
	recipient, err := s.userSvc.GetUserByID(ctx, recipientID)
	if err != nil {
		return err
	}
	if !recipient.EmailVerified {
		return fmt.Errorf("cannot send notification to unverified email")
	}
	if recipient.Email == "" {
		return fmt.Errorf("email is invalid")
	}

	// NOTE(review): message is passed twice — presumably as both the
	// plain-text and the HTML body; confirm against messenger.Service.SendEmail.
	err = s.messengerSvc.SendEmail(ctx, recipient.Email, message, message, subject)
	if err != nil {
		// This is the email path; the log text previously said "SMS".
		s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification email",
			zap.Int64("recipient_id", recipientID),
			zap.String("user_email", recipient.Email),
			zap.String("message", message),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// func (s *Service) startRetryWorker() {
// ticker := time.NewTicker(1 * time.Minute)
// defer ticker.Stop()
// for {
// select {
// case <-ticker.C:
// s.retryFailedNotifications()
// case <-s.stopCh:
// s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped",
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// }
// }
// func (s *Service) retryFailedNotifications() {
// ctx := context.Background()
// failedNotifications, err := s.store.ListFailedNotifications(ctx, 100)
// if err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// for _, n := range failedNotifications {
// notification := &n
// go func(notification *domain.Notification) {
// for attempt := 0; attempt < 3; attempt++ {
// time.Sleep(time.Duration(attempt) * time.Second)
// switch notification.DeliveryChannel {
// case domain.DeliveryChannelSMS:
// if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
// notification.DeliveryStatus = domain.DeliveryStatusSent
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// } else {
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
// zap.String("id", notification.ID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return
// }
// case domain.DeliveryChannelEmail:
// if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline); err == nil {
// notification.DeliveryStatus = domain.DeliveryStatusSent
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// } else {
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
// zap.String("id", notification.ID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return
// }
// }
// }
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }(notification)
// }
// }
// CountUnreadNotifications returns the store's unread-notification count for
// the given recipient, passing the store's result and error through unchanged.
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
	unread, err := s.store.CountUnreadNotifications(ctx, recipient_id)
	return unread, err
}
// func (s *Service) DeleteOldNotifications(ctx context.Context) error {
// return s.store.DeleteOldNotifications(ctx)
// }
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.store.Get(ctx, filter)
// }
// func (s *Service) RunRedisSubscriber(ctx context.Context) {
// pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
// defer pubsub.Close()
// ch := pubsub.Channel()
// for msg := range ch {
// var parsed map[string]interface{}
// if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
// // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
// s.mongoLogger.Error("invalid Redis message format",
// zap.String("payload", msg.Payload),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// continue
// }
// eventType, _ := parsed["type"].(string)
// payload := parsed["payload"]
// recipientID, hasRecipient := parsed["recipient_id"]
// recipientType, _ := parsed["recipient_type"].(string)
// message := map[string]interface{}{
// "type": eventType,
// "payload": payload,
// }
// if hasRecipient {
// message["recipient_id"] = recipientID
// message["recipient_type"] = recipientType
// }
// s.Hub.Broadcast <- message
// }
// }
// func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
// const key = "live_metrics"
// companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
// for _, c := range companies {
// companyBalances = append(companyBalances, domain.CompanyWalletBalance{
// CompanyID: c.ID,
// CompanyName: c.Name,
// Balance: float64(c.WalletBalance.Float32()),
// })
// }
// branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
// for _, b := range branches {
// branchBalances = append(branchBalances, domain.BranchWalletBalance{
// BranchID: b.ID,
// BranchName: b.Name,
// CompanyID: b.CompanyID,
// Balance: float64(b.Balance.Float32()),
// })
// }
// payload := domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: companyBalances,
// BranchBalances: branchBalances,
// }
// updatedData, err := json.Marshal(payload)
// if err != nil {
// return err
// }
// if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
// return err
// }
// if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
// return err
// }
// return nil
// }
// func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
// const key = "live_metrics"
// var metric domain.LiveMetric
// val, err := s.redisClient.Get(ctx, key).Result()
// if err == redis.Nil {
// // Key does not exist yet, return zero-valued struct
// return domain.LiveMetric{}, nil
// } else if err != nil {
// return domain.LiveMetric{}, err
// }
// if err := json.Unmarshal([]byte(val), &metric); err != nil {
// return domain.LiveMetric{}, err
// }
// return metric, nil
// }
// func (s *Service) StartKafkaConsumer(ctx context.Context) {
// go func() {
// for {
// m, err := s.reader.ReadMessage(ctx)
// if err != nil {
// if err == context.Canceled {
// s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context")
// return
// }
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// time.Sleep(1 * time.Second) // backoff
// continue
// }
// var walletEvent event.WalletEvent
// if err := json.Unmarshal(m.Value, &walletEvent); err != nil {
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event",
// zap.String("message", string(m.Value)),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// continue
// }
// raw, _ := json.Marshal(map[string]any{
// "balance": walletEvent.Balance.Float32(),
// "type": walletEvent.WalletType,
// "timestamp": time.Now(),
// })
// headline := ""
// message := ""
// var receiver domain.NotificationRecieverSide
// switch walletEvent.WalletType {
// case domain.StaticWalletType:
// headline = "Referral and Bonus Wallet Updated"
// message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideCustomer
// case domain.RegularWalletType:
// headline = "Wallet Updated"
// message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideCustomer
// case domain.BranchWalletType:
// headline = "Branch Wallet Updated"
// message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideBranchManager
// case domain.CompanyWalletType:
// headline = "Company Wallet Updated"
// message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideAdmin
// }
// // Handle the wallet event: send notification
// notification := &domain.Notification{
// RecipientID: walletEvent.UserID,
// DeliveryChannel: domain.DeliveryChannelInApp,
// Reciever: receiver,
// Type: domain.NotificationTypeWalletUpdated,
// DeliveryStatus: domain.DeliveryStatusPending,
// IsRead: false,
// Level: domain.NotificationLevelInfo,
// Priority: 2,
// Metadata: raw,
// Payload: domain.NotificationPayload{
// Headline: headline,
// Message: message,
// },
// }
// if err := s.SendNotification(ctx, notification); err != nil {
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// }
// }()
// }
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
// var (
// payload domain.LiveWalletMetrics
// event map[string]interface{}
// key = "live_metrics"
// )
// // Try company first
// company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
// if companyErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{{
// CompanyID: company.ID,
// CompanyName: company.Name,
// Balance: float64(wallet.Balance),
// }},
// BranchBalances: []domain.BranchWalletBalance{},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": company.ID,
// "recipient_type": "company",
// "payload": payload,
// }
// } else {
// // Try branch next
// branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
// if branchErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{},
// BranchBalances: []domain.BranchWalletBalance{{
// BranchID: branch.ID,
// BranchName: branch.Name,
// CompanyID: branch.CompanyID,
// Balance: float64(wallet.Balance),
// }},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": branch.ID,
// "recipient_type": "branch",
// "payload": payload,
// }
// } else {
// // Neither company nor branch matched this wallet
// // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
// s.mongoLogger.Warn("wallet not linked to any company or branch",
// zap.Int64("walletID", wallet.ID),
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// }
// // Save latest metric to Redis
// if jsonBytes, err := json.Marshal(payload); err == nil {
// s.redisClient.Set(ctx, key, jsonBytes, 0)
// } else {
// // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal wallet metrics payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Publish via Redis
// if jsonEvent, err := json.Marshal(event); err == nil {
// s.redisClient.Publish(ctx, key, jsonEvent)
// } else {
// // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal event payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Broadcast over WebSocket
// s.Hub.Broadcast <- event
// }