fix: refactored the hadler for notification and added mark as read

This commit is contained in:
dawitel 2025-04-02 21:50:52 +03:00
parent 5cc125d450
commit b690a1c933
11 changed files with 178 additions and 54 deletions

View File

@ -7,38 +7,105 @@
│ ├── migrations │ ├── migrations
│ │ ├── 000001_fortune.down.sql │ │ ├── 000001_fortune.down.sql
│ │ ├── 000001_fortune.up.sql │ │ ├── 000001_fortune.up.sql
│ │ ├── 000002_notification.down.sql
│ │ ├── 000002_notification.up.sql
│ └── query │ └── query
│ ├── auth.sql
│ ├── bet.sql
│ ├── notification.sql
│ ├── otp.sql
│ ├── ticket.sql
│ ├── user.sql │ ├── user.sql
├── docs
│ ├── docs.go
│ ├── swagger.json
│ ├── swagger.yaml
├── gen ├── gen
│ └── db │ └── db
│ ├── auth.sql.go
│ ├── bet.sql.go
│ ├── db.go │ ├── db.go
│ ├── models.go │ ├── models.go
│ ├── notification.sql.go
│ ├── otp.sql.go
│ ├── ticket.sql.go
│ ├── user.sql.go │ ├── user.sql.go
└── internal └── internal
├── config ├── config
│ ├── config.go │ ├── config.go
├── domain ├── domain
│ ├── auth.go │ ├── auth.go
│ ├── bet.go
│ ├── branch.go
│ ├── common.go
│ ├── event.go
│ ├── notification.go │ ├── notification.go
│ ├── otp.go
│ ├── role.go
│ ├── ticket.go
│ ├── user.go │ ├── user.go
├── logger ├── logger
│ ├── logger.go │ ├── logger.go
├── mocks
│ ├── mock_email
│ │ ├── email.go
│ └── mock_sms
│ ├── sms.go
├── pkgs
│ └── helpers
│ ├── helpers.go
├── repository ├── repository
│ ├── auth.go
│ ├── bet.go
│ ├── notification.go
│ ├── otp.go
│ ├── store.go │ ├── store.go
│ ├── ticket.go
│ ├── user.go │ ├── user.go
├── services ├── services
│ ├── authentication
│ │ ├── impl.go
│ │ ├── port.go
│ │ ├── service.go
│ ├── bet
│ │ ├── port.go
│ │ ├── service.go
│ ├── notfication │ ├── notfication
│ │ ├── port.go │ │ ├── port.go
│ │ ├── service.go │ │ ├── service.go
│ ├── sportsbook
│ │ ├── events.go
│ │ ├── odds.go
│ │ ├── service.go
│ ├── ticket
│ │ ├── port.go
│ │ ├── service.go
│ └── user │ └── user
│ ├── common.go
│ ├── port.go │ ├── port.go
│ ├── register.go
│ ├── reset.go
│ ├── service.go │ ├── service.go
│ ├── user.go
└── web_server └── web_server
├── handlers
│ ├── auth_handler.go
│ ├── bet_handler.go
│ ├── notification_handler.go
│ ├── ticket_handler.go
│ ├── user.go
├── jwt
│ ├── jwt.go
│ ├── jwt_test.go
├── response
│ ├── res.go
└── validator └── validator
├── validatord.go ├── validatord.go
├── app.go ├── app.go
├── app_routes.go ├── middleware.go
├── routes.go
├── .air.toml ├── .air.toml
├── .env
├── .gitignore ├── .gitignore
├── README.md ├── README.md
├── compose.db.yaml ├── compose.db.yaml

View File

@ -1,6 +1,6 @@
CREATE TABLE IF NOT EXISTS notifications ( CREATE TABLE IF NOT EXISTS notifications (
id VARCHAR(255) PRIMARY KEY, id VARCHAR(255) NOT NULL PRIMARY KEY,
recipient_id VARCHAR(255) NOT NULL, recipient_id BIGSERIAL NOT NULL,
type TEXT NOT NULL CHECK ( type TEXT NOT NULL CHECK (
type IN ( type IN (
'cash_out_success', 'cash_out_success',

View File

@ -25,7 +25,7 @@ type Bet struct {
type Notification struct { type Notification struct {
ID string ID string
RecipientID string RecipientID int64
Type string Type string
Level string Level string
ErrorSeverity pgtype.Text ErrorSeverity pgtype.Text

View File

@ -21,7 +21,7 @@ INSERT INTO notifications (
type CreateNotificationParams struct { type CreateNotificationParams struct {
ID string ID string
RecipientID string RecipientID int64
Type string Type string
Level string Level string
ErrorSeverity pgtype.Text ErrorSeverity pgtype.Text
@ -141,7 +141,7 @@ SELECT id, recipient_id, type, level, error_severity, reciever, is_read, deliver
` `
type ListNotificationsParams struct { type ListNotificationsParams struct {
RecipientID string RecipientID int64
Limit int32 Limit int32
Offset int32 Offset int32
} }

View File

@ -58,7 +58,7 @@ type NotificationPayload struct {
type Notification struct { type Notification struct {
ID string `json:"id"` ID string `json:"id"`
RecipientID string `json:"recipient_id"` RecipientID int64 `json:"recipient_id"`
Type NotificationType `json:"type"` Type NotificationType `json:"type"`
Level NotificationLevel `json:"level"` Level NotificationLevel `json:"level"`
ErrorSeverity *NotificationErrorSeverity `json:"error_severity"` ErrorSeverity *NotificationErrorSeverity `json:"error_severity"`

View File

@ -12,7 +12,7 @@ import (
type NotificationRepository interface { type NotificationRepository interface {
CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error)
UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error)
ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)
ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error)
} }
@ -83,7 +83,7 @@ func (r *Repository) UpdateNotificationStatus(ctx context.Context, id, status st
return r.mapDBToDomain(&dbNotification), nil return r.mapDBToDomain(&dbNotification), nil
} }
func (r *Repository) ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) { func (r *Repository) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
params := dbgen.ListNotificationsParams{ params := dbgen.ListNotificationsParams{
RecipientID: recipientID, RecipientID: recipientID,
Limit: int32(limit), Limit: int32(limit),

View File

@ -9,10 +9,10 @@ import (
type NotificationStore interface { type NotificationStore interface {
SendNotification(ctx context.Context, notification *domain.Notification) error SendNotification(ctx context.Context, notification *domain.Notification) error
MarkAsRead(ctx context.Context, notificationID, recipientID string) error MarkAsRead(ctx context.Context, notificationID string, recipientID int64) error
ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)
ConnectWebSocket(ctx context.Context, recipientID string, c *websocket.Conn) error ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error
DisconnectWebSocket(recipientID string) DisconnectWebSocket(recipientID int64)
SendSMS(ctx context.Context, recipientID, message string) error SendSMS(ctx context.Context, recipientID int64, message string) error
SendEmail(ctx context.Context, recipientID, subject, message string) error SendEmail(ctx context.Context, recipientID int64, subject, message string) error
} }

View File

@ -35,7 +35,7 @@ func New(repo repository.NotificationRepository, logger *slog.Logger) Notificati
return svc return svc
} }
func (s *Service) addConnection(ctx context.Context, recipientID string, c *websocket.Conn) { func (s *Service) addConnection(ctx context.Context, recipientID int64, c *websocket.Conn) {
if c == nil { if c == nil {
s.logger.Warn("Attempted to add nil WebSocket connection", "recipientID", recipientID) s.logger.Warn("Attempted to add nil WebSocket connection", "recipientID", recipientID)
return return
@ -66,7 +66,7 @@ func (s *Service) SendNotification(ctx context.Context, notification *domain.Not
return nil return nil
} }
func (s *Service) MarkAsRead(ctx context.Context, notificationID, recipientID string) error { func (s *Service) MarkAsRead(ctx context.Context, notificationID string, recipientID int64) error {
_, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil) _, err := s.repo.UpdateNotificationStatus(ctx, notificationID, string(domain.DeliveryStatusSent), true, nil)
if err != nil { if err != nil {
return err return err
@ -74,28 +74,16 @@ func (s *Service) MarkAsRead(ctx context.Context, notificationID, recipientID st
return nil return nil
} }
func (s *Service) ListNotifications(ctx context.Context, recipientID string, limit, offset int) ([]domain.Notification, error) { func (s *Service) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
return s.repo.ListNotifications(ctx, recipientID, limit, offset) return s.repo.ListNotifications(ctx, recipientID, limit, offset)
} }
func (s *Service) ConnectWebSocket(ctx context.Context, recipientID string, c *websocket.Conn) error { func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
s.addConnection(ctx, recipientID, c) s.addConnection(ctx, recipientID, c)
defer func() {
s.DisconnectWebSocket(recipientID)
}()
for {
_, _, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
s.logger.Error("WebSocket error", "recipientID", recipientID, "error", err)
}
return nil return nil
}
}
} }
func (s *Service) DisconnectWebSocket(recipientID string) { func (s *Service) DisconnectWebSocket(recipientID int64) {
s.connections.Delete(recipientID) s.connections.Delete(recipientID)
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded { if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
conn.(*websocket.Conn).Close() conn.(*websocket.Conn).Close()
@ -103,12 +91,12 @@ func (s *Service) DisconnectWebSocket(recipientID string) {
} }
} }
func (s *Service) SendSMS(ctx context.Context, recipientID, message string) error { func (s *Service) SendSMS(ctx context.Context, recipientID int64, message string) error {
s.logger.Info("SMS notification requested", "recipientID", recipientID, "message", message) s.logger.Info("SMS notification requested", "recipientID", recipientID, "message", message)
return nil return nil
} }
func (s *Service) SendEmail(ctx context.Context, recipientID, subject, message string) error { func (s *Service) SendEmail(ctx context.Context, recipientID int64, subject, message string) error {
s.logger.Info("Email notification requested", "recipientID", recipientID, "subject", subject) s.logger.Info("Email notification requested", "recipientID", recipientID, "subject", subject)
return nil return nil
} }

View File

@ -0,0 +1,22 @@
package handlers
import (
"log/slog"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
customvalidator "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/validator"
)
type Handler struct {
logger *slog.Logger
notificationSvc notificationservice.NotificationStore
validator *customvalidator.CustomValidator
}
func New(logger *slog.Logger, notificationSvc notificationservice.NotificationStore, validator *customvalidator.CustomValidator) *Handler {
return &Handler{
logger: logger,
notificationSvc: notificationSvc,
validator: validator,
}
}

View File

@ -2,31 +2,75 @@ package handlers
import ( import (
"context" "context"
"log/slog"
notificationservice "github.com/SamuelTariku/FortuneBet-Backend/internal/services/notfication"
customvalidator "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/validator"
"github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2" "github.com/gofiber/websocket/v2"
) )
func ConnectSocket(logger slog.Logger, NotidicationSvc notificationservice.NotificationStore, v *customvalidator.CustomValidator) fiber.Handler { func (h *Handler) ConnectSocket(c *fiber.Ctx) error {
return func(c *fiber.Ctx) error {
if !websocket.IsWebSocketUpgrade(c) { if !websocket.IsWebSocketUpgrade(c) {
h.logger.Warn("WebSocket upgrade required")
return fiber.ErrUpgradeRequired return fiber.ErrUpgradeRequired
} }
userID, ok := c.Locals("userID").(int64)
if !ok || userID == 0 {
h.logger.Error("Invalid user ID in context")
return fiber.NewError(fiber.StatusUnauthorized, "invalid user identification")
}
c.Locals("allowed", true) c.Locals("allowed", true)
return websocket.New(func(conn *websocket.Conn) { return websocket.New(func(conn *websocket.Conn) {
// TODO: get the recipientID from the token ctx := context.Background()
recipientID := c.Params("recipientID") logger := h.logger.With("userID", userID, "remoteAddr", conn.RemoteAddr())
NotidicationSvc.ConnectWebSocket(context.Background(), recipientID, conn)
if err := h.notificationSvc.ConnectWebSocket(ctx, userID, conn); err != nil {
logger.Error("Failed to connect WebSocket", "error", err)
_ = conn.Close()
return
}
logger.Info("WebSocket connection established")
defer func() { defer func() {
NotidicationSvc.DisconnectWebSocket(recipientID) h.notificationSvc.DisconnectWebSocket(userID)
conn.Close() logger.Info("WebSocket connection closed")
_ = conn.Close()
}() }()
})(c)
for {
if _, _, err := conn.ReadMessage(); err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
logger.Warn("WebSocket unexpected close", "error", err)
} }
break
}
}
})(c)
}
func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error {
type Request struct {
NotificationID string `json:"notification_id" validate:"required"`
}
var req Request
if err := c.BodyParser(&req); err != nil {
h.logger.Error("Failed to parse request body", "error", err)
return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
}
userID, ok := c.Locals("userID").(int64)
if !ok || userID == 0 {
h.logger.Error("Invalid user ID in context")
return fiber.NewError(fiber.StatusUnauthorized, "invalid user identification")
}
if err := h.notificationSvc.MarkAsRead(context.Background(), req.NotificationID, userID); err != nil {
h.logger.Error("Failed to mark notification as read", "notificationID", req.NotificationID, "error", err)
return fiber.NewError(fiber.StatusInternalServerError, "Failed to update notification status")
}
return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Notification marked as read"})
} }

View File

@ -8,6 +8,8 @@ import (
) )
func (a *App) initAppRoutes() { func (a *App) initAppRoutes() {
handler := handlers.New(a.logger, a.NotidicationStore, a.validator)
a.fiber.Post("/auth/login", handlers.LoginCustomer(a.logger, a.authSvc, a.validator, a.JwtConfig)) a.fiber.Post("/auth/login", handlers.LoginCustomer(a.logger, a.authSvc, a.validator, a.JwtConfig))
a.fiber.Post("/auth/refresh", a.authMiddleware, handlers.RefreshToken(a.logger, a.authSvc, a.validator, a.JwtConfig)) a.fiber.Post("/auth/refresh", a.authMiddleware, handlers.RefreshToken(a.logger, a.authSvc, a.validator, a.JwtConfig))
a.fiber.Post("/auth/logout", a.authMiddleware, handlers.LogOutCustomer(a.logger, a.authSvc, a.validator)) a.fiber.Post("/auth/logout", a.authMiddleware, handlers.LogOutCustomer(a.logger, a.authSvc, a.validator))
@ -41,7 +43,8 @@ func (a *App) initAppRoutes() {
a.fiber.Patch("/bet/:id", handlers.UpdateCashOut(a.logger, a.betSvc, a.validator)) a.fiber.Patch("/bet/:id", handlers.UpdateCashOut(a.logger, a.betSvc, a.validator))
a.fiber.Delete("/bet/:id", handlers.DeleteBet(a.logger, a.betSvc, a.validator)) a.fiber.Delete("/bet/:id", handlers.DeleteBet(a.logger, a.betSvc, a.validator))
a.fiber.Get("/ws/:recipientID", handlers.ConnectSocket(*a.logger, a.NotidicationStore, a.validator)) a.fiber.Get("/notifications/ws/connect/:recipientID", handler.ConnectSocket)
a.fiber.Post("/notifications/mark-as-read", handler.MarkNotificationAsRead)
} }
///user/profile get ///user/profile get