Yimaru-BackEnd/internal/services/wallet/monitor/service.go

217 lines
5.8 KiB
Go

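// internal/services/wallet/monitor/service.go

// Package monitor periodically checks company wallet balances against
// low-balance thresholds and notifies branch managers when one is reached.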
package monitor
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/branch"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
)
// Service periodically compares each company's wallet balance with a set of
// low-balance thresholds and sends notifications when a balance is at or
// below one of them. Create it with NewService; it contains a mutex and a
// WaitGroup and must not be copied.
type Service struct {
	// Collaborating services.
	walletSvc       wallet.Service
	branchSvc       branch.Service
	notificationSvc *notificationservice.Service
	logger          *slog.Logger

	// Monitoring configuration.
	thresholds    []float64     // fractions of the baseline balance (0.25 means 25%)
	checkInterval time.Duration // time between two wallet scans

	// Background goroutine lifecycle.
	stopCh chan struct{} // closed by Stop to end the monitoring loop
	wg     sync.WaitGroup

	// mu guards initialDeposits.
	mu              sync.RWMutex
	initialDeposits map[int64]domain.Currency // company ID -> first balance observed for that company
}
// NewService builds a wallet monitor that scans company wallets every
// checkInterval. The returned Service is idle until Start is called.
//
// The alert thresholds are fixed at 75%, 50%, 25%, 10% and 5% of the baseline
// balance.
func NewService(
	walletSvc wallet.Service,
	branchSvc branch.Service,
	notificationSvc *notificationservice.Service,
	logger *slog.Logger,
	checkInterval time.Duration,
) *Service {
	svc := new(Service)

	svc.walletSvc = walletSvc
	svc.branchSvc = branchSvc
	svc.notificationSvc = notificationSvc
	svc.logger = logger

	svc.thresholds = []float64{0.75, 0.50, 0.25, 0.10, 0.05}
	svc.checkInterval = checkInterval

	svc.stopCh = make(chan struct{})
	svc.initialDeposits = make(map[int64]domain.Currency)

	return svc
}
// Start launches the background monitoring loop in its own goroutine and
// returns immediately. Use Stop to end the loop and wait for it to finish.
func (s *Service) Start() {
	// Register the goroutine before it starts so Stop's Wait cannot miss it.
	s.wg.Add(1)
	go func() {
		s.monitorWallets()
	}()
}
// Stop signals the monitoring loop to exit and blocks until it has returned.
//
// Stop is idempotent: calling it more than once (for example from both a
// deferred cleanup and a signal handler) is safe. Closing an already-closed
// channel panics, so the close is performed at most once.
func (s *Service) Stop() {
	// The mutex makes the "is it closed yet?" check and the close atomic with
	// respect to other Stop calls. It is released before waiting so the
	// monitoring goroutine can still take it while finishing its current scan.
	s.mu.Lock()
	select {
	case <-s.stopCh:
		// Already closed by an earlier Stop call.
	default:
		close(s.stopCh)
	}
	s.mu.Unlock()

	s.wg.Wait()
}
// monitorWallets is the background loop started by Start. It runs
// checkWalletThresholds once per checkInterval tick until stopCh is closed,
// and marks the WaitGroup done on exit so Stop can return.
func (s *Service) monitorWallets() {
	defer s.wg.Done()

	// time.NewTicker panics on a non-positive duration, and a panic in this
	// goroutine would take down the whole process. Report the
	// misconfiguration and exit the loop instead.
	if s.checkInterval <= 0 {
		s.logger.Error("wallet monitor not started: check interval must be positive",
			"check_interval", s.checkInterval)
		return
	}

	ticker := time.NewTicker(s.checkInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			s.checkWalletThresholds()
		case <-s.stopCh:
			return
		}
	}
}
// checkWalletThresholds performs one monitoring pass over all companies.
//
// For each company it loads the wallet and compares the current balance with
// the baseline ("initial deposit"), which is the first positive balance this
// Service observed for that company. For every configured threshold the
// balance is at or below, and for which hasNotified reports false, it sends
// notifications and then calls markAsNotified.
//
// Failures are logged and never abort the whole pass: a company whose wallet
// cannot be loaded is skipped.
func (s *Service) checkWalletThresholds() {
	ctx := context.Background()

	companies, err := s.branchSvc.GetAllCompaniesBranch(ctx)
	if err != nil {
		s.logger.Error("failed to get companies", "error", err)
		return
	}

	for _, company := range companies {
		// Named companyWallet so the imported wallet package is not shadowed.
		companyWallet, err := s.walletSvc.GetWalletByID(ctx, company.WalletID)
		if err != nil {
			s.logger.Error("failed to get company wallet", "company_id", company.ID, "error", err)
			continue
		}
		currentBalance := companyWallet.Balance

		// A baseline is usable only when it is positive. A missing, zero or
		// negative baseline is (re)recorded from the current balance, so a
		// company first seen with an empty wallet starts being monitored once
		// it is funded instead of being skipped forever. This also rules out
		// division by zero below.
		s.mu.Lock()
		initialDeposit, tracked := s.initialDeposits[company.ID]
		usable := tracked && initialDeposit > 0
		if !usable {
			s.initialDeposits[company.ID] = currentBalance
		}
		s.mu.Unlock()
		if !usable {
			continue
		}

		currentPercentage := float64(currentBalance) / float64(initialDeposit)
		for _, threshold := range s.thresholds {
			if currentPercentage > threshold {
				continue
			}
			// Skip thresholds that were already reported for this company.
			key := notificationKey(company.ID, threshold)
			if s.hasNotified(key) {
				continue
			}
			s.sendThresholdNotifications(ctx, company.ID, threshold, currentBalance, initialDeposit)
			s.markAsNotified(key)
		}
	}
}
// sendThresholdNotifications sends an in-app warning to every recipient
// returned by getNotificationRecipients, telling them that the company wallet
// has reached the given threshold (a fraction, e.g. 0.25) of its initial
// deposit.
//
// If the recipients cannot be resolved the error is logged and nothing is
// sent. A failure to deliver to one recipient is logged and does not stop
// delivery to the others. A summary line is logged after the loop.
func (s *Service) sendThresholdNotifications(
	ctx context.Context,
	companyID int64,
	threshold float64,
	currentBalance domain.Currency,
	initialDeposit domain.Currency,
) {
	recipients, err := s.getNotificationRecipients(ctx, companyID)
	if err != nil {
		s.logger.Error("failed to get notification recipients", "company_id", companyID, "error", err)
		return
	}

	// The text is the same for every recipient, so build it once.
	thresholdPercent := int(threshold * 100)
	payload := domain.NotificationPayload{
		Headline: "Wallet Threshold Alert",
		Message:  buildNotificationMessage(thresholdPercent, currentBalance, initialDeposit),
	}

	for _, recipientID := range recipients {
		// Each recipient gets its own Notification value.
		alert := &domain.Notification{
			RecipientID:     recipientID,
			Type:            domain.NOTIFICATION_TYPE_WALLET,
			Level:           domain.NotificationLevelWarning,
			Reciever:        domain.NotificationRecieverSideAdmin,
			DeliveryChannel: domain.DeliveryChannelInApp,
			Payload:         payload,
			Priority:        2, // Medium priority
		}

		sendErr := s.notificationSvc.SendNotification(ctx, alert)
		if sendErr == nil {
			continue
		}
		s.logger.Error("failed to send threshold notification",
			"recipient_id", recipientID,
			"company_id", companyID,
			"threshold", thresholdPercent,
			"error", sendErr)
	}

	s.logger.Info("sent wallet threshold notifications",
		"company_id", companyID,
		"threshold", thresholdPercent,
		"recipient_count", len(recipients))
}
// getNotificationRecipients returns the user IDs that should be alerted about
// the given company's wallet. It returns the error from the branch lookup
// unchanged.
//
// Currently only branch managers are included: one ID per branch of the
// company that has a non-zero BranchManagerID. Company admins are not added
// yet; that needs a lookup of admin users for the company.
func (s *Service) getNotificationRecipients(ctx context.Context, companyID int64) ([]int64, error) {
	companyBranches, err := s.branchSvc.GetBranchesByCompany(ctx, companyID)
	if err != nil {
		return nil, err
	}

	var managerIDs []int64
	for _, b := range companyBranches {
		// A zero ID means the branch has no manager assigned.
		if b.BranchManagerID == 0 {
			continue
		}
		managerIDs = append(managerIDs, b.BranchManagerID)
	}

	return managerIDs, nil
}
// hasNotified reports whether a notification has already been sent for the
// given key (see notificationKey).
//
// This is a placeholder: no tracking is implemented yet, so it always returns
// false and callers will treat every threshold as not yet notified.
func (s *Service) hasNotified(key string) bool {
	// TODO: implement notification tracking, e.g. backed by a cache or the
	// database, so that each threshold is reported only once.
	return false
}
// markAsNotified is meant to record that a notification has been sent for the
// given key (see notificationKey).
//
// This is a placeholder: the body is empty, so nothing is recorded.
func (s *Service) markAsNotified(key string) {
	// TODO: implement notification tracking here and mark this threshold as
	// notified.
}
// notificationKey builds the tracking key for a company/threshold pair in the
// form "<companyID>_<threshold>", with the threshold formatted to two decimal
// places (for example "42_0.75").
func notificationKey(companyID int64, threshold float64) string {
	companyPart := fmt.Sprintf("%d", companyID)
	thresholdPart := fmt.Sprintf("%.2f", threshold)
	return companyPart + "_" + thresholdPart
}
// buildNotificationMessage renders the text of a wallet threshold alert from
// the threshold (as a whole percentage) and the current and initial balances.
// Both balances are divided by 100 and printed with two decimals.
func buildNotificationMessage(thresholdPercent int, currentBalance, initialDeposit domain.Currency) string {
	const template = "Company wallet balance has reached %d%% of initial deposit. Current balance: %.2f, Initial deposit: %.2f"

	// Assumes domain.Currency holds minor units (cents) — TODO confirm
	// against the domain package.
	currentAmount := float64(currentBalance) / 100
	initialAmount := float64(initialDeposit) / 100

	return fmt.Sprintf(template, thresholdPercent, currentAmount, initialAmount)
}