mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2026-03-10 18:41:46 +00:00
Compare commits
2 Commits
2c8de913f8
...
63b0a42eca
| Author | SHA1 | Date | |
|---|---|---|---|
| 63b0a42eca | |||
| b1cda5e7dc |
@ -62,11 +62,10 @@ export class TransactionService {
|
||||
await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber());
|
||||
}
|
||||
|
||||
// Emit event for notification system
|
||||
const event: ITransactionCreatedEvent = {
|
||||
transaction,
|
||||
card, // Pass card with all relations loaded
|
||||
isTopUp: false, // Card transactions are spending
|
||||
card,
|
||||
isTopUp: false,
|
||||
isChildSpending: card.customerType === CustomerType.CHILD,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
@ -88,21 +87,17 @@ export class TransactionService {
|
||||
const transaction = await this.transactionRepository.createAccountTransaction(account, body);
|
||||
await this.accountService.creditAccountBalance(account.accountReference, body.amount);
|
||||
|
||||
// Get card for notification system by account ID
|
||||
// Account transactions are top-ups, so we get the first card associated with the account
|
||||
const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId);
|
||||
const card = accountWithCards.cards?.[0]
|
||||
? await this.cardService.getCardById(accountWithCards.cards[0].id)
|
||||
: null;
|
||||
|
||||
// Only emit event if card exists (we need card for user info)
|
||||
if (card) {
|
||||
// Emit event for notification system
|
||||
const event: ITransactionCreatedEvent = {
|
||||
transaction,
|
||||
card, // Pass card with all relations loaded
|
||||
isTopUp: true, // Account transactions are top-ups
|
||||
isChildSpending: false, // Top-ups are typically not from children
|
||||
card,
|
||||
isTopUp: true,
|
||||
isChildSpending: false,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
||||
@ -115,12 +110,11 @@ export class TransactionService {
|
||||
const card = await this.cardService.getCardById(cardId);
|
||||
const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount);
|
||||
|
||||
// Emit event for notification system
|
||||
const event: ITransactionCreatedEvent = {
|
||||
transaction,
|
||||
card, // Pass card with all relations loaded
|
||||
isTopUp: true, // Internal child transaction is a top-up to child's card
|
||||
isChildSpending: true, // Child's card is being topped up
|
||||
card,
|
||||
isTopUp: true,
|
||||
isChildSpending: true,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
||||
|
||||
@ -14,3 +14,28 @@ export enum NotificationScope {
|
||||
CHILD_SPENDING = 'CHILD_SPENDING',
|
||||
PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT',
|
||||
}
|
||||
|
||||
/**
|
||||
* Critical notification scopes that require guaranteed delivery
|
||||
* These will use RabbitMQ/Kafka instead of Redis PubSub when configured
|
||||
*
|
||||
* Add scopes here when you need guaranteed delivery for specific notification types
|
||||
* Examples:
|
||||
* - ACCOUNT_LOCKED
|
||||
* - SUSPICIOUS_ACTIVITY
|
||||
* - LARGE_TRANSACTION_ALERT
|
||||
* - PAYMENT_FAILED
|
||||
*/
|
||||
export const CRITICAL_NOTIFICATION_SCOPES = new Set<NotificationScope>([
|
||||
// Add critical scopes here as needed
|
||||
// Example: NotificationScope.ACCOUNT_LOCKED,
|
||||
]);
|
||||
|
||||
/**
|
||||
* Check if a notification scope requires guaranteed delivery
|
||||
* @param scope - Notification scope to check
|
||||
* @returns true if the scope requires guaranteed delivery
|
||||
*/
|
||||
export function requiresGuaranteedDelivery(scope: NotificationScope): boolean {
|
||||
return CRITICAL_NOTIFICATION_SCOPES.has(scope);
|
||||
}
|
||||
@ -1,2 +1,3 @@
|
||||
export * from './notification-page-meta.interface';
|
||||
export * from './notification-events.interface';
|
||||
export * from './messaging-system.interface';
|
||||
@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Interface for messaging systems (Redis PubSub, RabbitMQ, Kafka, etc.)
|
||||
* Allows switching between different messaging systems based on notification requirements
|
||||
*/
|
||||
export interface IMessagingSystem {
|
||||
/**
|
||||
* Publish a notification event
|
||||
* @param channel - Channel/topic name
|
||||
* @param payload - Notification payload
|
||||
*/
|
||||
publish(channel: string, payload: any): Promise<void>;
|
||||
|
||||
/**
|
||||
* Subscribe to a channel
|
||||
* @param channel - Channel/topic name
|
||||
* @param handler - Message handler function
|
||||
*/
|
||||
subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void>;
|
||||
|
||||
/**
|
||||
* Get system name (for logging)
|
||||
*/
|
||||
getName(): string;
|
||||
}
|
||||
|
||||
@ -45,16 +45,12 @@ export class TransactionNotificationListener {
|
||||
`isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}`
|
||||
);
|
||||
|
||||
// Notify the transaction owner (child or parent)
|
||||
await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending);
|
||||
|
||||
// If child transaction, also notify parent
|
||||
if (isChildSpending) {
|
||||
if (isTopUp) {
|
||||
// Parent topped up child's card - send confirmation to parent
|
||||
await this.notifyParentOfTopUp(transaction, card);
|
||||
} else {
|
||||
// Child spent money - send spending alert to parent
|
||||
await this.notifyParentOfChildSpending(transaction, card);
|
||||
}
|
||||
}
|
||||
@ -67,7 +63,6 @@ export class TransactionNotificationListener {
|
||||
`Failed to process transaction notification: ${error?.message || 'Unknown error'}`,
|
||||
error?.stack
|
||||
);
|
||||
// Don't throw - notification failures should not break the main flow
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,27 +77,22 @@ export class TransactionNotificationListener {
|
||||
isChildSpending: boolean
|
||||
): Promise<void> {
|
||||
try {
|
||||
// Extract user from card
|
||||
const user = card?.customer?.user;
|
||||
if (!user) {
|
||||
this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Determine the scope based on transaction type
|
||||
const scope = isTopUp
|
||||
? NotificationScope.CHILD_TOP_UP
|
||||
: NotificationScope.CHILD_SPENDING;
|
||||
|
||||
// Construct title
|
||||
const title = isTopUp ? 'Card Topped Up' : 'Purchase Successful';
|
||||
|
||||
// Extract data
|
||||
const amount = transaction.transactionAmount;
|
||||
const merchant = transaction.merchantName || 'merchant';
|
||||
const balance = card.account?.balance || 0;
|
||||
|
||||
// Construct message
|
||||
const message = isTopUp
|
||||
? `Your card has been topped up with $${amount.toFixed(2)}`
|
||||
: `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`;
|
||||
@ -111,7 +101,6 @@ export class TransactionNotificationListener {
|
||||
`Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}`
|
||||
);
|
||||
|
||||
// Send notification
|
||||
await this.notificationFactory.send({
|
||||
userId: user.id,
|
||||
title,
|
||||
@ -147,7 +136,6 @@ export class TransactionNotificationListener {
|
||||
try {
|
||||
this.logger.debug(`Checking for parent to notify about child spending`);
|
||||
|
||||
// Check if child has guardian
|
||||
const customer = card?.customer;
|
||||
const parentUser = customer?.junior?.guardian?.customer?.user;
|
||||
|
||||
@ -156,7 +144,6 @@ export class TransactionNotificationListener {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get child info
|
||||
const childUser = customer.user;
|
||||
const childName = childUser?.firstName || 'Your child';
|
||||
const amount = transaction.transactionAmount;
|
||||
@ -166,7 +153,6 @@ export class TransactionNotificationListener {
|
||||
`Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}`
|
||||
);
|
||||
|
||||
// Send notification to parent
|
||||
await this.notificationFactory.send({
|
||||
userId: parentUser.id,
|
||||
title: 'Child Spending Alert',
|
||||
@ -189,10 +175,9 @@ export class TransactionNotificationListener {
|
||||
this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`);
|
||||
} catch (error: any) {
|
||||
this.logger.error(
|
||||
`Failed to notify parent of child spending: ${error?.message || 'Unknown error'}`,
|
||||
`Failed to notify parent of child spending: ${ error?.message || 'Unknown error'}`,
|
||||
error?.stack
|
||||
);
|
||||
// Don't throw - parent notification failure should not break child notification
|
||||
}
|
||||
}
|
||||
|
||||
@ -204,7 +189,6 @@ export class TransactionNotificationListener {
|
||||
try {
|
||||
this.logger.debug(`Checking for parent to notify about top-up`);
|
||||
|
||||
// Check if child has guardian
|
||||
const customer = card?.customer;
|
||||
const parentUser = customer?.junior?.guardian?.customer?.user;
|
||||
|
||||
@ -213,7 +197,6 @@ export class TransactionNotificationListener {
|
||||
return;
|
||||
}
|
||||
|
||||
// Get child info
|
||||
const childUser = customer.user;
|
||||
const childName = childUser?.firstName || 'Your child';
|
||||
const amount = transaction.transactionAmount;
|
||||
@ -223,7 +206,6 @@ export class TransactionNotificationListener {
|
||||
`Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}`
|
||||
);
|
||||
|
||||
// Send notification to parent
|
||||
await this.notificationFactory.send({
|
||||
userId: parentUser.id,
|
||||
title: 'Top-Up Confirmation',
|
||||
@ -245,10 +227,9 @@ export class TransactionNotificationListener {
|
||||
this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`);
|
||||
} catch (error: any) {
|
||||
this.logger.error(
|
||||
`Failed to notify parent of top-up: ${error?.message || 'Unknown error'}`,
|
||||
`Failed to notify parent of top-up: ${ error?.message || 'Unknown error'}`,
|
||||
error?.stack
|
||||
);
|
||||
// Don't throw - parent notification failure should not break child notification
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import { Notification } from './entities';
|
||||
import { NotificationCreatedListener, TransactionNotificationListener } from './listeners';
|
||||
import { NotificationsRepository } from './repositories';
|
||||
import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services';
|
||||
import { MessagingSystemFactory, RedisPubSubMessagingService } from './services/messaging';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@ -34,6 +35,8 @@ import { FirebaseService, NotificationFactory, NotificationsService, TwilioServi
|
||||
TwilioService,
|
||||
NotificationCreatedListener,
|
||||
TransactionNotificationListener,
|
||||
RedisPubSubMessagingService,
|
||||
MessagingSystemFactory,
|
||||
],
|
||||
exports: [NotificationsService, NotificationFactory, NotificationCreatedListener],
|
||||
controllers: [NotificationsController],
|
||||
|
||||
@ -0,0 +1,3 @@
|
||||
export * from './redis-pubsub-messaging.service';
|
||||
export * from './messaging-system-factory.service';
|
||||
|
||||
@ -0,0 +1,58 @@
|
||||
import { Injectable, Logger, Optional } from '@nestjs/common';
|
||||
import { NotificationScope, requiresGuaranteedDelivery } from '../../enums/notification-scope.enum';
|
||||
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
|
||||
import { RedisPubSubMessagingService } from './redis-pubsub-messaging.service';
|
||||
|
||||
/**
|
||||
* Messaging System Factory
|
||||
*
|
||||
* Determines which messaging system to use based on notification requirements.
|
||||
*
|
||||
* - Regular notifications → Redis PubSub (fast, 2-5ms)
|
||||
* - Critical notifications → RabbitMQ/Kafka (guaranteed delivery, 20-50ms)
|
||||
*
|
||||
* Usage:
|
||||
* ```typescript
|
||||
* const system = factory.getMessagingSystem(NotificationScope.CHILD_SPENDING);
|
||||
* await system.publish('NOTIFICATION_CREATED', payload);
|
||||
* ```
|
||||
*/
|
||||
@Injectable()
|
||||
export class MessagingSystemFactory {
|
||||
private readonly logger = new Logger(MessagingSystemFactory.name);
|
||||
|
||||
constructor(
|
||||
private readonly redisPubSubService: RedisPubSubMessagingService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Get the appropriate messaging system based on notification scope
|
||||
*
|
||||
* @param scope - Notification scope
|
||||
* @returns Messaging system to use
|
||||
*/
|
||||
getMessagingSystem(scope: NotificationScope): IMessagingSystem {
|
||||
const needsGuaranteedDelivery = requiresGuaranteedDelivery(scope);
|
||||
|
||||
if (needsGuaranteedDelivery) {
|
||||
this.logger.warn(
|
||||
`[Factory] Critical notification ${scope} requires guaranteed delivery, ` +
|
||||
`but RabbitMQ not configured. Falling back to Redis PubSub.`
|
||||
);
|
||||
return this.redisPubSubService;
|
||||
} else {
|
||||
this.logger.debug(`[Factory] Using Redis PubSub for notification: ${scope}`);
|
||||
return this.redisPubSubService;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get default messaging system (Redis PubSub)
|
||||
*
|
||||
* @returns Default messaging system
|
||||
*/
|
||||
getDefaultMessagingSystem(): IMessagingSystem {
|
||||
return this.redisPubSubService;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
import { RedisClientType } from '@keyv/redis';
|
||||
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
|
||||
|
||||
/**
|
||||
* Redis PubSub Messaging System Implementation
|
||||
*
|
||||
* Fast, real-time messaging for regular notifications.
|
||||
* Uses Redis PubSub for 2-5ms latency.
|
||||
*
|
||||
* Note: Messages are not persisted (fire-and-forget).
|
||||
* Suitable for notifications that are already saved in PostgreSQL.
|
||||
*/
|
||||
@Injectable()
|
||||
export class RedisPubSubMessagingService implements IMessagingSystem {
|
||||
private readonly logger = new Logger(RedisPubSubMessagingService.name);
|
||||
|
||||
constructor(
|
||||
@Inject('REDIS_PUBLISHER') private readonly publisher: RedisClientType,
|
||||
@Inject('REDIS_SUBSCRIBER') private readonly subscriber: RedisClientType,
|
||||
) {}
|
||||
|
||||
getName(): string {
|
||||
return 'Redis PubSub';
|
||||
}
|
||||
|
||||
async publish(channel: string, payload: any): Promise<void> {
|
||||
try {
|
||||
const message = JSON.stringify(payload);
|
||||
const subscriberCount = await this.publisher.publish(channel, message);
|
||||
|
||||
this.logger.debug(
|
||||
`[Redis PubSub] Published to ${channel}, ${subscriberCount} subscriber(s) received`
|
||||
);
|
||||
} catch (error: any) {
|
||||
this.logger.error(`[Redis PubSub] Failed to publish to ${channel}: ${error?.message}`);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void> {
|
||||
await this.subscriber.subscribe(channel, async (message) => {
|
||||
try {
|
||||
const data = JSON.parse(message);
|
||||
await handler(data);
|
||||
} catch (error: any) {
|
||||
this.logger.error(
|
||||
`[Redis PubSub] Failed to process message from ${channel}: ${error?.message}`
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
this.logger.log(`[Redis PubSub] Subscribed to channel: ${channel}`);
|
||||
}
|
||||
}
|
||||
@ -89,7 +89,6 @@ export class NotificationFactory {
|
||||
try {
|
||||
this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`);
|
||||
|
||||
// Use provided preferences or default to push-only
|
||||
const preferences = payload.preferences || {
|
||||
isPushEnabled: true,
|
||||
isEmailEnabled: false,
|
||||
@ -98,8 +97,6 @@ export class NotificationFactory {
|
||||
|
||||
const promises: Promise<any>[] = [];
|
||||
|
||||
// Route to enabled channels based on preferences
|
||||
// Currently only PUSH is implemented (extensible for EMAIL, SMS later)
|
||||
if (preferences.isPushEnabled) {
|
||||
this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`);
|
||||
promises.push(
|
||||
@ -107,23 +104,6 @@ export class NotificationFactory {
|
||||
);
|
||||
}
|
||||
|
||||
// Future: Add EMAIL channel
|
||||
// if (preferences.isEmailEnabled) {
|
||||
// this.logger.debug(`Routing to EMAIL channel for user ${payload.userId}`);
|
||||
// promises.push(
|
||||
// this.sendToChannel(payload, NotificationChannel.EMAIL)
|
||||
// );
|
||||
// }
|
||||
|
||||
// Future: Add SMS channel
|
||||
// if (preferences.isSmsEnabled) {
|
||||
// this.logger.debug(`Routing to SMS channel for user ${payload.userId}`);
|
||||
// promises.push(
|
||||
// this.sendToChannel(payload, NotificationChannel.SMS)
|
||||
// );
|
||||
// }
|
||||
|
||||
// Send all notificaetions in parallel
|
||||
await Promise.all(promises);
|
||||
|
||||
this.logger.log(
|
||||
|
||||
@ -8,6 +8,7 @@ import { SendEmailRequestDto } from '../dtos/request';
|
||||
import { Notification } from '../entities';
|
||||
import { EventType, NotificationChannel, NotificationScope } from '../enums';
|
||||
import { NotificationsRepository } from '../repositories';
|
||||
import { MessagingSystemFactory } from './messaging/messaging-system-factory.service';
|
||||
|
||||
@Injectable()
|
||||
export class NotificationsService {
|
||||
@ -17,6 +18,7 @@ export class NotificationsService {
|
||||
|
||||
@Inject(forwardRef(() => RedisPubSubService))
|
||||
private readonly redisPubSubService: RedisPubSubService,
|
||||
private readonly messagingSystemFactory: MessagingSystemFactory,
|
||||
) {}
|
||||
|
||||
async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) {
|
||||
@ -35,15 +37,20 @@ export class NotificationsService {
|
||||
this.logger.log(`Creating notification for user ${notification.userId}`);
|
||||
const savedNotification = await this.notificationRepository.createNotification(notification);
|
||||
|
||||
// Publish to Redis PubSub for delivery (Firebase, Email, SMS)
|
||||
this.logger.log(`Publishing ${EventType.NOTIFICATION_CREATED} event to Redis`);
|
||||
this.redisPubSubService.publishEvent(EventType.NOTIFICATION_CREATED, {
|
||||
const scope = notification.scope || NotificationScope.USER_REGISTERED;
|
||||
const messagingSystem = this.messagingSystemFactory.getMessagingSystem(scope);
|
||||
|
||||
this.logger.log(
|
||||
`Publishing ${EventType.NOTIFICATION_CREATED} event to ${messagingSystem.getName()}`
|
||||
);
|
||||
|
||||
messagingSystem.publish(EventType.NOTIFICATION_CREATED, {
|
||||
...savedNotification,
|
||||
data: notification.data || savedNotification.data,
|
||||
}).catch((error) => {
|
||||
// Log error but don't throw - notification is saved in DB
|
||||
this.logger.error(
|
||||
`Failed to publish notification ${savedNotification.id} to Redis: ${error?.message || 'Unknown error'}`,
|
||||
`Failed to publish notification ${savedNotification.id} to ${messagingSystem.getName()}: ` +
|
||||
`${error?.message || 'Unknown error'}`,
|
||||
error?.stack
|
||||
);
|
||||
});
|
||||
@ -58,27 +65,25 @@ export class NotificationsService {
|
||||
|
||||
async sendEmailAsync(data: SendEmailRequestDto) {
|
||||
this.logger.log(`Creating email notification for ${data.to}`);
|
||||
// createNotification now automatically publishes to Redis
|
||||
await this.createNotification({
|
||||
recipient: data.to,
|
||||
title: data.subject,
|
||||
message: '',
|
||||
scope: NotificationScope.USER_INVITED,
|
||||
channel: NotificationChannel.EMAIL,
|
||||
data: data.data, // Pass data in notification object
|
||||
data: data.data,
|
||||
});
|
||||
}
|
||||
|
||||
async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) {
|
||||
this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`);
|
||||
// createNotification now automatically publishes to Redis
|
||||
return this.createNotification({
|
||||
recipient: sendOtpRequest.recipient,
|
||||
title: OTP_TITLE,
|
||||
message: OTP_BODY.replace('{otp}', otp),
|
||||
scope: NotificationScope.OTP,
|
||||
channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS,
|
||||
data: { otp }, // Pass data in notification object
|
||||
data: { otp },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -32,7 +32,11 @@ export class RedisModule {
|
||||
},
|
||||
RedisPubSubService,
|
||||
],
|
||||
exports: [RedisPubSubService],
|
||||
exports: [
|
||||
RedisPubSubService,
|
||||
'REDIS_PUBLISHER',
|
||||
'REDIS_SUBSCRIBER',
|
||||
],
|
||||
imports: [NotificationModule],
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user