package notificationservice import ( "context" "errors" "log/slog" "sync" "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/config" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers" "github.com/SamuelTariku/FortuneBet-Backend/internal/repository" "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" afro "github.com/amanuelabay/afrosms-go" "github.com/gorilla/websocket" ) type Service struct { repo repository.NotificationRepository Hub *ws.NotificationHub connections sync.Map notificationCh chan *domain.Notification stopCh chan struct{} config *config.Config logger *slog.Logger } func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) *Service { hub := ws.NewNotificationHub() svc := &Service{ repo: repo, Hub: hub, logger: logger, connections: sync.Map{}, notificationCh: make(chan *domain.Notification, 1000), stopCh: make(chan struct{}), config: cfg, } go hub.Run() go svc.startWorker() go svc.startRetryWorker() return svc } func (s *Service) addConnection(ctx context.Context, recipientID int64, c *websocket.Conn) { if c == nil { s.logger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", "recipientID", recipientID) return } s.connections.Store(recipientID, c) s.logger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", "recipientID", recipientID) } func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error { notification.ID = helpers.GenerateID() notification.Timestamp = time.Now() notification.DeliveryStatus = domain.DeliveryStatusPending created, err := s.repo.CreateNotification(ctx, notification) if err != nil { s.logger.Error("[NotificationSvc.SendNotification] Failed to create notification", "id", notification.ID, "error", err) return err } notification = created if notification.DeliveryChannel == domain.DeliveryChannelInApp { s.Hub.Broadcast <- map[string]interface{}{ "type": "CREATED_NOTIFICATION", "recipient_id": notification.RecipientID, "payload": notification, } } select { case s.notificationCh <- notification: default: s.logger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", notification.ID) } return nil } func (s *Service) MarkAsRead(ctx context.Context, notificationID string, recipientID int64) error { _, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil) if err != nil { s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", notificationID, "recipientID", recipientID, "error", err) return err } // count, err := s.repo.CountUnreadNotifications(ctx, recipientID) // if err != nil { // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err) // return err // } // s.Hub.Broadcast <- map[string]interface{}{ // "type": "COUNT_NOT_OPENED_NOTIFICATION", // "recipient_id": recipientID, // "payload": map[string]int{ // "not_opened_notifications_count": int(count), // }, // } s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", notificationID, "recipientID", recipientID) return nil } func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) { notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset) if err != nil { s.logger.Error("[NotificationSvc.ListNotifications] Failed to list notifications", "recipientID", recipientID, "limit", limit, "offset", offset, "error", err) return nil, err } s.logger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications", "recipientID", recipientID, "count", len(notifications)) return notifications, nil } func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error { s.addConnection(ctx, recipientID, c) s.logger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", "recipientID", recipientID) return nil } func (s *Service) DisconnectWebSocket(recipientID int64) { if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded { conn.(*websocket.Conn).Close() s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID) } } func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error { s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message) apiKey := s.config.AFRO_SMS_API_KEY senderName := s.config.AFRO_SMS_SENDER_NAME receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER hostURL := s.config.ADRO_SMS_HOST_URL endpoint := "/api/send" request := afro.GetRequest(apiKey, endpoint, hostURL) request.Method = "GET" request.Sender(senderName) request.To(receiverPhone, message) response, err := afro.MakeRequestWithContext(ctx, request) if err != nil { s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err) return err } if response["acknowledge"] == "success" { s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID) } else { s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"]) return errors.New("SMS delivery failed: " + response["response"].(string)) } return nil } func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error { s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject) return nil } func (s *Service) startWorker() { for { select { case notification := <-s.notificationCh: s.handleNotification(notification) case <-s.stopCh: s.logger.Info("[NotificationSvc.StartWorker] Worker stopped") return } } } func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) { return s.repo.ListRecipientIDs(ctx, receiver) } func (s *Service) handleNotification(notification *domain.Notification) { ctx := context.Background() switch notification.DeliveryChannel { case domain.DeliveryChannelSMS: err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } case domain.DeliveryChannelEmail: err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message) if err != nil { notification.DeliveryStatus = domain.DeliveryStatusFailed } else { notification.DeliveryStatus = domain.DeliveryStatusSent } default: if notification.DeliveryChannel != domain.DeliveryChannelInApp { s.logger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", "channel", notification.DeliveryChannel) notification.DeliveryStatus = domain.DeliveryStatusFailed } } if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.logger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", "id", notification.ID, "error", err) } } func (s *Service) startRetryWorker() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ticker.C: s.retryFailedNotifications() case <-s.stopCh: s.logger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped") return } } } func (s *Service) retryFailedNotifications() { ctx := context.Background() failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100) if err != nil { s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", "error", err) return } for _, n := range failedNotifications { notification := &n go func(notification *domain.Notification) { for attempt := 0; attempt < 3; attempt++ { time.Sleep(time.Duration(attempt) * time.Second) if notification.DeliveryChannel == domain.DeliveryChannelSMS { if err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil { notification.DeliveryStatus = domain.DeliveryStatusSent if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err) } s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID) return } } else if notification.DeliveryChannel == domain.DeliveryChannelEmail { if err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil { notification.DeliveryStatus = domain.DeliveryStatusSent if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err) } s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID) return } } } s.logger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", "id", notification.ID) }(notification) } }