Compare commits

...

2 Commits

Author SHA1 Message Date
63b0a42eca feat: enhance Redis module exports for pub/sub functionality
- Added 'REDIS_PUBLISHER' and 'REDIS_SUBSCRIBER' to the exports of RedisModule to improve pub/sub capabilities.
2026-01-11 14:38:59 +03:00
b1cda5e7dc feat: Complete Phase 2 notification system implementation
- Implement messaging system factory pattern
- Fix all transaction notification blockers
- Complete listener logic for all notification types
2026-01-11 11:17:08 +03:00
12 changed files with 199 additions and 65 deletions

View File

@ -62,11 +62,10 @@ export class TransactionService {
await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber()); await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber());
} }
// Emit event for notification system
const event: ITransactionCreatedEvent = { const event: ITransactionCreatedEvent = {
transaction, transaction,
card, // Pass card with all relations loaded card,
isTopUp: false, // Card transactions are spending isTopUp: false,
isChildSpending: card.customerType === CustomerType.CHILD, isChildSpending: card.customerType === CustomerType.CHILD,
timestamp: new Date(), timestamp: new Date(),
}; };
@ -88,21 +87,17 @@ export class TransactionService {
const transaction = await this.transactionRepository.createAccountTransaction(account, body); const transaction = await this.transactionRepository.createAccountTransaction(account, body);
await this.accountService.creditAccountBalance(account.accountReference, body.amount); await this.accountService.creditAccountBalance(account.accountReference, body.amount);
// Get card for notification system by account ID
// Account transactions are top-ups, so we get the first card associated with the account
const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId); const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId);
const card = accountWithCards.cards?.[0] const card = accountWithCards.cards?.[0]
? await this.cardService.getCardById(accountWithCards.cards[0].id) ? await this.cardService.getCardById(accountWithCards.cards[0].id)
: null; : null;
// Only emit event if card exists (we need card for user info)
if (card) { if (card) {
// Emit event for notification system
const event: ITransactionCreatedEvent = { const event: ITransactionCreatedEvent = {
transaction, transaction,
card, // Pass card with all relations loaded card,
isTopUp: true, // Account transactions are top-ups isTopUp: true,
isChildSpending: false, // Top-ups are typically not from children isChildSpending: false,
timestamp: new Date(), timestamp: new Date(),
}; };
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event); this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
@ -115,12 +110,11 @@ export class TransactionService {
const card = await this.cardService.getCardById(cardId); const card = await this.cardService.getCardById(cardId);
const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount); const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount);
// Emit event for notification system
const event: ITransactionCreatedEvent = { const event: ITransactionCreatedEvent = {
transaction, transaction,
card, // Pass card with all relations loaded card,
isTopUp: true, // Internal child transaction is a top-up to child's card isTopUp: true,
isChildSpending: true, // Child's card is being topped up isChildSpending: true,
timestamp: new Date(), timestamp: new Date(),
}; };
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event); this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);

View File

@ -13,4 +13,29 @@ export enum NotificationScope {
// Transaction notifications - Spending // Transaction notifications - Spending
CHILD_SPENDING = 'CHILD_SPENDING', CHILD_SPENDING = 'CHILD_SPENDING',
PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT', PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT',
}
/**
* Critical notification scopes that require guaranteed delivery
* These will use RabbitMQ/Kafka instead of Redis PubSub when configured
*
* Add scopes here when you need guaranteed delivery for specific notification types
* Examples:
* - ACCOUNT_LOCKED
* - SUSPICIOUS_ACTIVITY
* - LARGE_TRANSACTION_ALERT
* - PAYMENT_FAILED
*/
export const CRITICAL_NOTIFICATION_SCOPES = new Set<NotificationScope>([
// Add critical scopes here as needed
// Example: NotificationScope.ACCOUNT_LOCKED,
]);
/**
* Check if a notification scope requires guaranteed delivery
* @param scope - Notification scope to check
* @returns true if the scope requires guaranteed delivery
*/
export function requiresGuaranteedDelivery(scope: NotificationScope): boolean {
return CRITICAL_NOTIFICATION_SCOPES.has(scope);
} }

View File

@ -1,2 +1,3 @@
export * from './notification-page-meta.interface'; export * from './notification-page-meta.interface';
export * from './notification-events.interface'; export * from './notification-events.interface';
export * from './messaging-system.interface';

View File

@ -0,0 +1,25 @@
/**
* Interface for messaging systems (Redis PubSub, RabbitMQ, Kafka, etc.)
* Allows switching between different messaging systems based on notification requirements
*/
export interface IMessagingSystem {
/**
* Publish a notification event
* @param channel - Channel/topic name
* @param payload - Notification payload
*/
publish(channel: string, payload: any): Promise<void>;
/**
* Subscribe to a channel
* @param channel - Channel/topic name
* @param handler - Message handler function
*/
subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void>;
/**
* Get system name (for logging)
*/
getName(): string;
}

View File

@ -45,16 +45,12 @@ export class TransactionNotificationListener {
`isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}` `isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}`
); );
// Notify the transaction owner (child or parent)
await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending); await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending);
// If child transaction, also notify parent
if (isChildSpending) { if (isChildSpending) {
if (isTopUp) { if (isTopUp) {
// Parent topped up child's card - send confirmation to parent
await this.notifyParentOfTopUp(transaction, card); await this.notifyParentOfTopUp(transaction, card);
} else { } else {
// Child spent money - send spending alert to parent
await this.notifyParentOfChildSpending(transaction, card); await this.notifyParentOfChildSpending(transaction, card);
} }
} }
@ -67,7 +63,6 @@ export class TransactionNotificationListener {
`Failed to process transaction notification: ${error?.message || 'Unknown error'}`, `Failed to process transaction notification: ${error?.message || 'Unknown error'}`,
error?.stack error?.stack
); );
// Don't throw - notification failures should not break the main flow
} }
} }
@ -82,27 +77,22 @@ export class TransactionNotificationListener {
isChildSpending: boolean isChildSpending: boolean
): Promise<void> { ): Promise<void> {
try { try {
// Extract user from card
const user = card?.customer?.user; const user = card?.customer?.user;
if (!user) { if (!user) {
this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`); this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`);
return; return;
} }
// Determine the scope based on transaction type
const scope = isTopUp const scope = isTopUp
? NotificationScope.CHILD_TOP_UP ? NotificationScope.CHILD_TOP_UP
: NotificationScope.CHILD_SPENDING; : NotificationScope.CHILD_SPENDING;
// Construct title
const title = isTopUp ? 'Card Topped Up' : 'Purchase Successful'; const title = isTopUp ? 'Card Topped Up' : 'Purchase Successful';
// Extract data
const amount = transaction.transactionAmount; const amount = transaction.transactionAmount;
const merchant = transaction.merchantName || 'merchant'; const merchant = transaction.merchantName || 'merchant';
const balance = card.account?.balance || 0; const balance = card.account?.balance || 0;
// Construct message
const message = isTopUp const message = isTopUp
? `Your card has been topped up with $${amount.toFixed(2)}` ? `Your card has been topped up with $${amount.toFixed(2)}`
: `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`; : `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`;
@ -111,7 +101,6 @@ export class TransactionNotificationListener {
`Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}` `Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}`
); );
// Send notification
await this.notificationFactory.send({ await this.notificationFactory.send({
userId: user.id, userId: user.id,
title, title,
@ -147,7 +136,6 @@ export class TransactionNotificationListener {
try { try {
this.logger.debug(`Checking for parent to notify about child spending`); this.logger.debug(`Checking for parent to notify about child spending`);
// Check if child has guardian
const customer = card?.customer; const customer = card?.customer;
const parentUser = customer?.junior?.guardian?.customer?.user; const parentUser = customer?.junior?.guardian?.customer?.user;
@ -156,7 +144,6 @@ export class TransactionNotificationListener {
return; return;
} }
// Get child info
const childUser = customer.user; const childUser = customer.user;
const childName = childUser?.firstName || 'Your child'; const childName = childUser?.firstName || 'Your child';
const amount = transaction.transactionAmount; const amount = transaction.transactionAmount;
@ -166,7 +153,6 @@ export class TransactionNotificationListener {
`Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}` `Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}`
); );
// Send notification to parent
await this.notificationFactory.send({ await this.notificationFactory.send({
userId: parentUser.id, userId: parentUser.id,
title: 'Child Spending Alert', title: 'Child Spending Alert',
@ -189,10 +175,9 @@ export class TransactionNotificationListener {
this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`); this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`);
} catch (error: any) { } catch (error: any) {
this.logger.error( this.logger.error(
`Failed to notify parent of child spending: ${error?.message || 'Unknown error'}`, `Failed to notify parent of child spending: ${ error?.message || 'Unknown error'}`,
error?.stack error?.stack
); );
// Don't throw - parent notification failure should not break child notification
} }
} }
@ -204,7 +189,6 @@ export class TransactionNotificationListener {
try { try {
this.logger.debug(`Checking for parent to notify about top-up`); this.logger.debug(`Checking for parent to notify about top-up`);
// Check if child has guardian
const customer = card?.customer; const customer = card?.customer;
const parentUser = customer?.junior?.guardian?.customer?.user; const parentUser = customer?.junior?.guardian?.customer?.user;
@ -213,7 +197,6 @@ export class TransactionNotificationListener {
return; return;
} }
// Get child info
const childUser = customer.user; const childUser = customer.user;
const childName = childUser?.firstName || 'Your child'; const childName = childUser?.firstName || 'Your child';
const amount = transaction.transactionAmount; const amount = transaction.transactionAmount;
@ -223,7 +206,6 @@ export class TransactionNotificationListener {
`Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}` `Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}`
); );
// Send notification to parent
await this.notificationFactory.send({ await this.notificationFactory.send({
userId: parentUser.id, userId: parentUser.id,
title: 'Top-Up Confirmation', title: 'Top-Up Confirmation',
@ -245,10 +227,9 @@ export class TransactionNotificationListener {
this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`); this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`);
} catch (error: any) { } catch (error: any) {
this.logger.error( this.logger.error(
`Failed to notify parent of top-up: ${error?.message || 'Unknown error'}`, `Failed to notify parent of top-up: ${ error?.message || 'Unknown error'}`,
error?.stack error?.stack
); );
// Don't throw - parent notification failure should not break child notification
} }
} }

View File

@ -11,6 +11,7 @@ import { Notification } from './entities';
import { NotificationCreatedListener, TransactionNotificationListener } from './listeners'; import { NotificationCreatedListener, TransactionNotificationListener } from './listeners';
import { NotificationsRepository } from './repositories'; import { NotificationsRepository } from './repositories';
import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services'; import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services';
import { MessagingSystemFactory, RedisPubSubMessagingService } from './services/messaging';
@Module({ @Module({
imports: [ imports: [
@ -34,6 +35,8 @@ import { FirebaseService, NotificationFactory, NotificationsService, TwilioServi
TwilioService, TwilioService,
NotificationCreatedListener, NotificationCreatedListener,
TransactionNotificationListener, TransactionNotificationListener,
RedisPubSubMessagingService,
MessagingSystemFactory,
], ],
exports: [NotificationsService, NotificationFactory, NotificationCreatedListener], exports: [NotificationsService, NotificationFactory, NotificationCreatedListener],
controllers: [NotificationsController], controllers: [NotificationsController],

View File

@ -0,0 +1,3 @@
export * from './redis-pubsub-messaging.service';
export * from './messaging-system-factory.service';

View File

@ -0,0 +1,58 @@
import { Injectable, Logger, Optional } from '@nestjs/common';
import { NotificationScope, requiresGuaranteedDelivery } from '../../enums/notification-scope.enum';
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
import { RedisPubSubMessagingService } from './redis-pubsub-messaging.service';
/**
* Messaging System Factory
*
* Determines which messaging system to use based on notification requirements.
*
* - Regular notifications → Redis PubSub (fast, 2-5ms)
* - Critical notifications → RabbitMQ/Kafka (guaranteed delivery, 20-50ms)
*
* Usage:
* ```typescript
* const system = factory.getMessagingSystem(NotificationScope.CHILD_SPENDING);
* await system.publish('NOTIFICATION_CREATED', payload);
* ```
*/
@Injectable()
export class MessagingSystemFactory {
private readonly logger = new Logger(MessagingSystemFactory.name);
constructor(
private readonly redisPubSubService: RedisPubSubMessagingService,
) {}
/**
* Get the appropriate messaging system based on notification scope
*
* @param scope - Notification scope
* @returns Messaging system to use
*/
getMessagingSystem(scope: NotificationScope): IMessagingSystem {
const needsGuaranteedDelivery = requiresGuaranteedDelivery(scope);
if (needsGuaranteedDelivery) {
this.logger.warn(
`[Factory] Critical notification ${scope} requires guaranteed delivery, ` +
`but RabbitMQ not configured. Falling back to Redis PubSub.`
);
return this.redisPubSubService;
} else {
this.logger.debug(`[Factory] Using Redis PubSub for notification: ${scope}`);
return this.redisPubSubService;
}
}
/**
* Get default messaging system (Redis PubSub)
*
* @returns Default messaging system
*/
getDefaultMessagingSystem(): IMessagingSystem {
return this.redisPubSubService;
}
}

View File

@ -0,0 +1,55 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { RedisClientType } from '@keyv/redis';
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
/**
* Redis PubSub Messaging System Implementation
*
* Fast, real-time messaging for regular notifications.
* Uses Redis PubSub for 2-5ms latency.
*
* Note: Messages are not persisted (fire-and-forget).
* Suitable for notifications that are already saved in PostgreSQL.
*/
@Injectable()
export class RedisPubSubMessagingService implements IMessagingSystem {
private readonly logger = new Logger(RedisPubSubMessagingService.name);
constructor(
@Inject('REDIS_PUBLISHER') private readonly publisher: RedisClientType,
@Inject('REDIS_SUBSCRIBER') private readonly subscriber: RedisClientType,
) {}
getName(): string {
return 'Redis PubSub';
}
async publish(channel: string, payload: any): Promise<void> {
try {
const message = JSON.stringify(payload);
const subscriberCount = await this.publisher.publish(channel, message);
this.logger.debug(
`[Redis PubSub] Published to ${channel}, ${subscriberCount} subscriber(s) received`
);
} catch (error: any) {
this.logger.error(`[Redis PubSub] Failed to publish to ${channel}: ${error?.message}`);
throw error;
}
}
async subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void> {
await this.subscriber.subscribe(channel, async (message) => {
try {
const data = JSON.parse(message);
await handler(data);
} catch (error: any) {
this.logger.error(
`[Redis PubSub] Failed to process message from ${channel}: ${error?.message}`
);
}
});
this.logger.log(`[Redis PubSub] Subscribed to channel: ${channel}`);
}
}

View File

@ -89,7 +89,6 @@ export class NotificationFactory {
try { try {
this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`); this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`);
// Use provided preferences or default to push-only
const preferences = payload.preferences || { const preferences = payload.preferences || {
isPushEnabled: true, isPushEnabled: true,
isEmailEnabled: false, isEmailEnabled: false,
@ -98,8 +97,6 @@ export class NotificationFactory {
const promises: Promise<any>[] = []; const promises: Promise<any>[] = [];
// Route to enabled channels based on preferences
// Currently only PUSH is implemented (extensible for EMAIL, SMS later)
if (preferences.isPushEnabled) { if (preferences.isPushEnabled) {
this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`); this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`);
promises.push( promises.push(
@ -107,23 +104,6 @@ export class NotificationFactory {
); );
} }
// Future: Add EMAIL channel
// if (preferences.isEmailEnabled) {
// this.logger.debug(`Routing to EMAIL channel for user ${payload.userId}`);
// promises.push(
// this.sendToChannel(payload, NotificationChannel.EMAIL)
// );
// }
// Future: Add SMS channel
// if (preferences.isSmsEnabled) {
// this.logger.debug(`Routing to SMS channel for user ${payload.userId}`);
// promises.push(
// this.sendToChannel(payload, NotificationChannel.SMS)
// );
// }
// Send all notificaetions in parallel
await Promise.all(promises); await Promise.all(promises);
this.logger.log( this.logger.log(

View File

@ -8,6 +8,7 @@ import { SendEmailRequestDto } from '../dtos/request';
import { Notification } from '../entities'; import { Notification } from '../entities';
import { EventType, NotificationChannel, NotificationScope } from '../enums'; import { EventType, NotificationChannel, NotificationScope } from '../enums';
import { NotificationsRepository } from '../repositories'; import { NotificationsRepository } from '../repositories';
import { MessagingSystemFactory } from './messaging/messaging-system-factory.service';
@Injectable() @Injectable()
export class NotificationsService { export class NotificationsService {
@ -17,6 +18,7 @@ export class NotificationsService {
@Inject(forwardRef(() => RedisPubSubService)) @Inject(forwardRef(() => RedisPubSubService))
private readonly redisPubSubService: RedisPubSubService, private readonly redisPubSubService: RedisPubSubService,
private readonly messagingSystemFactory: MessagingSystemFactory,
) {} ) {}
async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) { async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) {
@ -35,15 +37,20 @@ export class NotificationsService {
this.logger.log(`Creating notification for user ${notification.userId}`); this.logger.log(`Creating notification for user ${notification.userId}`);
const savedNotification = await this.notificationRepository.createNotification(notification); const savedNotification = await this.notificationRepository.createNotification(notification);
// Publish to Redis PubSub for delivery (Firebase, Email, SMS) const scope = notification.scope || NotificationScope.USER_REGISTERED;
this.logger.log(`Publishing ${EventType.NOTIFICATION_CREATED} event to Redis`); const messagingSystem = this.messagingSystemFactory.getMessagingSystem(scope);
this.redisPubSubService.publishEvent(EventType.NOTIFICATION_CREATED, {
this.logger.log(
`Publishing ${EventType.NOTIFICATION_CREATED} event to ${messagingSystem.getName()}`
);
messagingSystem.publish(EventType.NOTIFICATION_CREATED, {
...savedNotification, ...savedNotification,
data: notification.data || savedNotification.data, data: notification.data || savedNotification.data,
}).catch((error) => { }).catch((error) => {
// Log error but don't throw - notification is saved in DB
this.logger.error( this.logger.error(
`Failed to publish notification ${savedNotification.id} to Redis: ${error?.message || 'Unknown error'}`, `Failed to publish notification ${savedNotification.id} to ${messagingSystem.getName()}: ` +
`${error?.message || 'Unknown error'}`,
error?.stack error?.stack
); );
}); });
@ -58,27 +65,25 @@ export class NotificationsService {
async sendEmailAsync(data: SendEmailRequestDto) { async sendEmailAsync(data: SendEmailRequestDto) {
this.logger.log(`Creating email notification for ${data.to}`); this.logger.log(`Creating email notification for ${data.to}`);
// createNotification now automatically publishes to Redis
await this.createNotification({ await this.createNotification({
recipient: data.to, recipient: data.to,
title: data.subject, title: data.subject,
message: '', message: '',
scope: NotificationScope.USER_INVITED, scope: NotificationScope.USER_INVITED,
channel: NotificationChannel.EMAIL, channel: NotificationChannel.EMAIL,
data: data.data, // Pass data in notification object data: data.data,
}); });
} }
async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) { async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) {
this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`); this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`);
// createNotification now automatically publishes to Redis
return this.createNotification({ return this.createNotification({
recipient: sendOtpRequest.recipient, recipient: sendOtpRequest.recipient,
title: OTP_TITLE, title: OTP_TITLE,
message: OTP_BODY.replace('{otp}', otp), message: OTP_BODY.replace('{otp}', otp),
scope: NotificationScope.OTP, scope: NotificationScope.OTP,
channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS, channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS,
data: { otp }, // Pass data in notification object data: { otp },
}); });
} }
} }

View File

@ -32,7 +32,11 @@ export class RedisModule {
}, },
RedisPubSubService, RedisPubSubService,
], ],
exports: [RedisPubSubService], exports: [
RedisPubSubService,
'REDIS_PUBLISHER',
'REDIS_SUBSCRIBER',
],
imports: [NotificationModule], imports: [NotificationModule],
}; };
} }