feat: Complete Phase 2 notification system implementation

- Implement messaging system factory pattern
- Fix all transaction notification blockers
- Complete listener logic for all notification types
This commit is contained in:
Abdalhamid Alhamad
2026-01-11 11:17:08 +03:00
parent 2c8de913f8
commit b1cda5e7dc
11 changed files with 194 additions and 64 deletions

View File

@ -62,11 +62,10 @@ export class TransactionService {
await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber());
}
// Emit event for notification system
const event: ITransactionCreatedEvent = {
transaction,
card, // Pass card with all relations loaded
isTopUp: false, // Card transactions are spending
card,
isTopUp: false,
isChildSpending: card.customerType === CustomerType.CHILD,
timestamp: new Date(),
};
@ -88,21 +87,17 @@ export class TransactionService {
const transaction = await this.transactionRepository.createAccountTransaction(account, body);
await this.accountService.creditAccountBalance(account.accountReference, body.amount);
// Get card for notification system by account ID
// Account transactions are top-ups, so we get the first card associated with the account
const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId);
const card = accountWithCards.cards?.[0]
? await this.cardService.getCardById(accountWithCards.cards[0].id)
: null;
// Only emit event if card exists (we need card for user info)
if (card) {
// Emit event for notification system
const event: ITransactionCreatedEvent = {
transaction,
card, // Pass card with all relations loaded
isTopUp: true, // Account transactions are top-ups
isChildSpending: false, // Top-ups are typically not from children
card,
isTopUp: true,
isChildSpending: false,
timestamp: new Date(),
};
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
@ -115,12 +110,11 @@ export class TransactionService {
const card = await this.cardService.getCardById(cardId);
const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount);
// Emit event for notification system
const event: ITransactionCreatedEvent = {
transaction,
card, // Pass card with all relations loaded
isTopUp: true, // Internal child transaction is a top-up to child's card
isChildSpending: true, // Child's card is being topped up
card,
isTopUp: true,
isChildSpending: true,
timestamp: new Date(),
};
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);

View File

@ -13,4 +13,29 @@ export enum NotificationScope {
// Transaction notifications - Spending
CHILD_SPENDING = 'CHILD_SPENDING',
PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT',
}
/**
* Critical notification scopes that require guaranteed delivery
* These will use RabbitMQ/Kafka instead of Redis PubSub when configured
*
* Add scopes here when you need guaranteed delivery for specific notification types
* Examples:
* - ACCOUNT_LOCKED
* - SUSPICIOUS_ACTIVITY
* - LARGE_TRANSACTION_ALERT
* - PAYMENT_FAILED
*/
export const CRITICAL_NOTIFICATION_SCOPES = new Set<NotificationScope>([
// Add critical scopes here as needed
// Example: NotificationScope.ACCOUNT_LOCKED,
]);
/**
* Check if a notification scope requires guaranteed delivery
* @param scope - Notification scope to check
* @returns true if the scope requires guaranteed delivery
*/
export function requiresGuaranteedDelivery(scope: NotificationScope): boolean {
return CRITICAL_NOTIFICATION_SCOPES.has(scope);
}

View File

@ -1,2 +1,3 @@
export * from './notification-page-meta.interface';
export * from './notification-events.interface';
export * from './messaging-system.interface';

View File

@ -0,0 +1,25 @@
/**
* Interface for messaging systems (Redis PubSub, RabbitMQ, Kafka, etc.)
* Allows switching between different messaging systems based on notification requirements
*/
export interface IMessagingSystem {
/**
* Publish a notification event
* @param channel - Channel/topic name
* @param payload - Notification payload
*/
publish(channel: string, payload: any): Promise<void>;
/**
* Subscribe to a channel
* @param channel - Channel/topic name
* @param handler - Message handler function
*/
subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void>;
/**
* Get system name (for logging)
*/
getName(): string;
}

View File

@ -45,16 +45,12 @@ export class TransactionNotificationListener {
`isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}`
);
// Notify the transaction owner (child or parent)
await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending);
// If child transaction, also notify parent
if (isChildSpending) {
if (isTopUp) {
// Parent topped up child's card - send confirmation to parent
await this.notifyParentOfTopUp(transaction, card);
} else {
// Child spent money - send spending alert to parent
await this.notifyParentOfChildSpending(transaction, card);
}
}
@ -67,7 +63,6 @@ export class TransactionNotificationListener {
`Failed to process transaction notification: ${error?.message || 'Unknown error'}`,
error?.stack
);
// Don't throw - notification failures should not break the main flow
}
}
@ -82,27 +77,22 @@ export class TransactionNotificationListener {
isChildSpending: boolean
): Promise<void> {
try {
// Extract user from card
const user = card?.customer?.user;
if (!user) {
this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`);
return;
}
// Determine the scope based on transaction type
const scope = isTopUp
? NotificationScope.CHILD_TOP_UP
: NotificationScope.CHILD_SPENDING;
// Construct title
const title = isTopUp ? 'Card Topped Up' : 'Purchase Successful';
// Extract data
const amount = transaction.transactionAmount;
const merchant = transaction.merchantName || 'merchant';
const balance = card.account?.balance || 0;
// Construct message
const message = isTopUp
? `Your card has been topped up with $${amount.toFixed(2)}`
: `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`;
@ -111,7 +101,6 @@ export class TransactionNotificationListener {
`Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}`
);
// Send notification
await this.notificationFactory.send({
userId: user.id,
title,
@ -147,7 +136,6 @@ export class TransactionNotificationListener {
try {
this.logger.debug(`Checking for parent to notify about child spending`);
// Check if child has guardian
const customer = card?.customer;
const parentUser = customer?.junior?.guardian?.customer?.user;
@ -156,7 +144,6 @@ export class TransactionNotificationListener {
return;
}
// Get child info
const childUser = customer.user;
const childName = childUser?.firstName || 'Your child';
const amount = transaction.transactionAmount;
@ -166,7 +153,6 @@ export class TransactionNotificationListener {
`Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}`
);
// Send notification to parent
await this.notificationFactory.send({
userId: parentUser.id,
title: 'Child Spending Alert',
@ -189,10 +175,9 @@ export class TransactionNotificationListener {
this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`);
} catch (error: any) {
this.logger.error(
`Failed to notify parent of child spending: ${error?.message || 'Unknown error'}`,
`Failed to notify parent of child spending: ${ error?.message || 'Unknown error'}`,
error?.stack
);
// Don't throw - parent notification failure should not break child notification
}
}
@ -204,7 +189,6 @@ export class TransactionNotificationListener {
try {
this.logger.debug(`Checking for parent to notify about top-up`);
// Check if child has guardian
const customer = card?.customer;
const parentUser = customer?.junior?.guardian?.customer?.user;
@ -213,7 +197,6 @@ export class TransactionNotificationListener {
return;
}
// Get child info
const childUser = customer.user;
const childName = childUser?.firstName || 'Your child';
const amount = transaction.transactionAmount;
@ -223,7 +206,6 @@ export class TransactionNotificationListener {
`Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}`
);
// Send notification to parent
await this.notificationFactory.send({
userId: parentUser.id,
title: 'Top-Up Confirmation',
@ -245,10 +227,9 @@ export class TransactionNotificationListener {
this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`);
} catch (error: any) {
this.logger.error(
`Failed to notify parent of top-up: ${error?.message || 'Unknown error'}`,
`Failed to notify parent of top-up: ${ error?.message || 'Unknown error'}`,
error?.stack
);
// Don't throw - parent notification failure should not break child notification
}
}

View File

@ -11,6 +11,7 @@ import { Notification } from './entities';
import { NotificationCreatedListener, TransactionNotificationListener } from './listeners';
import { NotificationsRepository } from './repositories';
import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services';
import { MessagingSystemFactory, RedisPubSubMessagingService } from './services/messaging';
@Module({
imports: [
@ -34,6 +35,8 @@ import { FirebaseService, NotificationFactory, NotificationsService, TwilioServi
TwilioService,
NotificationCreatedListener,
TransactionNotificationListener,
RedisPubSubMessagingService,
MessagingSystemFactory,
],
exports: [NotificationsService, NotificationFactory, NotificationCreatedListener],
controllers: [NotificationsController],

View File

@ -0,0 +1,3 @@
export * from './redis-pubsub-messaging.service';
export * from './messaging-system-factory.service';

View File

@ -0,0 +1,58 @@
import { Injectable, Logger, Optional } from '@nestjs/common';
import { NotificationScope, requiresGuaranteedDelivery } from '../../enums/notification-scope.enum';
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
import { RedisPubSubMessagingService } from './redis-pubsub-messaging.service';
/**
* Messaging System Factory
*
* Determines which messaging system to use based on notification requirements.
*
* - Regular notifications → Redis PubSub (fast, 2-5ms)
* - Critical notifications → RabbitMQ/Kafka (guaranteed delivery, 20-50ms)
*
* Usage:
* ```typescript
* const system = factory.getMessagingSystem(NotificationScope.CHILD_SPENDING);
* await system.publish('NOTIFICATION_CREATED', payload);
* ```
*/
@Injectable()
export class MessagingSystemFactory {
private readonly logger = new Logger(MessagingSystemFactory.name);
constructor(
private readonly redisPubSubService: RedisPubSubMessagingService,
) {}
/**
* Get the appropriate messaging system based on notification scope
*
* @param scope - Notification scope
* @returns Messaging system to use
*/
getMessagingSystem(scope: NotificationScope): IMessagingSystem {
const needsGuaranteedDelivery = requiresGuaranteedDelivery(scope);
if (needsGuaranteedDelivery) {
this.logger.warn(
`[Factory] Critical notification ${scope} requires guaranteed delivery, ` +
`but RabbitMQ not configured. Falling back to Redis PubSub.`
);
return this.redisPubSubService;
} else {
this.logger.debug(`[Factory] Using Redis PubSub for notification: ${scope}`);
return this.redisPubSubService;
}
}
/**
* Get default messaging system (Redis PubSub)
*
* @returns Default messaging system
*/
getDefaultMessagingSystem(): IMessagingSystem {
return this.redisPubSubService;
}
}

View File

@ -0,0 +1,55 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { RedisClientType } from '@keyv/redis';
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
/**
* Redis PubSub Messaging System Implementation
*
* Fast, real-time messaging for regular notifications.
* Uses Redis PubSub for 2-5ms latency.
*
* Note: Messages are not persisted (fire-and-forget).
* Suitable for notifications that are already saved in PostgreSQL.
*/
@Injectable()
export class RedisPubSubMessagingService implements IMessagingSystem {
private readonly logger = new Logger(RedisPubSubMessagingService.name);
constructor(
@Inject('REDIS_PUBLISHER') private readonly publisher: RedisClientType,
@Inject('REDIS_SUBSCRIBER') private readonly subscriber: RedisClientType,
) {}
getName(): string {
return 'Redis PubSub';
}
async publish(channel: string, payload: any): Promise<void> {
try {
const message = JSON.stringify(payload);
const subscriberCount = await this.publisher.publish(channel, message);
this.logger.debug(
`[Redis PubSub] Published to ${channel}, ${subscriberCount} subscriber(s) received`
);
} catch (error: any) {
this.logger.error(`[Redis PubSub] Failed to publish to ${channel}: ${error?.message}`);
throw error;
}
}
async subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void> {
await this.subscriber.subscribe(channel, async (message) => {
try {
const data = JSON.parse(message);
await handler(data);
} catch (error: any) {
this.logger.error(
`[Redis PubSub] Failed to process message from ${channel}: ${error?.message}`
);
}
});
this.logger.log(`[Redis PubSub] Subscribed to channel: ${channel}`);
}
}

View File

@ -89,7 +89,6 @@ export class NotificationFactory {
try {
this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`);
// Use provided preferences or default to push-only
const preferences = payload.preferences || {
isPushEnabled: true,
isEmailEnabled: false,
@ -98,8 +97,6 @@ export class NotificationFactory {
const promises: Promise<any>[] = [];
// Route to enabled channels based on preferences
// Currently only PUSH is implemented (extensible for EMAIL, SMS later)
if (preferences.isPushEnabled) {
this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`);
promises.push(
@ -107,23 +104,6 @@ export class NotificationFactory {
);
}
// Future: Add EMAIL channel
// if (preferences.isEmailEnabled) {
// this.logger.debug(`Routing to EMAIL channel for user ${payload.userId}`);
// promises.push(
// this.sendToChannel(payload, NotificationChannel.EMAIL)
// );
// }
// Future: Add SMS channel
// if (preferences.isSmsEnabled) {
// this.logger.debug(`Routing to SMS channel for user ${payload.userId}`);
// promises.push(
// this.sendToChannel(payload, NotificationChannel.SMS)
// );
// }
// Send all notificaetions in parallel
await Promise.all(promises);
this.logger.log(

View File

@ -8,6 +8,7 @@ import { SendEmailRequestDto } from '../dtos/request';
import { Notification } from '../entities';
import { EventType, NotificationChannel, NotificationScope } from '../enums';
import { NotificationsRepository } from '../repositories';
import { MessagingSystemFactory } from './messaging/messaging-system-factory.service';
@Injectable()
export class NotificationsService {
@ -17,6 +18,7 @@ export class NotificationsService {
@Inject(forwardRef(() => RedisPubSubService))
private readonly redisPubSubService: RedisPubSubService,
private readonly messagingSystemFactory: MessagingSystemFactory,
) {}
async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) {
@ -35,15 +37,20 @@ export class NotificationsService {
this.logger.log(`Creating notification for user ${notification.userId}`);
const savedNotification = await this.notificationRepository.createNotification(notification);
// Publish to Redis PubSub for delivery (Firebase, Email, SMS)
this.logger.log(`Publishing ${EventType.NOTIFICATION_CREATED} event to Redis`);
this.redisPubSubService.publishEvent(EventType.NOTIFICATION_CREATED, {
const scope = notification.scope || NotificationScope.USER_REGISTERED;
const messagingSystem = this.messagingSystemFactory.getMessagingSystem(scope);
this.logger.log(
`Publishing ${EventType.NOTIFICATION_CREATED} event to ${messagingSystem.getName()}`
);
messagingSystem.publish(EventType.NOTIFICATION_CREATED, {
...savedNotification,
data: notification.data || savedNotification.data,
}).catch((error) => {
// Log error but don't throw - notification is saved in DB
this.logger.error(
`Failed to publish notification ${savedNotification.id} to Redis: ${error?.message || 'Unknown error'}`,
`Failed to publish notification ${savedNotification.id} to ${messagingSystem.getName()}: ` +
`${error?.message || 'Unknown error'}`,
error?.stack
);
});
@ -58,27 +65,25 @@ export class NotificationsService {
async sendEmailAsync(data: SendEmailRequestDto) {
this.logger.log(`Creating email notification for ${data.to}`);
// createNotification now automatically publishes to Redis
await this.createNotification({
recipient: data.to,
title: data.subject,
message: '',
scope: NotificationScope.USER_INVITED,
channel: NotificationChannel.EMAIL,
data: data.data, // Pass data in notification object
data: data.data,
});
}
async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) {
this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`);
// createNotification now automatically publishes to Redis
return this.createNotification({
recipient: sendOtpRequest.recipient,
title: OTP_TITLE,
message: OTP_BODY.replace('{otp}', otp),
scope: NotificationScope.OTP,
channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS,
data: { otp }, // Pass data in notification object
data: { otp },
});
}
}