diff --git a/cmd/main.go b/cmd/main.go index 1297a2c..5b0f003 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -41,7 +41,6 @@ import ( "github.com/SamuelTariku/FortuneBet-Backend/internal/services/event" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/institutions" issuereporting "github.com/SamuelTariku/FortuneBet-Backend/internal/services/issue_reporting" - "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/league" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" @@ -122,10 +121,10 @@ func main() { // var userStore user.UserStore // Initialize producer - topic := "wallet-balance-topic" - producer := kafka.NewProducer(cfg.KafkaBrokers, topic) + // topic := "wallet-balance-topic" + // producer := kafka.NewProducer(cfg.KafkaBrokers, topic) - notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc, cfg.KafkaBrokers) + notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc) walletSvc := wallet.NewService( wallet.WalletStore(store), @@ -135,7 +134,6 @@ func main() { userSvc, domain.MongoDBLogger, logger, - producer, ) branchSvc := branch.NewService(store) @@ -239,7 +237,7 @@ func main() { go walletMonitorSvc.Start() httpserver.StartDataFetchingCrons(eventSvc, *oddsSvc, resultSvc, domain.MongoDBLogger) - httpserver.StartTicketCrons(*ticketSvc, domain.MongoDBLogger) + httpserver.StartCleanupCrons(*ticketSvc, notificationSvc, domain.MongoDBLogger) issueReportingRepo := repository.NewReportedIssueRepository(store) diff --git a/db/migrations/000002_notification.up.sql b/db/migrations/000002_notification.up.sql index 3d74954..78b28c8 100644 --- a/db/migrations/000002_notification.up.sql +++ b/db/migrations/000002_notification.up.sql @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS notifications ( priority INTEGER, version INTEGER NOT NULL DEFAULT 0, timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, + img TEXT, + expires TIMESTAMPTZ NOT NULL, metadata JSONB ); CREATE TABLE IF NOT EXISTS wallet_threshold_notifications ( diff --git a/db/query/branch.sql b/db/query/branch.sql index 1e09f40..14fd768 100644 --- a/db/query/branch.sql +++ b/db/query/branch.sql @@ -65,7 +65,11 @@ WHERE branch_manager_id = $1; -- name: SearchBranchByName :many SELECT * FROM branch_details -WHERE name ILIKE '%' || $1 || '%'; +WHERE name ILIKE '%' || $1 || '%' + AND ( + company_id = sqlc.narg('company_id') + OR sqlc.narg('company_id') IS NULL + ); -- name: GetAllSupportedOperations :many SELECT * FROM supported_operations; diff --git a/db/query/notification.sql b/db/query/notification.sql index 76d0edc..c1dfa06 100644 --- a/db/query/notification.sql +++ b/db/query/notification.sql @@ -12,6 +12,8 @@ INSERT INTO notifications ( payload, priority, timestamp, + expires, + img, metadata ) VALUES ( @@ -27,7 +29,9 @@ VALUES ( $10, $11, $12, - $13 + $13, + $14, + $15 ) RETURNING *; -- name: GetNotification :one @@ -88,4 +92,8 @@ SELECT COUNT(*) as total, WHEN is_read = false THEN 1 END ) as unread -FROM notifications; \ No newline at end of file +FROM notifications; + +-- name: DeleteOldNotifications :exec +DELETE FROM notifications +WHERE expires < now(); \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b480236..d80ff06 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -54,46 +54,17 @@ services: ] networks: - app - redis: - image: redis:7-alpine - ports: - - "6379:6379" - networks: - - app - healthcheck: - test: ["CMD", "redis-cli", "ping"] - interval: 10s - timeout: 5s - retries: 5 - - - zookeeper: - image: confluentinc/cp-zookeeper:7.5.0 - container_name: zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - networks: - - app - kafka: - image: confluentinc/cp-kafka:7.5.0 - depends_on: - - zookeeper - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - - ports: - - "9092:9092" - - "29092:29092" - networks: - - app + # redis: + # image: redis:7-alpine + # ports: + # - "6379:6379" + # networks: + # - app + # healthcheck: + # test: ["CMD", "redis-cli", "ping"] + # interval: 10s + # timeout: 5s + # retries: 5 app: build: context: . @@ -104,15 +75,9 @@ services: environment: - DB_URL=postgresql://root:secret@postgres:5432/gh?sslmode=disable - MONGO_URI=mongodb://root:secret@mongo:27017 - - REDIS_ADDR=redis:6379 - - KAFKA_BROKERS=kafka:9092 depends_on: migrate: condition: service_completed_successfully - mongo: - condition: service_healthy - redis: - condition: service_healthy networks: - app command: ["/app/bin/web"] diff --git a/gen/db/auth.sql.go b/gen/db/auth.sql.go index 8dd2280..7d8d59d 100644 --- a/gen/db/auth.sql.go +++ b/gen/db/auth.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: auth.sql package dbgen diff --git a/gen/db/bet.sql.go b/gen/db/bet.sql.go index a9fac0e..8e6254c 100644 --- a/gen/db/bet.sql.go +++ b/gen/db/bet.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: bet.sql package dbgen diff --git a/gen/db/bet_stat.sql.go b/gen/db/bet_stat.sql.go index 9a7b494..275ef07 100644 --- a/gen/db/bet_stat.sql.go +++ b/gen/db/bet_stat.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: bet_stat.sql package dbgen diff --git a/gen/db/bonus.sql.go b/gen/db/bonus.sql.go index 1a5d8e9..7c6f168 100644 --- a/gen/db/bonus.sql.go +++ b/gen/db/bonus.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: bonus.sql package dbgen diff --git a/gen/db/branch.sql.go b/gen/db/branch.sql.go index 89d2959..64cd62e 100644 --- a/gen/db/branch.sql.go +++ b/gen/db/branch.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: branch.sql package dbgen @@ -455,10 +455,19 @@ const SearchBranchByName = `-- name: SearchBranchByName :many SELECT id, name, location, profit_percent, is_active, wallet_id, branch_manager_id, company_id, is_self_owned, created_at, updated_at, manager_name, manager_phone_number, balance, wallet_is_active FROM branch_details WHERE name ILIKE '%' || $1 || '%' + AND ( + company_id = $2 + OR $2 IS NULL + ) ` -func (q *Queries) SearchBranchByName(ctx context.Context, dollar_1 pgtype.Text) ([]BranchDetail, error) { - rows, err := q.db.Query(ctx, SearchBranchByName, dollar_1) +type SearchBranchByNameParams struct { + Column1 pgtype.Text `json:"column_1"` + CompanyID pgtype.Int8 `json:"company_id"` +} + +func (q *Queries) SearchBranchByName(ctx context.Context, arg SearchBranchByNameParams) ([]BranchDetail, error) { + rows, err := q.db.Query(ctx, SearchBranchByName, arg.Column1, arg.CompanyID) if err != nil { return nil, err } diff --git a/gen/db/cashier.sql.go b/gen/db/cashier.sql.go index 55e69d2..fc4a7f8 100644 --- a/gen/db/cashier.sql.go +++ b/gen/db/cashier.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: cashier.sql package dbgen diff --git a/gen/db/company.sql.go b/gen/db/company.sql.go index ec851a4..32d9ee2 100644 --- a/gen/db/company.sql.go +++ b/gen/db/company.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: company.sql package dbgen diff --git a/gen/db/copyfrom.go b/gen/db/copyfrom.go index f7a4793..1212253 100644 --- a/gen/db/copyfrom.go +++ b/gen/db/copyfrom.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: copyfrom.go package dbgen diff --git a/gen/db/db.go b/gen/db/db.go index 8134784..84de07c 100644 --- a/gen/db/db.go +++ b/gen/db/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 package dbgen diff --git a/gen/db/direct_deposit.sql.go b/gen/db/direct_deposit.sql.go index ff5a3b2..be02750 100644 --- a/gen/db/direct_deposit.sql.go +++ b/gen/db/direct_deposit.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: direct_deposit.sql package dbgen diff --git a/gen/db/disabled_odds.sql.go b/gen/db/disabled_odds.sql.go index 58913cf..b9cc744 100644 --- a/gen/db/disabled_odds.sql.go +++ b/gen/db/disabled_odds.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: disabled_odds.sql package dbgen diff --git a/gen/db/enet_pulse.sql.go b/gen/db/enet_pulse.sql.go index 9e72da8..a2c131b 100644 --- a/gen/db/enet_pulse.sql.go +++ b/gen/db/enet_pulse.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: enet_pulse.sql package dbgen diff --git a/gen/db/event_history.sql.go b/gen/db/event_history.sql.go index 35946cd..a4f1c2e 100644 --- a/gen/db/event_history.sql.go +++ b/gen/db/event_history.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: event_history.sql package dbgen diff --git a/gen/db/events.sql.go b/gen/db/events.sql.go index 1feaf00..a8345fb 100644 --- a/gen/db/events.sql.go +++ b/gen/db/events.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: events.sql package dbgen diff --git a/gen/db/events_stat.sql.go b/gen/db/events_stat.sql.go index 615e2fa..677fa2a 100644 --- a/gen/db/events_stat.sql.go +++ b/gen/db/events_stat.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: events_stat.sql package dbgen diff --git a/gen/db/flags.sql.go b/gen/db/flags.sql.go index 4b82cac..653543f 100644 --- a/gen/db/flags.sql.go +++ b/gen/db/flags.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: flags.sql package dbgen diff --git a/gen/db/institutions.sql.go b/gen/db/institutions.sql.go index 61ca108..324ac3e 100644 --- a/gen/db/institutions.sql.go +++ b/gen/db/institutions.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: institutions.sql package dbgen diff --git a/gen/db/issue_reporting.sql.go b/gen/db/issue_reporting.sql.go index e35fba1..7fcb4af 100644 --- a/gen/db/issue_reporting.sql.go +++ b/gen/db/issue_reporting.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: issue_reporting.sql package dbgen diff --git a/gen/db/leagues.sql.go b/gen/db/leagues.sql.go index 8fb31ca..1413699 100644 --- a/gen/db/leagues.sql.go +++ b/gen/db/leagues.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: leagues.sql package dbgen diff --git a/gen/db/location.sql.go b/gen/db/location.sql.go index 254c73a..008aa61 100644 --- a/gen/db/location.sql.go +++ b/gen/db/location.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: location.sql package dbgen diff --git a/gen/db/models.go b/gen/db/models.go index af11573..7f7d4f9 100644 --- a/gen/db/models.go +++ b/gen/db/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 package dbgen @@ -482,6 +482,8 @@ type Notification struct { Priority pgtype.Int4 `json:"priority"` Version int32 `json:"version"` Timestamp pgtype.Timestamptz `json:"timestamp"` + Img pgtype.Text `json:"img"` + Expires pgtype.Timestamptz `json:"expires"` Metadata []byte `json:"metadata"` } diff --git a/gen/db/monitor.sql.go b/gen/db/monitor.sql.go index b5f248f..a9a7ecb 100644 --- a/gen/db/monitor.sql.go +++ b/gen/db/monitor.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: monitor.sql package dbgen diff --git a/gen/db/notification.sql.go b/gen/db/notification.sql.go index 14d3a4c..3c029fa 100644 --- a/gen/db/notification.sql.go +++ b/gen/db/notification.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: notification.sql package dbgen @@ -39,6 +39,8 @@ INSERT INTO notifications ( payload, priority, timestamp, + expires, + img, metadata ) VALUES ( @@ -54,9 +56,11 @@ VALUES ( $10, $11, $12, - $13 + $13, + $14, + $15 ) -RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata ` type CreateNotificationParams struct { @@ -72,6 +76,8 @@ type CreateNotificationParams struct { Payload []byte `json:"payload"` Priority pgtype.Int4 `json:"priority"` Timestamp pgtype.Timestamptz `json:"timestamp"` + Expires pgtype.Timestamptz `json:"expires"` + Img pgtype.Text `json:"img"` Metadata []byte `json:"metadata"` } @@ -89,6 +95,8 @@ func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotification arg.Payload, arg.Priority, arg.Timestamp, + arg.Expires, + arg.Img, arg.Metadata, ) var i Notification @@ -106,13 +114,25 @@ func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotification &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ) return i, err } +const DeleteOldNotifications = `-- name: DeleteOldNotifications :exec +DELETE FROM notifications +WHERE expires < now() +` + +func (q *Queries) DeleteOldNotifications(ctx context.Context) error { + _, err := q.db.Exec(ctx, DeleteOldNotifications) + return err +} + const GetAllNotifications = `-- name: GetAllNotifications :many -SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata FROM notifications ORDER BY timestamp DESC LIMIT $1 OFFSET $2 @@ -146,6 +166,8 @@ func (q *Queries) GetAllNotifications(ctx context.Context, arg GetAllNotificatio &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ); err != nil { return nil, err @@ -159,7 +181,7 @@ func (q *Queries) GetAllNotifications(ctx context.Context, arg GetAllNotificatio } const GetNotification = `-- name: GetNotification :one -SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata FROM notifications WHERE id = $1 LIMIT 1 @@ -182,6 +204,8 @@ func (q *Queries) GetNotification(ctx context.Context, id string) (Notification, &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ) return i, err @@ -254,7 +278,7 @@ func (q *Queries) GetUserNotificationCount(ctx context.Context, recipientID int6 } const GetUserNotifications = `-- name: GetUserNotifications :many -SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata FROM notifications WHERE recipient_id = $1 ORDER BY timestamp DESC @@ -290,6 +314,8 @@ func (q *Queries) GetUserNotifications(ctx context.Context, arg GetUserNotificat &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ); err != nil { return nil, err @@ -303,7 +329,7 @@ func (q *Queries) GetUserNotifications(ctx context.Context, arg GetUserNotificat } const ListFailedNotifications = `-- name: ListFailedNotifications :many -SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata FROM notifications WHERE delivery_status = 'failed' AND timestamp < NOW() - INTERVAL '1 hour' @@ -334,6 +360,8 @@ func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]N &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ); err != nil { return nil, err @@ -378,7 +406,7 @@ SET delivery_status = $2, is_read = $3, metadata = $4 WHERE id = $1 -RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata ` type UpdateNotificationStatusParams struct { @@ -410,6 +438,8 @@ func (q *Queries) UpdateNotificationStatus(ctx context.Context, arg UpdateNotifi &i.Priority, &i.Version, &i.Timestamp, + &i.Img, + &i.Expires, &i.Metadata, ) return i, err diff --git a/gen/db/odd_history.sql.go b/gen/db/odd_history.sql.go index dd834c5..3fe7dd9 100644 --- a/gen/db/odd_history.sql.go +++ b/gen/db/odd_history.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: odd_history.sql package dbgen diff --git a/gen/db/odds.sql.go b/gen/db/odds.sql.go index d1e676d..e7c687e 100644 --- a/gen/db/odds.sql.go +++ b/gen/db/odds.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: odds.sql package dbgen diff --git a/gen/db/otp.sql.go b/gen/db/otp.sql.go index c96aaaa..7dba175 100644 --- a/gen/db/otp.sql.go +++ b/gen/db/otp.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: otp.sql package dbgen diff --git a/gen/db/raffle.sql.go b/gen/db/raffle.sql.go index 8d0be34..d4b85d3 100644 --- a/gen/db/raffle.sql.go +++ b/gen/db/raffle.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: raffle.sql package dbgen diff --git a/gen/db/referal.sql.go b/gen/db/referal.sql.go index 99d8bb2..caaa01a 100644 --- a/gen/db/referal.sql.go +++ b/gen/db/referal.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: referal.sql package dbgen diff --git a/gen/db/report.sql.go b/gen/db/report.sql.go index d6193c1..1a1ccde 100644 --- a/gen/db/report.sql.go +++ b/gen/db/report.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: report.sql package dbgen diff --git a/gen/db/result.sql.go b/gen/db/result.sql.go index 899561b..bff7b1e 100644 --- a/gen/db/result.sql.go +++ b/gen/db/result.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: result.sql package dbgen diff --git a/gen/db/result_log.sql.go b/gen/db/result_log.sql.go index 3f11e16..468795e 100644 --- a/gen/db/result_log.sql.go +++ b/gen/db/result_log.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: result_log.sql package dbgen diff --git a/gen/db/settings.sql.go b/gen/db/settings.sql.go index 76eb504..96ea916 100644 --- a/gen/db/settings.sql.go +++ b/gen/db/settings.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: settings.sql package dbgen diff --git a/gen/db/shop_transactions.sql.go b/gen/db/shop_transactions.sql.go index 68d770e..5c4b52e 100644 --- a/gen/db/shop_transactions.sql.go +++ b/gen/db/shop_transactions.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: shop_transactions.sql package dbgen diff --git a/gen/db/ticket.sql.go b/gen/db/ticket.sql.go index 45603ba..bc9bb5f 100644 --- a/gen/db/ticket.sql.go +++ b/gen/db/ticket.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: ticket.sql package dbgen diff --git a/gen/db/transfer.sql.go b/gen/db/transfer.sql.go index fe25cbe..b2a1066 100644 --- a/gen/db/transfer.sql.go +++ b/gen/db/transfer.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: transfer.sql package dbgen diff --git a/gen/db/user.sql.go b/gen/db/user.sql.go index 9b16163..f2f9fff 100644 --- a/gen/db/user.sql.go +++ b/gen/db/user.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: user.sql package dbgen diff --git a/gen/db/virtual_games.sql.go b/gen/db/virtual_games.sql.go index b98f602..5a2809a 100644 --- a/gen/db/virtual_games.sql.go +++ b/gen/db/virtual_games.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: virtual_games.sql package dbgen diff --git a/gen/db/wallet.sql.go b/gen/db/wallet.sql.go index ccb2d37..fcde631 100644 --- a/gen/db/wallet.sql.go +++ b/gen/db/wallet.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.30.0 +// sqlc v1.29.0 // source: wallet.sql package dbgen diff --git a/internal/domain/currency.go b/internal/domain/currency.go index 7ce3b3c..a15ba52 100644 --- a/internal/domain/currency.go +++ b/internal/domain/currency.go @@ -25,49 +25,83 @@ func (m Currency) String() string { return fmt.Sprintf("$%.2f", m.Float32()) } + +// TODO: Change the currency to this format when implementing multi-currency +// type Currency struct { +// Value int64 +// Type IntCurrency +// } + +// // ToCurrency converts a float32 (like 12.34) into Currency (stored in cents). +// func NewCurrency(f float32, currencyType IntCurrency) Currency { +// cents := math.Round(float64(f) * 100) // avoid float32 precision issues +// return Currency{ +// Value: int64(cents), +// Type: currencyType, +// } +// } + +// func NewBase(v int64) Currency { +// return Currency{ +// Value: v, +// Type: BASE, +// } +// } + +// // Float32 converts a Currency back into float32 (like 12.34). +// func (m Currency) Float32() float32 { +// return float32(m.Value) / 100 +// } + +// // String returns a formatted Currency value for display. +// func (m Currency) String() string { +// return fmt.Sprintf("$%.2f", m.Float32()) +// } + type IntCurrency string const ( - ETB IntCurrency = "ETB" // Ethiopian Birr - NGN IntCurrency = "NGN" // Nigerian Naira - ZAR IntCurrency = "ZAR" // South African Rand - EGP IntCurrency = "EGP" // Egyptian Pound - KES IntCurrency = "KES" // Kenyan Shilling - UGX IntCurrency = "UGX" // Ugandan Shilling - TZS IntCurrency = "TZS" // Tanzanian Shilling - RWF IntCurrency = "RWF" // Rwandan Franc - BIF IntCurrency = "BIF" // Burundian Franc - XOF IntCurrency = "XOF" // West African CFA Franc (BCEAO) - XAF IntCurrency = "XAF" // Central African CFA Franc (BEAC) - GHS IntCurrency = "GHS" // Ghanaian Cedi - SDG IntCurrency = "SDG" // Sudanese Pound - SSP IntCurrency = "SSP" // South Sudanese Pound - DZD IntCurrency = "DZD" // Algerian Dinar - MAD IntCurrency = "MAD" // Moroccan Dirham - TND IntCurrency = "TND" // Tunisian Dinar - LYD IntCurrency = "LYD" // Libyan Dinar - MZN IntCurrency = "MZN" // Mozambican Metical - AOA IntCurrency = "AOA" // Angolan Kwanza - BWP IntCurrency = "BWP" // Botswana Pula - ZMW IntCurrency = "ZMW" // Zambian Kwacha - MWK IntCurrency = "MWK" // Malawian Kwacha - LSL IntCurrency = "LSL" // Lesotho Loti - NAD IntCurrency = "NAD" // Namibian Dollar - SZL IntCurrency = "SZL" // Swazi Lilangeni - CVE IntCurrency = "CVE" // Cape Verdean Escudo - GMD IntCurrency = "GMD" // Gambian Dalasi - SLL IntCurrency = "SLL" // Sierra Leonean Leone - LRD IntCurrency = "LRD" // Liberian Dollar - GNF IntCurrency = "GNF" // Guinean Franc - XCD IntCurrency = "XCD" // Eastern Caribbean Dollar (used in Saint Lucia) - MRU IntCurrency = "MRU" // Mauritanian Ouguiya - KMF IntCurrency = "KMF" // Comorian Franc - DJF IntCurrency = "DJF" // Djiboutian Franc - SOS IntCurrency = "SOS" // Somali Shilling - ERN IntCurrency = "ERN" // Eritrean Nakfa - MGA IntCurrency = "MGA" // Malagasy Ariary - SCR IntCurrency = "SCR" // Seychellois Rupee - MUR IntCurrency = "MUR" // Mauritian Rupee + BASE IntCurrency = "BASE" + ETB IntCurrency = "ETB" // Ethiopian Birr + NGN IntCurrency = "NGN" // Nigerian Naira + ZAR IntCurrency = "ZAR" // South African Rand + EGP IntCurrency = "EGP" // Egyptian Pound + KES IntCurrency = "KES" // Kenyan Shilling + UGX IntCurrency = "UGX" // Ugandan Shilling + TZS IntCurrency = "TZS" // Tanzanian Shilling + RWF IntCurrency = "RWF" // Rwandan Franc + BIF IntCurrency = "BIF" // Burundian Franc + XOF IntCurrency = "XOF" // West African CFA Franc (BCEAO) + XAF IntCurrency = "XAF" // Central African CFA Franc (BEAC) + GHS IntCurrency = "GHS" // Ghanaian Cedi + SDG IntCurrency = "SDG" // Sudanese Pound + SSP IntCurrency = "SSP" // South Sudanese Pound + DZD IntCurrency = "DZD" // Algerian Dinar + MAD IntCurrency = "MAD" // Moroccan Dirham + TND IntCurrency = "TND" // Tunisian Dinar + LYD IntCurrency = "LYD" // Libyan Dinar + MZN IntCurrency = "MZN" // Mozambican Metical + AOA IntCurrency = "AOA" // Angolan Kwanza + BWP IntCurrency = "BWP" // Botswana Pula + ZMW IntCurrency = "ZMW" // Zambian Kwacha + MWK IntCurrency = "MWK" // Malawian Kwacha + LSL IntCurrency = "LSL" // Lesotho Loti + NAD IntCurrency = "NAD" // Namibian Dollar + SZL IntCurrency = "SZL" // Swazi Lilangeni + CVE IntCurrency = "CVE" // Cape Verdean Escudo + GMD IntCurrency = "GMD" // Gambian Dalasi + SLL IntCurrency = "SLL" // Sierra Leonean Leone + LRD IntCurrency = "LRD" // Liberian Dollar + GNF IntCurrency = "GNF" // Guinean Franc + XCD IntCurrency = "XCD" // Eastern Caribbean Dollar (used in Saint Lucia) + MRU IntCurrency = "MRU" // Mauritanian Ouguiya + KMF IntCurrency = "KMF" // Comorian Franc + DJF IntCurrency = "DJF" // Djiboutian Franc + SOS IntCurrency = "SOS" // Somali Shilling + ERN IntCurrency = "ERN" // Eritrean Nakfa + MGA IntCurrency = "MGA" // Malagasy Ariary + SCR IntCurrency = "SCR" // Seychellois Rupee + MUR IntCurrency = "MUR" // Mauritian Rupee // International currencies (already listed) USD IntCurrency = "USD" // US Dollar diff --git a/internal/domain/notification.go b/internal/domain/notification.go index b7193d7..046b3a9 100644 --- a/internal/domain/notification.go +++ b/internal/domain/notification.go @@ -84,6 +84,8 @@ type Notification struct { Priority int `json:"priority,omitempty"` Version int `json:"-"` Timestamp time.Time `json:"timestamp"` + Expires time.Time `json:"expires"` + Image string `json:"image"` Metadata json.RawMessage `json:"metadata,omitempty"` } type CreateNotification struct { @@ -97,6 +99,8 @@ type CreateNotification struct { DeliveryChannel DeliveryChannel `json:"delivery_channel,omitempty"` Payload NotificationPayload `json:"payload"` Priority int `json:"priority,omitempty"` + Expires time.Time `json:"expires"` + Image string `json:"image,omitempty"` Metadata json.RawMessage `json:"metadata,omitempty"` } diff --git a/internal/domain/shop_bet.go b/internal/domain/shop_bet.go index b46b4b6..c321a02 100644 --- a/internal/domain/shop_bet.go +++ b/internal/domain/shop_bet.go @@ -87,7 +87,7 @@ type ShopBetRes struct { BetID int64 `json:"bet_id" example:"1"` NumberOfOutcomes int64 `json:"number_of_outcomes" example:"1"` Status OutcomeStatus `json:"status" example:"1"` - Amount Currency `json:"amount"` + Amount float32 `json:"amount"` Outcomes []BetOutcome `json:"outcomes"` TransactionVerified bool `json:"transaction_verified" example:"true"` UpdatedAt time.Time `json:"updated_at" example:"2025-04-08T12:00:00Z"` @@ -119,7 +119,7 @@ func ConvertShopBetDetail(shopBet ShopBetDetail) ShopBetRes { BetID: shopBet.BetID, NumberOfOutcomes: shopBet.NumberOfOutcomes, Status: shopBet.Status, - Amount: shopBet.Amount, + Amount: shopBet.Amount.Float32(), Outcomes: shopBet.Outcomes, TransactionVerified: shopBet.TransactionVerified, UpdatedAt: shopBet.UpdatedAt, diff --git a/internal/repository/branch.go b/internal/repository/branch.go index 89b329e..4c5041d 100644 --- a/internal/repository/branch.go +++ b/internal/repository/branch.go @@ -68,8 +68,11 @@ func (s *Store) GetAllBranches(ctx context.Context, filter domain.BranchFilter) return branches, nil } -func (s *Store) SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) { - dbBranches, err := s.queries.SearchBranchByName(ctx, pgtype.Text{String: name, Valid: true}) +func (s *Store) SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error) { + dbBranches, err := s.queries.SearchBranchByName(ctx, dbgen.SearchBranchByNameParams{ + Column1: pgtype.Text{String: name, Valid: true}, + CompanyID: companyID.ToPG(), + }) if err != nil { return nil, err } diff --git a/internal/repository/notification.go b/internal/repository/notification.go index eea21e5..0c91e82 100644 --- a/internal/repository/notification.go +++ b/internal/repository/notification.go @@ -14,12 +14,13 @@ import ( type NotificationRepository interface { CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) - GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) + GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error) + DeleteOldNotifications(ctx context.Context) error } type Repository struct { @@ -69,6 +70,8 @@ func (r *Repository) CreateNotification(ctx context.Context, notification *domai Payload: marshalPayload(notification.Payload), Priority: priority, Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true}, + Expires: pgtype.Timestamptz{Time: notification.Expires, Valid: true}, + Img: pgtype.Text{String: notification.Image, Valid: notification.Image != ""}, Metadata: notification.Metadata, } @@ -113,7 +116,7 @@ func (r *Repository) GetUserNotifications(ctx context.Context, recipientID int64 if err != nil { return nil, 0, err } - + var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications)) for _, dbNotif := range dbNotifications { domainNotif := r.mapDBToDomain(&dbNotif) @@ -160,6 +163,10 @@ func (r *Repository) ListRecipientIDs(ctx context.Context, receiver domain.Notif return r.store.queries.ListRecipientIDsByReceiver(ctx, string(receiver)) } +func (s *Repository) DeleteOldNotifications(ctx context.Context) error { + return s.store.queries.DeleteOldNotifications(ctx) +} + func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification { var errorSeverity domain.NotificationErrorSeverity if dbNotif.ErrorSeverity.Valid { @@ -199,6 +206,8 @@ func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notifica Payload: payload, Priority: priority, Timestamp: dbNotif.Timestamp.Time, + Expires: dbNotif.Expires.Time, + Image: dbNotif.Img.String, Metadata: dbNotif.Metadata, } } diff --git a/internal/services/bet/service.go b/internal/services/bet/service.go index 2379c3b..1e33f8b 100644 --- a/internal/services/bet/service.go +++ b/internal/services/bet/service.go @@ -46,6 +46,8 @@ var ( ErrInvalidAmount = errors.New("invalid amount") ErrBetAmountTooHigh = errors.New("cannot create a bet with an amount above limit") ErrBetWinningTooHigh = errors.New("total Winnings over set limit") + + ErrCompanyDeductedPercentInvalid = errors.New("invalid company deducted percentage") ) type Service struct { @@ -303,8 +305,9 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID } fastCode := helpers.GenerateFastCode() - accumulator := calculateAccumulator(len(outcomes)) - amount := req.Amount + (req.Amount * accumulator) + // accumulator := calculateAccumulator(len(outcomes)) + // amount := req.Amount + (req.Amount * accumulator) + amount := req.Amount newBet := domain.CreateBet{ Amount: domain.ToCurrency(amount), @@ -524,7 +527,25 @@ func (s *Service) DeductBetFromBranchWallet(ctx context.Context, amount float32, return err } - deductedAmount := amount * company.DeductedPercentage + if company.DeductedPercentage > 1 { + s.mongoLogger.Error("Invalid company deducted percentage", + zap.Int64("wallet_id", walletID), + zap.Float32("amount", company.DeductedPercentage), + zap.Error(err), + ) + return ErrCompanyDeductedPercentInvalid + } + + deductedAmount := amount - (amount * company.DeductedPercentage) + + if deductedAmount == 0 { + s.mongoLogger.Fatal("Amount", + zap.Int64("wallet_id", walletID), + zap.Float32("amount", deductedAmount), + zap.Error(err), + ) + return err + } _, err = s.walletSvc.DeductFromWallet(ctx, walletID, domain.ToCurrency(deductedAmount), domain.ValidInt64{ Value: userID, diff --git a/internal/services/branch/port.go b/internal/services/branch/port.go index 8b17ae1..d23f91a 100644 --- a/internal/services/branch/port.go +++ b/internal/services/branch/port.go @@ -12,7 +12,7 @@ type BranchStore interface { GetBranchByManagerID(ctx context.Context, branchManagerID int64) ([]domain.BranchDetail, error) GetBranchByCompanyID(ctx context.Context, companyID int64) ([]domain.BranchDetail, error) GetAllBranches(ctx context.Context, filter domain.BranchFilter) ([]domain.BranchDetail, error) - SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) + SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error) UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error) DeleteBranch(ctx context.Context, id int64) error CreateBranchOperation(ctx context.Context, branchOperation domain.CreateBranchOperation) error diff --git a/internal/services/branch/service.go b/internal/services/branch/service.go index 9e4f641..1b44651 100644 --- a/internal/services/branch/service.go +++ b/internal/services/branch/service.go @@ -54,8 +54,8 @@ func (s *Service) GetAllSupportedOperations(ctx context.Context) ([]domain.Suppo return s.branchStore.GetAllSupportedOperations(ctx) } -func (s *Service) SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) { - return s.branchStore.SearchBranchByName(ctx, name) +func (s *Service) SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error) { + return s.branchStore.SearchBranchByName(ctx, name, companyID) } func (s *Service) UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error) { return s.branchStore.UpdateBranch(ctx, branch) diff --git a/internal/services/kafka/consumer.go b/internal/services/kafka/consumer.go index c2343fe..fb55eea 100644 --- a/internal/services/kafka/consumer.go +++ b/internal/services/kafka/consumer.go @@ -1,67 +1,67 @@ package kafka -import ( - "context" - "encoding/json" - "log" +// import ( +// "context" +// "encoding/json" +// "log" - "github.com/SamuelTariku/FortuneBet-Backend/internal/event" - "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" - "github.com/segmentio/kafka-go" -) +// "github.com/SamuelTariku/FortuneBet-Backend/internal/event" +// "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" +// "github.com/segmentio/kafka-go" +// ) -type WalletConsumer struct { - reader *kafka.Reader - hub *ws.NotificationHub - topic string - groupID string - brokers []string -} +// type WalletConsumer struct { +// reader *kafka.Reader +// hub *ws.NotificationHub +// topic string +// groupID string +// brokers []string +// } -func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer { - return &WalletConsumer{ - brokers: brokers, - topic: topic, - groupID: groupID, - hub: hub, - reader: kafka.NewReader(kafka.ReaderConfig{ - Brokers: brokers, - GroupID: groupID, - Topic: topic, - }), - } -} +// func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer { +// return &WalletConsumer{ +// brokers: brokers, +// topic: topic, +// groupID: groupID, +// hub: hub, +// reader: kafka.NewReader(kafka.ReaderConfig{ +// Brokers: brokers, +// GroupID: groupID, +// Topic: topic, +// }), +// } +// } -func (c *WalletConsumer) Start(ctx context.Context) { - go func() { - for { - m, err := c.reader.ReadMessage(ctx) - if err != nil { - log.Printf("Error reading wallet Kafka message: %v", err) - continue - } +// func (c *WalletConsumer) Start(ctx context.Context) { +// go func() { +// for { +// m, err := c.reader.ReadMessage(ctx) +// if err != nil { +// log.Printf("Error reading wallet Kafka message: %v", err) +// continue +// } - var evt event.WalletEvent - if err := json.Unmarshal(m.Value, &evt); err != nil { - log.Printf("Failed to unmarshal wallet event: %v", err) - continue - } +// var evt event.WalletEvent +// if err := json.Unmarshal(m.Value, &evt); err != nil { +// log.Printf("Failed to unmarshal wallet event: %v", err) +// continue +// } - payload := map[string]interface{}{ - "type": evt.EventType, - "wallet_id": evt.WalletID, - "user_id": evt.UserID, - "balance": evt.Balance, - "wallet_type": evt.WalletType, - "trigger": evt.Trigger, - "recipient_id": evt.UserID, - } +// payload := map[string]interface{}{ +// "type": evt.EventType, +// "wallet_id": evt.WalletID, +// "user_id": evt.UserID, +// "balance": evt.Balance, +// "wallet_type": evt.WalletType, +// "trigger": evt.Trigger, +// "recipient_id": evt.UserID, +// } - // Broadcast to appropriate WebSocket clients - c.hub.Broadcast <- payload - } - }() -} +// // Broadcast to appropriate WebSocket clients +// c.hub.Broadcast <- payload +// } +// }() +// } // func (c *WalletConsumer) Shutdown() error { // return c.reader.Close() diff --git a/internal/services/kafka/producer.go b/internal/services/kafka/producer.go index 8c03b5c..e2720ee 100644 --- a/internal/services/kafka/producer.go +++ b/internal/services/kafka/producer.go @@ -1,36 +1,36 @@ package kafka -import ( - "context" - "encoding/json" - "time" +// import ( +// "context" +// "encoding/json" +// "time" - "github.com/segmentio/kafka-go" -) +// "github.com/segmentio/kafka-go" +// ) -type Producer struct { - writer *kafka.Writer -} +// type Producer struct { +// writer *kafka.Writer +// } -func NewProducer(brokers []string, topic string) *Producer { - return &Producer{ - writer: &kafka.Writer{ - Addr: kafka.TCP(brokers...), - Topic: topic, - Balancer: &kafka.LeastBytes{}, - }, - } -} +// func NewProducer(brokers []string, topic string) *Producer { +// return &Producer{ +// writer: &kafka.Writer{ +// Addr: kafka.TCP(brokers...), +// Topic: topic, +// Balancer: &kafka.LeastBytes{}, +// }, +// } +// } -func (p *Producer) Publish(ctx context.Context, key string, event any) error { - msgBytes, err := json.Marshal(event) - if err != nil { - return err - } +// func (p *Producer) Publish(ctx context.Context, key string, event any) error { +// msgBytes, err := json.Marshal(event) +// if err != nil { +// return err +// } - return p.writer.WriteMessages(ctx, kafka.Message{ - Key: []byte(key), - Value: msgBytes, - Time: time.Now(), - }) -} +// return p.writer.WriteMessages(ctx, kafka.Message{ +// Key: []byte(key), +// Value: msgBytes, +// Time: time.Now(), +// }) +// } diff --git a/internal/services/notification/service.go b/internal/services/notification/service.go index bc31c02..283ed9a 100644 --- a/internal/services/notification/service.go +++ b/internal/services/notification/service.go @@ -2,7 +2,6 @@ package notificationservice import ( "context" - "encoding/json" "fmt" // "errors" @@ -12,19 +11,19 @@ import ( "github.com/SamuelTariku/FortuneBet-Backend/internal/config" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" - "github.com/SamuelTariku/FortuneBet-Backend/internal/event" + "github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers" "github.com/SamuelTariku/FortuneBet-Backend/internal/repository" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" - "github.com/segmentio/kafka-go" + // "github.com/segmentio/kafka-go" "go.uber.org/zap" // "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet" "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" // afro "github.com/amanuelabay/afrosms-go" "github.com/gorilla/websocket" - "github.com/redis/go-redis/v9" + // "github.com/redis/go-redis/v9" ) type Service struct { @@ -39,8 +38,6 @@ type Service struct { messengerSvc *messenger.Service mongoLogger *zap.Logger logger *slog.Logger - redisClient *redis.Client - reader *kafka.Reader } func New(repo repository.NotificationRepository, @@ -49,17 +46,8 @@ func New(repo repository.NotificationRepository, cfg *config.Config, messengerSvc *messenger.Service, userSvc *user.Service, - kafkaBrokers []string, ) *Service { hub := ws.NewNotificationHub() - rdb := redis.NewClient(&redis.Options{ - Addr: cfg.RedisAddr, // e.g., "redis:6379" - }) - walletReader := kafka.NewReader(kafka.ReaderConfig{ - Brokers: kafkaBrokers, - Topic: "wallet-balance-topic", - GroupID: "notification-service-group", // Each service should have its own group - }) svc := &Service{ repo: repo, @@ -72,15 +60,13 @@ func New(repo repository.NotificationRepository, messengerSvc: messengerSvc, userSvc: userSvc, config: cfg, - redisClient: rdb, - reader: walletReader, } go hub.Run() go svc.startWorker() go svc.startRetryWorker() - go svc.RunRedisSubscriber(context.Background()) - go svc.StartKafkaConsumer(context.Background()) + // go svc.RunRedisSubscriber(context.Background()) + // go svc.StartKafkaConsumer(context.Background()) return svc } @@ -484,189 +470,192 @@ func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int return s.repo.CountUnreadNotifications(ctx, recipient_id) } +func (s *Service) DeleteOldNotifications(ctx context.Context) error { + return s.repo.DeleteOldNotifications(ctx) +} + // func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){ // return s.repo.Get(ctx, filter) // } -func (s *Service) RunRedisSubscriber(ctx context.Context) { - pubsub := s.redisClient.Subscribe(ctx, "live_metrics") - defer pubsub.Close() +// func (s *Service) RunRedisSubscriber(ctx context.Context) { +// pubsub := s.redisClient.Subscribe(ctx, "live_metrics") +// defer pubsub.Close() - ch := pubsub.Channel() - for msg := range ch { - var parsed map[string]interface{} - if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil { - // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err) - s.mongoLogger.Error("invalid Redis message format", - zap.String("payload", msg.Payload), - zap.Error(err), - zap.Time("timestamp", time.Now()), - ) - continue - } +// ch := pubsub.Channel() +// for msg := range ch { +// var parsed map[string]interface{} +// if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil { +// // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err) +// s.mongoLogger.Error("invalid Redis message format", +// zap.String("payload", msg.Payload), +// zap.Error(err), +// zap.Time("timestamp", time.Now()), +// ) +// continue +// } - eventType, _ := parsed["type"].(string) - payload := parsed["payload"] - recipientID, hasRecipient := parsed["recipient_id"] - recipientType, _ := parsed["recipient_type"].(string) +// eventType, _ := parsed["type"].(string) +// payload := parsed["payload"] +// recipientID, hasRecipient := parsed["recipient_id"] +// recipientType, _ := parsed["recipient_type"].(string) - message := map[string]interface{}{ - "type": eventType, - "payload": payload, - } +// message := map[string]interface{}{ +// "type": eventType, +// "payload": payload, +// } - if hasRecipient { - message["recipient_id"] = recipientID - message["recipient_type"] = recipientType - } +// if hasRecipient { +// message["recipient_id"] = recipientID +// message["recipient_type"] = recipientType +// } - s.Hub.Broadcast <- message - } -} +// s.Hub.Broadcast <- message +// } +// } -func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error { - const key = "live_metrics" +// func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error { +// const key = "live_metrics" - companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies)) - for _, c := range companies { - companyBalances = append(companyBalances, domain.CompanyWalletBalance{ - CompanyID: c.ID, - CompanyName: c.Name, - Balance: float64(c.WalletBalance.Float32()), - }) - } +// companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies)) +// for _, c := range companies { +// companyBalances = append(companyBalances, domain.CompanyWalletBalance{ +// CompanyID: c.ID, +// CompanyName: c.Name, +// Balance: float64(c.WalletBalance.Float32()), +// }) +// } - branchBalances := make([]domain.BranchWalletBalance, 0, len(branches)) - for _, b := range branches { - branchBalances = append(branchBalances, domain.BranchWalletBalance{ - BranchID: b.ID, - BranchName: b.Name, - CompanyID: b.CompanyID, - Balance: float64(b.Balance.Float32()), - }) - } +// branchBalances := make([]domain.BranchWalletBalance, 0, len(branches)) +// for _, b := range branches { +// branchBalances = append(branchBalances, domain.BranchWalletBalance{ +// BranchID: b.ID, +// BranchName: b.Name, +// CompanyID: b.CompanyID, +// Balance: float64(b.Balance.Float32()), +// }) +// } - payload := domain.LiveWalletMetrics{ - Timestamp: time.Now(), - CompanyBalances: companyBalances, - BranchBalances: branchBalances, - } +// payload := domain.LiveWalletMetrics{ +// Timestamp: time.Now(), +// CompanyBalances: companyBalances, +// BranchBalances: branchBalances, +// } - updatedData, err := json.Marshal(payload) - if err != nil { - return err - } +// updatedData, err := json.Marshal(payload) +// if err != nil { +// return err +// } - if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil { - return err - } +// if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil { +// return err +// } - if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil { - return err - } - return nil -} +// if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil { +// return err +// } +// return nil +// } -func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) { - const key = "live_metrics" - var metric domain.LiveMetric +// func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) { +// const key = "live_metrics" +// var metric domain.LiveMetric - val, err := s.redisClient.Get(ctx, key).Result() - if err == redis.Nil { - // Key does not exist yet, return zero-valued struct - return domain.LiveMetric{}, nil - } else if err != nil { - return domain.LiveMetric{}, err - } +// val, err := s.redisClient.Get(ctx, key).Result() +// if err == redis.Nil { +// // Key does not exist yet, return zero-valued struct +// return domain.LiveMetric{}, nil +// } else if err != nil { +// return domain.LiveMetric{}, err +// } - if err := json.Unmarshal([]byte(val), &metric); err != nil { - return domain.LiveMetric{}, err - } +// if err := json.Unmarshal([]byte(val), &metric); err != nil { +// return domain.LiveMetric{}, err +// } - return metric, nil -} +// return metric, nil +// } +// func (s *Service) StartKafkaConsumer(ctx context.Context) { +// go func() { +// for { +// m, err := s.reader.ReadMessage(ctx) +// if err != nil { +// if err == context.Canceled { +// s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context") +// return +// } +// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message", +// zap.Error(err), +// zap.Time("timestamp", time.Now()), +// ) +// time.Sleep(1 * time.Second) // backoff +// continue +// } -func (s *Service) StartKafkaConsumer(ctx context.Context) { - go func() { - for { - m, err := s.reader.ReadMessage(ctx) - if err != nil { - if err == context.Canceled { - s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context") - return - } - s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message", - zap.Error(err), - zap.Time("timestamp", time.Now()), - ) - time.Sleep(1 * time.Second) // backoff - continue - } +// var walletEvent event.WalletEvent +// if err := json.Unmarshal(m.Value, &walletEvent); err != nil { +// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event", +// zap.String("message", string(m.Value)), +// zap.Error(err), +// zap.Time("timestamp", time.Now()), +// ) +// continue +// } - var walletEvent event.WalletEvent - if err := json.Unmarshal(m.Value, &walletEvent); err != nil { - s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event", - zap.String("message", string(m.Value)), - zap.Error(err), - zap.Time("timestamp", time.Now()), - ) - continue - } +// raw, _ := json.Marshal(map[string]any{ +// "balance": walletEvent.Balance.Float32(), +// "type": walletEvent.WalletType, +// "timestamp": time.Now(), +// }) - raw, _ := json.Marshal(map[string]any{ - "balance": walletEvent.Balance.Float32(), - "type": walletEvent.WalletType, - "timestamp": time.Now(), - }) +// headline := "" +// message := "" +// var receiver domain.NotificationRecieverSide +// switch walletEvent.WalletType { - headline := "" - message := "" - var receiver domain.NotificationRecieverSide - switch walletEvent.WalletType { +// case domain.StaticWalletType: +// headline = "Referral and Bonus Wallet Updated" +// message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32()) +// receiver = domain.NotificationRecieverSideCustomer +// case domain.RegularWalletType: +// headline = "Wallet Updated" +// message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32()) +// receiver = domain.NotificationRecieverSideCustomer +// case domain.BranchWalletType: +// headline = "Branch Wallet Updated" +// message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32()) +// receiver = domain.NotificationRecieverSideBranchManager +// case domain.CompanyWalletType: +// headline = "Company Wallet Updated" +// message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32()) +// receiver = domain.NotificationRecieverSideAdmin +// } +// // Handle the wallet event: send notification +// notification := &domain.Notification{ +// RecipientID: walletEvent.UserID, +// DeliveryChannel: domain.DeliveryChannelInApp, +// Reciever: receiver, +// Type: domain.NotificationTypeWalletUpdated, +// DeliveryStatus: domain.DeliveryStatusPending, +// IsRead: false, +// Level: domain.NotificationLevelInfo, +// Priority: 2, +// Metadata: raw, +// Payload: domain.NotificationPayload{ +// Headline: headline, +// Message: message, +// }, +// } - case domain.StaticWalletType: - headline = "Referral and Bonus Wallet Updated" - message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32()) - receiver = domain.NotificationRecieverSideCustomer - case domain.RegularWalletType: - headline = "Wallet Updated" - message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32()) - receiver = domain.NotificationRecieverSideCustomer - case domain.BranchWalletType: - headline = "Branch Wallet Updated" - message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32()) - receiver = domain.NotificationRecieverSideBranchManager - case domain.CompanyWalletType: - headline = "Company Wallet Updated" - message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32()) - receiver = domain.NotificationRecieverSideAdmin - } - // Handle the wallet event: send notification - notification := &domain.Notification{ - RecipientID: walletEvent.UserID, - DeliveryChannel: domain.DeliveryChannelInApp, - Reciever: receiver, - Type: domain.NotificationTypeWalletUpdated, - DeliveryStatus: domain.DeliveryStatusPending, - IsRead: false, - Level: domain.NotificationLevelInfo, - Priority: 2, - Metadata: raw, - Payload: domain.NotificationPayload{ - Headline: headline, - Message: message, - }, - } - - if err := s.SendNotification(ctx, notification); err != nil { - s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification", - zap.Error(err), - zap.Time("timestamp", time.Now()), - ) - } - } - }() -} +// if err := s.SendNotification(ctx, notification); err != nil { +// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification", +// zap.Error(err), +// zap.Time("timestamp", time.Now()), +// ) +// } +// } +// }() +// } // func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) { // var ( diff --git a/internal/services/transaction/shop_deposit.go b/internal/services/transaction/shop_deposit.go index cc08874..aaef480 100644 --- a/internal/services/transaction/shop_deposit.go +++ b/internal/services/transaction/shop_deposit.go @@ -54,7 +54,7 @@ func (s *Service) CreateShopDeposit(ctx context.Context, userID int64, role doma // } newTransaction, err := s.CreateShopTransaction(ctx, domain.CreateShopTransaction{ - Amount: domain.Currency(req.Amount), + Amount: domain.ToCurrency(req.Amount), BranchID: branchID, CompanyID: companyID, UserID: userID, diff --git a/internal/services/wallet/direct_deposit.go b/internal/services/wallet/direct_deposit.go index a049d66..11ef1bf 100644 --- a/internal/services/wallet/direct_deposit.go +++ b/internal/services/wallet/direct_deposit.go @@ -8,7 +8,6 @@ import ( "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" - "github.com/SamuelTariku/FortuneBet-Backend/internal/event" ) // InitiateDirectDeposit creates a pending deposit request @@ -73,8 +72,8 @@ func (s *Service) VerifyDirectDeposit( } // Publish wallet update event - go s.publishWalletUpdate(ctx, deposit.WalletID, deposit.Wallet.UserID, - deposit.Wallet.Balance+deposit.Amount, "direct_deposit_verified") + // go s.publishWalletUpdate(ctx, deposit.WalletID, deposit.Wallet.UserID, + // deposit.Wallet.Balance+deposit.Amount, "direct_deposit_verified") // Update deposit status deposit.Status = domain.DepositStatusCompleted @@ -206,12 +205,12 @@ func (s *Service) notifyCustomerVerificationResult(ctx context.Context, deposit } } -func (s *Service) publishWalletUpdate(ctx context.Context, walletID, userID int64, newBalance domain.Currency, trigger string) { - s.kafkaProducer.Publish(ctx, fmt.Sprint(walletID), event.WalletEvent{ - EventType: event.WalletBalanceUpdated, - WalletID: walletID, - UserID: userID, - Balance: newBalance, - Trigger: trigger, - }) -} +// func (s *Service) publishWalletUpdate(ctx context.Context, walletID, userID int64, newBalance domain.Currency, trigger string) { +// s.kafkaProducer.Publish(ctx, fmt.Sprint(walletID), event.WalletEvent{ +// EventType: event.WalletBalanceUpdated, +// WalletID: walletID, +// UserID: userID, +// Balance: newBalance, +// Trigger: trigger, +// }) +// } diff --git a/internal/services/wallet/notification.go b/internal/services/wallet/notification.go index 6580ef6..0112492 100644 --- a/internal/services/wallet/notification.go +++ b/internal/services/wallet/notification.go @@ -2,10 +2,12 @@ package wallet import ( "context" + "encoding/json" "fmt" + "time" + "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "go.uber.org/zap" - "time" ) func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) { @@ -63,12 +65,65 @@ func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID i return recipients, nil } +func (s *Service) SendWalletUpdateNotification(ctx context.Context, wallet domain.Wallet) error { + raw, _ := json.Marshal(map[string]any{ + "balance": wallet.Balance.Float32(), + "type": wallet.Type, + "timestamp": time.Now(), + }) + headline := "" + message := "" + var receiver domain.NotificationRecieverSide + switch wallet.Type { + case domain.StaticWalletType: + headline = "Referral and Bonus Wallet Updated" + message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", wallet.Balance.Float32()) + receiver = domain.NotificationRecieverSideCustomer + case domain.RegularWalletType: + headline = "Wallet Updated" + message = fmt.Sprintf("Your wallet balance is now %.2f", wallet.Balance.Float32()) + receiver = domain.NotificationRecieverSideCustomer + case domain.BranchWalletType: + headline = "Branch Wallet Updated" + message = fmt.Sprintf("branch wallet balance is now %.2f", wallet.Balance.Float32()) + receiver = domain.NotificationRecieverSideBranchManager + case domain.CompanyWalletType: + headline = "Company Wallet Updated" + message = fmt.Sprintf("company wallet balance is now %.2f", wallet.Balance.Float32()) + receiver = domain.NotificationRecieverSideAdmin + } + // Handle the wallet event: send notification + notification := &domain.Notification{ + RecipientID: wallet.UserID, + DeliveryChannel: domain.DeliveryChannelInApp, + Reciever: receiver, + Type: domain.NotificationTypeWalletUpdated, + DeliveryStatus: domain.DeliveryStatusPending, + IsRead: false, + Level: domain.NotificationLevelInfo, + Priority: 2, + Metadata: raw, + Payload: domain.NotificationPayload{ + Headline: headline, + Message: message, + }, + } + if err := s.notificationSvc.SendNotification(ctx, notification); err != nil { + s.mongoLogger.Error("[WalletSvc.SendWalletUpdateNotification] Failed to send notification", + zap.Error(err), + zap.Time("timestamp", time.Now()), + ) + return err + } + + return nil +} func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error { // Send different messages - + // Send notification to admin team adminNotification := &domain.Notification{ ErrorSeverity: "low", diff --git a/internal/services/wallet/service.go b/internal/services/wallet/service.go index 2e248a5..27616cb 100644 --- a/internal/services/wallet/service.go +++ b/internal/services/wallet/service.go @@ -3,7 +3,7 @@ package wallet import ( "log/slog" - "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" + // "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "go.uber.org/zap" @@ -18,7 +18,6 @@ type Service struct { userSvc *user.Service mongoLogger *zap.Logger logger *slog.Logger - kafkaProducer *kafka.Producer } func NewService( @@ -29,7 +28,6 @@ func NewService( userSvc *user.Service, mongoLogger *zap.Logger, logger *slog.Logger, - kafkaProducer *kafka.Producer, ) *Service { return &Service{ walletStore: walletStore, @@ -40,6 +38,5 @@ func NewService( userSvc: userSvc, mongoLogger: mongoLogger, logger: logger, - kafkaProducer: kafkaProducer, } } diff --git a/internal/services/wallet/wallet.go b/internal/services/wallet/wallet.go index 54ad2e6..a5b3f49 100644 --- a/internal/services/wallet/wallet.go +++ b/internal/services/wallet/wallet.go @@ -3,10 +3,12 @@ package wallet import ( "context" "errors" - "fmt" + + // "fmt" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" - "github.com/SamuelTariku/FortuneBet-Backend/internal/event" + "go.uber.org/zap" + // "github.com/SamuelTariku/FortuneBet-Backend/internal/event" ) var ( @@ -92,16 +94,22 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu return err } - go func() { - s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{ - EventType: event.WalletBalanceUpdated, - WalletID: wallet.ID, - UserID: wallet.UserID, - Balance: balance, - WalletType: wallet.Type, - Trigger: "UpdateBalance", - }) - }() + // go func() { + // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{ + // EventType: event.WalletBalanceUpdated, + // WalletID: wallet.ID, + // UserID: wallet.UserID, + // Balance: balance, + // WalletType: wallet.Type, + // Trigger: "UpdateBalance", + // }) + // }() + + if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil { + s.mongoLogger.Info("Failed to send wallet update notification", + zap.Int64("wallet_id", wallet.ID), + zap.Error(err)) + } return nil } @@ -118,16 +126,21 @@ func (s *Service) AddToWallet( return domain.Transfer{}, err } - go func() { - s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ - EventType: event.WalletBalanceUpdated, - WalletID: wallet.ID, - UserID: wallet.UserID, - Balance: wallet.Balance + amount, - WalletType: wallet.Type, - Trigger: "AddToWallet", - }) - }() + // go func() { + // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ + // EventType: event.WalletBalanceUpdated, + // WalletID: wallet.ID, + // UserID: wallet.UserID, + // Balance: wallet.Balance + amount, + // WalletType: wallet.Type, + // Trigger: "AddToWallet", + // }) + // }() + if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil { + s.mongoLogger.Info("Failed to send wallet update notification", + zap.Int64("wallet_id", wallet.ID), + zap.Error(err)) + } // Log the transfer here for reference newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{ @@ -184,17 +197,23 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain. return domain.Transfer{}, nil } - go func() { - s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ - EventType: event.WalletBalanceUpdated, - WalletID: wallet.ID, - UserID: wallet.UserID, - Balance: wallet.Balance - amount, - WalletType: wallet.Type, - Trigger: "DeductFromWallet", - }) - }() + // go func() { + // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ + // EventType: event.WalletBalanceUpdated, + // WalletID: wallet.ID, + // UserID: wallet.UserID, + // Balance: wallet.Balance - amount, + // WalletType: wallet.Type, + // Trigger: "DeductFromWallet", + // }) + // }() + if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil { + s.mongoLogger.Info("Failed to send wallet update notification", + zap.Int64("wallet_id", wallet.ID), + zap.Error(err)) + } + // Log the transfer here for reference newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{ Message: message, diff --git a/internal/web_server/cron.go b/internal/web_server/cron.go index f1caaac..bcdce17 100644 --- a/internal/web_server/cron.go +++ b/internal/web_server/cron.go @@ -11,6 +11,7 @@ import ( betSvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/bet" enetpulse "github.com/SamuelTariku/FortuneBet-Backend/internal/services/enet_pulse" eventsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/event" + notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" oddssvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/report" resultsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/result" @@ -27,75 +28,75 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S spec string task func() }{ - { - spec: "0 0 * * * *", // Every 1 hour - task: func() { - mongoLogger.Info("Began fetching upcoming events cron task") - if err := eventService.FetchUpcomingEvents(context.Background()); err != nil { - mongoLogger.Error("Failed to fetch upcoming events", - zap.Error(err), - ) - } else { - mongoLogger.Info("Completed fetching upcoming events without errors") - } - }, - }, - { - spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events) - task: func() { - mongoLogger.Info("Began fetching non live odds cron task") - if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil { - mongoLogger.Error("Failed to fetch non live odds", - zap.Error(err), - ) - } else { - mongoLogger.Info("Completed fetching non live odds without errors") - } - }, - }, - { - spec: "0 */5 * * * *", // Every 5 Minutes - task: func() { - mongoLogger.Info("Began update all expired events status cron task") - if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil { - mongoLogger.Error("Failed to update expired events status", - zap.Error(err), - ) - } else { - mongoLogger.Info("Completed expired events without errors") - } - }, - }, - { - spec: "0 */15 * * * *", // Every 15 Minutes - task: func() { - mongoLogger.Info("Began updating bets based on event results cron task") - if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil { - mongoLogger.Error("Failed to process result", - zap.Error(err), - ) - } else { - mongoLogger.Info("Completed processing all event result outcomes without errors") - } - }, - }, - { - spec: "0 0 0 * * 1", // Every Monday - task: func() { - mongoLogger.Info("Began Send weekly result notification cron task") - if err := resultService.CheckAndSendResultNotifications(context.Background(), time.Now().Add(-7*24*time.Hour)); err != nil { - mongoLogger.Error("Failed to process result", - zap.Error(err), - ) - } else { - mongoLogger.Info("Completed sending weekly result notification without errors") - } - }, - }, + // { + // spec: "0 0 * * * *", // Every 1 hour + // task: func() { + // mongoLogger.Info("Began fetching upcoming events cron task") + // if err := eventService.FetchUpcomingEvents(context.Background()); err != nil { + // mongoLogger.Error("Failed to fetch upcoming events", + // zap.Error(err), + // ) + // } else { + // mongoLogger.Info("Completed fetching upcoming events without errors") + // } + // }, + // }, + // { + // spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events) + // task: func() { + // mongoLogger.Info("Began fetching non live odds cron task") + // if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil { + // mongoLogger.Error("Failed to fetch non live odds", + // zap.Error(err), + // ) + // } else { + // mongoLogger.Info("Completed fetching non live odds without errors") + // } + // }, + // }, + // { + // spec: "0 */5 * * * *", // Every 5 Minutes + // task: func() { + // mongoLogger.Info("Began update all expired events status cron task") + // if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil { + // mongoLogger.Error("Failed to update expired events status", + // zap.Error(err), + // ) + // } else { + // mongoLogger.Info("Completed expired events without errors") + // } + // }, + // }, + // { + // spec: "0 */15 * * * *", // Every 15 Minutes + // task: func() { + // mongoLogger.Info("Began updating bets based on event results cron task") + // if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil { + // mongoLogger.Error("Failed to process result", + // zap.Error(err), + // ) + // } else { + // mongoLogger.Info("Completed processing all event result outcomes without errors") + // } + // }, + // }, + // { + // spec: "0 0 0 * * 1", // Every Monday + // task: func() { + // mongoLogger.Info("Began Send weekly result notification cron task") + // if err := resultService.CheckAndSendResultNotifications(context.Background(), time.Now().Add(-7*24*time.Hour)); err != nil { + // mongoLogger.Error("Failed to process result", + // zap.Error(err), + // ) + // } else { + // mongoLogger.Info("Completed sending weekly result notification without errors") + // } + // }, + // }, } for _, job := range schedule { - // job.task() + job.task() if _, err := c.AddFunc(job.spec, job.task); err != nil { mongoLogger.Error("Failed to schedule data fetching cron job", zap.Error(err), @@ -108,7 +109,7 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S mongoLogger.Info("Cron jobs started for event and odds services") } -func StartTicketCrons(ticketService ticket.Service, mongoLogger *zap.Logger) { +func StartCleanupCrons(ticketService ticket.Service, notificationSvc *notificationservice.Service, mongoLogger *zap.Logger) { c := cron.New(cron.WithSeconds()) schedule := []struct { @@ -128,6 +129,19 @@ func StartTicketCrons(ticketService ticket.Service, mongoLogger *zap.Logger) { } }, }, + { + spec: "0 0 * * * *", + task: func() { + mongoLogger.Info("Deleting old notifications") + if err := notificationSvc.DeleteOldNotifications(context.Background()); err != nil { + mongoLogger.Error("Failed to remove old notifications", + zap.Error(err), + ) + } else { + mongoLogger.Info("Successfully deleted old notifications") + } + }, + }, } for _, job := range schedule { diff --git a/internal/web_server/handlers/bet_handler.go b/internal/web_server/handlers/bet_handler.go index 8fe7d5f..4550395 100644 --- a/internal/web_server/handlers/bet_handler.go +++ b/internal/web_server/handlers/bet_handler.go @@ -643,7 +643,6 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error { h.BadRequestLogger().Error("invalid company id", zap.Any("company_id", companyID)) return fiber.NewError(fiber.StatusBadRequest, "invalid company id") } - role := c.Locals("role").(domain.Role) page := c.QueryInt("page", 1) pageSize := c.QueryInt("page_size", 10) @@ -657,7 +656,7 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error { } var isShopBet domain.ValidBool isShopBetQuery := c.Query("is_shop") - if isShopBetQuery != "" && role == domain.RoleSuperAdmin { + if isShopBetQuery != "" { isShopBetParse, err := strconv.ParseBool(isShopBetQuery) if err != nil { h.mongoLoggerSvc.Info("failed to parse is_shop_bet", @@ -775,6 +774,8 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error { // @Failure 500 {object} response.APIResponse // @Router /api/v1/sport/bet/{id} [get] func (h *Handler) GetBetByID(c *fiber.Ctx) error { + companyID := c.Locals("company_id").(domain.ValidInt64) + betID := c.Params("id") id, err := strconv.ParseInt(betID, 10, 64) if err != nil { @@ -800,6 +801,15 @@ func (h *Handler) GetBetByID(c *fiber.Ctx) error { res := domain.ConvertBet(bet) + if companyID.Valid && bet.CompanyID != companyID.Value { + h.mongoLoggerSvc.Warn("Warn - Company is trying to access another companies bet", + zap.Int64("betID", id), + zap.Int("status_code", fiber.StatusNotFound), + zap.Error(err), + zap.Time("timestamp", time.Now()), + ) + return fiber.NewError(fiber.StatusNotFound, "Failed to retrieve bet") + } // h.mongoLoggerSvc.Info("Bet retrieved successfully", // zap.Int64("betID", id), // zap.Int("status_code", fiber.StatusOK), diff --git a/internal/web_server/handlers/branch_handler.go b/internal/web_server/handlers/branch_handler.go index d187b21..94a91c6 100644 --- a/internal/web_server/handlers/branch_handler.go +++ b/internal/web_server/handlers/branch_handler.go @@ -611,6 +611,8 @@ func (h *Handler) GetAllBranches(c *fiber.Ctx) error { // @Failure 500 {object} response.APIResponse // @Router /api/v1/search/branch [get] func (h *Handler) SearchBranch(c *fiber.Ctx) error { + companyID := c.Locals("company_id").(domain.ValidInt64) + // Get search query from request searchQuery := c.Query("q") if searchQuery == "" { @@ -622,7 +624,7 @@ func (h *Handler) SearchBranch(c *fiber.Ctx) error { } // Call the service to search for branches - branches, err := h.branchSvc.SearchBranchByName(c.Context(), searchQuery) + branches, err := h.branchSvc.SearchBranchByName(c.Context(), searchQuery, companyID) if err != nil { h.mongoLoggerSvc.Info("Failed to search branches", zap.String("query", searchQuery), diff --git a/internal/web_server/handlers/event_handler.go b/internal/web_server/handlers/event_handler.go index 27fa0cd..bad2fdf 100644 --- a/internal/web_server/handlers/event_handler.go +++ b/internal/web_server/handlers/event_handler.go @@ -270,22 +270,22 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error { Valid: searchQuery != "", } - // firstStartTimeQuery := c.Query("first_start_time") - // var firstStartTime domain.ValidTime - // if firstStartTimeQuery != "" { - // firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery) - // if err != nil { - // h.BadRequestLogger().Info("invalid start_time format", - // zap.String("first_start_time", firstStartTimeQuery), - // zap.Error(err), - // ) - // return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format") - // } - // firstStartTime = domain.ValidTime{ - // Value: firstStartTimeParsed, - // Valid: true, - // } - // } + firstStartTimeQuery := c.Query("first_start_time") + var firstStartTime domain.ValidTime + if firstStartTimeQuery != "" { + firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery) + if err != nil { + h.BadRequestLogger().Info("invalid start_time format", + zap.String("first_start_time", firstStartTimeQuery), + zap.Error(err), + ) + return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format") + } + firstStartTime = domain.ValidTime{ + Value: firstStartTimeParsed, + Valid: true, + } + } lastStartTimeQuery := c.Query("last_start_time") var lastStartTime domain.ValidTime @@ -330,18 +330,15 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error { events, total, err := h.eventSvc.GetEventsWithSettings( c.Context(), companyID.Value, domain.EventFilter{ - SportID: sportID, - LeagueID: leagueID, - Query: searchString, - FirstStartTime: domain.ValidTime{ - Value: time.Now(), - Valid: true, - }, - LastStartTime: lastStartTime, - Limit: limit, - Offset: offset, - CountryCode: countryCode, - Featured: isFeatured, + SportID: sportID, + LeagueID: leagueID, + Query: searchString, + FirstStartTime: firstStartTime, + LastStartTime: lastStartTime, + Limit: limit, + Offset: offset, + CountryCode: countryCode, + Featured: isFeatured, Status: domain.ValidEventStatus{ Value: domain.STATUS_PENDING, Valid: true, diff --git a/internal/web_server/routes.go b/internal/web_server/routes.go index df3eb90..8c1a58a 100644 --- a/internal/web_server/routes.go +++ b/internal/web_server/routes.go @@ -348,8 +348,8 @@ func (a *App) initAppRoutes() { tenant.Patch("/sport/bet/:id", a.authMiddleware, h.UpdateCashOut) tenant.Delete("/sport/bet/:id", a.authMiddleware, h.DeleteTenantBet) + groupV1.Get("/sport/bet/:id", a.authMiddleware, a.CompanyOnly, h.GetBetByID) groupV1.Get("/sport/bet", a.authMiddleware, a.SuperAdminOnly, h.GetAllBet) - groupV1.Get("/sport/bet/:id", a.authMiddleware, a.SuperAdminOnly, h.GetBetByID) groupV1.Delete("/sport/bet/:id", a.authMiddleware, a.SuperAdminOnly, h.DeleteBet) tenant.Post("/sport/random/bet", a.authMiddleware, h.RandomBet) diff --git a/makefile b/makefile index 83e8ba1..ac8ec7b 100644 --- a/makefile +++ b/makefile @@ -79,7 +79,7 @@ logs: @mkdir -p logs db-up: | logs @mkdir -p logs - @docker compose up -d postgres migrate mongo redis kafka + @docker compose up -d postgres migrate mongo @docker logs fortunebet-backend-postgres-1 > logs/postgres.log 2>&1 & .PHONY: db-down db-down: