package handlers import ( "context" "encoding/json" "fmt" "net" "net/http" "strconv" "strings" "time" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/adaptor" "github.com/gorilla/websocket" "github.com/valyala/fasthttp/fasthttpadaptor" "go.uber.org/zap" ) func hijackHTTP(c *fiber.Ctx) (net.Conn, http.ResponseWriter, error) { var rw http.ResponseWriter var conn net.Conn // This is a trick: fasthttpadaptor gives us the HTTP interfaces handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { hj, ok := w.(http.Hijacker) if !ok { return } var err error conn, _, err = hj.Hijack() if err != nil { return } rw = w }) fasthttpadaptor.NewFastHTTPHandler(handler)(c.Context()) if conn == nil || rw == nil { return nil, nil, fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection") } return conn, rw, nil } func (h *Handler) ConnectSocket(c *fiber.Ctx) error { userID, ok := c.Locals("userID").(int64) if !ok || userID == 0 { h.mongoLoggerSvc.Info("Invalid user ID in context", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusUnauthorized), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification") } // Convert *fiber.Ctx to *http.Request req, err := adaptor.ConvertRequest(c, false) if err != nil { h.mongoLoggerSvc.Error("Failed to convert socket request", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to convert request") } // Create a net.Conn hijacked from the fasthttp context netConn, rw, err := hijackHTTP(c) if err != nil { h.mongoLoggerSvc.Error("Failed to hijack connection", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection:"+err.Error()) } // Upgrade the connection using Gorilla's Upgrader conn, err := ws.Upgrader.Upgrade(rw, req, nil) if err != nil { h.mongoLoggerSvc.Error("WebSocket upgrade failed", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) netConn.Close() return fiber.NewError(fiber.StatusInternalServerError, "WebSocket upgrade failed:"+err.Error()) } client := &ws.Client{ Conn: conn, RecipientID: userID, } h.notificationSvc.Hub.Register <- client // h.logger.Info("WebSocket connection established", "userID", userID) defer func() { h.notificationSvc.Hub.Unregister <- client conn.Close() }() for { _, _, err := conn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { h.mongoLoggerSvc.Info("WebSocket closed normally", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) } else { h.mongoLoggerSvc.Info("Unexpected WebSocket closure", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) } break } } return nil } func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error { type Request struct { NotificationIDs []string `json:"notification_ids" validate:"required"` } var req Request if err := c.BodyParser(&req); err != nil { h.mongoLoggerSvc.Info("Failed to parse request body", zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid request body") } userID, ok := c.Locals("user_id").(int64) if !ok || userID == 0 { h.mongoLoggerSvc.Error("Invalid user ID in context", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "invalid user ID in context") } fmt.Printf("Notification IDs: %v \n", req.NotificationIDs) if err := h.notificationSvc.MarkAsRead(context.Background(), req.NotificationIDs, userID); err != nil { h.mongoLoggerSvc.Error("Failed to mark notifications as read", zap.String("notificationID", strings.Join(req.NotificationIDs, ",")), zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to update notification status:", err.Error()) } return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Notification marked as read"}) } func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error { type Request struct { RecipientID int64 `json:"recipient_id" validate:"required_if=DeliveryScheme single"` Type domain.NotificationType `json:"type" validate:"required"` Level domain.NotificationLevel `json:"level" validate:"required"` ErrorSeverity *domain.NotificationErrorSeverity `json:"error_severity"` Reciever domain.NotificationRecieverSide `json:"reciever" validate:"required"` DeliveryScheme domain.NotificationDeliveryScheme `json:"delivery_scheme" validate:"required"` DeliveryChannel domain.DeliveryChannel `json:"delivery_channel" validate:"required"` Payload domain.NotificationPayload `json:"payload" validate:"required"` Priority int `json:"priority"` Metadata json.RawMessage `json:"metadata,omitempty"` } var req Request if err := c.BodyParser(&req); err != nil { h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Failed to parse request body", zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid request body") } // userID, ok := c.Locals("userID").(int64) // if !ok || userID == 0 { // h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid user ID in context") // return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification") // } switch req.DeliveryScheme { case domain.NotificationDeliverySchemeSingle: // if req.Reciever == domain.NotificationRecieverSideCustomer { // h.logger.Warn("[NotificationSvc.CreateAndSendNotification] Unauthorized attempt to send notification", "recipientID", req.RecipientID) // return fiber.NewError(fiber.StatusForbidden, "Unauthorized to send notification to this recipient") // } notification := &domain.Notification{ ID: "", RecipientID: req.RecipientID, Type: req.Type, Level: req.Level, ErrorSeverity: req.ErrorSeverity, Reciever: req.Reciever, IsRead: false, DeliveryStatus: domain.DeliveryStatusPending, DeliveryChannel: req.DeliveryChannel, Payload: req.Payload, Priority: req.Priority, Metadata: req.Metadata, } if err := h.notificationSvc.SendNotification(context.Background(), notification); err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to send single notification", zap.Int64("recipientID", req.RecipientID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to send notification:"+err.Error()) } h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Single notification sent successfully", zap.Int64("recipientID", req.RecipientID), zap.String("type", string(req.Type)), zap.Time("timestamp", time.Now()), ) return c.Status(fiber.StatusCreated).JSON(fiber.Map{"message": "Single notification sent successfully", "notification_id": notification.ID}) case domain.NotificationDeliverySchemeBulk: recipients, _, err := h.userSvc.GetAllUsers(context.Background(), domain.UserFilter{ Role: string(req.Reciever), }) if err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to fetch recipients for bulk notification", zap.Int64("RecipientID", req.RecipientID), zap.String("Reciever", string(req.Reciever)), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch recipients:"+err.Error()) } notificationIDs := make([]string, 0, len(recipients)) for _, user := range recipients { notification := &domain.Notification{ ID: "", RecipientID: user.ID, Type: req.Type, Level: req.Level, ErrorSeverity: req.ErrorSeverity, Reciever: req.Reciever, IsRead: false, DeliveryStatus: domain.DeliveryStatusPending, DeliveryChannel: req.DeliveryChannel, Payload: req.Payload, Priority: req.Priority, Metadata: req.Metadata, } if err := h.notificationSvc.SendNotification(context.Background(), notification); err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to send bulk notification", zap.Int64("UserID", user.ID), zap.Error(err), zap.Time("timestamp", time.Now()), ) continue } notificationIDs = append(notificationIDs, notification.ID) } h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Bulk notification sent successfully", zap.Int("recipient_count", len(recipients)), zap.String("type", string(req.Type)), zap.Int("status_code", fiber.StatusCreated), zap.Error(err), zap.Time("timestamp", time.Now()), ) return c.Status(fiber.StatusCreated).JSON(fiber.Map{ "message": "Bulk notification sent successfully", "recipient_count": len(recipients), "notification_ids": notificationIDs, }) default: h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Invalid delivery scheme", zap.String("delivery_scheme", string(req.DeliveryScheme)), zap.Int("status_code", fiber.StatusBadRequest), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid delivery scheme") } } func (h *Handler) GetNotifications(c *fiber.Ctx) error { limitStr := c.Query("limit", "10") offsetStr := c.Query("offset", "0") // Convert limit and offset to integers limit, err := strconv.Atoi(limitStr) if err != nil || limit <= 0 { h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid limit value", zap.String("limit", limitStr), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value") } offset, err := strconv.Atoi(offsetStr) if err != nil || offset < 0 { h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid offset value", zap.String("offset", offsetStr), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid offset value") } userID, ok := c.Locals("user_id").(int64) if !ok || userID == 0 { h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Invalid user ID in context", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Invalid user identification") } notifications, err := h.notificationSvc.ListNotifications(context.Background(), userID, limit, offset) if err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications:"+err.Error()) } return c.Status(fiber.StatusOK).JSON(fiber.Map{ "notifications": notifications, "total_count": len(notifications), "limit": limit, "offset": offset, }) } func (h *Handler) GetAllRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) { return h.notificationSvc.ListRecipientIDs(ctx, receiver) } func (h *Handler) CountUnreadNotifications(c *fiber.Ctx) error { userID, ok := c.Locals("user_id").(int64) if !ok || userID == 0 { h.mongoLoggerSvc.Error("NotificationSvc.GetNotifications] Invalid user ID in context", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Invalid user identification") } total, err := h.notificationSvc.CountUnreadNotifications(c.Context(), userID) if err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.CountUnreadNotifications] Failed to fetch unread notification count", zap.Int64("userID", userID), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications:"+err.Error()) } return c.Status(fiber.StatusOK).JSON(fiber.Map{ "unread": total, }) } func (h *Handler) GetAllNotifications(c *fiber.Ctx) error { limitStr := c.Query("limit", "10") pageStr := c.Query("page", "1") // Convert limit and offset to integers limit, err := strconv.Atoi(limitStr) if err != nil || limit <= 0 { h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid limit value", zap.String("limit", limitStr), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value") } page, err := strconv.Atoi(pageStr) if err != nil || page <= 0 { h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid page value", zap.String("page", pageStr), zap.Int("status_code", fiber.StatusBadRequest), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusBadRequest, "Invalid page value") } notifications, err := h.notificationSvc.GetAllNotifications(context.Background(), limit, ((page - 1) * limit)) if err != nil { h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications", zap.Int64("limit", int64(limit)), zap.Int("status_code", fiber.StatusInternalServerError), zap.Error(err), zap.Time("timestamp", time.Now()), ) return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications") } return c.Status(fiber.StatusOK).JSON(fiber.Map{ "notifications": notifications, "total_count": len(notifications), "limit": limit, "page": page, }) }