Yimaru-BackEnd/internal/web_server/handlers/notification_handler.go

325 lines
12 KiB
Go

package handlers
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"strconv"
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
"github.com/SamuelTariku/FortuneBet-Backend/internal/services/user"
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/adaptor"
"github.com/gorilla/websocket"
"github.com/valyala/fasthttp/fasthttpadaptor"
)
// hijackHTTP tries to take over the raw network connection behind a Fiber
// request.
//
// It runs a throw-away net/http handler through fasthttpadaptor against the
// request's fasthttp context, and inside that handler asks the supplied
// http.ResponseWriter to hijack its connection.
//
// Returns the hijacked connection together with the ResponseWriter it came
// from. On failure both are nil and the error is a *fiber.Error with status
// 500 whose message includes the reason (the writer is not an http.Hijacker,
// or Hijack itself failed) when one is known.
func hijackHTTP(c *fiber.Ctx) (net.Conn, http.ResponseWriter, error) {
	var (
		rw    http.ResponseWriter
		conn  net.Conn
		cause string // why hijacking failed, if it did
	)

	// This is a trick: fasthttpadaptor gives us the net/http interfaces.
	handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		hj, ok := w.(http.Hijacker)
		if !ok {
			cause = "response writer does not implement http.Hijacker"
			return
		}
		hijacked, _, err := hj.Hijack()
		if err != nil {
			cause = err.Error()
			return
		}
		// Publish both values only once the hijack has fully succeeded, so a
		// failed attempt can never leave a half-populated result behind.
		conn, rw = hijacked, w
	})

	// The adaptor invokes handler synchronously, so conn/rw/cause are settled
	// by the time this call returns.
	fasthttpadaptor.NewFastHTTPHandler(handler)(c.Context())

	if conn == nil || rw == nil {
		msg := "Failed to hijack connection"
		if cause != "" {
			msg += ": " + cause
		}
		return nil, nil, fiber.NewError(fiber.StatusInternalServerError, msg)
	}
	return conn, rw, nil
}
// ConnectSocket upgrades the request to a WebSocket and keeps it open for the
// calling user.
//
// Flow: read the user ID from the request locals, convert the Fiber request to
// an *http.Request, hijack the underlying connection, upgrade it with
// ws.Upgrader, register the resulting client with the notification hub, then
// block reading (and discarding) incoming frames until a read fails. On exit
// the client is unregistered and the WebSocket is closed.
//
// Responds 401 when no usable user ID is present and 500 when the request
// conversion, hijack or upgrade fails. Returns nil once the socket has closed.
//
// NOTE(review): this handler reads the "userID" local, while the REST handlers
// in this file read "user_id" — confirm the WebSocket route's middleware sets
// this key.
func (h *Handler) ConnectSocket(c *fiber.Ctx) error {
	userID, isInt64 := c.Locals("userID").(int64)
	if userID == 0 || !isInt64 {
		h.logger.Error("Invalid user ID in context")
		return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
	}

	// Gorilla's upgrader speaks net/http, so build an *http.Request first.
	httpReq, convErr := adaptor.ConvertRequest(c, false)
	if convErr != nil {
		h.logger.Error("Failed to convert request", "error", convErr)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to convert request")
	}

	// Obtain the raw connection and a ResponseWriter to hand to the upgrader.
	rawConn, respWriter, hijackErr := hijackHTTP(c)
	if hijackErr != nil {
		h.logger.Error("Failed to hijack connection", "error", hijackErr)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection")
	}

	wsConn, upgradeErr := ws.Upgrader.Upgrade(respWriter, httpReq, nil)
	if upgradeErr != nil {
		h.logger.Error("WebSocket upgrade failed", "error", upgradeErr)
		rawConn.Close()
		return fiber.NewError(fiber.StatusInternalServerError, "WebSocket upgrade failed")
	}

	client := &ws.Client{Conn: wsConn, RecipientID: userID}
	h.notificationSvc.Hub.Register <- client
	h.logger.Info("WebSocket connection established", "userID", userID)

	// Runs when the read loop below returns: leave the hub, then close.
	defer func() {
		h.notificationSvc.Hub.Unregister <- client
		h.logger.Info("WebSocket connection closed", "userID", userID)
		wsConn.Close()
	}()

	// Incoming frames are ignored; the loop exists to notice the peer going away.
	for {
		if _, _, readErr := wsConn.ReadMessage(); readErr != nil {
			switch {
			case websocket.IsCloseError(readErr, websocket.CloseNormalClosure, websocket.CloseGoingAway):
				h.logger.Info("WebSocket closed normally", "userID", userID)
			default:
				h.logger.Warn("Unexpected WebSocket closure", "userID", userID, "error", readErr)
			}
			return nil
		}
	}
}
// MarkNotificationAsRead marks the notifications listed in the request body as
// read on behalf of the authenticated user.
//
// Request body (JSON): {"notification_ids": ["...", ...]}
//
// Responses:
//   - 200 with a confirmation message on success
//   - 400 if the body cannot be parsed or carries no notification IDs
//   - 401 if no usable "user_id" is present in the request locals
//   - 500 if the notification service reports an error
func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error {
	type Request struct {
		NotificationIDs []string `json:"notification_ids" validate:"required"`
	}

	var req Request
	if err := c.BodyParser(&req); err != nil {
		h.logger.Error("Failed to parse request body", "error", err)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
	}

	userID, ok := c.Locals("user_id").(int64)
	if !ok || userID == 0 {
		h.logger.Error("Invalid user ID in context")
		return fiber.NewError(fiber.StatusUnauthorized, "invalid user identification")
	}

	// The struct tag declares the field required, but no validator runs on this
	// request, so enforce it here rather than handing the service an empty list.
	if len(req.NotificationIDs) == 0 {
		h.logger.Warn("No notification IDs supplied", "userID", userID)
		return fiber.NewError(fiber.StatusBadRequest, "notification_ids is required")
	}

	if err := h.notificationSvc.MarkAsRead(context.Background(), req.NotificationIDs, userID); err != nil {
		h.logger.Error("Failed to mark notifications as read", "notificationID", req.NotificationIDs, "error", err)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to update notification status")
	}

	return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Notification marked as read"})
}
// CreateAndSendNotification builds notifications from the request body and
// hands them to the notification service.
//
// The "delivery_scheme" field selects the mode:
//   - single: one notification for "recipient_id"; a send failure yields 500.
//   - bulk: one notification per user returned by the user service for the
//     role named in "reciever"; a failed send is logged and skipped, and the
//     response lists only the IDs of notifications that were sent.
//
// Responses: 201 on completion, 400 for an unparsable body or unknown delivery
// scheme, 500 if a single send fails or the bulk recipients cannot be fetched.
//
// NOTE(review): the caller-identity and recipient authorization checks are
// commented out below, so this handler performs no authorization of its own.
func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error {
	type Request struct {
		RecipientID     int64                             `json:"recipient_id" validate:"required_if=DeliveryScheme single"`
		Type            domain.NotificationType           `json:"type" validate:"required"`
		Level           domain.NotificationLevel          `json:"level" validate:"required"`
		ErrorSeverity   *domain.NotificationErrorSeverity `json:"error_severity"`
		Reciever        domain.NotificationRecieverSide   `json:"reciever" validate:"required"`
		DeliveryScheme  domain.NotificationDeliveryScheme `json:"delivery_scheme" validate:"required"`
		DeliveryChannel domain.DeliveryChannel            `json:"delivery_channel" validate:"required"`
		Payload         domain.NotificationPayload        `json:"payload" validate:"required"`
		Priority        int                               `json:"priority"`
		Metadata        json.RawMessage                   `json:"metadata,omitempty"`
	}

	var req Request
	if parseErr := c.BodyParser(&req); parseErr != nil {
		h.logger.Error("[NotificationSvc.CreateAndSendNotification] Failed to parse request body", "error", parseErr)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
	}

	// userID, ok := c.Locals("userID").(int64)
	// if !ok || userID == 0 {
	// 	h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid user ID in context")
	// 	return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
	// }

	// newNotification stamps out a pending, unread notification for one
	// recipient; everything except the recipient comes straight from the request.
	newNotification := func(recipientID int64) *domain.Notification {
		return &domain.Notification{
			ID:              "",
			RecipientID:     recipientID,
			Type:            req.Type,
			Level:           req.Level,
			ErrorSeverity:   req.ErrorSeverity,
			Reciever:        req.Reciever,
			IsRead:          false,
			DeliveryStatus:  domain.DeliveryStatusPending,
			DeliveryChannel: req.DeliveryChannel,
			Payload:         req.Payload,
			Priority:        req.Priority,
			Metadata:        req.Metadata,
		}
	}

	if req.DeliveryScheme == domain.NotificationDeliverySchemeSingle {
		// if req.Reciever == domain.NotificationRecieverSideCustomer {
		// 	h.logger.Warn("[NotificationSvc.CreateAndSendNotification] Unauthorized attempt to send notification", "recipientID", req.RecipientID)
		// 	return fiber.NewError(fiber.StatusForbidden, "Unauthorized to send notification to this recipient")
		// }
		single := newNotification(req.RecipientID)
		if sendErr := h.notificationSvc.SendNotification(context.Background(), single); sendErr != nil {
			h.logger.Error("[NotificationSvc.CreateAndSendNotification] Failed to send single notification", "recipientID", req.RecipientID, "error", sendErr)
			return fiber.NewError(fiber.StatusInternalServerError, "Failed to send notification")
		}

		h.logger.Info("[NotificationSvc.CreateAndSendNotification] Single notification sent successfully", "recipientID", req.RecipientID, "type", req.Type)
		body := fiber.Map{"message": "Single notification sent successfully", "notification_id": single.ID}
		return c.Status(fiber.StatusCreated).JSON(body)
	}

	if req.DeliveryScheme == domain.NotificationDeliverySchemeBulk {
		roleFilter := user.Filter{Role: string(req.Reciever)}
		recipients, _, fetchErr := h.userSvc.GetAllUsers(context.Background(), roleFilter)
		if fetchErr != nil {
			h.logger.Error("[NotificationSvc.CreateAndSendNotification] Failed to fetch recipients for bulk notification", "error", fetchErr)
			return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch recipients")
		}

		// NOTE(review): stdout debug print; consider routing it through h.logger.
		fmt.Printf("Number of Recipients %d \n", len(recipients))

		sentIDs := make([]string, 0, len(recipients))
		for _, recipient := range recipients {
			item := newNotification(recipient.ID)
			sendErr := h.notificationSvc.SendNotification(context.Background(), item)
			if sendErr != nil {
				// Best effort: one failed recipient does not abort the batch.
				h.logger.Error("[NotificationSvc.CreateAndSendNotification] Failed to send bulk notification", "UserID", recipient.ID, "error", sendErr)
				continue
			}
			sentIDs = append(sentIDs, item.ID)
		}

		h.logger.Info("[NotificationSvc.CreateAndSendNotification] Bulk notification sent successfully", "recipient_count", len(recipients), "type", req.Type)
		return c.Status(fiber.StatusCreated).JSON(fiber.Map{
			"message":          "Bulk notification sent successfully",
			"recipient_count":  len(recipients),
			"notification_ids": sentIDs,
		})
	}

	h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid delivery scheme", "delivery_scheme", req.DeliveryScheme)
	return fiber.NewError(fiber.StatusBadRequest, "Invalid delivery scheme")
}
// GetNotifications returns one page of the authenticated user's notifications.
//
// Query parameters:
//   - limit: page size, must parse as an integer > 0 (default "10")
//   - offset: records to skip, must parse as an integer >= 0 (default "0")
//
// Responses: 200 with the page, 400 for a bad limit/offset, 401 when no usable
// "user_id" is present in the request locals, 500 if the service call fails.
//
// Note that "total_count" in the response is the number of items in the
// returned page, not a grand total.
func (h *Handler) GetNotifications(c *fiber.Ctx) error {
	limit, limitErr := strconv.Atoi(c.Query("limit", "10"))
	if limitErr != nil || limit <= 0 {
		h.logger.Error("[NotificationSvc.GetNotifications] Invalid limit value", "error", limitErr)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value")
	}

	offset, offsetErr := strconv.Atoi(c.Query("offset", "0"))
	if offsetErr != nil || offset < 0 {
		h.logger.Error("[NotificationSvc.GetNotifications] Invalid offset value", "error", offsetErr)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid offset value")
	}

	userID, found := c.Locals("user_id").(int64)
	if userID == 0 || !found {
		h.logger.Error("[NotificationSvc.GetNotifications] Invalid user ID in context")
		return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
	}

	items, listErr := h.notificationSvc.ListNotifications(context.Background(), userID, limit, offset)
	if listErr != nil {
		h.logger.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications", "error", listErr)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications")
	}

	page := fiber.Map{
		"notifications": items,
		"total_count":   len(items),
		"limit":         limit,
		"offset":        offset,
	}
	return c.Status(fiber.StatusOK).JSON(page)
}
// getAllRecipientIDs asks the notification service for the recipient IDs on
// the given receiver side and passes its result and error through unchanged.
func (h *Handler) getAllRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
	ids, err := h.notificationSvc.ListRecipientIDs(ctx, receiver)
	return ids, err
}
// CountUnreadNotifications reports how many unread notifications the
// authenticated user has.
//
// Responses: 200 with {"unread": <count>}, 401 when no usable "user_id" is
// present in the request locals, 500 if the service call fails.
func (h *Handler) CountUnreadNotifications(c *fiber.Ctx) error {
	userID, ok := c.Locals("user_id").(int64)
	if !ok || userID == 0 {
		// Label this log line with the handler it actually comes from.
		h.logger.Error("[NotificationSvc.CountUnreadNotifications] Invalid user ID in context")
		return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
	}

	total, err := h.notificationSvc.CountUnreadNotifications(c.Context(), userID)
	if err != nil {
		h.logger.Error("[NotificationSvc.CountUnreadNotifications] Failed to fetch unread notification count", "error", err)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications")
	}

	return c.Status(fiber.StatusOK).JSON(fiber.Map{
		"unread": total,
	})
}
// GetAllNotifications returns one page of notifications without filtering by
// user.
//
// Query parameters:
//   - limit: page size, must parse as an integer > 0 (default "10")
//   - page: 1-based page number, must parse as an integer > 0 (default "1")
//
// The service is called with limit and an offset of (page-1)*limit. A page so
// large that this offset would overflow int is rejected as an invalid page.
//
// Responses: 200 with the page, 400 for a bad limit/page, 500 if the service
// call fails. "total_count" in the response is the number of items in the
// returned page, not a grand total.
func (h *Handler) GetAllNotifications(c *fiber.Ctx) error {
	// Largest value representable by int on this platform.
	const maxInt = int(^uint(0) >> 1)

	// Convert limit and page to integers.
	limit, err := strconv.Atoi(c.Query("limit", "10"))
	if err != nil || limit <= 0 {
		h.logger.Error("[NotificationSvc.GetAllNotifications] Invalid limit value", "error", err)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value")
	}

	page, err := strconv.Atoi(c.Query("page", "1"))
	// limit >= 1 here, so the division is safe; the last clause guarantees
	// that (page-1)*limit below cannot wrap around.
	if err != nil || page <= 0 || page-1 > maxInt/limit {
		h.logger.Error("[NotificationSvc.GetAllNotifications] Invalid page value", "error", err)
		return fiber.NewError(fiber.StatusBadRequest, "Invalid page value")
	}

	offset := (page - 1) * limit
	notifications, err := h.notificationSvc.GetAllNotifications(context.Background(), limit, offset)
	if err != nil {
		h.logger.Error("[NotificationSvc.GetAllNotifications] Failed to fetch notifications", "error", err)
		return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications")
	}

	return c.Status(fiber.StatusOK).JSON(fiber.Map{
		"notifications": notifications,
		"total_count":   len(notifications),
		"limit":         limit,
		"page":          page,
	})
}