// Yimaru-BackEnd/internal/services/notfication/service.go
//
// 476 lines
// 16 KiB
// Go

package notificationservice
import (
"context"
"encoding/json"
"errors"
"log/slog"
"sync"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
afro "github.com/amanuelabay/afrosms-go"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
)
// Service coordinates notification persistence and delivery: it stores
// notifications through repo, pushes in-app events to the WebSocket hub,
// hands SMS/email deliveries to background workers, and relays live wallet
// metrics through Redis.
type Service struct {
// repo persists notifications and serves the list/count queries.
repo repository.NotificationRepository
// Hub is the WebSocket hub; events written to Hub.Broadcast are handed to it.
Hub *ws.NotificationHub
// notificationStore
// connections maps a recipient ID (int64) to its *websocket.Conn.
connections sync.Map
// notificationCh queues notifications for the delivery worker; New creates it
// with a buffer of 1000 and SendNotification drops entries when it is full.
notificationCh chan *domain.Notification
// stopCh is watched by both background workers as their stop signal.
// NOTE(review): nothing in the visible code closes or sends on it - confirm
// that a shutdown path exists elsewhere.
stopCh chan struct{}
// config supplies the Redis address and the AfroSMS settings.
config *config.Config
// logger receives all structured log output of the service.
logger *slog.Logger
// redisClient stores and publishes the "live_metrics" data.
redisClient *redis.Client
}
// New builds a notification Service and launches its background goroutines.
//
// It creates a new WebSocket notification hub and a Redis client pointed at
// cfg.RedisAddr, then starts, in this order: the hub loop, the delivery
// worker, the retry worker, and the Redis subscriber (bound to a background
// context). The returned Service is already running.
func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) *Service {
	notificationHub := ws.NewNotificationHub()

	redisOpts := &redis.Options{
		Addr: cfg.RedisAddr, // e.g., "redis:6379"
	}
	client := redis.NewClient(redisOpts)

	// connections is left at its zero value; an empty sync.Map is ready to use.
	service := &Service{
		repo:           repo,
		Hub:            notificationHub,
		logger:         logger,
		notificationCh: make(chan *domain.Notification, 1000),
		stopCh:         make(chan struct{}),
		config:         cfg,
		redisClient:    client,
	}

	go notificationHub.Run()
	go service.startWorker()
	go service.startRetryWorker()
	go service.RunRedisSubscriber(context.Background())

	return service
}
// addConnection stores c as the WebSocket connection for recipientID,
// overwriting any entry already kept under that ID. A nil connection is not
// stored; a warning is logged instead.
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) {
	if c != nil {
		s.connections.Store(recipientID, c)
		s.logger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", "recipientID", recipientID)
		return
	}

	s.logger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", "recipientID", recipientID)
}
// SendNotification persists a new notification and schedules its delivery.
//
// The notification is stamped with a generated ID, the current time and the
// pending delivery status before it is written through the repository. The
// stored record is then broadcast on the WebSocket hub when its channel is
// in-app, and finally offered to the delivery worker queue.
//
// A repository error is logged and returned. A full worker queue is NOT an
// error: the notification is dropped from the queue with a warning and nil is
// returned.
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
	notification.ID = helpers.GenerateID()
	notification.Timestamp = time.Now()
	notification.DeliveryStatus = domain.DeliveryStatusPending

	stored, err := s.repo.CreateNotification(ctx, notification)
	if err != nil {
		s.logger.Error("[NotificationSvc.SendNotification] Failed to create notification", "id", notification.ID, "error", err)
		return err
	}

	// From here on only the record returned by the repository is used.
	if stored.DeliveryChannel == domain.DeliveryChannelInApp {
		event := map[string]interface{}{
			"type":         "CREATED_NOTIFICATION",
			"recipient_id": stored.RecipientID,
			"payload":      stored,
		}
		s.Hub.Broadcast <- event
	}

	// Non-blocking enqueue: never stall the caller on a saturated worker.
	select {
	case s.notificationCh <- stored:
	default:
		s.logger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", stored.ID)
	}

	return nil
}
// MarkAsRead flags each notification in notificationIDs as read, setting its
// delivery status to "sent" in the same update. Processing stops at the first
// repository error, which is logged and returned; IDs handled before the
// failure stay updated.
//
// NOTE(review): recipientID is only used for logging here - confirm that
// ownership of the notifications is enforced elsewhere. The update also
// passes nil metadata; verify the repository does not treat that as "clear".
func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
	sentStatus := string(domain.DeliveryStatusSent)

	for i := range notificationIDs {
		id := notificationIDs[i]

		if _, err := s.repo.UpdateNotificationStatus(ctx, id, sentStatus, true, nil); err != nil {
			s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", id, "recipientID", recipientID, "error", err)
			return err
		}

		// Disabled: broadcast of the refreshed unread count after each update.
		// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
		// if err != nil {
		// 	s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
		// 	return err
		// }
		// s.Hub.Broadcast <- map[string]interface{}{
		// 	"type":         "COUNT_NOT_OPENED_NOTIFICATION",
		// 	"recipient_id": recipientID,
		// 	"payload": map[string]int{
		// 		"not_opened_notifications_count": int(count),
		// 	},
		// }

		s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", id, "recipientID", recipientID)
	}

	return nil
}
// ListNotifications returns one page (limit/offset) of the notifications
// addressed to recipientID. Repository errors are logged and returned with a
// nil slice.
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
	items, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
	if err == nil {
		s.logger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications", "recipientID", recipientID, "count", len(items))
		return items, nil
	}

	s.logger.Error("[NotificationSvc.ListNotifications] Failed to list notifications", "recipientID", recipientID, "limit", limit, "offset", offset, "error", err)
	return nil, err
}
// GetAllNotifications returns one page (limit/offset) of notifications across
// all recipients. Repository errors are logged, together with the paging
// arguments and the error itself, and returned with a nil slice.
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
	notifications, err := s.repo.GetAllNotifications(ctx, limit, offset)
	if err != nil {
		// The log tag names this method (it used to carry ListNotifications'
		// tag) and the error is attached so the failure can be diagnosed.
		s.logger.Error("[NotificationSvc.GetAllNotifications] Failed to get all notifications", "limit", limit, "offset", offset, "error", err)
		return nil, err
	}

	s.logger.Info("[NotificationSvc.GetAllNotifications] Successfully retrieved all notifications", "count", len(notifications))
	return notifications, nil
}
// ConnectWebSocket registers c as the live WebSocket connection of
// recipientID.
//
// A nil connection is rejected with an error: addConnection would silently
// skip it, and reporting "established" for a connection that was never stored
// would mislead the caller. ctx is currently unused.
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
	if c == nil {
		s.logger.Warn("[NotificationSvc.ConnectWebSocket] Refusing to register nil WebSocket connection", "recipientID", recipientID)
		return errors.New("websocket connection is nil")
	}

	s.addConnection(recipientID, c)
	s.logger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", "recipientID", recipientID)
	return nil
}
// DisconnectWebSocket removes the connection stored for recipientID, closes
// it and logs the disconnect. It does nothing when no connection is stored.
// The result of Close is not inspected.
func (s *Service) DisconnectWebSocket(recipientID int64) {
	stored, found := s.connections.LoadAndDelete(recipientID)
	if !found {
		return
	}

	wsConn := stored.(*websocket.Conn)
	wsConn.Close()
	s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
}
// SendSMS sends message through the AfroSMS gateway configured in s.config.
//
// It returns the transport error when the request itself fails, and an
// "SMS delivery failed: ..." error when the gateway answers with anything
// other than acknowledge == "success".
//
// NOTE(review): the destination number comes from configuration
// (AFRO_SMS_RECEIVER_PHONE_NUMBER), not from recipientID, which is only
// logged - confirm this is intended outside of testing.
// NOTE(review): the full message text is logged at info level - confirm it
// never carries secrets such as one-time codes.
func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
	s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)

	apiKey := s.config.AFRO_SMS_API_KEY
	senderName := s.config.AFRO_SMS_SENDER_NAME
	receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
	hostURL := s.config.ADRO_SMS_HOST_URL
	endpoint := "/api/send"

	request := afro.GetRequest(apiKey, endpoint, hostURL)
	request.Method = "GET"
	request.Sender(senderName)
	request.To(receiverPhone, message)

	response, err := afro.MakeRequestWithContext(ctx, request)
	if err != nil {
		s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
		return err
	}

	if response["acknowledge"] != "success" {
		s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])

		// The failure detail is not guaranteed to be a string (it may be
		// missing or structured); an unchecked assertion would panic inside
		// the worker goroutine, so fall back to a generic description.
		detail, ok := response["response"].(string)
		if !ok {
			detail = "unexpected response from SMS gateway"
		}
		return errors.New("SMS delivery failed: " + detail)
	}

	s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
	return nil
}
// SendEmail is a placeholder for e-mail delivery: it only logs the request
// (recipient and subject) and always returns nil. No e-mail is sent, and ctx
// and message are unused.
func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
	const requested = "[NotificationSvc.SendEmail] Email notification requested"

	s.logger.Info(requested, "recipientID", recipientID, "subject", subject)
	return nil
}
// startWorker is the delivery loop: it takes notifications off the queue one
// at a time and hands each to handleNotification, until a signal arrives on
// stopCh.
func (s *Service) startWorker() {
	for {
		select {
		case <-s.stopCh:
			s.logger.Info("[NotificationSvc.StartWorker] Worker stopped")
			return
		case queued := <-s.notificationCh:
			s.handleNotification(queued)
		}
	}
}
// ListRecipientIDs returns the recipient IDs the repository reports for the
// given receiver side. It forwards the repository's result and error as-is.
func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
	ids, err := s.repo.ListRecipientIDs(ctx, receiver)
	return ids, err
}
// handleNotification delivers one queued notification and records the outcome.
//
// SMS and e-mail notifications are sent and marked sent or failed depending
// on the send result. In-app notifications are left with their current
// status. Any other channel is logged as unsupported and marked failed. The
// resulting status is always written back through the repository; a failed
// write is only logged.
func (s *Service) handleNotification(notification *domain.Notification) {
	ctx := context.Background()

	var (
		deliveryErr error
		attempted   bool
	)

	switch channel := notification.DeliveryChannel; channel {
	case domain.DeliveryChannelSMS:
		deliveryErr = s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message)
		attempted = true
	case domain.DeliveryChannelEmail:
		deliveryErr = s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
		attempted = true
	case domain.DeliveryChannelInApp:
		// Nothing to send here; the status is left as it is.
	default:
		s.logger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", "channel", channel)
		notification.DeliveryStatus = domain.DeliveryStatusFailed
	}

	if attempted {
		if deliveryErr != nil {
			notification.DeliveryStatus = domain.DeliveryStatusFailed
		} else {
			notification.DeliveryStatus = domain.DeliveryStatusSent
		}
	}

	_, updateErr := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata)
	if updateErr != nil {
		s.logger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", "id", notification.ID, "error", updateErr)
	}
}
// startRetryWorker runs retryFailedNotifications once per minute until a
// signal arrives on stopCh. The ticker is released when the loop exits.
func (s *Service) startRetryWorker() {
	everyMinute := time.NewTicker(time.Minute)
	defer everyMinute.Stop()

	for {
		select {
		case <-s.stopCh:
			s.logger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped")
			return
		case <-everyMinute.C:
			s.retryFailedNotifications()
		}
	}
}
// retryFailedNotifications loads up to 100 failed notifications and retries
// each one in its own goroutine.
//
// Every notification gets at most three attempts, preceded by a linear
// back-off of 0s, 1s and 2s. On the first successful send the notification is
// marked sent in the repository (a failed write is only logged) and the
// goroutine ends. Notifications whose channel is neither SMS nor e-mail are
// never sent and end in the "max retries" log, as before.
//
// The function returns right after spawning the goroutines; it does not wait
// for them.
func (s *Service) retryFailedNotifications() {
	ctx := context.Background()

	failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
	if err != nil {
		s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", "error", err)
		return
	}

	for _, n := range failedNotifications {
		// Give each goroutine its own copy. Before Go 1.22 the range variable
		// is shared by all iterations, so passing &n directly would let every
		// goroutine read (and write) whichever element the loop reached last.
		n := n

		go func(notification *domain.Notification) {
			for attempt := 0; attempt < 3; attempt++ {
				time.Sleep(time.Duration(attempt) * time.Second)

				var sendErr error
				switch notification.DeliveryChannel {
				case domain.DeliveryChannelSMS:
					sendErr = s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message)
				case domain.DeliveryChannelEmail:
					sendErr = s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
				default:
					// No sender for this channel; the attempt counts as used.
					continue
				}
				if sendErr != nil {
					continue
				}

				notification.DeliveryStatus = domain.DeliveryStatusSent
				if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
					s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
				}
				s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
				return
			}

			s.logger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", "id", notification.ID)
		}(&n)
	}
}
// CountUnreadNotifications returns the unread-notification count the
// repository reports for recipient_id, forwarding its result and error as-is.
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
	unread, err := s.repo.CountUnreadNotifications(ctx, recipient_id)
	return unread, err
}
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.repo.Get(ctx, filter)
// }
// RunRedisSubscriber relays messages from the Redis "live_metrics" channel to
// the WebSocket hub. It blocks until ctx is cancelled or the subscription
// channel is closed, so it is meant to run in its own goroutine.
//
// Each message must be a JSON object. Its "type" and "payload" fields are
// forwarded; "recipient_id" and "recipient_type" are forwarded only when
// "recipient_id" is present. Messages that are not valid JSON are logged and
// skipped.
func (s *Service) RunRedisSubscriber(ctx context.Context) {
	pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
	defer pubsub.Close()

	ch := pubsub.Channel()
	for {
		select {
		case <-ctx.Done():
			// Cancelling ctx does not by itself close the message channel,
			// so watch for it explicitly to let the goroutine exit.
			return
		case msg, ok := <-ch:
			if !ok {
				return
			}

			var parsed map[string]interface{}
			if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
				s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
				continue
			}

			// Missing or non-string fields fall back to their zero values.
			eventType, _ := parsed["type"].(string)
			payload := parsed["payload"]
			recipientID, hasRecipient := parsed["recipient_id"]
			recipientType, _ := parsed["recipient_type"].(string)

			message := map[string]interface{}{
				"type":    eventType,
				"payload": payload,
			}
			if hasRecipient {
				message["recipient_id"] = recipientID
				message["recipient_type"] = recipientType
			}

			// Do not stay blocked on the hub once shutdown was requested.
			select {
			case s.Hub.Broadcast <- message:
			case <-ctx.Done():
				return
			}
		}
	}
}
// UpdateLiveWalletMetrics builds a wallet-balance snapshot for the given
// companies and branches, stores it in Redis under "live_metrics" (no expiry)
// and publishes the same JSON on the "live_metrics" channel. The first
// marshal or Redis error is returned.
//
// NOTE(review): balances go through Float32() before being widened to
// float64 - confirm the precision loss is acceptable.
// NOTE(review): the published JSON is the bare snapshot, while
// RunRedisSubscriber reads "type" and "payload" fields from each message -
// confirm whether an event envelope is expected here.
func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
	const key = "live_metrics"

	companyBalances := make([]domain.CompanyWalletBalance, len(companies))
	for i := range companies {
		companyBalances[i] = domain.CompanyWalletBalance{
			CompanyID:   companies[i].ID,
			CompanyName: companies[i].Name,
			Balance:     float64(companies[i].WalletBalance.Float32()),
		}
	}

	branchBalances := make([]domain.BranchWalletBalance, len(branches))
	for i := range branches {
		branchBalances[i] = domain.BranchWalletBalance{
			BranchID:   branches[i].ID,
			BranchName: branches[i].Name,
			CompanyID:  branches[i].CompanyID,
			Balance:    float64(branches[i].Balance.Float32()),
		}
	}

	snapshot := domain.LiveWalletMetrics{
		Timestamp:       time.Now(),
		CompanyBalances: companyBalances,
		BranchBalances:  branchBalances,
	}

	encoded, err := json.Marshal(snapshot)
	if err != nil {
		return err
	}

	if err = s.redisClient.Set(ctx, key, encoded, 0).Err(); err != nil {
		return err
	}
	return s.redisClient.Publish(ctx, key, encoded).Err()
}
// GetLiveMetrics reads the value stored in Redis under "live_metrics" and
// decodes it as a domain.LiveMetric.
//
// A missing key is not an error: the zero-valued metric is returned. Any
// other Redis error, or a JSON decoding error, is returned with a zero-valued
// metric.
//
// NOTE(review): the writers in this file store a domain.LiveWalletMetrics
// under the same key - confirm that it decodes into domain.LiveMetric as
// intended.
func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
	const key = "live_metrics"

	val, err := s.redisClient.Get(ctx, key).Result()
	// errors.Is also matches redis.Nil when it has been wrapped on the way up.
	if errors.Is(err, redis.Nil) {
		// Key does not exist yet, return zero-valued struct.
		return domain.LiveMetric{}, nil
	}
	if err != nil {
		return domain.LiveMetric{}, err
	}

	var metric domain.LiveMetric
	if err := json.Unmarshal([]byte(val), &metric); err != nil {
		return domain.LiveMetric{}, err
	}
	return metric, nil
}
// UpdateLiveMetricForWallet publishes a live balance update for one wallet.
//
// The wallet is resolved to its company first and, if that lookup fails, to
// its branch. When neither lookup succeeds a warning is logged and nothing is
// published. Otherwise a one-entry metrics payload is stored in Redis under
// "live_metrics", a LIVE_WALLET_METRICS_UPDATE event is published on the
// "live_metrics" channel, and the same event is sent to the WebSocket hub.
//
// Redis and marshalling failures are logged and do not stop the remaining
// steps; the method has no error return.
//
// NOTE(review): the event is both published to Redis and broadcast directly,
// while RunRedisSubscriber forwards the same channel to the hub - confirm
// that clients of this process are not meant to receive it only once.
func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.Wallet) {
	const key = "live_metrics"

	var (
		payload domain.LiveWalletMetrics
		event   map[string]interface{}
	)

	if company, companyErr := s.GetCompanyByWalletID(ctx, wallet.ID); companyErr == nil {
		payload = domain.LiveWalletMetrics{
			Timestamp: time.Now(),
			CompanyBalances: []domain.CompanyWalletBalance{{
				CompanyID:   company.ID,
				CompanyName: company.Name,
				Balance:     float64(wallet.Balance),
			}},
			BranchBalances: []domain.BranchWalletBalance{},
		}
		event = map[string]interface{}{
			"type":           "LIVE_WALLET_METRICS_UPDATE",
			"recipient_id":   company.ID,
			"recipient_type": "company",
			"payload":        payload,
		}
	} else if branch, branchErr := s.GetBranchByWalletID(ctx, wallet.ID); branchErr == nil {
		payload = domain.LiveWalletMetrics{
			Timestamp:       time.Now(),
			CompanyBalances: []domain.CompanyWalletBalance{},
			BranchBalances: []domain.BranchWalletBalance{{
				BranchID:   branch.ID,
				BranchName: branch.Name,
				CompanyID:  branch.CompanyID,
				Balance:    float64(wallet.Balance),
			}},
		}
		event = map[string]interface{}{
			"type":           "LIVE_WALLET_METRICS_UPDATE",
			"recipient_id":   branch.ID,
			"recipient_type": "branch",
			"payload":        payload,
		}
	} else {
		// Neither company nor branch matched this wallet.
		s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
		return
	}

	// Save latest metric to Redis. The command result is checked so that a
	// failed write is visible in the logs instead of being silently dropped.
	if jsonBytes, err := json.Marshal(payload); err != nil {
		s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
	} else if err := s.redisClient.Set(ctx, key, jsonBytes, 0).Err(); err != nil {
		s.logger.Error("failed to store wallet metrics in Redis", "walletID", wallet.ID, "err", err)
	}

	// Publish via Redis, again surfacing a failed command.
	if jsonEvent, err := json.Marshal(event); err != nil {
		s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
	} else if err := s.redisClient.Publish(ctx, key, jsonEvent).Err(); err != nil {
		s.logger.Error("failed to publish wallet metrics event", "walletID", wallet.ID, "err", err)
	}

	// Broadcast over WebSocket.
	s.Hub.Broadcast <- event
}
// GetCompanyByWalletID is meant to resolve the company that owns walletID.
//
// The previous body called itself unconditionally, so every call recursed
// until the goroutine stack overflowed, which crashes the whole process. No
// company lookup is reachable from the dependencies visible in this file, so
// the method now returns an explicit error; callers that already handle a
// failed lookup keep working.
//
// TODO: delegate to the real company lookup once it is wired into Service.
func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
	return domain.Company{}, errors.New("company lookup by wallet ID is not implemented")
}
// GetBranchByWalletID is meant to resolve the branch that owns walletID.
//
// The previous body called itself unconditionally, so every call recursed
// until the goroutine stack overflowed, which crashes the whole process. No
// branch lookup is reachable from the dependencies visible in this file, so
// the method now returns an explicit error; callers that already handle a
// failed lookup keep working.
//
// TODO: delegate to the real branch lookup once it is wired into Service.
func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
	return domain.Branch{}, errors.New("branch lookup by wallet ID is not implemented")
}