Merge remote-tracking branch 'origin/dev' into sport-support
This commit is contained in:
commit
e0aaf536a0
17
Dockerfile
Normal file
17
Dockerfile
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
# Builder stage
|
||||||
|
FROM golang:1.24-alpine AS builder
|
||||||
|
|
||||||
|
WORKDIR /app
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
RUN go mod download
|
||||||
|
COPY . .
|
||||||
|
RUN go build -ldflags="-s -w" -o ./bin/web ./cmd/main.go
|
||||||
|
|
||||||
|
# Runner stage
|
||||||
|
FROM alpine:3.21 AS runner
|
||||||
|
WORKDIR /app
|
||||||
|
COPY .env .
|
||||||
|
COPY --from=builder /app/bin/web /app/bin/web
|
||||||
|
RUN apk add --no-cache ca-certificates
|
||||||
|
EXPOSE ${PORT}
|
||||||
|
CMD ["/app/bin/web"]
|
||||||
|
|
@ -14,6 +14,9 @@ services:
|
||||||
interval: 5s
|
interval: 5s
|
||||||
timeout: 3s
|
timeout: 3s
|
||||||
retries: 5
|
retries: 5
|
||||||
|
volumes:
|
||||||
|
- postgres_data:/var/lib/postgresql/data
|
||||||
|
|
||||||
migrate:
|
migrate:
|
||||||
image: migrate/migrate
|
image: migrate/migrate
|
||||||
volumes:
|
volumes:
|
||||||
|
|
@ -32,6 +35,37 @@ services:
|
||||||
networks:
|
networks:
|
||||||
- app
|
- app
|
||||||
|
|
||||||
|
app:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile
|
||||||
|
target: runner
|
||||||
|
ports:
|
||||||
|
- ${PORT}:8080
|
||||||
|
environment:
|
||||||
|
- DB_URL=postgresql://root:secret@postgres:5432/gh?sslmode=disable
|
||||||
|
depends_on:
|
||||||
|
migrate:
|
||||||
|
condition: service_completed_successfully
|
||||||
|
networks:
|
||||||
|
- app
|
||||||
|
command: ["/app/bin/web"]
|
||||||
|
|
||||||
|
|
||||||
|
test:
|
||||||
|
build:
|
||||||
|
context: .
|
||||||
|
dockerfile: Dockerfile
|
||||||
|
target: builder
|
||||||
|
volumes:
|
||||||
|
- .:/app
|
||||||
|
command: ["tail", "-f", "/dev/null"]
|
||||||
|
networks:
|
||||||
|
- app
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
app:
|
app:
|
||||||
driver: bridge
|
driver: bridge
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
postgres_data:
|
||||||
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
||||||
"github.com/gofiber/websocket/v2"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type NotificationStore interface {
|
type NotificationStore interface {
|
||||||
|
|
|
||||||
|
|
@ -11,12 +11,14 @@ import (
|
||||||
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
||||||
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/pkgs/helpers"
|
||||||
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/repository"
|
||||||
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
|
||||||
afro "github.com/amanuelabay/afrosms-go"
|
afro "github.com/amanuelabay/afrosms-go"
|
||||||
"github.com/gofiber/websocket/v2"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
repo repository.NotificationRepository
|
repo repository.NotificationRepository
|
||||||
|
Hub *ws.NotificationHub
|
||||||
connections sync.Map
|
connections sync.Map
|
||||||
notificationCh chan *domain.Notification
|
notificationCh chan *domain.Notification
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
|
|
@ -24,9 +26,11 @@ type Service struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) NotificationStore {
|
func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *config.Config) *Service {
|
||||||
|
hub := ws.NewNotificationHub()
|
||||||
svc := &Service{
|
svc := &Service{
|
||||||
repo: repo,
|
repo: repo,
|
||||||
|
Hub: hub,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
connections: sync.Map{},
|
connections: sync.Map{},
|
||||||
notificationCh: make(chan *domain.Notification, 1000),
|
notificationCh: make(chan *domain.Notification, 1000),
|
||||||
|
|
@ -34,6 +38,7 @@ func New(repo repository.NotificationRepository, logger *slog.Logger, cfg *confi
|
||||||
config: cfg,
|
config: cfg,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go hub.Run()
|
||||||
go svc.startWorker()
|
go svc.startWorker()
|
||||||
go svc.startRetryWorker()
|
go svc.startRetryWorker()
|
||||||
|
|
||||||
|
|
@ -63,10 +68,18 @@ func (s *Service) SendNotification(ctx context.Context, notification *domain.Not
|
||||||
|
|
||||||
notification = created
|
notification = created
|
||||||
|
|
||||||
|
if notification.DeliveryChannel == domain.DeliveryChannelInApp {
|
||||||
|
s.Hub.Broadcast <- map[string]interface{}{
|
||||||
|
"type": "CREATED_NOTIFICATION",
|
||||||
|
"recipient_id": notification.RecipientID,
|
||||||
|
"payload": notification,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.notificationCh <- notification:
|
case s.notificationCh <- notification:
|
||||||
default:
|
default:
|
||||||
s.logger.Error("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", notification.ID)
|
s.logger.Warn("[NotificationSvc.SendNotification] Notification channel full, dropping notification", "id", notification.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -78,6 +91,21 @@ func (s *Service) MarkAsRead(ctx context.Context, notificationID string, recipie
|
||||||
s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", notificationID, "recipientID", recipientID, "error", err)
|
s.logger.Error("[NotificationSvc.MarkAsRead] Failed to mark notification as read", "notificationID", notificationID, "recipientID", recipientID, "error", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// count, err := s.repo.CountUnreadNotifications(ctx, recipientID)
|
||||||
|
// if err != nil {
|
||||||
|
// s.logger.Error("[NotificationSvc.MarkAsRead] Failed to count unread notifications", "recipientID", recipientID, "error", err)
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
|
||||||
|
// s.Hub.Broadcast <- map[string]interface{}{
|
||||||
|
// "type": "COUNT_NOT_OPENED_NOTIFICATION",
|
||||||
|
// "recipient_id": recipientID,
|
||||||
|
// "payload": map[string]int{
|
||||||
|
// "not_opened_notifications_count": int(count),
|
||||||
|
// },
|
||||||
|
// }
|
||||||
|
|
||||||
s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", notificationID, "recipientID", recipientID)
|
s.logger.Info("[NotificationSvc.MarkAsRead] Notification marked as read", "notificationID", notificationID, "recipientID", recipientID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -99,7 +127,6 @@ func (s *Service) ConnectWebSocket(ctx context.Context, recipientID int64, c *we
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) DisconnectWebSocket(recipientID int64) {
|
func (s *Service) DisconnectWebSocket(recipientID int64) {
|
||||||
s.connections.Delete(recipientID)
|
|
||||||
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
|
if conn, loaded := s.connections.LoadAndDelete(recipientID); loaded {
|
||||||
conn.(*websocket.Conn).Close()
|
conn.(*websocket.Conn).Close()
|
||||||
s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
|
s.logger.Info("[NotificationSvc.DisconnectWebSocket] Disconnected WebSocket", "recipientID", recipientID)
|
||||||
|
|
@ -160,21 +187,26 @@ func (s *Service) ListRecipientIDs(ctx context.Context, receiver domain.Notifica
|
||||||
func (s *Service) handleNotification(notification *domain.Notification) {
|
func (s *Service) handleNotification(notification *domain.Notification) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
if conn, ok := s.connections.Load(notification.RecipientID); ok {
|
switch notification.DeliveryChannel {
|
||||||
data, err := notification.ToJSON()
|
case domain.DeliveryChannelSMS:
|
||||||
|
err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.logger.Error("[NotificationSvc.HandleNotification] Failed to serialize notification", "id", notification.ID, "error", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, data); err != nil {
|
|
||||||
s.logger.Error("[NotificationSvc.HandleNotification] Failed to send WebSocket message", "id", notification.ID, "error", err)
|
|
||||||
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
||||||
} else {
|
} else {
|
||||||
notification.DeliveryStatus = domain.DeliveryStatusSent
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
||||||
}
|
}
|
||||||
} else {
|
case domain.DeliveryChannelEmail:
|
||||||
s.logger.Warn("[NotificationSvc.HandleNotification] No WebSocket connection for recipient", "recipientID", notification.RecipientID)
|
err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message)
|
||||||
|
if err != nil {
|
||||||
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
||||||
|
} else {
|
||||||
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
if notification.DeliveryChannel != domain.DeliveryChannelInApp {
|
||||||
|
s.logger.Warn("[NotificationSvc.HandleNotification] Unsupported delivery channel", "channel", notification.DeliveryChannel)
|
||||||
|
notification.DeliveryStatus = domain.DeliveryStatusFailed
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
||||||
|
|
@ -210,13 +242,17 @@ func (s *Service) retryFailedNotifications() {
|
||||||
go func(notification *domain.Notification) {
|
go func(notification *domain.Notification) {
|
||||||
for attempt := 0; attempt < 3; attempt++ {
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
time.Sleep(time.Duration(attempt) * time.Second)
|
time.Sleep(time.Duration(attempt) * time.Second)
|
||||||
if conn, ok := s.connections.Load(notification.RecipientID); ok {
|
if notification.DeliveryChannel == domain.DeliveryChannelSMS {
|
||||||
data, err := notification.ToJSON()
|
if err := s.SendSMS(ctx, notification.RecipientID, notification.Payload.Message); err == nil {
|
||||||
if err != nil {
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
||||||
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to serialize notification for retry", "id", notification.ID, "error", err)
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
||||||
continue
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
|
||||||
}
|
}
|
||||||
if err := conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, data); err == nil {
|
s.logger.Info("[NotificationSvc.RetryFailedNotifications] Successfully retried notification", "id", notification.ID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else if notification.DeliveryChannel == domain.DeliveryChannelEmail {
|
||||||
|
if err := s.SendEmail(ctx, notification.RecipientID, notification.Payload.Headline, notification.Payload.Message); err == nil {
|
||||||
notification.DeliveryStatus = domain.DeliveryStatusSent
|
notification.DeliveryStatus = domain.DeliveryStatusSent
|
||||||
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
if _, err := s.repo.UpdateNotificationStatus(ctx, notification.ID, string(notification.DeliveryStatus), notification.IsRead, notification.Metadata); err != nil {
|
||||||
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
|
s.logger.Error("[NotificationSvc.RetryFailedNotifications] Failed to update after retry", "id", notification.ID, "error", err)
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import (
|
||||||
type App struct {
|
type App struct {
|
||||||
fiber *fiber.App
|
fiber *fiber.App
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
NotidicationStore notificationservice.NotificationStore
|
NotidicationStore *notificationservice.Service
|
||||||
referralSvc referralservice.ReferralStore
|
referralSvc referralservice.ReferralStore
|
||||||
port int
|
port int
|
||||||
authSvc *authentication.Service
|
authSvc *authentication.Service
|
||||||
|
|
@ -61,7 +61,7 @@ func NewApp(
|
||||||
transactionSvc *transaction.Service,
|
transactionSvc *transaction.Service,
|
||||||
branchSvc *branch.Service,
|
branchSvc *branch.Service,
|
||||||
companySvc *company.Service,
|
companySvc *company.Service,
|
||||||
notidicationStore notificationservice.NotificationStore,
|
notidicationStore *notificationservice.Service,
|
||||||
prematchSvc *odds.ServiceImpl,
|
prematchSvc *odds.ServiceImpl,
|
||||||
eventSvc event.Service,
|
eventSvc event.Service,
|
||||||
referralSvc referralservice.ReferralStore,
|
referralSvc referralservice.ReferralStore,
|
||||||
|
|
@ -76,9 +76,9 @@ func NewApp(
|
||||||
})
|
})
|
||||||
|
|
||||||
app.Use(cors.New(cors.Config{
|
app.Use(cors.New(cors.Config{
|
||||||
AllowOrigins: "*", // Specify your frontend's origin
|
AllowOrigins: "*",
|
||||||
AllowMethods: "GET,POST,PUT,DELETE,OPTIONS", // Specify the allowed HTTP methods
|
AllowMethods: "GET,POST,PUT,DELETE,OPTIONS",
|
||||||
AllowHeaders: "Content-Type,Authorization,platform", // Specify the allowed headers
|
AllowHeaders: "Content-Type,Authorization,platform",
|
||||||
// AllowCredentials: true,
|
// AllowCredentials: true,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import (
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
notificationSvc notificationservice.NotificationStore
|
notificationSvc *notificationservice.Service
|
||||||
userSvc *user.Service
|
userSvc *user.Service
|
||||||
referralSvc referralservice.ReferralStore
|
referralSvc referralservice.ReferralStore
|
||||||
walletSvc *wallet.Service
|
walletSvc *wallet.Service
|
||||||
|
|
@ -41,7 +41,7 @@ type Handler struct {
|
||||||
|
|
||||||
func New(
|
func New(
|
||||||
logger *slog.Logger,
|
logger *slog.Logger,
|
||||||
notificationSvc notificationservice.NotificationStore,
|
notificationSvc *notificationservice.Service,
|
||||||
validator *customvalidator.CustomValidator,
|
validator *customvalidator.CustomValidator,
|
||||||
walletSvc *wallet.Service,
|
walletSvc *wallet.Service,
|
||||||
referralSvc referralservice.ReferralStore,
|
referralSvc referralservice.ReferralStore,
|
||||||
|
|
|
||||||
|
|
@ -3,53 +3,100 @@ package handlers
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
||||||
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/web_server/ws"
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
"github.com/gofiber/websocket/v2"
|
"github.com/gofiber/fiber/v2/middleware/adaptor"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/valyala/fasthttp/fasthttpadaptor"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (h *Handler) ConnectSocket(c *fiber.Ctx) error {
|
func hijackHTTP(c *fiber.Ctx) (net.Conn, http.ResponseWriter, error) {
|
||||||
if !websocket.IsWebSocketUpgrade(c) {
|
var rw http.ResponseWriter
|
||||||
h.logger.Warn("WebSocket upgrade required")
|
var conn net.Conn
|
||||||
return fiber.ErrUpgradeRequired
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// This is a trick: fasthttpadaptor gives us the HTTP interfaces
|
||||||
|
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
hj, ok := w.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
conn, _, err = hj.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rw = w
|
||||||
|
})
|
||||||
|
|
||||||
|
fasthttpadaptor.NewFastHTTPHandler(handler)(c.Context())
|
||||||
|
|
||||||
|
if conn == nil || rw == nil {
|
||||||
|
return nil, nil, fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection")
|
||||||
|
}
|
||||||
|
return conn, rw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) ConnectSocket(c *fiber.Ctx) error {
|
||||||
userID, ok := c.Locals("userID").(int64)
|
userID, ok := c.Locals("userID").(int64)
|
||||||
if !ok || userID == 0 {
|
if !ok || userID == 0 {
|
||||||
h.logger.Error("Invalid user ID in context")
|
h.logger.Error("Invalid user ID in context")
|
||||||
return fiber.NewError(fiber.StatusUnauthorized, "invalid user identification")
|
return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Locals("allowed", true)
|
// Convert *fiber.Ctx to *http.Request
|
||||||
|
req, err := adaptor.ConvertRequest(c, false)
|
||||||
return websocket.New(func(conn *websocket.Conn) {
|
if err != nil {
|
||||||
ctx := context.Background()
|
h.logger.Error("Failed to convert request", "error", err)
|
||||||
logger := h.logger.With("userID", userID, "remoteAddr", conn.RemoteAddr())
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to convert request")
|
||||||
|
|
||||||
if err := h.notificationSvc.ConnectWebSocket(ctx, userID, conn); err != nil {
|
|
||||||
logger.Error("Failed to connect WebSocket", "error", err)
|
|
||||||
_ = conn.Close()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Info("WebSocket connection established")
|
// Create a net.Conn hijacked from the fasthttp context
|
||||||
|
netConn, rw, err := hijackHTTP(c)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error("Failed to hijack connection", "error", err)
|
||||||
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to hijack connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Upgrade the connection using Gorilla's Upgrader
|
||||||
|
conn, err := ws.Upgrader.Upgrade(rw, req, nil)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error("WebSocket upgrade failed", "error", err)
|
||||||
|
netConn.Close()
|
||||||
|
return fiber.NewError(fiber.StatusInternalServerError, "WebSocket upgrade failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
client := &ws.Client{
|
||||||
|
Conn: conn,
|
||||||
|
RecipientID: userID,
|
||||||
|
}
|
||||||
|
|
||||||
|
h.notificationSvc.Hub.Register <- client
|
||||||
|
h.logger.Info("WebSocket connection established", "userID", userID)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
h.notificationSvc.DisconnectWebSocket(userID)
|
h.notificationSvc.Hub.Unregister <- client
|
||||||
logger.Info("WebSocket connection closed")
|
h.logger.Info("WebSocket connection closed", "userID", userID)
|
||||||
_ = conn.Close()
|
conn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if _, _, err := conn.ReadMessage(); err != nil {
|
_, _, err := conn.ReadMessage()
|
||||||
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
if err != nil {
|
||||||
logger.Warn("WebSocket unexpected close", "error", err)
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
|
||||||
|
h.logger.Info("WebSocket closed normally", "userID", userID)
|
||||||
|
} else {
|
||||||
|
h.logger.Warn("Unexpected WebSocket closure", "userID", userID, "error", err)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})(c)
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error {
|
func (h *Handler) MarkNotificationAsRead(c *fiber.Ctx) error {
|
||||||
|
|
@ -97,18 +144,18 @@ func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error {
|
||||||
return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid request body")
|
||||||
}
|
}
|
||||||
|
|
||||||
userID, ok := c.Locals("userID").(int64)
|
// userID, ok := c.Locals("userID").(int64)
|
||||||
if !ok || userID == 0 {
|
// if !ok || userID == 0 {
|
||||||
h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid user ID in context")
|
// h.logger.Error("[NotificationSvc.CreateAndSendNotification] Invalid user ID in context")
|
||||||
return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
// return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
||||||
}
|
// }
|
||||||
|
|
||||||
switch req.DeliveryScheme {
|
switch req.DeliveryScheme {
|
||||||
case domain.NotificationDeliverySchemeSingle:
|
case domain.NotificationDeliverySchemeSingle:
|
||||||
if req.Reciever == domain.NotificationRecieverSideCustomer && req.RecipientID != userID {
|
// if req.Reciever == domain.NotificationRecieverSideCustomer {
|
||||||
h.logger.Warn("[NotificationSvc.CreateAndSendNotification] Unauthorized attempt to send notification", "userID", userID, "recipientID", req.RecipientID)
|
// h.logger.Warn("[NotificationSvc.CreateAndSendNotification] Unauthorized attempt to send notification", "recipientID", req.RecipientID)
|
||||||
return fiber.NewError(fiber.StatusForbidden, "Unauthorized to send notification to this recipient")
|
// return fiber.NewError(fiber.StatusForbidden, "Unauthorized to send notification to this recipient")
|
||||||
}
|
// }
|
||||||
|
|
||||||
notification := &domain.Notification{
|
notification := &domain.Notification{
|
||||||
ID: "",
|
ID: "",
|
||||||
|
|
@ -177,6 +224,42 @@ func (h *Handler) CreateAndSendNotification(c *fiber.Ctx) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) GetNotifications(c *fiber.Ctx) error {
|
||||||
|
limitStr := c.Query("limit", "10")
|
||||||
|
offsetStr := c.Query("offset", "0")
|
||||||
|
|
||||||
|
// Convert limit and offset to integers
|
||||||
|
limit, err := strconv.Atoi(limitStr)
|
||||||
|
if err != nil || limit <= 0 {
|
||||||
|
h.logger.Error("[NotificationSvc.GetNotifications] Invalid limit value", "error", err)
|
||||||
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid limit value")
|
||||||
|
}
|
||||||
|
offset, err := strconv.Atoi(offsetStr)
|
||||||
|
if err != nil || offset < 0 {
|
||||||
|
h.logger.Error("[NotificationSvc.GetNotifications] Invalid offset value", "error", err)
|
||||||
|
return fiber.NewError(fiber.StatusBadRequest, "Invalid offset value")
|
||||||
|
}
|
||||||
|
|
||||||
|
userID, ok := c.Locals("user_id").(int64)
|
||||||
|
if !ok || userID == 0 {
|
||||||
|
h.logger.Error("[NotificationSvc.GetNotifications] Invalid user ID in context")
|
||||||
|
return fiber.NewError(fiber.StatusUnauthorized, "Invalid user identification")
|
||||||
|
}
|
||||||
|
|
||||||
|
notifications, err := h.notificationSvc.ListNotifications(context.Background(), userID, limit, offset)
|
||||||
|
if err != nil {
|
||||||
|
h.logger.Error("[NotificationSvc.GetNotifications] Failed to fetch notifications", "error", err)
|
||||||
|
return fiber.NewError(fiber.StatusInternalServerError, "Failed to fetch notifications")
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.Status(fiber.StatusOK).JSON(fiber.Map{
|
||||||
|
"notifications": notifications,
|
||||||
|
"total_count": len(notifications),
|
||||||
|
"limit": limit,
|
||||||
|
"offset": offset,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) getAllRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
func (h *Handler) getAllRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
||||||
return h.notificationSvc.ListRecipientIDs(ctx, receiver)
|
return h.notificationSvc.ListRecipientIDs(ctx, receiver)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ func (a *App) authMiddleware(c *fiber.Ctx) error {
|
||||||
|
|
||||||
}
|
}
|
||||||
// Asserting to make sure that there is no company role without a valid company id
|
// Asserting to make sure that there is no company role without a valid company id
|
||||||
if claim.Role != domain.RoleSuperAdmin && !claim.CompanyID.Valid {
|
if claim.Role != domain.RoleSuperAdmin && claim.Role != domain.RoleCustomer && !claim.CompanyID.Valid {
|
||||||
fmt.Println("Company Role without Company ID")
|
fmt.Println("Company Role without Company ID")
|
||||||
return fiber.NewError(fiber.StatusInternalServerError, "Company Role without Company ID")
|
return fiber.NewError(fiber.StatusInternalServerError, "Company Role without Company ID")
|
||||||
}
|
}
|
||||||
|
|
@ -71,3 +71,31 @@ func (a *App) CompanyOnly(c *fiber.Ctx) error {
|
||||||
}
|
}
|
||||||
return c.Next()
|
return c.Next()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) WebsocketAuthMiddleware(c *fiber.Ctx) error {
|
||||||
|
tokenStr := c.Query("token")
|
||||||
|
if tokenStr == "" {
|
||||||
|
a.logger.Error("Missing token in query parameter")
|
||||||
|
return fiber.NewError(fiber.StatusUnauthorized, "Missing token")
|
||||||
|
}
|
||||||
|
|
||||||
|
claim, err := jwtutil.ParseJwt(tokenStr, a.JwtConfig.JwtAccessKey)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, jwtutil.ErrExpiredToken) {
|
||||||
|
a.logger.Error("Token expired")
|
||||||
|
return fiber.NewError(fiber.StatusUnauthorized, "Token expired")
|
||||||
|
}
|
||||||
|
a.logger.Error("Invalid token", "error", err)
|
||||||
|
return fiber.NewError(fiber.StatusUnauthorized, "Invalid token")
|
||||||
|
}
|
||||||
|
|
||||||
|
userID := claim.UserId
|
||||||
|
if userID == 0 {
|
||||||
|
a.logger.Error("Invalid user ID in token claims")
|
||||||
|
return fiber.NewError(fiber.StatusUnauthorized, "Invalid user ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Locals("userID", userID)
|
||||||
|
a.logger.Info("Authenticated WebSocket connection", "userID", userID)
|
||||||
|
return c.Next()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -169,7 +169,8 @@ func (a *App) initAppRoutes() {
|
||||||
a.fiber.Put("/transaction/:id", a.authMiddleware, h.UpdateTransactionVerified)
|
a.fiber.Put("/transaction/:id", a.authMiddleware, h.UpdateTransactionVerified)
|
||||||
|
|
||||||
// Notification Routes
|
// Notification Routes
|
||||||
a.fiber.Get("/notifications/ws/connect/:recipientID", h.ConnectSocket)
|
a.fiber.Get("/ws/connect", a.WebsocketAuthMiddleware, h.ConnectSocket)
|
||||||
|
a.fiber.Get("/notifications", a.authMiddleware, h.GetNotifications)
|
||||||
a.fiber.Post("/notifications/mark-as-read", h.MarkNotificationAsRead)
|
a.fiber.Post("/notifications/mark-as-read", h.MarkNotificationAsRead)
|
||||||
a.fiber.Post("/notifications/create", h.CreateAndSendNotification)
|
a.fiber.Post("/notifications/create", h.CreateAndSendNotification)
|
||||||
|
|
||||||
|
|
|
||||||
73
internal/web_server/ws/ws.go
Normal file
73
internal/web_server/ws/ws.go
Normal file
|
|
@ -0,0 +1,73 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Client struct {
|
||||||
|
Conn *websocket.Conn
|
||||||
|
RecipientID int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type NotificationHub struct {
|
||||||
|
Clients map[*Client]bool
|
||||||
|
Broadcast chan interface{}
|
||||||
|
Register chan *Client
|
||||||
|
Unregister chan *Client
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNotificationHub() *NotificationHub {
|
||||||
|
return &NotificationHub{
|
||||||
|
Clients: make(map[*Client]bool),
|
||||||
|
Broadcast: make(chan interface{}, 1000),
|
||||||
|
Register: make(chan *Client),
|
||||||
|
Unregister: make(chan *Client),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *NotificationHub) Run() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case client := <-h.Register:
|
||||||
|
h.mu.Lock()
|
||||||
|
h.Clients[client] = true
|
||||||
|
h.mu.Unlock()
|
||||||
|
log.Printf("Client registered: %d", client.RecipientID)
|
||||||
|
case client := <-h.Unregister:
|
||||||
|
h.mu.Lock()
|
||||||
|
if _, ok := h.Clients[client]; ok {
|
||||||
|
delete(h.Clients, client)
|
||||||
|
client.Conn.Close()
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
log.Printf("Client unregistered: %d", client.RecipientID)
|
||||||
|
case message := <-h.Broadcast:
|
||||||
|
h.mu.Lock()
|
||||||
|
for client := range h.Clients {
|
||||||
|
if payload, ok := message.(map[string]interface{}); ok {
|
||||||
|
if recipient, ok := payload["recipient_id"].(int64); ok && recipient == client.RecipientID {
|
||||||
|
err := client.Conn.WriteJSON(payload)
|
||||||
|
if err != nil {
|
||||||
|
delete(h.Clients, client)
|
||||||
|
client.Conn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var Upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return true
|
||||||
|
},
|
||||||
|
}
|
||||||
38
makefile
38
makefile
|
|
@ -1,31 +1,44 @@
|
||||||
include .env
|
include .env
|
||||||
.PHONY: test
|
.PHONY: test
|
||||||
test:
|
test:
|
||||||
@go test ./app
|
@docker compose up -d test
|
||||||
|
@docker compose exec test go test ./...
|
||||||
|
@docker compose stop test
|
||||||
|
|
||||||
.PHONY: coverage
|
.PHONY: coverage
|
||||||
coverage:
|
coverage:
|
||||||
@mkdir -p coverage
|
@mkdir -p coverage
|
||||||
@go test -coverprofile=coverage.out ./internal/...
|
@docker compose up -d test
|
||||||
@go tool cover -func=coverage.out -o coverage/coverage.txt
|
@docker compose exec test sh -c "go test -coverprofile=coverage.out ./internal/... && go tool cover -func=coverage.out -o coverage/coverage.txt"
|
||||||
|
@docker cp $(shell docker ps -q -f "name=fortunebet-test-1"):/app/coverage ./ || true
|
||||||
|
@docker compose stop test
|
||||||
|
|
||||||
.PHONY: build
|
.PHONY: build
|
||||||
build:
|
build:
|
||||||
@go build -ldflags="-s" -o ./bin/web ./
|
@docker compose build app
|
||||||
|
|
||||||
.PHONY: run
|
.PHONY: run
|
||||||
run:
|
run:
|
||||||
@echo "Running Go application"
|
@docker compose up -d
|
||||||
@go run ./cmd/main.go
|
|
||||||
|
.PHONY: stop
|
||||||
|
stop:
|
||||||
|
@docker compose down
|
||||||
|
|
||||||
.PHONY: air
|
.PHONY: air
|
||||||
air:
|
air:
|
||||||
@echo "Running air"
|
@echo "Running air locally (not in Docker)"
|
||||||
@air -c .air.toml
|
@air -c .air.toml
|
||||||
.PHONY: migrations/up
|
|
||||||
|
.PHONY: migrations/new
|
||||||
migrations/new:
|
migrations/new:
|
||||||
@echo 'Creating migration files for DB_URL'
|
@echo 'Creating migration files for DB_URL'
|
||||||
@migrate create -seq -ext=.sql -dir=./db/migrations $(name)
|
@migrate create -seq -ext=.sql -dir=./db/migrations $(name)
|
||||||
|
|
||||||
.PHONY: migrations/up
|
.PHONY: migrations/up
|
||||||
migrations/up:
|
migrations/up:
|
||||||
@echo 'Running up migrations...'
|
@echo 'Running up migrations...'
|
||||||
@migrate -path ./db/migrations -database $(DB_URL) up
|
@docker compose up migrate
|
||||||
|
|
||||||
.PHONY: postgres
|
.PHONY: postgres
|
||||||
postgres:
|
postgres:
|
||||||
|
|
@ -35,12 +48,15 @@ postgres:
|
||||||
.PHONY: swagger
|
.PHONY: swagger
|
||||||
swagger:
|
swagger:
|
||||||
@swag init -g cmd/main.go
|
@swag init -g cmd/main.go
|
||||||
|
|
||||||
.PHONY: db-up
|
.PHONY: db-up
|
||||||
db-up:
|
db-up:
|
||||||
docker compose -f compose.db.yaml up
|
@docker compose up -d postgres
|
||||||
|
|
||||||
.PHONY: db-down
|
.PHONY: db-down
|
||||||
db-down:
|
db-down:
|
||||||
docker compose -f compose.db.yaml down
|
@docker compose down
|
||||||
|
|
||||||
.PHONY: sqlc-gen
|
.PHONY: sqlc-gen
|
||||||
sqlc-gen:
|
sqlc-gen:
|
||||||
@sqlc generate
|
@sqlc generate
|
||||||
Loading…
Reference in New Issue
Block a user