diff --git a/cmd/main.go b/cmd/main.go index 5f45894..9fd5b73 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -118,16 +118,20 @@ func main() { var notificatioStore notificationservice.NotificationStore // var userStore user.UserStore + // Initialize producer + brokers := []string{"localhost:9092"} + topic := "wallet-balance-topic" + producer := kafka.NewProducer(brokers, topic) + walletSvc := wallet.NewService( wallet.WalletStore(store), wallet.TransferStore(store), notificatioStore, - // userStore, notificationSvc, userSvc, domain.MongoDBLogger, logger, - kafka.NewProducer([]string{"localhost:9092"}, "wallet-events"), + producer, ) branchSvc := branch.NewService(store) diff --git a/internal/services/kafka/producer.go b/internal/services/kafka/producer.go index 1c3278d..8c03b5c 100644 --- a/internal/services/kafka/producer.go +++ b/internal/services/kafka/producer.go @@ -27,7 +27,7 @@ func (p *Producer) Publish(ctx context.Context, key string, event any) error { if err != nil { return err } - + return p.writer.WriteMessages(ctx, kafka.Message{ Key: []byte(key), Value: msgBytes, diff --git a/internal/services/notfication/service.go b/internal/services/notfication/service.go index fc44b71..c6a5457 100644 --- a/internal/services/notfication/service.go +++ b/internal/services/notfication/service.go @@ -23,7 +23,7 @@ import ( type Service struct { repo repository.NotificationRepository Hub *ws.NotificationHub - notificationStore NotificationStore + // notificationStore connections sync.Map notificationCh chan *domain.Notification stopCh chan struct{} @@ -402,7 +402,7 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W ) // Try company first - company, companyErr := s.notificationStore.GetCompanyByWalletID(ctx, wallet.ID) + company, companyErr := s.GetCompanyByWalletID(ctx, wallet.ID) if companyErr == nil { payload = domain.LiveWalletMetrics{ Timestamp: time.Now(), @@ -422,7 +422,7 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W } } else { // Try branch next - branch, branchErr := s.notificationStore.GetBranchByWalletID(ctx, wallet.ID) + branch, branchErr := s.GetBranchByWalletID(ctx, wallet.ID) if branchErr == nil { payload = domain.LiveWalletMetrics{ Timestamp: time.Now(), @@ -467,9 +467,9 @@ func (s *Service) UpdateLiveMetricForWallet(ctx context.Context, wallet domain.W } func (s *Service) GetCompanyByWalletID(ctx context.Context, walletID int64) (domain.Company, error) { - return s.notificationStore.GetCompanyByWalletID(ctx, walletID) + return s.GetCompanyByWalletID(ctx, walletID) } func (s *Service) GetBranchByWalletID(ctx context.Context, walletID int64) (domain.Branch, error) { - return s.notificationStore.GetBranchByWalletID(ctx, walletID) + return s.GetBranchByWalletID(ctx, walletID) } diff --git a/internal/services/wallet/service.go b/internal/services/wallet/service.go index 319e3c3..4217dd8 100644 --- a/internal/services/wallet/service.go +++ b/internal/services/wallet/service.go @@ -3,14 +3,10 @@ package wallet import ( "log/slog" -<<<<<<< HEAD "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" - notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication" -======= notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "go.uber.org/zap" ->>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f ) type Service struct { @@ -25,9 +21,6 @@ type Service struct { kafkaProducer *kafka.Producer } -<<<<<<< HEAD -func NewService(walletStore WalletStore, transferStore TransferStore, notificationStore notificationservice.NotificationStore, notificationSvc *notificationservice.Service, logger *slog.Logger, kafkaProducer *kafka.Producer) *Service { -======= func NewService( walletStore WalletStore, transferStore TransferStore, @@ -36,8 +29,8 @@ func NewService( userSvc *user.Service, mongoLogger *zap.Logger, logger *slog.Logger, + kafkaProducer *kafka.Producer, ) *Service { ->>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f return &Service{ walletStore: walletStore, transferStore: transferStore, diff --git a/internal/services/wallet/wallet.go b/internal/services/wallet/wallet.go index 4dfc565..5ae8ab4 100644 --- a/internal/services/wallet/wallet.go +++ b/internal/services/wallet/wallet.go @@ -7,11 +7,8 @@ import ( "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" -<<<<<<< HEAD "github.com/SamuelTariku/FortuneBet-Backend/internal/event" -======= "go.uber.org/zap" ->>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f ) var ( @@ -90,18 +87,13 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu return err } -<<<<<<< HEAD - wallet, err := s.walletStore.GetWalletByID(ctx, id) -======= - _, err = s.GetWalletByID(ctx, id) ->>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f + wallet, err := s.GetWalletByID(ctx, id) if err != nil { return err } -<<<<<<< HEAD go func() { - s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ + s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{ EventType: event.WalletBalanceUpdated, WalletID: wallet.ID, UserID: wallet.UserID, @@ -110,9 +102,6 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu }) }() -======= - // go s.notificationSvc.UpdateLiveWalletMetricForWallet(ctx, wallet) ->>>>>>> d43b12c589d32e4b6147cfb54a3b939c476bae6f return nil } @@ -267,7 +256,8 @@ func (s *Service) UpdateWalletActive(ctx context.Context, id int64, isActive boo func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) { var recipients []int64 - if walletType == domain.BranchWalletType { + switch walletType { + case domain.BranchWalletType: branch, err := s.GetBranchByWalletID(ctx, walletID) if err != nil { return nil, err @@ -288,13 +278,13 @@ func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID i } recipients = append(recipients, admin.ID) - } else if walletType == domain.CompanyWalletType { + case domain.CompanyWalletType: company, err := s.GetCompanyByWalletID(ctx, walletID) if err != nil { return nil, err } recipients = append(recipients, company.AdminID) - } else { + default: return nil, fmt.Errorf("Invalid wallet type") }