Yimaru-BackEnd/internal/services/notification/service.go
Yared Yemane 4509fe2dc0 Initialize FCM client lazily during push send.
Add ensureFCMClient() so push APIs retry FCM initialization at request time and return actionable initialization errors when the service account key is empty or invalid.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-11 10:58:42 -07:00

1445 lines
44 KiB
Go

package notificationservice
import (
"Yimaru-Backend/internal/config"
"Yimaru-Backend/internal/domain"
"Yimaru-Backend/internal/pkgs/helpers"
"Yimaru-Backend/internal/ports"
"Yimaru-Backend/internal/services/messenger"
"Yimaru-Backend/internal/services/user"
"Yimaru-Backend/internal/web_server/ws"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
// "errors"
"log/slog"
"sync"
"time"
// "github.com/segmentio/kafka-go"
firebase "firebase.google.com/go/v4"
"firebase.google.com/go/v4/messaging"
"github.com/gorilla/websocket"
"github.com/resend/resend-go/v2"
"go.uber.org/zap"
"google.golang.org/api/option"
// "github.com/redis/go-redis/v9"
)
// Service coordinates notification persistence and delivery over the in-app
// WebSocket hub, SMS, email and push (FCM) channels.
type Service struct {
	store ports.NotificationStore
	// Hub is the WebSocket hub used to broadcast in-app notification events.
	Hub *ws.NotificationHub
	// connections maps a recipient ID to its *websocket.Conn.
	connections sync.Map
	// notificationCh queues notifications for the delivery worker.
	notificationCh chan *domain.Notification
	// stopCh is watched by the background workers to know when to exit.
	stopCh       chan struct{}
	config       *config.Config
	userSvc      *user.Service
	messengerSvc *messenger.Service
	mongoLogger  *zap.Logger
	logger       *slog.Logger

	// fcmMu serializes lazy initialization of fcmClient. Push sends can be
	// triggered from several goroutines at once (the delivery worker and the
	// per-notification scheduler goroutines), so the nil-check-then-assign in
	// ensureFCMClient must not run concurrently.
	fcmMu     sync.Mutex
	fcmClient *messaging.Client
}

// New builds a Service, eagerly initializes the FCM client when a service
// account key is configured, and starts the hub, the delivery worker and the
// scheduler worker in background goroutines.
//
// A failed FCM initialization is logged but does not prevent construction;
// push sends retry initialization later through ensureFCMClient.
func New(
	store ports.NotificationStore,
	mongoLogger *zap.Logger,
	logger *slog.Logger,
	cfg *config.Config,
	messengerSvc *messenger.Service,
	userSvc *user.Service,
) *Service {
	hub := ws.NewNotificationHub()
	svc := &Service{
		store:          store,
		Hub:            hub,
		mongoLogger:    mongoLogger,
		logger:         logger,
		connections:    sync.Map{},
		notificationCh: make(chan *domain.Notification, 1000),
		stopCh:         make(chan struct{}),
		messengerSvc:   messengerSvc,
		userSvc:        userSvc,
		config:         cfg,
	}

	// The service account key is a credential: never write its value to the
	// logs, only whether one was supplied.
	fcmConfigured := strings.TrimSpace(cfg.FCMServiceAccountKey) != ""
	mongoLogger.Info("FCM_SERVICE_ACCOUNT_KEY status at startup",
		zap.Bool("fcm_service_account_key_configured", fcmConfigured),
	)

	// Initialize FCM client if service account key is provided. No locking is
	// needed here because svc has not been shared with any goroutine yet.
	if fcmConfigured {
		if err := svc.initFCMClient(); err != nil {
			mongoLogger.Error("Failed to initialize FCM client", zap.Error(err))
		}
	}

	go hub.Run()
	go svc.startWorker()
	go svc.startSchedulerWorker()
	// go svc.startRetryWorker()
	// go svc.RunRedisSubscriber(context.Background())
	// go svc.StartKafkaConsumer(context.Background())
	return svc
}

// initFCMClient creates the Firebase app and stores its messaging client on
// the service.
//
// It performs no locking itself: callers must either hold fcmMu (as
// ensureFCMClient does) or run before the service is visible to other
// goroutines (as New does).
func (s *Service) initFCMClient() error {
	ctx := context.Background()

	// Prepare client options; if a service account JSON string is provided, use it.
	var opts []option.ClientOption
	if s.config.FCMServiceAccountKey != "" {
		opts = append(opts, option.WithCredentialsJSON([]byte(s.config.FCMServiceAccountKey)))
	}

	// Initialize Firebase app
	app, err := firebase.NewApp(ctx, nil, opts...)
	if err != nil {
		return fmt.Errorf("failed to initialize Firebase app: %w", err)
	}

	// Get messaging client
	client, err := app.Messaging(ctx)
	if err != nil {
		return fmt.Errorf("failed to get FCM client: %w", err)
	}

	s.fcmClient = client
	return nil
}

// ensureFCMClient makes sure an FCM client is available, initializing it on
// demand. It returns an error when the service account key is empty or when
// initialization fails; a later call will try again.
//
// The check and the initialization run under fcmMu so that concurrent push
// sends do not race on s.fcmClient. Once this returns nil the client is never
// reassigned, so callers may read s.fcmClient afterwards without the lock.
func (s *Service) ensureFCMClient() error {
	s.fcmMu.Lock()
	defer s.fcmMu.Unlock()

	if s.fcmClient != nil {
		return nil
	}
	if strings.TrimSpace(s.config.FCMServiceAccountKey) == "" {
		return fmt.Errorf("FCM_SERVICE_ACCOUNT_KEY is empty")
	}
	if err := s.initFCMClient(); err != nil {
		return fmt.Errorf("failed to initialize FCM client: %w", err)
	}
	return nil
}
// SendAfroMessageSMSTemp sends a single SMS through the AfroMessage HTTP API.
//
// It issues a GET to "<base URL>/api/send" with the recipient, message and
// configured sender name as query parameters, authenticated with the
// configured API key as a bearer token. The identifier ID and callbackURL are
// added only when set. The request uses a 10 second client timeout.
//
// An error is returned when the request cannot be built or sent, the status is
// not 200, the body is not valid JSON, or the "acknowledge" field is anything
// other than the string "success".
func (s *Service) SendAfroMessageSMSTemp(
	ctx context.Context,
	receiverPhone string,
	message string,
	callbackURL *string, // optional
) error {
	query := url.Values{}
	query.Set("to", receiverPhone)
	query.Set("message", message)
	query.Set("sender", s.config.AFRO_SMS_SENDER_NAME)
	if identifier := s.config.AFROSMSConfig.AfroSMSIdentifierID; identifier != "" {
		query.Set("from", identifier)
	}
	if callbackURL != nil {
		query.Set("callback", *callbackURL)
	}

	endpoint := fmt.Sprintf("%s/api/send?%s", s.config.AFROSMSConfig.AfroSMSBaseURL, query.Encode())
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return err
	}
	// AfroMessage authenticates with the API key as a bearer token.
	req.Header.Set("Authorization", "Bearer "+s.config.AFRO_SMS_API_KEY)
	req.Header.Set("Accept", "application/json")

	httpClient := &http.Client{Timeout: 10 * time.Second}
	resp, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	raw, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf(
			"afromessage sms failed: status=%d response=%s",
			resp.StatusCode,
			string(raw),
		)
	}

	var decoded map[string]interface{}
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return err
	}
	if ack, isString := decoded["acknowledge"].(string); isString && ack == "success" {
		return nil
	}
	return fmt.Errorf("sms delivery failed: %v", decoded)
}
// addConnection stores c as the WebSocket connection for recipientID,
// overwriting whatever was stored under that ID before. A nil connection is
// rejected with an error and a warning log.
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error {
	if c != nil {
		s.connections.Store(recipientID, c)
		s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection",
			zap.Int64("recipientID", recipientID),
			zap.Time("timestamp", time.Now()),
		)
		return nil
	}

	s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
	return fmt.Errorf("Invalid Websocket Connection")
}
// GetFilteredNotifications forwards filter to the notification store and
// returns the store's notifications, count and error unchanged.
func (s *Service) GetFilteredNotifications(ctx context.Context, filter domain.NotificationFilter) ([]domain.Notification, int64, error) {
	items, count, err := s.store.GetFilteredNotifications(ctx, filter)
	return items, count, err
}
// RecordNotification persists a notification for history/audit purposes only.
//
// The record is stored with a freshly generated ID, the current time and a
// delivery status of "sent"; it is not pushed onto the worker queue. A store
// failure is logged and otherwise ignored, since the method has no return value.
func (s *Service) RecordNotification(ctx context.Context, recipientID int64, notifType domain.NotificationType, channel domain.DeliveryChannel, level domain.NotificationLevel, headline, message string) {
	record := domain.Notification{
		ID:              helpers.GenerateID(),
		RecipientID:     recipientID,
		Type:            notifType,
		Level:           level,
		DeliveryChannel: channel,
		DeliveryStatus:  domain.DeliveryStatusSent,
		Payload: domain.NotificationPayload{
			Headline: headline,
			Message:  message,
		},
		Timestamp: time.Now(),
	}

	_, err := s.store.CreateNotification(ctx, &record)
	if err == nil {
		return
	}
	s.mongoLogger.Error("[NotificationSvc.RecordNotification] Failed to record notification",
		zap.Int64("recipientID", recipientID),
		zap.String("channel", string(channel)),
		zap.Error(err),
	)
}
// SendNotification persists notification and hands it to the delivery worker.
//
// The notification's ID, timestamp and delivery status (pending) are
// overwritten before it is stored. For the in-app channel a
// "CREATED_NOTIFICATION" event carrying the stored notification is sent to the
// hub's broadcast channel. The stored notification is then queued without
// blocking: if the queue is full it is dropped with a warning and nil is still
// returned. Only a store failure is reported as an error.
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
	notification.ID = helpers.GenerateID()
	notification.Timestamp = time.Now()
	notification.DeliveryStatus = domain.DeliveryStatusPending

	stored, err := s.store.CreateNotification(ctx, notification)
	if err != nil {
		s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification",
			zap.String("id", notification.ID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}

	if stored.DeliveryChannel == domain.DeliveryChannelInApp {
		event := map[string]interface{}{
			"type":         "CREATED_NOTIFICATION",
			"recipient_id": stored.RecipientID,
			"payload":      stored,
		}
		s.Hub.Broadcast <- event
	}

	// Non-blocking enqueue: never stall the caller on a saturated worker.
	select {
	case s.notificationCh <- stored:
	default:
		s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification",
			zap.String("id", stored.ID),
			zap.Time("timestamp", time.Now()),
		)
	}
	return nil
}
// func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
// for _, notificationID := range notificationIDs {
// _, err := s.store.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
// if err != nil {
// s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read",
// zap.String("notificationID", notificationID),
// zap.Int64("recipientID", recipientID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// return err
// }
// // count, err := s.store.CountUnreadNotifications(ctx, recipientID)
// // if err != nil {
// // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
// // return err
// // }
// // s.Hub.Broadcast <- map[string]interface{}{
// // "type": "COUNT_NOT_OPENED_NOTIFICATION",
// // "recipient_id": recipientID,
// // "payload": map[string]int{
// // "not_opened_notifications_count": int(count),
// // },
// // }
// s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read",
// zap.String("notificationID", notificationID),
// zap.Int64("recipientID", recipientID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return nil
// }
// GetUserNotifications returns one page of notifications for recipientID along
// with the total reported by the store. Both the failure and the success path
// are logged; on failure the result is (nil, 0, err).
func (s *Service) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) {
	page, total, err := s.store.GetUserNotifications(ctx, recipientID, limit, offset)
	if err == nil {
		s.mongoLogger.Info("[NotificationSvc.GetUserNotifications] Successfully listed notifications",
			zap.Int64("recipientID", recipientID),
			zap.Int("count", len(page)),
			zap.Int64("total", total),
			zap.Time("timestamp", time.Now()),
		)
		return page, total, nil
	}

	s.mongoLogger.Error("[NotificationSvc.GetUserNotifications] Failed to list notifications",
		zap.Int64("recipientID", recipientID),
		zap.Int("limit", limit),
		zap.Int("offset", offset),
		zap.Error(err),
		zap.Time("timestamp", time.Now()),
	)
	return nil, 0, err
}
// GetAllNotifications returns one page of notifications from the store using
// limit and offset. Both outcomes are logged; on failure the result is
// (nil, err).
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
	page, err := s.store.GetAllNotifications(ctx, limit, offset)
	if err == nil {
		s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications",
			zap.Int("count", len(page)),
			zap.Time("timestamp", time.Now()),
		)
		return page, nil
	}

	s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications",
		zap.Int("limit", limit),
		zap.Int("offset", offset),
		zap.Error(err),
		zap.Time("timestamp", time.Now()),
	)
	return nil, err
}
// ConnectWebSocket registers c as the WebSocket connection for recipientID via
// addConnection and logs the outcome. The ctx argument is not used by this
// method.
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
	if err := s.addConnection(recipientID, c); err != nil {
		s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection",
			zap.Int64("recipientID", recipientID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}

	s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
	return nil
}
// DisconnectWebSocket removes the connection stored for recipientID, closes it
// and logs the disconnect. It does nothing when no connection is stored.
func (s *Service) DisconnectWebSocket(recipientID int64) {
	stored, found := s.connections.LoadAndDelete(recipientID)
	if !found {
		return
	}

	stored.(*websocket.Conn).Close()
	s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket",
		zap.Int64("recipientID", recipientID),
		zap.Time("timestamp", time.Now()),
	)
}
// func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
// s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
// apiKey := s.config.AFRO_SMS_API_KEY
// senderName := s.config.AFRO_SMS_SENDER_NAME
// receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
// hostURL := s.config.AFRO_SMS_HOST_URL
// endpoint := "/api/send"
// request := afro.GetRequest(apiKey, endpoint, hostURL)
// request.Method = "GET"
// request.Sender(senderName)
// request.To(receiverPhone, message)
// response, err := afro.MakeRequestWithContext(ctx, request)
// if err != nil {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
// return err
// }
// if response["acknowledge"] == "success" {
// s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
// } else {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
// return errors.New("SMS delivery failed: " + response["response"].(string))
// }
// return nil
// }
// func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
// s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
// return nil
// }
// startWorker is the delivery loop: it handles queued notifications one at a
// time until stopCh yields a value or is closed, then logs and returns.
func (s *Service) startWorker() {
	for {
		select {
		case <-s.stopCh:
			s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped",
				zap.Time("timestamp", time.Now()),
			)
			return
		case queued := <-s.notificationCh:
			s.handleNotification(queued)
		}
	}
}
// func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
// return s.store.ListRecipientIDs(ctx, receiver)
// }
// handleNotification delivers notification over its delivery channel and sets
// its DeliveryStatus to sent or failed accordingly.
//
// SMS, email and push are dispatched to their respective senders; the in-app
// channel is marked sent without further work; any other channel is logged and
// marked failed. The status is only set on the in-memory value: the store
// update below is commented out.
func (s *Service) handleNotification(notification *domain.Notification) {
	ctx := context.Background()

	var deliveryErr error
	switch notification.DeliveryChannel {
	case domain.DeliveryChannelSMS:
		deliveryErr = s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message)
	case domain.DeliveryChannelEmail:
		deliveryErr = s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline)
	case domain.DeliveryChannelPush:
		deliveryErr = s.SendPushNotification(ctx, notification)
	case domain.DeliveryChannelInApp:
		// Nothing to send here; deliveryErr stays nil so the status becomes sent.
	default:
		s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel",
			zap.String("channel", string(notification.DeliveryChannel)),
			zap.Time("timestamp", time.Now()),
		)
		notification.DeliveryStatus = domain.DeliveryStatusFailed
		return
	}

	if deliveryErr != nil {
		notification.DeliveryStatus = domain.DeliveryStatusFailed
	} else {
		notification.DeliveryStatus = domain.DeliveryStatusSent
	}

	// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
	// 	s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status",
	// 		zap.String("id", notification.ID),
	// 		zap.Error(err),
	// 		zap.Time("timestamp", time.Now()),
	// 	)
	// }
}
// SendNotificationSMS sends message by SMS to the phone number of the user
// identified by recipientID.
//
// It fails when the user cannot be loaded, the phone is not verified, the
// phone number is empty, or the messenger service rejects the send (the last
// case is also logged).
func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error {
	recipient, err := s.userSvc.GetUserByID(ctx, recipientID)
	if err != nil {
		return err
	}

	switch {
	case !recipient.PhoneVerified:
		return fmt.Errorf("cannot send notification to unverified phone number")
	case recipient.PhoneNumber == "":
		return fmt.Errorf("phone Number is invalid")
	}

	if sendErr := s.messengerSvc.SendSMS(ctx, recipient.PhoneNumber, message); sendErr != nil {
		s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS",
			zap.Int64("recipient_id", recipientID),
			zap.String("user_phone_number", recipient.PhoneNumber),
			zap.String("message", message),
			zap.Error(sendErr),
			zap.Time("timestamp", time.Now()),
		)
		return sendErr
	}
	return nil
}
// SendNotificationEmail sends message with the given subject to the email
// address of the user identified by recipientID.
//
// It fails when the user cannot be loaded, the email is not verified, the
// email address is empty, or the messenger service rejects the send (the last
// case is also logged).
func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error {
	// Look up the recipient to obtain their email address.
	user, err := s.userSvc.GetUserByID(ctx, recipientID)
	if err != nil {
		return err
	}
	if !user.EmailVerified {
		return fmt.Errorf("cannot send notification to unverified email")
	}
	if user.Email == "" {
		return fmt.Errorf("email is invalid")
	}

	// NOTE(review): message is passed for both body arguments of SendEmail,
	// presumably the plain-text and HTML bodies; confirm against
	// messenger.Service.SendEmail.
	err = s.messengerSvc.SendEmail(ctx, user.Email, message, message, subject)
	if err != nil {
		// Previously logged as "SMS", which made email failures look like SMS
		// failures in the logs.
		s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification email",
			zap.Int64("recipient_id", recipientID),
			zap.String("user_email", user.Email),
			zap.String("message", message),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// SendPushNotification sends notification as an FCM push message to every
// device token registered for notification.RecipientID.
//
// The FCM client is initialized on demand. A user with no device tokens is not
// an error. Each token is sent to individually; a token that FCM reports as
// unregistered or invalid is deactivated. Per-token failures are logged and do
// not stop the remaining sends.
//
// It returns an error when the FCM client is unavailable, the tokens cannot be
// loaded, or every send failed. The last case wraps the most recent FCM error
// so callers do not record an undelivered notification as sent.
func (s *Service) SendPushNotification(ctx context.Context, notification *domain.Notification) error {
	if err := s.ensureFCMClient(); err != nil {
		return err
	}

	// Get user device tokens
	tokens, err := s.userSvc.GetUserDeviceTokens(ctx, notification.RecipientID)
	if err != nil {
		return fmt.Errorf("failed to get user device tokens: %w", err)
	}
	if len(tokens) == 0 {
		s.mongoLogger.Info("[NotificationSvc.SendPushNotification] No device tokens found for user",
			zap.Int64("userID", notification.RecipientID),
			zap.Time("timestamp", time.Now()),
		)
		return nil // Not an error, just no devices to send to
	}

	// Create FCM message; only Token changes between sends.
	message := &messaging.Message{
		Notification: &messaging.Notification{
			Title:    notification.Payload.Headline,
			Body:     notification.Payload.Message,
			ImageURL: notification.Image,
		},
		Data: map[string]string{
			"type":            string(notification.Type),
			"recipient_id":    strconv.FormatInt(notification.RecipientID, 10),
			"notification_id": notification.ID,
		},
	}

	// Send to all user devices, remembering whether anything got through.
	delivered := 0
	var lastErr error
	for _, token := range tokens {
		message.Token = token
		response, err := s.fcmClient.Send(ctx, message)
		if err != nil {
			lastErr = err
			s.mongoLogger.Error("[NotificationSvc.SendPushNotification] Failed to send FCM message",
				zap.String("token", token),
				zap.Int64("userID", notification.RecipientID),
				zap.Error(err),
				zap.Time("timestamp", time.Now()),
			)
			// Check if token is invalid/unregistered and deactivate it
			if messaging.IsUnregistered(err) || messaging.IsInvalidArgument(err) {
				if deactivateErr := s.userSvc.DeactivateDevice(ctx, notification.RecipientID, token); deactivateErr != nil {
					s.mongoLogger.Warn("[NotificationSvc.SendPushNotification] Failed to deactivate invalid token",
						zap.String("token", token),
						zap.Int64("userID", notification.RecipientID),
						zap.Error(deactivateErr),
						zap.Time("timestamp", time.Now()),
					)
				} else {
					s.mongoLogger.Info("[NotificationSvc.SendPushNotification] Deactivated invalid FCM token",
						zap.String("token", token),
						zap.Int64("userID", notification.RecipientID),
						zap.Time("timestamp", time.Now()),
					)
				}
			}
			continue // Continue with other tokens
		}

		delivered++
		s.mongoLogger.Info("[NotificationSvc.SendPushNotification] FCM message sent successfully",
			zap.String("response", response),
			zap.String("token", token),
			zap.Int64("userID", notification.RecipientID),
			zap.Time("timestamp", time.Now()),
		)
	}

	// tokens is non-empty here, so zero deliveries means every send failed and
	// lastErr is set.
	if delivered == 0 {
		return fmt.Errorf("all %d push deliveries failed: %w", len(tokens), lastErr)
	}
	return nil
}
// MessengerSvc returns the messenger service this notification service was
// constructed with.
func (s *Service) MessengerSvc() *messenger.Service {
return s.messengerSvc
}
// SendBulkPushNotification sends a push notification to multiple users using
// FCM multicast.
//
// It collects the device tokens of every user in userIDs (users whose tokens
// cannot be loaded are logged and skipped), de-duplicates them so a token is
// never sent to twice, and sends in batches of 500 (FCM limit). Tokens that
// FCM reports as unregistered or invalid are deactivated; a failed
// deactivation is logged.
//
// sent and failed count individual token deliveries. A batch whose multicast
// call fails outright counts all of its tokens as failed. err is non-nil only
// when the FCM client is unavailable.
func (s *Service) SendBulkPushNotification(ctx context.Context, userIDs []int64, notification *domain.Notification) (sent int, failed int, err error) {
	if err := s.ensureFCMClient(); err != nil {
		return 0, 0, err
	}

	// Collect all device tokens for the given users
	var allTokens []string
	tokenUserMap := make(map[string]int64) // token -> userID for cleanup
	for _, uid := range userIDs {
		tokens, err := s.userSvc.GetUserDeviceTokens(ctx, uid)
		if err != nil {
			s.mongoLogger.Warn("[NotificationSvc.SendBulkPushNotification] Failed to get tokens for user",
				zap.Int64("userID", uid),
				zap.Error(err),
			)
			continue
		}
		for _, t := range tokens {
			// Queue each token once, even if it shows up for several users or
			// a user ID is repeated; otherwise the device would receive the
			// same notification multiple times.
			if _, seen := tokenUserMap[t]; !seen {
				allTokens = append(allTokens, t)
			}
			tokenUserMap[t] = uid
		}
	}

	if len(allTokens) == 0 {
		s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] No device tokens found for any user",
			zap.Int("userCount", len(userIDs)),
		)
		return 0, 0, nil
	}

	fcmNotification := &messaging.Notification{
		Title:    notification.Payload.Headline,
		Body:     notification.Payload.Message,
		ImageURL: notification.Image,
	}
	data := map[string]string{
		"type":            string(notification.Type),
		"notification_id": notification.ID,
	}

	// FCM multicast supports max 500 tokens per batch
	const batchSize = 500
	for i := 0; i < len(allTokens); i += batchSize {
		end := i + batchSize
		if end > len(allTokens) {
			end = len(allTokens)
		}
		batch := allTokens[i:end]

		msg := &messaging.MulticastMessage{
			Notification: fcmNotification,
			Data:         data,
			Tokens:       batch,
		}
		resp, err := s.fcmClient.SendEachForMulticast(ctx, msg)
		if err != nil {
			s.mongoLogger.Error("[NotificationSvc.SendBulkPushNotification] Multicast send failed",
				zap.Error(err),
				zap.Int("batchSize", len(batch)),
			)
			failed += len(batch)
			continue
		}
		sent += resp.SuccessCount
		failed += resp.FailureCount

		// Deactivate invalid tokens; resp.Responses is indexed like batch.
		for j, sendResp := range resp.Responses {
			if sendResp.Error == nil {
				continue
			}
			if !messaging.IsUnregistered(sendResp.Error) && !messaging.IsInvalidArgument(sendResp.Error) {
				continue
			}
			token := batch[j]
			uid, ok := tokenUserMap[token]
			if !ok {
				continue
			}
			if deactivateErr := s.userSvc.DeactivateDevice(ctx, uid, token); deactivateErr != nil {
				s.mongoLogger.Warn("[NotificationSvc.SendBulkPushNotification] Failed to deactivate invalid token",
					zap.String("token", token),
					zap.Int64("userID", uid),
					zap.Error(deactivateErr),
				)
			}
		}
	}

	s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] Bulk push completed",
		zap.Int("totalTokens", len(allTokens)),
		zap.Int("sent", sent),
		zap.Int("failed", failed),
	)
	return sent, failed, nil
}
// SendBulkSMS delivers message to every phone number in recipients through
// AfroMessage, one request after another. Individual failures are logged and
// counted; the totals of successful and failed deliveries are logged and
// returned.
func (s *Service) SendBulkSMS(ctx context.Context, recipients []string, message string) (sent int, failed int) {
	for _, phone := range recipients {
		sendErr := s.SendAfroMessageSMSTemp(ctx, phone, message, nil)
		if sendErr == nil {
			sent++
			continue
		}
		failed++
		s.mongoLogger.Error("[NotificationSvc.SendBulkSMS] Failed to send SMS",
			zap.String("phone", phone),
			zap.Error(sendErr),
		)
	}

	s.mongoLogger.Info("[NotificationSvc.SendBulkSMS] Bulk SMS completed",
		zap.Int("totalRecipients", len(recipients)),
		zap.Int("sent", sent),
		zap.Int("failed", failed),
	)
	return sent, failed
}
// SendBulkEmail delivers the same email (subject, plain message, HTML message
// and attachments) to every address in recipients through the messenger
// service, one send after another. Individual failures are logged and counted;
// the totals of successful and failed deliveries are logged and returned.
func (s *Service) SendBulkEmail(ctx context.Context, recipients []string, subject, message, messageHTML string, attachments []*resend.Attachment) (sent int, failed int) {
	for _, email := range recipients {
		sendErr := s.messengerSvc.SendEmailWithAttachments(ctx, email, message, messageHTML, subject, attachments)
		if sendErr == nil {
			sent++
			continue
		}
		failed++
		s.mongoLogger.Error("[NotificationSvc.SendBulkEmail] Failed to send email",
			zap.String("email", email),
			zap.Error(sendErr),
		)
	}

	s.mongoLogger.Info("[NotificationSvc.SendBulkEmail] Bulk email completed",
		zap.Int("totalRecipients", len(recipients)),
		zap.Int("sent", sent),
		zap.Int("failed", failed),
	)
	return sent, failed
}
// func (s *Service) startRetryWorker() {
// ticker := time.NewTicker(1 * time.Minute)
// defer ticker.Stop()
// for {
// select {
// case <-ticker.C:
// s.retryFailedNotifications()
// case <-s.stopCh:
// s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped",
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// }
// }
// func (s *Service) retryFailedNotifications() {
// ctx := context.Background()
// failedNotifications, err := s.store.ListFailedNotifications(ctx, 100)
// if err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// for _, n := range failedNotifications {
// notification := &n
// go func(notification *domain.Notification) {
// for attempt := 0; attempt < 3; attempt++ {
// time.Sleep(time.Duration(attempt) * time.Second)
// switch notification.DeliveryChannel {
// case domain.DeliveryChannelSMS:
// if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
// notification.DeliveryStatus = domain.DeliveryStatusSent
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// } else {
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
// zap.String("id", notification.ID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return
// }
// case domain.DeliveryChannelEmail:
// if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline); err == nil {
// notification.DeliveryStatus = domain.DeliveryStatusSent
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// } else {
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
// zap.String("id", notification.ID),
// zap.Time("timestamp", time.Now()),
// )
// }
// return
// }
// }
// }
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification",
// zap.String("id", notification.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }(notification)
// }
// }
// CountUnreadNotifications returns the store's unread-notification count for
// the given recipient, passing any store error through unchanged.
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
	unread, err := s.store.CountUnreadNotifications(ctx, recipient_id)
	return unread, err
}
// MarkNotificationAsRead marks the notification with the given id as read in
// the store and returns the store's result. A store failure is logged and
// returned as (nil, err).
func (s *Service) MarkNotificationAsRead(ctx context.Context, id int64) (*domain.Notification, error) {
	updated, err := s.store.MarkNotificationAsRead(ctx, id)
	if err == nil {
		return updated, nil
	}

	s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsRead] Failed to mark notification as read",
		zap.Int64("notificationID", id),
		zap.Error(err),
		zap.Time("timestamp", time.Now()),
	)
	return nil, err
}
// MarkAllUserNotificationsAsRead marks every notification of userID as read in
// the store. A store failure is logged and returned.
func (s *Service) MarkAllUserNotificationsAsRead(ctx context.Context, userID int64) error {
	if err := s.store.MarkAllUserNotificationsAsRead(ctx, userID); err != nil {
		s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsRead] Failed to mark all notifications as read",
			zap.Int64("userID", userID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// MarkNotificationAsUnread marks the notification with the given id as unread
// in the store and returns the store's result. A store failure is logged and
// returned as (nil, err).
func (s *Service) MarkNotificationAsUnread(ctx context.Context, id int64) (*domain.Notification, error) {
	updated, err := s.store.MarkNotificationAsUnread(ctx, id)
	if err == nil {
		return updated, nil
	}

	s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsUnread] Failed to mark notification as unread",
		zap.Int64("notificationID", id),
		zap.Error(err),
		zap.Time("timestamp", time.Now()),
	)
	return nil, err
}
// MarkAllUserNotificationsAsUnread marks every notification of userID as
// unread in the store. A store failure is logged and returned.
func (s *Service) MarkAllUserNotificationsAsUnread(ctx context.Context, userID int64) error {
	if err := s.store.MarkAllUserNotificationsAsUnread(ctx, userID); err != nil {
		s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsUnread] Failed to mark all notifications as unread",
			zap.Int64("userID", userID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// DeleteUserNotifications asks the store to delete the notifications of
// userID. A store failure is logged and returned.
func (s *Service) DeleteUserNotifications(ctx context.Context, userID int64) error {
	if err := s.store.DeleteUserNotifications(ctx, userID); err != nil {
		s.mongoLogger.Error("[NotificationSvc.DeleteUserNotifications] Failed to delete user notifications",
			zap.Int64("userID", userID),
			zap.Error(err),
			zap.Time("timestamp", time.Now()),
		)
		return err
	}
	return nil
}
// Scheduled Notification Methods

// CreateScheduledNotification stores sn as a scheduled notification and
// returns the stored record. Both outcomes are logged; a store failure is
// returned as (nil, err).
func (s *Service) CreateScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) (*domain.ScheduledNotification, error) {
	stored, err := s.store.CreateScheduledNotification(ctx, sn)
	if err == nil {
		s.mongoLogger.Info("[NotificationSvc.CreateScheduledNotification] Created",
			zap.Int64("id", stored.ID),
			zap.String("channel", string(stored.Channel)),
			zap.Time("scheduledAt", stored.ScheduledAt),
		)
		return stored, nil
	}

	s.mongoLogger.Error("[NotificationSvc.CreateScheduledNotification] Failed to create",
		zap.String("channel", string(sn.Channel)),
		zap.Error(err),
	)
	return nil, err
}
// GetScheduledNotification fetches the scheduled notification with the given
// id from the store, passing the store's result and error through unchanged.
func (s *Service) GetScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) {
	found, err := s.store.GetScheduledNotification(ctx, id)
	return found, err
}
// ListScheduledNotifications forwards filter to the store and returns the
// store's scheduled notifications, count and error unchanged.
func (s *Service) ListScheduledNotifications(ctx context.Context, filter domain.ScheduledNotificationFilter) ([]domain.ScheduledNotification, int64, error) {
	items, count, err := s.store.ListScheduledNotifications(ctx, filter)
	return items, count, err
}
// CancelScheduledNotification cancels the scheduled notification with the
// given id in the store and returns the store's result. Both outcomes are
// logged; a store failure is returned as (nil, err).
func (s *Service) CancelScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) {
	result, err := s.store.CancelScheduledNotification(ctx, id)
	if err == nil {
		s.mongoLogger.Info("[NotificationSvc.CancelScheduledNotification] Cancelled",
			zap.Int64("id", result.ID),
		)
		return result, nil
	}

	s.mongoLogger.Error("[NotificationSvc.CancelScheduledNotification] Failed to cancel",
		zap.Int64("id", id),
		zap.Error(err),
	)
	return nil, err
}
// startSchedulerWorker processes due scheduled notifications every 30 seconds
// until stopCh yields a value or is closed, then logs and returns. The ticker
// is stopped on exit.
func (s *Service) startSchedulerWorker() {
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-s.stopCh:
			s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Stopped")
			return
		case <-tick.C:
			s.processDueScheduledNotifications()
		}
	}
}
// processDueScheduledNotifications claims up to 20 due scheduled notifications
// from the store and dispatches each one in its own goroutine. A claim failure
// is logged and nothing is dispatched.
func (s *Service) processDueScheduledNotifications() {
	ctx := context.Background()

	due, err := s.store.ClaimDueScheduledNotifications(ctx, 20)
	if err != nil {
		s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to claim due notifications",
			zap.Error(err),
		)
		return
	}

	// Index into the slice so each goroutine gets a pointer to its own element.
	for idx := range due {
		go s.dispatchScheduledNotification(ctx, &due[idx])
	}
}
// dispatchScheduledNotification delivers sn over its channel (SMS, email or
// push) and records the outcome in the store.
//
// On a dispatch error, or an unsupported channel, the notification is marked
// failed with the error text; otherwise it is marked sent. A failure to
// persist either status is logged, because the stored status would otherwise
// silently disagree with what actually happened.
func (s *Service) dispatchScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) {
	var dispatchErr error
	switch sn.Channel {
	case domain.DeliveryChannelSMS:
		dispatchErr = s.dispatchScheduledSMS(ctx, sn)
	case domain.DeliveryChannelEmail:
		dispatchErr = s.dispatchScheduledEmail(ctx, sn)
	case domain.DeliveryChannelPush:
		dispatchErr = s.dispatchScheduledPush(ctx, sn)
	default:
		dispatchErr = fmt.Errorf("unsupported channel: %s", sn.Channel)
	}

	if dispatchErr != nil {
		s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to dispatch",
			zap.Int64("id", sn.ID),
			zap.String("channel", string(sn.Channel)),
			zap.Error(dispatchErr),
		)
		if markErr := s.store.MarkScheduledNotificationFailed(ctx, sn.ID, dispatchErr.Error()); markErr != nil {
			s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to mark notification as failed",
				zap.Int64("id", sn.ID),
				zap.Error(markErr),
			)
		}
		return
	}

	if markErr := s.store.MarkScheduledNotificationSent(ctx, sn.ID); markErr != nil {
		s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to mark notification as sent",
			zap.Int64("id", sn.ID),
			zap.Error(markErr),
		)
	}
	s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Dispatched",
		zap.Int64("id", sn.ID),
		zap.String("channel", string(sn.Channel)),
	)
}
// resolvePhoneNumbers builds the de-duplicated list of phone numbers that sn
// should be sent to.
//
// Recipients come from sn.TargetUserIDs, or from all users with sn.TargetRole
// when no IDs are given, plus any raw phone numbers in sn.TargetRaw. Users
// that cannot be loaded or have no phone number are skipped, as are blank raw
// entries. Lookup and decode failures are logged rather than returned, so the
// result is best-effort and its order is unspecified.
func (s *Service) resolvePhoneNumbers(ctx context.Context, sn *domain.ScheduledNotification) []string {
	phoneSet := make(map[string]struct{})

	userIDs := sn.TargetUserIDs
	if len(userIDs) == 0 && sn.TargetRole != "" {
		users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
		if err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolvePhoneNumbers] Failed to list users for target role",
				zap.Int64("id", sn.ID),
				zap.Error(err),
			)
		} else {
			for _, u := range users {
				userIDs = append(userIDs, u.ID)
			}
		}
	}

	for _, uid := range userIDs {
		user, err := s.userSvc.GetUserByID(ctx, uid)
		if err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolvePhoneNumbers] Failed to load user",
				zap.Int64("id", sn.ID),
				zap.Int64("userID", uid),
				zap.Error(err),
			)
			continue
		}
		if user.PhoneNumber != "" {
			phoneSet[user.PhoneNumber] = struct{}{}
		}
	}

	if len(sn.TargetRaw) > 0 {
		var raw domain.ScheduledNotificationTargetRaw
		if err := json.Unmarshal(sn.TargetRaw, &raw); err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolvePhoneNumbers] Failed to decode raw targets",
				zap.Int64("id", sn.ID),
				zap.Error(err),
			)
		} else {
			for _, p := range raw.Phones {
				// A blank entry can never be delivered; skip it just like
				// users without a phone number.
				if strings.TrimSpace(p) == "" {
					continue
				}
				phoneSet[p] = struct{}{}
			}
		}
	}

	phones := make([]string, 0, len(phoneSet))
	for p := range phoneSet {
		phones = append(phones, p)
	}
	return phones
}
// resolveEmails builds the de-duplicated list of email addresses that sn
// should be sent to.
//
// Recipients come from sn.TargetUserIDs, or from all users with sn.TargetRole
// when no IDs are given, plus any raw addresses in sn.TargetRaw. Users that
// cannot be loaded or have no email are skipped, as are blank raw entries.
// Lookup and decode failures are logged rather than returned, so the result is
// best-effort and its order is unspecified.
func (s *Service) resolveEmails(ctx context.Context, sn *domain.ScheduledNotification) []string {
	emailSet := make(map[string]struct{})

	userIDs := sn.TargetUserIDs
	if len(userIDs) == 0 && sn.TargetRole != "" {
		users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
		if err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolveEmails] Failed to list users for target role",
				zap.Int64("id", sn.ID),
				zap.Error(err),
			)
		} else {
			for _, u := range users {
				userIDs = append(userIDs, u.ID)
			}
		}
	}

	for _, uid := range userIDs {
		user, err := s.userSvc.GetUserByID(ctx, uid)
		if err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolveEmails] Failed to load user",
				zap.Int64("id", sn.ID),
				zap.Int64("userID", uid),
				zap.Error(err),
			)
			continue
		}
		if user.Email != "" {
			emailSet[user.Email] = struct{}{}
		}
	}

	if len(sn.TargetRaw) > 0 {
		var raw domain.ScheduledNotificationTargetRaw
		if err := json.Unmarshal(sn.TargetRaw, &raw); err != nil {
			s.mongoLogger.Warn("[NotificationSvc.ResolveEmails] Failed to decode raw targets",
				zap.Int64("id", sn.ID),
				zap.Error(err),
			)
		} else {
			for _, e := range raw.Emails {
				// A blank entry can never be delivered; skip it just like
				// users without an email address.
				if strings.TrimSpace(e) == "" {
					continue
				}
				emailSet[e] = struct{}{}
			}
		}
	}

	emails := make([]string, 0, len(emailSet))
	for e := range emailSet {
		emails = append(emails, e)
	}
	return emails
}
// resolveUserIDs picks the user IDs a scheduled notification targets.
//
// Explicit sn.TargetUserIDs win and are returned as-is. Otherwise, when
// sn.TargetRole is set, the IDs of all users GetAllUsers returns for that role
// are used. It returns nil when neither is set or the role lookup fails.
func (s *Service) resolveUserIDs(ctx context.Context, sn *domain.ScheduledNotification) []int64 {
	switch {
	case len(sn.TargetUserIDs) > 0:
		return sn.TargetUserIDs
	case sn.TargetRole == "":
		return nil
	}

	roleUsers, _, lookupErr := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
	if lookupErr != nil {
		// Best-effort: a failed lookup is treated the same as "no recipients".
		return nil
	}

	resolved := make([]int64, 0, len(roleUsers))
	for i := range roleUsers {
		resolved = append(resolved, roleUsers[i].ID)
	}
	return resolved
}
// dispatchScheduledSMS sends a scheduled notification's message to every phone
// number resolved for it.
//
// It returns an error when no recipient could be resolved or when SendBulkSMS
// reports failures and not a single successful delivery. A partially
// successful run is treated as success.
func (s *Service) dispatchScheduledSMS(ctx context.Context, sn *domain.ScheduledNotification) error {
	recipients := s.resolvePhoneNumbers(ctx, sn)
	if len(recipients) == 0 {
		return fmt.Errorf("no SMS recipients resolved")
	}

	delivered, undelivered := s.SendBulkSMS(ctx, recipients, sn.Message)
	if undelivered > 0 && delivered == 0 {
		return fmt.Errorf("all %d SMS deliveries failed", undelivered)
	}
	return nil
}
// dispatchScheduledEmail sends a scheduled notification (title, message and
// HTML body) to every email address resolved for it.
//
// It returns an error when no recipient could be resolved or when
// SendBulkEmail reports failures and not a single successful delivery. A
// partially successful run is treated as success.
func (s *Service) dispatchScheduledEmail(ctx context.Context, sn *domain.ScheduledNotification) error {
	recipients := s.resolveEmails(ctx, sn)
	if len(recipients) == 0 {
		return fmt.Errorf("no email recipients resolved")
	}

	delivered, undelivered := s.SendBulkEmail(ctx, recipients, sn.Title, sn.Message, sn.HTML, nil)
	if undelivered > 0 && delivered == 0 {
		return fmt.Errorf("all %d email deliveries failed", undelivered)
	}
	return nil
}
// dispatchScheduledPush delivers a scheduled notification as a push message to
// every user ID resolved for it.
//
// The push is built as a system-alert notification carrying the scheduled
// title as headline and the scheduled message as body. It returns an error
// when no recipient could be resolved, when SendBulkPushNotification itself
// fails, or when it reports failures and not a single successful delivery.
func (s *Service) dispatchScheduledPush(ctx context.Context, sn *domain.ScheduledNotification) error {
	recipients := s.resolveUserIDs(ctx, sn)
	if len(recipients) == 0 {
		return fmt.Errorf("no push recipients resolved")
	}

	body := domain.NotificationPayload{
		Headline: sn.Title,
		Message:  sn.Message,
	}
	push := &domain.Notification{
		Type:            domain.NOTIFICATION_TYPE_SYSTEM_ALERT,
		DeliveryChannel: domain.DeliveryChannelPush,
		Payload:         body,
	}

	delivered, undelivered, sendErr := s.SendBulkPushNotification(ctx, recipients, push)
	switch {
	case sendErr != nil:
		return sendErr
	case undelivered > 0 && delivered == 0:
		return fmt.Errorf("all %d push deliveries failed", undelivered)
	}
	return nil
}
// func (s *Service) DeleteOldNotifications(ctx context.Context) error {
// return s.store.DeleteOldNotifications(ctx)
// }
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.store.Get(ctx, filter)
// }
// func (s *Service) RunRedisSubscriber(ctx context.Context) {
// pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
// defer pubsub.Close()
// ch := pubsub.Channel()
// for msg := range ch {
// var parsed map[string]interface{}
// if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
// // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
// s.mongoLogger.Error("invalid Redis message format",
// zap.String("payload", msg.Payload),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// continue
// }
// eventType, _ := parsed["type"].(string)
// payload := parsed["payload"]
// recipientID, hasRecipient := parsed["recipient_id"]
// recipientType, _ := parsed["recipient_type"].(string)
// message := map[string]interface{}{
// "type": eventType,
// "payload": payload,
// }
// if hasRecipient {
// message["recipient_id"] = recipientID
// message["recipient_type"] = recipientType
// }
// s.Hub.Broadcast <- message
// }
// }
// func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
// const key = "live_metrics"
// companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
// for _, c := range companies {
// companyBalances = append(companyBalances, domain.CompanyWalletBalance{
// CompanyID: c.ID,
// CompanyName: c.Name,
// Balance: float64(c.WalletBalance.Float32()),
// })
// }
// branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
// for _, b := range branches {
// branchBalances = append(branchBalances, domain.BranchWalletBalance{
// BranchID: b.ID,
// BranchName: b.Name,
// CompanyID: b.CompanyID,
// Balance: float64(b.Balance.Float32()),
// })
// }
// payload := domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: companyBalances,
// BranchBalances: branchBalances,
// }
// updatedData, err := json.Marshal(payload)
// if err != nil {
// return err
// }
// if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
// return err
// }
// if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
// return err
// }
// return nil
// }
// func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
// const key = "live_metrics"
// var metric domain.LiveMetric
// val, err := s.redisClient.Get(ctx, key).Result()
// if err == redis.Nil {
// // Key does not exist yet, return zero-valued struct
// return domain.LiveMetric{}, nil
// } else if err != nil {
// return domain.LiveMetric{}, err
// }
// if err := json.Unmarshal([]byte(val), &metric); err != nil {
// return domain.LiveMetric{}, err
// }
// return metric, nil
// }
// func (s *Service) StartKafkaConsumer(ctx context.Context) {
// go func() {
// for {
// m, err := s.reader.ReadMessage(ctx)
// if err != nil {
// if err == context.Canceled {
// s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context")
// return
// }
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// time.Sleep(1 * time.Second) // backoff
// continue
// }
// var walletEvent event.WalletEvent
// if err := json.Unmarshal(m.Value, &walletEvent); err != nil {
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event",
// zap.String("message", string(m.Value)),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// continue
// }
// raw, _ := json.Marshal(map[string]any{
// "balance": walletEvent.Balance.Float32(),
// "type": walletEvent.WalletType,
// "timestamp": time.Now(),
// })
// headline := ""
// message := ""
// var receiver domain.NotificationRecieverSide
// switch walletEvent.WalletType {
// case domain.StaticWalletType:
// headline = "Referral and Bonus Wallet Updated"
// message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideCustomer
// case domain.RegularWalletType:
// headline = "Wallet Updated"
// message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideCustomer
// case domain.BranchWalletType:
// headline = "Branch Wallet Updated"
// message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideBranchManager
// case domain.CompanyWalletType:
// headline = "Company Wallet Updated"
// message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideAdmin
// }
// // Handle the wallet event: send notification
// notification := &domain.Notification{
// RecipientID: walletEvent.UserID,
// DeliveryChannel: domain.DeliveryChannelInApp,
// Reciever: receiver,
// Type: domain.NotificationTypeWalletUpdated,
// DeliveryStatus: domain.DeliveryStatusPending,
// IsRead: false,
// Level: domain.NotificationLevelInfo,
// Priority: 2,
// Metadata: raw,
// Payload: domain.NotificationPayload{
// Headline: headline,
// Message: message,
// },
// }
// if err := s.SendNotification(ctx, notification); err != nil {
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// }
// }()
// }
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
// var (
// payload domain.LiveWalletMetrics
// event map[string]interface{}
// key = "live_metrics"
// )
// // Try company first
// company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
// if companyErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{{
// CompanyID: company.ID,
// CompanyName: company.Name,
// Balance: float64(wallet.Balance),
// }},
// BranchBalances: []domain.BranchWalletBalance{},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": company.ID,
// "recipient_type": "company",
// "payload": payload,
// }
// } else {
// // Try branch next
// branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
// if branchErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{},
// BranchBalances: []domain.BranchWalletBalance{{
// BranchID: branch.ID,
// BranchName: branch.Name,
// CompanyID: branch.CompanyID,
// Balance: float64(wallet.Balance),
// }},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": branch.ID,
// "recipient_type": "branch",
// "payload": payload,
// }
// } else {
// // Neither company nor branch matched this wallet
// // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
// s.mongoLogger.Warn("wallet not linked to any company or branch",
// zap.Int64("walletID", wallet.ID),
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// }
// // Save latest metric to Redis
// if jsonBytes, err := json.Marshal(payload); err == nil {
// s.redisClient.Set(ctx, key, jsonBytes, 0)
// } else {
// // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal wallet metrics payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Publish via Redis
// if jsonEvent, err := json.Marshal(event); err == nil {
// s.redisClient.Publish(ctx, key, jsonEvent)
// } else {
// // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal event payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Broadcast over WebSocket
// s.Hub.Broadcast <- event
// }