452 lines
16 KiB
Go
452 lines
16 KiB
Go
package handlers
|
|
|
|
import (
|
|
"Yimaru-Backend/internal/domain"
|
|
"Yimaru-Backend/internal/web_server/ws"
|
|
"context"
|
|
"encoding/json"
|
|
"net"
|
|
"net/http"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/gofiber/fiber/v2/middleware/adaptor"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/valyala/fasthttp/fasthttpadaptor"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func hijackHTTP(c *fiber.Ctx) (net.Conn, http.ResponseWriter, error) {
|
|
var rw http.ResponseWriter
|
|
var conn net.Conn
|
|
|
|
// This is a trick: fasthttpadaptor gives us the HTTP interfaces
|
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
hj, ok := w.(http.Hijacker)
|
|
if !ok {
|
|
return
|
|
}
|
|
var err error
|
|
conn, _, err = hj.Hijack()
|
|
if err != nil {
|
|
return
|
|
}
|
|
rw = w
|
|
})
|
|
|
|
fasthttpadaptor.NewFastHTTPHandler(handler)(c.Context())
|
|
|
|
if conn == nil || rw == nil {
|
|
return nil, nil, fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection")
|
|
}
|
|
return conn, rw, nil
|
|
}
|
|
|
|
func (h *Handler) ConnectSocket(c *fiber.Ctx) error {
|
|
userID, ok := c.Locals("userID").(int64)
|
|
if !ok || userID == 0 {
|
|
h.mongoLoggerSvc.Info("Invalid user ID in context",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusUnauthorized),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
|
}
|
|
|
|
// Convert *fiber.Ctx to *http.Request
|
|
req, err := adaptor.ConvertRequest(c, false)
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("Failed to convert socket request",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to convert request")
|
|
}
|
|
|
|
// Create a net.Conn hijacked from the fasthttp context
|
|
netConn, rw, err := hijackHTTP(c)
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("Failed to hijack connection",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection:"+err.Error())
|
|
}
|
|
|
|
// Upgrade the connection using Gorilla's Upgrader
|
|
conn, err := ws.Upgrader.Upgrade(rw, req, nil)
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("WebSocket upgrade failed",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
netConn.Close()
|
|
return fiber.NewError(fiber.StatusInternalServerError, "WebSocket upgrade failed:"+err.Error())
|
|
}
|
|
|
|
client := &ws.Client{
|
|
Conn: conn,
|
|
RecipientID: userID,
|
|
}
|
|
|
|
h.notificationSvc.Hub.Register <- client
|
|
// h.logger.Info("WebSocket connection established", "userID", userID)
|
|
|
|
defer func() {
|
|
h.notificationSvc.Hub.Unregister <- client
|
|
conn.Close()
|
|
}()
|
|
|
|
for {
|
|
_, _, err := conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
|
h.mongoLoggerSvc.Info("WebSocket closed normally",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
} else {
|
|
h.mongoLoggerSvc.Info("Unexpected WebSocket closure",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error {
|
|
// type Request struct {
|
|
// NotificationIDs []string `json:"notification_ids" validate:"required"`
|
|
// }
|
|
|
|
// var req Request
|
|
// if err := c.BodyParser(&req); err != nil {
|
|
// h.mongoLoggerSvc.Info("Failed to parse request body",
|
|
// zap.Int("status_code", fiber.StatusBadRequest),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
|
|
// }
|
|
|
|
// userID, ok := c.Locals("user_id").(int64)
|
|
// if !ok || userID == 0 {
|
|
// h.mongoLoggerSvc.Error("Invalid user ID in context",
|
|
// zap.Int64("userID", userID),
|
|
// zap.Int("status_code", fiber.StatusInternalServerError),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return fiber.NewError(fiber.StatusInternalServerError, "invalid user ID in context")
|
|
// }
|
|
|
|
// fmt.Printf("Notification IDs: %v \n", req.NotificationIDs)
|
|
// if err := h.notificationSvc.MarkAsRead(context.Background(), req.NotificationIDs, userID); err != nil {
|
|
// h.mongoLoggerSvc.Error("Failed to mark notifications as read",
|
|
// zap.String("notificationID", strings.Join(req.NotificationIDs, ",")),
|
|
// zap.Int64("userID", userID),
|
|
// zap.Int("status_code", fiber.StatusInternalServerError),
|
|
// zap.Error(err),
|
|
// zap.Time("timestamp", time.Now()),
|
|
// )
|
|
// return fiber.NewError(fiber.StatusInternalServerError, "Failed to update notification status:", err.Error())
|
|
// }
|
|
|
|
// return c.Status(fiber.StatusOK).JSON(fiber.Map{"message": "Notification marked as read"})
|
|
// }
|
|
|
|
func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error {
|
|
type Request struct {
|
|
RecipientID int64 `json:"recipient_id" validate:"required_if=DeliveryScheme single"`
|
|
Type domain.NotificationType `json:"type" validate:"required"`
|
|
Level domain.NotificationLevel `json:"level" validate:"required"`
|
|
ErrorSeverity *domain.NotificationErrorSeverity `json:"error_severity"`
|
|
Reciever domain.NotificationRecieverSide `json:"reciever" validate:"required"`
|
|
DeliveryScheme domain.NotificationDeliveryScheme `json:"delivery_scheme" validate:"required"`
|
|
DeliveryChannel domain.DeliveryChannel `json:"delivery_channel" validate:"required"`
|
|
Payload domain.NotificationPayload `json:"payload" validate:"required"`
|
|
Priority int `json:"priority"`
|
|
Metadata json.RawMessage `json:"metadata,omitempty"`
|
|
}
|
|
|
|
var req Request
|
|
if err := c.BodyParser(&req); err != nil {
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Failed to parse request body",
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
|
|
}
|
|
|
|
// userID, ok := c.Locals("userID").(int64)
|
|
// if !ok || userID == 0 {
|
|
// h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid user ID in context")
|
|
// return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
|
// }
|
|
|
|
switch req.DeliveryScheme {
|
|
case domain.NotificationDeliverySchemeSingle:
|
|
// if req.Reciever == domain.NotificationRecieverSideCustomer {
|
|
// h.logger.Warn("[NotificationSvc.CreateAndSendNotification] Unauthorized attempt to send notification", "recipientID", req.RecipientID)
|
|
// return fiber.NewError(fiber.StatusForbidden, "Unauthorized to send notification to this recipient")
|
|
// }
|
|
|
|
errorSeverity := domain.NotificationErrorSeverityMedium
|
|
if req.ErrorSeverity != nil {
|
|
errorSeverity = *req.ErrorSeverity
|
|
}
|
|
|
|
notification := &domain.Notification{
|
|
ID: "",
|
|
RecipientID: req.RecipientID,
|
|
Type: req.Type,
|
|
Level: req.Level,
|
|
ErrorSeverity: errorSeverity,
|
|
Reciever: req.Reciever,
|
|
IsRead: false,
|
|
DeliveryStatus: domain.DeliveryStatusPending,
|
|
DeliveryChannel: req.DeliveryChannel,
|
|
Payload: req.Payload,
|
|
Priority: req.Priority,
|
|
Metadata: req.Metadata,
|
|
}
|
|
|
|
if err := h.notificationSvc.SendNotification(context.Background(), notification); err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to send single notification",
|
|
zap.Int64("recipientID", req.RecipientID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to send notification:"+err.Error())
|
|
}
|
|
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Single notification sent successfully",
|
|
zap.Int64("recipientID", req.RecipientID),
|
|
zap.String("type", string(req.Type)),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return c.Status(fiber.StatusCreated).JSON(fiber.Map{"message": "Single notification sent successfully", "notification_id": notification.ID})
|
|
|
|
case domain.NotificationDeliverySchemeBulk:
|
|
recipients, _, err := h.userSvc.GetAllUsers(context.Background(), domain.UserFilter{
|
|
Role: string(req.Reciever),
|
|
})
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to fetch recipients for bulk notification",
|
|
zap.Int64("RecipientID", req.RecipientID),
|
|
zap.String("Reciever", string(req.Reciever)),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch recipients:"+err.Error())
|
|
}
|
|
|
|
notificationIDs := make([]string, 0, len(recipients))
|
|
for _, user := range recipients {
|
|
errorSeverity := domain.NotificationErrorSeverityMedium
|
|
if req.ErrorSeverity != nil {
|
|
errorSeverity = *req.ErrorSeverity
|
|
}
|
|
|
|
notification := &domain.Notification{
|
|
ID: "",
|
|
RecipientID: user.ID,
|
|
Type: req.Type,
|
|
Level: req.Level,
|
|
ErrorSeverity: errorSeverity,
|
|
Reciever: req.Reciever,
|
|
IsRead: false,
|
|
DeliveryStatus: domain.DeliveryStatusPending,
|
|
DeliveryChannel: req.DeliveryChannel,
|
|
Payload: req.Payload,
|
|
Priority: req.Priority,
|
|
Metadata: req.Metadata,
|
|
}
|
|
|
|
if err := h.notificationSvc.SendNotification(context.Background(), notification); err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Failed to send bulk notification",
|
|
zap.Int64("UserID", user.ID),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
continue
|
|
}
|
|
notificationIDs = append(notificationIDs, notification.ID)
|
|
}
|
|
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.CreateAndSendNotification] Bulk notification sent successfully",
|
|
zap.Int("recipient_count", len(recipients)),
|
|
zap.String("type", string(req.Type)),
|
|
zap.Int("status_code", fiber.StatusCreated),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return c.Status(fiber.StatusCreated).JSON(fiber.Map{
|
|
"message": "Bulk notification sent successfully",
|
|
"recipient_count": len(recipients),
|
|
"notification_ids": notificationIDs,
|
|
})
|
|
|
|
default:
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.CreateAndSendNotification] Invalid delivery scheme",
|
|
zap.String("delivery_scheme", string(req.DeliveryScheme)),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid delivery scheme")
|
|
}
|
|
}
|
|
|
|
func (h *Handler) GetUserNotification(c *fiber.Ctx) error {
|
|
limitStr := c.Query("limit", "10")
|
|
offsetStr := c.Query("offset", "0")
|
|
|
|
// Convert limit and offset to integers
|
|
limit, err := strconv.Atoi(limitStr)
|
|
if err != nil || limit <= 0 {
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.GetUserNotification] Invalid limit value",
|
|
zap.String("limit", limitStr),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value")
|
|
}
|
|
offset, err := strconv.Atoi(offsetStr)
|
|
if err != nil || offset < 0 {
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.GetUserNotification] Invalid offset value",
|
|
zap.String("offset", offsetStr),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid offset value")
|
|
}
|
|
|
|
userID, ok := c.Locals("user_id").(int64)
|
|
if !ok || userID == 0 {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.GetUserNotification] Invalid user ID in context",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Invalid user identification")
|
|
}
|
|
|
|
notifications, total, err := h.notificationSvc.GetUserNotifications(context.Background(), userID, limit, offset)
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.GetUserNotification] Failed to fetch notifications",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications:"+err.Error())
|
|
}
|
|
|
|
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
|
"notifications": notifications,
|
|
"total_count": total,
|
|
"limit": limit,
|
|
"offset": offset,
|
|
})
|
|
|
|
}
|
|
|
|
// func (h *Handler) GetAllRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
|
// return h.notificationSvc.ListRecipientIDs(ctx, receiver)
|
|
// }
|
|
|
|
func (h *Handler) CountUnreadNotifications(c *fiber.Ctx) error {
|
|
|
|
userID, ok := c.Locals("user_id").(int64)
|
|
if !ok || userID == 0 {
|
|
h.mongoLoggerSvc.Error("NotificationSvc.GetNotifications] Invalid user ID in context",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Invalid user identification")
|
|
}
|
|
|
|
total, err := h.notificationSvc.CountUnreadNotifications(c.Context(), userID)
|
|
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.CountUnreadNotifications] Failed to fetch unread notification count",
|
|
zap.Int64("userID", userID),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications:"+err.Error())
|
|
}
|
|
|
|
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
|
"unread": total,
|
|
})
|
|
}
|
|
|
|
func (h *Handler) GetAllNotifications(c *fiber.Ctx) error {
|
|
limitStr := c.Query("limit", "10")
|
|
pageStr := c.Query("page", "1")
|
|
|
|
// Convert limit and offset to integers
|
|
limit, err := strconv.Atoi(limitStr)
|
|
if err != nil || limit <= 0 {
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid limit value",
|
|
zap.String("limit", limitStr),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value")
|
|
}
|
|
page, err := strconv.Atoi(pageStr)
|
|
if err != nil || page <= 0 {
|
|
h.mongoLoggerSvc.Info("[NotificationSvc.GetNotifications] Invalid page value",
|
|
zap.String("page", pageStr),
|
|
zap.Int("status_code", fiber.StatusBadRequest),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid page value")
|
|
}
|
|
|
|
notifications, err := h.notificationSvc.GetAllNotifications(context.Background(), limit, ((page - 1) * limit))
|
|
if err != nil {
|
|
h.mongoLoggerSvc.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications",
|
|
zap.Int64("limit", int64(limit)),
|
|
zap.Int("status_code", fiber.StatusInternalServerError),
|
|
zap.Error(err),
|
|
zap.Time("timestamp", time.Now()),
|
|
)
|
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications")
|
|
}
|
|
|
|
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
|
"notifications": notifications,
|
|
"total_count": len(notifications),
|
|
"limit": limit,
|
|
"page": page,
|
|
})
|
|
|
|
}
|