diff --git a/cmd/main.go b/cmd/main.go index 7899d54..2e0c742 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,11 +2,15 @@ package main import ( "fmt" + "log" "log/slog" "os" "github.com/SamuelTariku/FortuneBet-Backend/internal/config" + customlogger "github.com/SamuelTariku/FortuneBet-Backend/internal/logger" "github.com/SamuelTariku/FortuneBet-Backend/internal/repository" + notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication" + httpserver "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server" "github.com/joho/godotenv" ) @@ -21,11 +25,21 @@ func main() { slog.Error(err.Error()) os.Exit(1) } + db, _, err := repository.OpenDB(cfg.DbUrl) if err != nil { fmt.Print(err) os.Exit(1) } + + logger := customlogger.NewLogger("development", slog.LevelDebug, "1.0") + store := repository.NewStore(db) - fmt.Println(store) + notificationRepo := repository.NewNotificationRepository(store) + notificationSvc := notificationservice.New(notificationRepo, logger) + + app := httpserver.NewApp(cfg.Port, logger, notificationSvc) + if err := app.Run(); err != nil { + log.Fatal("Failed to start server with error: ", err) + } } diff --git a/db/query/notification.sql b/db/query/notification.sql new file mode 100644 index 0000000..170f46b --- /dev/null +++ b/db/query/notification.sql @@ -0,0 +1,18 @@ +-- name: CreateNotification :one +INSERT INTO notifications ( + id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, timestamp, metadata +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 +) RETURNING *; + +-- name: GetNotification :one +SELECT * FROM notifications WHERE id = $1 LIMIT 1; + +-- name: ListNotifications :many +SELECT * FROM notifications WHERE recipient_id = $1 ORDER BY timestamp DESC LIMIT $2 OFFSET $3; + +-- name: UpdateNotificationStatus :one +UPDATE notifications SET delivery_status = $2, is_read = $3, metadata = $4 WHERE id = $1 RETURNING *; + +-- name: ListFailedNotifications :many +SELECT * FROM notifications WHERE delivery_status = 'failed' AND timestamp < NOW() - INTERVAL '1 hour' ORDER BY timestamp ASC LIMIT $1; diff --git a/gen/db/db.go b/gen/db/db.go index fe4ae38..136f20a 100644 --- a/gen/db/db.go +++ b/gen/db/db.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package dbgen diff --git a/gen/db/models.go b/gen/db/models.go index 307326d..176b692 100644 --- a/gen/db/models.go +++ b/gen/db/models.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 package dbgen @@ -8,6 +8,23 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +type Notification struct { + ID string + RecipientID string + Type string + Level string + ErrorSeverity pgtype.Text + Reciever string + IsRead bool + DeliveryStatus string + DeliveryChannel pgtype.Text + Payload []byte + Priority pgtype.Int4 + Version int32 + Timestamp pgtype.Timestamptz + Metadata []byte +} + type User struct { ID int64 FirstName string diff --git a/gen/db/notification.sql.go b/gen/db/notification.sql.go new file mode 100644 index 0000000..0df4dd1 --- /dev/null +++ b/gen/db/notification.sql.go @@ -0,0 +1,220 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.28.0 +// source: notification.sql + +package dbgen + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const CreateNotification = `-- name: CreateNotification :one +INSERT INTO notifications ( + id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, timestamp, metadata +) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13 +) RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +` + +type CreateNotificationParams struct { + ID string + RecipientID string + Type string + Level string + ErrorSeverity pgtype.Text + Reciever string + IsRead bool + DeliveryStatus string + DeliveryChannel pgtype.Text + Payload []byte + Priority pgtype.Int4 + Timestamp pgtype.Timestamptz + Metadata []byte +} + +func (q *Queries) CreateNotification(ctx context.Context, arg CreateNotificationParams) (Notification, error) { + row := q.db.QueryRow(ctx, CreateNotification, + arg.ID, + arg.RecipientID, + arg.Type, + arg.Level, + arg.ErrorSeverity, + arg.Reciever, + arg.IsRead, + arg.DeliveryStatus, + arg.DeliveryChannel, + arg.Payload, + arg.Priority, + arg.Timestamp, + arg.Metadata, + ) + var i Notification + err := row.Scan( + &i.ID, + &i.RecipientID, + &i.Type, + &i.Level, + &i.ErrorSeverity, + &i.Reciever, + &i.IsRead, + &i.DeliveryStatus, + &i.DeliveryChannel, + &i.Payload, + &i.Priority, + &i.Version, + &i.Timestamp, + &i.Metadata, + ) + return i, err +} + +const GetNotification = `-- name: GetNotification :one +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata FROM notifications WHERE id = $1 LIMIT 1 +` + +func (q *Queries) GetNotification(ctx context.Context, id string) (Notification, error) { + row := q.db.QueryRow(ctx, GetNotification, id) + var i Notification + err := row.Scan( + &i.ID, + &i.RecipientID, + &i.Type, + &i.Level, + &i.ErrorSeverity, + &i.Reciever, + &i.IsRead, + &i.DeliveryStatus, + &i.DeliveryChannel, + &i.Payload, + &i.Priority, + &i.Version, + &i.Timestamp, + &i.Metadata, + ) + return i, err +} + +const ListFailedNotifications = `-- name: ListFailedNotifications :many +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata FROM notifications WHERE delivery_status = 'failed' AND timestamp < NOW() - INTERVAL '1 hour' ORDER BY timestamp ASC LIMIT $1 +` + +func (q *Queries) ListFailedNotifications(ctx context.Context, limit int32) ([]Notification, error) { + rows, err := q.db.Query(ctx, ListFailedNotifications, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Notification + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.RecipientID, + &i.Type, + &i.Level, + &i.ErrorSeverity, + &i.Reciever, + &i.IsRead, + &i.DeliveryStatus, + &i.DeliveryChannel, + &i.Payload, + &i.Priority, + &i.Version, + &i.Timestamp, + &i.Metadata, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const ListNotifications = `-- name: ListNotifications :many +SELECT id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata FROM notifications WHERE recipient_id = $1 ORDER BY timestamp DESC LIMIT $2 OFFSET $3 +` + +type ListNotificationsParams struct { + RecipientID string + Limit int32 + Offset int32 +} + +func (q *Queries) ListNotifications(ctx context.Context, arg ListNotificationsParams) ([]Notification, error) { + rows, err := q.db.Query(ctx, ListNotifications, arg.RecipientID, arg.Limit, arg.Offset) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Notification + for rows.Next() { + var i Notification + if err := rows.Scan( + &i.ID, + &i.RecipientID, + &i.Type, + &i.Level, + &i.ErrorSeverity, + &i.Reciever, + &i.IsRead, + &i.DeliveryStatus, + &i.DeliveryChannel, + &i.Payload, + &i.Priority, + &i.Version, + &i.Timestamp, + &i.Metadata, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const UpdateNotificationStatus = `-- name: UpdateNotificationStatus :one +UPDATE notifications SET delivery_status = $2, is_read = $3, metadata = $4 WHERE id = $1 RETURNING id, recipient_id, type, level, error_severity, reciever, is_read, delivery_status, delivery_channel, payload, priority, version, timestamp, metadata +` + +type UpdateNotificationStatusParams struct { + ID string + DeliveryStatus string + IsRead bool + Metadata []byte +} + +func (q *Queries) UpdateNotificationStatus(ctx context.Context, arg UpdateNotificationStatusParams) (Notification, error) { + row := q.db.QueryRow(ctx, UpdateNotificationStatus, + arg.ID, + arg.DeliveryStatus, + arg.IsRead, + arg.Metadata, + ) + var i Notification + err := row.Scan( + &i.ID, + &i.RecipientID, + &i.Type, + &i.Level, + &i.ErrorSeverity, + &i.Reciever, + &i.IsRead, + &i.DeliveryStatus, + &i.DeliveryChannel, + &i.Payload, + &i.Priority, + &i.Version, + &i.Timestamp, + &i.Metadata, + ) + return i, err +} diff --git a/gen/db/user.sql.go b/gen/db/user.sql.go index 1608a32..c5b0e90 100644 --- a/gen/db/user.sql.go +++ b/gen/db/user.sql.go @@ -1,6 +1,6 @@ // Code generated by sqlc. DO NOT EDIT. // versions: -// sqlc v1.26.0 +// sqlc v1.28.0 // source: user.sql package dbgen diff --git a/go.mod b/go.mod index 18620e4..9f9763c 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.24.1 require ( github.com/bytedance/sonic v1.13.2 github.com/gofiber/fiber/v2 v2.52.6 + github.com/gofiber/websocket/v2 v2.2.1 + github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.4 github.com/joho/godotenv v1.5.1 ) @@ -13,7 +15,7 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/bytedance/sonic/loader v0.2.4 // indirect github.com/cloudwego/base64x v0.1.5 // indirect - github.com/google/uuid v1.6.0 // indirect + github.com/fasthttp/websocket v1.5.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -23,6 +25,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect github.com/rivo/uniseg v0.2.0 // indirect + github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/stretchr/testify v1.8.4 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect diff --git a/go.sum b/go.sum index 9d6be49..0801c72 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,12 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI= github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -41,6 +45,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/internal/pkgs/helpers/helpers.go b/internal/pkgs/helpers/helpers.go new file mode 100644 index 0000000..8c6645e --- /dev/null +++ b/internal/pkgs/helpers/helpers.go @@ -0,0 +1,7 @@ +package helpers + +import "github.com/google/uuid" + +func GenerateID() string { + return uuid.New().String() +} diff --git a/internal/repository/notification.go b/internal/repository/notification.go new file mode 100644 index 0000000..5c62849 --- /dev/null +++ b/internal/repository/notification.go @@ -0,0 +1,174 @@ +package repository + +import ( + "context" + "encoding/json" + + dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db" + "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" + "github.com/jackc/pgx/v5/pgtype" +) + +type NotificationRepository interface { + CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) + UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) + ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) + ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) +} + +type Repository struct { + store *Store +} + +func NewNotificationRepository(store *Store) NotificationRepository { + return &Repository{store: store} +} + +func (r *Repository) CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) { + var errorSeverity pgtype.Text + if notification.ErrorSeverity != nil { + errorSeverity.String = string(*notification.ErrorSeverity) + errorSeverity.Valid = true + } + + var deliveryChannel pgtype.Text + if notification.DeliveryChannel != "" { + deliveryChannel.String = string(notification.DeliveryChannel) + deliveryChannel.Valid = true + } + + var priority pgtype.Int4 + if notification.Priority != 0 { + priority.Int32 = int32(notification.Priority) + priority.Valid = true + } + + params := dbgen.CreateNotificationParams{ + ID: notification.ID, + RecipientID: notification.RecipientID, + Type: string(notification.Type), + Level: string(notification.Level), + ErrorSeverity: errorSeverity, + Reciever: string(notification.Reciever), + IsRead: notification.IsRead, + DeliveryStatus: string(notification.DeliveryStatus), + DeliveryChannel: deliveryChannel, + Payload: marshalPayload(notification.Payload), + Priority: priority, + Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true}, + Metadata: notification.Metadata, + } + + dbNotification, err := r.store.queries.CreateNotification(ctx, params) + if err != nil { + return nil, err + } + + return r.mapDBToDomain(&dbNotification), nil +} + +func (r *Repository) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) { + params := dbgen.UpdateNotificationStatusParams{ + ID: id, + DeliveryStatus: status, + IsRead: isRead, + Metadata: metadata, + } + + dbNotification, err := r.store.queries.UpdateNotificationStatus(ctx, params) + if err != nil { + return nil, err + } + + return r.mapDBToDomain(&dbNotification), nil +} + +func (r *Repository) ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) { + params := dbgen.ListNotificationsParams{ + RecipientID: recipientID, + Limit: int32(limit), + Offset: int32(offset), + } + + dbNotifications, err := r.store.queries.ListNotifications(ctx, params) + if err != nil { + return nil, err + } + + var result []domain.Notification + for _, dbNotif := range dbNotifications { + domainNotif := r.mapDBToDomain(&dbNotif) + result = append(result, *domainNotif) + } + + return result, nil +} + +func (r *Repository) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) { + dbNotifications, err := r.store.queries.ListFailedNotifications(ctx, int32(limit)) + if err != nil { + return nil, err + } + + var result []domain.Notification + for _, dbNotif := range dbNotifications { + domainNotif := r.mapDBToDomain(&dbNotif) + result = append(result, *domainNotif) + } + + return result, nil +} + +func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification { + var errorSeverity *domain.NotificationErrorSeverity + if dbNotif.ErrorSeverity.Valid { + s := domain.NotificationErrorSeverity(dbNotif.ErrorSeverity.String) + errorSeverity = &s + } + + var deliveryChannel domain.DeliveryChannel + if dbNotif.DeliveryChannel.Valid { + deliveryChannel = domain.DeliveryChannel(dbNotif.DeliveryChannel.String) + } else { + deliveryChannel = "" + } + + var priority int + if dbNotif.Priority.Valid { + priority = int(dbNotif.Priority.Int32) + } + + payload, err := unmarshalPayload(dbNotif.Payload) + if err != nil { + payload = domain.NotificationPayload{} + } + + return &domain.Notification{ + ID: dbNotif.ID, + RecipientID: dbNotif.RecipientID, + Type: domain.NotificationType(dbNotif.Type), + Level: domain.NotificationLevel(dbNotif.Level), + ErrorSeverity: errorSeverity, + Reciever: domain.NotificationRecieverSide(dbNotif.Reciever), + IsRead: dbNotif.IsRead, + DeliveryStatus: domain.NotificationDeliveryStatus(dbNotif.DeliveryStatus), + DeliveryChannel: deliveryChannel, + Payload: payload, + Priority: priority, + Timestamp: dbNotif.Timestamp.Time, + Metadata: dbNotif.Metadata, + } +} + +func marshalPayload(payload domain.NotificationPayload) []byte { + data, _ := json.Marshal(payload) + return data +} + +func unmarshalPayload(data []byte) (domain.NotificationPayload, error) { + var payload domain.NotificationPayload + if err := json.Unmarshal(data, &payload); err != nil { + return domain.NotificationPayload{}, err + } + return payload, nil +} diff --git a/internal/services/notfication/port.go b/internal/services/notfication/port.go index af928fc..4bca486 100644 --- a/internal/services/notfication/port.go +++ b/internal/services/notfication/port.go @@ -1,4 +1,18 @@ -package notficationservice +package notificationservice + +import ( + "context" + + "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" + "github.com/gofiber/websocket/v2" +) type NotificationStore interface { + SendNotification(ctx context.Context, notification *domain.Notification) error + MarkAsRead(ctx context.Context, notificationID, recipientID string) error + ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) + ConnectWebSocket(ctx context.Context, recipientID string, c *websocket.Conn) error + DisconnectWebSocket(recipientID string) + SendSMS(ctx context.Context, recipientID, message string) error + SendEmail(ctx context.Context, recipientID, subject, message string) error } diff --git a/internal/services/notfication/service.go b/internal/services/notfication/service.go index 634cbac..8b6f696 100644 --- a/internal/services/notfication/service.go +++ b/internal/services/notfication/service.go @@ -1,11 +1,196 @@ -package notficationservice +package notificationservice + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" + "github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers" + "github.com/SamuelTariku/FortuneBet-Backend/internal/repository" + "github.com/gofiber/websocket/v2" +) type Service struct { - store NotificationStore + repo repository.NotificationRepository + logger *slog.Logger + connections sync.Map + notificationCh chan *domain.Notification + stopCh chan struct{} } -func New(store NotificationStore) *Service { - return &Service{ - store: store, +func New(repo repository.NotificationRepository, logger *slog.Logger) NotificationStore { + svc := &Service{ + repo: repo, + logger: logger, + connections: sync.Map{}, + notificationCh: make(chan *domain.Notification, 1000), + stopCh: make(chan struct{}), + } + + go svc.startWorker() + go svc.startRetryWorker() + + return svc +} + +func (s *Service) addConnection(ctx context.Context, recipientID string, c *websocket.Conn) { + if c == nil { + s.logger.Warn("Attempted to add nil WebSocket connection", "recipientID", recipientID) + return + } + + s.connections.Store(recipientID, c) + s.logger.Info("Added WebSocket connection", "recipientID", recipientID) +} + +func (s *Service) SendNotification(ctx context.Context, notification *domain.Notification) error { + notification.ID = helpers.GenerateID() + notification.Timestamp = time.Now() + notification.DeliveryStatus = domain.DeliveryStatusPending + + created, err := s.repo.CreateNotification(ctx, notification) + if err != nil { + return err + } + + notification = created + + select { + case s.notificationCh <- notification: + default: + s.logger.Error("Notification channel full, dropping notification", "id", notification.ID) + } + + return nil +} + +func (s *Service) MarkAsRead(ctx context.Context, notificationID, recipientID string) error { + _, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil) + if err != nil { + return err + } + return nil +} + +func (s *Service) ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) { + return s.repo.ListNotifications(ctx, recipientID, limit, offset) +} + +func (s *Service) ConnectWebSocket(ctx context.Context, recipientID string, c *websocket.Conn) error { + s.addConnection(ctx, recipientID, c) + defer func() { + s.DisconnectWebSocket(recipientID) + }() + + for { + _, _, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + s.logger.Error("WebSocket error", "recipientID", recipientID, "error", err) + } + return nil + } + } +} + +func (s *Service) DisconnectWebSocket(recipientID string) { + s.connections.Delete(recipientID) + if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded { + conn.(*websocket.Conn).Close() + s.logger.Info("Disconnected WebSocket", "recipientID", recipientID) + } +} + +func (s *Service) SendSMS(ctx context.Context, recipientID, message string) error { + s.logger.Info("SMS notification requested", "recipientID", recipientID, "message", message) + return nil +} + +func (s *Service) SendEmail(ctx context.Context, recipientID, subject, message string) error { + s.logger.Info("Email notification requested", "recipientID", recipientID, "subject", subject) + return nil +} + +func (s *Service) startWorker() { + for { + select { + case notification := <-s.notificationCh: + s.handleNotification(notification) + case <-s.stopCh: + return + } + } +} + +func (s *Service) handleNotification(notification *domain.Notification) { + ctx := context.Background() + + if conn, ok := s.connections.Load(notification.RecipientID); ok { + data, err := notification.ToJSON() + if err != nil { + s.logger.Error("Failed to serialize notification", "id", notification.ID, "error", err) + return + } + if err := conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, data); err != nil { + s.logger.Error("Failed to send WebSocket message", "id", notification.ID, "error", err) + notification.DeliveryStatus = domain.DeliveryStatusFailed + } else { + notification.DeliveryStatus = domain.DeliveryStatusSent + } + } else { + s.logger.Warn("No WebSocket connection for recipient", "recipientID", notification.RecipientID) + notification.DeliveryStatus = domain.DeliveryStatusFailed + } + + if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { + s.logger.Error("Failed to update notification status", "id", notification.ID, "error", err) + } +} + +func (s *Service) startRetryWorker() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.retryFailedNotifications() + case <-s.stopCh: + return + } + } +} + +func (s *Service) retryFailedNotifications() { + ctx := context.Background() + failedNotifications, err := s.repo.ListFailedNotifications(ctx, 100) + if err != nil { + s.logger.Error("Failed to list failed notifications", "error", err) + return + } + + for _, n := range failedNotifications { + notification := &n + go func(notification *domain.Notification) { + for attempt := 0; attempt < 3; attempt++ { + time.Sleep(time.Duration(attempt) * time.Second) + if conn, ok := s.connections.Load(notification.RecipientID); ok { + data, err := notification.ToJSON() + if err != nil { + continue + } + if err := conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, data); err == nil { + notification.DeliveryStatus = domain.DeliveryStatusSent + if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil { + s.logger.Error("Failed to update after retry", "id", notification.ID, "error", err) + } + return + } + } + } + s.logger.Error("Max retries reached for notification", "id", notification.ID) + }(notification) } } diff --git a/internal/web_server/app.go b/internal/web_server/app.go index b7da149..710ec37 100644 --- a/internal/web_server/app.go +++ b/internal/web_server/app.go @@ -2,17 +2,21 @@ package httpserver import ( "fmt" + "log/slog" + notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication" "github.com/bytedance/sonic" "github.com/gofiber/fiber/v2" ) type App struct { - fiber *fiber.App - port int + fiber *fiber.App + NotidicationStore notificationservice.NotificationStore + port int + Logger *slog.Logger } -func NewApp(port int) *App { +func NewApp(port int, logger *slog.Logger, notidicationStore notificationservice.NotificationStore) *App { app := fiber.New(fiber.Config{ CaseSensitive: true, DisableHeaderNormalizing: true, @@ -20,8 +24,10 @@ func NewApp(port int) *App { JSONDecoder: sonic.Unmarshal, }) s := &App{ - fiber: app, - port: port, + fiber: app, + port: port, + NotidicationStore: notidicationStore, + Logger: logger, } s.initAppRoutes() diff --git a/internal/web_server/app_routes.go b/internal/web_server/app_routes.go index 8388c84..da6618c 100644 --- a/internal/web_server/app_routes.go +++ b/internal/web_server/app_routes.go @@ -1,5 +1,35 @@ package httpserver +import ( + "context" + + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" +) + func (a *App) initAppRoutes() { // a.fiber.Group("/users", users.CreateAccount(a.userAPI)) + + a.fiber.Get("/ws/:recipientID", func(c *fiber.Ctx) error { + if websocket.IsWebSocketUpgrade(c) { + c.Locals("allowed", true) + return c.Next() + } + return fiber.ErrUpgradeRequired + }, websocket.New(func(c *websocket.Conn) { + recipientID := c.Params("recipientID") + a.NotidicationStore.ConnectWebSocket(context.Background(), recipientID, c) + + defer a.NotidicationStore.DisconnectWebSocket(recipientID) + + for { + _, _, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + a.Logger.Error("WebSocket error", "recipientID", recipientID, "error", err) + } + return + } + } + })) } diff --git a/makefile b/makefile index d22975e..f8b2189 100644 --- a/makefile +++ b/makefile @@ -30,3 +30,7 @@ migrations/up: .PHONY: swagger swagger: @swag init -g cmd/main.go + +.PHONY: sqlc-gen +sqlc-gen: + @sqlc generate