375 lines
12 KiB
Go
375 lines
12 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db"
|
|
"github.com/SamuelTariku/FortuneBet-Backend/internal/domain"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
type NotificationRepository interface {
|
|
CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error)
|
|
UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error)
|
|
ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error)
|
|
ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error)
|
|
ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error)
|
|
CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error)
|
|
GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error)
|
|
GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error)
|
|
}
|
|
|
|
type Repository struct {
|
|
store *Store
|
|
}
|
|
|
|
func NewNotificationRepository(store *Store) NotificationRepository {
|
|
return &Repository{store: store}
|
|
}
|
|
|
|
func (s *Store) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error {
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) DisconnectWebSocket(recipientID int64) {
|
|
}
|
|
|
|
func (r *Repository) CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) {
|
|
var errorSeverity pgtype.Text
|
|
if notification.ErrorSeverity != "" {
|
|
errorSeverity.String = string(notification.ErrorSeverity)
|
|
errorSeverity.Valid = true
|
|
}
|
|
|
|
var deliveryChannel pgtype.Text
|
|
if notification.DeliveryChannel != "" {
|
|
deliveryChannel.String = string(notification.DeliveryChannel)
|
|
deliveryChannel.Valid = true
|
|
}
|
|
|
|
var priority pgtype.Int4
|
|
if notification.Priority != 0 {
|
|
priority.Int32 = int32(notification.Priority)
|
|
priority.Valid = true
|
|
}
|
|
|
|
params := dbgen.CreateNotificationParams{
|
|
ID: notification.ID,
|
|
RecipientID: notification.RecipientID,
|
|
Type: string(notification.Type),
|
|
Level: string(notification.Level),
|
|
ErrorSeverity: errorSeverity,
|
|
Reciever: string(notification.Reciever),
|
|
IsRead: notification.IsRead,
|
|
DeliveryStatus: string(notification.DeliveryStatus),
|
|
DeliveryChannel: deliveryChannel,
|
|
Payload: marshalPayload(notification.Payload),
|
|
Priority: priority,
|
|
Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true},
|
|
Metadata: notification.Metadata,
|
|
}
|
|
|
|
dbNotification, err := r.store.queries.CreateNotification(ctx, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.mapDBToDomain(&dbNotification), nil
|
|
}
|
|
|
|
func (r *Repository) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) {
|
|
params := dbgen.UpdateNotificationStatusParams{
|
|
ID: id,
|
|
DeliveryStatus: status,
|
|
IsRead: isRead,
|
|
Metadata: metadata,
|
|
}
|
|
|
|
dbNotification, err := r.store.queries.UpdateNotificationStatus(ctx, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.mapDBToDomain(&dbNotification), nil
|
|
}
|
|
|
|
func (r *Repository) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) {
|
|
params := dbgen.ListNotificationsParams{
|
|
RecipientID: recipientID,
|
|
Limit: int32(limit),
|
|
Offset: int32(offset),
|
|
}
|
|
|
|
dbNotifications, err := r.store.queries.ListNotifications(ctx, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications))
|
|
for _, dbNotif := range dbNotifications {
|
|
domainNotif := r.mapDBToDomain(&dbNotif)
|
|
result = append(result, *domainNotif)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (r *Repository) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {
|
|
|
|
dbNotifications, err := r.store.queries.GetAllNotifications(ctx, dbgen.GetAllNotificationsParams{
|
|
Limit: int32(limit),
|
|
Offset: int32(offset),
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications))
|
|
for _, dbNotif := range dbNotifications {
|
|
domainNotif := r.mapDBToDomain(&dbNotif)
|
|
result = append(result, *domainNotif)
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (r *Repository) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) {
|
|
dbNotifications, err := r.store.queries.ListFailedNotifications(ctx, int32(limit))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var result []domain.Notification
|
|
for _, dbNotif := range dbNotifications {
|
|
domainNotif := r.mapDBToDomain(&dbNotif)
|
|
result = append(result, *domainNotif)
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (r *Repository) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) {
|
|
return r.store.queries.ListRecipientIDsByReceiver(ctx, string(receiver))
|
|
}
|
|
|
|
func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification {
|
|
var errorSeverity domain.NotificationErrorSeverity
|
|
if dbNotif.ErrorSeverity.Valid {
|
|
errorSeverity = domain.NotificationErrorSeverity(dbNotif.ErrorSeverity.String)
|
|
|
|
} else {
|
|
errorSeverity = ""
|
|
}
|
|
|
|
var deliveryChannel domain.DeliveryChannel
|
|
if dbNotif.DeliveryChannel.Valid {
|
|
deliveryChannel = domain.DeliveryChannel(dbNotif.DeliveryChannel.String)
|
|
} else {
|
|
deliveryChannel = ""
|
|
}
|
|
|
|
var priority int
|
|
if dbNotif.Priority.Valid {
|
|
priority = int(dbNotif.Priority.Int32)
|
|
}
|
|
|
|
payload, err := unmarshalPayload(dbNotif.Payload)
|
|
if err != nil {
|
|
payload = domain.NotificationPayload{}
|
|
}
|
|
|
|
return &domain.Notification{
|
|
ID: dbNotif.ID,
|
|
RecipientID: dbNotif.RecipientID,
|
|
Type: domain.NotificationType(dbNotif.Type),
|
|
Level: domain.NotificationLevel(dbNotif.Level),
|
|
ErrorSeverity: errorSeverity,
|
|
Reciever: domain.NotificationRecieverSide(dbNotif.Reciever),
|
|
IsRead: dbNotif.IsRead,
|
|
DeliveryStatus: domain.NotificationDeliveryStatus(dbNotif.DeliveryStatus),
|
|
DeliveryChannel: deliveryChannel,
|
|
Payload: payload,
|
|
Priority: priority,
|
|
Timestamp: dbNotif.Timestamp.Time,
|
|
Metadata: dbNotif.Metadata,
|
|
}
|
|
}
|
|
|
|
func marshalPayload(payload domain.NotificationPayload) []byte {
|
|
data, _ := json.Marshal(payload)
|
|
return data
|
|
}
|
|
|
|
func unmarshalPayload(data []byte) (domain.NotificationPayload, error) {
|
|
var payload domain.NotificationPayload
|
|
if err := json.Unmarshal(data, &payload); err != nil {
|
|
return domain.NotificationPayload{}, err
|
|
}
|
|
return payload, nil
|
|
}
|
|
|
|
func (r *Repository) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) {
|
|
return r.store.queries.CountUnreadNotifications(ctx, recipient_id)
|
|
}
|
|
|
|
func (r *Repository) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error) {
|
|
rows, err := r.store.queries.GetNotificationCounts(ctx)
|
|
if err != nil {
|
|
return 0, 0, 0, fmt.Errorf("failed to get notification counts: %w", err)
|
|
}
|
|
|
|
// var total, read, unread int64
|
|
for _, row := range rows {
|
|
total += row.Total
|
|
read += row.Read
|
|
unread += row.Unread
|
|
}
|
|
|
|
return total, read, unread, nil
|
|
}
|
|
|
|
func (s *Store) GetMostActiveNotificationRecipients(ctx context.Context, filter domain.ReportFilter, limit int) ([]domain.ActiveNotificationRecipient, error) {
|
|
query := `SELECT
|
|
n.recipient_id,
|
|
u.first_name || ' ' || u.last_name as recipient_name,
|
|
COUNT(*) as notification_count,
|
|
MAX(n.timestamp) as last_notification_time
|
|
FROM notifications n
|
|
JOIN users u ON n.recipient_id = u.id
|
|
WHERE n.timestamp BETWEEN $1 AND $2
|
|
GROUP BY n.recipient_id, u.first_name, u.last_name
|
|
ORDER BY notification_count DESC
|
|
LIMIT $3`
|
|
|
|
var recipients []domain.ActiveNotificationRecipient
|
|
rows, err := s.conn.Query(ctx, query, filter.StartTime.Value, filter.EndTime.Value, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get active notification recipients: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var r domain.ActiveNotificationRecipient
|
|
if err := rows.Scan(&r.RecipientID, &r.RecipientName, &r.NotificationCount, &r.LastNotificationTime); err != nil {
|
|
return nil, err
|
|
}
|
|
recipients = append(recipients, r)
|
|
}
|
|
|
|
return recipients, nil
|
|
}
|
|
|
|
// GetNotificationDeliveryStats
|
|
func (s *Store) GetNotificationDeliveryStats(ctx context.Context, filter domain.ReportFilter) (domain.NotificationDeliveryStats, error) {
|
|
query := `SELECT
|
|
COUNT(*) as total_sent,
|
|
COUNT(CASE WHEN delivery_status = 'failed' THEN 1 END) as failed_deliveries,
|
|
(COUNT(CASE WHEN delivery_status = 'sent' THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)) as success_rate,
|
|
MODE() WITHIN GROUP (ORDER BY delivery_channel) as most_used_channel
|
|
FROM notifications
|
|
WHERE timestamp BETWEEN $1 AND $2`
|
|
|
|
var stats domain.NotificationDeliveryStats
|
|
row := s.conn.QueryRow(ctx, query, filter.StartTime.Value, filter.EndTime.Value)
|
|
err := row.Scan(&stats.TotalSent, &stats.FailedDeliveries, &stats.SuccessRate, &stats.MostUsedChannel)
|
|
if err != nil {
|
|
return domain.NotificationDeliveryStats{}, fmt.Errorf("failed to get notification delivery stats: %w", err)
|
|
}
|
|
|
|
return stats, nil
|
|
}
|
|
|
|
// GetNotificationCountsByType
|
|
func (s *Store) GetNotificationCountsByType(ctx context.Context, filter domain.ReportFilter) (map[string]domain.NotificationTypeCount, error) {
|
|
query := `SELECT
|
|
type,
|
|
COUNT(*) as total,
|
|
COUNT(CASE WHEN is_read = true THEN 1 END) as read,
|
|
COUNT(CASE WHEN is_read = false THEN 1 END) as unread
|
|
FROM notifications
|
|
WHERE timestamp BETWEEN $1 AND $2
|
|
GROUP BY type`
|
|
|
|
counts := make(map[string]domain.NotificationTypeCount)
|
|
rows, err := s.conn.Query(ctx, query, filter.StartTime.Value, filter.EndTime.Value)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get notification counts by type: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var nt domain.NotificationTypeCount
|
|
var typ string
|
|
if err := rows.Scan(&typ, &nt.Total, &nt.Read, &nt.Unread); err != nil {
|
|
return nil, err
|
|
}
|
|
counts[typ] = nt
|
|
}
|
|
|
|
return counts, nil
|
|
}
|
|
|
|
func (s *Store) CountUnreadNotifications(ctx context.Context, userID int64) (int64, error) {
|
|
count, err := s.queries.CountUnreadNotifications(ctx, userID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
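// NOTE: the Store.GetAllNotifications draft below is disabled. Repository.GetAllNotifications
// in this file runs the same query; this draft maps ErrorSeverity to a pointer, whereas
// mapDBToDomain uses a plain value.
// func (s *Store) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) {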
|
|
// dbNotifications, err := s.queries.GetAllNotifications(ctx, dbgen.GetAllNotificationsParams{
|
|
// Limit: int32(limit),
|
|
// Offset: int32(offset),
|
|
// })
|
|
// if err != nil {
|
|
// return nil, err
|
|
// }
|
|
|
|
// result := make([]domain.Notification, 0, len(dbNotifications))
|
|
// for _, dbNotif := range dbNotifications {
|
|
// // You may want to move this mapping logic to a shared function if not already present
|
|
// var errorSeverity *domain.NotificationErrorSeverity
|
|
// if dbNotif.ErrorSeverity.Valid {
|
|
// s := domain.NotificationErrorSeverity(dbNotif.ErrorSeverity.String)
|
|
// errorSeverity = &s
|
|
// }
|
|
|
|
// var deliveryChannel domain.DeliveryChannel
|
|
// if dbNotif.DeliveryChannel.Valid {
|
|
// deliveryChannel = domain.DeliveryChannel(dbNotif.DeliveryChannel.String)
|
|
// } else {
|
|
// deliveryChannel = ""
|
|
// }
|
|
|
|
// var priority int
|
|
// if dbNotif.Priority.Valid {
|
|
// priority = int(dbNotif.Priority.Int32)
|
|
// }
|
|
|
|
// payload, err := unmarshalPayload(dbNotif.Payload)
|
|
// if err != nil {
|
|
// payload = domain.NotificationPayload{}
|
|
// }
|
|
|
|
// result = append(result, domain.Notification{
|
|
// ID: dbNotif.ID,
|
|
// RecipientID: dbNotif.RecipientID,
|
|
// Type: domain.NotificationType(dbNotif.Type),
|
|
// Level: domain.NotificationLevel(dbNotif.Level),
|
|
// ErrorSeverity: errorSeverity,
|
|
// Reciever: domain.NotificationRecieverSide(dbNotif.Reciever),
|
|
// IsRead: dbNotif.IsRead,
|
|
// DeliveryStatus: domain.NotificationDeliveryStatus(dbNotif.DeliveryStatus),
|
|
// DeliveryChannel: deliveryChannel,
|
|
// Payload: payload,
|
|
// Priority: priority,
|
|
// Timestamp: dbNotif.Timestamp.Time,
|
|
// Metadata: dbNotif.Metadata,
|
|
// })
|
|
// }
|
|
// return result, nil
|
|
// }
|