Enetpulse tournament+tournament_stages imlementations + mergeconflict fixes

This commit is contained in:
Yared Yemane 2025-09-28 18:28:00 +03:00
commit af7c792dee
58 changed files with 1212 additions and 889 deletions

View File

@ -119,13 +119,13 @@ func main() {
oddsSvc := odds.New(store, cfg, eventSvc, logger, domain.MongoDBLogger)
notificationRepo := repository.NewNotificationRepository(store)
virtuaGamesRepo := repository.NewVirtualGameRepository(store)
notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc)
// var userStore user.UserStore
// Initialize producer
brokers := []string{"localhost:9092"}
topic := "wallet-balance-topic"
producer := kafka.NewProducer(brokers, topic)
producer := kafka.NewProducer(cfg.KafkaBrokers, topic)
notificationSvc := notificationservice.New(notificationRepo, domain.MongoDBLogger, logger, cfg, messengerSvc, userSvc, cfg.KafkaBrokers)
walletSvc := wallet.NewService(
wallet.WalletStore(store),

View File

@ -78,9 +78,11 @@ SET value = EXCLUDED.value;
INSERT INTO global_settings (key, value)
VALUES ('sms_provider', 'afro_message'),
('max_number_of_outcomes', '30'),
('max_unsettled_bets', '100'),
('bet_amount_limit', '10000000'),
('daily_ticket_limit', '50'),
('total_winnings_limit', '1000000'),
('total_winnings_limit', '100000000000'),
('total_winnings_notify', '100000000'),
('amount_for_bet_referral', '1000000'),
('cashback_amount_cap', '1000'),
('default_winning_limit', '5000000'),

View File

@ -76,7 +76,7 @@ VALUES (
TRUE,
TRUE,
TRUE,
1,
5,
'regular_wallet',
'ETB',
TRUE,
@ -89,7 +89,7 @@ VALUES (
FALSE,
TRUE,
TRUE,
1,
5,
'static_wallet',
'ETB',
TRUE,
@ -102,7 +102,7 @@ VALUES (
TRUE,
TRUE,
TRUE,
1,
6,
'regular_wallet',
'ETB',
TRUE,
@ -115,7 +115,7 @@ VALUES (
FALSE,
TRUE,
TRUE,
1,
6,
'static_wallet',
'ETB',
TRUE,

View File

@ -72,8 +72,11 @@ CREATE TABLE IF NOT EXISTS wallets (
),
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, type)
);
CREATE TABLE refresh_tokens (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
@ -537,6 +540,16 @@ CREATE TABLE IF NOT EXISTS raffle_game_filters (
game_id VARCHAR(150) NOT NULL,
CONSTRAINT unique_raffle_game UNIQUE (raffle_id, game_id)
);
CREATE TABLE IF NOT EXISTS accumulator (
outcome_count BIGINT PRIMARY KEY,
default_multiplier REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS company_accumulator (
id SERIAL PRIMARY KEY,
company_id BIGINT NOT NULL,
outcome_count BIGINT NOT NULL,
multiplier REAL NOT NULL
);
------ Views
CREATE VIEW companies_details AS
SELECT companies.*,

View File

@ -13,6 +13,7 @@ CREATE TABLE IF NOT EXISTS notifications (
'signup_welcome',
'otp_sent',
'wallet_threshold',
'wallet_updated',
'transfer_failed',
'transfer_success',
'admin_alert',

View File

@ -0,0 +1,2 @@
DROP TABLE IF EXISTS virtual_games;
DROP TABLE IF EXISTS user_game_interactions;

View File

@ -1,11 +0,0 @@
-- -- Settings Initial Data
-- INSERT INTO global_settings (key, value)
-- VALUES ('sms_provider', 'afro_message'),
-- ('max_number_of_outcomes', '30'),
-- ('bet_amount_limit', '10000000'),
-- ('daily_ticket_limit', '50'),
-- ('total_winnings_limit', '1000000'),
-- ('amount_for_bet_referral', '1000000'),
-- ('cashback_amount_cap', '1000') ON CONFLICT (key) DO
-- UPDATE
-- SET value = EXCLUDED.value;

View File

@ -1,75 +0,0 @@
-- -- Locations Initial Data
-- INSERT INTO branch_locations (key, value)
-- VALUES ('addis_ababa', 'Addis Ababa'),
-- ('dire_dawa', 'Dire Dawa'),
-- ('mekelle', 'Mekelle'),
-- ('adama', 'Adama'),
-- ('awassa', 'Awassa'),
-- ('bahir_dar', 'Bahir Dar'),
-- ('gonder', 'Gonder'),
-- ('dessie', 'Dessie'),
-- ('jimma', 'Jimma'),
-- ('jijiga', 'Jijiga'),
-- ('shashamane', 'Shashamane'),
-- ('bishoftu', 'Bishoftu'),
-- ('sodo', 'Sodo'),
-- ('arba_minch', 'Arba Minch'),
-- ('hosaena', 'Hosaena'),
-- ('harar', 'Harar'),
-- ('dilla', 'Dilla'),
-- ('nekemte', 'Nekemte'),
-- ('debre_birhan', 'Debre Birhan'),
-- ('asella', 'Asella'),
-- ('debre_markos', 'Debre Markos'),
-- ('kombolcha', 'Kombolcha'),
-- ('debre_tabor', 'Debre Tabor'),
-- ('adigrat', 'Adigrat'),
-- ('areka', 'Areka'),
-- ('weldiya', 'Weldiya'),
-- ('sebeta', 'Sebeta'),
-- ('burayu', 'Burayu'),
-- ('shire', 'Shire'),
-- ('ambo', 'Ambo'),
-- ('arsi_negele', 'Arsi Negele'),
-- ('aksum', 'Aksum'),
-- ('gambela', 'Gambela'),
-- ('bale_robe', 'Bale Robe'),
-- ('butajira', 'Butajira'),
-- ('batu', 'Batu'),
-- ('boditi', 'Boditi'),
-- ('adwa', 'Adwa'),
-- ('yirgalem', 'Yirgalem'),
-- ('waliso', 'Waliso'),
-- ('welkite', 'Welkite'),
-- ('gode', 'Gode'),
-- ('meki', 'Meki'),
-- ('negele_borana', 'Negele Borana'),
-- ('alaba_kulito', 'Alaba Kulito'),
-- ('alamata,', 'Alamata,'),
-- ('chiro', 'Chiro'),
-- ('tepi', 'Tepi'),
-- ('durame', 'Durame'),
-- ('goba', 'Goba'),
-- ('assosa', 'Assosa'),
-- ('gimbi', 'Gimbi'),
-- ('wukro', 'Wukro'),
-- ('haramaya', 'Haramaya'),
-- ('mizan_teferi', 'Mizan Teferi'),
-- ('sawla', 'Sawla'),
-- ('mojo', 'Mojo'),
-- ('dembi_dolo', 'Dembi Dolo'),
-- ('aleta_wendo', 'Aleta Wendo'),
-- ('metu', 'Metu'),
-- ('mota', 'Mota'),
-- ('fiche', 'Fiche'),
-- ('finote_selam', 'Finote Selam'),
-- ('bule_hora_town', 'Bule Hora Town'),
-- ('bonga', 'Bonga'),
-- ('kobo', 'Kobo'),
-- ('jinka', 'Jinka'),
-- ('dangila', 'Dangila'),
-- ('degehabur', 'Degehabur'),
-- ('bedessa', 'Bedessa'),
-- ('agaro', 'Agaro') ON CONFLICT (key) DO
-- UPDATE
-- SET value = EXCLUDED.value;

View File

@ -1,220 +0,0 @@
-- CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- -- Users
-- INSERT INTO users (
-- id,
-- first_name,
-- last_name,
-- email,
-- phone_number,
-- password,
-- role,
-- email_verified,
-- phone_verified,
-- created_at,
-- updated_at,
-- suspended,
-- company_id
-- )
-- VALUES (
-- 1,
-- 'John',
-- 'Doe',
-- 'john.doe@example.com',
-- NULL,
-- crypt('password@123', gen_salt('bf'))::bytea,
-- 'customer',
-- TRUE,
-- FALSE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP,
-- FALSE,
-- 1
-- ),
-- (
-- 2,
-- 'Test',
-- 'Admin',
-- 'test.admin@gmail.com',
-- '0988554466',
-- crypt('password@123', gen_salt('bf'))::bytea,
-- 'admin',
-- TRUE,
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP,
-- FALSE,
-- 1
-- ),
-- (
-- 3,
-- 'Samuel',
-- 'Tariku',
-- 'cybersamt@gmail.com',
-- '0911111111',
-- crypt('password@123', gen_salt('bf'))::bytea,
-- 'super_admin',
-- TRUE,
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP,
-- FALSE,
-- NULL
-- ),
-- (
-- 4,
-- 'Kirubel',
-- 'Kibru',
-- 'kirubel.jkl679@gmail.com',
-- '0911554486',
-- crypt('password@123', gen_salt('bf'))::bytea,
-- 'super_admin',
-- TRUE,
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP,
-- FALSE,
-- NULL
-- ),
-- (
-- 5,
-- 'Test',
-- 'Veli',
-- 'test.veli@example.com',
-- NULL,
-- crypt('password@123', gen_salt('bf'))::bytea,
-- 'customer',
-- TRUE,
-- FALSE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP,
-- FALSE,
-- 1
-- );
-- -- Supported Operations
-- INSERT INTO supported_operations (id, name, description)
-- VALUES (1, 'SportBook', 'Sportbook operations'),
-- (2, 'Virtual', 'Virtual operations');
-- -- Wallets
-- INSERT INTO wallets (
-- id,
-- balance,
-- is_withdraw,
-- is_bettable,
-- is_transferable,
-- user_id,
-- type,
-- currency,
-- is_active,
-- created_at,
-- updated_at
-- )
-- VALUES (
-- 1,
-- 10000,
-- TRUE,
-- TRUE,
-- TRUE,
-- 1,
-- 'regular_wallet',
-- 'ETB',
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- ),
-- (
-- 2,
-- 5000,
-- FALSE,
-- TRUE,
-- TRUE,
-- 1,
-- 'static_wallet',
-- 'ETB',
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- ),
-- (
-- 3,
-- 20000,
-- TRUE,
-- TRUE,
-- TRUE,
-- 2,
-- 'company_wallet',
-- 'ETB',
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- ),
-- (
-- 4,
-- 15000,
-- TRUE,
-- TRUE,
-- TRUE,
-- 2,
-- 'branch_wallet',
-- 'ETB',
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- );
-- -- Customer Wallets
-- INSERT INTO customer_wallets (
-- id,
-- customer_id,
-- regular_wallet_id,
-- static_wallet_id
-- )
-- VALUES (1, 1, 1, 2);
-- -- Company
-- INSERT INTO companies (
-- id,
-- name,
-- slug,
-- admin_id,
-- wallet_id,
-- deducted_percentage,
-- is_active,
-- created_at,
-- updated_at
-- )
-- VALUES (
-- 1,
-- 'FortuneBets',
-- 'fortunebets',
-- 2,
-- 3,
-- 0.10,
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- );
-- -- Branch
-- INSERT INTO branches (
-- id,
-- name,
-- location,
-- wallet_id,
-- branch_manager_id,
-- company_id,
-- is_self_owned,
-- profit_percent,
-- is_active,
-- created_at,
-- updated_at
-- )
-- VALUES (
-- 1,
-- 'Test Branch',
-- 'addis_ababa',
-- 4,
-- 2,
-- 1,
-- TRUE,
-- 0.10,
-- TRUE,
-- CURRENT_TIMESTAMP,
-- CURRENT_TIMESTAMP
-- );

View File

@ -0,0 +1,2 @@
DROP TABLE IF EXISTS enetpulse_sports;
DROP TABLE IF EXISTS enetpulse_tournament_templates;

View File

@ -0,0 +1,23 @@
CREATE TABLE IF NOT EXISTS enetpulse_sports (
id BIGSERIAL PRIMARY KEY,
sport_id VARCHAR(50) NOT NULL UNIQUE, -- from API "id"
name VARCHAR(255) NOT NULL, -- from API "name"
updates_count INT DEFAULT 0, -- from API "n"
last_updated_at TIMESTAMPTZ, -- from API "ut"
status INT DEFAULT 1, -- optional status (active/inactive)
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ
);
CREATE TABLE IF NOT EXISTS enetpulse_tournament_templates (
id BIGSERIAL PRIMARY KEY,
template_id VARCHAR(50) NOT NULL UNIQUE, -- from API "id"
name VARCHAR(255) NOT NULL, -- from API "name"
sport_fk VARCHAR(50) NOT NULL REFERENCES enetpulse_sports(sport_id) ON DELETE CASCADE,
gender VARCHAR(20) DEFAULT 'unknown', -- from API "gender" {male, female, mixed, unknown}
updates_count INT DEFAULT 0, -- from API "n"
last_updated_at TIMESTAMPTZ, -- from API "ut"
status INT DEFAULT 1, -- optional status (active/inactive)
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ
);

View File

@ -57,6 +57,47 @@ wHERE (
company_id = sqlc.narg('company_id')
OR sqlc.narg('company_id') IS NULL
)
AND (
status = sqlc.narg('status')
OR sqlc.narg('status') IS NULL
)
AND (
cashed_out = sqlc.narg('cashed_out')
OR sqlc.narg('cashed_out') IS NULL
)
AND (
full_name ILIKE '%' || sqlc.narg('query') || '%'
OR phone_number ILIKE '%' || sqlc.narg('query') || '%'
OR sqlc.narg('query') IS NULL
)
AND (
created_at > sqlc.narg('created_before')
OR sqlc.narg('created_before') IS NULL
)
AND (
created_at < sqlc.narg('created_after')
OR sqlc.narg('created_after') IS NULL
)
LIMIT sqlc.narg('limit') OFFSET sqlc.narg('offset');
-- name: GetTotalBets :one
SELECT COUNT(*)
FROM bets
wHERE (
user_id = sqlc.narg('user_id')
OR sqlc.narg('user_id') IS NULL
)
AND (
is_shop_bet = sqlc.narg('is_shop_bet')
OR sqlc.narg('is_shop_bet') IS NULL
)
AND (
company_id = sqlc.narg('company_id')
OR sqlc.narg('company_id') IS NULL
)
AND (
status = sqlc.narg('status')
OR sqlc.narg('status') IS NULL
)
AND (
cashed_out = sqlc.narg('cashed_out')
OR sqlc.narg('cashed_out') IS NULL

View File

@ -40,12 +40,19 @@ SELECT *
FROM notifications
ORDER BY timestamp DESC
LIMIT $1 OFFSET $2;
-- name: ListNotifications :many
-- name: GetTotalNotificationCount :one
SELECT COUNT(*)
FROM notifications;
-- name: GetUserNotifications :many
SELECT *
FROM notifications
WHERE recipient_id = $1
ORDER BY timestamp DESC
LIMIT $2 OFFSET $3;
-- name: GetUserNotificationCount :one
SELECT COUNT(*)
FROM notifications
WHERE recipient_id = $1;
-- name: CountUnreadNotifications :one
SELECT count(id)
FROM notifications
@ -69,10 +76,16 @@ LIMIT $1;
SELECT recipient_id
FROM notifications
WHERE reciever = $1;
-- name: GetNotificationCounts :many
SELECT
COUNT(*) as total,
COUNT(CASE WHEN is_read = true THEN 1 END) as read,
COUNT(CASE WHEN is_read = false THEN 1 END) as unread
SELECT COUNT(*) as total,
COUNT(
CASE
WHEN is_read = true THEN 1
END
) as read,
COUNT(
CASE
WHEN is_read = false THEN 1
END
) as unread
FROM notifications;

View File

@ -107,11 +107,14 @@ SELECT id,
suspended_at,
company_id
FROM users
WHERE (company_id = $1)
WHERE (
company_id = sqlc.narg('company_id')
OR sqlc.narg('company_id') IS NULL
)
AND (
first_name ILIKE '%' || $2 || '%'
OR last_name ILIKE '%' || $2 || '%'
OR phone_number LIKE '%' || $2 || '%'
first_name ILIKE '%' || $1 || '%'
OR last_name ILIKE '%' || $1 || '%'
OR phone_number LIKE '%' || $1 || '%'
)
AND (
role = sqlc.narg('role')

View File

@ -19,7 +19,6 @@ services:
volumes:
- postgres_data:/var/lib/postgresql/data
- ./exports:/exports
mongo:
container_name: fortunebet-mongo
image: mongo:7.0.11
@ -38,7 +37,6 @@ services:
interval: 10s
timeout: 5s
retries: 5
migrate:
image: migrate/migrate
volumes:
@ -56,7 +54,6 @@ services:
]
networks:
- app
redis:
image: redis:7-alpine
ports:
@ -69,6 +66,34 @@ services:
timeout: 5s
retries: 5
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
networks:
- app
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- "9092:9092"
- "29092:29092"
networks:
- app
app:
build:
context: .
@ -80,6 +105,7 @@ services:
- DB_URL=postgresql://root:secret@postgres:5432/gh?sslmode=disable
- MONGO_URI=mongodb://root:secret@mongo:27017
- REDIS_ADDR=redis:6379
- KAFKA_BROKERS=kafka:9092
depends_on:
migrate:
condition: service_completed_successfully

View File

@ -119,32 +119,40 @@ wHERE (
OR $3 IS NULL
)
AND (
cashed_out = $4
status = $4
OR $4 IS NULL
)
AND (
full_name ILIKE '%' || $5 || '%'
OR phone_number ILIKE '%' || $5 || '%'
cashed_out = $5
OR $5 IS NULL
)
AND (
created_at > $6
full_name ILIKE '%' || $6 || '%'
OR phone_number ILIKE '%' || $6 || '%'
OR $6 IS NULL
)
AND (
created_at < $7
created_at > $7
OR $7 IS NULL
)
AND (
created_at < $8
OR $8 IS NULL
)
LIMIT $10 OFFSET $9
`
type GetAllBetsParams struct {
UserID pgtype.Int8 `json:"user_id"`
IsShopBet pgtype.Bool `json:"is_shop_bet"`
CompanyID pgtype.Int8 `json:"company_id"`
Status pgtype.Int4 `json:"status"`
CashedOut pgtype.Bool `json:"cashed_out"`
Query pgtype.Text `json:"query"`
CreatedBefore pgtype.Timestamp `json:"created_before"`
CreatedAfter pgtype.Timestamp `json:"created_after"`
Offset pgtype.Int4 `json:"offset"`
Limit pgtype.Int4 `json:"limit"`
}
func (q *Queries) GetAllBets(ctx context.Context, arg GetAllBetsParams) ([]BetWithOutcome, error) {
@ -152,10 +160,13 @@ func (q *Queries) GetAllBets(ctx context.Context, arg GetAllBetsParams) ([]BetWi
arg.UserID,
arg.IsShopBet,
arg.CompanyID,
arg.Status,
arg.CashedOut,
arg.Query,
arg.CreatedBefore,
arg.CreatedAfter,
arg.Offset,
arg.Limit,
)
if err != nil {
return nil, err
@ -481,6 +492,71 @@ func (q *Queries) GetBetsForCashback(ctx context.Context) ([]BetWithOutcome, err
return items, nil
}
const GetTotalBets = `-- name: GetTotalBets :one
SELECT COUNT(*)
FROM bets
wHERE (
user_id = $1
OR $1 IS NULL
)
AND (
is_shop_bet = $2
OR $2 IS NULL
)
AND (
company_id = $3
OR $3 IS NULL
)
AND (
status = $4
OR $4 IS NULL
)
AND (
cashed_out = $5
OR $5 IS NULL
)
AND (
full_name ILIKE '%' || $6 || '%'
OR phone_number ILIKE '%' || $6 || '%'
OR $6 IS NULL
)
AND (
created_at > $7
OR $7 IS NULL
)
AND (
created_at < $8
OR $8 IS NULL
)
`
type GetTotalBetsParams struct {
UserID pgtype.Int8 `json:"user_id"`
IsShopBet pgtype.Bool `json:"is_shop_bet"`
CompanyID pgtype.Int8 `json:"company_id"`
Status pgtype.Int4 `json:"status"`
CashedOut pgtype.Bool `json:"cashed_out"`
Query pgtype.Text `json:"query"`
CreatedBefore pgtype.Timestamp `json:"created_before"`
CreatedAfter pgtype.Timestamp `json:"created_after"`
}
func (q *Queries) GetTotalBets(ctx context.Context, arg GetTotalBetsParams) (int64, error) {
row := q.db.QueryRow(ctx, GetTotalBets,
arg.UserID,
arg.IsShopBet,
arg.CompanyID,
arg.Status,
arg.CashedOut,
arg.Query,
arg.CreatedBefore,
arg.CreatedAfter,
)
var count int64
err := row.Scan(&count)
return count, err
}
const UpdateBetOutcomeStatus = `-- name: UpdateBetOutcomeStatus :one
UPDATE bet_outcomes
SET status = $1

View File

@ -8,6 +8,11 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
type Accumulator struct {
OutcomeCount int64 `json:"outcome_count"`
DefaultMultiplier float32 `json:"default_multiplier"`
}
type Bank struct {
ID int64 `json:"id"`
Slug string `json:"slug"`
@ -159,6 +164,13 @@ type Company struct {
UpdatedAt pgtype.Timestamp `json:"updated_at"`
}
type CompanyAccumulator struct {
ID int32 `json:"id"`
CompanyID int64 `json:"company_id"`
OutcomeCount int64 `json:"outcome_count"`
Multiplier float32 `json:"multiplier"`
}
type CompanyEventSetting struct {
ID int64 `json:"id"`
CompanyID int64 `json:"company_id"`

View File

@ -188,10 +188,17 @@ func (q *Queries) GetNotification(ctx context.Context, id string) (Notification,
}
const GetNotificationCounts = `-- name: GetNotificationCounts :many
SELECT
COUNT(*) as total,
COUNT(CASE WHEN is_read = true THEN 1 END) as read,
COUNT(CASE WHEN is_read = false THEN 1 END) as unread
SELECT COUNT(*) as total,
COUNT(
CASE
WHEN is_read = true THEN 1
END
) as read,
COUNT(
CASE
WHEN is_read = false THEN 1
END
) as unread
FROM notifications
`
@ -221,17 +228,47 @@ func (q *Queries) GetNotificationCounts(ctx context.Context) ([]GetNotificationC
return items, nil
}
const ListFailedNotifications = `-- name: ListFailedNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata
const GetTotalNotificationCount = `-- name: GetTotalNotificationCount :one
SELECT COUNT(*)
FROM notifications
WHERE delivery_status = 'failed'
AND timestamp < NOW() - INTERVAL '1 hour'
ORDER BY timestamp ASC
LIMIT $1
`
func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]Notification, error) {
rows, err := q.db.Query(ctx, ListFailedNotifications, limit)
func (q *Queries) GetTotalNotificationCount(ctx context.Context) (int64, error) {
row := q.db.QueryRow(ctx, GetTotalNotificationCount)
var count int64
err := row.Scan(&count)
return count, err
}
const GetUserNotificationCount = `-- name: GetUserNotificationCount :one
SELECT COUNT(*)
FROM notifications
WHERE recipient_id = $1
`
func (q *Queries) GetUserNotificationCount(ctx context.Context, recipientID int64) (int64, error) {
row := q.db.QueryRow(ctx, GetUserNotificationCount, recipientID)
var count int64
err := row.Scan(&count)
return count, err
}
const GetUserNotifications = `-- name: GetUserNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata
FROM notifications
WHERE recipient_id = $1
ORDER BY timestamp DESC
LIMIT $2 OFFSET $3
`
type GetUserNotificationsParams struct {
RecipientID int64 `json:"recipient_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
func (q *Queries) GetUserNotifications(ctx context.Context, arg GetUserNotificationsParams) ([]Notification, error) {
rows, err := q.db.Query(ctx, GetUserNotifications, arg.RecipientID, arg.Limit, arg.Offset)
if err != nil {
return nil, err
}
@ -265,22 +302,17 @@ func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]N
return items, nil
}
const ListNotifications = `-- name: ListNotifications :many
const ListFailedNotifications = `-- name: ListFailedNotifications :many
SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata
FROM notifications
WHERE recipient_id = $1
ORDER BY timestamp DESC
LIMIT $2 OFFSET $3
WHERE delivery_status = 'failed'
AND timestamp < NOW() - INTERVAL '1 hour'
ORDER BY timestamp ASC
LIMIT $1
`
type ListNotificationsParams struct {
RecipientID int64 `json:"recipient_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
func (q *Queries) ListNotifications(ctx context.Context, arg ListNotificationsParams) ([]Notification, error) {
rows, err := q.db.Query(ctx, ListNotifications, arg.RecipientID, arg.Limit, arg.Offset)
func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]Notification, error) {
rows, err := q.db.Query(ctx, ListFailedNotifications, limit)
if err != nil {
return nil, err
}

View File

@ -489,11 +489,14 @@ SELECT id,
suspended_at,
company_id
FROM users
WHERE (company_id = $1)
WHERE (
company_id = $2
OR $2 IS NULL
)
AND (
first_name ILIKE '%' || $2 || '%'
OR last_name ILIKE '%' || $2 || '%'
OR phone_number LIKE '%' || $2 || '%'
first_name ILIKE '%' || $1 || '%'
OR last_name ILIKE '%' || $1 || '%'
OR phone_number LIKE '%' || $1 || '%'
)
AND (
role = $3
@ -502,8 +505,8 @@ WHERE (company_id = $1)
`
type SearchUserByNameOrPhoneParams struct {
Column1 pgtype.Text `json:"column_1"`
CompanyID pgtype.Int8 `json:"company_id"`
Column2 pgtype.Text `json:"column_2"`
Role pgtype.Text `json:"role"`
}
@ -524,7 +527,7 @@ type SearchUserByNameOrPhoneRow struct {
}
func (q *Queries) SearchUserByNameOrPhone(ctx context.Context, arg SearchUserByNameOrPhoneParams) ([]SearchUserByNameOrPhoneRow, error) {
rows, err := q.db.Query(ctx, SearchUserByNameOrPhone, arg.CompanyID, arg.Column2, arg.Role)
rows, err := q.db.Query(ctx, SearchUserByNameOrPhone, arg.Column1, arg.CompanyID, arg.Role)
if err != nil {
return nil, err
}

View File

@ -6,6 +6,7 @@ import (
"log/slog"
"os"
"strconv"
"strings"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
@ -14,23 +15,29 @@ import (
)
var (
ErrInvalidDbUrl = errors.New("db url is invalid")
ErrInvalidPort = errors.New("port number is invalid")
ErrRefreshExpiry = errors.New("refresh token expiry is invalid")
ErrAccessExpiry = errors.New("access token expiry is invalid")
ErrInvalidJwtKey = errors.New("jwt key is invalid")
ErrLogLevel = errors.New("log level not set")
ErrInvalidLevel = errors.New("invalid log level")
ErrInvalidEnv = errors.New("env not set or invalid")
ErrInvalidSMSAPIKey = errors.New("SMS API key is invalid")
ErrMissingBetToken = errors.New("missing BET365_TOKEN in .env")
ErrInvalidPopOKClientID = errors.New("PopOK client ID is invalid")
ErrInvalidPopOKSecretKey = errors.New("PopOK secret key is invalid")
ErrInvalidPopOKBaseURL = errors.New("PopOK base URL is invalid")
ErrInvalidPopOKCallbackURL = errors.New("PopOK callback URL is invalid")
ErrInvalidVeliAPIURL = errors.New("Veli API URL is invalid")
ErrInvalidVeliOperatorKey = errors.New("Veli operator key is invalid")
ErrInvalidVeliSecretKey = errors.New("Veli secret key is invalid")
ErrInvalidDbUrl = errors.New("db url is invalid")
ErrInvalidPort = errors.New("port number is invalid")
ErrRefreshExpiry = errors.New("refresh token expiry is invalid")
ErrAccessExpiry = errors.New("access token expiry is invalid")
ErrInvalidJwtKey = errors.New("jwt key is invalid")
ErrLogLevel = errors.New("log level not set")
ErrInvalidLevel = errors.New("invalid log level")
ErrInvalidEnv = errors.New("env not set or invalid")
ErrInvalidSMSAPIKey = errors.New("SMS API key is invalid")
ErrMissingBetToken = errors.New("missing BET365_TOKEN in .env")
ErrInvalidPopOKClientID = errors.New("PopOK client ID is invalid")
ErrInvalidPopOKSecretKey = errors.New("PopOK secret key is invalid")
ErrInvalidPopOKBaseURL = errors.New("PopOK base URL is invalid")
ErrInvalidPopOKCallbackURL = errors.New("PopOK callback URL is invalid")
ErrInvalidVeliAPIURL = errors.New("Veli API URL is invalid")
ErrInvalidVeliOperatorKey = errors.New("Veli operator key is invalid")
ErrInvalidVeliSecretKey = errors.New("Veli secret key is invalid")
ErrInvalidAtlasBaseUrl = errors.New("Atlas Base URL is invalid")
ErrInvalidAtlasOperatorID = errors.New("Atlas operator ID is invalid")
ErrInvalidAtlasSecretKey = errors.New("Atlas secret key is invalid")
ErrInvalidAtlasBrandID = errors.New("Atlas brand ID is invalid")
ErrInvalidAtlasPartnerID = errors.New("Atlas Partner ID is invalid")
ErrMissingResendApiKey = errors.New("missing Resend Api key")
ErrMissingResendSenderEmail = errors.New("missing Resend sender name")
ErrMissingTwilioAccountSid = errors.New("missing twilio account sid")
@ -151,6 +158,7 @@ type Config struct {
TwilioAuthToken string
TwilioSenderPhoneNumber string
RedisAddr string
KafkaBrokers []string
}
func NewConfig() (*Config, error) {
@ -176,6 +184,7 @@ func (c *Config) loadEnv() error {
c.ReportExportPath = os.Getenv("REPORT_EXPORT_PATH")
c.RedisAddr = os.Getenv("REDIS_ADDR")
c.KafkaBrokers = strings.Split(os.Getenv("KAFKA_BROKERS"), ",")
c.EnetPulseConfig.Token = os.Getenv("ENETPULSE_TOKEN")
c.EnetPulseConfig.UserName = os.Getenv("ENETPULSE_USERNAME")
@ -417,6 +426,36 @@ func (c *Config) loadEnv() error {
CallbackURL: popOKCallbackURL,
Platform: popOKPlatform,
}
AtlasBaseUrl := os.Getenv("ATLAS_BASE_URL")
if AtlasBaseUrl == "" {
return ErrInvalidAtlasBaseUrl
}
AtlasSecretKey := os.Getenv("ATLAS_SECRET_KEY")
if AtlasSecretKey == "" {
return ErrInvalidAtlasSecretKey
}
AtlasBrandID := os.Getenv("ATLAS_BRAND_ID")
if AtlasBrandID == "" {
return ErrInvalidAtlasBrandID
}
AtlasPartnerID := os.Getenv("ATLAS_PARTNER_ID")
if AtlasPartnerID == "" {
return ErrInvalidAtlasPartnerID
}
AtlasOperatorID := os.Getenv("ATLAS_OPERATOR_ID")
if AtlasOperatorID == "" {
return ErrInvalidAtlasOperatorID
}
c.Atlas = AtlasConfig{
BaseURL: AtlasBaseUrl,
SecretKey: AtlasSecretKey,
CasinoID: AtlasBrandID,
PartnerID: AtlasPartnerID,
OperatorID: AtlasOperatorID,
}
betToken := os.Getenv("BET365_TOKEN")
if betToken == "" {
return ErrMissingBetToken

View File

@ -60,6 +60,7 @@ type Bet struct {
}
type BetFilter struct {
Status ValidOutcomeStatus
UserID ValidInt64
CompanyID ValidInt64
CashedOut ValidBool
@ -67,6 +68,8 @@ type BetFilter struct {
Query ValidString
CreatedBefore ValidTime
CreatedAfter ValidTime
Limit ValidInt32
Offset ValidInt32
}
type Flag struct {

View File

@ -14,6 +14,7 @@ type NotificationDeliveryStatus string
type DeliveryChannel string
const (
NotificationTypeWalletUpdated NotificationType = "wallet_updated"
NotificationTypeDepositResult NotificationType = "deposit_result"
NotificationTypeDepositVerification NotificationType = "deposit_verification"
NotificationTypeCashOutSuccess NotificationType = "cash_out_success"

View File

@ -282,10 +282,6 @@ type BetAnalysis struct {
AverageOdds float64 `json:"average_odds"`
}
type ValidOutcomeStatus struct {
Value OutcomeStatus
Valid bool // Valid is true if Value is not NULL
}
// ReportFilter contains filters for report generation
type ReportFilter struct {

View File

@ -3,7 +3,7 @@ package domain
import (
"time"
dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db"
"github.com/jackc/pgx/v5/pgtype"
)
type MarketConfig struct {
@ -67,6 +67,20 @@ func (o *OutcomeStatus) String() string {
}
}
type ValidOutcomeStatus struct {
Value OutcomeStatus
Valid bool
}
func (v ValidOutcomeStatus) ToPG() pgtype.Int4 {
return pgtype.Int4{
Int32: int32(v.Value),
Valid: v.Valid,
}
}
type TimeStatus int32
const (
@ -84,79 +98,3 @@ const (
TIME_STATUS_DECIDED_BY_FA TimeStatus = 11
TIME_STATUS_REMOVED TimeStatus = 99
)
type ResultLog struct {
ID int64 `json:"id"`
StatusNotFinishedCount int `json:"status_not_finished_count"`
StatusNotFinishedBets int `json:"status_not_finished_bets"`
StatusToBeFixedCount int `json:"status_to_be_fixed_count"`
StatusToBeFixedBets int `json:"status_to_be_fixed_bets"`
StatusPostponedCount int `json:"status_postponed_count"`
StatusPostponedBets int `json:"status_postponed_bets"`
StatusEndedCount int `json:"status_ended_count"`
StatusEndedBets int `json:"status_ended_bets"`
StatusRemovedCount int `json:"status_removed_count"`
StatusRemovedBets int `json:"status_removed_bets"`
RemovedCount int `json:"removed"`
CreatedAt time.Time `json:"created_at"`
}
type CreateResultLog struct {
StatusNotFinishedCount int `json:"status_not_finished_count"`
StatusNotFinishedBets int `json:"status_not_finished_bets"`
StatusToBeFixedCount int `json:"status_to_be_fixed_count"`
StatusToBeFixedBets int `json:"status_to_be_fixed_bets"`
StatusPostponedCount int `json:"status_postponed_count"`
StatusPostponedBets int `json:"status_postponed_bets"`
StatusEndedCount int `json:"status_ended_count"`
StatusEndedBets int `json:"status_ended_bets"`
StatusRemovedCount int `json:"status_removed_count"`
StatusRemovedBets int `json:"status_removed_bets"`
RemovedCount int `json:"removed"`
}
type ResultFilter struct {
CreatedBefore ValidTime
CreatedAfter ValidTime
}
type ResultStatusBets struct {
StatusNotFinished []int64 `json:"status_not_finished"`
StatusToBeFixed []int64 `json:"status_to_be_fixed"`
StatusPostponed []int64 `json:"status_postponed"`
StatusEnded []int64 `json:"status_ended"`
StatusRemoved []int64 `json:"status_removed"`
}
func ConvertDBResultLog(result dbgen.ResultLog) ResultLog {
return ResultLog{
ID: result.ID,
StatusNotFinishedCount: int(result.StatusNotFinishedCount),
StatusNotFinishedBets: int(result.StatusNotFinishedBets),
StatusToBeFixedCount: int(result.StatusToBeFixedCount),
StatusToBeFixedBets: int(result.StatusToBeFixedBets),
StatusPostponedCount: int(result.StatusPostponedCount),
StatusPostponedBets: int(result.StatusPostponedBets),
StatusEndedCount: int(result.StatusEndedCount),
StatusEndedBets: int(result.StatusEndedBets),
StatusRemovedCount: int(result.StatusRemovedCount),
StatusRemovedBets: int(result.StatusRemovedBets),
RemovedCount: int(result.RemovedCount),
CreatedAt: result.CreatedAt.Time,
}
}
func ConvertCreateResultLog(result CreateResultLog) dbgen.CreateResultLogParams {
return dbgen.CreateResultLogParams{
StatusNotFinishedCount: int32(result.StatusNotFinishedCount),
StatusNotFinishedBets: int32(result.StatusNotFinishedBets),
StatusToBeFixedCount: int32(result.StatusToBeFixedCount),
StatusToBeFixedBets: int32(result.StatusToBeFixedBets),
StatusPostponedCount: int32(result.StatusPostponedCount),
StatusPostponedBets: int32(result.StatusPostponedBets),
StatusEndedCount: int32(result.StatusEndedCount),
StatusEndedBets: int32(result.StatusEndedBets),
StatusRemovedCount: int32(result.StatusRemovedCount),
StatusRemovedBets: int32(result.StatusRemovedBets),
RemovedCount: int32(result.RemovedCount),
}
}

View File

@ -0,0 +1,84 @@
package domain
import (
"time"
dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db"
)
type ResultLog struct {
ID int64 `json:"id"`
StatusNotFinishedCount int `json:"status_not_finished_count"`
StatusNotFinishedBets int `json:"status_not_finished_bets"`
StatusToBeFixedCount int `json:"status_to_be_fixed_count"`
StatusToBeFixedBets int `json:"status_to_be_fixed_bets"`
StatusPostponedCount int `json:"status_postponed_count"`
StatusPostponedBets int `json:"status_postponed_bets"`
StatusEndedCount int `json:"status_ended_count"`
StatusEndedBets int `json:"status_ended_bets"`
StatusRemovedCount int `json:"status_removed_count"`
StatusRemovedBets int `json:"status_removed_bets"`
RemovedCount int `json:"removed"`
CreatedAt time.Time `json:"created_at"`
}
type CreateResultLog struct {
StatusNotFinishedCount int `json:"status_not_finished_count"`
StatusNotFinishedBets int `json:"status_not_finished_bets"`
StatusToBeFixedCount int `json:"status_to_be_fixed_count"`
StatusToBeFixedBets int `json:"status_to_be_fixed_bets"`
StatusPostponedCount int `json:"status_postponed_count"`
StatusPostponedBets int `json:"status_postponed_bets"`
StatusEndedCount int `json:"status_ended_count"`
StatusEndedBets int `json:"status_ended_bets"`
StatusRemovedCount int `json:"status_removed_count"`
StatusRemovedBets int `json:"status_removed_bets"`
RemovedCount int `json:"removed"`
}
type ResultLogFilter struct {
CreatedBefore ValidTime
CreatedAfter ValidTime
}
type ResultStatusBets struct {
StatusNotFinished []int64 `json:"status_not_finished"`
StatusToBeFixed []int64 `json:"status_to_be_fixed"`
StatusPostponed []int64 `json:"status_postponed"`
StatusEnded []int64 `json:"status_ended"`
StatusRemoved []int64 `json:"status_removed"`
}
func ConvertDBResultLog(result dbgen.ResultLog) ResultLog {
return ResultLog{
ID: result.ID,
StatusNotFinishedCount: int(result.StatusNotFinishedCount),
StatusNotFinishedBets: int(result.StatusNotFinishedBets),
StatusToBeFixedCount: int(result.StatusToBeFixedCount),
StatusToBeFixedBets: int(result.StatusToBeFixedBets),
StatusPostponedCount: int(result.StatusPostponedCount),
StatusPostponedBets: int(result.StatusPostponedBets),
StatusEndedCount: int(result.StatusEndedCount),
StatusEndedBets: int(result.StatusEndedBets),
StatusRemovedCount: int(result.StatusRemovedCount),
StatusRemovedBets: int(result.StatusRemovedBets),
RemovedCount: int(result.RemovedCount),
CreatedAt: result.CreatedAt.Time,
}
}
func ConvertCreateResultLog(result CreateResultLog) dbgen.CreateResultLogParams {
return dbgen.CreateResultLogParams{
StatusNotFinishedCount: int32(result.StatusNotFinishedCount),
StatusNotFinishedBets: int32(result.StatusNotFinishedBets),
StatusToBeFixedCount: int32(result.StatusToBeFixedCount),
StatusToBeFixedBets: int32(result.StatusToBeFixedBets),
StatusPostponedCount: int32(result.StatusPostponedCount),
StatusPostponedBets: int32(result.StatusPostponedBets),
StatusEndedCount: int32(result.StatusEndedCount),
StatusEndedBets: int32(result.StatusEndedBets),
StatusRemovedCount: int32(result.StatusRemovedCount),
StatusRemovedBets: int32(result.StatusRemovedBets),
RemovedCount: int32(result.RemovedCount),
}
}

View File

@ -18,9 +18,11 @@ var (
type SettingList struct {
SMSProvider SMSProvider `json:"sms_provider"`
MaxNumberOfOutcomes int64 `json:"max_number_of_outcomes"`
MaxUnsettledBets int64 `json:"max_unsettled_bets"`
BetAmountLimit Currency `json:"bet_amount_limit"`
DailyTicketPerIP int64 `json:"daily_ticket_limit"`
TotalWinningLimit Currency `json:"total_winning_limit"`
TotalWinningNotify Currency `json:"total_winning_notify"`
AmountForBetReferral Currency `json:"amount_for_bet_referral"`
CashbackAmountCap Currency `json:"cashback_amount_cap"`
DefaultWinningLimit int64 `json:"default_winning_limit"`
@ -41,9 +43,11 @@ type SettingList struct {
type SettingListRes struct {
SMSProvider SMSProvider `json:"sms_provider"`
MaxNumberOfOutcomes int64 `json:"max_number_of_outcomes"`
MaxUnsettledBets int64 `json:"max_unsettled_bets"`
BetAmountLimit float32 `json:"bet_amount_limit"`
DailyTicketPerIP int64 `json:"daily_ticket_limit"`
TotalWinningLimit float32 `json:"total_winning_limit"`
TotalWinningNotify float32 `json:"total_winning_notify"`
AmountForBetReferral float32 `json:"amount_for_bet_referral"`
CashbackAmountCap float32 `json:"cashback_amount_cap"`
DefaultWinningLimit int64 `json:"default_winning_limit"`
@ -65,9 +69,11 @@ func ConvertSettingListRes(settings SettingList) SettingListRes {
return SettingListRes{
SMSProvider: settings.SMSProvider,
MaxNumberOfOutcomes: settings.MaxNumberOfOutcomes,
MaxUnsettledBets: settings.MaxUnsettledBets,
BetAmountLimit: settings.BetAmountLimit.Float32(),
DailyTicketPerIP: settings.DailyTicketPerIP,
TotalWinningLimit: settings.TotalWinningLimit.Float32(),
TotalWinningNotify: settings.TotalWinningNotify.Float32(),
AmountForBetReferral: settings.AmountForBetReferral.Float32(),
CashbackAmountCap: settings.CashbackAmountCap.Float32(),
DefaultWinningLimit: settings.DefaultWinningLimit,
@ -89,32 +95,36 @@ func ConvertSettingListRes(settings SettingList) SettingListRes {
type SaveSettingListReq struct {
SMSProvider *string `json:"sms_provider,omitempty"`
MaxNumberOfOutcomes *int64 `json:"max_number_of_outcomes,omitempty"`
MaxUnsettledBets *int64 `json:"max_unsettled_bets,omitempty"`
BetAmountLimit *float32 `json:"bet_amount_limit,omitempty"`
DailyTicketPerIP *int64 `json:"daily_ticket_limit,omitempty"`
TotalWinningLimit *float32 `json:"total_winning_limit,omitempty"`
TotalWinningNotify *float32 `json:"total_winning_notify,omitempty"`
AmountForBetReferral *float32 `json:"amount_for_bet_referral,omitempty"`
CashbackAmountCap *float32 `json:"cashback_amount_cap,omitempty"`
DefaultWinningLimit *int64 `json:"default_winning_limit,omitempty"`
ReferralRewardAmount *float32 `json:"referral_reward_amount"`
CashbackPercentage *float32 `json:"cashback_percentage"`
DefaultMaxReferrals *int64 `json:"default_max_referrals"`
MinimumBetAmount *float32 `json:"minimum_bet_amount"`
BetDuplicateLimit *int64 `json:"bet_duplicate_limit"`
SendEmailOnBetFinish *bool `json:"send_email_on_bet_finish"`
SendSMSOnBetFinish *bool `json:"send_sms_on_bet_finish"`
WelcomeBonusActive *bool `json:"welcome_bonus_active"`
WelcomeBonusMultiplier *float32 `json:"welcome_bonus_multiplier"`
WelcomeBonusCap *float32 `json:"welcome_bonus_cap"`
WelcomeBonusCount *int64 `json:"welcome_bonus_count"`
WelcomeBonusExpire *int64 `json:"welcome_bonus_expiry"`
ReferralRewardAmount *float32 `json:"referral_reward_amount,omitempty"`
CashbackPercentage *float32 `json:"cashback_percentage,omitempty"`
DefaultMaxReferrals *int64 `json:"default_max_referrals,omitempty"`
MinimumBetAmount *float32 `json:"minimum_bet_amount,omitempty"`
BetDuplicateLimit *int64 `json:"bet_duplicate_limit,omitempty"`
SendEmailOnBetFinish *bool `json:"send_email_on_bet_finish,omitempty"`
SendSMSOnBetFinish *bool `json:"send_sms_on_bet_finish,omitempty"`
WelcomeBonusActive *bool `json:"welcome_bonus_active,omitempty"`
WelcomeBonusMultiplier *float32 `json:"welcome_bonus_multiplier,omitempty"`
WelcomeBonusCap *float32 `json:"welcome_bonus_cap,omitempty"`
WelcomeBonusCount *int64 `json:"welcome_bonus_count,omitempty"`
WelcomeBonusExpire *int64 `json:"welcome_bonus_expiry,omitempty"`
}
type ValidSettingList struct {
SMSProvider ValidString
MaxNumberOfOutcomes ValidInt64
MaxUnsettledBets ValidInt64
BetAmountLimit ValidCurrency
DailyTicketPerIP ValidInt64
TotalWinningLimit ValidCurrency
TotalWinningNotify ValidCurrency
AmountForBetReferral ValidCurrency
CashbackAmountCap ValidCurrency
DefaultWinningLimit ValidInt64
@ -136,9 +146,11 @@ func ConvertSaveSettingListReq(settings SaveSettingListReq) ValidSettingList {
return ValidSettingList{
SMSProvider: ConvertStringPtr(settings.SMSProvider),
MaxNumberOfOutcomes: ConvertInt64Ptr(settings.MaxNumberOfOutcomes),
MaxUnsettledBets: ConvertInt64Ptr(settings.MaxUnsettledBets),
BetAmountLimit: ConvertFloat32PtrToCurrency(settings.BetAmountLimit),
DailyTicketPerIP: ConvertInt64Ptr(settings.DailyTicketPerIP),
TotalWinningLimit: ConvertFloat32PtrToCurrency(settings.TotalWinningLimit),
TotalWinningNotify: ConvertFloat32PtrToCurrency(settings.TotalWinningNotify),
AmountForBetReferral: ConvertFloat32PtrToCurrency(settings.AmountForBetReferral),
CashbackAmountCap: ConvertFloat32PtrToCurrency(settings.CashbackAmountCap),
DefaultWinningLimit: ConvertInt64Ptr(settings.DefaultWinningLimit),
@ -162,9 +174,11 @@ func (vsl *ValidSettingList) ToSettingList() SettingList {
return SettingList{
SMSProvider: SMSProvider(vsl.SMSProvider.Value),
MaxNumberOfOutcomes: vsl.MaxNumberOfOutcomes.Value,
MaxUnsettledBets: vsl.MaxUnsettledBets.Value,
BetAmountLimit: vsl.BetAmountLimit.Value,
DailyTicketPerIP: vsl.DailyTicketPerIP.Value,
TotalWinningLimit: vsl.TotalWinningLimit.Value,
TotalWinningNotify: vsl.TotalWinningNotify.Value,
AmountForBetReferral: vsl.AmountForBetReferral.Value,
CashbackAmountCap: vsl.CashbackAmountCap.Value,
DefaultWinningLimit: vsl.DefaultWinningLimit.Value,
@ -194,6 +208,7 @@ func (vsl *ValidSettingList) CustomValidationSettings() error {
func (vsl *ValidSettingList) GetInt64SettingsMap() map[string]*ValidInt64 {
return map[string]*ValidInt64{
"max_number_of_outcomes": &vsl.MaxNumberOfOutcomes,
"max_unsettled_bets": &vsl.MaxUnsettledBets,
"daily_ticket_limit": &vsl.DailyTicketPerIP,
"default_winning_limit": &vsl.DefaultWinningLimit,
"default_max_referrals": &vsl.DefaultMaxReferrals,
@ -207,6 +222,7 @@ func (vsl *ValidSettingList) GetCurrencySettingsMap() map[string]*ValidCurrency
return map[string]*ValidCurrency{
"bet_amount_limit": &vsl.BetAmountLimit,
"total_winnings_limit": &vsl.TotalWinningLimit,
"total_winnings_notify": &vsl.TotalWinningNotify,
"amount_for_bet_referral": &vsl.AmountForBetReferral,
"cashback_amount_cap": &vsl.CashbackAmountCap,
"referral_reward_amount": &vsl.ReferralRewardAmount,

View File

@ -10,9 +10,10 @@ const (
)
type WalletEvent struct {
EventType WalletEventType `json:"event_type"`
WalletID int64 `json:"wallet_id"`
UserID int64 `json:"user_id"`
Balance domain.Currency `json:"balance"`
Trigger string `json:"trigger"` // e.g. "AddToWallet", "DeductFromWallet"
EventType WalletEventType `json:"event_type"`
WalletID int64 `json:"wallet_id"`
UserID int64 `json:"user_id"`
Balance domain.Currency `json:"balance"`
WalletType domain.WalletType `json:"wallet_type"`
Trigger string `json:"trigger"` // e.g. "AddToWallet", "DeductFromWallet"
}

View File

@ -93,30 +93,44 @@ func (s *Store) GetBetByID(ctx context.Context, id int64) (domain.GetBet, error)
return domain.ConvertDBBetWithOutcomes(bet), nil
}
func (s *Store) GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, error) {
func (s *Store) GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, int64, error) {
bets, err := s.queries.GetAllBets(ctx, dbgen.GetAllBetsParams{
UserID: filter.UserID.ToPG(),
CompanyID: filter.CompanyID.ToPG(),
Status: filter.Status.ToPG(),
CashedOut: filter.CashedOut.ToPG(),
IsShopBet: filter.IsShopBet.ToPG(),
Query: filter.Query.ToPG(),
CreatedBefore: filter.CreatedBefore.ToPG(),
CreatedAfter: filter.CreatedAfter.ToPG(),
Offset: filter.Offset.ToPG(),
Limit: filter.Limit.ToPG(),
})
if err != nil {
domain.MongoDBLogger.Error("failed to get all bets",
zap.Any("filter", filter),
zap.Error(err),
)
return nil, err
return nil, 0, err
}
total, err := s.queries.GetTotalBets(ctx, dbgen.GetTotalBetsParams{
UserID: filter.UserID.ToPG(),
CompanyID: filter.CompanyID.ToPG(),
Status: filter.Status.ToPG(),
CashedOut: filter.CashedOut.ToPG(),
IsShopBet: filter.IsShopBet.ToPG(),
Query: filter.Query.ToPG(),
CreatedBefore: filter.CreatedBefore.ToPG(),
CreatedAfter: filter.CreatedAfter.ToPG(),
});
var result []domain.GetBet = make([]domain.GetBet, 0, len(bets))
for _, bet := range bets {
result = append(result, domain.ConvertDBBetWithOutcomes(bet))
}
return result, nil
return result, total, nil
}
func (s *Store) GetBetByUserID(ctx context.Context, UserID int64) ([]domain.GetBet, error) {

View File

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math"
"strconv"
dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
@ -294,12 +293,8 @@ func (s *Store) DeleteEvent(ctx context.Context, eventID int64) error {
return nil
}
func (s *Store) GetSportAndLeagueIDs(ctx context.Context, eventID string) ([]int64, error) {
id, err := strconv.ParseInt(eventID, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid eventID: %v", err)
}
sportAndLeagueIDs, err := s.queries.GetSportAndLeagueIDs(ctx, id)
func (s *Store) GetSportAndLeagueIDs(ctx context.Context, eventID int64) ([]int64, error) {
sportAndLeagueIDs, err := s.queries.GetSportAndLeagueIDs(ctx, eventID)
if err != nil {
return nil, err
}

View File

@ -14,7 +14,7 @@ import (
type NotificationRepository interface {
CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error)
UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error)
ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)
GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error)
ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error)
ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error)
CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error)
@ -96,16 +96,22 @@ func (r *Repository) UpdateNotificationStatus(ctx context.Context, id, status st
return r.mapDBToDomain(&dbNotification), nil
}
func (r *Repository) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
params := dbgen.ListNotificationsParams{
func (r *Repository) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) {
params := dbgen.GetUserNotificationsParams{
RecipientID: recipientID,
Limit: int32(limit),
Offset: int32(offset),
}
dbNotifications, err := r.store.queries.ListNotifications(ctx, params)
dbNotifications, err := r.store.queries.GetUserNotifications(ctx, params)
if err != nil {
return nil, err
return nil, 0, err
}
total, err := r.store.queries.GetUserNotificationCount(ctx, recipientID)
if err != nil {
return nil, 0, err
}
var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications))
@ -114,7 +120,7 @@ func (r *Repository) ListNotifications(ctx context.Context, recipientID int64, l
result = append(result, *domainNotif)
}
return result, nil
return result, total, nil
}
func (r *Repository) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {

View File

@ -8,9 +8,6 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
func (s *Store) CreateResultLog(ctx context.Context, result domain.CreateResultLog) (domain.ResultLog, error) {
dbResult, err := s.queries.CreateResultLog(ctx, domain.ConvertCreateResultLog(result))
if err != nil {
@ -19,7 +16,7 @@ func (s *Store) CreateResultLog(ctx context.Context, result domain.CreateResultL
return domain.ConvertDBResultLog(dbResult), nil
}
func (s *Store) GetAllResultLog(ctx context.Context, filter domain.ResultFilter) ([]domain.ResultLog, error) {
func (s *Store) GetAllResultLog(ctx context.Context, filter domain.ResultLogFilter) ([]domain.ResultLog, error) {
dbResultLogs, err := s.queries.GetAllResultLog(ctx, dbgen.GetAllResultLogParams{
CreatedBefore: pgtype.Timestamp{
Time: filter.CreatedBefore.Value,

View File

@ -260,7 +260,7 @@ func (s *Store) SearchUserByNameOrPhone(ctx context.Context, searchString string
query := dbgen.SearchUserByNameOrPhoneParams{
CompanyID: companyID.ToPG(),
Column2: pgtype.Text{
Column1: pgtype.Text{
String: searchString,
Valid: true,
},

View File

@ -245,19 +245,10 @@ func (s *Service) SendAdminErrorNotification(ctx context.Context, betID int64, s
return nil
}
func (s *Service) SendAdminLargeNotification(ctx context.Context, betID int64, status domain.OutcomeStatus, extra string, companyID int64) error {
func (s *Service) SendAdminLargeBetNotification(ctx context.Context, betID int64, totalWinnings float32, extra string, companyID int64) error {
var headline string
var message string
switch status {
case domain.OUTCOME_STATUS_ERROR, domain.OUTCOME_STATUS_PENDING:
headline = fmt.Sprintf("Processing Error for Bet #%v", betID)
message = "A processing error occurred with this bet. Please review and take corrective action."
default:
return fmt.Errorf("unsupported status: %v", status)
}
headline := fmt.Sprintf("SYSTEM WARNING: High Risk Bet", betID, totalWinnings)
message := fmt.Sprintf("Bet #%d has been created with %v payout", betID, totalWinnings)
super_admin_users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{
Role: string(domain.RoleSuperAdmin),
@ -294,10 +285,27 @@ func (s *Service) SendAdminLargeNotification(ctx context.Context, betID int64, s
domain.DeliveryChannelInApp,
domain.DeliveryChannelEmail,
} {
n := newBetResultNotification(user.ID, domain.NotificationLevelError, channel, headline, message, map[string]any{
"status": status,
"more": extra,
raw, _ := json.Marshal(map[string]any{
"winnings": totalWinnings,
"more": extra,
})
n := &domain.Notification{
RecipientID: user.ID,
DeliveryStatus: domain.DeliveryStatusPending,
IsRead: false,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelWarning,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: channel,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
Priority: 2,
Metadata: raw,
}
// n := newBetResultNotification(user.ID, domain.NotificationLevelWarning, channel, headline, message)
if err := s.notificationSvc.SendNotification(ctx, n); err != nil {
return err
}

View File

@ -12,7 +12,7 @@ type BetStore interface {
CreateBetOutcome(ctx context.Context, outcomes []domain.CreateBetOutcome) (int64, error)
CreateFlag(ctx context.Context, flag domain.CreateFlagReq) (domain.Flag, error)
GetBetByID(ctx context.Context, id int64) (domain.GetBet, error)
GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, error)
GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, int64, error)
GetBetByUserID(ctx context.Context, UserID int64) ([]domain.GetBet, error)
GetBetByFastCode(ctx context.Context, fastcode string) (domain.GetBet, error)
GetBetOutcomeByEventID(ctx context.Context, eventID int64, is_filtered bool) ([]domain.BetOutcome, error)

View File

@ -42,6 +42,7 @@ var (
ErrOutcomeLimit = errors.New("too many outcomes on a single bet")
ErrTotalBalanceNotEnough = errors.New("total Wallet balance is insufficient to create bet")
ErrTooManyUnsettled = errors.New("too many unsettled bets")
ErrInvalidAmount = errors.New("invalid amount")
ErrBetAmountTooHigh = errors.New("cannot create a bet with an amount above limit")
ErrBetWinningTooHigh = errors.New("total Winnings over set limit")
@ -215,6 +216,25 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
if err != nil {
return domain.CreateBetRes{}, err
}
_, totalUnsettledBets, err := s.GetAllBets(ctx, domain.BetFilter{
Status: domain.ValidOutcomeStatus{
Value: domain.OUTCOME_STATUS_ERROR,
Valid: true,
},
})
if err != nil {
return domain.CreateBetRes{}, err
}
if totalUnsettledBets > settingsList.MaxUnsettledBets {
s.mongoLogger.Error("System block bet creation until unsettled bets fixed",
zap.Int64("total_unsettled_bets", totalUnsettledBets),
zap.Int64("user_id", userID),
)
return domain.CreateBetRes{}, ErrTooManyUnsettled
}
if req.Amount < settingsList.MinimumBetAmount.Float32() {
return domain.CreateBetRes{}, ErrInvalidAmount
}
@ -283,7 +303,8 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
}
fastCode := helpers.GenerateFastCode()
amount := req.Amount + (req.Amount * calculateAccumulator(len(outcomes)))
accumulator := calculateAccumulator(len(outcomes))
amount := req.Amount + (req.Amount * accumulator)
newBet := domain.CreateBet{
Amount: domain.ToCurrency(amount),
@ -316,6 +337,7 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
return domain.CreateBetRes{}, err
}
// For
case domain.RoleBranchManager, domain.RoleAdmin, domain.RoleSuperAdmin:
newBet.IsShopBet = true
// Branch Manager, Admin and Super Admin are required to pass a branch id if they want to create a bet
@ -367,7 +389,7 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
newBet.IsShopBet = false
err = s.DeductBetFromCustomerWallet(ctx, req.Amount, userID)
if err != nil {
s.mongoLogger.Error("customer wallet deduction failed",
s.mongoLogger.Warn("customer wallet deduction failed",
zap.Float32("amount", req.Amount),
zap.Int64("user_id", userID),
zap.Error(err),
@ -477,6 +499,14 @@ func (s *Service) PlaceBet(ctx context.Context, req domain.CreateBetReq, userID
}
}
if totalWinnings > settingsList.TotalWinningNotify.Float32() {
err = s.SendAdminLargeBetNotification(ctx, bet.ID, totalWinnings, "", companyID)
if err != nil {
s.mongoLogger.Error("Failed to send large bet notification", zap.Int64("betID", bet.ID),
zap.Int64("companyID", companyID), zap.Float32("totalWinnings", totalWinnings))
}
}
res := domain.ConvertCreateBetRes(bet, rows)
return res, nil
@ -557,7 +587,7 @@ func (s *Service) DeductBetFromCustomerWallet(ctx context.Context, amount float3
return err
}
// Empty remaining from static balance
remainingAmount := wallets.RegularBalance - domain.Currency(amount)
remainingAmount := wallets.RegularBalance - domain.ToCurrency(amount)
_, err = s.walletSvc.DeductFromWallet(ctx, wallets.StaticID,
remainingAmount, domain.ValidInt64{}, domain.TRANSFER_DIRECT,
fmt.Sprintf("Deducted %v amount from wallet by system while placing bet", remainingAmount.Float32()))
@ -806,7 +836,7 @@ func (s *Service) CreateBetOutcome(ctx context.Context, outcomes []domain.Create
func (s *Service) GetBetByID(ctx context.Context, id int64) (domain.GetBet, error) {
return s.betStore.GetBetByID(ctx, id)
}
func (s *Service) GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, error) {
func (s *Service) GetAllBets(ctx context.Context, filter domain.BetFilter) ([]domain.GetBet, int64, error) {
return s.betStore.GetAllBets(ctx, filter)
}

View File

@ -40,7 +40,7 @@ func (s *Service) SendBonusNotification(ctx context.Context, param SendBonusNoti
headline = "You've been awarded a welcome bonus!"
message = fmt.Sprintf(
"Congratulations! A you've been given %.2f as a welcome bonus for you to bet on.",
param.Amount,
param.Amount.Float32(),
)
default:
return fmt.Errorf("unsupported bonus type: %v", param.Type)

View File

@ -19,5 +19,5 @@ type Service interface {
GetEventsWithSettings(ctx context.Context, companyID int64, filter domain.EventFilter) ([]domain.EventWithSettings, int64, error)
GetEventWithSettingByID(ctx context.Context, ID int64, companyID int64) (domain.EventWithSettings, error)
UpdateEventSettings(ctx context.Context, event domain.CreateEventSettings) error
GetSportAndLeagueIDs(ctx context.Context, eventID string) ([]int64, error)
GetSportAndLeagueIDs(ctx context.Context, eventID int64) ([]int64, error)
}

View File

@ -225,7 +225,7 @@ func (s *service) fetchUpcomingEventsFromProvider(ctx context.Context, source_ur
// Restricting the page to 1 on development, which drastically reduces the amount of events that is fetched
if s.cfg.Env == "development" {
pageLimit = 1
pageLimit = 2
sportIDs = []int{1}
} else {
pageLimit = 200
@ -465,7 +465,6 @@ func (s *service) GetAllEvents(ctx context.Context, filter domain.EventFilter) (
return s.store.GetAllEvents(ctx, filter)
}
func (s *service) GetEventByID(ctx context.Context, ID int64) (domain.BaseEvent, error) {
return s.store.GetEventByID(ctx, ID)
}
@ -496,6 +495,6 @@ func (s *service) UpdateEventSettings(ctx context.Context, event domain.CreateEv
return s.store.UpdateEventSettings(ctx, event)
}
func (s *service) GetSportAndLeagueIDs(ctx context.Context, eventID string) ([]int64, error) {
func (s *service) GetSportAndLeagueIDs(ctx context.Context, eventID int64) ([]int64, error) {
return s.store.GetSportAndLeagueIDs(ctx, eventID)
}

View File

@ -52,6 +52,7 @@ func (c *WalletConsumer) Start(ctx context.Context) {
"wallet_id": evt.WalletID,
"user_id": evt.UserID,
"balance": evt.Balance,
"wallet_type": evt.WalletType,
"trigger": evt.Trigger,
"recipient_id": evt.UserID,
}

View File

@ -10,7 +10,7 @@ import (
type NotificationStore interface {
SendNotification(ctx context.Context, notification *domain.Notification) error
MarkAsRead(ctx context.Context, notificationID string, recipientID int64) error
ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)
GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error)
ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error
DisconnectWebSocket(recipientID int64)
SendSMS(ctx context.Context, recipientID int64, message string) error

View File

@ -12,10 +12,12 @@ import (
"github.com/SamuelTariku/FortuneBet-Backend/internal/config"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/messenger"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
// "github.com/SamuelTariku/FortuneBet-Backend/internal/services/wallet"
@ -38,6 +40,7 @@ type Service struct {
mongoLogger *zap.Logger
logger *slog.Logger
redisClient *redis.Client
reader *kafka.Reader
}
func New(repo repository.NotificationRepository,
@ -46,11 +49,17 @@ func New(repo repository.NotificationRepository,
cfg *config.Config,
messengerSvc *messenger.Service,
userSvc *user.Service,
kafkaBrokers []string,
) *Service {
hub := ws.NewNotificationHub()
rdb := redis.NewClient(&redis.Options{
Addr: cfg.RedisAddr, // e.g., "redis:6379"
})
walletReader := kafka.NewReader(kafka.ReaderConfig{
Brokers: kafkaBrokers,
Topic: "wallet-balance-topic",
GroupID: "notification-service-group", // Each service should have its own group
})
svc := &Service{
repo: repo,
@ -64,12 +73,14 @@ func New(repo repository.NotificationRepository,
userSvc: userSvc,
config: cfg,
redisClient: rdb,
reader: walletReader,
}
go hub.Run()
go svc.startWorker()
go svc.startRetryWorker()
go svc.RunRedisSubscriber(context.Background())
go svc.StartKafkaConsumer(context.Background())
return svc
}
@ -167,24 +178,25 @@ func (s *Service) MarkAsRead(ctx context.Context, notificationIDs []string, reci
return nil
}
func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
notifications, err := s.repo.ListNotifications(ctx, recipientID, limit, offset)
func (s *Service) GetUserNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, int64, error) {
notifications, total, err := s.repo.GetUserNotifications(ctx, recipientID, limit, offset)
if err != nil {
s.mongoLogger.Error("[NotificationSvc.ListNotifications] Failed to list notifications",
s.mongoLogger.Error("[NotificationSvc.GetUserNotifications] Failed to list notifications",
zap.Int64("recipientID", recipientID),
zap.Int("limit", limit),
zap.Int("offset", offset),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return nil, err
return nil, 0, err
}
s.mongoLogger.Info("[NotificationSvc.ListNotifications] Successfully listed notifications",
s.mongoLogger.Info("[NotificationSvc.GetUserNotifications] Successfully listed notifications",
zap.Int64("recipientID", recipientID),
zap.Int("count", len(notifications)),
zap.Int64("total", total),
zap.Time("timestamp", time.Now()),
)
return notifications, nil
return notifications, total, nil
}
func (s *Service) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
@ -574,6 +586,88 @@ func (s *Service) GetLiveMetrics(ctx context.Context) (domain.LiveMetric, error)
return metric, nil
}
func (s *Service) StartKafkaConsumer(ctx context.Context) {
go func() {
for {
m, err := s.reader.ReadMessage(ctx)
if err != nil {
if err == context.Canceled {
s.mongoLogger.Info("[NotificationSvc.KafkaConsumer] Stopped by context")
return
}
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Error reading message",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
time.Sleep(1 * time.Second) // backoff
continue
}
var walletEvent event.WalletEvent
if err := json.Unmarshal(m.Value, &walletEvent); err != nil {
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to unmarshal wallet event",
zap.String("message", string(m.Value)),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
continue
}
raw, _ := json.Marshal(map[string]any{
"balance": walletEvent.Balance.Float32(),
"type": walletEvent.WalletType,
"timestamp": time.Now(),
})
headline := ""
message := ""
var receiver domain.NotificationRecieverSide
switch walletEvent.WalletType {
case domain.StaticWalletType:
headline = "Referral and Bonus Wallet Updated"
message = fmt.Sprintf("Your referral and bonus wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideCustomer
case domain.RegularWalletType:
headline = "Wallet Updated"
message = fmt.Sprintf("Your wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideCustomer
case domain.BranchWalletType:
headline = "Branch Wallet Updated"
message = fmt.Sprintf("branch wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideBranchManager
case domain.CompanyWalletType:
headline = "Company Wallet Updated"
message = fmt.Sprintf("company wallet balance is now %.2f", walletEvent.Balance.Float32())
receiver = domain.NotificationRecieverSideAdmin
}
// Handle the wallet event: send notification
notification := &domain.Notification{
RecipientID: walletEvent.UserID,
DeliveryChannel: domain.DeliveryChannelInApp,
Reciever: receiver,
Type: domain.NotificationTypeWalletUpdated,
DeliveryStatus: domain.DeliveryStatusPending,
IsRead: false,
Level: domain.NotificationLevelInfo,
Priority: 2,
Metadata: raw,
Payload: domain.NotificationPayload{
Headline: headline,
Message: message,
},
}
if err := s.SendNotification(ctx, notification); err != nil {
s.mongoLogger.Error("[NotificationSvc.KafkaConsumer] Failed to send notification",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
}
}()
}
// func (s *Service) UpdateLiveWalletMetricForWallet(ctx context.Context, wallet domain.Wallet) {
// var (
// payload domain.LiveWalletMetrics

View File

@ -80,10 +80,6 @@ func (s *ServiceImpl) FetchNonLiveOdds(ctx context.Context) error {
func (s *ServiceImpl) ProcessBet365Odds(ctx context.Context) error {
eventIDs, _, err := s.eventSvc.GetAllEvents(ctx, domain.EventFilter{
LastStartTime: domain.ValidTime{
Value: time.Now(),
Valid: true,
},
Status: domain.ValidEventStatus{
Value: domain.STATUS_PENDING,
Valid: true,

View File

@ -13,5 +13,5 @@ type ResultService interface {
type ResultLogStore interface {
CreateResultLog(ctx context.Context, result domain.CreateResultLog) (domain.ResultLog, error)
GetAllResultLog(ctx context.Context, filter domain.ResultFilter) ([]domain.ResultLog, error)
GetAllResultLog(ctx context.Context, filter domain.ResultLogFilter) ([]domain.ResultLog, error)
}

View File

@ -459,7 +459,7 @@ func (s *Service) FetchB365ResultAndUpdateBets(ctx context.Context) error {
func (s *Service) CheckAndSendResultNotifications(ctx context.Context, createdAfter time.Time) error {
resultLog, err := s.repo.GetAllResultLog(ctx, domain.ResultFilter{
resultLog, err := s.repo.GetAllResultLog(ctx, domain.ResultLogFilter{
CreatedAfter: domain.ValidTime{
Value: createdAfter,
Valid: true,
@ -557,10 +557,10 @@ func buildHeadlineAndMessageEmail(counts domain.ResultLog, user domain.User) (st
greeting := fmt.Sprintf("Hi %s %s,", user.FirstName, user.LastName)
if totalIssues == 0 {
headline := "✅ Daily Results Report — All Events Processed Successfully"
headline := "✅ Weekly Results Report — All Events Processed Successfully"
plain := fmt.Sprintf(`%s
Daily Results Summary:
Weekly Results Summary:
- %d Ended Events
- %d Total Bets
@ -570,7 +570,7 @@ Best regards,
The System`, greeting, counts.StatusEndedCount, totalBets)
html := fmt.Sprintf(`<p>%s</p>
<h2>Daily Results Summary</h2>
<h2>Weekly Results Summary</h2>
<ul>
<li><strong>%d Ended Events</strong></li>
<li><strong>%d Total Bets</strong></li>
@ -616,11 +616,11 @@ The System`, greeting, counts.StatusEndedCount, totalBets)
fmt.Sprintf("<li><strong>%d Successfully Ended Events</strong> (%d Bets)</li>", counts.StatusEndedCount, counts.StatusEndedBets))
}
headline := "⚠️ Daily Results Report — Review Required"
headline := "⚠️ Weekly Results Report — Review Required"
plain := fmt.Sprintf(`%s
Daily Results Summary:
Weekly Results Summary:
%s
Totals:
@ -639,7 +639,7 @@ The System`,
)
html := fmt.Sprintf(`<p>%s</p>
<h2>Daily Results Summary</h2>
<h2>Weekly Results Summary</h2>
<ul>
%s
</ul>

View File

@ -16,21 +16,21 @@ import (
)
type Client struct {
http *http.Client
BaseURL string
http *http.Client
BaseURL string
PrivateKey string
CasinoID string
PartnerID string
walletSvc *wallet.Service
CasinoID string
PartnerID string
walletSvc *wallet.Service
}
func NewClient(cfg *config.Config, walletSvc *wallet.Service) *Client {
return &Client{
http: &http.Client{Timeout: 10 * time.Second},
BaseURL: cfg.Atlas.BaseURL,
PrivateKey: cfg.Atlas.SecretKey, // PRIVATE_KEY from Atlas
CasinoID: cfg.Atlas.CasinoID, // provided by Atlas
PartnerID: cfg.Atlas.PartnerID, // aggregator/casino partner_id
PrivateKey: cfg.Atlas.SecretKey, // PRIVATE_KEY from Atlas
CasinoID: cfg.Atlas.CasinoID, // provided by Atlas
PartnerID: cfg.Atlas.PartnerID, // aggregator/casino partner_id
walletSvc: walletSvc,
}
}
@ -61,6 +61,7 @@ func (c *Client) post(ctx context.Context, path string, body map[string]any, res
hash := c.generateHash(tmp, timestamp)
body["hash"] = hash
fmt.Printf("atlasPost: %v \n", body)
// Marshal final body
data, _ := json.Marshal(body)

View File

@ -0,0 +1,234 @@
package wallet
import (
"context"
"fmt"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"go.uber.org/zap"
"time"
)
func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) {
var recipients []int64
switch walletType {
case domain.BranchWalletType:
branch, err := s.GetBranchByWalletID(ctx, walletID)
if err != nil {
s.mongoLogger.Error("[GetAdminNotificationRecipients] failed to GetBranchWalletByID", zap.Int64("walletID", walletID))
return nil, err
}
// Branch managers will be notified when branch wallet is empty
recipients = append(recipients, branch.BranchManagerID)
// Cashier will be notified
cashiers, err := s.userSvc.GetCashiersByBranch(ctx, branch.ID)
if err != nil {
return nil, err
}
for _, cashier := range cashiers {
recipients = append(recipients, cashier.ID)
}
// Admin will also be notified
admin, err := s.userSvc.GetAdminByCompanyID(ctx, branch.CompanyID)
if err != nil {
return nil, err
}
recipients = append(recipients, admin.ID)
case domain.CompanyWalletType:
company, err := s.GetCompanyByWalletID(ctx, walletID)
if err != nil {
return nil, err
}
recipients = append(recipients, company.AdminID)
default:
return nil, fmt.Errorf("Invalid wallet type")
}
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{
Role: string(domain.RoleSuperAdmin),
})
if err != nil {
return nil, err
}
for _, user := range users {
recipients = append(recipients, user.ID)
}
return recipients, nil
}
func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error {
// Send different messages
// Send notification to admin team
adminNotification := &domain.Notification{
ErrorSeverity: "low",
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: adminWallet.UserID,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelWarning,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT WARNING: System Running Out of Funds",
Message: fmt.Sprintf(
"Wallet ID %d is running low. Current balance: %.2f",
adminWallet.ID,
adminWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"notification_type": "admin_alert"
}`, adminWallet.ID, adminWallet.Balance),
}
// Get admin recipients and send to all
adminRecipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
for _, adminID := range adminRecipients {
adminNotification.RecipientID = adminID
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendAdminWalletInsufficientNotification(ctx context.Context, adminWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
adminNotification := &domain.Notification{
ErrorSeverity: domain.NotificationErrorSeverityLow,
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: adminWallet.UserID,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Admin Wallet insufficient to process customer request",
Message: fmt.Sprintf(
"Wallet ID %d. Transaction Amount %.2f. Current balance: %.2f",
adminWallet.ID,
amount.Float32(),
adminWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"transaction amount": %.2f,
"notification_type": "admin_alert"
}`, adminWallet.ID, adminWallet.Balance, amount.Float32()),
}
// Get admin recipients and send to all
recipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
for _, adminID := range recipients {
adminNotification.RecipientID = adminID
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendCustomerWalletInsufficientNotification(ctx context.Context, customerWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
customerNotification := &domain.Notification{
ErrorSeverity: domain.NotificationErrorSeverityLow,
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: customerWallet.UserID,
Type: domain.NOTIFICATION_TYPE_WALLET,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Wallet insufficient",
Message: fmt.Sprintf(
"Wallet ID %d. Transaction Amount %.2f. Current balance: %.2f",
customerWallet.ID,
amount.Float32(),
customerWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"transaction amount": %.2f,
"notification_type": "admin_alert"
}`, customerWallet.ID, customerWallet.Balance, amount.Float32()),
}
if err := s.notificationSvc.SendNotification(ctx, customerNotification); err != nil {
s.mongoLogger.Error("failed to create customer notification",
zap.Int64("customer_id", customerWallet.UserID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
return nil
}

View File

@ -4,11 +4,9 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/event"
"go.uber.org/zap"
)
var (
@ -96,11 +94,12 @@ func (s *Service) UpdateBalance(ctx context.Context, id int64, balance domain.Cu
go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.UserID), event.WalletEvent{
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: balance,
Trigger: "UpdateBalance",
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: balance,
WalletType: wallet.Type,
Trigger: "UpdateBalance",
})
}()
@ -121,11 +120,12 @@ func (s *Service) AddToWallet(
go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: wallet.Balance + amount,
Trigger: "AddToWallet",
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: wallet.Balance + amount,
WalletType: wallet.Type,
Trigger: "AddToWallet",
})
}()
@ -173,11 +173,8 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
}
balance := wallet.Balance.Float32()
for _, threshold := range thresholds {
if balance < threshold {
s.SendAdminWalletLowNotification(ctx, wallet)
break // only send once per check
}
if balance < thresholds[0] {
s.SendAdminWalletLowNotification(ctx, wallet)
}
}
@ -189,11 +186,12 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
go func() {
s.kafkaProducer.Publish(ctx, fmt.Sprint(wallet.ID), event.WalletEvent{
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: wallet.Balance - amount,
Trigger: "DeductFromWallet",
EventType: event.WalletBalanceUpdated,
WalletID: wallet.ID,
UserID: wallet.UserID,
Balance: wallet.Balance - amount,
WalletType: wallet.Type,
Trigger: "DeductFromWallet",
})
}()
@ -215,9 +213,6 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
return newTransfer, err
}
// Directly Refilling wallet without
// func (s *Service) RefillWallet(ctx context.Context, transfer domain.CreateTransfer) (domain.Transfer, error) {
// receiverWallet, err := s.GetWalletByID(ctx, transfer.ReceiverWalletID)
@ -257,219 +252,3 @@ func (s *Service) DeductFromWallet(ctx context.Context, id int64, amount domain.
func (s *Service) UpdateWalletActive(ctx context.Context, id int64, isActive bool) error {
return s.walletStore.UpdateWalletActive(ctx, id, isActive)
}
func (s *Service) GetAdminNotificationRecipients(ctx context.Context, walletID int64, walletType domain.WalletType) ([]int64, error) {
var recipients []int64
switch walletType {
case domain.BranchWalletType:
branch, err := s.GetBranchByWalletID(ctx, walletID)
if err != nil {
return nil, err
}
recipients = append(recipients, branch.BranchManagerID)
cashiers, err := s.userSvc.GetCashiersByBranch(ctx, branch.ID)
if err != nil {
return nil, err
}
for _, cashier := range cashiers {
recipients = append(recipients, cashier.ID)
}
admin, err := s.userSvc.GetAdminByCompanyID(ctx, branch.CompanyID)
if err != nil {
return nil, err
}
recipients = append(recipients, admin.ID)
case domain.CompanyWalletType:
company, err := s.GetCompanyByWalletID(ctx, walletID)
if err != nil {
return nil, err
}
recipients = append(recipients, company.AdminID)
default:
return nil, fmt.Errorf("Invalid wallet type")
}
users, _, err := s.userSvc.GetAllUsers(ctx, domain.UserFilter{
Role: string(domain.RoleSuperAdmin),
})
if err != nil {
return nil, err
}
for _, user := range users {
recipients = append(recipients, user.ID)
}
return recipients, nil
}
func (s *Service) SendAdminWalletLowNotification(ctx context.Context, adminWallet domain.Wallet) error {
// Send notification to admin team
adminNotification := &domain.Notification{
ErrorSeverity: "low",
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: adminWallet.UserID,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelWarning,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT WARNING: System Running Out of Funds",
Message: fmt.Sprintf(
"Wallet ID %d is running low. Current balance: %.2f",
adminWallet.ID,
adminWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"notification_type": "admin_alert"
}`, adminWallet.ID, adminWallet.Balance),
}
// Get admin recipients and send to all
adminRecipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
for _, adminID := range adminRecipients {
adminNotification.RecipientID = adminID
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendAdminWalletInsufficientNotification(ctx context.Context, adminWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
adminNotification := &domain.Notification{
ErrorSeverity: domain.NotificationErrorSeverityLow,
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: adminWallet.UserID,
Type: domain.NOTIFICATION_TYPE_ADMIN_ALERT,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideAdmin,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Admin Wallet insufficient to process customer request",
Message: fmt.Sprintf(
"Wallet ID %d. Transaction Amount %.2f. Current balance: %.2f",
adminWallet.ID,
amount.Float32(),
adminWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"transaction amount": %.2f,
"notification_type": "admin_alert"
}`, adminWallet.ID, adminWallet.Balance, amount.Float32()),
}
// Get admin recipients and send to all
recipients, err := s.GetAdminNotificationRecipients(ctx, adminWallet.ID, adminWallet.Type)
if err != nil {
s.mongoLogger.Error("failed to get admin recipients",
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
for _, adminID := range recipients {
adminNotification.RecipientID = adminID
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
}
adminNotification.DeliveryChannel = domain.DeliveryChannelEmail
if err := s.notificationSvc.SendNotification(ctx, adminNotification); err != nil {
s.mongoLogger.Error("failed to send email admin notification",
zap.Int64("admin_id", adminID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
}
return nil
}
func (s *Service) SendCustomerWalletInsufficientNotification(ctx context.Context, customerWallet domain.Wallet, amount domain.Currency) error {
// Send notification to admin team
customerNotification := &domain.Notification{
ErrorSeverity: domain.NotificationErrorSeverityLow,
IsRead: false,
DeliveryStatus: domain.DeliveryStatusPending,
RecipientID: customerWallet.UserID,
Type: domain.NOTIFICATION_TYPE_WALLET,
Level: domain.NotificationLevelError,
Reciever: domain.NotificationRecieverSideCustomer,
DeliveryChannel: domain.DeliveryChannelInApp, // Or any preferred admin channel
Payload: domain.NotificationPayload{
Headline: "CREDIT Error: Wallet insufficient",
Message: fmt.Sprintf(
"Wallet ID %d. Transaction Amount %.2f. Current balance: %.2f",
customerWallet.ID,
amount.Float32(),
customerWallet.Balance.Float32(),
),
},
Priority: 1, // High priority for admin alerts
Metadata: fmt.Appendf(nil, `{
"wallet_id": %d,
"balance": %d,
"transaction amount": %.2f,
"notification_type": "admin_alert"
}`, customerWallet.ID, customerWallet.Balance, amount.Float32()),
}
if err := s.notificationSvc.SendNotification(ctx, customerNotification); err != nil {
s.mongoLogger.Error("failed to create customer notification",
zap.Int64("customer_id", customerWallet.UserID),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return err
}
return nil
}

View File

@ -27,58 +27,58 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S
spec string
task func()
}{
// {
// spec: "0 0 * * * *", // Every 1 hour
// task: func() {
// mongoLogger.Info("Began fetching upcoming events cron task")
// if err := eventService.FetchUpcomingEvents(context.Background()); err != nil {
// mongoLogger.Error("Failed to fetch upcoming events",
// zap.Error(err),
// )
// } else {
// mongoLogger.Info("Completed fetching upcoming events without errors")
// }
// },
// },
// {
// spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events)
// task: func() {
// mongoLogger.Info("Began fetching non live odds cron task")
// if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil {
// mongoLogger.Error("Failed to fetch non live odds",
// zap.Error(err),
// )
// } else {
// mongoLogger.Info("Completed fetching non live odds without errors")
// }
// },
// },
// {
// spec: "0 */5 * * * *", // Every 5 Minutes
// task: func() {
// mongoLogger.Info("Began update all expired events status cron task")
// if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil {
// mongoLogger.Error("Failed to update expired events status",
// zap.Error(err),
// )
// } else {
// mongoLogger.Info("Completed expired events without errors")
// }
// },
// },
// {
// spec: "0 */15 * * * *", // Every 15 Minutes
// task: func() {
// mongoLogger.Info("Began updating bets based on event results cron task")
// if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil {
// mongoLogger.Error("Failed to process result",
// zap.Error(err),
// )
// } else {
// mongoLogger.Info("Completed processing all event result outcomes without errors")
// }
// },
// },
{
spec: "0 0 * * * *", // Every 1 hour
task: func() {
mongoLogger.Info("Began fetching upcoming events cron task")
if err := eventService.FetchUpcomingEvents(context.Background()); err != nil {
mongoLogger.Error("Failed to fetch upcoming events",
zap.Error(err),
)
} else {
mongoLogger.Info("Completed fetching upcoming events without errors")
}
},
},
{
spec: "0 0 * * * *", // Every 1 hour (since its takes that long to fetch all the events)
task: func() {
mongoLogger.Info("Began fetching non live odds cron task")
if err := oddsService.FetchNonLiveOdds(context.Background()); err != nil {
mongoLogger.Error("Failed to fetch non live odds",
zap.Error(err),
)
} else {
mongoLogger.Info("Completed fetching non live odds without errors")
}
},
},
{
spec: "0 */5 * * * *", // Every 5 Minutes
task: func() {
mongoLogger.Info("Began update all expired events status cron task")
if _, err := resultService.CheckAndUpdateExpiredB365Events(context.Background()); err != nil {
mongoLogger.Error("Failed to update expired events status",
zap.Error(err),
)
} else {
mongoLogger.Info("Completed expired events without errors")
}
},
},
{
spec: "0 */15 * * * *", // Every 15 Minutes
task: func() {
mongoLogger.Info("Began updating bets based on event results cron task")
if err := resultService.FetchB365ResultAndUpdateBets(context.Background()); err != nil {
mongoLogger.Error("Failed to process result",
zap.Error(err),
)
} else {
mongoLogger.Info("Completed processing all event result outcomes without errors")
}
},
},
{
spec: "0 0 0 * * 1", // Every Monday
task: func() {
@ -95,7 +95,7 @@ func StartDataFetchingCrons(eventService eventsvc.Service, oddsService oddssvc.S
}
for _, job := range schedule {
job.task()
// job.task()
if _, err := c.AddFunc(job.spec, job.task); err != nil {
mongoLogger.Error("Failed to schedule data fetching cron job",
zap.Error(err),

View File

@ -251,7 +251,7 @@ func (h *Handler) CreateBetInternal(c *fiber.Ctx, req domain.CreateBetReq, userI
sportAndLeagueIDs := [][]int64{}
for _, outcome := range req.Outcomes {
ids, err := h.eventSvc.GetSportAndLeagueIDs(c.Context(), fmt.Sprintf("%d", outcome.EventID))
ids, err := h.eventSvc.GetSportAndLeagueIDs(c.Context(), outcome.EventID)
if err != nil {
continue
}
@ -459,8 +459,17 @@ func (h *Handler) RandomBet(c *fiber.Ctx) error {
// @Router /api/v1/{tenant_slug}/sport/bet [get]
func (h *Handler) GetAllBet(c *fiber.Ctx) error {
role := c.Locals("role").(domain.Role)
// companyID := c.Locals("company_id").(domain.ValidInt64)
// branchID := c.Locals("branch_id").(domain.ValidInt64)
page := c.QueryInt("page", 1)
pageSize := c.QueryInt("page_size", 10)
limit := domain.ValidInt32{
Value: int32(pageSize),
Valid: true,
}
offset := domain.ValidInt32{
Value: int32(page - 1),
Valid: true,
}
var isShopBet domain.ValidBool
isShopBetQuery := c.Query("is_shop")
@ -525,11 +534,32 @@ func (h *Handler) GetAllBet(c *fiber.Ctx) error {
}
}
bets, err := h.betSvc.GetAllBets(c.Context(), domain.BetFilter{
var statusFilter domain.ValidOutcomeStatus
statusQuery := c.Query("status")
if statusQuery != "" {
statusParsed, err := strconv.ParseInt(statusQuery, 10, 32)
if err != nil {
h.mongoLoggerSvc.Info("invalid status format",
zap.String("status", statusQuery),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusBadRequest, "Invalid status format")
}
statusFilter = domain.ValidOutcomeStatus{
Value: domain.OutcomeStatus(statusParsed),
Valid: true,
}
}
bets, total, err := h.betSvc.GetAllBets(c.Context(), domain.BetFilter{
IsShopBet: isShopBet,
Query: searchString,
CreatedBefore: createdBefore,
CreatedAfter: createdAfter,
Status: statusFilter,
Limit: limit,
Offset: offset,
})
if err != nil {
h.mongoLoggerSvc.Error("Failed to get all bets",
@ -545,7 +575,7 @@ func (h *Handler) GetAllBet(c *fiber.Ctx) error {
res[i] = domain.ConvertBet(bet)
}
return response.WriteJSON(c, fiber.StatusOK, "All bets retrieved successfully", res, nil)
return response.WritePaginatedJSON(c, fiber.StatusOK, "All bets retrieved successfully", res, nil, page, int(total))
}
// GetAllTenants godoc
@ -565,9 +595,17 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
return fiber.NewError(fiber.StatusBadRequest, "invalid company id")
}
role := c.Locals("role").(domain.Role)
// companyID := c.Locals("company_id").(domain.ValidInt64)
// branchID := c.Locals("branch_id").(domain.ValidInt64)
page := c.QueryInt("page", 1)
pageSize := c.QueryInt("page_size", 10)
limit := domain.ValidInt32{
Value: int32(pageSize),
Valid: true,
}
offset := domain.ValidInt32{
Value: int32(page - 1),
Valid: true,
}
var isShopBet domain.ValidBool
isShopBetQuery := c.Query("is_shop")
if isShopBetQuery != "" && role == domain.RoleSuperAdmin {
@ -631,12 +669,33 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
}
}
bets, err := h.betSvc.GetAllBets(c.Context(), domain.BetFilter{
var statusFilter domain.ValidOutcomeStatus
statusQuery := c.Query("status")
if statusQuery != "" {
statusParsed, err := strconv.ParseInt(statusQuery, 10, 32)
if err != nil {
h.mongoLoggerSvc.Info("invalid status format",
zap.String("status", statusQuery),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusBadRequest, "Invalid status format")
}
statusFilter = domain.ValidOutcomeStatus{
Value: domain.OutcomeStatus(statusParsed),
Valid: true,
}
}
bets, total, err := h.betSvc.GetAllBets(c.Context(), domain.BetFilter{
CompanyID: companyID,
IsShopBet: isShopBet,
Query: searchString,
CreatedBefore: createdBefore,
CreatedAfter: createdAfter,
Status: statusFilter,
Limit: limit,
Offset: offset,
})
if err != nil {
h.mongoLoggerSvc.Error("Failed to get all bets",
@ -652,7 +711,7 @@ func (h *Handler) GetAllTenantBets(c *fiber.Ctx) error {
res[i] = domain.ConvertBet(bet)
}
return response.WriteJSON(c, fiber.StatusOK, "All bets retrieved successfully", res, nil)
return response.WritePaginatedJSON(c, fiber.StatusOK, "All bets retrieved successfully", res, nil, page, int(total))
}
// GetBetByID godoc

View File

@ -234,22 +234,22 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error {
Valid: searchQuery != "",
}
firstStartTimeQuery := c.Query("first_start_time")
var firstStartTime domain.ValidTime
if firstStartTimeQuery != "" {
firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery)
if err != nil {
h.BadRequestLogger().Info("invalid start_time format",
zap.String("first_start_time", firstStartTimeQuery),
zap.Error(err),
)
return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format")
}
firstStartTime = domain.ValidTime{
Value: firstStartTimeParsed,
Valid: true,
}
}
// firstStartTimeQuery := c.Query("first_start_time")
// var firstStartTime domain.ValidTime
// if firstStartTimeQuery != "" {
// firstStartTimeParsed, err := time.Parse(time.RFC3339, firstStartTimeQuery)
// if err != nil {
// h.BadRequestLogger().Info("invalid start_time format",
// zap.String("first_start_time", firstStartTimeQuery),
// zap.Error(err),
// )
// return fiber.NewError(fiber.StatusBadRequest, "Invalid start_time format")
// }
// firstStartTime = domain.ValidTime{
// Value: firstStartTimeParsed,
// Valid: true,
// }
// }
lastStartTimeQuery := c.Query("last_start_time")
var lastStartTime domain.ValidTime
@ -297,7 +297,10 @@ func (h *Handler) GetTenantUpcomingEvents(c *fiber.Ctx) error {
SportID: sportID,
LeagueID: leagueID,
Query: searchString,
FirstStartTime: firstStartTime,
FirstStartTime: domain.ValidTime{
Value: time.Now(),
Valid: true,
},
LastStartTime: lastStartTime,
Limit: limit,
Offset: offset,

View File

@ -316,14 +316,14 @@ func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error {
}
}
func (h *Handler) GetNotifications(c *fiber.Ctx) error {
func (h *Handler) GetUserNotification(c *fiber.Ctx) error {
limitStr := c.Query("limit", "10")
offsetStr := c.Query("offset", "0")
// Convert limit and offset to integers
limit, err := strconv.Atoi(limitStr)
if err != nil || limit <= 0 {
h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid limit value",
h.mongoLoggerSvc.Info("[NotificationSvc.GetUserNotification] Invalid limit value",
zap.String("limit", limitStr),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
@ -333,7 +333,7 @@ func (h *Handler) GetNotifications(c *fiber.Ctx) error {
}
offset, err := strconv.Atoi(offsetStr)
if err != nil || offset < 0 {
h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid offset value",
h.mongoLoggerSvc.Info("[NotificationSvc.GetUserNotification] Invalid offset value",
zap.String("offset", offsetStr),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
@ -344,7 +344,7 @@ func (h *Handler) GetNotifications(c *fiber.Ctx) error {
userID, ok := c.Locals("user_id").(int64)
if !ok || userID == 0 {
h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Invalid user ID in context",
h.mongoLoggerSvc.Error("[NotificationSvc.GetUserNotification] Invalid user ID in context",
zap.Int64("userID", userID),
zap.Int("status_code", fiber.StatusInternalServerError),
zap.Error(err),
@ -353,9 +353,9 @@ func (h *Handler) GetNotifications(c *fiber.Ctx) error {
return fiber.NewError(fiber.StatusInternalServerError, "Invalid user identification")
}
notifications, err := h.notificationSvc.ListNotifications(context.Background(), userID, limit, offset)
notifications, total, err := h.notificationSvc.GetUserNotifications(context.Background(), userID, limit, offset)
if err != nil {
h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications",
h.mongoLoggerSvc.Error("[NotificationSvc.GetUserNotification] Failed to fetch notifications",
zap.Int64("userID", userID),
zap.Int("status_code", fiber.StatusInternalServerError),
zap.Error(err),
@ -366,7 +366,7 @@ func (h *Handler) GetNotifications(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).JSON(fiber.Map{
"notifications": notifications,
"total_count": len(notifications),
"total_count": total,
"limit": limit,
"offset": offset,
})

View File

@ -139,7 +139,7 @@ func (h *Handler) GetOddsByMarketID(c *fiber.Ctx) error {
rawOdds, err := h.prematchSvc.GetOddsByMarketID(c.Context(), marketID, eventID)
if err != nil {
// Lets turn this into a warn because this is constantly going off
h.InternalServerErrorLogger().Warn("Failed to get raw odds by market ID", append(logFields, zap.Error(err))...)
// h.InternalServerErrorLogger().Warn("Failed to get raw odds by market ID", append(logFields, zap.Error(err))...)
return fiber.NewError(fiber.StatusInternalServerError, err.Error())
}
@ -189,7 +189,7 @@ func (h *Handler) GetTenantOddsByMarketID(c *fiber.Ctx) error {
if err != nil {
// Lets turn this into a warn because this is constantly going off
h.InternalServerErrorLogger().Warn("Failed to get raw odds by market ID", append(logFields, zap.Error(err))...)
// h.InternalServerErrorLogger().Warn("Failed to get raw odds by market ID", append(logFields, zap.Error(err))...)
return fiber.NewError(fiber.StatusInternalServerError, err.Error())
}

View File

@ -762,8 +762,9 @@ type SearchUserByNameOrPhoneReq struct {
// @Success 200 {object} UserProfileRes
// @Failure 400 {object} response.APIResponse
// @Failure 500 {object} response.APIResponse
// @Router /api/v1/{tenant_slug}/user/search [post]
// @Router /api/v1/user/search [post]
func (h *Handler) SearchUserByNameOrPhone(c *fiber.Ctx) error {
// TODO: Add filtering by role based on which user is calling this
var req SearchUserByNameOrPhoneReq
if err := c.BodyParser(&req); err != nil {
@ -783,6 +784,7 @@ func (h *Handler) SearchUserByNameOrPhone(c *fiber.Ctx) error {
}
return fiber.NewError(fiber.StatusBadRequest, errMsg)
}
companyID := c.Locals("company_id").(domain.ValidInt64)
users, err := h.userSvc.SearchUserByNameOrPhone(c.Context(), req.SearchString, req.Role, companyID)
@ -831,6 +833,89 @@ func (h *Handler) SearchUserByNameOrPhone(c *fiber.Ctx) error {
}
// SearchUserByNameOrPhone godoc
// @Summary Search for user using name or phone
// @Description Search for user using name or phone
// @Tags user
// @Accept json
// @Produce json
// @Param searchUserByNameOrPhone body SearchUserByNameOrPhoneReq true "Search for using his name or phone"
// @Success 200 {object} UserProfileRes
// @Failure 400 {object} response.APIResponse
// @Failure 500 {object} response.APIResponse
// @Router /api/v1/{tenant_slug}/user/search [post]
func (h *Handler) SearchCompanyUserByNameOrPhone(c *fiber.Ctx) error {
companyID := c.Locals("company_id").(domain.ValidInt64)
if !companyID.Valid {
h.BadRequestLogger().Error("invalid company id")
return fiber.NewError(fiber.StatusBadRequest, "invalid company id")
}
var req SearchUserByNameOrPhoneReq
if err := c.BodyParser(&req); err != nil {
h.mongoLoggerSvc.Error("Failed to Search UserBy Name Or Phone failed",
zap.Any("request", req),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusBadRequest, "Invalid request body:"+err.Error())
}
valErrs, ok := h.validator.Validate(c, req)
if !ok {
var errMsg string
for field, msg := range valErrs {
errMsg += fmt.Sprintf("%s: %s; ", field, msg)
}
return fiber.NewError(fiber.StatusBadRequest, errMsg)
}
users, err := h.userSvc.SearchUserByNameOrPhone(c.Context(), req.SearchString, req.Role, companyID)
if err != nil {
h.mongoLoggerSvc.Error("Failed to get user by name or phone",
zap.Any("request", req),
zap.Int("status_code", fiber.StatusBadRequest),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusBadRequest, "failed to get users"+err.Error())
}
var res []UserProfileRes = make([]UserProfileRes, 0, len(users))
for _, user := range users {
lastLogin, err := h.authSvc.GetLastLogin(c.Context(), user.ID)
if err != nil {
if err != authentication.ErrRefreshTokenNotFound {
h.mongoLoggerSvc.Error("Failed to get user last login",
zap.Any("userID", user.ID),
zap.Int("status_code", fiber.StatusInternalServerError),
zap.Error(err),
zap.Time("timestamp", time.Now()),
)
return fiber.NewError(fiber.StatusInternalServerError, "Failed to retrieve user last login"+err.Error())
}
lastLogin = &user.CreatedAt
}
res = append(res, UserProfileRes{
ID: user.ID,
FirstName: user.FirstName,
LastName: user.LastName,
Email: user.Email,
PhoneNumber: user.PhoneNumber,
Role: user.Role,
EmailVerified: user.EmailVerified,
PhoneVerified: user.PhoneVerified,
CreatedAt: user.CreatedAt,
UpdatedAt: user.UpdatedAt,
SuspendedAt: user.SuspendedAt,
Suspended: user.Suspended,
LastLogin: *lastLogin,
})
}
return response.WriteJSON(c, fiber.StatusOK, "Search Successful", res, nil)
}
// GetUserByID godoc
// @Summary Get user by id
// @Description Get a single user by id

View File

@ -61,7 +61,7 @@ func (a *App) initAppRoutes() {
a.fiber.Get("/", func(c *fiber.Ctx) error {
return c.JSON(fiber.Map{
"message": "Welcome to the FortuneBet API",
"version": "1.0.dev15",
"version": "1.0.dev16",
})
})
@ -186,8 +186,8 @@ func (a *App) initAppRoutes() {
groupV1.Get("/user/single/:id", a.authMiddleware, h.GetUserByID)
groupV1.Post("/user/suspend", a.authMiddleware, h.UpdateUserSuspend)
groupV1.Delete("/user/delete/:id", a.authMiddleware, h.DeleteUser)
groupV1.Post("/user/search", a.authMiddleware, h.SearchUserByNameOrPhone)
tenant.Get("/user/wallet", a.authMiddleware, h.GetCustomerWallet)
tenant.Post("/user/search", a.authMiddleware, h.SearchUserByNameOrPhone)
// Referral Routes
tenant.Post("/referral/create", a.authMiddleware, h.CreateReferralCode)
@ -264,7 +264,7 @@ func (a *App) initAppRoutes() {
groupV1.Delete("/events/:id", a.authMiddleware, a.SuperAdminOnly, h.SetEventStatusToRemoved)
groupV1.Patch("/events/:id/is_monitored", a.authMiddleware, a.SuperAdminOnly, h.SetEventIsMonitored)
tenant.Get("/events", h.GetTenantUpcomingEvents)
tenant.Get("/upcoming-events", h.GetTenantUpcomingEvents)
tenant.Get("/events/:id", h.GetTenantEventByID)
tenant.Get("/top-leagues", h.GetTopLeagues)
tenant.Put("/events/:id/settings", h.UpdateEventSettings)
@ -419,7 +419,7 @@ func (a *App) initAppRoutes() {
// Notification Routes
groupV1.Get("/ws/connect", a.WebsocketAuthMiddleware, h.ConnectSocket)
groupV1.Get("/notifications", a.authMiddleware, h.GetNotifications)
groupV1.Get("/notifications", a.authMiddleware, h.GetUserNotification)
groupV1.Get("/notifications/all", a.authMiddleware, h.GetAllNotifications)
groupV1.Post("/notifications/mark-as-read", a.authMiddleware, h.MarkNotificationAsRead)
groupV1.Get("/notifications/unread", a.authMiddleware, h.CountUnreadNotifications)

View File

@ -165,6 +165,7 @@ func (h *NotificationHub) BroadcastWalletUpdate(userID int64, event event.Wallet
payload := map[string]interface{}{
"type": event.EventType,
"wallet_id": event.WalletID,
"wallet_type": event.WalletType,
"user_id": event.UserID,
"balance": event.Balance,
"trigger": event.Trigger,

View File

@ -79,7 +79,7 @@ logs:
@mkdir -p logs
db-up: | logs
@mkdir -p logs
@docker compose up -d postgres migrate mongo redis
@docker compose up -d postgres migrate mongo redis kafka
@docker logs fortunebet-backend-postgres-1 > logs/postgres.log 2>&1 &
.PHONY: db-down
db-down: