fix: removed kafka and redis and added integration changes

This commit is contained in:
Samuel Tariku 2025-10-08 12:46:50 +03:00
parent 0121b31838
commit 80129828e0
65 changed files with 737 additions and 566 deletions

View File

@ -41,7 +41,6 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/event" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/institutions" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/institutions"
issuereporting "github.com/SamuelTariku/FortuneBet-Backend/internal/services/issue_reporting" issuereporting "github.com/SamuelTariku/FortuneBet-Backend/internal/services/issue_reporting"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/league" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/league"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
@ -122,10 +121,10 @@ func main() {
// var userStore user.UserStore // var userStore user.UserStore
// Initialize producer // Initialize producer
topic := "wallet-balance-topic" // topic := "wallet-balance-topic"
producer := kafka.NewProducer(cfg.KafkaBrokers, topic) // producer := kafka.NewProducer(cfg.KafkaBrokers, topic)
notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc, cfg.KafkaBrokers) notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc)
walletSvc := wallet.NewService( walletSvc := wallet.NewService(
wallet.WalletStore(store), wallet.WalletStore(store),
@ -135,7 +134,6 @@ func main() {
userSvc, userSvc,
domain.MongoDBLogger, domain.MongoDBLogger,
logger, logger,
producer,
) )
branchSvc := branch.NewService(store) branchSvc := branch.NewService(store)
@ -239,7 +237,7 @@ func main() {
go walletMonitorSvc.Start() go walletMonitorSvc.Start()
httpserver.StartDataFetchingCrons(eventSvc, *oddsSvc, resultSvc, domain.MongoDBLogger) httpserver.StartDataFetchingCrons(eventSvc, *oddsSvc, resultSvc, domain.MongoDBLogger)
httpserver.StartTicketCrons(*ticketSvc, domain.MongoDBLogger) httpserver.StartCleanupCrons(*ticketSvc, notificationSvc, domain.MongoDBLogger)
issueReportingRepo := repository.NewReportedIssueRepository(store) issueReportingRepo := repository.NewReportedIssueRepository(store)

View File

@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS notifications (
priority INTEGER, priority INTEGER,
version INTEGER NOT NULL DEFAULT 0, version INTEGER NOT NULL DEFAULT 0,
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP, timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
img TEXT,
expires TIMESTAMPTZ NOT NULL,
metadata JSONB metadata JSONB
); );
CREATE TABLE IF NOT EXISTS wallet_threshold_notifications ( CREATE TABLE IF NOT EXISTS wallet_threshold_notifications (

View File

@ -65,7 +65,11 @@ WHERE branch_manager_id = $1;
-- name: SearchBranchByName :many -- name: SearchBranchByName :many
SELECT * SELECT *
FROM branch_details FROM branch_details
WHERE name ILIKE '%' || $1 || '%'; WHERE name ILIKE '%' || $1 || '%'
AND (
company_id = sqlc.narg('company_id')
OR sqlc.narg('company_id') IS NULL
);
-- name: GetAllSupportedOperations :many -- name: GetAllSupportedOperations :many
SELECT * SELECT *
FROM supported_operations; FROM supported_operations;

View File

@ -12,6 +12,8 @@ INSERT INTO notifications (
payload, payload,
priority, priority,
timestamp, timestamp,
expires,
img,
metadata metadata
) )
VALUES ( VALUES (
@ -27,7 +29,9 @@ VALUES (
$10, $10,
$11, $11,
$12, $12,
$13 $13,
$14,
$15
) )
RETURNING *; RETURNING *;
-- name: GetNotification :one -- name: GetNotification :one
@ -88,4 +92,8 @@ SELECT COUNT(*) as total,
WHEN is_read = false THEN 1 WHEN is_read = false THEN 1
END END
) as unread ) as unread
FROM notifications; FROM notifications;
-- name: DeleteOldNotifications :exec
DELETE FROM notifications
WHERE expires < now();

View File

@ -54,46 +54,17 @@ services:
] ]
networks: networks:
- app - app
redis: # redis:
image: redis:7-alpine # image: redis:7-alpine
ports: # ports:
- "6379:6379" # - "6379:6379"
networks: # networks:
- app # - app
healthcheck: # healthcheck:
test: ["CMD", "redis-cli", "ping"] # test: ["CMD", "redis-cli", "ping"]
interval: 10s # interval: 10s
timeout: 5s # timeout: 5s
retries: 5 # retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- app
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
- "29092:29092"
networks:
- app
app: app:
build: build:
context: . context: .
@ -104,15 +75,9 @@ services:
environment: environment:
- DB_URL=postgresql://root:secret@postgres:5432/gh?sslmode=disable - DB_URL=postgresql://root:secret@postgres:5432/gh?sslmode=disable
- MONGO_URI=mongodb://root:secret@mongo:27017 - MONGO_URI=mongodb://root:secret@mongo:27017
- REDIS_ADDR=redis:6379
- KAFKA_BROKERS=kafka:9092
depends_on: depends_on:
migrate: migrate:
condition: service_completed_successfully condition: service_completed_successfully
mongo:
condition: service_healthy
redis:
condition: service_healthy
networks: networks:
- app - app
command: ["/app/bin/web"] command: ["/app/bin/web"]

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: auth.sql // source: auth.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: bet.sql // source: bet.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: bet_stat.sql // source: bet_stat.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: bonus.sql // source: bonus.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: branch.sql // source: branch.sql
package dbgen package dbgen
@ -455,10 +455,19 @@ const SearchBranchByName = `-- name: SearchBranchByName :many
SELECT id, name, location, profit_percent, is_active, wallet_id, branch_manager_id, company_id, is_self_owned, created_at, updated_at, manager_name, manager_phone_number, balance, wallet_is_active SELECT id, name, location, profit_percent, is_active, wallet_id, branch_manager_id, company_id, is_self_owned, created_at, updated_at, manager_name, manager_phone_number, balance, wallet_is_active
FROM branch_details FROM branch_details
WHERE name ILIKE '%' || $1 || '%' WHERE name ILIKE '%' || $1 || '%'
AND (
company_id = $2
OR $2 IS NULL
)
` `
func (q *Queries) SearchBranchByName(ctx context.Context, dollar_1 pgtype.Text) ([]BranchDetail, error) { type SearchBranchByNameParams struct {
rows, err := q.db.Query(ctx, SearchBranchByName, dollar_1) Column1 pgtype.Text `json:"column_1"`
CompanyID pgtype.Int8 `json:"company_id"`
}
func (q *Queries) SearchBranchByName(ctx context.Context, arg SearchBranchByNameParams) ([]BranchDetail, error) {
rows, err := q.db.Query(ctx, SearchBranchByName, arg.Column1, arg.CompanyID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: cashier.sql // source: cashier.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: company.sql // source: company.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: copyfrom.go // source: copyfrom.go
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: direct_deposit.sql // source: direct_deposit.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: disabled_odds.sql // source: disabled_odds.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: enet_pulse.sql // source: enet_pulse.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: event_history.sql // source: event_history.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: events.sql // source: events.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: events_stat.sql // source: events_stat.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: flags.sql // source: flags.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: institutions.sql // source: institutions.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: issue_reporting.sql // source: issue_reporting.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: leagues.sql // source: leagues.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: location.sql // source: location.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
package dbgen package dbgen
@ -482,6 +482,8 @@ type Notification struct {
Priority pgtype.Int4 `json:"priority"` Priority pgtype.Int4 `json:"priority"`
Version int32 `json:"version"` Version int32 `json:"version"`
Timestamp pgtype.Timestamptz `json:"timestamp"` Timestamp pgtype.Timestamptz `json:"timestamp"`
Img pgtype.Text `json:"img"`
Expires pgtype.Timestamptz `json:"expires"`
Metadata []byte `json:"metadata"` Metadata []byte `json:"metadata"`
} }

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: monitor.sql // source: monitor.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: notification.sql // source: notification.sql
package dbgen package dbgen
@ -39,6 +39,8 @@ INSERT INTO notifications (
payload, payload,
priority, priority,
timestamp, timestamp,
expires,
img,
metadata metadata
) )
VALUES ( VALUES (
@ -54,9 +56,11 @@ VALUES (
$10, $10,
$11, $11,
$12, $12,
$13 $13,
$14,
$15
) )
RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
` `
type CreateNotificationParams struct { type CreateNotificationParams struct {
@ -72,6 +76,8 @@ type CreateNotificationParams struct {
Payload []byte `json:"payload"` Payload []byte `json:"payload"`
Priority pgtype.Int4 `json:"priority"` Priority pgtype.Int4 `json:"priority"`
Timestamp pgtype.Timestamptz `json:"timestamp"` Timestamp pgtype.Timestamptz `json:"timestamp"`
Expires pgtype.Timestamptz `json:"expires"`
Img pgtype.Text `json:"img"`
Metadata []byte `json:"metadata"` Metadata []byte `json:"metadata"`
} }
@ -89,6 +95,8 @@ func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotification
arg.Payload, arg.Payload,
arg.Priority, arg.Priority,
arg.Timestamp, arg.Timestamp,
arg.Expires,
arg.Img,
arg.Metadata, arg.Metadata,
) )
var i Notification var i Notification
@ -106,13 +114,25 @@ func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotification
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
) )
return i, err return i, err
} }
const DeleteOldNotifications = `-- name: DeleteOldNotifications :exec
DELETE FROM notifications
WHERE expires < now()
`
func (q *Queries) DeleteOldNotifications(ctx context.Context) error {
_, err := q.db.Exec(ctx, DeleteOldNotifications)
return err
}
const GetAllNotifications = `-- name: GetAllNotifications :many const GetAllNotifications = `-- name: GetAllNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
FROM notifications FROM notifications
ORDER BY timestamp DESC ORDER BY timestamp DESC
LIMIT $1 OFFSET $2 LIMIT $1 OFFSET $2
@ -146,6 +166,8 @@ func (q *Queries) GetAllNotifications(ctx context.Context, arg GetAllNotificatio
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -159,7 +181,7 @@ func (q *Queries) GetAllNotifications(ctx context.Context, arg GetAllNotificatio
} }
const GetNotification = `-- name: GetNotification :one const GetNotification = `-- name: GetNotification :one
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
FROM notifications FROM notifications
WHERE id = $1 WHERE id = $1
LIMIT 1 LIMIT 1
@ -182,6 +204,8 @@ func (q *Queries) GetNotification(ctx context.Context, id string) (Notification,
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
) )
return i, err return i, err
@ -254,7 +278,7 @@ func (q *Queries) GetUserNotificationCount(ctx context.Context, recipientID int6
} }
const GetUserNotifications = `-- name: GetUserNotifications :many const GetUserNotifications = `-- name: GetUserNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
FROM notifications FROM notifications
WHERE recipient_id = $1 WHERE recipient_id = $1
ORDER BY timestamp DESC ORDER BY timestamp DESC
@ -290,6 +314,8 @@ func (q *Queries) GetUserNotifications(ctx context.Context, arg GetUserNotificat
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -303,7 +329,7 @@ func (q *Queries) GetUserNotifications(ctx context.Context, arg GetUserNotificat
} }
const ListFailedNotifications = `-- name: ListFailedNotifications :many const ListFailedNotifications = `-- name: ListFailedNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
FROM notifications FROM notifications
WHERE delivery_status = 'failed' WHERE delivery_status = 'failed'
AND timestamp < NOW() - INTERVAL '1 hour' AND timestamp < NOW() - INTERVAL '1 hour'
@ -334,6 +360,8 @@ func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]N
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -378,7 +406,7 @@ SET delivery_status = $2,
is_read = $3, is_read = $3,
metadata = $4 metadata = $4
WHERE id = $1 WHERE id = $1
RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, img, expires, metadata
` `
type UpdateNotificationStatusParams struct { type UpdateNotificationStatusParams struct {
@ -410,6 +438,8 @@ func (q *Queries) UpdateNotificationStatus(ctx context.Context, arg UpdateNotifi
&i.Priority, &i.Priority,
&i.Version, &i.Version,
&i.Timestamp, &i.Timestamp,
&i.Img,
&i.Expires,
&i.Metadata, &i.Metadata,
) )
return i, err return i, err

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: odd_history.sql // source: odd_history.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: odds.sql // source: odds.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: otp.sql // source: otp.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: raffle.sql // source: raffle.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: referal.sql // source: referal.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: report.sql // source: report.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: result.sql // source: result.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: result_log.sql // source: result_log.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: settings.sql // source: settings.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: shop_transactions.sql // source: shop_transactions.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: ticket.sql // source: ticket.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: transfer.sql // source: transfer.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: user.sql // source: user.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: virtual_games.sql // source: virtual_games.sql
package dbgen package dbgen

View File

@ -1,6 +1,6 @@
// Code generated by sqlc. DO NOT EDIT. // Code generated by sqlc. DO NOT EDIT.
// versions: // versions:
// sqlc v1.30.0 // sqlc v1.29.0
// source: wallet.sql // source: wallet.sql
package dbgen package dbgen

View File

@ -25,49 +25,83 @@ func (m Currency) String() string {
return fmt.Sprintf("$%.2f", m.Float32()) return fmt.Sprintf("$%.2f", m.Float32())
} }
// TODO: Change the currency to this format when implementing multi-currency
// type Currency struct {
// Value int64
// Type IntCurrency
// }
// // ToCurrency converts a float32 (like 12.34) into Currency (stored in cents).
// func NewCurrency(f float32, currencyType IntCurrency) Currency {
// cents := math.Round(float64(f) * 100) // avoid float32 precision issues
// return Currency{
// Value: int64(cents),
// Type: currencyType,
// }
// }
// func NewBase(v int64) Currency {
// return Currency{
// Value: v,
// Type: BASE,
// }
// }
// // Float32 converts a Currency back into float32 (like 12.34).
// func (m Currency) Float32() float32 {
// return float32(m.Value) / 100
// }
// // String returns a formatted Currency value for display.
// func (m Currency) String() string {
// return fmt.Sprintf("$%.2f", m.Float32())
// }
type IntCurrency string type IntCurrency string
const ( const (
ETB IntCurrency = "ETB" // Ethiopian Birr BASE IntCurrency = "BASE"
NGN IntCurrency = "NGN" // Nigerian Naira ETB IntCurrency = "ETB" // Ethiopian Birr
ZAR IntCurrency = "ZAR" // South African Rand NGN IntCurrency = "NGN" // Nigerian Naira
EGP IntCurrency = "EGP" // Egyptian Pound ZAR IntCurrency = "ZAR" // South African Rand
KES IntCurrency = "KES" // Kenyan Shilling EGP IntCurrency = "EGP" // Egyptian Pound
UGX IntCurrency = "UGX" // Ugandan Shilling KES IntCurrency = "KES" // Kenyan Shilling
TZS IntCurrency = "TZS" // Tanzanian Shilling UGX IntCurrency = "UGX" // Ugandan Shilling
RWF IntCurrency = "RWF" // Rwandan Franc TZS IntCurrency = "TZS" // Tanzanian Shilling
BIF IntCurrency = "BIF" // Burundian Franc RWF IntCurrency = "RWF" // Rwandan Franc
XOF IntCurrency = "XOF" // West African CFA Franc (BCEAO) BIF IntCurrency = "BIF" // Burundian Franc
XAF IntCurrency = "XAF" // Central African CFA Franc (BEAC) XOF IntCurrency = "XOF" // West African CFA Franc (BCEAO)
GHS IntCurrency = "GHS" // Ghanaian Cedi XAF IntCurrency = "XAF" // Central African CFA Franc (BEAC)
SDG IntCurrency = "SDG" // Sudanese Pound GHS IntCurrency = "GHS" // Ghanaian Cedi
SSP IntCurrency = "SSP" // South Sudanese Pound SDG IntCurrency = "SDG" // Sudanese Pound
DZD IntCurrency = "DZD" // Algerian Dinar SSP IntCurrency = "SSP" // South Sudanese Pound
MAD IntCurrency = "MAD" // Moroccan Dirham DZD IntCurrency = "DZD" // Algerian Dinar
TND IntCurrency = "TND" // Tunisian Dinar MAD IntCurrency = "MAD" // Moroccan Dirham
LYD IntCurrency = "LYD" // Libyan Dinar TND IntCurrency = "TND" // Tunisian Dinar
MZN IntCurrency = "MZN" // Mozambican Metical LYD IntCurrency = "LYD" // Libyan Dinar
AOA IntCurrency = "AOA" // Angolan Kwanza MZN IntCurrency = "MZN" // Mozambican Metical
BWP IntCurrency = "BWP" // Botswana Pula AOA IntCurrency = "AOA" // Angolan Kwanza
ZMW IntCurrency = "ZMW" // Zambian Kwacha BWP IntCurrency = "BWP" // Botswana Pula
MWK IntCurrency = "MWK" // Malawian Kwacha ZMW IntCurrency = "ZMW" // Zambian Kwacha
LSL IntCurrency = "LSL" // Lesotho Loti MWK IntCurrency = "MWK" // Malawian Kwacha
NAD IntCurrency = "NAD" // Namibian Dollar LSL IntCurrency = "LSL" // Lesotho Loti
SZL IntCurrency = "SZL" // Swazi Lilangeni NAD IntCurrency = "NAD" // Namibian Dollar
CVE IntCurrency = "CVE" // Cape Verdean Escudo SZL IntCurrency = "SZL" // Swazi Lilangeni
GMD IntCurrency = "GMD" // Gambian Dalasi CVE IntCurrency = "CVE" // Cape Verdean Escudo
SLL IntCurrency = "SLL" // Sierra Leonean Leone GMD IntCurrency = "GMD" // Gambian Dalasi
LRD IntCurrency = "LRD" // Liberian Dollar SLL IntCurrency = "SLL" // Sierra Leonean Leone
GNF IntCurrency = "GNF" // Guinean Franc LRD IntCurrency = "LRD" // Liberian Dollar
XCD IntCurrency = "XCD" // Eastern Caribbean Dollar (used in Saint Lucia) GNF IntCurrency = "GNF" // Guinean Franc
MRU IntCurrency = "MRU" // Mauritanian Ouguiya XCD IntCurrency = "XCD" // Eastern Caribbean Dollar (used in Saint Lucia)
KMF IntCurrency = "KMF" // Comorian Franc MRU IntCurrency = "MRU" // Mauritanian Ouguiya
DJF IntCurrency = "DJF" // Djiboutian Franc KMF IntCurrency = "KMF" // Comorian Franc
SOS IntCurrency = "SOS" // Somali Shilling DJF IntCurrency = "DJF" // Djiboutian Franc
ERN IntCurrency = "ERN" // Eritrean Nakfa SOS IntCurrency = "SOS" // Somali Shilling
MGA IntCurrency = "MGA" // Malagasy Ariary ERN IntCurrency = "ERN" // Eritrean Nakfa
SCR IntCurrency = "SCR" // Seychellois Rupee MGA IntCurrency = "MGA" // Malagasy Ariary
MUR IntCurrency = "MUR" // Mauritian Rupee SCR IntCurrency = "SCR" // Seychellois Rupee
MUR IntCurrency = "MUR" // Mauritian Rupee
// International currencies (already listed) // International currencies (already listed)
USD IntCurrency = "USD" // US Dollar USD IntCurrency = "USD" // US Dollar

View File

@ -84,6 +84,8 @@ type Notification struct {
Priority int `json:"priority,omitempty"` Priority int `json:"priority,omitempty"`
Version int `json:"-"` Version int `json:"-"`
Timestamp time.Time `json:"timestamp"` Timestamp time.Time `json:"timestamp"`
Expires time.Time `json:"expires"`
Image string `json:"image"`
Metadata json.RawMessage `json:"metadata,omitempty"` Metadata json.RawMessage `json:"metadata,omitempty"`
} }
type CreateNotification struct { type CreateNotification struct {
@ -97,6 +99,8 @@ type CreateNotification struct {
DeliveryChannel DeliveryChannel `json:"delivery_channel,omitempty"` DeliveryChannel DeliveryChannel `json:"delivery_channel,omitempty"`
Payload NotificationPayload `json:"payload"` Payload NotificationPayload `json:"payload"`
Priority int `json:"priority,omitempty"` Priority int `json:"priority,omitempty"`
Expires time.Time `json:"expires"`
Image string `json:"image,omitempty"`
Metadata json.RawMessage `json:"metadata,omitempty"` Metadata json.RawMessage `json:"metadata,omitempty"`
} }

View File

@ -87,7 +87,7 @@ type ShopBetRes struct {
BetID int64 `json:"bet_id" example:"1"` BetID int64 `json:"bet_id" example:"1"`
NumberOfOutcomes int64 `json:"number_of_outcomes" example:"1"` NumberOfOutcomes int64 `json:"number_of_outcomes" example:"1"`
Status OutcomeStatus `json:"status" example:"1"` Status OutcomeStatus `json:"status" example:"1"`
Amount Currency `json:"amount"` Amount float32 `json:"amount"`
Outcomes []BetOutcome `json:"outcomes"` Outcomes []BetOutcome `json:"outcomes"`
TransactionVerified bool `json:"transaction_verified" example:"true"` TransactionVerified bool `json:"transaction_verified" example:"true"`
UpdatedAt time.Time `json:"updated_at" example:"2025-04-08T12:00:00Z"` UpdatedAt time.Time `json:"updated_at" example:"2025-04-08T12:00:00Z"`
@ -119,7 +119,7 @@ func ConvertShopBetDetail(shopBet ShopBetDetail) ShopBetRes {
BetID: shopBet.BetID, BetID: shopBet.BetID,
NumberOfOutcomes: shopBet.NumberOfOutcomes, NumberOfOutcomes: shopBet.NumberOfOutcomes,
Status: shopBet.Status, Status: shopBet.Status,
Amount: shopBet.Amount, Amount: shopBet.Amount.Float32(),
Outcomes: shopBet.Outcomes, Outcomes: shopBet.Outcomes,
TransactionVerified: shopBet.TransactionVerified, TransactionVerified: shopBet.TransactionVerified,
UpdatedAt: shopBet.UpdatedAt, UpdatedAt: shopBet.UpdatedAt,

View File

@ -68,8 +68,11 @@ func (s *Store) GetAllBranches(ctx context.Context, filter domain.BranchFilter)
return branches, nil return branches, nil
} }
func (s *Store) SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) { func (s *Store) SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error) {
dbBranches, err := s.queries.SearchBranchByName(ctx, pgtype.Text{String: name, Valid: true}) dbBranches, err := s.queries.SearchBranchByName(ctx, dbgen.SearchBranchByNameParams{
Column1: pgtype.Text{String: name, Valid: true},
CompanyID: companyID.ToPG(),
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -14,12 +14,13 @@ import (
type NotificationRepository interface { type NotificationRepository interface {
CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error)
UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error)
GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error)
ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error)
ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error)
CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error)
GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error)
GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error)
DeleteOldNotifications(ctx context.Context) error
} }
type Repository struct { type Repository struct {
@ -69,6 +70,8 @@ func (r *Repository) CreateNotification(ctx context.Context, notification *domai
Payload: marshalPayload(notification.Payload), Payload: marshalPayload(notification.Payload),
Priority: priority, Priority: priority,
Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true}, Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true},
Expires: pgtype.Timestamptz{Time: notification.Expires, Valid: true},
Img: pgtype.Text{String: notification.Image, Valid: notification.Image != ""},
Metadata: notification.Metadata, Metadata: notification.Metadata,
} }
@ -113,7 +116,7 @@ func (r *Repository) GetUserNotifications(ctx context.Context, recipientID int64
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications)) var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications))
for _, dbNotif := range dbNotifications { for _, dbNotif := range dbNotifications {
domainNotif := r.mapDBToDomain(&dbNotif) domainNotif := r.mapDBToDomain(&dbNotif)
@ -160,6 +163,10 @@ func (r *Repository) ListRecipientIDs(ctx context.Context, receiver domain.Notif
return r.store.queries.ListRecipientIDsByReceiver(ctx, string(receiver)) return r.store.queries.ListRecipientIDsByReceiver(ctx, string(receiver))
} }
func (s *Repository) DeleteOldNotifications(ctx context.Context) error {
return s.store.queries.DeleteOldNotifications(ctx)
}
func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification { func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification {
var errorSeverity domain.NotificationErrorSeverity var errorSeverity domain.NotificationErrorSeverity
if dbNotif.ErrorSeverity.Valid { if dbNotif.ErrorSeverity.Valid {
@ -199,6 +206,8 @@ func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notifica
Payload: payload, Payload: payload,
Priority: priority, Priority: priority,
Timestamp: dbNotif.Timestamp.Time, Timestamp: dbNotif.Timestamp.Time,
Expires: dbNotif.Expires.Time,
Image: dbNotif.Img.String,
Metadata: dbNotif.Metadata, Metadata: dbNotif.Metadata,
} }
} }

View File

@ -46,6 +46,8 @@ var (
ErrInvalidAmount = errors.New("invalid amount") ErrInvalidAmount = errors.New("invalid amount")
ErrBetAmountTooHigh = errors.New("cannot create a bet with an amount above limit") ErrBetAmountTooHigh = errors.New("cannot create a bet with an amount above limit")
ErrBetWinningTooHigh = errors.New("total Winnings over set limit") ErrBetWinningTooHigh = errors.New("total Winnings over set limit")
ErrCompanyDeductedPercentInvalid = errors.New("invalid company deducted percentage")
) )
type Service struct { type Service struct {
@ -303,8 +305,9 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
} }
fastCode := helpers.GenerateFastCode() fastCode := helpers.GenerateFastCode()
accumulator := calculateAccumulator(len(outcomes)) // accumulator := calculateAccumulator(len(outcomes))
amount := req.Amount + (req.Amount * accumulator) // amount := req.Amount + (req.Amount * accumulator)
amount := req.Amount
newBet := domain.CreateBet{ newBet := domain.CreateBet{
Amount: domain.ToCurrency(amount), Amount: domain.ToCurrency(amount),
@ -524,7 +527,25 @@ func (s *Service) DeductBetFromBranchWallet(ctx context.Context, amount float32,
return err return err
} }
deductedAmount := amount * company.DeductedPercentage if company.DeductedPercentage > 1 {
s.mongoLogger.Error("Invalid company deducted percentage",
zap.Int64("wallet_id", walletID),
zap.Float32("amount", company.DeductedPercentage),
zap.Error(err),
)
return ErrCompanyDeductedPercentInvalid
}
deductedAmount := amount - (amount * company.DeductedPercentage)
if deductedAmount == 0 {
s.mongoLogger.Fatal("Amount",
zap.Int64("wallet_id", walletID),
zap.Float32("amount", deductedAmount),
zap.Error(err),
)
return err
}
_, err = s.walletSvc.DeductFromWallet(ctx, _, err = s.walletSvc.DeductFromWallet(ctx,
walletID, domain.ToCurrency(deductedAmount), domain.ValidInt64{ walletID, domain.ToCurrency(deductedAmount), domain.ValidInt64{
Value: userID, Value: userID,

View File

@ -12,7 +12,7 @@ type BranchStore interface {
GetBranchByManagerID(ctx context.Context, branchManagerID int64) ([]domain.BranchDetail, error) GetBranchByManagerID(ctx context.Context, branchManagerID int64) ([]domain.BranchDetail, error)
GetBranchByCompanyID(ctx context.Context, companyID int64) ([]domain.BranchDetail, error) GetBranchByCompanyID(ctx context.Context, companyID int64) ([]domain.BranchDetail, error)
GetAllBranches(ctx context.Context, filter domain.BranchFilter) ([]domain.BranchDetail, error) GetAllBranches(ctx context.Context, filter domain.BranchFilter) ([]domain.BranchDetail, error)
SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error)
UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error) UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error)
DeleteBranch(ctx context.Context, id int64) error DeleteBranch(ctx context.Context, id int64) error
CreateBranchOperation(ctx context.Context, branchOperation domain.CreateBranchOperation) error CreateBranchOperation(ctx context.Context, branchOperation domain.CreateBranchOperation) error

View File

@ -54,8 +54,8 @@ func (s *Service) GetAllSupportedOperations(ctx context.Context) ([]domain.Suppo
return s.branchStore.GetAllSupportedOperations(ctx) return s.branchStore.GetAllSupportedOperations(ctx)
} }
func (s *Service) SearchBranchByName(ctx context.Context, name string) ([]domain.BranchDetail, error) { func (s *Service) SearchBranchByName(ctx context.Context, name string, companyID domain.ValidInt64) ([]domain.BranchDetail, error) {
return s.branchStore.SearchBranchByName(ctx, name) return s.branchStore.SearchBranchByName(ctx, name, companyID)
} }
func (s *Service) UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error) { func (s *Service) UpdateBranch(ctx context.Context, branch domain.UpdateBranch) (domain.Branch, error) {
return s.branchStore.UpdateBranch(ctx, branch) return s.branchStore.UpdateBranch(ctx, branch)

View File

@ -1,67 +1,67 @@
package kafka package kafka
import ( // import (
"context" // "context"
"encoding/json" // "encoding/json"
"log" // "log"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event" // "github.com/SamuelTariku/FortuneBet-Backend/internal/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" // "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
"github.com/segmentio/kafka-go" // "github.com/segmentio/kafka-go"
) // )
type WalletConsumer struct { // type WalletConsumer struct {
reader *kafka.Reader // reader *kafka.Reader
hub *ws.NotificationHub // hub *ws.NotificationHub
topic string // topic string
groupID string // groupID string
brokers []string // brokers []string
} // }
func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer { // func NewWalletConsumer(brokers []string, topic, groupID string, hub *ws.NotificationHub) *WalletConsumer {
return &WalletConsumer{ // return &WalletConsumer{
brokers: brokers, // brokers: brokers,
topic: topic, // topic: topic,
groupID: groupID, // groupID: groupID,
hub: hub, // hub: hub,
reader: kafka.NewReader(kafka.ReaderConfig{ // reader: kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers, // Brokers: brokers,
GroupID: groupID, // GroupID: groupID,
Topic: topic, // Topic: topic,
}), // }),
} // }
} // }
func (c *WalletConsumer) Start(ctx context.Context) { // func (c *WalletConsumer) Start(ctx context.Context) {
go func() { // go func() {
for { // for {
m, err := c.reader.ReadMessage(ctx) // m, err := c.reader.ReadMessage(ctx)
if err != nil { // if err != nil {
log.Printf("Error reading wallet Kafka message: %v", err) // log.Printf("Error reading wallet Kafka message: %v", err)
continue // continue
} // }
var evt event.WalletEvent // var evt event.WalletEvent
if err := json.Unmarshal(m.Value, &evt); err != nil { // if err := json.Unmarshal(m.Value, &evt); err != nil {
log.Printf("Failed to unmarshal wallet event: %v", err) // log.Printf("Failed to unmarshal wallet event: %v", err)
continue // continue
} // }
payload := map[string]interface{}{ // payload := map[string]interface{}{
"type": evt.EventType, // "type": evt.EventType,
"wallet_id": evt.WalletID, // "wallet_id": evt.WalletID,
"user_id": evt.UserID, // "user_id": evt.UserID,
"balance": evt.Balance, // "balance": evt.Balance,
"wallet_type": evt.WalletType, // "wallet_type": evt.WalletType,
"trigger": evt.Trigger, // "trigger": evt.Trigger,
"recipient_id": evt.UserID, // "recipient_id": evt.UserID,
} // }
// Broadcast to appropriate WebSocket clients // // Broadcast to appropriate WebSocket clients
c.hub.Broadcast <- payload // c.hub.Broadcast <- payload
} // }
}() // }()
} // }
// func (c *WalletConsumer) Shutdown() error { // func (c *WalletConsumer) Shutdown() error {
// return c.reader.Close() // return c.reader.Close()

View File

@ -1,36 +1,36 @@
package kafka package kafka
import ( // import (
"context" // "context"
"encoding/json" // "encoding/json"
"time" // "time"
"github.com/segmentio/kafka-go" // "github.com/segmentio/kafka-go"
) // )
type Producer struct { // type Producer struct {
writer *kafka.Writer // writer *kafka.Writer
} // }
func NewProducer(brokers []string, topic string) *Producer { // func NewProducer(brokers []string, topic string) *Producer {
return &Producer{ // return &Producer{
writer: &kafka.Writer{ // writer: &kafka.Writer{
Addr: kafka.TCP(brokers...), // Addr: kafka.TCP(brokers...),
Topic: topic, // Topic: topic,
Balancer: &kafka.LeastBytes{}, // Balancer: &kafka.LeastBytes{},
}, // },
} // }
} // }
func (p *Producer) Publish(ctx context.Context, key string, event any) error { // func (p *Producer) Publish(ctx context.Context, key string, event any) error {
msgBytes, err := json.Marshal(event) // msgBytes, err := json.Marshal(event)
if err != nil { // if err != nil {
return err // return err
} // }
return p.writer.WriteMessages(ctx, kafka.Message{ // return p.writer.WriteMessages(ctx, kafka.Message{
Key: []byte(key), // Key: []byte(key),
Value: msgBytes, // Value: msgBytes,
Time: time.Now(), // Time: time.Now(),
}) // })
} // }

View File

@ -2,7 +2,6 @@ package notificationservice
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
// "errors" // "errors"
@ -12,19 +11,19 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/config" "github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers" "github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository" "github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"github.com/segmentio/kafka-go" // "github.com/segmentio/kafka-go"
"go.uber.org/zap" "go.uber.org/zap"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet" // "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
// afro "github.com/amanuelabay/afrosms-go" // afro "github.com/amanuelabay/afrosms-go"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/redis/go-redis/v9" // "github.com/redis/go-redis/v9"
) )
type Service struct { type Service struct {
@ -39,8 +38,6 @@ type Service struct {
messengerSvc *messenger.Service messengerSvc *messenger.Service
mongoLogger *zap.Logger mongoLogger *zap.Logger
logger *slog.Logger logger *slog.Logger
redisClient *redis.Client
reader *kafka.Reader
} }
func New(repo repository.NotificationRepository, func New(repo repository.NotificationRepository,
@ -49,17 +46,8 @@ func New(repo repository.NotificationRepository,
cfg *config.Config, cfg *config.Config,
messengerSvc *messenger.Service, messengerSvc *messenger.Service,
userSvc *user.Service, userSvc *user.Service,
kafkaBrokers []string,
) *Service { ) *Service {
hub := ws.NewNotificationHub() hub := ws.NewNotificationHub()
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr, // e.g., "redis:6379"
})
walletReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: kafkaBrokers,
Topic: "wallet-balance-topic",
GroupID: "notification-service-group", // Each service should have its own group
})
svc := &Service{ svc := &Service{
repo: repo, repo: repo,
@ -72,15 +60,13 @@ func New(repo repository.NotificationRepository,
messengerSvc: messengerSvc, messengerSvc: messengerSvc,
userSvc: userSvc, userSvc: userSvc,
config: cfg, config: cfg,
redisClient: rdb,
reader: walletReader,
} }
go hub.Run() go hub.Run()
go svc.startWorker() go svc.startWorker()
go svc.startRetryWorker() go svc.startRetryWorker()
go svc.RunRedisSubscriber(context.Background()) // go svc.RunRedisSubscriber(context.Background())
go svc.StartKafkaConsumer(context.Background()) // go svc.StartKafkaConsumer(context.Background())
return svc return svc
} }
@ -484,189 +470,192 @@ func (s *Service) CountUnreadNotifications(ctx context.Context, recipient_id int
return s.repo.CountUnreadNotifications(ctx, recipient_id) return s.repo.CountUnreadNotifications(ctx, recipient_id)
} }
func (s *Service) DeleteOldNotifications(ctx context.Context) error {
return s.repo.DeleteOldNotifications(ctx)
}
// func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){ // func (s *Service) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error){
// return s.repo.Get(ctx, filter) // return s.repo.Get(ctx, filter)
// } // }
func (s *Service) RunRedisSubscriber(ctx context.Context) { // func (s *Service) RunRedisSubscriber(ctx context.Context) {
pubsub := s.redisClient.Subscribe(ctx, "live_metrics") // pubsub := s.redisClient.Subscribe(ctx, "live_metrics")
defer pubsub.Close() // defer pubsub.Close()
ch := pubsub.Channel() // ch := pubsub.Channel()
for msg := range ch { // for msg := range ch {
var parsed map[string]interface{} // var parsed map[string]interface{}
if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil { // if err := json.Unmarshal([]byte(msg.Payload), &parsed); err != nil {
// s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err) // // s.logger.Error("invalid Redis message format", "payload", msg.Payload, "error", err)
s.mongoLogger.Error("invalid Redis message format", // s.mongoLogger.Error("invalid Redis message format",
zap.String("payload", msg.Payload), // zap.String("payload", msg.Payload),
zap.Error(err), // zap.Error(err),
zap.Time("timestamp", time.Now()), // zap.Time("timestamp", time.Now()),
) // )
continue // continue
} // }
eventType, _ := parsed["type"].(string) // eventType, _ := parsed["type"].(string)
payload := parsed["payload"] // payload := parsed["payload"]
recipientID, hasRecipient := parsed["recipient_id"] // recipientID, hasRecipient := parsed["recipient_id"]
recipientType, _ := parsed["recipient_type"].(string) // recipientType, _ := parsed["recipient_type"].(string)
message := map[string]interface{}{ // message := map[string]interface{}{
"type": eventType, // "type": eventType,
"payload": payload, // "payload": payload,
} // }
if hasRecipient { // if hasRecipient {
message["recipient_id"] = recipientID // message["recipient_id"] = recipientID
message["recipient_type"] = recipientType // message["recipient_type"] = recipientType
} // }
s.Hub.Broadcast <- message // s.Hub.Broadcast <- message
} // }
} // }
func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error { // func (s *Service) UpdateLiveWalletMetrics(ctx context.Context, companies []domain.GetCompany, branches []domain.BranchWallet) error {
const key = "live_metrics" // const key = "live_metrics"
companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies)) // companyBalances := make([]domain.CompanyWalletBalance, 0, len(companies))
for _, c := range companies { // for _, c := range companies {
companyBalances = append(companyBalances, domain.CompanyWalletBalance{ // companyBalances = append(companyBalances, domain.CompanyWalletBalance{
CompanyID: c.ID, // CompanyID: c.ID,
CompanyName: c.Name, // CompanyName: c.Name,
Balance: float64(c.WalletBalance.Float32()), // Balance: float64(c.WalletBalance.Float32()),
}) // })
} // }
branchBalances := make([]domain.BranchWalletBalance, 0, len(branches)) // branchBalances := make([]domain.BranchWalletBalance, 0, len(branches))
for _, b := range branches { // for _, b := range branches {
branchBalances = append(branchBalances, domain.BranchWalletBalance{ // branchBalances = append(branchBalances, domain.BranchWalletBalance{
BranchID: b.ID, // BranchID: b.ID,
BranchName: b.Name, // BranchName: b.Name,
CompanyID: b.CompanyID, // CompanyID: b.CompanyID,
Balance: float64(b.Balance.Float32()), // Balance: float64(b.Balance.Float32()),
}) // })
} // }
payload := domain.LiveWalletMetrics{ // payload := domain.LiveWalletMetrics{
Timestamp: time.Now(), // Timestamp: time.Now(),
CompanyBalances: companyBalances, // CompanyBalances: companyBalances,
BranchBalances: branchBalances, // BranchBalances: branchBalances,
} // }
updatedData, err := json.Marshal(payload) // updatedData, err := json.Marshal(payload)
if err != nil { // if err != nil {
return err // return err
} // }
if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil { // if err := s.redisClient.Set(ctx, key, updatedData, 0).Err(); err != nil {
return err // return err
} // }
if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil { // if err := s.redisClient.Publish(ctx, key, updatedData).Err(); err != nil {
return err // return err
} // }
return nil // return nil
} // }
func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) { // func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error) {
const key = "live_metrics" // const key = "live_metrics"
var metric domain.LiveMetric // var metric domain.LiveMetric
val, err := s.redisClient.Get(ctx, key).Result() // val, err := s.redisClient.Get(ctx, key).Result()
if err == redis.Nil { // if err == redis.Nil {
// Key does not exist yet, return zero-valued struct // // Key does not exist yet, return zero-valued struct
return domain.LiveMetric{}, nil // return domain.LiveMetric{}, nil
} else if err != nil { // } else if err != nil {
return domain.LiveMetric{}, err // return domain.LiveMetric{}, err
} // }
if err := json.Unmarshal([]byte(val), &metric); err != nil { // if err := json.Unmarshal([]byte(val), &metric); err != nil {
return domain.LiveMetric{}, err // return domain.LiveMetric{}, err
} // }
return metric, nil // return metric, nil
} // }
// func (s *Service) StartKafkaConsumer(ctx context.Context) {
// go func() {
// for {
// m, err := s.reader.ReadMessage(ctx)
// if err != nil {
// if err == context.Canceled {
// s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context")
// return
// }
// s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
// zap.Error(err),
// zap.Time("timestamp", time.Now()),
// )
// time.Sleep(1 * time.Second) // backoff
// continue
// }
func (s *Service) StartKafkaConsumer(ctx context.Context) { // var walletEvent event.WalletEvent
go func() { // if err := json.Unmarshal(m.Value, &walletEvent); err != nil {
for { // s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event",
m, err := s.reader.ReadMessage(ctx) // zap.String("message", string(m.Value)),
if err != nil { // zap.Error(err),
if err == context.Canceled { // zap.Time("timestamp", time.Now()),
s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context") // )
return // continue
} // }
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
time.Sleep(1 * time.Second) // backoff
continue
}
var walletEvent event.WalletEvent // raw, _ := json.Marshal(map[string]any{
if err := json.Unmarshal(m.Value, &walletEvent); err != nil { // "balance": walletEvent.Balance.Float32(),
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event", // "type": walletEvent.WalletType,
zap.String("message", string(m.Value)), // "timestamp": time.Now(),
zap.Error(err), // })
zap.Time("timestamp", time.Now()),
)
continue
}
raw, _ := json.Marshal(map[string]any{ // headline := ""
"balance": walletEvent.Balance.Float32(), // message := ""
"type": walletEvent.WalletType, // var receiver domain.NotificationRecieverSide
"timestamp": time.Now(), // switch walletEvent.WalletType {
})
headline := "" // case domain.StaticWalletType:
message := "" // headline = "Referral and Bonus Wallet Updated"
var receiver domain.NotificationRecieverSide // message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32())
switch walletEvent.WalletType { // receiver = domain.NotificationRecieverSideCustomer
// case domain.RegularWalletType:
// headline = "Wallet Updated"
// message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideCustomer
// case domain.BranchWalletType:
// headline = "Branch Wallet Updated"
// message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideBranchManager
// case domain.CompanyWalletType:
// headline = "Company Wallet Updated"
// message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
// receiver = domain.NotificationRecieverSideAdmin
// }
// // Handle the wallet event: send notification
// notification := &domain.Notification{
// RecipientID: walletEvent.UserID,
// DeliveryChannel: domain.DeliveryChannelInApp,
// Reciever: receiver,
// Type: domain.NotificationTypeWalletUpdated,
// DeliveryStatus: domain.DeliveryStatusPending,
// IsRead: false,
// Level: domain.NotificationLevelInfo,
// Priority: 2,
// Metadata: raw,
// Payload: domain.NotificationPayload{
// Headline: headline,
// Message: message,
// },
// }
case domain.StaticWalletType: // if err := s.SendNotification(ctx, notification); err != nil {
headline = "Referral and Bonus Wallet Updated" // s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32()) // zap.Error(err),
receiver = domain.NotificationRecieverSideCustomer // zap.Time("timestamp", time.Now()),
case domain.RegularWalletType: // )
headline = "Wallet Updated" // }
message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32()) // }
receiver = domain.NotificationRecieverSideCustomer // }()
case domain.BranchWalletType: // }
headline = "Branch Wallet Updated"
message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideBranchManager
case domain.CompanyWalletType:
headline = "Company Wallet Updated"
message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideAdmin
}
// Handle the wallet event: send notification
notification := &domain.Notification{
RecipientID: walletEvent.UserID,
DeliveryChannel: domain.DeliveryChannelInApp,
Reciever: receiver,
Type: domain.NotificationTypeWalletUpdated,
DeliveryStatus: domain.DeliveryStatusPending,
IsRead: false,
Level: domain.NotificationLevelInfo,
Priority: 2,
Metadata: raw,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
}
if err := s.SendNotification(ctx, notification); err != nil {
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
}
}()
}
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) { // func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
// var ( // var (

View File

@ -54,7 +54,7 @@ func (s *Service) CreateShopDeposit(ctx context.Context, userID int64, role doma
// } // }
newTransaction, err := s.CreateShopTransaction(ctx, domain.CreateShopTransaction{ newTransaction, err := s.CreateShopTransaction(ctx, domain.CreateShopTransaction{
Amount: domain.Currency(req.Amount), Amount: domain.ToCurrency(req.Amount),
BranchID: branchID, BranchID: branchID,
CompanyID: companyID, CompanyID: companyID,
UserID: userID, UserID: userID,

View File

@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
) )
// InitiateDirectDeposit creates a pending deposit request // InitiateDirectDeposit creates a pending deposit request
@ -73,8 +72,8 @@ func (s *Service) VerifyDirectDeposit(
} }
// Publish wallet update event // Publish wallet update event
go s.publishWalletUpdate(ctx, deposit.WalletID, deposit.Wallet.UserID, // go s.publishWalletUpdate(ctx, deposit.WalletID, deposit.Wallet.UserID,
deposit.Wallet.Balance+deposit.Amount, "direct_deposit_verified") // deposit.Wallet.Balance+deposit.Amount, "direct_deposit_verified")
// Update deposit status // Update deposit status
deposit.Status = domain.DepositStatusCompleted deposit.Status = domain.DepositStatusCompleted
@ -206,12 +205,12 @@ func (s *Service) notifyCustomerVerificationResult(ctx context.Context, deposit
} }
} }
func (s *Service) publishWalletUpdate(ctx context.Context, walletID, userID int64, newBalance domain.Currency, trigger string) { // func (s *Service) publishWalletUpdate(ctx context.Context, walletID, userID int64, newBalance domain.Currency, trigger string) {
s.kafkaProducer.Publish(ctx, fmt.Sprint(walletID), event.WalletEvent{ // s.kafkaProducer.Publish(ctx, fmt.Sprint(walletID), event.WalletEvent{
EventType: event.WalletBalanceUpdated, // EventType: event.WalletBalanceUpdated,
WalletID: walletID, // WalletID: walletID,
UserID: userID, // UserID: userID,
Balance: newBalance, // Balance: newBalance,
Trigger: trigger, // Trigger: trigger,
}) // })
} // }

View File

@ -2,10 +2,12 @@ package wallet
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"go.uber.org/zap" "go.uber.org/zap"
"time"
) )
func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) { func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) {
@ -63,12 +65,65 @@ func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID i
return recipients, nil return recipients, nil
} }
func (s *Service) SendWalletUpdateNotification(ctx context.Context, wallet domain.Wallet) error {
raw, _ := json.Marshal(map[string]any{
"balance": wallet.Balance.Float32(),
"type": wallet.Type,
"timestamp": time.Now(),
})
headline := ""
message := ""
var receiver domain.NotificationRecieverSide
switch wallet.Type {
case domain.StaticWalletType:
headline = "Referral and Bonus Wallet Updated"
message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", wallet.Balance.Float32())
receiver = domain.NotificationRecieverSideCustomer
case domain.RegularWalletType:
headline = "Wallet Updated"
message = fmt.Sprintf("Your wallet balance is now %.2f", wallet.Balance.Float32())
receiver = domain.NotificationRecieverSideCustomer
case domain.BranchWalletType:
headline = "Branch Wallet Updated"
message = fmt.Sprintf("branch wallet balance is now %.2f", wallet.Balance.Float32())
receiver = domain.NotificationRecieverSideBranchManager
case domain.CompanyWalletType:
headline = "Company Wallet Updated"
message = fmt.Sprintf("company wallet balance is now %.2f", wallet.Balance.Float32())
receiver = domain.NotificationRecieverSideAdmin
}
// Handle the wallet event: send notification
notification := &domain.Notification{
RecipientID: wallet.UserID,
DeliveryChannel: domain.DeliveryChannelInApp,
Reciever: receiver,
Type: domain.NotificationTypeWalletUpdated,
DeliveryStatus: domain.DeliveryStatusPending,
IsRead: false,
Level: domain.NotificationLevelInfo,
Priority: 2,
Metadata: raw,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
}
if err := s.notificationSvc.SendNotification(ctx, notification); err != nil {
s.mongoLogger.Error("[WalletSvc.SendWalletUpdateNotification] Failed to send notification",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
return nil
}
func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error { func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error {
// Send different messages // Send different messages
// Send notification to admin team // Send notification to admin team
adminNotification := &domain.Notification{ adminNotification := &domain.Notification{
ErrorSeverity: "low", ErrorSeverity: "low",

View File

@ -3,7 +3,7 @@ package wallet
import ( import (
"log/slog" "log/slog"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka" // "github.com/SamuelTariku/FortuneBet-Backend/internal/services/kafka"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification" notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"go.uber.org/zap" "go.uber.org/zap"
@ -18,7 +18,6 @@ type Service struct {
userSvc *user.Service userSvc *user.Service
mongoLogger *zap.Logger mongoLogger *zap.Logger
logger *slog.Logger logger *slog.Logger
kafkaProducer *kafka.Producer
} }
func NewService( func NewService(
@ -29,7 +28,6 @@ func NewService(
userSvc *user.Service, userSvc *user.Service,
mongoLogger *zap.Logger, mongoLogger *zap.Logger,
logger *slog.Logger, logger *slog.Logger,
kafkaProducer *kafka.Producer,
) *Service { ) *Service {
return &Service{ return &Service{
walletStore: walletStore, walletStore: walletStore,
@ -40,6 +38,5 @@ func NewService(
userSvc: userSvc, userSvc: userSvc,
mongoLogger: mongoLogger, mongoLogger: mongoLogger,
logger: logger, logger: logger,
kafkaProducer: kafkaProducer,
} }
} }

View File

@ -3,10 +3,12 @@ package wallet
import ( import (
"context" "context"
"errors" "errors"
"fmt"
// "fmt"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event" "go.uber.org/zap"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/event"
) )
var ( var (
@ -92,16 +94,22 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu
return err return err
} }
go func() { // go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{ // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{
EventType: event.WalletBalanceUpdated, // EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID, // WalletID: wallet.ID,
UserID: wallet.UserID, // UserID: wallet.UserID,
Balance: balance, // Balance: balance,
WalletType: wallet.Type, // WalletType: wallet.Type,
Trigger: "UpdateBalance", // Trigger: "UpdateBalance",
}) // })
}() // }()
if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil {
s.mongoLogger.Info("Failed to send wallet update notification",
zap.Int64("wallet_id", wallet.ID),
zap.Error(err))
}
return nil return nil
} }
@ -118,16 +126,21 @@ func (s *Service) AddToWallet(
return domain.Transfer{}, err return domain.Transfer{}, err
} }
go func() { // go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{
EventType: event.WalletBalanceUpdated, // EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID, // WalletID: wallet.ID,
UserID: wallet.UserID, // UserID: wallet.UserID,
Balance: wallet.Balance + amount, // Balance: wallet.Balance + amount,
WalletType: wallet.Type, // WalletType: wallet.Type,
Trigger: "AddToWallet", // Trigger: "AddToWallet",
}) // })
}() // }()
if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil {
s.mongoLogger.Info("Failed to send wallet update notification",
zap.Int64("wallet_id", wallet.ID),
zap.Error(err))
}
// Log the transfer here for reference // Log the transfer here for reference
newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{ newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{
@ -184,17 +197,23 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
return domain.Transfer{}, nil return domain.Transfer{}, nil
} }
go func() { // go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{ // s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{
EventType: event.WalletBalanceUpdated, // EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID, // WalletID: wallet.ID,
UserID: wallet.UserID, // UserID: wallet.UserID,
Balance: wallet.Balance - amount, // Balance: wallet.Balance - amount,
WalletType: wallet.Type, // WalletType: wallet.Type,
Trigger: "DeductFromWallet", // Trigger: "DeductFromWallet",
}) // })
}() // }()
if err := s.SendWalletUpdateNotification(ctx, wallet); err != nil {
s.mongoLogger.Info("Failed to send wallet update notification",
zap.Int64("wallet_id", wallet.ID),
zap.Error(err))
}
// Log the transfer here for reference // Log the transfer here for reference
newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{ newTransfer, err := s.transferStore.CreateTransfer(ctx, domain.CreateTransfer{
Message: message, Message: message,

View File

@ -11,6 +11,7 @@ import (
betSvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/bet" betSvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/bet"
enetpulse "github.com/SamuelTariku/FortuneBet-Backend/internal/services/enet_pulse" enetpulse "github.com/SamuelTariku/FortuneBet-Backend/internal/services/enet_pulse"
eventsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/event" eventsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/event"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notification"
oddssvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds" oddssvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/odds"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/report" "github.com/SamuelTariku/FortuneBet-Backend/internal/services/report"
resultsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/result" resultsvc "github.com/SamuelTariku/FortuneBet-Backend/internal/services/result"
@ -27,75 +28,75 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S
spec string spec string
task func() task func()
}{ }{
{ // {
spec: "0 0 * * * *", // Every 1 hour // spec: "0 0 * * * *", // Every 1 hour
task: func() { // task: func() {
mongoLogger.Info("Began fetching upcoming events cron task") // mongoLogger.Info("Began fetching upcoming events cron task")
if err := eventService.FetchUpcomingEvents(context.Background()); err != nil { // if err := eventService.FetchUpcomingEvents(context.Background()); err != nil {
mongoLogger.Error("Failed to fetch upcoming events", // mongoLogger.Error("Failed to fetch upcoming events",
zap.Error(err), // zap.Error(err),
) // )
} else { // } else {
mongoLogger.Info("Completed fetching upcoming events without errors") // mongoLogger.Info("Completed fetching upcoming events without errors")
} // }
}, // },
}, // },
{ // {
spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events) // spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events)
task: func() { // task: func() {
mongoLogger.Info("Began fetching non live odds cron task") // mongoLogger.Info("Began fetching non live odds cron task")
if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil { // if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil {
mongoLogger.Error("Failed to fetch non live odds", // mongoLogger.Error("Failed to fetch non live odds",
zap.Error(err), // zap.Error(err),
) // )
} else { // } else {
mongoLogger.Info("Completed fetching non live odds without errors") // mongoLogger.Info("Completed fetching non live odds without errors")
} // }
}, // },
}, // },
{ // {
spec: "0 */5 * * * *", // Every 5 Minutes // spec: "0 */5 * * * *", // Every 5 Minutes
task: func() { // task: func() {
mongoLogger.Info("Began update all expired events status cron task") // mongoLogger.Info("Began update all expired events status cron task")
if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil { // if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil {
mongoLogger.Error("Failed to update expired events status", // mongoLogger.Error("Failed to update expired events status",
zap.Error(err), // zap.Error(err),
) // )
} else { // } else {
mongoLogger.Info("Completed expired events without errors") // mongoLogger.Info("Completed expired events without errors")
} // }
}, // },
}, // },
{ // {
spec: "0 */15 * * * *", // Every 15 Minutes // spec: "0 */15 * * * *", // Every 15 Minutes
task: func() { // task: func() {
mongoLogger.Info("Began updating bets based on event results cron task") // mongoLogger.Info("Began updating bets based on event results cron task")
if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil { // if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil {
mongoLogger.Error("Failed to process result", // mongoLogger.Error("Failed to process result",
zap.Error(err), // zap.Error(err),
) // )
} else { // } else {
mongoLogger.Info("Completed processing all event result outcomes without errors") // mongoLogger.Info("Completed processing all event result outcomes without errors")
} // }
}, // },
}, // },
{ // {
spec: "0 0 0 * * 1", // Every Monday // spec: "0 0 0 * * 1", // Every Monday
task: func() { // task: func() {
mongoLogger.Info("Began Send weekly result notification cron task") // mongoLogger.Info("Began Send weekly result notification cron task")
if err := resultService.CheckAndSendResultNotifications(context.Background(), time.Now().Add(-7*24*time.Hour)); err != nil { // if err := resultService.CheckAndSendResultNotifications(context.Background(), time.Now().Add(-7*24*time.Hour)); err != nil {
mongoLogger.Error("Failed to process result", // mongoLogger.Error("Failed to process result",
zap.Error(err), // zap.Error(err),
) // )
} else { // } else {
mongoLogger.Info("Completed sending weekly result notification without errors") // mongoLogger.Info("Completed sending weekly result notification without errors")
} // }
}, // },
}, // },
} }
for _, job := range schedule { for _, job := range schedule {
// job.task() job.task()
if _, err := c.AddFunc(job.spec, job.task); err != nil { if _, err := c.AddFunc(job.spec, job.task); err != nil {
mongoLogger.Error("Failed to schedule data fetching cron job", mongoLogger.Error("Failed to schedule data fetching cron job",
zap.Error(err), zap.Error(err),
@ -108,7 +109,7 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S
mongoLogger.Info("Cron jobs started for event and odds services") mongoLogger.Info("Cron jobs started for event and odds services")
} }
func StartTicketCrons(ticketService ticket.Service, mongoLogger *zap.Logger) { func StartCleanupCrons(ticketService ticket.Service, notificationSvc *notificationservice.Service, mongoLogger *zap.Logger) {
c := cron.New(cron.WithSeconds()) c := cron.New(cron.WithSeconds())
schedule := []struct { schedule := []struct {
@ -128,6 +129,19 @@ func StartTicketCrons(ticketService ticket.Service, mongoLogger *zap.Logger) {
} }
}, },
}, },
{
spec: "0 0 * * * *",
task: func() {
mongoLogger.Info("Deleting old notifications")
if err := notificationSvc.DeleteOldNotifications(context.Background()); err != nil {
mongoLogger.Error("Failed to remove old notifications",
zap.Error(err),
)
} else {
mongoLogger.Info("Successfully deleted old notifications")
}
},
},
} }
for _, job := range schedule { for _, job := range schedule {

View File

@ -643,7 +643,6 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
h.BadRequestLogger().Error("invalid company id", zap.Any("company_id", companyID)) h.BadRequestLogger().Error("invalid company id", zap.Any("company_id", companyID))
return fiber.NewError(fiber.StatusBadRequest, "invalid company id") return fiber.NewError(fiber.StatusBadRequest, "invalid company id")
} }
role := c.Locals("role").(domain.Role)
page := c.QueryInt("page", 1) page := c.QueryInt("page", 1)
pageSize := c.QueryInt("page_size", 10) pageSize := c.QueryInt("page_size", 10)
@ -657,7 +656,7 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
} }
var isShopBet domain.ValidBool var isShopBet domain.ValidBool
isShopBetQuery := c.Query("is_shop") isShopBetQuery := c.Query("is_shop")
if isShopBetQuery != "" && role == domain.RoleSuperAdmin { if isShopBetQuery != "" {
isShopBetParse, err := strconv.ParseBool(isShopBetQuery) isShopBetParse, err := strconv.ParseBool(isShopBetQuery)
if err != nil { if err != nil {
h.mongoLoggerSvc.Info("failed to parse is_shop_bet", h.mongoLoggerSvc.Info("failed to parse is_shop_bet",
@ -775,6 +774,8 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
// @Failure 500 {object} response.APIResponse // @Failure 500 {object} response.APIResponse
// @Router /api/v1/sport/bet/{id} [get] // @Router /api/v1/sport/bet/{id} [get]
func (h *Handler) GetBetByID(c *fiber.Ctx) error { func (h *Handler) GetBetByID(c *fiber.Ctx) error {
companyID := c.Locals("company_id").(domain.ValidInt64)
betID := c.Params("id") betID := c.Params("id")
id, err := strconv.ParseInt(betID, 10, 64) id, err := strconv.ParseInt(betID, 10, 64)
if err != nil { if err != nil {
@ -800,6 +801,15 @@ func (h *Handler) GetBetByID(c *fiber.Ctx) error {
res := domain.ConvertBet(bet) res := domain.ConvertBet(bet)
if companyID.Valid && bet.CompanyID != companyID.Value {
h.mongoLoggerSvc.Warn("Warn - Company is trying to access another companies bet",
zap.Int64("betID", id),
zap.Int("status_code", fiber.StatusNotFound),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusNotFound, "Failed to retrieve bet")
}
// h.mongoLoggerSvc.Info("Bet retrieved successfully", // h.mongoLoggerSvc.Info("Bet retrieved successfully",
// zap.Int64("betID", id), // zap.Int64("betID", id),
// zap.Int("status_code", fiber.StatusOK), // zap.Int("status_code", fiber.StatusOK),

View File

@ -611,6 +611,8 @@ func (h *Handler) GetAllBranches(c *fiber.Ctx) error {
// @Failure 500 {object} response.APIResponse // @Failure 500 {object} response.APIResponse
// @Router /api/v1/search/branch [get] // @Router /api/v1/search/branch [get]
func (h *Handler) SearchBranch(c *fiber.Ctx) error { func (h *Handler) SearchBranch(c *fiber.Ctx) error {
companyID := c.Locals("company_id").(domain.ValidInt64)
// Get search query from request // Get search query from request
searchQuery := c.Query("q") searchQuery := c.Query("q")
if searchQuery == "" { if searchQuery == "" {
@ -622,7 +624,7 @@ func (h *Handler) SearchBranch(c *fiber.Ctx) error {
} }
// Call the service to search for branches // Call the service to search for branches
branches, err := h.branchSvc.SearchBranchByName(c.Context(), searchQuery) branches, err := h.branchSvc.SearchBranchByName(c.Context(), searchQuery, companyID)
if err != nil { if err != nil {
h.mongoLoggerSvc.Info("Failed to search branches", h.mongoLoggerSvc.Info("Failed to search branches",
zap.String("query", searchQuery), zap.String("query", searchQuery),

View File

@ -270,22 +270,22 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error {
Valid: searchQuery != "", Valid: searchQuery != "",
} }
// firstStartTimeQuery := c.Query("first_start_time") firstStartTimeQuery := c.Query("first_start_time")
// var firstStartTime domain.ValidTime var firstStartTime domain.ValidTime
// if firstStartTimeQuery != "" { if firstStartTimeQuery != "" {
// firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery) firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery)
// if err != nil { if err != nil {
// h.BadRequestLogger().Info("invalid start_time format", h.BadRequestLogger().Info("invalid start_time format",
// zap.String("first_start_time", firstStartTimeQuery), zap.String("first_start_time", firstStartTimeQuery),
// zap.Error(err), zap.Error(err),
// ) )
// return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format") return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format")
// } }
// firstStartTime = domain.ValidTime{ firstStartTime = domain.ValidTime{
// Value: firstStartTimeParsed, Value: firstStartTimeParsed,
// Valid: true, Valid: true,
// } }
// } }
lastStartTimeQuery := c.Query("last_start_time") lastStartTimeQuery := c.Query("last_start_time")
var lastStartTime domain.ValidTime var lastStartTime domain.ValidTime
@ -330,18 +330,15 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error {
events, total, err := h.eventSvc.GetEventsWithSettings( events, total, err := h.eventSvc.GetEventsWithSettings(
c.Context(), companyID.Value, domain.EventFilter{ c.Context(), companyID.Value, domain.EventFilter{
SportID: sportID, SportID: sportID,
LeagueID: leagueID, LeagueID: leagueID,
Query: searchString, Query: searchString,
FirstStartTime: domain.ValidTime{ FirstStartTime: firstStartTime,
Value: time.Now(), LastStartTime: lastStartTime,
Valid: true, Limit: limit,
}, Offset: offset,
LastStartTime: lastStartTime, CountryCode: countryCode,
Limit: limit, Featured: isFeatured,
Offset: offset,
CountryCode: countryCode,
Featured: isFeatured,
Status: domain.ValidEventStatus{ Status: domain.ValidEventStatus{
Value: domain.STATUS_PENDING, Value: domain.STATUS_PENDING,
Valid: true, Valid: true,

View File

@ -348,8 +348,8 @@ func (a *App) initAppRoutes() {
tenant.Patch("/sport/bet/:id", a.authMiddleware, h.UpdateCashOut) tenant.Patch("/sport/bet/:id", a.authMiddleware, h.UpdateCashOut)
tenant.Delete("/sport/bet/:id", a.authMiddleware, h.DeleteTenantBet) tenant.Delete("/sport/bet/:id", a.authMiddleware, h.DeleteTenantBet)
groupV1.Get("/sport/bet/:id", a.authMiddleware, a.CompanyOnly, h.GetBetByID)
groupV1.Get("/sport/bet", a.authMiddleware, a.SuperAdminOnly, h.GetAllBet) groupV1.Get("/sport/bet", a.authMiddleware, a.SuperAdminOnly, h.GetAllBet)
groupV1.Get("/sport/bet/:id", a.authMiddleware, a.SuperAdminOnly, h.GetBetByID)
groupV1.Delete("/sport/bet/:id", a.authMiddleware, a.SuperAdminOnly, h.DeleteBet) groupV1.Delete("/sport/bet/:id", a.authMiddleware, a.SuperAdminOnly, h.DeleteBet)
tenant.Post("/sport/random/bet", a.authMiddleware, h.RandomBet) tenant.Post("/sport/random/bet", a.authMiddleware, h.RandomBet)

View File

@ -79,7 +79,7 @@ logs:
@mkdir -p logs @mkdir -p logs
db-up: | logs db-up: | logs
@mkdir -p logs @mkdir -p logs
@docker compose up -d postgres migrate mongo redis kafka @docker compose up -d postgres migrate mongo
@docker logs fortunebet-backend-postgres-1 > logs/postgres.log 2>&1 & @docker logs fortunebet-backend-postgres-1 > logs/postgres.log 2>&1 &
.PHONY: db-down .PHONY: db-down
db-down: db-down: