fix: notifications for win bet and wallet balance low

This commit is contained in:
Samuel Tariku 2025-07-21 04:31:38 +03:00
parent 65bd5ab3f5
commit 6e6ed2c9a9
37 changed files with 1414 additions and 724 deletions

View File

@ -7,5 +7,12 @@
],
"cSpell.enabledFileTypes": {
"sql": false
}
},
"workbench.editor.customLabels.enabled": true,
"workbench.editor.customLabels.patterns": {
"**/internal/services/**/service.go": "${dirname}.service",
"**/internal/services/**/*.go": "${filename}.${dirname}.service",
"**/internal/domain/**/*.go": "${filename}.${dirname}",
"**/internal/repository/**/*.go": "${filename}.repo",
},
}

View File

@ -39,7 +39,8 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/institutions"
issuereporting "github.com/SamuelTariku/FortuneBet-Backend/internal/services/issue_reporting"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/league"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/recommendation"
referralservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/referal"
@ -101,13 +102,15 @@ func main() {
// Initialize services
settingSvc := settings.NewService(store)
messengerSvc := messenger.NewService(settingSvc, cfg)
authSvc := authentication.NewService(store, store, cfg.RefreshExpiry)
userSvc := user.NewService(store, store, cfg)
userSvc := user.NewService(store, store, messengerSvc, cfg)
eventSvc := event.New(cfg.Bet365Token, store)
oddsSvc := odds.New(store, cfg, logger)
notificationRepo := repository.NewNotificationRepository(store)
virtuaGamesRepo := repository.NewVirtualGameRepository(store)
notificationSvc := notificationservice.New(notificationRepo, logger, cfg)
notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc)
var notificatioStore notificationservice.NotificationStore
// var userStore user.UserStore
@ -118,6 +121,8 @@ func main() {
notificatioStore,
// userStore,
notificationSvc,
userSvc,
domain.MongoDBLogger,
logger,
)
@ -125,7 +130,7 @@ func main() {
companySvc := company.NewService(store)
leagueSvc := league.New(store)
ticketSvc := ticket.NewService(store, eventSvc, *oddsSvc, domain.MongoDBLogger, *settingSvc, notificationSvc)
betSvc := bet.NewService(store, eventSvc, *oddsSvc, *walletSvc, *branchSvc, *companySvc, *settingSvc, notificationSvc, logger, domain.MongoDBLogger)
betSvc := bet.NewService(store, eventSvc, *oddsSvc, *walletSvc, *branchSvc, *companySvc, *settingSvc, *userSvc, notificationSvc, logger, domain.MongoDBLogger)
resultSvc := result.NewService(store, cfg, logger, *betSvc, *oddsSvc, eventSvc, leagueSvc, notificationSvc)
bonusSvc := bonus.NewService(store)
referalRepo := repository.NewReferralRepository(store)

View File

@ -1,6 +1,7 @@
-- Settings Initial Data
INSERT INTO settings (key, value)
VALUES ('max_number_of_outcomes', '30'),
VALUES ('sms_provider', '30'),
('max_number_of_outcomes', '30'),
('bet_amount_limit', '100000'),
('daily_ticket_limit', '50'),
('total_winnings_limit', '1000000'),

View File

@ -10,26 +10,4 @@ INSERT INTO settings (key, value, updated_at)
VALUES ($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value
RETURNING *;
-- name: SetInitialData :exec
INSERT INTO settings (key, value)
VALUES ('max_number_of_outcomes', '30') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value;
INSERT INTO settings (key, value)
VALUES ('bet_amount_limit', '100000') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value;
INSERT INTO settings (key, value)
VALUES ('daily_ticket_limit', '50') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value;
INSERT INTO settings (key, value)
VALUES ('total_winnings_limit', '1000000') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value;
INSERT INTO settings (key, value)
VALUES ('amount_for_bet_referral', '1000000') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value;
RETURNING *;

View File

@ -192,4 +192,9 @@ SET password = $1,
WHERE (
email = $2
OR phone_number = $3
);
);
-- name: GetAdminByCompanyID :one
SELECT users.*
FROM companies
JOIN users ON companies.admin_id = users.id
where companies.id = $1;

View File

@ -81,15 +81,3 @@ func (q *Queries) SaveSetting(ctx context.Context, arg SaveSettingParams) (Setti
)
return i, err
}
const SetInitialData = `-- name: SetInitialData :exec
INSERT INTO settings (key, value)
VALUES ('max_number_of_outcomes', '30') ON CONFLICT (key) DO
UPDATE
SET value = EXCLUDED.value
`
func (q *Queries) SetInitialData(ctx context.Context) error {
_, err := q.db.Exec(ctx, SetInitialData)
return err
}

View File

@ -159,6 +159,37 @@ func (q *Queries) DeleteUser(ctx context.Context, id int64) error {
return err
}
const GetAdminByCompanyID = `-- name: GetAdminByCompanyID :one
SELECT users.id, users.first_name, users.last_name, users.email, users.phone_number, users.role, users.password, users.email_verified, users.phone_verified, users.created_at, users.updated_at, users.company_id, users.suspended_at, users.suspended, users.referral_code, users.referred_by
FROM companies
JOIN users ON companies.admin_id = users.id
where companies.id = $1
`
func (q *Queries) GetAdminByCompanyID(ctx context.Context, id int64) (User, error) {
row := q.db.QueryRow(ctx, GetAdminByCompanyID, id)
var i User
err := row.Scan(
&i.ID,
&i.FirstName,
&i.LastName,
&i.Email,
&i.PhoneNumber,
&i.Role,
&i.Password,
&i.EmailVerified,
&i.PhoneVerified,
&i.CreatedAt,
&i.UpdatedAt,
&i.CompanyID,
&i.SuspendedAt,
&i.Suspended,
&i.ReferralCode,
&i.ReferredBy,
)
return i, err
}
const GetAllUsers = `-- name: GetAllUsers :many
SELECT id,
first_name,

View File

@ -26,12 +26,7 @@ const (
OtpMediumSms OtpMedium = "sms"
)
type OtpProvider string
const (
TwilioSms OtpProvider = "twilio"
AfroMessage OtpProvider = "afro_message"
)
type Otp struct {
ID int64

View File

@ -17,15 +17,17 @@ type SettingRes struct {
}
type SettingList struct {
MaxNumberOfOutcomes int64 `json:"max_number_of_outcomes"`
BetAmountLimit Currency `json:"bet_amount_limit"`
DailyTicketPerIP int64 `json:"daily_ticket_limit"`
TotalWinningLimit Currency `json:"total_winning_limit"`
AmountForBetReferral Currency `json:"amount_for_bet_referral"`
CashbackAmountCap Currency `json:"cashback_amount_cap"`
SMSProvider SMSProvider `json:"sms_provider"`
MaxNumberOfOutcomes int64 `json:"max_number_of_outcomes"`
BetAmountLimit Currency `json:"bet_amount_limit"`
DailyTicketPerIP int64 `json:"daily_ticket_limit"`
TotalWinningLimit Currency `json:"total_winning_limit"`
AmountForBetReferral Currency `json:"amount_for_bet_referral"`
CashbackAmountCap Currency `json:"cashback_amount_cap"`
}
type DBSettingList struct {
SMSProvider ValidString
MaxNumberOfOutcomes ValidInt64
BetAmountLimit ValidInt64
DailyTicketPerIP ValidInt64
@ -45,8 +47,27 @@ func ConvertInt64SettingsMap(dbSettingList *DBSettingList) map[string]*ValidInt6
}
}
func ConvertStringSettingsMap(dbSettingList *DBSettingList) map[string]*ValidString {
return map[string]*ValidString{
"sms_provider": &dbSettingList.SMSProvider,
}
}
func ConvertBoolSettingsMap(dbSettingList *DBSettingList) map[string]*ValidBool {
return map[string]*ValidBool{}
}
func ConvertFloat32SettingsMap(dbSettingList *DBSettingList) map[string]*ValidFloat32 {
return map[string]*ValidFloat32{}
}
func ConvertTimeSettingsMap(dbSettingList *DBSettingList) map[string]*ValidTime {
return map[string]*ValidTime{}
}
func ConvertDBSetting(dbSettingList DBSettingList) SettingList {
return SettingList{
SMSProvider: SMSProvider(dbSettingList.SMSProvider.Value),
MaxNumberOfOutcomes: dbSettingList.MaxNumberOfOutcomes.Value,
BetAmountLimit: Currency(dbSettingList.BetAmountLimit.Value),
DailyTicketPerIP: dbSettingList.DailyTicketPerIP.Value,

18
internal/domain/sms.go Normal file
View File

@ -0,0 +1,18 @@
package domain
type SMSProvider string
const (
TwilioSms SMSProvider = "twilio"
AfroMessage SMSProvider = "afro_message"
)
// IsValid checks if the SMSProvider is a valid enum value
func (s SMSProvider) IsValid() bool {
switch s {
case TwilioSms, AfroMessage:
return true
default:
return false
}
}

View File

@ -317,39 +317,7 @@ func (s *Store) CountUnreadNotifications(ctx context.Context, userID int64) (int
return count, nil
}
func (s *Store) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
dbCompany, err := s.queries.GetCompanyByWalletID(ctx, walletID)
if err != nil {
return domain.Company{}, err
}
return domain.Company{
ID: dbCompany.ID,
Name: dbCompany.Name,
AdminID: dbCompany.AdminID,
WalletID: dbCompany.WalletID,
}, nil
}
func (s *Store) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
dbBranch, err := s.queries.GetBranchByWalletID(ctx, walletID)
if err != nil {
return domain.Branch{}, err
}
return domain.Branch{
ID: dbBranch.ID,
Name: dbBranch.Name,
Location: dbBranch.Location,
IsActive: dbBranch.IsActive,
WalletID: dbBranch.WalletID,
BranchManagerID: dbBranch.BranchManagerID,
CompanyID: dbBranch.CompanyID,
IsSelfOwned: dbBranch.IsSelfOwned,
// Creat: dbBranch.CreatedAt.Time,
// UpdatedAt: dbBranch.UpdatedAt.Time,
}, nil
}
// func (s *Store) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
// dbNotifications, err := s.queries.GetAllNotifications(ctx, dbgen.GetAllNotificationsParams{

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"time"
dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
@ -14,6 +15,10 @@ func GetDBSettingList(settings []dbgen.Setting) (domain.SettingList, error) {
var dbSettingList domain.DBSettingList
var int64SettingsMap = domain.ConvertInt64SettingsMap(&dbSettingList)
var stringSettingsMap = domain.ConvertStringSettingsMap(&dbSettingList)
var boolSettingsMap = domain.ConvertBoolSettingsMap(&dbSettingList)
var float32SettingsMap = domain.ConvertFloat32SettingsMap(&dbSettingList)
var timeSettingsMap = domain.ConvertTimeSettingsMap(&dbSettingList)
for _, setting := range settings {
is_setting_unknown := true
@ -31,6 +36,57 @@ func GetDBSettingList(settings []dbgen.Setting) (domain.SettingList, error) {
}
}
for key, dbSetting := range stringSettingsMap {
if setting.Key == key {
*dbSetting = domain.ValidString{
Value: setting.Value,
Valid: true,
}
is_setting_unknown = false
}
}
for key, dbSetting := range boolSettingsMap {
if setting.Key == key {
value, err := strconv.ParseBool(setting.Value)
if err != nil {
return domain.SettingList{}, err
}
*dbSetting = domain.ValidBool{
Value: value,
Valid: true,
}
is_setting_unknown = false
}
}
for key, dbSetting := range float32SettingsMap {
if setting.Key == key {
value, err := strconv.ParseFloat(setting.Value, 32)
if err != nil {
return domain.SettingList{}, err
}
*dbSetting = domain.ValidFloat32{
Value: float32(value),
Valid: true,
}
is_setting_unknown = false
}
}
for key, dbSetting := range timeSettingsMap {
if setting.Key == key {
value, err := time.Parse(time.RFC3339, setting.Value)
if err != nil {
return domain.SettingList{}, err
}
*dbSetting = domain.ValidTime{
Value: value,
Valid: true,
}
is_setting_unknown = false
}
}
if is_setting_unknown {
domain.MongoDBLogger.Warn("unknown setting found on database", zap.String("setting", setting.Key))
}

View File

@ -490,6 +490,27 @@ func (s *Store) CreateUserWithoutOtp(ctx context.Context, user domain.User, is_c
}, nil
}
func (s *Store) GetAdminByCompanyID(ctx context.Context, companyID int64) (domain.User, error) {
userRes, err := s.queries.GetAdminByCompanyID(ctx, companyID)
if err != nil {
return domain.User{}, err
}
return domain.User{
ID: userRes.ID,
FirstName: userRes.FirstName,
LastName: userRes.LastName,
Email: userRes.Email.String,
PhoneNumber: userRes.PhoneNumber.String,
Role: domain.Role(userRes.Role),
EmailVerified: userRes.EmailVerified,
PhoneVerified: userRes.PhoneVerified,
CreatedAt: userRes.CreatedAt.Time,
UpdatedAt: userRes.UpdatedAt.Time,
Suspended: userRes.Suspended,
}, nil
}
// GetCustomerCounts returns total and active customer counts
func (s *Store) GetCustomerCounts(ctx context.Context, filter domain.ReportFilter) (total, active, inactive int64, err error) {
query := `SELECT

View File

@ -185,6 +185,40 @@ func (s *Store) UpdateWalletActive(ctx context.Context, id int64, isActive bool)
return err
}
func (s *Store) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
dbCompany, err := s.queries.GetCompanyByWalletID(ctx, walletID)
if err != nil {
return domain.Company{}, err
}
return domain.Company{
ID: dbCompany.ID,
Name: dbCompany.Name,
AdminID: dbCompany.AdminID,
WalletID: dbCompany.WalletID,
}, nil
}
func (s *Store) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
dbBranch, err := s.queries.GetBranchByWalletID(ctx, walletID)
if err != nil {
return domain.Branch{}, err
}
return domain.Branch{
ID: dbBranch.ID,
Name: dbBranch.Name,
Location: dbBranch.Location,
IsActive: dbBranch.IsActive,
WalletID: dbBranch.WalletID,
BranchManagerID: dbBranch.BranchManagerID,
CompanyID: dbBranch.CompanyID,
IsSelfOwned: dbBranch.IsSelfOwned,
// Creat: dbBranch.CreatedAt.Time,
// UpdatedAt: dbBranch.UpdatedAt.Time,
}, nil
}
// GetBalanceSummary returns wallet balance summary
func (s *Store) GetBalanceSummary(ctx context.Context, filter domain.ReportFilter) (domain.BalanceSummary, error) {
var summary domain.BalanceSummary

View File

@ -22,9 +22,10 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/branch"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/company"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/event"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/settings"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
"go.uber.org/zap"
)
@ -54,6 +55,7 @@ type Service struct {
branchSvc branch.Service
companySvc company.Service
settingSvc settings.Service
userSvc user.Service
notificationSvc *notificationservice.Service
logger *slog.Logger
mongoLogger *zap.Logger
@ -67,6 +69,7 @@ func NewService(
branchSvc branch.Service,
companySvc company.Service,
settingSvc settings.Service,
userSvc user.Service,
notificationSvc *notificationservice.Service,
logger *slog.Logger,
mongoLogger *zap.Logger,
@ -215,6 +218,9 @@ func (s *Service) GenerateBetOutcome(ctx context.Context, eventID int64, marketI
func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID int64, role domain.Role, companyID domain.ValidInt64) (domain.CreateBetRes, error) {
settingsList, err := s.settingSvc.GetSettingList(ctx)
if err != nil {
return domain.CreateBetRes{}, err
}
if req.Amount < 1 {
return domain.CreateBetRes{}, ErrInvalidAmount
}
@ -490,7 +496,7 @@ func (s *Service) DeductBetFromBranchWallet(ctx context.Context, amount float32,
deductedAmount := amount * company.DeductedPercentage
_, err = s.walletSvc.DeductFromWallet(ctx,
walletID, domain.ToCurrency(deductedAmount), domain.BranchWalletType, domain.ValidInt64{
walletID, domain.ToCurrency(deductedAmount), domain.ValidInt64{
Value: userID,
Valid: true,
}, domain.TRANSFER_DIRECT,
@ -519,7 +525,7 @@ func (s *Service) DeductBetFromCustomerWallet(ctx context.Context, amount float3
}
if amount < wallets.RegularBalance.Float32() {
_, err = s.walletSvc.DeductFromWallet(ctx, wallets.RegularID,
domain.ToCurrency(amount), domain.CustomerWalletType, domain.ValidInt64{},
domain.ToCurrency(amount), domain.ValidInt64{},
domain.TRANSFER_DIRECT, fmt.Sprintf("Deducted %v amount from wallet by system while placing bet", amount))
if err != nil {
s.mongoLogger.Error("wallet deduction failed for customer regular wallet",
@ -538,7 +544,7 @@ func (s *Service) DeductBetFromCustomerWallet(ctx context.Context, amount float3
}
// Empty the regular balance
_, err = s.walletSvc.DeductFromWallet(ctx, wallets.RegularID,
wallets.RegularBalance, domain.CustomerWalletType, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
wallets.RegularBalance, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
fmt.Sprintf("Deducted %v amount from wallet by system while placing bet", wallets.RegularBalance.Float32()))
if err != nil {
s.mongoLogger.Error("wallet deduction failed for customer regular wallet",
@ -553,7 +559,7 @@ func (s *Service) DeductBetFromCustomerWallet(ctx context.Context, amount float3
// Empty remaining from static balance
remainingAmount := wallets.RegularBalance - domain.Currency(amount)
_, err = s.walletSvc.DeductFromWallet(ctx, wallets.StaticID,
remainingAmount, domain.CustomerWalletType, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
remainingAmount, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
fmt.Sprintf("Deducted %v amount from wallet by system while placing bet", remainingAmount.Float32()))
if err != nil {
s.mongoLogger.Error("wallet deduction failed for customer static wallet",
@ -894,10 +900,19 @@ func (s *Service) UpdateStatus(ctx context.Context, id int64, status domain.Outc
return err
}
if bet.IsShopBet ||
status == domain.OUTCOME_STATUS_ERROR ||
status == domain.OUTCOME_STATUS_PENDING ||
status == domain.OUTCOME_STATUS_LOSS {
switch {
case bet.IsShopBet:
return s.betStore.UpdateStatus(ctx, id, status)
case status == domain.OUTCOME_STATUS_ERROR, status == domain.OUTCOME_STATUS_PENDING:
s.SendErrorStatusNotification(ctx, status, bet.UserID, "")
s.SendAdminErrorAlertNotification(ctx, status, "")
s.mongoLogger.Error("Bet Status is error",
zap.Int64("bet_id", id),
zap.Error(err),
)
return s.betStore.UpdateStatus(ctx, id, status)
case status == domain.OUTCOME_STATUS_LOSS:
s.SendLosingStatusNotification(ctx, status, bet.UserID, "")
return s.betStore.UpdateStatus(ctx, id, status)
}
@ -914,10 +929,15 @@ func (s *Service) UpdateStatus(ctx context.Context, id int64, status domain.Outc
switch status {
case domain.OUTCOME_STATUS_WIN:
amount = domain.CalculateWinnings(bet.Amount, bet.TotalOdds)
s.SendWinningStatusNotification(ctx, status, bet.UserID, amount, "")
case domain.OUTCOME_STATUS_HALF:
amount = domain.CalculateWinnings(bet.Amount, bet.TotalOdds) / 2
default:
s.SendWinningStatusNotification(ctx, status, bet.UserID, amount, "")
case domain.OUTCOME_STATUS_VOID:
amount = bet.Amount
s.SendWinningStatusNotification(ctx, status, bet.UserID, amount, "")
default:
return fmt.Errorf("invalid outcome status")
}
_, err = s.walletSvc.AddToWallet(ctx, customerWallet.RegularID, amount, domain.ValidInt64{},
@ -935,6 +955,207 @@ func (s *Service) UpdateStatus(ctx context.Context, id int64, status domain.Outc
return s.betStore.UpdateStatus(ctx, id, status)
}
func (s *Service) SendWinningStatusNotification(ctx context.Context, status domain.OutcomeStatus, userID int64, winningAmount domain.Currency, extra string) error {
var headline string
var message string
switch status {
case domain.OUTCOME_STATUS_WIN:
headline = "You Bet Has Won!"
message = fmt.Sprintf(
"You have been awarded %.2f",
winningAmount.Float32(),
)
case domain.OUTCOME_STATUS_HALF:
headline = "You have a half win"
message = fmt.Sprintf(
"You have been awarded %.2f",
winningAmount.Float32(),
)
case domain.OUTCOME_STATUS_VOID:
headline = "Your bet has been refunded"
message = fmt.Sprintf(
"You have been awarded %.2f",
winningAmount.Float32(),
)
}
betNotification := &domain.Notification{
RecipientID: userID,
Type: domain.NOTIFICATION_TYPE_BET_RESULT,
Level: domain.NotificationLevelSuccess,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
Priority: 2,
Metadata: fmt.Appendf(nil, `{
"winning_amount":%.2f,
"status":%v
"more": %v
}`, winningAmount.Float32(), status, extra),
}
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
betNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
return nil
}
func (s *Service) SendLosingStatusNotification(ctx context.Context, status domain.OutcomeStatus, userID int64, extra string) error {
var headline string
var message string
switch status {
case domain.OUTCOME_STATUS_LOSS:
headline = "Your bet has lost"
message = "Better luck next time"
}
betNotification := &domain.Notification{
RecipientID: userID,
Type: domain.NOTIFICATION_TYPE_BET_RESULT,
Level: domain.NotificationLevelSuccess,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
Priority: 2,
Metadata: fmt.Appendf(nil, `{
"status":%v
"more": %v
}`, status, extra),
}
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
betNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
return nil
}
func (s *Service) SendErrorStatusNotification(ctx context.Context, status domain.OutcomeStatus, userID int64, extra string) error {
var headline string
var message string
switch status {
case domain.OUTCOME_STATUS_ERROR, domain.OUTCOME_STATUS_PENDING:
headline = "There was an error with your bet"
message = "We have encounter an error with your bet. We will fix it as soon as we can"
}
errorSeverityLevel := domain.NotificationErrorSeverityFatal
betNotification := &domain.Notification{
RecipientID: userID,
Type: domain.NOTIFICATION_TYPE_BET_RESULT,
Level: domain.NotificationLevelSuccess,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
Priority: 1,
ErrorSeverity: &errorSeverityLevel,
Metadata: fmt.Appendf(nil, `{
"status":%v
"more": %v
}`, status, extra),
}
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
betNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
return err
}
return nil
}
func (s *Service) SendAdminErrorAlertNotification(ctx context.Context, status domain.OutcomeStatus, extra string) error {
var headline string
var message string
switch status {
case domain.OUTCOME_STATUS_ERROR, domain.OUTCOME_STATUS_PENDING:
headline = "There was an error with your bet"
message = "We have encounter an error with your bet. We will fix it as soon as we can"
}
betNotification := &domain.Notification{
Type: domain.NOTIFICATION_TYPE_BET_RESULT,
Level: domain.NotificationLevelSuccess,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelEmail,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
Priority: 2,
Metadata: fmt.Appendf(nil, `{
"status":%v
"more": %v
}`, status, extra),
}
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{
Role: string(domain.RoleAdmin),
})
if err != nil {
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
for _, user := range users {
betNotification.RecipientID = user.ID
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", user.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
betNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, betNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", user.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) CheckBetOutcomeForBet(ctx context.Context, betID int64) (domain.OutcomeStatus, error) {
betOutcomes, err := s.betStore.GetBetOutcomeByBetID(ctx, betID)
if err != nil {

View File

@ -0,0 +1,26 @@
package messenger
import (
"context"
"github.com/resend/resend-go/v2"
)
func (s *Service) SendEmail(ctx context.Context, receiverEmail, message string, subject string) error {
apiKey := s.config.ResendApiKey
client := resend.NewClient(apiKey)
formattedSenderEmail := "FortuneBets <" + s.config.ResendSenderEmail + ">"
params := &resend.SendEmailRequest{
From: formattedSenderEmail,
To: []string{receiverEmail},
Subject: subject,
Text: message,
}
_, err := client.Emails.Send(params)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,21 @@
package messenger
import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/settings"
)
type Service struct {
settingSvc *settings.Service
config *config.Config
}
func NewService(
settingSvc *settings.Service,
cfg *config.Config,
) *Service {
return &Service{
settingSvc: settingSvc,
config: cfg,
}
}

View File

@ -0,0 +1,85 @@
package messenger
import (
"context"
"errors"
"fmt"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
afro "github.com/amanuelabay/afrosms-go"
"github.com/twilio/twilio-go"
twilioApi "github.com/twilio/twilio-go/rest/api/v2010"
)
var (
ErrSMSProviderNotFound = errors.New("SMS Provider Not Found")
)
func (s *Service) SendSMS(ctx context.Context, receiverPhone, message string) error {
settingsList, err := s.settingSvc.GetSettingList(ctx)
if err != nil {
return err
}
switch settingsList.SMSProvider {
case domain.AfroMessage:
return s.SendAfroMessageSMS(ctx, receiverPhone, message)
case domain.TwilioSms:
return s.SendTwilioSMS(ctx, receiverPhone, message)
default:
return ErrSMSProviderNotFound
}
}
func (s *Service) SendAfroMessageSMS(ctx context.Context, receiverPhone, message string) error {
apiKey := s.config.AFRO_SMS_API_KEY
senderName := s.config.AFRO_SMS_SENDER_NAME
hostURL := s.config.ADRO_SMS_HOST_URL
endpoint := "/api/send"
// API endpoint has been updated
// TODO: no need for package for the afro message operations (pretty simple stuff)
request := afro.GetRequest(apiKey, endpoint, hostURL)
request.BaseURL = "https://api.afromessage.com/api/send"
request.Method = "GET"
request.Sender(senderName)
request.To(receiverPhone, message)
response, err := afro.MakeRequestWithContext(ctx, request)
if err != nil {
return err
}
if response["acknowledge"] == "success" {
return nil
} else {
fmt.Println(response["response"].(map[string]interface{}))
return errors.New("SMS delivery failed")
}
}
func (s *Service) SendTwilioSMS(ctx context.Context, receiverPhone, message string) error {
accountSid := s.config.TwilioAccountSid
authToken := s.config.TwilioAuthToken
senderPhone := s.config.TwilioSenderPhoneNumber
client := twilio.NewRestClientWithParams(twilio.ClientParams{
Username: accountSid,
Password: authToken,
})
params := &twilioApi.CreateMessageParams{}
params.SetTo(receiverPhone)
params.SetFrom(senderPhone)
params.SetBody(message)
_, err := client.Api.CreateMessage(params)
if err != nil {
return fmt.Errorf("%s", "Error sending SMS message: %s"+err.Error())
}
return nil
}

View File

@ -1,475 +0,0 @@
package notificationservice
import (
"context"
"encoding/json"
"errors"
"log/slog"
"sync"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
afro "github.com/amanuelabay/afrosms-go"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
)
type Service struct {
repo repository.NotificationRepository
Hub *ws.NotificationHub
notificationStore NotificationStore
connections sync.Map
notificationCh chan *domain.Notification
stopCh chan struct{}
config *config.Config
logger *slog.Logger
redisClient *redis.Client
}
func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) *Service {
hub := ws.NewNotificationHub()
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr, // e.g., "redis:6379"
})
svc := &Service{
repo: repo,
Hub: hub,
logger: logger,
connections: sync.Map{},
notificationCh: make(chan *domain.Notification, 1000),
stopCh: make(chan struct{}),
config: cfg,
redisClient: rdb,
}
go hub.Run()
go svc.startWorker()
go svc.startRetryWorker()
go svc.RunRedisSubscriber(context.Background())
return svc
}
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) {
if c == nil {
s.logger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection", "recipientID", recipientID)
return
}
s.connections.Store(recipientID, c)
s.logger.Info("[NotificationSvc.AddConnection] Added WebSocket connection", "recipientID", recipientID)
}
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
notification.ID = helpers.GenerateID()
notification.Timestamp = time.Now()
notification.DeliveryStatus = domain.DeliveryStatusPending
created, err := s.repo.CreateNotification(ctx, notification)
if err != nil {
s.logger.Error("[NotificationSvc.SendNotification] Failed to create notification", "id", notification.ID, "error", err)
return err
}
notification = created
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
s.Hub.Broadcast <- map[string]interface{}{
"type": "CREATED_NOTIFICATION",
"recipient_id": notification.RecipientID,
"payload": notification,
}
}
select {
case s.notificationCh <- notification:
default:
s.logger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", notification.ID)
}
return nil
}
func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
for _, notificationID := range notificationIDs {
_, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
if err != nil {
s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", notificationID, "recipientID", recipientID, "error", err)
return err
}
// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
// if err != nil {
// s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
// return err
// }
// s.Hub.Broadcast <- map[string]interface{}{
// "type": "COUNT_NOT_OPENED_NOTIFICATION",
// "recipient_id": recipientID,
// "payload": map[string]int{
// "not_opened_notifications_count": int(count),
// },
// }
s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", notificationID, "recipientID", recipientID)
}
return nil
}
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
if err != nil {
s.logger.Error("[NotificationSvc.ListNotifications] Failed to list notifications", "recipientID", recipientID, "limit", limit, "offset", offset, "error", err)
return nil, err
}
s.logger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications", "recipientID", recipientID, "count", len(notifications))
return notifications, nil
}
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
notifications, err := s.repo.GetAllNotifications(ctx, limit, offset)
if err != nil {
s.logger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications")
return nil, err
}
s.logger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications", "count", len(notifications))
return notifications, nil
}
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
s.addConnection(recipientID, c)
s.logger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established", "recipientID", recipientID)
return nil
}
func (s *Service) DisconnectWebSocket(recipientID int64) {
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
conn.(*websocket.Conn).Close()
s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
}
}
func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
apiKey := s.config.AFRO_SMS_API_KEY
senderName := s.config.AFRO_SMS_SENDER_NAME
receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
hostURL := s.config.ADRO_SMS_HOST_URL
endpoint := "/api/send"
request := afro.GetRequest(apiKey, endpoint, hostURL)
request.Method = "GET"
request.Sender(senderName)
request.To(receiverPhone, message)
response, err := afro.MakeRequestWithContext(ctx, request)
if err != nil {
s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
return err
}
if response["acknowledge"] == "success" {
s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
} else {
s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
return errors.New("SMS delivery failed: " + response["response"].(string))
}
return nil
}
func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
return nil
}
func (s *Service) startWorker() {
for {
select {
case notification := <-s.notificationCh:
s.handleNotification(notification)
case <-s.stopCh:
s.logger.Info("[NotificationSvc.StartWorker] Worker stopped")
return
}
}
}
func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
return s.repo.ListRecipientIDs(ctx, receiver)
}
func (s *Service) handleNotification(notification *domain.Notification) {
ctx := context.Background()
switch notification.DeliveryChannel {
case domain.DeliveryChannelSMS:
err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message)
if err != nil {
notification.DeliveryStatus = domain.DeliveryStatusFailed
} else {
notification.DeliveryStatus = domain.DeliveryStatusSent
}
case domain.DeliveryChannelEmail:
err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
if err != nil {
notification.DeliveryStatus = domain.DeliveryStatusFailed
} else {
notification.DeliveryStatus = domain.DeliveryStatusSent
}
default:
if notification.DeliveryChannel != domain.DeliveryChannelInApp {
s.logger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", "channel", notification.DeliveryChannel)
notification.DeliveryStatus = domain.DeliveryStatusFailed
}
}
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.logger.Error("[NotificationSvc.HandleNotification] Failed to update notification status", "id", notification.ID, "error", err)
}
}
func (s *Service) startRetryWorker() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.retryFailedNotifications()
case <-s.stopCh:
s.logger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped")
return
}
}
}
func (s *Service) retryFailedNotifications() {
ctx := context.Background()
failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
if err != nil {
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications", "error", err)
return
}
for _, n := range failedNotifications {
notification := &n
go func(notification *domain.Notification) {
for attempt := 0; attempt < 3; attempt++ {
time.Sleep(time.Duration(attempt) * time.Second)
switch notification.DeliveryChannel {
case domain.DeliveryChannelSMS:
if err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
notification.DeliveryStatus = domain.DeliveryStatusSent
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
}
s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
return
}
case domain.DeliveryChannelEmail:
if err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil {
notification.DeliveryStatus = domain.DeliveryStatusSent
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
}
s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
return
}
}
}
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification", "id", notification.ID)
}(notification)
}
}
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
return s.repo.CountUnreadNotifications(ctx, recipient_id)
}
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.repo.Get(ctx, filter)
// }
func (s *Service) RunRedisSubscriber(ctx context.Context) {
pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
var parsed map[string]interface{}
if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
continue
}
eventType, _ := parsed["type"].(string)
payload := parsed["payload"]
recipientID, hasRecipient := parsed["recipient_id"]
recipientType, _ := parsed["recipient_type"].(string)
message := map[string]interface{}{
"type": eventType,
"payload": payload,
}
if hasRecipient {
message["recipient_id"] = recipientID
message["recipient_type"] = recipientType
}
s.Hub.Broadcast <- message
}
}
func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
const key = "live_metrics"
companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
for _, c := range companies {
companyBalances = append(companyBalances, domain.CompanyWalletBalance{
CompanyID: c.ID,
CompanyName: c.Name,
Balance: float64(c.WalletBalance.Float32()),
})
}
branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
for _, b := range branches {
branchBalances = append(branchBalances, domain.BranchWalletBalance{
BranchID: b.ID,
BranchName: b.Name,
CompanyID: b.CompanyID,
Balance: float64(b.Balance.Float32()),
})
}
payload := domain.LiveWalletMetrics{
Timestamp: time.Now(),
CompanyBalances: companyBalances,
BranchBalances: branchBalances,
}
updatedData, err := json.Marshal(payload)
if err != nil {
return err
}
if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
return err
}
if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
return err
}
return nil
}
func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
const key = "live_metrics"
var metric domain.LiveMetric
val, err := s.redisClient.Get(ctx, key).Result()
if err == redis.Nil {
// Key does not exist yet, return zero-valued struct
return domain.LiveMetric{}, nil
} else if err != nil {
return domain.LiveMetric{}, err
}
if err := json.Unmarshal([]byte(val), &metric); err != nil {
return domain.LiveMetric{}, err
}
return metric, nil
}
func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
var (
payload domain.LiveWalletMetrics
event map[string]interface{}
key = "live_metrics"
)
// Try company first
company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
if companyErr == nil {
payload = domain.LiveWalletMetrics{
Timestamp: time.Now(),
CompanyBalances: []domain.CompanyWalletBalance{{
CompanyID: company.ID,
CompanyName: company.Name,
Balance: float64(wallet.Balance),
}},
BranchBalances: []domain.BranchWalletBalance{},
}
event = map[string]interface{}{
"type": "LIVE_WALLET_METRICS_UPDATE",
"recipient_id": company.ID,
"recipient_type": "company",
"payload": payload,
}
} else {
// Try branch next
branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
if branchErr == nil {
payload = domain.LiveWalletMetrics{
Timestamp: time.Now(),
CompanyBalances: []domain.CompanyWalletBalance{},
BranchBalances: []domain.BranchWalletBalance{{
BranchID: branch.ID,
BranchName: branch.Name,
CompanyID: branch.CompanyID,
Balance: float64(wallet.Balance),
}},
}
event = map[string]interface{}{
"type": "LIVE_WALLET_METRICS_UPDATE",
"recipient_id": branch.ID,
"recipient_type": "branch",
"payload": payload,
}
} else {
// Neither company nor branch matched this wallet
s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
return
}
}
// Save latest metric to Redis
if jsonBytes, err := json.Marshal(payload); err == nil {
s.redisClient.Set(ctx, key, jsonBytes, 0)
} else {
s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
}
// Publish via Redis
if jsonEvent, err := json.Marshal(event); err == nil {
s.redisClient.Publish(ctx, key, jsonEvent)
} else {
s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
}
// Broadcast over WebSocket
s.Hub.Broadcast <- event
}
func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
return s.notificationStore.GetCompanyByWalletID(ctx, walletID)
}
func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
return s.notificationStore.GetBranchByWalletID(ctx, walletID)
}

View File

@ -8,8 +8,6 @@ import (
)
type NotificationStore interface {
GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error)
GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error)
SendNotification(ctx context.Context, notification *domain.Notification) error
MarkAsRead(ctx context.Context, notificationID string, recipientID int64) error
ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)

View File

@ -0,0 +1,646 @@
package notificationservice
import (
"context"
"encoding/json"
"fmt"
// "errors"
"log/slog"
"sync"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"go.uber.org/zap"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
// afro "github.com/amanuelabay/afrosms-go"
"github.com/gorilla/websocket"
"github.com/redis/go-redis/v9"
)
type Service struct {
repo repository.NotificationRepository
Hub *ws.NotificationHub
notificationStore NotificationStore
connections sync.Map
notificationCh chan *domain.Notification
stopCh chan struct{}
config *config.Config
userSvc *user.Service
messengerSvc *messenger.Service
mongoLogger *zap.Logger
logger *slog.Logger
redisClient *redis.Client
}
func New(repo repository.NotificationRepository,
mongoLogger *zap.Logger,
logger *slog.Logger,
cfg *config.Config,
messengerSvc *messenger.Service,
userSvc *user.Service,
) *Service {
hub := ws.NewNotificationHub()
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr, // e.g., "redis:6379"
})
svc := &Service{
repo: repo,
Hub: hub,
mongoLogger: mongoLogger,
logger: logger,
connections: sync.Map{},
notificationCh: make(chan *domain.Notification, 1000),
stopCh: make(chan struct{}),
messengerSvc: messengerSvc,
userSvc: userSvc,
config: cfg,
redisClient: rdb,
}
go hub.Run()
go svc.startWorker()
go svc.startRetryWorker()
go svc.RunRedisSubscriber(context.Background())
return svc
}
func (s *Service) addConnection(recipientID int64, c *websocket.Conn) error {
if c == nil {
s.mongoLogger.Warn("[NotificationSvc.AddConnection] Attempted to add nil WebSocket connection",
zap.Int64("recipientID", recipientID),
zap.Time("timestamp", time.Now()),
)
return fmt.Errorf("Invalid Websocket Connection")
}
s.connections.Store(recipientID, c)
s.mongoLogger.Info("[NotificationSvc.AddConnection] Added WebSocket connection",
zap.Int64("recipientID", recipientID),
zap.Time("timestamp", time.Now()),
)
return nil
}
func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error {
notification.ID = helpers.GenerateID()
notification.Timestamp = time.Now()
notification.DeliveryStatus = domain.DeliveryStatusPending
created, err := s.repo.CreateNotification(ctx, notification)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.SendNotification] Failed to create notification",
zap.String("id", notification.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
notification = created
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
s.Hub.Broadcast <- map[string]interface{}{
"type": "CREATED_NOTIFICATION",
"recipient_id": notification.RecipientID,
"payload": notification,
}
}
select {
case s.notificationCh <- notification:
default:
s.mongoLogger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification",
zap.String("id", notification.ID),
zap.Time("timestamp", time.Now()),
)
}
return nil
}
func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, recipientID int64) error {
for _, notificationID := range notificationIDs {
_, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read",
zap.String("notificationID", notificationID),
zap.Int64("recipientID", recipientID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
// if err != nil {
// s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
// return err
// }
// s.Hub.Broadcast <- map[string]interface{}{
// "type": "COUNT_NOT_OPENED_NOTIFICATION",
// "recipient_id": recipientID,
// "payload": map[string]int{
// "not_opened_notifications_count": int(count),
// },
// }
s.mongoLogger.Info("[NotificationSvc.MarkAsRead] Notification marked as read",
zap.String("notificationID", notificationID),
zap.Int64("recipientID", recipientID),
zap.Time("timestamp", time.Now()),
)
}
return nil
}
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to list notifications",
zap.Int64("recipientID", recipientID),
zap.Int("limit", limit),
zap.Int("offset", offset),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return nil, err
}
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications",
zap.Int64("recipientID", recipientID),
zap.Int("count", len(notifications)),
zap.Time("timestamp", time.Now()),
)
return notifications, nil
}
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
notifications, err := s.repo.GetAllNotifications(ctx, limit, offset)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to get all notifications",
zap.Int("limit", limit),
zap.Int("offset", offset),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return nil, err
}
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully retrieved all notifications",
zap.Int("count", len(notifications)),
zap.Time("timestamp", time.Now()),
)
return notifications, nil
}
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
err := s.addConnection(recipientID, c)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.ConnectWebSocket] Failed to create WebSocket connection",
zap.Int64("recipientID", recipientID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
s.mongoLogger.Info("[NotificationSvc.ConnectWebSocket] WebSocket connection established",
zap.Int64("recipientID", recipientID),
zap.Time("timestamp", time.Now()),
)
return nil
}
func (s *Service) DisconnectWebSocket(recipientID int64) {
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
conn.(*websocket.Conn).Close()
// s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
s.mongoLogger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket",
zap.Int64("recipientID", recipientID),
zap.Time("timestamp", time.Now()),
)
}
}
// func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
// s.logger.Info("[NotificationSvc.SendSMS] SMS notification requested", "recipientID", recipientID, "message", message)
// apiKey := s.config.AFRO_SMS_API_KEY
// senderName := s.config.AFRO_SMS_SENDER_NAME
// receiverPhone := s.config.AFRO_SMS_RECEIVER_PHONE_NUMBER
// hostURL := s.config.ADRO_SMS_HOST_URL
// endpoint := "/api/send"
// request := afro.GetRequest(apiKey, endpoint, hostURL)
// request.Method = "GET"
// request.Sender(senderName)
// request.To(receiverPhone, message)
// response, err := afro.MakeRequestWithContext(ctx, request)
// if err != nil {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "error", err)
// return err
// }
// if response["acknowledge"] == "success" {
// s.logger.Info("[NotificationSvc.SendSMS] SMS sent successfully", "recipientID", recipientID)
// } else {
// s.logger.Error("[NotificationSvc.SendSMS] Failed to send SMS", "recipientID", recipientID, "response", response["response"])
// return errors.New("SMS delivery failed: " + response["response"].(string))
// }
// return nil
// }
// func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
// s.logger.Info("[NotificationSvc.SendEmail] Email notification requested", "recipientID", recipientID, "subject", subject)
// return nil
// }
func (s *Service) startWorker() {
for {
select {
case notification := <-s.notificationCh:
s.handleNotification(notification)
case <-s.stopCh:
s.mongoLogger.Info("[NotificationSvc.StartWorker] Worker stopped",
zap.Time("timestamp", time.Now()),
)
return
}
}
}
func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
return s.repo.ListRecipientIDs(ctx, receiver)
}
func (s *Service) handleNotification(notification *domain.Notification) {
ctx := context.Background()
switch notification.DeliveryChannel {
case domain.DeliveryChannelSMS:
err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message)
if err != nil {
notification.DeliveryStatus = domain.DeliveryStatusFailed
} else {
notification.DeliveryStatus = domain.DeliveryStatusSent
}
case domain.DeliveryChannelEmail:
err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
if err != nil {
notification.DeliveryStatus = domain.DeliveryStatusFailed
} else {
notification.DeliveryStatus = domain.DeliveryStatusSent
}
default:
if notification.DeliveryChannel != domain.DeliveryChannelInApp {
s.mongoLogger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel",
zap.String("channel", string(notification.DeliveryChannel)),
zap.Time("timestamp", time.Now()),
)
notification.DeliveryStatus = domain.DeliveryStatusFailed
}
}
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.mongoLogger.Error("[NotificationSvc.HandleNotification] Failed to update notification status",
zap.String("id", notification.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
}
func (s *Service) SendNotificationSMS(ctx context.Context, recipientID int64, message string) error {
// Get User Phone Number
user, err := s.userSvc.GetUserByID(ctx, recipientID)
if err != nil {
return err
}
if !user.PhoneVerified {
return fmt.Errorf("Cannot send notification to unverified phone number")
}
if user.PhoneNumber == "" {
return fmt.Errorf("Phone Number is invalid")
}
err = s.messengerSvc.SendSMS(ctx, user.PhoneNumber, message)
if err != nil {
return err
}
return nil
}
func (s *Service) SendNotificationEmail(ctx context.Context, recipientID int64, message string, subject string) error {
// Get User Phone Number
user, err := s.userSvc.GetUserByID(ctx, recipientID)
if err != nil {
return err
}
if !user.EmailVerified {
return fmt.Errorf("Cannot send notification to unverified email")
}
if user.PhoneNumber == "" {
return fmt.Errorf("Email is invalid")
}
err = s.messengerSvc.SendEmail(ctx, user.PhoneNumber, message, subject)
if err != nil {
return err
}
return nil
}
func (s *Service) startRetryWorker() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.retryFailedNotifications()
case <-s.stopCh:
s.mongoLogger.Info("[NotificationSvc.StartRetryWorker] Retry worker stopped",
zap.Time("timestamp", time.Now()),
)
return
}
}
}
func (s *Service) retryFailedNotifications() {
ctx := context.Background()
failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to list failed notifications",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return
}
for _, n := range failedNotifications {
notification := &n
go func(notification *domain.Notification) {
for attempt := 0; attempt < 3; attempt++ {
time.Sleep(time.Duration(attempt) * time.Second)
switch notification.DeliveryChannel {
case domain.DeliveryChannelSMS:
if err := s.SendNotificationSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
notification.DeliveryStatus = domain.DeliveryStatusSent
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
zap.String("id", notification.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
} else {
s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
zap.String("id", notification.ID),
zap.Time("timestamp", time.Now()),
)
}
return
}
case domain.DeliveryChannelEmail:
if err := s.SendNotificationEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil {
notification.DeliveryStatus = domain.DeliveryStatusSent
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry",
zap.String("id", notification.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
} else {
s.mongoLogger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification",
zap.String("id", notification.ID),
zap.Time("timestamp", time.Now()),
)
}
return
}
}
}
s.mongoLogger.Error("[NotificationSvc.RetryFailedNotifications] Max retries reached for notification",
zap.String("id", notification.ID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}(notification)
}
}
func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
return s.repo.CountUnreadNotifications(ctx, recipient_id)
}
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.repo.Get(ctx, filter)
// }
func (s *Service) RunRedisSubscriber(ctx context.Context) {
pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
var parsed map[string]interface{}
if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
// s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
s.mongoLogger.Error("invalid Redis message format",
zap.String("payload", msg.Payload),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
continue
}
eventType, _ := parsed["type"].(string)
payload := parsed["payload"]
recipientID, hasRecipient := parsed["recipient_id"]
recipientType, _ := parsed["recipient_type"].(string)
message := map[string]interface{}{
"type": eventType,
"payload": payload,
}
if hasRecipient {
message["recipient_id"] = recipientID
message["recipient_type"] = recipientType
}
s.Hub.Broadcast <- message
}
}
func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
const key = "live_metrics"
companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
for _, c := range companies {
companyBalances = append(companyBalances, domain.CompanyWalletBalance{
CompanyID: c.ID,
CompanyName: c.Name,
Balance: float64(c.WalletBalance.Float32()),
})
}
branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
for _, b := range branches {
branchBalances = append(branchBalances, domain.BranchWalletBalance{
BranchID: b.ID,
BranchName: b.Name,
CompanyID: b.CompanyID,
Balance: float64(b.Balance.Float32()),
})
}
payload := domain.LiveWalletMetrics{
Timestamp: time.Now(),
CompanyBalances: companyBalances,
BranchBalances: branchBalances,
}
updatedData, err := json.Marshal(payload)
if err != nil {
return err
}
if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
return err
}
if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
return err
}
return nil
}
func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
const key = "live_metrics"
var metric domain.LiveMetric
val, err := s.redisClient.Get(ctx, key).Result()
if err == redis.Nil {
// Key does not exist yet, return zero-valued struct
return domain.LiveMetric{}, nil
} else if err != nil {
return domain.LiveMetric{}, err
}
if err := json.Unmarshal([]byte(val), &metric); err != nil {
return domain.LiveMetric{}, err
}
return metric, nil
}
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
// var (
// payload domain.LiveWalletMetrics
// event map[string]interface{}
// key = "live_metrics"
// )
// // Try company first
// company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID)
// if companyErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{{
// CompanyID: company.ID,
// CompanyName: company.Name,
// Balance: float64(wallet.Balance),
// }},
// BranchBalances: []domain.BranchWalletBalance{},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": company.ID,
// "recipient_type": "company",
// "payload": payload,
// }
// } else {
// // Try branch next
// branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID)
// if branchErr == nil {
// payload = domain.LiveWalletMetrics{
// Timestamp: time.Now(),
// CompanyBalances: []domain.CompanyWalletBalance{},
// BranchBalances: []domain.BranchWalletBalance{{
// BranchID: branch.ID,
// BranchName: branch.Name,
// CompanyID: branch.CompanyID,
// Balance: float64(wallet.Balance),
// }},
// }
// event = map[string]interface{}{
// "type": "LIVE_WALLET_METRICS_UPDATE",
// "recipient_id": branch.ID,
// "recipient_type": "branch",
// "payload": payload,
// }
// } else {
// // Neither company nor branch matched this wallet
// // s.logger.Warn("wallet not linked to any company or branch", "walletID", wallet.ID)
// s.mongoLogger.Warn("wallet not linked to any company or branch",
// zap.Int64("walletID", wallet.ID),
// zap.Time("timestamp", time.Now()),
// )
// return
// }
// }
// // Save latest metric to Redis
// if jsonBytes, err := json.Marshal(payload); err == nil {
// s.redisClient.Set(ctx, key, jsonBytes, 0)
// } else {
// // s.logger.Error("failed to marshal wallet metrics payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal wallet metrics payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Publish via Redis
// if jsonEvent, err := json.Marshal(event); err == nil {
// s.redisClient.Publish(ctx, key, jsonEvent)
// } else {
// // s.logger.Error("failed to marshal event payload", "walletID", wallet.ID, "err", err)
// s.mongoLogger.Error("failed to marshal event payload",
// zap.Int64("walletID", wallet.ID),
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// }
// // Broadcast over WebSocket
// s.Hub.Broadcast <- event
// }

View File

@ -16,7 +16,7 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/bet"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/league"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
)

View File

@ -9,7 +9,7 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/event"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/settings"
"go.uber.org/zap"

View File

@ -2,40 +2,36 @@ package user
import (
"context"
"errors"
"fmt"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
afro "github.com/amanuelabay/afrosms-go"
"github.com/resend/resend-go/v2"
"github.com/twilio/twilio-go"
twilioApi "github.com/twilio/twilio-go/rest/api/v2010"
"golang.org/x/crypto/bcrypt"
)
func (s *Service) SendOtp(ctx context.Context, sentTo string, otpFor domain.OtpFor, medium domain.OtpMedium, provider domain.OtpProvider) error {
func (s *Service) SendOtp(ctx context.Context, sentTo string, otpFor domain.OtpFor, medium domain.OtpMedium, provider domain.SMSProvider) error {
otpCode := helpers.GenerateOTP()
message := fmt.Sprintf("Welcome to Fortune bets, your OTP is %s please don't share with anyone.", otpCode)
switch medium {
case domain.OtpMediumSms:
switch provider {
case domain.TwilioSms:
if err := s.SendTwilioSMSOTP(ctx, sentTo, message, provider); err != nil {
if err := s.messengerSvc.SendTwilioSMS(ctx, sentTo, message); err != nil {
return err
}
case domain.AfroMessage:
if err := s.SendAfroMessageSMSOTP(ctx, sentTo, message, provider); err != nil {
if err := s.messengerSvc.SendAfroMessageSMS(ctx, sentTo, message); err != nil {
return err
}
default:
return fmt.Errorf("invalid sms provider: %s", provider)
}
case domain.OtpMediumEmail:
if err := s.SendEmailOTP(ctx, sentTo, message); err != nil {
if err := s.messengerSvc.SendEmail(ctx, sentTo, message, "FortuneBets - One Time Password"); err != nil {
return err
}
}
@ -61,73 +57,3 @@ func hashPassword(plaintextPassword string) ([]byte, error) {
return hash, nil
}
func (s *Service) SendAfroMessageSMSOTP(ctx context.Context, receiverPhone, message string, provider domain.OtpProvider) error {
apiKey := s.config.AFRO_SMS_API_KEY
senderName := s.config.AFRO_SMS_SENDER_NAME
hostURL := s.config.ADRO_SMS_HOST_URL
endpoint := "/api/send"
// API endpoint has been updated
// TODO: no need for package for the afro message operations (pretty simple stuff)
request := afro.GetRequest(apiKey, endpoint, hostURL)
request.BaseURL = "https://api.afromessage.com/api/send"
request.Method = "GET"
request.Sender(senderName)
request.To(receiverPhone, message)
response, err := afro.MakeRequestWithContext(ctx, request)
if err != nil {
return err
}
if response["acknowledge"] == "success" {
return nil
} else {
fmt.Println(response["response"].(map[string]interface{}))
return errors.New("SMS delivery failed")
}
}
func (s *Service) SendTwilioSMSOTP(ctx context.Context, receiverPhone, message string, provider domain.OtpProvider) error {
accountSid := s.config.TwilioAccountSid
authToken := s.config.TwilioAuthToken
senderPhone := s.config.TwilioSenderPhoneNumber
client := twilio.NewRestClientWithParams(twilio.ClientParams{
Username: accountSid,
Password: authToken,
})
params := &twilioApi.CreateMessageParams{}
params.SetTo(receiverPhone)
params.SetFrom(senderPhone)
params.SetBody(message)
_, err := client.Api.CreateMessage(params)
if err != nil {
return fmt.Errorf("%s", "Error sending SMS message: %s"+err.Error())
}
return nil
}
func (s *Service) SendEmailOTP(ctx context.Context, receiverEmail, message string) error {
apiKey := s.config.ResendApiKey
client := resend.NewClient(apiKey)
formattedSenderEmail := "FortuneBets <" + s.config.ResendSenderEmail + ">"
params := &resend.SendEmailRequest{
From: formattedSenderEmail,
To: []string{receiverEmail},
Subject: "FortuneBets - One Time Password",
Text: message,
}
_, err := client.Emails.Send(params)
if err != nil {
return err
}
return nil
}

View File

@ -43,8 +43,6 @@ func (s *Service) DeleteUser(ctx context.Context, id int64) error {
return s.userStore.DeleteUser(ctx, id)
}
func (s *Service) GetAllUsers(ctx context.Context, filter domain.UserFilter) ([]domain.User, int64, error) {
// Get all Users
return s.userStore.GetAllUsers(ctx, filter)
@ -58,7 +56,10 @@ func (s *Service) GetCashiersByBranch(ctx context.Context, branchID int64) ([]do
return s.userStore.GetCashiersByBranch(ctx, branchID)
}
func (s *Service) GetAllCashiers(ctx context.Context, filter domain.UserFilter) ([]domain.GetCashier, int64, error){
func (s *Service) GetAdminByCompanyID(ctx context.Context, companyID int64) (domain.User, error) {
return s.userStore.GetAdminByCompanyID(ctx, companyID)
}
func (s *Service) GetAllCashiers(ctx context.Context, filter domain.UserFilter) ([]domain.GetCashier, int64, error) {
return s.userStore.GetAllCashiers(ctx, filter)
}

View File

@ -14,6 +14,7 @@ type UserStore interface {
GetAllCashiers(ctx context.Context, filter domain.UserFilter) ([]domain.GetCashier, int64, error)
GetCashierByID(ctx context.Context, cashierID int64) (domain.GetCashier, error)
GetCashiersByBranch(ctx context.Context, branchID int64) ([]domain.User, error)
GetAdminByCompanyID(ctx context.Context, companyID int64) (domain.User, error)
UpdateUser(ctx context.Context, user domain.UpdateUserReq) error
UpdateUserCompany(ctx context.Context, id int64, companyID int64) error
UpdateUserSuspend(ctx context.Context, id int64, status bool) error

View File

@ -10,7 +10,7 @@ import (
func (s *Service) CheckPhoneEmailExist(ctx context.Context, phoneNum, email string) (bool, bool, error) { // email,phone,error
return s.userStore.CheckPhoneEmailExist(ctx, phoneNum, email)
}
func (s *Service) SendRegisterCode(ctx context.Context, medium domain.OtpMedium, sentTo string, provider domain.OtpProvider) error {
func (s *Service) SendRegisterCode(ctx context.Context, medium domain.OtpMedium, sentTo string, provider domain.SMSProvider) error {
var err error
// check if user exists
switch medium {

View File

@ -8,7 +8,7 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
)
func (s *Service) SendResetCode(ctx context.Context, medium domain.OtpMedium, sentTo string, provider domain.OtpProvider) error {
func (s *Service) SendResetCode(ctx context.Context, medium domain.OtpMedium, sentTo string, provider domain.SMSProvider) error {
var err error
// check if user exists

View File

@ -4,6 +4,7 @@ import (
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
)
const (
@ -11,19 +12,22 @@ const (
)
type Service struct {
userStore UserStore
otpStore OtpStore
config *config.Config
userStore UserStore
otpStore OtpStore
messengerSvc *messenger.Service
config *config.Config
}
func NewService(
userStore UserStore,
otpStore OtpStore,
messengerSvc *messenger.Service,
cfg *config.Config,
) *Service {
return &Service{
userStore: userStore,
otpStore: otpStore,
config: cfg,
userStore: userStore,
otpStore: otpStore,
messengerSvc: messengerSvc,
config: cfg,
}
}

View File

@ -254,7 +254,7 @@ func (s *service) ProcessBet(ctx context.Context, req *domain.PopOKBetRequest) (
return &domain.PopOKBetResponse{}, fmt.Errorf("Failed to read user wallets")
}
_, err = s.walletSvc.DeductFromWallet(ctx, claims.UserID, domain.Currency(amountCents),
domain.CustomerWalletType, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
domain.ValidInt64{}, domain.TRANSFER_DIRECT,
fmt.Sprintf("Deducted %v amount from wallet by system while placing virtual game bet", amountCents))
if err != nil {
return nil, fmt.Errorf("insufficient balance")

View File

@ -115,7 +115,7 @@ func (c *Client) ProcessBet(ctx context.Context, req domain.BetRequest) (*domain
return &domain.BetResponse{}, err
}
c.walletSvc.DeductFromWallet(ctx, wallets[0].ID, domain.Currency(req.Amount.Amount), domain.CustomerWalletType, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
c.walletSvc.DeductFromWallet(ctx, wallets[0].ID, domain.Currency(req.Amount.Amount), domain.ValidInt64{}, domain.TRANSFER_DIRECT,
fmt.Sprintf("Deducting %v from wallet for creating Veli Game Bet", req.Amount.Amount),
)

View File

@ -10,7 +10,7 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/branch"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
)

View File

@ -7,8 +7,8 @@ import (
)
type WalletStore interface {
// GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error)
// GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error)
GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error)
GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error)
CreateWallet(ctx context.Context, wallet domain.CreateWallet) (domain.Wallet, error)
CreateCustomerWallet(ctx context.Context, customerWallet domain.CreateCustomerWallet) (domain.CustomerWallet, error)
GetWalletByID(ctx context.Context, id int64) (domain.Wallet, error)

View File

@ -3,7 +3,9 @@ package wallet
import (
"log/slog"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"go.uber.org/zap"
)
type Service struct {
@ -12,17 +14,29 @@ type Service struct {
transferStore TransferStore
notificationStore notificationservice.NotificationStore
notificationSvc *notificationservice.Service
userSvc *user.Service
mongoLogger *zap.Logger
logger *slog.Logger
// userStore user.UserStore
}
func NewService(walletStore WalletStore, transferStore TransferStore, notificationStore notificationservice.NotificationStore, notificationSvc *notificationservice.Service, logger *slog.Logger) *Service {
func NewService(
walletStore WalletStore,
transferStore TransferStore,
notificationStore notificationservice.NotificationStore,
notificationSvc *notificationservice.Service,
userSvc *user.Service,
mongoLogger *zap.Logger,
logger *slog.Logger,
) *Service {
return &Service{
walletStore: walletStore,
transferStore: transferStore,
// approvalStore: approvalStore,
notificationStore: notificationStore,
notificationSvc: notificationSvc,
userSvc: userSvc,
mongoLogger: mongoLogger,
logger: logger,
// userStore: userStore,
// userStore users

View File

@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"go.uber.org/zap"
)
var (
@ -59,6 +61,14 @@ func (s *Service) GetWalletsByUser(ctx context.Context, id int64) ([]domain.Wall
return s.walletStore.GetWalletsByUser(ctx, id)
}
func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
return s.walletStore.GetCompanyByWalletID(ctx, walletID)
}
func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
return s.walletStore.GetBranchByWalletID(ctx, walletID)
}
func (s *Service) GetAllCustomerWallet(ctx context.Context) ([]domain.GetCustomerWallet, error) {
return s.walletStore.GetAllCustomerWallets(ctx)
}
@ -76,12 +86,12 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu
return err
}
wallet, err := s.GetWalletByID(ctx, id)
_, err = s.GetWalletByID(ctx, id)
if err != nil {
return err
}
go s.notificationSvc.UpdateLiveWalletMetricForWallet(ctx, wallet)
// go s.notificationSvc.UpdateLiveWalletMetricForWallet(ctx, wallet)
return nil
}
@ -117,7 +127,7 @@ func (s *Service) AddToWallet(
return newTransfer, err
}
func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.Currency, walletType domain.WalletType, cashierID domain.ValidInt64, paymentMethod domain.PaymentMethod, message string) (domain.Transfer, error) {
func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.Currency, cashierID domain.ValidInt64, paymentMethod domain.PaymentMethod, message string) (domain.Transfer, error) {
wallet, err := s.GetWalletByID(ctx, id)
if err != nil {
return domain.Transfer{}, err
@ -125,8 +135,10 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
if wallet.Balance < amount {
// Send Wallet low to admin
if walletType == domain.CompanyWalletType || walletType == domain.BranchWalletType {
if wallet.Type == domain.CompanyWalletType || wallet.Type == domain.BranchWalletType {
s.SendAdminWalletInsufficientNotification(ctx, wallet, amount)
} else {
s.SendCustomerWalletInsufficientNotification(ctx, wallet, amount)
}
return domain.Transfer{}, ErrBalanceInsufficient
}
@ -215,6 +227,55 @@ func (s *Service) UpdateWalletActive(ctx context.Context, id int64, isActive boo
return s.walletStore.UpdateWalletActive(ctx, id, isActive)
}
func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) {
var recipients []int64
if walletType == domain.BranchWalletType {
branch, err := s.GetBranchByWalletID(ctx, walletID)
if err != nil {
return nil, err
}
recipients = append(recipients, branch.BranchManagerID)
cashiers, err := s.userSvc.GetCashiersByBranch(ctx, branch.ID)
if err != nil {
return nil, err
}
for _, cashier := range cashiers {
recipients = append(recipients, cashier.ID)
}
admin, err := s.userSvc.GetAdminByCompanyID(ctx, branch.CompanyID)
if err != nil {
return nil, err
}
recipients = append(recipients, admin.ID)
} else if walletType == domain.CompanyWalletType {
company, err := s.GetCompanyByWalletID(ctx, walletID)
if err != nil {
return nil, err
}
recipients = append(recipients, company.AdminID)
} else {
return nil, fmt.Errorf("Invalid wallet type")
}
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{
Role: string(domain.RoleSuperAdmin),
})
if err != nil {
return nil, err
}
for _, user := range users {
recipients = append(recipients, user.ID)
}
return recipients, nil
}
func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error {
// Send notification to admin team
adminNotification := &domain.Notification{
@ -222,7 +283,7 @@ func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWalle
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelWarning,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelEmail, // Or any preferred admin channel
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT WARNING: System Running Out of Funds",
Message: fmt.Sprintf(
@ -240,35 +301,48 @@ func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWalle
}
// Get admin recipients and send to all
adminRecipients, err := s.notificationStore.ListRecipientIDs(ctx, domain.NotificationRecieverSideAdmin)
adminRecipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.logger.Error("failed to get admin recipients", "error", err)
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
} else {
for _, adminID := range adminRecipients {
adminNotification.RecipientID = adminID
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.logger.Error("failed to send admin notification",
"admin_id", adminID,
"error", err)
}
}
for _, adminID := range adminRecipients {
adminNotification.RecipientID = adminID
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendAdminWalletInsufficientNotification(ctx context.Context, adminWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
adminNotification := &domain.Notification{
RecipientID: adminWallet.UserID,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelEmail, // Or any preferred admin channel
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Admin Wallet insufficient to process customer request",
Message: fmt.Sprintf(
@ -288,33 +362,49 @@ func (s *Service) SendAdminWalletInsufficientNotification(ctx context.Context, a
}
// Get admin recipients and send to all
adminRecipients, err := s.notificationStore.ListRecipientIDs(ctx, domain.NotificationRecieverSideAdmin)
recipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.logger.Error("failed to get admin recipients", "error", err)
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
} else {
for _, adminID := range adminRecipients {
adminNotification.RecipientID = adminID
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.logger.Error("failed to send admin notification",
"admin_id", adminID,
"error", err)
}
}
for _, adminID := range recipients {
adminNotification.RecipientID = adminID
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendCustomerWalletInsufficientNotification(ctx context.Context, customerWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
adminNotification := &domain.Notification{
customerNotification := &domain.Notification{
RecipientID: customerWallet.UserID,
Type: domain.NOTIFICATION_TYPE_WALLET,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelEmail, // Or any preferred admin channel
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Admin Wallet insufficient to process customer request",
Headline: "CREDIT Error: Wallet insufficient",
Message: fmt.Sprintf(
"Wallet ID %d. Transaction Amount %.2f. Current balance: %.2f",
customerWallet.ID,
@ -331,10 +421,14 @@ func (s *Service) SendCustomerWalletInsufficientNotification(ctx context.Context
}`, customerWallet.ID, customerWallet.Balance, amount.Float32()),
}
if err := s.notificationStore.SendNotification(ctx, adminNotification); err != nil {
s.logger.Error("failed to send customer notification",
"admin_id", customerWallet.UserID,
"error", err)
if err := s.notificationStore.SendNotification(ctx, customerNotification); err != nil {
s.mongoLogger.Error("failed to create customer notification",
zap.Int64("customer_id", customerWallet.UserID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
return nil
}

View File

@ -33,7 +33,7 @@ import (
customvalidator "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/validator"
"go.uber.org/zap"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/bytedance/sonic"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"

View File

@ -15,7 +15,7 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/institutions"
issuereporting "github.com/SamuelTariku/FortuneBet-Backend/internal/services/issue_reporting"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/league"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/recommendation"
referralservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/referal"
@ -58,13 +58,13 @@ type Handler struct {
virtualGameSvc virtualgameservice.VirtualGameService
aleaVirtualGameSvc alea.AleaVirtualGameService
veliVirtualGameSvc veli.VeliVirtualGameService
recommendationSvc recommendation.RecommendationService
authSvc *authentication.Service
resultSvc result.Service
jwtConfig jwtutil.JwtConfig
validator *customvalidator.CustomValidator
Cfg *config.Config
mongoLoggerSvc *zap.Logger
recommendationSvc recommendation.RecommendationService
authSvc *authentication.Service
resultSvc result.Service
jwtConfig jwtutil.JwtConfig
validator *customvalidator.CustomValidator
Cfg *config.Config
mongoLoggerSvc *zap.Logger
}
func New(
@ -124,11 +124,11 @@ func New(
virtualGameSvc: virtualGameSvc,
aleaVirtualGameSvc: aleaVirtualGameSvc,
veliVirtualGameSvc: veliVirtualGameSvc,
recommendationSvc: recommendationSvc,
authSvc: authSvc,
resultSvc: resultSvc,
jwtConfig: jwtConfig,
Cfg: cfg,
mongoLoggerSvc: mongoLoggerSvc,
recommendationSvc: recommendationSvc,
authSvc: authSvc,
resultSvc: resultSvc,
jwtConfig: jwtConfig,
Cfg: cfg,
mongoLoggerSvc: mongoLoggerSvc,
}
}