5 min report fix + arifpay integration + kafka fixes

This commit is contained in:
Yared Yemane 2025-07-23 14:33:56 +03:00
parent a35d4b37d3
commit 982573d67e
5 changed files with 19 additions and 32 deletions

View File

@ -118,16 +118,20 @@ func main() {
var notificatioStore notificationservice.NotificationStore var notificatioStore notificationservice.NotificationStore
// var userStore user.UserStore // var userStore user.UserStore
// Initialize producer
brokers := []string{"localhost:9092"}
topic := "wallet-balance-topic"
producer := kafka.NewProducer(brokers, topic)
walletSvc := wallet.NewService( walletSvc := wallet.NewService(
wallet.WalletStore(store), wallet.WalletStore(store),
wallet.TransferStore(store), wallet.TransferStore(store),
notificatioStore, notificatioStore,
// userStore,
notificationSvc, notificationSvc,
userSvc, userSvc,
domain.MongoDBLogger, domain.MongoDBLogger,
logger, logger,
kafka.NewProducer([]string{"localhost:9092"}, "wallet-events"), producer,
) )
branchSvc := branch.NewService(store) branchSvc := branch.NewService(store)

View File

@ -23,7 +23,7 @@ import (
type Service struct { type Service struct {
repo repository.NotificationRepository repo repository.NotificationRepository
Hub *ws.NotificationHub Hub *ws.NotificationHub
notificationStore NotificationStore // notificationStore
connections sync.Map connections sync.Map
notificationCh chan *domain.Notification notificationCh chan *domain.Notification
stopCh chan struct{} stopCh chan struct{}
@ -402,7 +402,7 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W
) )
// Try company first // Try company first
company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID) company, companyErr := s.GetCompanyByWalletID(ctx, wallet.ID)
if companyErr == nil { if companyErr == nil {
payload = domain.LiveWalletMetrics{ payload = domain.LiveWalletMetrics{
Timestamp: time.Now(), Timestamp: time.Now(),
@ -422,7 +422,7 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W
} }
} else { } else {
// Try branch next // Try branch next
branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID) branch, branchErr := s.GetBranchByWalletID(ctx, wallet.ID)
if branchErr == nil { if branchErr == nil {
payload = domain.LiveWalletMetrics{ payload = domain.LiveWalletMetrics{
Timestamp: time.Now(), Timestamp: time.Now(),
@ -467,9 +467,9 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W
} }
func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) { func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) {
return s.notificationStore.GetCompanyByWalletID(ctx, walletID) return s.GetCompanyByWalletID(ctx, walletID)
} }
func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) { func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) {
return s.notificationStore.GetBranchByWalletID(ctx, walletID) return s.GetBranchByWalletID(ctx, walletID)
} }

View File

@ -3,14 +3,10 @@ package wallet
import ( import (
"log/slog" "log/slog"
<<<<<<< HEAD
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
=======
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"go.uber.org/zap" "go.uber.org/zap"
>>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f
) )
type Service struct { type Service struct {
@ -25,9 +21,6 @@ type Service struct {
kafkaProducer *kafka.Producer kafkaProducer *kafka.Producer
} }
<<<<<<< HEAD
func NewService(walletStore WalletStore, transferStore TransferStore, notificationStore notificationservice.NotificationStore, notificationSvc *notificationservice.Service, logger *slog.Logger, kafkaProducer *kafka.Producer) *Service {
=======
func NewService( func NewService(
walletStore WalletStore, walletStore WalletStore,
transferStore TransferStore, transferStore TransferStore,
@ -36,8 +29,8 @@ func NewService(
userSvc *user.Service, userSvc *user.Service,
mongoLogger *zap.Logger, mongoLogger *zap.Logger,
logger *slog.Logger, logger *slog.Logger,
kafkaProducer *kafka.Producer,
) *Service { ) *Service {
>>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f
return &Service{ return &Service{
walletStore: walletStore, walletStore: walletStore,
transferStore: transferStore, transferStore: transferStore,

View File

@ -7,11 +7,8 @@ import (
"time" "time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
<<<<<<< HEAD
"github.com/SamuelTariku/FortuneBet-Backend/internal/event" "github.com/SamuelTariku/FortuneBet-Backend/internal/event"
=======
"go.uber.org/zap" "go.uber.org/zap"
>>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f
) )
var ( var (
@ -90,18 +87,13 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu
return err return err
} }
<<<<<<< HEAD wallet, err := s.GetWalletByID(ctx, id)
wallet, err := s.walletStore.GetWalletByID(ctx, id)
=======
_, err = s.GetWalletByID(ctx, id)
>>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f
if err != nil { if err != nil {
return err return err
} }
<<<<<<< HEAD
go func() { go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{
EventType: event.WalletBalanceUpdated, EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID, WalletID: wallet.ID,
UserID: wallet.UserID, UserID: wallet.UserID,
@ -110,9 +102,6 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu
}) })
}() }()
=======
// go s.notificationSvc.UpdateLiveWalletMetricForWallet(ctx, wallet)
>>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f
return nil return nil
} }
@ -267,7 +256,8 @@ func (s *Service) UpdateWalletActive(ctx context.Context, id int64, isActive boo
func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) { func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) {
var recipients []int64 var recipients []int64
if walletType == domain.BranchWalletType { switch walletType {
case domain.BranchWalletType:
branch, err := s.GetBranchByWalletID(ctx, walletID) branch, err := s.GetBranchByWalletID(ctx, walletID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -288,13 +278,13 @@ func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID i
} }
recipients = append(recipients, admin.ID) recipients = append(recipients, admin.ID)
} else if walletType == domain.CompanyWalletType { case domain.CompanyWalletType:
company, err := s.GetCompanyByWalletID(ctx, walletID) company, err := s.GetCompanyByWalletID(ctx, walletID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
recipients = append(recipients, company.AdminID) recipients = append(recipients, company.AdminID)
} else { default:
return nil, fmt.Errorf("Invalid wallet type") return nil, fmt.Errorf("Invalid wallet type")
} }