// internal/services/walletmonitor/service.go package monitor import ( "context" "fmt" "log/slog" "sync" "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/branch" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet" ) type Service struct { walletSvc wallet.Service branchSvc branch.Service notificationSvc *notificationservice.Service logger *slog.Logger thresholds []float64 checkInterval time.Duration stopCh chan struct{} wg sync.WaitGroup initialDeposits map[int64]domain.Currency // companyID -> initial deposit mu sync.RWMutex } func NewService( walletSvc wallet.Service, branchSvc branch.Service, notificationSvc *notificationservice.Service, // Change to pointer logger *slog.Logger, checkInterval time.Duration, ) *Service { return &Service{ walletSvc: walletSvc, branchSvc: branchSvc, notificationSvc: notificationSvc, // Now storing the pointer logger: logger, thresholds: []float64{0.75, 0.50, 0.25, 0.10, 0.05}, checkInterval: checkInterval, stopCh: make(chan struct{}), initialDeposits: make(map[int64]domain.Currency), } } func (s *Service) Start() { s.wg.Add(1) go s.monitorWallets() } func (s *Service) Stop() { close(s.stopCh) s.wg.Wait() } func (s *Service) monitorWallets() { defer s.wg.Done() ticker := time.NewTicker(s.checkInterval) defer ticker.Stop() for { select { case <-ticker.C: s.checkWalletThresholds() case <-s.stopCh: return } } } func (s *Service) checkWalletThresholds() { ctx := context.Background() // Get all company wallets companies, err := s.branchSvc.GetAllCompaniesBranch(ctx) if err != nil { s.logger.Error("failed to get companies", "error", err) return } for _, company := range companies { wallet, err := s.walletSvc.GetWalletByID(ctx, company.WalletID) if err != nil { s.logger.Error("failed to get company wallet", "company_id", company.ID, "error", err) continue } // Initialize initial deposit if not set s.mu.Lock() initialDeposit, exists := s.initialDeposits[company.ID] if !exists || wallet.Balance > initialDeposit { s.initialDeposits[company.ID] = wallet.Balance initialDeposit = wallet.Balance // update local variable } s.mu.Unlock() if initialDeposit == 0 { continue // avoid division by zero } currentBalance := wallet.Balance currentPercentage := float64(currentBalance) / float64(initialDeposit) for _, threshold := range s.thresholds { if currentPercentage <= threshold { // Check if we've already notified for this threshold key := notificationKey(company.ID, threshold) if s.hasNotified(key) { continue } // Send notifications s.sendThresholdNotifications(ctx, company.ID, threshold, currentBalance, initialDeposit) s.markAsNotified(key) } } } } func (s *Service) sendThresholdNotifications( ctx context.Context, companyID int64, threshold float64, currentBalance domain.Currency, initialDeposit domain.Currency, ) { // Get all recipients (branch managers, admins, super admins for this company) recipients, err := s.getNotificationRecipients(ctx, companyID) if err != nil { s.logger.Error("failed to get notification recipients", "company_id", companyID, "error", err) return } thresholdPercent := int(threshold * 100) message := buildNotificationMessage(thresholdPercent, currentBalance, initialDeposit) for _, recipientID := range recipients { notification := &domain.Notification{ RecipientID: recipientID, Type: domain.NOTIFICATION_TYPE_WALLET, Level: domain.NotificationLevelWarning, Reciever: domain.NotificationRecieverSideAdmin, DeliveryChannel: domain.DeliveryChannelInApp, Payload: domain.NotificationPayload{ Headline: "Wallet Threshold Alert", Message: message, }, Priority: 2, // Medium priority } if err := s.notificationSvc.SendNotification(ctx, notification); err != nil { s.logger.Error("failed to send threshold notification", "recipient_id", recipientID, "company_id", companyID, "threshold", thresholdPercent, "error", err) } } s.logger.Info("sent wallet threshold notifications", "company_id", companyID, "threshold", thresholdPercent, "recipient_count", len(recipients)) } func (s *Service) getNotificationRecipients(ctx context.Context, companyID int64) ([]int64, error) { // Get branch managers for this company branches, err := s.branchSvc.GetBranchesByCompany(ctx, companyID) if err != nil { return nil, err } var recipientIDs []int64 // Add branch managers for _, branch := range branches { if branch.BranchManagerID != 0 { recipientIDs = append(recipientIDs, branch.BranchManagerID) } } // Add company admins (implementation depends on your user service) // This would typically query users with admin role for this company return recipientIDs, nil } func (s *Service) hasNotified(key string) bool { fmt.Println(key) // Implement your notification tracking logic here // Could use a cache or database to track which thresholds have been notified return false } func (s *Service) markAsNotified(key string) { // Implement your notification tracking logic here // Mark that this threshold has been notified } func notificationKey(companyID int64, threshold float64) string { return fmt.Sprintf("%d_%.2f", companyID, threshold) } func buildNotificationMessage(thresholdPercent int, currentBalance, initialDeposit domain.Currency) string { return fmt.Sprintf( "Company wallet balance has reached %d%% of initial deposit. Current balance: %.2f, Initial deposit: %.2f", thresholdPercent, float64(currentBalance), // Assuming currency is in cents float64(initialDeposit), ) }