1461 lines
44 KiB
Go
1461 lines
44 KiB
Go
package notificationservice
|
|
|
|
import (
|
|
"Yimaru-Backend/internal/config"
|
|
"Yimaru-Backend/internal/domain"
|
|
"Yimaru-Backend/internal/pkgs/helpers"
|
|
"Yimaru-Backend/internal/ports"
|
|
"Yimaru-Backend/internal/services/messenger"
|
|
"Yimaru-Backend/internal/services/user"
|
|
"Yimaru-Backend/internal/web_server/ws"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
|
|
// "errors"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
// "github.com/segmentio/kafka-go"
|
|
"go.uber.org/zap"
|
|
// afro "github.com/amanuelabay/afrosms-go"
|
|
firebase "firebase.google.com/go/v4"
|
|
"firebase.google.com/go/v4/messaging"
|
|
afro "github.com/amanuelabay/afrosms-go"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/resend/resend-go/v2"
|
|
"google.golang.org/api/option"
|
|
// "github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type Service struct {
|
|
store ports.NotificationStore
|
|
Hub *ws.NotificationHub
|
|
connections sync.Map
|
|
notificationCh chan *domain.Notification
|
|
stopCh chan struct{}
|
|
config *config.Config
|
|
userSvc *user.Service
|
|
messengerSvc *messenger.Service
|
|
mongoLogger *zap.Logger
|
|
logger *slog.Logger
|
|
fcmClient *messaging.Client
|
|
}
|
|
|
|
func New(
|
|
store ports.NotificationStore,
|
|
mongoLogger *zap.Logger,
|
|
logger *slog.Logger,
|
|
cfg *config.Config,
|
|
messengerSvc *messenger.Service,
|
|
userSvc *user.Service,
|
|
) *Service {
|
|
hub := ws.NewNotificationHub()
|
|
|
|
svc := &Service{
|
|
store: store,
|
|
Hub: hub,
|
|
mongoLogger: mongoLogger,
|
|
logger: logger,
|
|
connections: sync.Map{},
|
|
notificationCh: make(chan *domain.Notification, 1000),
|
|
stopCh: make(chan struct{}),
|
|
messengerSvc: messengerSvc,
|
|
userSvc: userSvc,
|
|
config: cfg,
|
|
}
|
|
|
|
// Initialize FCM client if service account key is provided
|
|
if cfg.FCMServiceAccountKey != "" {
|
|
if err := svc.initFCMClient(); err != nil {
|
|
mongoLogger.Error("Failed to initialize FCM client", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
go hub.Run()
|
|
go svc.startWorker()
|
|
go svc.startSchedulerWorker()
|
|
// go svc.startRetryWorker()
|
|
// go svc.RunRedisSubscriber(context.Background())
|
|
// go svc.StartKafkaConsumer(context.Background())
|
|
|
|
return svc
|
|
}
|
|
|
|
func (s *Service) initFCMClient() error {
|
|
ctx := context.Background()
|
|
|
|
// Prepare client options; if a service account JSON string is provided, use it.
|
|
var opts []option.ClientOption
|
|
if s.config.FCMServiceAccountKey != "" {
|
|
opts = append(opts, option.WithCredentialsJSON([]byte(s.config.FCMServiceAccountKey)))
|
|
}
|
|
|
|
// Initialize Firebase app
|
|
app, err := firebase.NewApp(ctx, nil, opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize Firebase app: %w", err)
|
|
}
|
|
|
|
// Get messaging client
|
|
client, err := app.Messaging(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get FCM client: %w", err)
|
|
}
|
|
|
|
s.fcmClient = client
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendAfroMessageSMS(ctx context.Context, receiverPhone, message string) error {
|
|
apiKey := s.config.AFRO_SMS_API_KEY
|
|
senderName := s.config.AFRO_SMS_SENDER_NAME
|
|
|
|
baseURL := "https://api.afromessage.com"
|
|
endpoint := "/api/send"
|
|
|
|
request := afro.GetRequest(apiKey, endpoint, baseURL)
|
|
|
|
// MUST be POST
|
|
request.Method = "POST"
|
|
|
|
request.Sender(senderName)
|
|
request.To(receiverPhone, message)
|
|
|
|
response, err := afro.MakeRequestWithContext(ctx, request)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ack, ok := response["acknowledge"].(string)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected SMS response format: %v", response)
|
|
}
|
|
|
|
if ack != "success" {
|
|
return fmt.Errorf("SMS delivery failed: %v", response)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendAfroMessageSMSTemp(
|
|
ctx context.Context,
|
|
receiverPhone string,
|
|
message string,
|
|
callbackURL *string, // optional
|
|
) error {
|
|
|
|
baseURL := s.config.AFROSMSConfig.AfroSMSBaseURL
|
|
|
|
// Build query parameters explicitly
|
|
params := url.Values{}
|
|
params.Set("to", receiverPhone)
|
|
params.Set("message", message)
|
|
params.Set("sender", s.config.AFRO_SMS_SENDER_NAME)
|
|
|
|
// Optional parameters
|
|
if s.config.AFROSMSConfig.AfroSMSIdentifierID != "" {
|
|
params.Set("from", s.config.AFROSMSConfig.AfroSMSIdentifierID)
|
|
}
|
|
|
|
if callbackURL != nil {
|
|
params.Set("callback", *callbackURL)
|
|
}
|
|
|
|
// Construct full URL
|
|
reqURL := fmt.Sprintf("%s?%s", baseURL, params.Encode())
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// AfroMessage authentication (API key)
|
|
req.Header.Set("Authorization", "Bearer "+s.config.AFRO_SMS_API_KEY)
|
|
req.Header.Set("Accept", "application/json")
|
|
|
|
client := &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf(
|
|
"afromessage sms failed: status=%d response=%s",
|
|
resp.StatusCode,
|
|
string(body),
|
|
)
|
|
}
|
|
|
|
// Parse response
|
|
var result map[string]interface{}
|
|
if err := json.Unmarshal(body, &result); err != nil {
|
|
return err
|
|
}
|
|
|
|
ack, ok := result["acknowledge"].(string)
|
|
if !ok || ack != "success" {
|
|
return fmt.Errorf("sms delivery failed: %v", result)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error {
|
|
if c == nil {
|
|
s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fmt.Errorf("Invalid Websocket Connection")
|
|
}
|
|
|
|
s.connections.Store(recipientID, c)
|
|
s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) GetFilteredNotifications(ctx context.Context, filter domain.NotificationFilter) ([]domain.Notification, int64, error) {
|
|
return s.store.GetFilteredNotifications(ctx, filter)
|
|
}
|
|
|
|
// RecordNotification saves a notification to the database for history/audit without dispatching to the worker.
|
|
func (s *Service) RecordNotification(ctx context.Context, recipientID int64, notifType domain.NotificationType, channel domain.DeliveryChannel, level domain.NotificationLevel, headline, message string) {
|
|
notification := &domain.Notification{
|
|
ID: helpers.GenerateID(),
|
|
RecipientID: recipientID,
|
|
Type: notifType,
|
|
Level: level,
|
|
DeliveryChannel: channel,
|
|
DeliveryStatus: domain.DeliveryStatusSent,
|
|
Payload: domain.NotificationPayload{
|
|
Headline: headline,
|
|
Message: message,
|
|
},
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
if _, err := s.store.CreateNotification(ctx, notification); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.RecordNotification] Failed to record notification",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.String("channel", string(channel)),
|
|
zap.Error(err),
|
|
)
|
|
}
|
|
}
|
|
|
|
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
|
|
|
|
notification.ID = helpers.GenerateID()
|
|
notification.Timestamp = time.Now()
|
|
notification.DeliveryStatus = domain.DeliveryStatusPending
|
|
|
|
created, err := s.store.CreateNotification(ctx, notification)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
notification = created
|
|
|
|
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
|
|
s.Hub.Broadcast <- map[string]interface{}{
|
|
"type": "CREATED_NOTIFICATION",
|
|
"recipient_id": notification.RecipientID,
|
|
"payload": notification,
|
|
}
|
|
}
|
|
|
|
select {
|
|
case s.notificationCh <- notification:
|
|
default:
|
|
s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification",
|
|
zap.String("id", notification.ID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
|
|
// for _, notificationID := range notificationIDs {
|
|
// _, err := s.store.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
|
|
// if err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read",
|
|
// zap.String("notificationID", notificationID),
|
|
// zap.Int64("recipientID", recipientID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return err
|
|
// }
|
|
|
|
// // count, err := s.store.CountUnreadNotifications(ctx, recipientID)
|
|
// // if err != nil {
|
|
// // s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
|
|
// // return err
|
|
// // }
|
|
|
|
// // s.Hub.Broadcast <- map[string]interface{}{
|
|
// // "type": "COUNT_NOT_OPENED_NOTIFICATION",
|
|
// // "recipient_id": recipientID,
|
|
// // "payload": map[string]int{
|
|
// // "not_opened_notifications_count": int(count),
|
|
// // },
|
|
// // }
|
|
|
|
// s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read",
|
|
// zap.String("notificationID", notificationID),
|
|
// zap.Int64("recipientID", recipientID),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// return nil
|
|
// }
|
|
|
|
func (s *Service) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) {
|
|
notifications, total, err := s.store.GetUserNotifications(ctx, recipientID, limit, offset)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.GetUserNotifications] Failed to list notifications",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Int("limit", limit),
|
|
zap.Int("offset", offset),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, 0, err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.GetUserNotifications] Successfully listed notifications",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Int("count", len(notifications)),
|
|
zap.Int64("total", total),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return notifications, total, nil
|
|
}
|
|
|
|
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
|
|
notifications, err := s.store.GetAllNotifications(ctx, limit, offset)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications",
|
|
zap.Int("limit", limit),
|
|
zap.Int("offset", offset),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications",
|
|
zap.Int("count", len(notifications)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return notifications, nil
|
|
}
|
|
|
|
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
|
|
err := s.addConnection(recipientID, c)
|
|
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DisconnectWebSocket(recipientID int64) {
|
|
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
|
|
conn.(*websocket.Conn).Close()
|
|
// s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
|
|
s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket",
|
|
zap.Int64("recipientID", recipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
}
|
|
|
|
// func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
|
|
// s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
|
|
|
|
// apiKey := s.config.AFRO_SMS_API_KEY
|
|
// senderName := s.config.AFRO_SMS_SENDER_NAME
|
|
// receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
|
|
// hostURL := s.config.ADRO_SMS_HOST_URL
|
|
// endpoint := "/api/send"
|
|
|
|
// request := afro.GetRequest(apiKey, endpoint, hostURL)
|
|
// request.Method = "GET"
|
|
// request.Sender(senderName)
|
|
// request.To(receiverPhone, message)
|
|
|
|
// response, err := afro.MakeRequestWithContext(ctx, request)
|
|
// if err != nil {
|
|
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
|
|
// return err
|
|
// }
|
|
|
|
// if response["acknowledge"] == "success" {
|
|
// s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
|
|
// } else {
|
|
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
|
|
// return errors.New("SMS delivery failed: " + response["response"].(string))
|
|
// }
|
|
|
|
// return nil
|
|
// }
|
|
|
|
// func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
|
|
// s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
|
|
// return nil
|
|
// }
|
|
|
|
func (s *Service) startWorker() {
|
|
for {
|
|
select {
|
|
case notification := <-s.notificationCh:
|
|
s.handleNotification(notification)
|
|
case <-s.stopCh:
|
|
s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped",
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
|
// return s.store.ListRecipientIDs(ctx, receiver)
|
|
// }
|
|
|
|
func (s *Service) handleNotification(notification *domain.Notification) {
|
|
ctx := context.Background()
|
|
|
|
switch notification.DeliveryChannel {
|
|
case domain.DeliveryChannelSMS:
|
|
err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
|
|
case domain.DeliveryChannelEmail:
|
|
err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
|
|
case domain.DeliveryChannelPush:
|
|
err := s.SendPushNotification(ctx, notification)
|
|
if err != nil {
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
} else {
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
}
|
|
|
|
case domain.DeliveryChannelInApp:
|
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
|
|
default:
|
|
s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel",
|
|
zap.String("channel", string(notification.DeliveryChannel)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
|
}
|
|
|
|
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
}
|
|
|
|
func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error {
|
|
// Get User Phone Number
|
|
user, err := s.userSvc.GetUserByID(ctx, recipientID)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !user.PhoneVerified {
|
|
return fmt.Errorf("cannot send notification to unverified phone number")
|
|
}
|
|
|
|
if user.PhoneNumber == "" {
|
|
return fmt.Errorf("phone Number is invalid")
|
|
}
|
|
err = s.messengerSvc.SendSMS(ctx, user.PhoneNumber, message)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS",
|
|
zap.Int64("recipient_id", recipientID),
|
|
zap.String("user_phone_number", user.PhoneNumber),
|
|
zap.String("message", message),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error {
|
|
// Get User Phone Number
|
|
user, err := s.userSvc.GetUserByID(ctx, recipientID)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !user.EmailVerified {
|
|
return fmt.Errorf("cannot send notification to unverified email")
|
|
}
|
|
|
|
if user.Email == "" {
|
|
return fmt.Errorf("email is invalid")
|
|
}
|
|
err = s.messengerSvc.SendEmail(ctx, user.Email, message, message, subject)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to send notification SMS",
|
|
zap.Int64("recipient_id", recipientID),
|
|
zap.String("user_email", user.Email),
|
|
zap.String("message", message),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) SendPushNotification(ctx context.Context, notification *domain.Notification) error {
|
|
if s.fcmClient == nil {
|
|
return fmt.Errorf("FCM client not initialized")
|
|
}
|
|
|
|
// Get user device tokens
|
|
tokens, err := s.userSvc.GetUserDeviceTokens(ctx, notification.RecipientID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get user device tokens: %w", err)
|
|
}
|
|
|
|
if len(tokens) == 0 {
|
|
s.mongoLogger.Info("[NotificationSvc.SendPushNotification] No device tokens found for user",
|
|
zap.Int64("userID", notification.RecipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil // Not an error, just no devices to send to
|
|
}
|
|
|
|
// Create FCM message
|
|
message := &messaging.Message{
|
|
Notification: &messaging.Notification{
|
|
Title: notification.Payload.Headline,
|
|
Body: notification.Payload.Message,
|
|
ImageURL: notification.Image,
|
|
},
|
|
Data: map[string]string{
|
|
"type": string(notification.Type),
|
|
"recipient_id": strconv.FormatInt(notification.RecipientID, 10),
|
|
"notification_id": notification.ID,
|
|
},
|
|
}
|
|
|
|
// Send to all user devices
|
|
for _, token := range tokens {
|
|
message.Token = token
|
|
|
|
response, err := s.fcmClient.Send(ctx, message)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendPushNotification] Failed to send FCM message",
|
|
zap.String("token", token),
|
|
zap.Int64("userID", notification.RecipientID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
|
|
// Check if token is invalid/unregistered and deactivate it
|
|
if messaging.IsUnregistered(err) || messaging.IsInvalidArgument(err) {
|
|
if deactivateErr := s.userSvc.DeactivateDevice(ctx, notification.RecipientID, token); deactivateErr != nil {
|
|
s.mongoLogger.Warn("[NotificationSvc.SendPushNotification] Failed to deactivate invalid token",
|
|
zap.String("token", token),
|
|
zap.Int64("userID", notification.RecipientID),
|
|
zap.Error(deactivateErr),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
} else {
|
|
s.mongoLogger.Info("[NotificationSvc.SendPushNotification] Deactivated invalid FCM token",
|
|
zap.String("token", token),
|
|
zap.Int64("userID", notification.RecipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
}
|
|
continue // Continue with other tokens
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.SendPushNotification] FCM message sent successfully",
|
|
zap.String("response", response),
|
|
zap.String("token", token),
|
|
zap.Int64("userID", notification.RecipientID),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// SendBulkPushNotification sends a push notification to multiple users using FCM multicast.
|
|
// It collects all device tokens for the given user IDs and sends in batches of 500 (FCM limit).
|
|
func (s *Service) MessengerSvc() *messenger.Service {
|
|
return s.messengerSvc
|
|
}
|
|
|
|
func (s *Service) SendBulkPushNotification(ctx context.Context, userIDs []int64, notification *domain.Notification) (sent int, failed int, err error) {
|
|
if s.fcmClient == nil {
|
|
return 0, 0, fmt.Errorf("FCM client not initialized")
|
|
}
|
|
|
|
// Collect all device tokens for the given users
|
|
var allTokens []string
|
|
tokenUserMap := make(map[string]int64) // token -> userID for cleanup
|
|
for _, uid := range userIDs {
|
|
tokens, err := s.userSvc.GetUserDeviceTokens(ctx, uid)
|
|
if err != nil {
|
|
s.mongoLogger.Warn("[NotificationSvc.SendBulkPushNotification] Failed to get tokens for user",
|
|
zap.Int64("userID", uid),
|
|
zap.Error(err),
|
|
)
|
|
continue
|
|
}
|
|
for _, t := range tokens {
|
|
tokenUserMap[t] = uid
|
|
}
|
|
allTokens = append(allTokens, tokens...)
|
|
}
|
|
|
|
if len(allTokens) == 0 {
|
|
s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] No device tokens found for any user",
|
|
zap.Int("userCount", len(userIDs)),
|
|
)
|
|
return 0, 0, nil
|
|
}
|
|
|
|
fcmNotification := &messaging.Notification{
|
|
Title: notification.Payload.Headline,
|
|
Body: notification.Payload.Message,
|
|
ImageURL: notification.Image,
|
|
}
|
|
data := map[string]string{
|
|
"type": string(notification.Type),
|
|
"notification_id": notification.ID,
|
|
}
|
|
|
|
// FCM multicast supports max 500 tokens per batch
|
|
const batchSize = 500
|
|
for i := 0; i < len(allTokens); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(allTokens) {
|
|
end = len(allTokens)
|
|
}
|
|
batch := allTokens[i:end]
|
|
|
|
msg := &messaging.MulticastMessage{
|
|
Notification: fcmNotification,
|
|
Data: data,
|
|
Tokens: batch,
|
|
}
|
|
|
|
resp, err := s.fcmClient.SendEachForMulticast(ctx, msg)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendBulkPushNotification] Multicast send failed",
|
|
zap.Error(err),
|
|
zap.Int("batchSize", len(batch)),
|
|
)
|
|
failed += len(batch)
|
|
continue
|
|
}
|
|
|
|
sent += resp.SuccessCount
|
|
failed += resp.FailureCount
|
|
|
|
// Deactivate invalid tokens
|
|
for j, sendResp := range resp.Responses {
|
|
if sendResp.Error != nil && (messaging.IsUnregistered(sendResp.Error) || messaging.IsInvalidArgument(sendResp.Error)) {
|
|
token := batch[j]
|
|
if uid, ok := tokenUserMap[token]; ok {
|
|
_ = s.userSvc.DeactivateDevice(ctx, uid, token)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.SendBulkPushNotification] Bulk push completed",
|
|
zap.Int("totalTokens", len(allTokens)),
|
|
zap.Int("sent", sent),
|
|
zap.Int("failed", failed),
|
|
)
|
|
|
|
return sent, failed, nil
|
|
}
|
|
|
|
// SendBulkSMS sends an SMS to multiple phone numbers using AfroMessage.
|
|
// It sends sequentially and returns the count of successful and failed deliveries.
|
|
func (s *Service) SendBulkSMS(ctx context.Context, recipients []string, message string) (sent int, failed int) {
|
|
for _, phone := range recipients {
|
|
if err := s.SendAfroMessageSMS(ctx, phone, message); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendBulkSMS] Failed to send SMS",
|
|
zap.String("phone", phone),
|
|
zap.Error(err),
|
|
)
|
|
failed++
|
|
continue
|
|
}
|
|
sent++
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.SendBulkSMS] Bulk SMS completed",
|
|
zap.Int("totalRecipients", len(recipients)),
|
|
zap.Int("sent", sent),
|
|
zap.Int("failed", failed),
|
|
)
|
|
|
|
return sent, failed
|
|
}
|
|
|
|
// SendBulkEmail sends an email to multiple recipients using the messenger service.
|
|
// It sends sequentially and returns the count of successful and failed deliveries.
|
|
func (s *Service) SendBulkEmail(ctx context.Context, recipients []string, subject, message, messageHTML string, attachments []*resend.Attachment) (sent int, failed int) {
|
|
for _, email := range recipients {
|
|
if err := s.messengerSvc.SendEmailWithAttachments(ctx, email, message, messageHTML, subject, attachments); err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SendBulkEmail] Failed to send email",
|
|
zap.String("email", email),
|
|
zap.Error(err),
|
|
)
|
|
failed++
|
|
continue
|
|
}
|
|
sent++
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.SendBulkEmail] Bulk email completed",
|
|
zap.Int("totalRecipients", len(recipients)),
|
|
zap.Int("sent", sent),
|
|
zap.Int("failed", failed),
|
|
)
|
|
|
|
return sent, failed
|
|
}
|
|
|
|
// func (s *Service) startRetryWorker() {
|
|
// ticker := time.NewTicker(1 * time.Minute)
|
|
// defer ticker.Stop()
|
|
|
|
// for {
|
|
// select {
|
|
// case <-ticker.C:
|
|
// s.retryFailedNotifications()
|
|
// case <-s.stopCh:
|
|
// s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped",
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
// func (s *Service) retryFailedNotifications() {
|
|
// ctx := context.Background()
|
|
// failedNotifications, err := s.store.ListFailedNotifications(ctx, 100)
|
|
// if err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications",
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return
|
|
// }
|
|
|
|
// for _, n := range failedNotifications {
|
|
// notification := &n
|
|
// go func(notification *domain.Notification) {
|
|
// for attempt := 0; attempt < 3; attempt++ {
|
|
// time.Sleep(time.Duration(attempt) * time.Second)
|
|
// switch notification.DeliveryChannel {
|
|
// case domain.DeliveryChannelSMS:
|
|
// if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
|
|
// notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// } else {
|
|
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// return
|
|
// }
|
|
// case domain.DeliveryChannelEmail:
|
|
// if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Message, notification.Payload.Headline); err == nil {
|
|
// notification.DeliveryStatus = domain.DeliveryStatusSent
|
|
// if _, err := s.store.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// } else {
|
|
// s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// return
|
|
// }
|
|
// }
|
|
// }
|
|
// s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification",
|
|
// zap.String("id", notification.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }(notification)
|
|
// }
|
|
// }
|
|
|
|
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
|
|
return s.store.CountUnreadNotifications(ctx, recipient_id)
|
|
}
|
|
|
|
func (s *Service) MarkNotificationAsRead(ctx context.Context, id int64) (*domain.Notification, error) {
|
|
notification, err := s.store.MarkNotificationAsRead(ctx, id)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsRead] Failed to mark notification as read",
|
|
zap.Int64("notificationID", id),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, err
|
|
}
|
|
return notification, nil
|
|
}
|
|
|
|
func (s *Service) MarkAllUserNotificationsAsRead(ctx context.Context, userID int64) error {
|
|
err := s.store.MarkAllUserNotificationsAsRead(ctx, userID)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsRead] Failed to mark all notifications as read",
|
|
zap.Int64("userID", userID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) MarkNotificationAsUnread(ctx context.Context, id int64) (*domain.Notification, error) {
|
|
notification, err := s.store.MarkNotificationAsUnread(ctx, id)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.MarkNotificationAsUnread] Failed to mark notification as unread",
|
|
zap.Int64("notificationID", id),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return nil, err
|
|
}
|
|
return notification, nil
|
|
}
|
|
|
|
func (s *Service) MarkAllUserNotificationsAsUnread(ctx context.Context, userID int64) error {
|
|
err := s.store.MarkAllUserNotificationsAsUnread(ctx, userID)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.MarkAllUserNotificationsAsUnread] Failed to mark all notifications as unread",
|
|
zap.Int64("userID", userID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) DeleteUserNotifications(ctx context.Context, userID int64) error {
|
|
err := s.store.DeleteUserNotifications(ctx, userID)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.DeleteUserNotifications] Failed to delete user notifications",
|
|
zap.Int64("userID", userID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Scheduled Notification Methods
|
|
|
|
func (s *Service) CreateScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) (*domain.ScheduledNotification, error) {
|
|
created, err := s.store.CreateScheduledNotification(ctx, sn)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.CreateScheduledNotification] Failed to create",
|
|
zap.String("channel", string(sn.Channel)),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.CreateScheduledNotification] Created",
|
|
zap.Int64("id", created.ID),
|
|
zap.String("channel", string(created.Channel)),
|
|
zap.Time("scheduledAt", created.ScheduledAt),
|
|
)
|
|
|
|
return created, nil
|
|
}
|
|
|
|
func (s *Service) GetScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) {
|
|
return s.store.GetScheduledNotification(ctx, id)
|
|
}
|
|
|
|
func (s *Service) ListScheduledNotifications(ctx context.Context, filter domain.ScheduledNotificationFilter) ([]domain.ScheduledNotification, int64, error) {
|
|
return s.store.ListScheduledNotifications(ctx, filter)
|
|
}
|
|
|
|
func (s *Service) CancelScheduledNotification(ctx context.Context, id int64) (*domain.ScheduledNotification, error) {
|
|
cancelled, err := s.store.CancelScheduledNotification(ctx, id)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.CancelScheduledNotification] Failed to cancel",
|
|
zap.Int64("id", id),
|
|
zap.Error(err),
|
|
)
|
|
return nil, err
|
|
}
|
|
|
|
s.mongoLogger.Info("[NotificationSvc.CancelScheduledNotification] Cancelled",
|
|
zap.Int64("id", cancelled.ID),
|
|
)
|
|
|
|
return cancelled, nil
|
|
}
|
|
|
|
func (s *Service) startSchedulerWorker() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.processDueScheduledNotifications()
|
|
case <-s.stopCh:
|
|
s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) processDueScheduledNotifications() {
|
|
ctx := context.Background()
|
|
|
|
claimed, err := s.store.ClaimDueScheduledNotifications(ctx, 20)
|
|
if err != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to claim due notifications",
|
|
zap.Error(err),
|
|
)
|
|
return
|
|
}
|
|
|
|
for i := range claimed {
|
|
sn := &claimed[i]
|
|
go s.dispatchScheduledNotification(ctx, sn)
|
|
}
|
|
}
|
|
|
|
func (s *Service) dispatchScheduledNotification(ctx context.Context, sn *domain.ScheduledNotification) {
|
|
var dispatchErr error
|
|
|
|
switch sn.Channel {
|
|
case domain.DeliveryChannelSMS:
|
|
dispatchErr = s.dispatchScheduledSMS(ctx, sn)
|
|
case domain.DeliveryChannelEmail:
|
|
dispatchErr = s.dispatchScheduledEmail(ctx, sn)
|
|
case domain.DeliveryChannelPush:
|
|
dispatchErr = s.dispatchScheduledPush(ctx, sn)
|
|
default:
|
|
dispatchErr = fmt.Errorf("unsupported channel: %s", sn.Channel)
|
|
}
|
|
|
|
if dispatchErr != nil {
|
|
s.mongoLogger.Error("[NotificationSvc.SchedulerWorker] Failed to dispatch",
|
|
zap.Int64("id", sn.ID),
|
|
zap.String("channel", string(sn.Channel)),
|
|
zap.Error(dispatchErr),
|
|
)
|
|
_ = s.store.MarkScheduledNotificationFailed(ctx, sn.ID, dispatchErr.Error())
|
|
return
|
|
}
|
|
|
|
_ = s.store.MarkScheduledNotificationSent(ctx, sn.ID)
|
|
s.mongoLogger.Info("[NotificationSvc.SchedulerWorker] Dispatched",
|
|
zap.Int64("id", sn.ID),
|
|
zap.String("channel", string(sn.Channel)),
|
|
)
|
|
}
|
|
|
|
func (s *Service) resolvePhoneNumbers(ctx context.Context, sn *domain.ScheduledNotification) []string {
|
|
phoneSet := make(map[string]struct{})
|
|
|
|
userIDs := sn.TargetUserIDs
|
|
if len(userIDs) == 0 && sn.TargetRole != "" {
|
|
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
|
|
if err == nil {
|
|
for _, u := range users {
|
|
userIDs = append(userIDs, u.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, uid := range userIDs {
|
|
user, err := s.userSvc.GetUserByID(ctx, uid)
|
|
if err == nil && user.PhoneNumber != "" {
|
|
phoneSet[user.PhoneNumber] = struct{}{}
|
|
}
|
|
}
|
|
|
|
if len(sn.TargetRaw) > 0 {
|
|
var raw domain.ScheduledNotificationTargetRaw
|
|
if err := json.Unmarshal(sn.TargetRaw, &raw); err == nil {
|
|
for _, p := range raw.Phones {
|
|
phoneSet[p] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
phones := make([]string, 0, len(phoneSet))
|
|
for p := range phoneSet {
|
|
phones = append(phones, p)
|
|
}
|
|
return phones
|
|
}
|
|
|
|
func (s *Service) resolveEmails(ctx context.Context, sn *domain.ScheduledNotification) []string {
|
|
emailSet := make(map[string]struct{})
|
|
|
|
userIDs := sn.TargetUserIDs
|
|
if len(userIDs) == 0 && sn.TargetRole != "" {
|
|
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
|
|
if err == nil {
|
|
for _, u := range users {
|
|
userIDs = append(userIDs, u.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, uid := range userIDs {
|
|
user, err := s.userSvc.GetUserByID(ctx, uid)
|
|
if err == nil && user.Email != "" {
|
|
emailSet[user.Email] = struct{}{}
|
|
}
|
|
}
|
|
|
|
if len(sn.TargetRaw) > 0 {
|
|
var raw domain.ScheduledNotificationTargetRaw
|
|
if err := json.Unmarshal(sn.TargetRaw, &raw); err == nil {
|
|
for _, e := range raw.Emails {
|
|
emailSet[e] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
emails := make([]string, 0, len(emailSet))
|
|
for e := range emailSet {
|
|
emails = append(emails, e)
|
|
}
|
|
return emails
|
|
}
|
|
|
|
func (s *Service) resolveUserIDs(ctx context.Context, sn *domain.ScheduledNotification) []int64 {
|
|
if len(sn.TargetUserIDs) > 0 {
|
|
return sn.TargetUserIDs
|
|
}
|
|
if sn.TargetRole != "" {
|
|
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{Role: sn.TargetRole})
|
|
if err == nil {
|
|
ids := make([]int64, 0, len(users))
|
|
for _, u := range users {
|
|
ids = append(ids, u.ID)
|
|
}
|
|
return ids
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) dispatchScheduledSMS(ctx context.Context, sn *domain.ScheduledNotification) error {
|
|
phones := s.resolvePhoneNumbers(ctx, sn)
|
|
if len(phones) == 0 {
|
|
return fmt.Errorf("no SMS recipients resolved")
|
|
}
|
|
|
|
sent, failed := s.SendBulkSMS(ctx, phones, sn.Message)
|
|
if sent == 0 && failed > 0 {
|
|
return fmt.Errorf("all %d SMS deliveries failed", failed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) dispatchScheduledEmail(ctx context.Context, sn *domain.ScheduledNotification) error {
|
|
emails := s.resolveEmails(ctx, sn)
|
|
if len(emails) == 0 {
|
|
return fmt.Errorf("no email recipients resolved")
|
|
}
|
|
|
|
sent, failed := s.SendBulkEmail(ctx, emails, sn.Title, sn.Message, sn.HTML, nil)
|
|
if sent == 0 && failed > 0 {
|
|
return fmt.Errorf("all %d email deliveries failed", failed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) dispatchScheduledPush(ctx context.Context, sn *domain.ScheduledNotification) error {
|
|
userIDs := s.resolveUserIDs(ctx, sn)
|
|
if len(userIDs) == 0 {
|
|
return fmt.Errorf("no push recipients resolved")
|
|
}
|
|
|
|
notification := &domain.Notification{
|
|
Type: domain.NOTIFICATION_TYPE_SYSTEM_ALERT,
|
|
DeliveryChannel: domain.DeliveryChannelPush,
|
|
Payload: domain.NotificationPayload{
|
|
Headline: sn.Title,
|
|
Message: sn.Message,
|
|
},
|
|
}
|
|
|
|
sent, failed, err := s.SendBulkPushNotification(ctx, userIDs, notification)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if sent == 0 && failed > 0 {
|
|
return fmt.Errorf("all %d push deliveries failed", failed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// func (s *Service) DeleteOldNotifications(ctx context.Context) error {
|
|
// return s.store.DeleteOldNotifications(ctx)
|
|
// }
|
|
|
|
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
|
|
// return s.store.Get(ctx, filter)
|
|
// }
|
|
|
|
// func (s *Service) RunRedisSubscriber(ctx context.Context) {
|
|
// pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
|
|
// defer pubsub.Close()
|
|
|
|
// ch := pubsub.Channel()
|
|
// for msg := range ch {
|
|
// var parsed map[string]interface{}
|
|
// if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
|
|
// // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
|
|
// s.mongoLogger.Error("invalid Redis message format",
|
|
// zap.String("payload", msg.Payload),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// continue
|
|
// }
|
|
|
|
// eventType, _ := parsed["type"].(string)
|
|
// payload := parsed["payload"]
|
|
// recipientID, hasRecipient := parsed["recipient_id"]
|
|
// recipientType, _ := parsed["recipient_type"].(string)
|
|
|
|
// message := map[string]interface{}{
|
|
// "type": eventType,
|
|
// "payload": payload,
|
|
// }
|
|
|
|
// if hasRecipient {
|
|
// message["recipient_id"] = recipientID
|
|
// message["recipient_type"] = recipientType
|
|
// }
|
|
|
|
// s.Hub.Broadcast <- message
|
|
// }
|
|
// }
|
|
|
|
// func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
|
|
// const key = "live_metrics"
|
|
|
|
// companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
|
|
// for _, c := range companies {
|
|
// companyBalances = append(companyBalances, domain.CompanyWalletBalance{
|
|
// CompanyID: c.ID,
|
|
// CompanyName: c.Name,
|
|
// Balance: float64(c.WalletBalance.Float32()),
|
|
// })
|
|
// }
|
|
|
|
// branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
|
|
// for _, b := range branches {
|
|
// branchBalances = append(branchBalances, domain.BranchWalletBalance{
|
|
// BranchID: b.ID,
|
|
// BranchName: b.Name,
|
|
// CompanyID: b.CompanyID,
|
|
// Balance: float64(b.Balance.Float32()),
|
|
// })
|
|
// }
|
|
|
|
// payload := domain.LiveWalletMetrics{
|
|
// Timestamp: time.Now(),
|
|
// CompanyBalances: companyBalances,
|
|
// BranchBalances: branchBalances,
|
|
// }
|
|
|
|
// updatedData, err := json.Marshal(payload)
|
|
// if err != nil {
|
|
// return err
|
|
// }
|
|
|
|
// if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
|
|
// return err
|
|
// }
|
|
|
|
// if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
|
|
// return err
|
|
// }
|
|
// return nil
|
|
// }
|
|
|
|
// func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
|
|
// const key = "live_metrics"
|
|
// var metric domain.LiveMetric
|
|
|
|
// val, err := s.redisClient.Get(ctx, key).Result()
|
|
// if err == redis.Nil {
|
|
// // Key does not exist yet, return zero-valued struct
|
|
// return domain.LiveMetric{}, nil
|
|
// } else if err != nil {
|
|
// return domain.LiveMetric{}, err
|
|
// }
|
|
|
|
// if err := json.Unmarshal([]byte(val), &metric); err != nil {
|
|
// return domain.LiveMetric{}, err
|
|
// }
|
|
|
|
// return metric, nil
|
|
// }
|
|
// func (s *Service) StartKafkaConsumer(ctx context.Context) {
|
|
// go func() {
|
|
// for {
|
|
// m, err := s.reader.ReadMessage(ctx)
|
|
// if err != nil {
|
|
// if err == context.Canceled {
|
|
// s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context")
|
|
// return
|
|
// }
|
|
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// time.Sleep(1 * time.Second) // backoff
|
|
// continue
|
|
// }
|
|
|
|
// var walletEvent event.WalletEvent
|
|
// if err := json.Unmarshal(m.Value, &walletEvent); err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event",
|
|
// zap.String("message", string(m.Value)),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// continue
|
|
// }
|
|
|
|
// raw, _ := json.Marshal(map[string]any{
|
|
// "balance": walletEvent.Balance.Float32(),
|
|
// "type": walletEvent.WalletType,
|
|
// "timestamp": time.Now(),
|
|
// })
|
|
|
|
// headline := ""
|
|
// message := ""
|
|
// var receiver domain.NotificationRecieverSide
|
|
// switch walletEvent.WalletType {
|
|
|
|
// case domain.StaticWalletType:
|
|
// headline = "Referral and Bonus Wallet Updated"
|
|
// message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32())
|
|
// receiver = domain.NotificationRecieverSideCustomer
|
|
// case domain.RegularWalletType:
|
|
// headline = "Wallet Updated"
|
|
// message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32())
|
|
// receiver = domain.NotificationRecieverSideCustomer
|
|
// case domain.BranchWalletType:
|
|
// headline = "Branch Wallet Updated"
|
|
// message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
|
|
// receiver = domain.NotificationRecieverSideBranchManager
|
|
// case domain.CompanyWalletType:
|
|
// headline = "Company Wallet Updated"
|
|
// message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
|
|
// receiver = domain.NotificationRecieverSideAdmin
|
|
// }
|
|
// // Handle the wallet event: send notification
|
|
// notification := &domain.Notification{
|
|
// RecipientID: walletEvent.UserID,
|
|
// DeliveryChannel: domain.DeliveryChannelInApp,
|
|
// Reciever: receiver,
|
|
// Type: domain.NotificationTypeWalletUpdated,
|
|
// DeliveryStatus: domain.DeliveryStatusPending,
|
|
// IsRead: false,
|
|
// Level: domain.NotificationLevelInfo,
|
|
// Priority: 2,
|
|
// Metadata: raw,
|
|
// Payload: domain.NotificationPayload{
|
|
// Headline: headline,
|
|
// Message: message,
|
|
// },
|
|
// }
|
|
|
|
// if err := s.SendNotification(ctx, notification); err != nil {
|
|
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
// }
|
|
// }()
|
|
// }
|
|
|
|
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
|
|
// var (
|
|
// payload domain.LiveWalletMetrics
|
|
// event map[string]interface{}
|
|
// key = "live_metrics"
|
|
// )
|
|
|
|
// // Try company first
|
|
// company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
|
|
// if companyErr == nil {
|
|
// payload = domain.LiveWalletMetrics{
|
|
// Timestamp: time.Now(),
|
|
// CompanyBalances: []domain.CompanyWalletBalance{{
|
|
// CompanyID: company.ID,
|
|
// CompanyName: company.Name,
|
|
// Balance: float64(wallet.Balance),
|
|
// }},
|
|
// BranchBalances: []domain.BranchWalletBalance{},
|
|
// }
|
|
|
|
// event = map[string]interface{}{
|
|
// "type": "LIVE_WALLET_METRICS_UPDATE",
|
|
// "recipient_id": company.ID,
|
|
// "recipient_type": "company",
|
|
// "payload": payload,
|
|
// }
|
|
// } else {
|
|
// // Try branch next
|
|
// branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
|
|
// if branchErr == nil {
|
|
// payload = domain.LiveWalletMetrics{
|
|
// Timestamp: time.Now(),
|
|
// CompanyBalances: []domain.CompanyWalletBalance{},
|
|
// BranchBalances: []domain.BranchWalletBalance{{
|
|
// BranchID: branch.ID,
|
|
// BranchName: branch.Name,
|
|
// CompanyID: branch.CompanyID,
|
|
// Balance: float64(wallet.Balance),
|
|
// }},
|
|
// }
|
|
|
|
// event = map[string]interface{}{
|
|
// "type": "LIVE_WALLET_METRICS_UPDATE",
|
|
// "recipient_id": branch.ID,
|
|
// "recipient_type": "branch",
|
|
// "payload": payload,
|
|
// }
|
|
// } else {
|
|
// // Neither company nor branch matched this wallet
|
|
// // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
|
|
// s.mongoLogger.Warn("wallet not linked to any company or branch",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return
|
|
// }
|
|
// }
|
|
|
|
// // Save latest metric to Redis
|
|
// if jsonBytes, err := json.Marshal(payload); err == nil {
|
|
// s.redisClient.Set(ctx, key, jsonBytes, 0)
|
|
// } else {
|
|
// // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
|
|
// s.mongoLogger.Error("failed to marshal wallet metrics payload",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// // Publish via Redis
|
|
// if jsonEvent, err := json.Marshal(event); err == nil {
|
|
// s.redisClient.Publish(ctx, key, jsonEvent)
|
|
// } else {
|
|
// // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
|
|
// s.mongoLogger.Error("failed to marshal event payload",
|
|
// zap.Int64("walletID", wallet.ID),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// }
|
|
|
|
// // Broadcast over WebSocket
|
|
// s.Hub.Broadcast <- event
|
|
// }
|