package notificationservice

import (
	"Yimaru-Backend/internal/config"
	"Yimaru-Backend/internal/domain"
	"Yimaru-Backend/internal/pkgs/helpers"
	"Yimaru-Backend/internal/ports"
	"Yimaru-Backend/internal/services/messenger"
	"Yimaru-Backend/internal/services/user"
	"Yimaru-Backend/internal/web_server/ws"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"

	// "errors"
	"log/slog"
	"sync"
	"time"

	// "github.com/segmentio/kafka-go"
	"go.uber.org/zap"

	firebase "firebase.google.com/go/v4"
	"firebase.google.com/go/v4/messaging"
	"github.com/gorilla/websocket"
	"github.com/resend/resend-go/v2"
	"google.golang.org/api/option"
	// "github.com/redis/go-redis/v9"
)

// Service persists notifications and delivers them over the in-app
// (WebSocket hub), SMS, email and FCM push channels.
//
// connections maps a recipient ID to its *websocket.Conn, notificationCh
// feeds the background delivery worker, and stopCh is watched by the
// background workers as their exit signal. fcmClient stays nil when no FCM
// service account key is configured or when Firebase initialization fails.
type Service struct {
	store          ports.NotificationStore
	Hub            *ws.NotificationHub
	connections    sync.Map
	notificationCh chan *domain.Notification
	stopCh         chan struct{}
	config         *config.Config
	userSvc        *user.Service
	messengerSvc   *messenger.Service
	mongoLogger    *zap.Logger
	logger         *slog.Logger
	fcmClient      *messaging.Client
}

// New builds a Service and starts its background goroutines: the WebSocket
// hub loop, the delivery worker and the scheduled-notification worker.
//
// When cfg.FCMServiceAccountKey is set the FCM client is initialized; a
// failure there is logged and the service is still returned, with push
// delivery unavailable.
func New(
	store ports.NotificationStore,
	mongoLogger *zap.Logger,
	logger *slog.Logger,
	cfg *config.Config,
	messengerSvc *messenger.Service,
	userSvc *user.Service,
) *Service {
	// The zero value of sync.Map is ready to use, so connections needs no
	// explicit initializer.
	service := &Service{
		store:          store,
		Hub:            ws.NewNotificationHub(),
		mongoLogger:    mongoLogger,
		logger:         logger,
		notificationCh: make(chan *domain.Notification, 1000),
		stopCh:         make(chan struct{}),
		messengerSvc:   messengerSvc,
		userSvc:        userSvc,
		config:         cfg,
	}

	// Initialize FCM client if service account key is provided
	if cfg.FCMServiceAccountKey != "" {
		if initErr := service.initFCMClient(); initErr != nil {
			mongoLogger.Error("Failed to initialize FCM client", zap.Error(initErr))
		}
	}

	go service.Hub.Run()
	go service.startWorker()
	go service.startSchedulerWorker()
	// go svc.startRetryWorker()
	// go svc.RunRedisSubscriber(context.Background())
	// go svc.StartKafkaConsumer(context.Background())

	return service
}

// initFCMClient creates the Firebase app and stores its messaging client on
// the service. The service account JSON from the configuration is passed as
// credentials when it is non-empty.
func (s *Service) initFCMClient() error {
	background := context.Background()

	var clientOpts []option.ClientOption
	if key := s.config.FCMServiceAccountKey; key != "" {
		clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(key)))
	}

	firebaseApp, err := firebase.NewApp(background, nil, clientOpts...)
	if err != nil {
		return fmt.Errorf("failed to initialize Firebase app: %w", err)
	}

	messagingClient, err := firebaseApp.Messaging(background)
	if err != nil {
		return fmt.Errorf("failed to get FCM client: %w", err)
	}

	s.fcmClient = messagingClient
	return nil
}

// SendAfroMessageSMSTemp sends one SMS through the AfroMessage HTTP API.
//
// It issues a GET to "<base URL>/api/send" with the recipient, message and
// sender as query parameters, plus the optional identifier ("from") and
// callback URL. The call succeeds only when the API answers 200 and the JSON
// body carries "acknowledge": "success"; any other outcome is returned as an
// error.
func (s *Service) SendAfroMessageSMSTemp(
	ctx context.Context,
	receiverPhone string,
	message string,
	callbackURL *string, // optional
) error {
	query := url.Values{
		"to":      {receiverPhone},
		"message": {message},
		"sender":  {s.config.AFRO_SMS_SENDER_NAME},
	}
	// Optional parameters
	if identifier := s.config.AFROSMSConfig.AfroSMSIdentifierID; identifier != "" {
		query.Set("from", identifier)
	}
	if callbackURL != nil {
		query.Set("callback", *callbackURL)
	}

	endpoint := s.config.AFROSMSConfig.AfroSMSBaseURL + "/api/send?" + query.Encode()

	request, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return err
	}
	// AfroMessage authentication (API key)
	request.Header.Set("Authorization", "Bearer "+s.config.AFRO_SMS_API_KEY)
	request.Header.Set("Accept", "application/json")

	httpClient := &http.Client{Timeout: 10 * time.Second}
	response, err := httpClient.Do(request)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	payload, err := io.ReadAll(response.Body)
	if err != nil {
		return err
	}

	if response.StatusCode != http.StatusOK {
		return fmt.Errorf(
			"afromessage sms failed: status=%d response=%s",
			response.StatusCode,
			string(payload),
		)
	}

	// Parse response
	var decoded map[string]interface{}
	if err := json.Unmarshal(payload, &decoded); err != nil {
		return err
	}

	// A missing or non-string "acknowledge" yields "" and is rejected too.
	if ack, _ := decoded["acknowledge"].(string); ack != "success" {
		return fmt.Errorf("sms delivery failed: %v", decoded)
	}

	return nil
}
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error { if c == nil { s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return fmt.Errorf("Invalid Websocket Connection") } s.connections.Store(recipientID, c) s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return nil } func (s *Service) GetFilteredNotifications(ctx context.Context, filter domain.NotificationFilter) ([]domain.Notification, int64, error) { return s.store.GetFilteredNotifications(ctx, filter) } // RecordNotification saves a notification to the database for history/audit without dispatching to the worker. func (s *Service) RecordNotification(ctx context.Context, recipientID int64, notifType domain.NotificationType, channel domain.DeliveryChannel, level domain.NotificationLevel, headline, message string) { notification := &domain.Notification{ ID: helpers.GenerateID(), RecipientID: recipientID, Type: notifType, Level: level, DeliveryChannel: channel, DeliveryStatus: domain.DeliveryStatusSent, Payload: domain.NotificationPayload{ Headline: headline, Message: message, }, Timestamp: time.Now(), } if _, err := s.store.CreateNotification(ctx, notification); err != nil { s.mongoLogger.Error("[NotificationSvc.RecordNotification] Failed to record notification", zap.Int64("recipientID", recipientID), zap.String("channel", string(channel)), zap.Error(err), ) } } func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error { notification.ID = helpers.GenerateID() notification.Timestamp = time.Now() notification.DeliveryStatus = domain.DeliveryStatusPending created, err := s.store.CreateNotification(ctx, notification) if err != nil { s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification", 
zap.String("id", notification.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } notification = created if notification.DeliveryChannel == domain.DeliveryChannelInApp { s.Hub.Broadcast <- map[string]interface{}{ "type": "CREATED_NOTIFICATION", "recipient_id": notification.RecipientID, "payload": notification, } } select { case s.notificationCh <- notification: default: s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", zap.String("id", notification.ID), zap.Time("timestamp", time.Now()), ) } return nil } // func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error { // for _, notificationID := range notificationIDs { // _, err := s.store.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil) // if err != nil { // s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", // zap.String("notificationID", notificationID), // zap.Int64("recipientID", recipientID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // return err // } // // count, err := s.store.CountUnreadNotifications(ctx, recipientID) // // if err != nil { // // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err) // // return err // // } // // s.Hub.Broadcast <- map[string]interface{}{ // // "type": "COUNT_NOT_OPENED_NOTIFICATION", // // "recipient_id": recipientID, // // "payload": map[string]int{ // // "not_opened_notifications_count": int(count), // // }, // // } // s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", // zap.String("notificationID", notificationID), // zap.Int64("recipientID", recipientID), // zap.Time("timestamp", time.Now()), // ) // } // return nil // } func (s *Service) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) { 
notifications, total, err := s.store.GetUserNotifications(ctx, recipientID, limit, offset) if err != nil { s.mongoLogger.Error("[NotificationSvc.GetUserNotifications] Failed to list notifications", zap.Int64("recipientID", recipientID), zap.Int("limit", limit), zap.Int("offset", offset), zap.Error(err), zap.Time("timestamp", time.Now()), ) return nil, 0, err } s.mongoLogger.Info("[NotificationSvc.GetUserNotifications] Successfully listed notifications", zap.Int64("recipientID", recipientID), zap.Int("count", len(notifications)), zap.Int64("total", total), zap.Time("timestamp", time.Now()), ) return notifications, total, nil } func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) { notifications, err := s.store.GetAllNotifications(ctx, limit, offset) if err != nil { s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications", zap.Int("limit", limit), zap.Int("offset", offset), zap.Error(err), zap.Time("timestamp", time.Now()), ) return nil, err } s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications", zap.Int("count", len(notifications)), zap.Time("timestamp", time.Now()), ) return notifications, nil } func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error { err := s.addConnection(recipientID, c) if err != nil { s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection", zap.Int64("recipientID", recipientID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) return nil } func (s *Service) DisconnectWebSocket(recipientID int64) { if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded { conn.(*websocket.Conn).Close() // s.logger.Info("[NotificationSvc.DisconnectWebSocket] 
Disconnected WebSocket", "recipientID", recipientID) s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", zap.Int64("recipientID", recipientID), zap.Time("timestamp", time.Now()), ) } } // func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error { // s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message) // apiKey := s.config.AFRO_SMS_API_KEY // senderName := s.config.AFRO_SMS_SENDER_NAME // receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER // hostURL := s.config.AFRO_SMS_HOST_URL // endpoint := "/api/send" // request := afro.GetRequest(apiKey, endpoint, hostURL) // request.Method = "GET" // request.Sender(senderName) // request.To(receiverPhone, message) // response, err := afro.MakeRequestWithContext(ctx, request) // if err != nil { // s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err) // return err // } // if response["acknowledge"] == "success" { // s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID) // } else { // s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"]) // return errors.New("SMS delivery failed: " + response["response"].(string)) // } // return nil // } // func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error { // s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject) // return nil // } func (s *Service) startWorker() { for { select { case notification := <-s.notificationCh: s.handleNotification(notification) case <-s.stopCh: s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped", zap.Time("timestamp", time.Now()), ) return } } } // func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) 
([]int64, error) { // return s.store.ListRecipientIDs(ctx, receiver) // } func (s *Service) handleNotification(notification *domain.Notification) { ctx := context.Background() switch notification.DeliveryChannel { case domain.DeliveryChannelSMS: err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } case domain.DeliveryChannelEmail: err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } case domain.DeliveryChannelPush: err := s.SendPushNotification(ctx, notification) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } case domain.DeliveryChannelInApp: notification.DeliveryStatus = domain.DeliveryStatusSent default: s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", zap.String("channel", string(notification.DeliveryChannel)), zap.Time("timestamp", time.Now()), ) notification.DeliveryStatus = domain.DeliveryStatusFailed } // if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { // s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", // zap.String("id", notification.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } } func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error { // Get User Phone Number user, err := s.userSvc.GetUserByID(ctx, recipientID) if err != nil { return err } if !user.PhoneVerified { return fmt.Errorf("cannot send notification to 
unverified phone number") } if user.PhoneNumber == "" { return fmt.Errorf("phone Number is invalid") } err = s.messengerSvc.SendSMS(ctx, user.PhoneNumber, message) if err != nil { s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS", zap.Int64("recipient_id", recipientID), zap.String("user_phone_number", user.PhoneNumber), zap.String("message", message), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } return nil } func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error { // Get User Phone Number user, err := s.userSvc.GetUserByID(ctx, recipientID) if err != nil { return err } if !user.EmailVerified { return fmt.Errorf("cannot send notification to unverified email") } if user.Email == "" { return fmt.Errorf("email is invalid") } err = s.messengerSvc.SendEmail(ctx, user.Email, message, message, subject) if err != nil { s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS", zap.Int64("recipient_id", recipientID), zap.String("user_email", user.Email), zap.String("message", message), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } return nil } func (s *Service) SendPushNotification(ctx context.Context, notification *domain.Notification) error { if s.fcmClient == nil { return fmt.Errorf("FCM client not initialized") } // Get user device tokens tokens, err := s.userSvc.GetUserDeviceTokens(ctx, notification.RecipientID) if err != nil { return fmt.Errorf("failed to get user device tokens: %w", err) } if len(tokens) == 0 { s.mongoLogger.Info("[NotificationSvc.SendPushNotification] No device tokens found for user", zap.Int64("userID", notification.RecipientID), zap.Time("timestamp", time.Now()), ) return nil // Not an error, just no devices to send to } // Create FCM message message := &messaging.Message{ Notification: &messaging.Notification{ Title: notification.Payload.Headline, Body: 
notification.Payload.Message, ImageURL: notification.Image, }, Data: map[string]string{ "type": string(notification.Type), "recipient_id": strconv.FormatInt(notification.RecipientID, 10), "notification_id": notification.ID, }, } // Send to all user devices for _, token := range tokens { message.Token = token response, err := s.fcmClient.Send(ctx, message) if err != nil { s.mongoLogger.Error("[NotificationSvc.SendPushNotification] Failed to send FCM message", zap.String("token", token), zap.Int64("userID", notification.RecipientID), zap.Error(err), zap.Time("timestamp", time.Now()), ) // Check if token is invalid/unregistered and deactivate it if messaging.IsUnregistered(err) || messaging.IsInvalidArgument(err) { if deactivateErr := s.userSvc.DeactivateDevice(ctx, notification.RecipientID, token); deactivateErr != nil { s.mongoLogger.Warn("[NotificationSvc.SendPushNotification] Failed to deactivate invalid token", zap.String("token", token), zap.Int64("userID", notification.RecipientID), zap.Error(deactivateErr), zap.Time("timestamp", time.Now()), ) } else { s.mongoLogger.Info("[NotificationSvc.SendPushNotification] Deactivated invalid FCM token", zap.String("token", token), zap.Int64("userID", notification.RecipientID), zap.Time("timestamp", time.Now()), ) } } continue // Continue with other tokens } s.mongoLogger.Info("[NotificationSvc.SendPushNotification] FCM message sent successfully", zap.String("response", response), zap.String("token", token), zap.Int64("userID", notification.RecipientID), zap.Time("timestamp", time.Now()), ) } return nil } // SendBulkPushNotification sends a push notification to multiple users using FCM multicast. // It collects all device tokens for the given user IDs and sends in batches of 500 (FCM limit). 
func (s *Service) MessengerSvc() *messenger.Service { return s.messengerSvc } func (s *Service) SendBulkPushNotification(ctx context.Context, userIDs []int64, notification *domain.Notification) (sent int, failed int, err error) { if s.fcmClient == nil { return 0, 0, fmt.Errorf("FCM client not initialized") } // Collect all device tokens for the given users var allTokens []string tokenUserMap := make(map[string]int64) // token -> userID for cleanup for _, uid := range userIDs { tokens, err := s.userSvc.GetUserDeviceTokens(ctx, uid) if err != nil { s.mongoLogger.Warn("[NotificationSvc.SendBulkPushNotification] Failed to get tokens for user", zap.Int64("userID", uid), zap.Error(err), ) continue } for _, t := range tokens { tokenUserMap[t] = uid } allTokens = append(allTokens, tokens...) } if len(allTokens) == 0 { s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] No device tokens found for any user", zap.Int("userCount", len(userIDs)), ) return 0, 0, nil } fcmNotification := &messaging.Notification{ Title: notification.Payload.Headline, Body: notification.Payload.Message, ImageURL: notification.Image, } data := map[string]string{ "type": string(notification.Type), "notification_id": notification.ID, } // FCM multicast supports max 500 tokens per batch const batchSize = 500 for i := 0; i < len(allTokens); i += batchSize { end := i + batchSize if end > len(allTokens) { end = len(allTokens) } batch := allTokens[i:end] msg := &messaging.MulticastMessage{ Notification: fcmNotification, Data: data, Tokens: batch, } resp, err := s.fcmClient.SendEachForMulticast(ctx, msg) if err != nil { s.mongoLogger.Error("[NotificationSvc.SendBulkPushNotification] Multicast send failed", zap.Error(err), zap.Int("batchSize", len(batch)), ) failed += len(batch) continue } sent += resp.SuccessCount failed += resp.FailureCount // Deactivate invalid tokens for j, sendResp := range resp.Responses { if sendResp.Error != nil && (messaging.IsUnregistered(sendResp.Error) || 
messaging.IsInvalidArgument(sendResp.Error)) { token := batch[j] if uid, ok := tokenUserMap[token]; ok { _ = s.userSvc.DeactivateDevice(ctx, uid, token) } } } } s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] Bulk push completed", zap.Int("totalTokens", len(allTokens)), zap.Int("sent", sent), zap.Int("failed", failed), ) return sent, failed, nil } // SendBulkSMS sends an SMS to multiple phone numbers using AfroMessage. // It sends sequentially and returns the count of successful and failed deliveries. func (s *Service) SendBulkSMS(ctx context.Context, recipients []string, message string) (sent int, failed int) { for _, phone := range recipients { if err := s.SendAfroMessageSMSTemp(ctx, phone, message, nil); err != nil { s.mongoLogger.Error("[NotificationSvc.SendBulkSMS] Failed to send SMS", zap.String("phone", phone), zap.Error(err), ) failed++ continue } sent++ } s.mongoLogger.Info("[NotificationSvc.SendBulkSMS] Bulk SMS completed", zap.Int("totalRecipients", len(recipients)), zap.Int("sent", sent), zap.Int("failed", failed), ) return sent, failed } // SendBulkEmail sends an email to multiple recipients using the messenger service. // It sends sequentially and returns the count of successful and failed deliveries. 
func (s *Service) SendBulkEmail(ctx context.Context, recipients []string, subject, message, messageHTML string, attachments []*resend.Attachment) (sent int, failed int) { for _, email := range recipients { if err := s.messengerSvc.SendEmailWithAttachments(ctx, email, message, messageHTML, subject, attachments); err != nil { s.mongoLogger.Error("[NotificationSvc.SendBulkEmail] Failed to send email", zap.String("email", email), zap.Error(err), ) failed++ continue } sent++ } s.mongoLogger.Info("[NotificationSvc.SendBulkEmail] Bulk email completed", zap.Int("totalRecipients", len(recipients)), zap.Int("sent", sent), zap.Int("failed", failed), ) return sent, failed } // func (s *Service) startRetryWorker() { // ticker := time.NewTicker(1 * time.Minute) // defer ticker.Stop() // for { // select { // case <-ticker.C: // s.retryFailedNotifications() // case <-s.stopCh: // s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped", // zap.Time("timestamp", time.Now()), // ) // return // } // } // } // func (s *Service) retryFailedNotifications() { // ctx := context.Background() // failedNotifications, err := s.store.ListFailedNotifications(ctx, 100) // if err != nil { // s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // return // } // for _, n := range failedNotifications { // notification := &n // go func(notification *domain.Notification) { // for attempt := 0; attempt < 3; attempt++ { // time.Sleep(time.Duration(attempt) * time.Second) // switch notification.DeliveryChannel { // case domain.DeliveryChannelSMS: // if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil { // notification.DeliveryStatus = domain.DeliveryStatusSent // if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { 
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", // zap.String("id", notification.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } else { // s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", // zap.String("id", notification.ID), // zap.Time("timestamp", time.Now()), // ) // } // return // } // case domain.DeliveryChannelEmail: // if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline); err == nil { // notification.DeliveryStatus = domain.DeliveryStatusSent // if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { // s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", // zap.String("id", notification.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } else { // s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", // zap.String("id", notification.ID), // zap.Time("timestamp", time.Now()), // ) // } // return // } // } // } // s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", // zap.String("id", notification.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // }(notification) // } // } func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) { return s.store.CountUnreadNotifications(ctx, recipient_id) } func (s *Service) MarkNotificationAsRead(ctx context.Context, id int64) (*domain.Notification, error) { notification, err := s.store.MarkNotificationAsRead(ctx, id) if err != nil { s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsRead] Failed to mark notification as read", zap.Int64("notificationID", id), zap.Error(err), zap.Time("timestamp", 
time.Now()), ) return nil, err } return notification, nil } func (s *Service) MarkAllUserNotificationsAsRead(ctx context.Context, userID int64) error { err := s.store.MarkAllUserNotificationsAsRead(ctx, userID) if err != nil { s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsRead] Failed to mark all notifications as read", zap.Int64("userID", userID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } return nil } func (s *Service) MarkNotificationAsUnread(ctx context.Context, id int64) (*domain.Notification, error) { notification, err := s.store.MarkNotificationAsUnread(ctx, id) if err != nil { s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsUnread] Failed to mark notification as unread", zap.Int64("notificationID", id), zap.Error(err), zap.Time("timestamp", time.Now()), ) return nil, err } return notification, nil } func (s *Service) MarkAllUserNotificationsAsUnread(ctx context.Context, userID int64) error { err := s.store.MarkAllUserNotificationsAsUnread(ctx, userID) if err != nil { s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsUnread] Failed to mark all notifications as unread", zap.Int64("userID", userID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } return nil } func (s *Service) DeleteUserNotifications(ctx context.Context, userID int64) error { err := s.store.DeleteUserNotifications(ctx, userID) if err != nil { s.mongoLogger.Error("[NotificationSvc.DeleteUserNotifications] Failed to delete user notifications", zap.Int64("userID", userID), zap.Error(err), zap.Time("timestamp", time.Now()), ) return err } return nil } // Scheduled Notification Methods func (s *Service) CreateScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) (*domain.ScheduledNotification, error) { created, err := s.store.CreateScheduledNotification(ctx, sn) if err != nil { s.mongoLogger.Error("[NotificationSvc.CreateScheduledNotification] Failed to create", zap.String("channel", 
string(sn.Channel)), zap.Error(err), ) return nil, err } s.mongoLogger.Info("[NotificationSvc.CreateScheduledNotification] Created", zap.Int64("id", created.ID), zap.String("channel", string(created.Channel)), zap.Time("scheduledAt", created.ScheduledAt), ) return created, nil } func (s *Service) GetScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) { return s.store.GetScheduledNotification(ctx, id) } func (s *Service) ListScheduledNotifications(ctx context.Context, filter domain.ScheduledNotificationFilter) ([]domain.ScheduledNotification, int64, error) { return s.store.ListScheduledNotifications(ctx, filter) } func (s *Service) CancelScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) { cancelled, err := s.store.CancelScheduledNotification(ctx, id) if err != nil { s.mongoLogger.Error("[NotificationSvc.CancelScheduledNotification] Failed to cancel", zap.Int64("id", id), zap.Error(err), ) return nil, err } s.mongoLogger.Info("[NotificationSvc.CancelScheduledNotification] Cancelled", zap.Int64("id", cancelled.ID), ) return cancelled, nil } func (s *Service) startSchedulerWorker() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: s.processDueScheduledNotifications() case <-s.stopCh: s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Stopped") return } } } func (s *Service) processDueScheduledNotifications() { ctx := context.Background() claimed, err := s.store.ClaimDueScheduledNotifications(ctx, 20) if err != nil { s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to claim due notifications", zap.Error(err), ) return } for i := range claimed { sn := &claimed[i] go s.dispatchScheduledNotification(ctx, sn) } } func (s *Service) dispatchScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) { var dispatchErr error switch sn.Channel { case domain.DeliveryChannelSMS: dispatchErr = s.dispatchScheduledSMS(ctx, 
sn) case domain.DeliveryChannelEmail: dispatchErr = s.dispatchScheduledEmail(ctx, sn) case domain.DeliveryChannelPush: dispatchErr = s.dispatchScheduledPush(ctx, sn) default: dispatchErr = fmt.Errorf("unsupported channel: %s", sn.Channel) } if dispatchErr != nil { s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to dispatch", zap.Int64("id", sn.ID), zap.String("channel", string(sn.Channel)), zap.Error(dispatchErr), ) _ = s.store.MarkScheduledNotificationFailed(ctx, sn.ID, dispatchErr.Error()) return } _ = s.store.MarkScheduledNotificationSent(ctx, sn.ID) s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Dispatched", zap.Int64("id", sn.ID), zap.String("channel", string(sn.Channel)), ) } func (s *Service) resolvePhoneNumbers(ctx context.Context, sn *domain.ScheduledNotification) []string { phoneSet := make(map[string]struct{}) userIDs := sn.TargetUserIDs if len(userIDs) == 0 && sn.TargetRole != "" { users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole}) if err == nil { for _, u := range users { userIDs = append(userIDs, u.ID) } } } for _, uid := range userIDs { user, err := s.userSvc.GetUserByID(ctx, uid) if err == nil && user.PhoneNumber != "" { phoneSet[user.PhoneNumber] = struct{}{} } } if len(sn.TargetRaw) > 0 { var raw domain.ScheduledNotificationTargetRaw if err := json.Unmarshal(sn.TargetRaw, &raw); err == nil { for _, p := range raw.Phones { phoneSet[p] = struct{}{} } } } phones := make([]string, 0, len(phoneSet)) for p := range phoneSet { phones = append(phones, p) } return phones } func (s *Service) resolveEmails(ctx context.Context, sn *domain.ScheduledNotification) []string { emailSet := make(map[string]struct{}) userIDs := sn.TargetUserIDs if len(userIDs) == 0 && sn.TargetRole != "" { users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole}) if err == nil { for _, u := range users { userIDs = append(userIDs, u.ID) } } } for _, uid := range userIDs { user, err := 
s.userSvc.GetUserByID(ctx, uid) if err == nil && user.Email != "" { emailSet[user.Email] = struct{}{} } } if len(sn.TargetRaw) > 0 { var raw domain.ScheduledNotificationTargetRaw if err := json.Unmarshal(sn.TargetRaw, &raw); err == nil { for _, e := range raw.Emails { emailSet[e] = struct{}{} } } } emails := make([]string, 0, len(emailSet)) for e := range emailSet { emails = append(emails, e) } return emails } func (s *Service) resolveUserIDs(ctx context.Context, sn *domain.ScheduledNotification) []int64 { if len(sn.TargetUserIDs) > 0 { return sn.TargetUserIDs } if sn.TargetRole != "" { users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole}) if err == nil { ids := make([]int64, 0, len(users)) for _, u := range users { ids = append(ids, u.ID) } return ids } } return nil } func (s *Service) dispatchScheduledSMS(ctx context.Context, sn *domain.ScheduledNotification) error { phones := s.resolvePhoneNumbers(ctx, sn) if len(phones) == 0 { return fmt.Errorf("no SMS recipients resolved") } sent, failed := s.SendBulkSMS(ctx, phones, sn.Message) if sent == 0 && failed > 0 { return fmt.Errorf("all %d SMS deliveries failed", failed) } return nil } func (s *Service) dispatchScheduledEmail(ctx context.Context, sn *domain.ScheduledNotification) error { emails := s.resolveEmails(ctx, sn) if len(emails) == 0 { return fmt.Errorf("no email recipients resolved") } sent, failed := s.SendBulkEmail(ctx, emails, sn.Title, sn.Message, sn.HTML, nil) if sent == 0 && failed > 0 { return fmt.Errorf("all %d email deliveries failed", failed) } return nil } func (s *Service) dispatchScheduledPush(ctx context.Context, sn *domain.ScheduledNotification) error { userIDs := s.resolveUserIDs(ctx, sn) if len(userIDs) == 0 { return fmt.Errorf("no push recipients resolved") } notification := &domain.Notification{ Type: domain.NOTIFICATION_TYPE_SYSTEM_ALERT, DeliveryChannel: domain.DeliveryChannelPush, Payload: domain.NotificationPayload{ Headline: sn.Title, Message: sn.Message, 
}, } sent, failed, err := s.SendBulkPushNotification(ctx, userIDs, notification) if err != nil { return err } if sent == 0 && failed > 0 { return fmt.Errorf("all %d push deliveries failed", failed) } return nil } // func (s *Service) DeleteOldNotifications(ctx context.Context) error { // return s.store.DeleteOldNotifications(ctx) // } // func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){ // return s.store.Get(ctx, filter) // } // func (s *Service) RunRedisSubscriber(ctx context.Context) { // pubsub := s.redisClient.Subscribe(ctx, "live_metrics") // defer pubsub.Close() // ch := pubsub.Channel() // for msg := range ch { // var parsed map[string]interface{} // if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil { // // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err) // s.mongoLogger.Error("invalid Redis message format", // zap.String("payload", msg.Payload), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // continue // } // eventType, _ := parsed["type"].(string) // payload := parsed["payload"] // recipientID, hasRecipient := parsed["recipient_id"] // recipientType, _ := parsed["recipient_type"].(string) // message := map[string]interface{}{ // "type": eventType, // "payload": payload, // } // if hasRecipient { // message["recipient_id"] = recipientID // message["recipient_type"] = recipientType // } // s.Hub.Broadcast <- message // } // } // func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error { // const key = "live_metrics" // companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies)) // for _, c := range companies { // companyBalances = append(companyBalances, domain.CompanyWalletBalance{ // CompanyID: c.ID, // CompanyName: c.Name, // Balance: float64(c.WalletBalance.Float32()), // }) // } // branchBalances := 
make([]domain.BranchWalletBalance, 0, len(branches)) // for _, b := range branches { // branchBalances = append(branchBalances, domain.BranchWalletBalance{ // BranchID: b.ID, // BranchName: b.Name, // CompanyID: b.CompanyID, // Balance: float64(b.Balance.Float32()), // }) // } // payload := domain.LiveWalletMetrics{ // Timestamp: time.Now(), // CompanyBalances: companyBalances, // BranchBalances: branchBalances, // } // updatedData, err := json.Marshal(payload) // if err != nil { // return err // } // if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil { // return err // } // if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil { // return err // } // return nil // } // func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) { // const key = "live_metrics" // var metric domain.LiveMetric // val, err := s.redisClient.Get(ctx, key).Result() // if err == redis.Nil { // // Key does not exist yet, return zero-valued struct // return domain.LiveMetric{}, nil // } else if err != nil { // return domain.LiveMetric{}, err // } // if err := json.Unmarshal([]byte(val), &metric); err != nil { // return domain.LiveMetric{}, err // } // return metric, nil // } // func (s *Service) StartKafkaConsumer(ctx context.Context) { // go func() { // for { // m, err := s.reader.ReadMessage(ctx) // if err != nil { // if err == context.Canceled { // s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context") // return // } // s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message", // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // time.Sleep(1 * time.Second) // backoff // continue // } // var walletEvent event.WalletEvent // if err := json.Unmarshal(m.Value, &walletEvent); err != nil { // s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event", // zap.String("message", string(m.Value)), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // 
continue // } // raw, _ := json.Marshal(map[string]any{ // "balance": walletEvent.Balance.Float32(), // "type": walletEvent.WalletType, // "timestamp": time.Now(), // }) // headline := "" // message := "" // var receiver domain.NotificationRecieverSide // switch walletEvent.WalletType { // case domain.StaticWalletType: // headline = "Referral and Bonus Wallet Updated" // message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32()) // receiver = domain.NotificationRecieverSideCustomer // case domain.RegularWalletType: // headline = "Wallet Updated" // message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32()) // receiver = domain.NotificationRecieverSideCustomer // case domain.BranchWalletType: // headline = "Branch Wallet Updated" // message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32()) // receiver = domain.NotificationRecieverSideBranchManager // case domain.CompanyWalletType: // headline = "Company Wallet Updated" // message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32()) // receiver = domain.NotificationRecieverSideAdmin // } // // Handle the wallet event: send notification // notification := &domain.Notification{ // RecipientID: walletEvent.UserID, // DeliveryChannel: domain.DeliveryChannelInApp, // Reciever: receiver, // Type: domain.NotificationTypeWalletUpdated, // DeliveryStatus: domain.DeliveryStatusPending, // IsRead: false, // Level: domain.NotificationLevelInfo, // Priority: 2, // Metadata: raw, // Payload: domain.NotificationPayload{ // Headline: headline, // Message: message, // }, // } // if err := s.SendNotification(ctx, notification); err != nil { // s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification", // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } // } // }() // } // func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet 
domain.Wallet) { // var ( // payload domain.LiveWalletMetrics // event map[string]interface{} // key = "live_metrics" // ) // // Try company first // company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID) // if companyErr == nil { // payload = domain.LiveWalletMetrics{ // Timestamp: time.Now(), // CompanyBalances: []domain.CompanyWalletBalance{{ // CompanyID: company.ID, // CompanyName: company.Name, // Balance: float64(wallet.Balance), // }}, // BranchBalances: []domain.BranchWalletBalance{}, // } // event = map[string]interface{}{ // "type": "LIVE_WALLET_METRICS_UPDATE", // "recipient_id": company.ID, // "recipient_type": "company", // "payload": payload, // } // } else { // // Try branch next // branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID) // if branchErr == nil { // payload = domain.LiveWalletMetrics{ // Timestamp: time.Now(), // CompanyBalances: []domain.CompanyWalletBalance{}, // BranchBalances: []domain.BranchWalletBalance{{ // BranchID: branch.ID, // BranchName: branch.Name, // CompanyID: branch.CompanyID, // Balance: float64(wallet.Balance), // }}, // } // event = map[string]interface{}{ // "type": "LIVE_WALLET_METRICS_UPDATE", // "recipient_id": branch.ID, // "recipient_type": "branch", // "payload": payload, // } // } else { // // Neither company nor branch matched this wallet // // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID) // s.mongoLogger.Warn("wallet not linked to any company or branch", // zap.Int64("walletID", wallet.ID), // zap.Time("timestamp", time.Now()), // ) // return // } // } // // Save latest metric to Redis // if jsonBytes, err := json.Marshal(payload); err == nil { // s.redisClient.Set(ctx, key, jsonBytes, 0) // } else { // // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err) // s.mongoLogger.Error("failed to marshal wallet metrics payload", // zap.Int64("walletID", wallet.ID), // zap.Error(err), // 
zap.Time("timestamp", time.Now()), // ) // } // // Publish via Redis // if jsonEvent, err := json.Marshal(event); err == nil { // s.redisClient.Publish(ctx, key, jsonEvent) // } else { // // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err) // s.mongoLogger.Error("failed to marshal event payload", // zap.Int64("walletID", wallet.ID), // zap.Error(err), // zap.Time("timestamp", time.Now()), // ) // } // // Broadcast over WebSocket // s.Hub.Broadcast <- event // }