290 lines
11 KiB
Go
290 lines
11 KiB
Go
package notificationservice
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
|
|
afro "github.com/amanuelabay/afrosms-go"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
type Service struct {
|
|
repo repository.NotificationRepository
|
|
Hub *ws.NotificationHub
|
|
connections sync.Map
|
|
notificationCh chan *domain.Notification
|
|
stopCh chan struct{}
|
|
config *config.Config
|
|
logger *slog.Logger
|
|
}
|
|
|
|
func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) *Service {
|
|
hub := ws.NewNotificationHub()
|
|
svc := &Service{
|
|
repo: repo,
|
|
Hub: hub,
|
|
logger: logger,
|
|
connections: sync.Map{},
|
|
notificationCh: make(chan *domain.Notification, 1000),
|
|
stopCh: make(chan struct{}),
|
|
config: cfg,
|
|
}
|
|
|
|
go hub.Run()
|
|
go svc.startWorker()
|
|
go svc.startRetryWorker()
|
|
|
|
return svc
|
|
}
|
|
|
|
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) {
|
|
if c == nil {
|
|
s.logger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", "recipientID", recipientID)
|
|
return
|
|
}
|
|
|
|
s.connections.Store(recipientID, c)
|
|
s.logger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", "recipientID", recipientID)
|
|
}
|
|
|
|
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
|
|
notification.ID = helpers.GenerateID()
|
|
notification.Timestamp = time.Now()
|
|
notification.DeliveryStatus = domain.DeliveryStatusPending
|
|
|
|
created, err := s.repo.CreateNotification(ctx, notification)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.SendNotification] Failed to create notification", "id", notification.ID, "error", err)
|
|
return err
|
|
}
|
|
|
|
notification = created
|
|
|
|
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
|
|
s.Hub.Broadcast <- map[string]interface{}{
|
|
"type": "CREATED_NOTIFICATION",
|
|
"recipient_id": notification.RecipientID,
|
|
"payload": notification,
|
|
}
|
|
}
|
|
|
|
select {
|
|
case s.notificationCh <- notification:
|
|
default:
|
|
s.logger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", notification.ID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
|
|
for _, notificationID := range notificationIDs {
|
|
_, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", notificationID, "recipientID", recipientID, "error", err)
|
|
return err
|
|
}
|
|
|
|
// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
|
|
// if err != nil {
|
|
// s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
|
|
// return err
|
|
// }
|
|
|
|
// s.Hub.Broadcast <- map[string]interface{}{
|
|
// "type": "COUNT_NOT_OPENED_NOTIFICATION",
|
|
// "recipient_id": recipientID,
|
|
// "payload": map[string]int{
|
|
// "not_opened_notifications_count": int(count),
|
|
// },
|
|
// }
|
|
|
|
s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", notificationID, "recipientID", recipientID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
|
|
notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.ListNotifications] Failed to list notifications", "recipientID", recipientID, "limit", limit, "offset", offset, "error", err)
|
|
return nil, err
|
|
}
|
|
s.logger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications", "recipientID", recipientID, "count", len(notifications))
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
|
|
notifications, err := s.repo.GetAllNotifications(ctx, limit, offset)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications")
|
|
return nil, err
|
|
}
|
|
s.logger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications", "count", len(notifications))
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
|
|
s.addConnection(recipientID, c)
|
|
s.logger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", "recipientID", recipientID)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DisconnectWebSocket(recipientID int64) {
|
|
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
|
|
conn.(*websocket.Conn).Close()
|
|
s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
|
|
}
|
|
}
|
|
|
|
func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
|
|
s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
|
|
|
|
apiKey := s.config.AFRO_SMS_API_KEY
|
|
senderName := s.config.AFRO_SMS_SENDER_NAME
|
|
receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
|
|
hostURL := s.config.ADRO_SMS_HOST_URL
|
|
endpoint := "/api/send"
|
|
|
|
request := afro.GetRequest(apiKey, endpoint, hostURL)
|
|
request.Method = "GET"
|
|
request.Sender(senderName)
|
|
request.To(receiverPhone, message)
|
|
|
|
response, err := afro.MakeRequestWithContext(ctx, request)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
|
|
return err
|
|
}
|
|
|
|
if response["acknowledge"] == "success" {
|
|
s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
|
|
} else {
|
|
s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
|
|
return errors.New("SMS delivery failed: " + response["response"].(string))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
|
|
s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) startWorker() {
|
|
for {
|
|
select {
|
|
case notification := <-s.notificationCh:
|
|
s.handleNotification(notification)
|
|
case <-s.stopCh:
|
|
s.logger.Info("[NotificationSvc.StartWorker] Worker stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
|
return s.repo.ListRecipientIDs(ctx, receiver)
|
|
}
|
|
|
|
func (s *Service) handleNotification(notification *domain.Notification) {
|
|
ctx := context.Background()
|
|
|
|
switch notification.DeliveryChannel {
|
|
case domain.DeliveryChannelSMS:
|
|
err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
case domain.DeliveryChannelEmail:
|
|
err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
default:
|
|
if notification.DeliveryChannel != domain.DeliveryChannelInApp {
|
|
s.logger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", "channel", notification.DeliveryChannel)
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
}
|
|
}
|
|
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.logger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", "id", notification.ID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (s *Service) startRetryWorker() {
|
|
ticker := time.NewTicker(1 * time.Minute)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.retryFailedNotifications()
|
|
case <-s.stopCh:
|
|
s.logger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) retryFailedNotifications() {
|
|
ctx := context.Background()
|
|
failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
|
|
if err != nil {
|
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, n := range failedNotifications {
|
|
notification := &n
|
|
go func(notification *domain.Notification) {
|
|
for attempt := 0; attempt < 3; attempt++ {
|
|
time.Sleep(time.Duration(attempt) * time.Second)
|
|
if notification.DeliveryChannel == domain.DeliveryChannelSMS {
|
|
if err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
|
|
}
|
|
s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
|
|
return
|
|
}
|
|
} else if notification.DeliveryChannel == domain.DeliveryChannelEmail {
|
|
if err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
|
|
}
|
|
s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", "id", notification.ID)
|
|
}(notification)
|
|
}
|
|
}
|
|
|
|
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
|
|
return s.repo.CountUnreadNotifications(ctx, recipient_id)
|
|
}
|
|
|
|
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
|
|
// return s.repo.Get(ctx, filter)
|
|
// }
|