Yimaru-BackEnd/internal/services/notfication/service.go

197 lines
5.8 KiB
Go

package notificationservice
import (
"context"
"log/slog"
"sync"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
"github.com/gofiber/websocket/v2"
)
// Service persists notifications through repo, pushes them to recipients over
// per-recipient WebSocket connections, and owns the queue and stop signal used
// by its background workers.
type Service struct {
	// repo is the persistence layer used to create, list and update notifications.
	repo repository.NotificationRepository
	// logger receives structured diagnostics from every method.
	logger *slog.Logger
	// connections maps a recipient ID (string) to that recipient's *websocket.Conn.
	connections sync.Map
	// notificationCh queues persisted notifications for the delivery worker.
	notificationCh chan *domain.Notification
	// stopCh is selected on by both background workers as their exit signal.
	stopCh chan struct{}
}
// New builds a Service around repo and logger, starts its two background
// goroutines (the delivery worker and the retry worker), and returns it.
// The delivery queue is buffered to 1000 notifications.
func New(repo repository.NotificationRepository, logger *slog.Logger) NotificationStore {
	svc := new(Service)
	svc.repo = repo
	svc.logger = logger
	// The zero sync.Map is ready to use, so connections needs no initializer.
	svc.notificationCh = make(chan *domain.Notification, 1000)
	svc.stopCh = make(chan struct{})

	go svc.startWorker()
	go svc.startRetryWorker()

	return svc
}
// addConnection registers c as the WebSocket connection for recipientID,
// replacing whatever was previously stored under that ID. A nil c is logged
// and ignored. ctx is currently unused.
func (s *Service) addConnection(ctx context.Context, recipientID string, c *websocket.Conn) {
	if c != nil {
		s.connections.Store(recipientID, c)
		s.logger.Info("Added WebSocket connection", "recipientID", recipientID)
		return
	}
	s.logger.Warn("Attempted to add nil WebSocket connection", "recipientID", recipientID)
}
// SendNotification stamps notification with a fresh ID, the current time and
// the pending delivery status, persists it, and queues the persisted record
// for the delivery worker.
//
// It returns the repository error if persisting fails. If the queue is full
// the notification is not queued: the drop is logged and nil is still
// returned, because the record has already been stored.
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
	notification.ID = helpers.GenerateID()
	notification.Timestamp = time.Now()
	notification.DeliveryStatus = domain.DeliveryStatusPending

	stored, err := s.repo.CreateNotification(ctx, notification)
	if err != nil {
		return err
	}

	// Non-blocking enqueue: never stall the caller on a saturated worker.
	select {
	case s.notificationCh <- stored:
		return nil
	default:
	}

	s.logger.Error("Notification channel full, dropping notification", "id", stored.ID)
	return nil
}
// MarkAsRead flags the notification identified by notificationID as read. The
// update also writes the "sent" delivery status and passes nil metadata to the
// repository. recipientID is accepted but not used by this implementation.
//
// It returns the repository error, if any.
func (s *Service) MarkAsRead(ctx context.Context, notificationID, recipientID string) error {
	sentStatus := string(domain.DeliveryStatusSent)
	if _, err := s.repo.UpdateNotificationStatus(ctx, notificationID, sentStatus, true, nil); err != nil {
		return err
	}
	return nil
}
// ListNotifications returns the repository's page of notifications for
// recipientID, forwarding limit and offset unchanged.
func (s *Service) ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) {
	page, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
	return page, err
}
// ConnectWebSocket registers c as the live connection for recipientID and then
// blocks, reading (and discarding) incoming messages until a read fails. Close
// errors other than "going away" and "abnormal closure" are logged. The method
// always returns nil.
//
// On exit the connection is unregistered only if the registry still holds this
// same connection. If the recipient reconnected in the meantime, the newer
// connection stored under the same ID is left in place instead of being torn
// down by the older read loop.
//
// NOTE(review): addConnection ignores a nil c, but the read loop below still
// uses it — confirm callers never pass nil.
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID string, c *websocket.Conn) error {
	s.addConnection(ctx, recipientID, c)
	defer func() {
		// Skip teardown when a later ConnectWebSocket call has replaced c.
		if current, ok := s.connections.Load(recipientID); ok && current == c {
			s.DisconnectWebSocket(recipientID)
		}
	}()

	for {
		if _, _, err := c.ReadMessage(); err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				s.logger.Error("WebSocket error", "recipientID", recipientID, "error", err)
			}
			return nil
		}
	}
}
// DisconnectWebSocket removes the connection registered for recipientID, closes
// it, and logs the disconnect. It does nothing when no connection is
// registered.
//
// The entry is removed with a single LoadAndDelete so that the stored
// connection is still available to close; deleting the key first would make
// the lookup always miss and leave the connection open.
func (s *Service) DisconnectWebSocket(recipientID string) {
	stored, loaded := s.connections.LoadAndDelete(recipientID)
	if !loaded {
		return
	}
	if conn, ok := stored.(*websocket.Conn); ok && conn != nil {
		// Best-effort close: the peer may already have dropped the socket.
		_ = conn.Close()
	}
	s.logger.Info("Disconnected WebSocket", "recipientID", recipientID)
}
// SendSMS records an SMS request for recipientID in the log, including the
// message text, and returns nil. No SMS is dispatched by this implementation,
// and ctx is unused.
func (s *Service) SendSMS(ctx context.Context, recipientID, message string) error {
	s.logger.Info(
		"SMS notification requested",
		slog.String("recipientID", recipientID),
		slog.String("message", message),
	)
	return nil
}
// SendEmail records an email request for recipientID in the log, including the
// subject but not the message body, and returns nil. No email is dispatched by
// this implementation, and ctx is unused.
func (s *Service) SendEmail(ctx context.Context, recipientID, subject, message string) error {
	s.logger.Info(
		"Email notification requested",
		slog.String("recipientID", recipientID),
		slog.String("subject", subject),
	)
	return nil
}
// startWorker is the delivery loop: it hands each queued notification to
// handleNotification, one at a time, and returns once stopCh signals.
func (s *Service) startWorker() {
	for {
		select {
		case <-s.stopCh:
			return
		case queued := <-s.notificationCh:
			s.handleNotification(queued)
		}
	}
}
// handleNotification attempts one WebSocket delivery of notification and then
// stores the resulting delivery status.
//
// The status becomes "sent" when the write succeeds and "failed" when the
// recipient has no registered connection or the write fails. If the
// notification cannot be serialized, the error is logged and the method
// returns without updating the stored status.
func (s *Service) handleNotification(notification *domain.Notification) {
	stored, connected := s.connections.Load(notification.RecipientID)
	if !connected {
		s.logger.Warn("No WebSocket connection for recipient", "recipientID", notification.RecipientID)
		notification.DeliveryStatus = domain.DeliveryStatusFailed
	} else {
		payload, err := notification.ToJSON()
		if err != nil {
			s.logger.Error("Failed to serialize notification", "id", notification.ID, "error", err)
			return
		}

		writeErr := stored.(*websocket.Conn).WriteMessage(websocket.TextMessage, payload)
		if writeErr == nil {
			notification.DeliveryStatus = domain.DeliveryStatusSent
		} else {
			s.logger.Error("Failed to send WebSocket message", "id", notification.ID, "error", writeErr)
			notification.DeliveryStatus = domain.DeliveryStatusFailed
		}
	}

	// The worker has no request scope, so the update uses a background context.
	ctx := context.Background()
	_, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata)
	if err != nil {
		s.logger.Error("Failed to update notification status", "id", notification.ID, "error", err)
	}
}
// startRetryWorker runs retryFailedNotifications once per minute and returns
// once stopCh signals. The ticker is stopped when the worker exits.
func (s *Service) startRetryWorker() {
	retryTick := time.NewTicker(time.Minute)
	defer retryTick.Stop()

	for {
		select {
		case <-s.stopCh:
			return
		case <-retryTick.C:
			s.retryFailedNotifications()
		}
	}
}
// retryFailedNotifications fetches up to 100 failed notifications from the
// repository and retries each one in its own goroutine.
//
// Each goroutine makes at most three attempts, sleeping 0s, 1s and 2s before
// them. An attempt is skipped when the recipient has no registered connection
// or the notification cannot be serialized. On the first successful write the
// notification is stored as "sent" and the goroutine exits; otherwise a
// "max retries" error is logged. A failure to list notifications is logged and
// ends the pass.
func (s *Service) retryFailedNotifications() {
	ctx := context.Background()
	failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
	if err != nil {
		s.logger.Error("Failed to list failed notifications", "error", err)
		return
	}
	for i := range failedNotifications {
		// Point at the slice element, not the range variable. With pre-1.22
		// loop semantics the range variable is one shared location, so every
		// goroutine would otherwise race on, and retry, the same notification.
		notification := &failedNotifications[i]
		go func(notification *domain.Notification) {
			for attempt := 0; attempt < 3; attempt++ {
				// Linear backoff; the first attempt runs immediately.
				time.Sleep(time.Duration(attempt) * time.Second)
				conn, ok := s.connections.Load(notification.RecipientID)
				if !ok {
					continue
				}
				data, err := notification.ToJSON()
				if err != nil {
					continue
				}
				if err := conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, data); err != nil {
					continue
				}
				notification.DeliveryStatus = domain.DeliveryStatusSent
				if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
					s.logger.Error("Failed to update after retry", "id", notification.ID, "error", err)
				}
				return
			}
			s.logger.Error("Max retries reached for notification", "id", notification.ID)
		}(notification)
	}
}