package repository import ( "context" "encoding/json" "fmt" dbgen "github.com/SamuelTariku/FortuneBet-Backend/gen/db" "github.com/SamuelTariku/FortuneBet-Backend/internal/domain" "github.com/gorilla/websocket" "github.com/jackc/pgx/v5/pgtype" ) type NotificationRepository interface { CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error) } type Repository struct { store *Store } func NewNotificationRepository(store *Store) NotificationRepository { return &Repository{store: store} } func (s *Store) ConnectWebSocket(ctx context.Context, recipientID int64, c *websocket.Conn) error { return nil } func (s *Store) DisconnectWebSocket(recipientID int64) { } func (r *Repository) CreateNotification(ctx context.Context, notification *domain.Notification) (*domain.Notification, error) { var errorSeverity pgtype.Text if notification.ErrorSeverity != "" { errorSeverity.String = string(notification.ErrorSeverity) errorSeverity.Valid = true } var deliveryChannel pgtype.Text if notification.DeliveryChannel != "" { deliveryChannel.String = string(notification.DeliveryChannel) deliveryChannel.Valid = true } var priority pgtype.Int4 if notification.Priority != 0 { priority.Int32 = int32(notification.Priority) priority.Valid = true } params := dbgen.CreateNotificationParams{ ID: notification.ID, RecipientID: notification.RecipientID, Type: string(notification.Type), Level: string(notification.Level), ErrorSeverity: errorSeverity, Reciever: string(notification.Reciever), IsRead: notification.IsRead, DeliveryStatus: string(notification.DeliveryStatus), DeliveryChannel: deliveryChannel, Payload: marshalPayload(notification.Payload), Priority: priority, Timestamp: pgtype.Timestamptz{Time: notification.Timestamp, Valid: true}, Metadata: notification.Metadata, } dbNotification, err := r.store.queries.CreateNotification(ctx, params) if err != nil { return nil, err } return r.mapDBToDomain(&dbNotification), nil } func (r *Repository) UpdateNotificationStatus(ctx context.Context, id, status string, isRead bool, metadata []byte) (*domain.Notification, error) { params := dbgen.UpdateNotificationStatusParams{ ID: id, DeliveryStatus: status, IsRead: isRead, Metadata: metadata, } dbNotification, err := r.store.queries.UpdateNotificationStatus(ctx, params) if err != nil { return nil, err } return r.mapDBToDomain(&dbNotification), nil } func (r *Repository) ListNotifications(ctx context.Context, recipientID int64, limit, offset int) ([]domain.Notification, error) { params := dbgen.ListNotificationsParams{ RecipientID: recipientID, Limit: int32(limit), Offset: int32(offset), } dbNotifications, err := r.store.queries.ListNotifications(ctx, params) if err != nil { return nil, err } var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications)) for _, dbNotif := range dbNotifications { domainNotif := r.mapDBToDomain(&dbNotif) result = append(result, *domainNotif) } return result, nil } func (r *Repository) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) { dbNotifications, err := r.store.queries.GetAllNotifications(ctx, dbgen.GetAllNotificationsParams{ Limit: int32(limit), Offset: int32(offset), }) if err != nil { return nil, err } var result []domain.Notification = make([]domain.Notification, 0, len(dbNotifications)) for _, dbNotif := range dbNotifications { domainNotif := r.mapDBToDomain(&dbNotif) result = append(result, *domainNotif) } return result, nil } func (r *Repository) ListFailedNotifications(ctx context.Context, limit int) ([]domain.Notification, error) { dbNotifications, err := r.store.queries.ListFailedNotifications(ctx, int32(limit)) if err != nil { return nil, err } var result []domain.Notification for _, dbNotif := range dbNotifications { domainNotif := r.mapDBToDomain(&dbNotif) result = append(result, *domainNotif) } return result, nil } func (r *Repository) ListRecipientIDs(ctx context.Context, receiver domain.NotificationRecieverSide) ([]int64, error) { return r.store.queries.ListRecipientIDsByReceiver(ctx, string(receiver)) } func (r *Repository) mapDBToDomain(dbNotif *dbgen.Notification) *domain.Notification { var errorSeverity domain.NotificationErrorSeverity if dbNotif.ErrorSeverity.Valid { errorSeverity = domain.NotificationErrorSeverity(dbNotif.ErrorSeverity.String) } else { errorSeverity = "" } var deliveryChannel domain.DeliveryChannel if dbNotif.DeliveryChannel.Valid { deliveryChannel = domain.DeliveryChannel(dbNotif.DeliveryChannel.String) } else { deliveryChannel = "" } var priority int if dbNotif.Priority.Valid { priority = int(dbNotif.Priority.Int32) } payload, err := unmarshalPayload(dbNotif.Payload) if err != nil { payload = domain.NotificationPayload{} } return &domain.Notification{ ID: dbNotif.ID, RecipientID: dbNotif.RecipientID, Type: domain.NotificationType(dbNotif.Type), Level: domain.NotificationLevel(dbNotif.Level), ErrorSeverity: errorSeverity, Reciever: domain.NotificationRecieverSide(dbNotif.Reciever), IsRead: dbNotif.IsRead, DeliveryStatus: domain.NotificationDeliveryStatus(dbNotif.DeliveryStatus), DeliveryChannel: deliveryChannel, Payload: payload, Priority: priority, Timestamp: dbNotif.Timestamp.Time, Metadata: dbNotif.Metadata, } } func marshalPayload(payload domain.NotificationPayload) []byte { data, _ := json.Marshal(payload) return data } func unmarshalPayload(data []byte) (domain.NotificationPayload, error) { var payload domain.NotificationPayload if err := json.Unmarshal(data, &payload); err != nil { return domain.NotificationPayload{}, err } return payload, nil } func (r *Repository) CountUnreadNotifications(ctx context.Context, recipient_id int64) (int64, error) { return r.store.queries.CountUnreadNotifications(ctx, recipient_id) } func (r *Repository) GetNotificationCounts(ctx context.Context, filter domain.ReportFilter) (total, read, unread int64, err error) { rows, err := r.store.queries.GetNotificationCounts(ctx) if err != nil { return 0, 0, 0, fmt.Errorf("failed to get notification counts: %w", err) } // var total, read, unread int64 for _, row := range rows { total += row.Total read += row.Read unread += row.Unread } return total, read, unread, nil } func (s *Store) GetMostActiveNotificationRecipients(ctx context.Context, filter domain.ReportFilter, limit int) ([]domain.ActiveNotificationRecipient, error) { query := `SELECT n.recipient_id, u.first_name || ' ' || u.last_name as recipient_name, COUNT(*) as notification_count, MAX(n.timestamp) as last_notification_time FROM notifications n JOIN users u ON n.recipient_id = u.id WHERE n.timestamp BETWEEN $1 AND $2 GROUP BY n.recipient_id, u.first_name, u.last_name ORDER BY notification_count DESC LIMIT $3` var recipients []domain.ActiveNotificationRecipient rows, err := s.conn.Query(ctx, query, filter.StartTime.Value, filter.EndTime.Value, limit) if err != nil { return nil, fmt.Errorf("failed to get active notification recipients: %w", err) } defer rows.Close() for rows.Next() { var r domain.ActiveNotificationRecipient if err := rows.Scan(&r.RecipientID, &r.RecipientName, &r.NotificationCount, &r.LastNotificationTime); err != nil { return nil, err } recipients = append(recipients, r) } return recipients, nil } // GetNotificationDeliveryStats func (s *Store) GetNotificationDeliveryStats(ctx context.Context, filter domain.ReportFilter) (domain.NotificationDeliveryStats, error) { query := `SELECT COUNT(*) as total_sent, COUNT(CASE WHEN delivery_status = 'failed' THEN 1 END) as failed_deliveries, (COUNT(CASE WHEN delivery_status = 'sent' THEN 1 END) * 100.0 / NULLIF(COUNT(*), 0)) as success_rate, MODE() WITHIN GROUP (ORDER BY delivery_channel) as most_used_channel FROM notifications WHERE timestamp BETWEEN $1 AND $2` var stats domain.NotificationDeliveryStats row := s.conn.QueryRow(ctx, query, filter.StartTime.Value, filter.EndTime.Value) err := row.Scan(&stats.TotalSent, &stats.FailedDeliveries, &stats.SuccessRate, &stats.MostUsedChannel) if err != nil { return domain.NotificationDeliveryStats{}, fmt.Errorf("failed to get notification delivery stats: %w", err) } return stats, nil } // GetNotificationCountsByType func (s *Store) GetNotificationCountsByType(ctx context.Context, filter domain.ReportFilter) (map[string]domain.NotificationTypeCount, error) { query := `SELECT type, COUNT(*) as total, COUNT(CASE WHEN is_read = true THEN 1 END) as read, COUNT(CASE WHEN is_read = false THEN 1 END) as unread FROM notifications WHERE timestamp BETWEEN $1 AND $2 GROUP BY type` counts := make(map[string]domain.NotificationTypeCount) rows, err := s.conn.Query(ctx, query, filter.StartTime.Value, filter.EndTime.Value) if err != nil { return nil, fmt.Errorf("failed to get notification counts by type: %w", err) } defer rows.Close() for rows.Next() { var nt domain.NotificationTypeCount var typ string if err := rows.Scan(&typ, &nt.Total, &nt.Read, &nt.Unread); err != nil { return nil, err } counts[typ] = nt } return counts, nil } func (s *Store) CountUnreadNotifications(ctx context.Context, userID int64) (int64, error) { count, err := s.queries.CountUnreadNotifications(ctx, userID) if err != nil { return 0, err } return count, nil } // func (s *Store) GetAllNotifications(ctx context.Context, limit, offset int) ([]domain.Notification, error) { // dbNotifications, err := s.queries.GetAllNotifications(ctx, dbgen.GetAllNotificationsParams{ // Limit: int32(limit), // Offset: int32(offset), // }) // if err != nil { // return nil, err // } // result := make([]domain.Notification, 0, len(dbNotifications)) // for _, dbNotif := range dbNotifications { // // You may want to move this mapping logic to a shared function if not already present // var errorSeverity *domain.NotificationErrorSeverity // if dbNotif.ErrorSeverity.Valid { // s := domain.NotificationErrorSeverity(dbNotif.ErrorSeverity.String) // errorSeverity = &s // } // var deliveryChannel domain.DeliveryChannel // if dbNotif.DeliveryChannel.Valid { // deliveryChannel = domain.DeliveryChannel(dbNotif.DeliveryChannel.String) // } else { // deliveryChannel = "" // } // var priority int // if dbNotif.Priority.Valid { // priority = int(dbNotif.Priority.Int32) // } // payload, err := unmarshalPayload(dbNotif.Payload) // if err != nil { // payload = domain.NotificationPayload{} // } // result = append(result, domain.Notification{ // ID: dbNotif.ID, // RecipientID: dbNotif.RecipientID, // Type: domain.NotificationType(dbNotif.Type), // Level: domain.NotificationLevel(dbNotif.Level), // ErrorSeverity: errorSeverity, // Reciever: domain.NotificationRecieverSide(dbNotif.Reciever), // IsRead: dbNotif.IsRead, // DeliveryStatus: domain.NotificationDeliveryStatus(dbNotif.DeliveryStatus), // DeliveryChannel: deliveryChannel, // Payload: payload, // Priority: priority, // Timestamp: dbNotif.Timestamp.Time, // Metadata: dbNotif.Metadata, // }) // } // return result, nil // }