package notificationservice import ( "context" "encoding/json" "fmt" // "errors" "log/slog" "sync" "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/config" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers" "github.com/SamuelTariku/FortuneBet-Backend/internal/repository" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "go.uber.org/zap" // "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet" "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" // afro "github.com/amanuelabay/afrosms-go" "github.com/gorilla/websocket" "github.com/redis/go-redis/v9" ) type Service struct { repo repository.NotificationRepository Hub *ws.NotificationHub notificationStore NotificationStore connections sync.Map notificationCh chan *domain.Notification stopCh chan struct{} config *config.Config userSvc *user.Service messengerSvc *messenger.Service mongoLogger *zap.Logger logger *slog.Logger redisClient *redis.Client } func New(repo repository.NotificationRepository, mongoLogger *zap.Logger, logger *slog.Logger, cfg *config.Config, messengerSvc *messenger.Service, userSvc *user.Service, ) *Service { hub := ws.NewNotificationHub() rdb := redis.NewClient(&redis.Options{ Addr: cfg.RedisAddr, // e.g., "redis:6379" }) svc := &Service{ repo: repo, Hub: hub, mongoLogger: mongoLogger, logger: logger, connections: sync.Map{}, notificationCh: make(chan *domain.Notification, 1000), stopCh: make(chan struct{}), messengerSvc: messengerSvc, userSvc: userSvc, config: cfg, redisClient: rdb, } go hub.Run() go svc.startWorker() go svc.startRetryWorker() go svc.RunRedisSubscriber(context.Background()) return svc } func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error { if c == nil { s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return fmt.Errorf("Invalid Websocket Connection") } s.connections.Store(recipientID, c) s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return nil } func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error { notification.ID = helpers.GenerateID() notification.Timestamp = time.Now() notification.DeliveryStatus = domain.DeliveryStatusPending created, err := s.repo.CreateNotification(ctx, notification) if err != nil { s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification", zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } notification = created if notification.DeliveryChannel == domain.DeliveryChannelInApp { s.Hub.Broadcast <- map[string]interface{}{ "type": "CREATED_NOTIFICATION", "recipient_id": notification.RecipientID, "payload": notification, } } select { case s.notificationCh <- notification: default: s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", zap.String("id", notification.ID), zap.Time("timestamp", time.Now()), ) } return nil } func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error { for _, notificationID := range notificationIDs { _, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil) if err != nil { s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", zap.String("notificationID", notificationID), zap.Int64("recipientID", recipientID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } // count, err := s.repo.CountUnreadNotifications(ctx, recipientID) // if err != nil { // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err) // return err // } // s.Hub.Broadcast <- map[string]interface{}{ // "type": "COUNT_NOT_OPENED_NOTIFICATION", // "recipient_id": recipientID, // "payload": map[string]int{ // "not_opened_notifications_count": int(count), // }, // } s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", zap.String("notificationID", notificationID), zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) } return nil } func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) { notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset) if err != nil { s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to list notifications", zap.Int64("recipientID", recipientID), zap.Int("limit", limit), zap.Int("offset", offset), zap.Error(err), zap.Time("timestamp", time.Now()), ) return nil, err } s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications", zap.Int64("recipientID", recipientID), zap.Int("count", len(notifications)), zap.Time("timestamp", time.Now()), ) return notifications, nil } func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) { notifications, err := s.repo.GetAllNotifications(ctx, limit, offset) if err != nil { s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications", zap.Int("limit", limit), zap.Int("offset", offset), zap.Error(err), zap.Time("timestamp", time.Now()), ) return nil, err } s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications", zap.Int("count", len(notifications)), zap.Time("timestamp", time.Now()), ) return notifications, nil } func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error { err := s.addConnection(recipientID, c) if err != nil { s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection", zap.Int64("recipientID", recipientID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return nil } func (s *Service) DisconnectWebSocket(recipientID int64) { if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded { conn.(*websocket.Conn).Close() // s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID) s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) } } // func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error { // s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message) // apiKey := s.config.AFRO_SMS_API_KEY // senderName := s.config.AFRO_SMS_SENDER_NAME // receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER // hostURL := s.config.ADRO_SMS_HOST_URL // endpoint := "/api/send" // request := afro.GetRequest(apiKey, endpoint, hostURL) // request.Method = "GET" // request.Sender(senderName) // request.To(receiverPhone, message) // response, err := afro.MakeRequestWithContext(ctx, request) // if err != nil { // s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err) // return err // } // if response["acknowledge"] == "success" { // s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID) // } else { // s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"]) // return errors.New("SMS delivery failed: " + response["response"].(string)) // } // return nil // } // func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error { // s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject) // return nil // } func (s *Service) startWorker() { for { select { case notification := <-s.notificationCh: s.handleNotification(notification) case <-s.stopCh: s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped", zap.Time("timestamp", time.Now()), ) return } } } func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) { return s.repo.ListRecipientIDs(ctx, receiver) } func (s *Service) handleNotification(notification *domain.Notification) { ctx := context.Background() switch notification.DeliveryChannel { case domain.DeliveryChannelSMS: err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } case domain.DeliveryChannelEmail: err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } default: if notification.DeliveryChannel != domain.DeliveryChannelInApp { s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", zap.String("channel", string(notification.DeliveryChannel)), zap.Time("timestamp", time.Now()), ) notification.DeliveryStatus = domain.DeliveryStatusFailed } } if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) } } func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error { // Get User Phone Number user, err := s.userSvc.GetUserByID(ctx, recipientID) if err != nil { return err } if !user.PhoneVerified { return fmt.Errorf("Cannot send notification to unverified phone number") } if user.PhoneNumber == "" { return fmt.Errorf("Phone Number is invalid") } err = s.messengerSvc.SendSMS(ctx, user.PhoneNumber, message, user.CompanyID) if err != nil { return err } return nil } func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error { // Get User Phone Number user, err := s.userSvc.GetUserByID(ctx, recipientID) if err != nil { return err } if !user.EmailVerified { return fmt.Errorf("Cannot send notification to unverified email") } if user.PhoneNumber == "" { return fmt.Errorf("Email is invalid") } err = s.messengerSvc.SendEmail(ctx, user.PhoneNumber, message, subject) if err != nil { return err } return nil } func (s *Service) startRetryWorker() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ticker.C: s.retryFailedNotifications() case <-s.stopCh: s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped", zap.Time("timestamp", time.Now()), ) return } } } func (s *Service) retryFailedNotifications() { ctx := context.Background() failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100) if err != nil { s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", zap.Error(err), zap.Time("timestamp", time.Now()), ) return } for _, n := range failedNotifications { notification := &n go func(notification *domain.Notification) { for attempt := 0; attempt < 3; attempt++ { time.Sleep(time.Duration(attempt) * time.Second) switch notification.DeliveryChannel { case domain.DeliveryChannelSMS: if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil { notification.DeliveryStatus = domain.DeliveryStatusSent if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) } else { s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", zap.String("id", notification.ID), zap.Time("timestamp", time.Now()), ) } return } case domain.DeliveryChannelEmail: if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil { notification.DeliveryStatus = domain.DeliveryStatusSent if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) } else { s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", zap.String("id", notification.ID), zap.Time("timestamp", time.Now()), ) } return } } } s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) }(notification) } } func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) { return s.repo.CountUnreadNotifications(ctx, recipient_id) } // func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){ // return s.repo.Get(ctx, filter) // } func (s *Service) RunRedisSubscriber(ctx context.Context) { pubsub := s.redisClient.Subscribe(ctx, "live_metrics") defer pubsub.Close() ch := pubsub.Channel() for msg := range ch { var parsed map[string]interface{} if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil { // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err) s.mongoLogger.Error("invalid Redis message format", zap.String("payload", msg.Payload), zap.Error(err), zap.Time("timestamp", time.Now()), ) continue } eventType, _ := parsed["type"].(string) payload := parsed["payload"] recipientID, hasRecipient := parsed["recipient_id"] recipientType, _ := parsed["recipient_type"].(string) message := map[string]interface{}{ "type": eventType, "payload": payload, } if hasRecipient { message["recipient_id"] = recipientID message["recipient_type"] = recipientType } s.Hub.Broadcast <- message } } func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error { const key = "live_metrics" companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies)) for _, c := range companies { companyBalances = append(companyBalances, domain.CompanyWalletBalance{ CompanyID: c.ID, CompanyName: c.Name, Balance: float64(c.WalletBalance.Float32()), }) } branchBalances := make([]domain.BranchWalletBalance, 0, len(branches)) for _, b := range branches { branchBalances = append(branchBalances, domain.BranchWalletBalance{ BranchID: b.ID, BranchName: b.Name, CompanyID: b.CompanyID, Balance: float64(b.Balance.Float32()), }) } payload := domain.LiveWalletMetrics{ Timestamp: time.Now(), CompanyBalances: companyBalances, BranchBalances: branchBalances, } updatedData, err := json.Marshal(payload) if err != nil { return err } if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil { return err } if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil { return err } return nil } func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) { const key = "live_metrics" var metric domain.LiveMetric val, err := s.redisClient.Get(ctx, key).Result() if err == redis.Nil { // Key does not exist yet, return zero-valued struct return domain.LiveMetric{}, nil } else if err != nil { return domain.LiveMetric{}, err } if err := json.Unmarshal([]byte(val), &metric); err != nil { return domain.LiveMetric{}, err } return metric, nil } // func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) { // var ( // payload domain.LiveWalletMetrics // event map[string]interface{} // key = "live_metrics" // ) // // Try company first // company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID) // if companyErr == nil { // payload = domain.LiveWalletMetrics{ // Timestamp: time.Now(), // CompanyBalances: []domain.CompanyWalletBalance{{ // CompanyID: company.ID, // CompanyName: company.Name, // Balance: float64(wallet.Balance), // }}, // BranchBalances: []domain.BranchWalletBalance{}, // } // event = map[string]interface{}{ // "type": "LIVE_WALLET_METRICS_UPDATE", // "recipient_id": company.ID, // "recipient_type": "company", // "payload": payload, // } // } else { // // Try branch next // branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID) // if branchErr == nil { // payload = domain.LiveWalletMetrics{ // Timestamp: time.Now(), // CompanyBalances: []domain.CompanyWalletBalance{}, // BranchBalances: []domain.BranchWalletBalance{{ // BranchID: branch.ID, // BranchName: branch.Name, // CompanyID: branch.CompanyID, // Balance: float64(wallet.Balance), // }}, // } // event = map[string]interface{}{ // "type": "LIVE_WALLET_METRICS_UPDATE", // "recipient_id": branch.ID, // "recipient_type": "branch", // "payload": payload, // } // } else { // // Neither company nor branch matched this wallet // // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID) // s.mongoLogger.Warn("wallet not linked to any company or branch", // zap.Int64("walletID", wallet.ID), // zap.Time("timestamp", time.Now()), // ) // return // } // } // // Save latest metric to Redis // if jsonBytes, err := json.Marshal(payload); err == nil { // s.redisClient.Set(ctx, key, jsonBytes, 0) // } else { // // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err) // s.mongoLogger.Error("failed to marshal wallet metrics payload", // zap.Int64("walletID", wallet.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } // // Publish via Redis // if jsonEvent, err := json.Marshal(event); err == nil { // s.redisClient.Publish(ctx, key, jsonEvent) // } else { // // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err) // s.mongoLogger.Error("failed to marshal event payload", // zap.Int64("walletID", wallet.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } // // Broadcast over WebSocket // s.Hub.Broadcast <- event // }