mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2026-03-10 18:41:46 +00:00
Compare commits
5 Commits
98f6aaf01f
...
11b2b25adc
| Author | SHA1 | Date | |
|---|---|---|---|
| 11b2b25adc | |||
| 63b0a42eca | |||
| ed8cf4b4f8 | |||
| b1cda5e7dc | |||
| 2c8de913f8 |
@ -62,11 +62,10 @@ export class TransactionService {
|
|||||||
await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber());
|
await this.accountService.decreaseAccountBalance(card.account.accountReference, total.toNumber());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit event for notification system
|
|
||||||
const event: ITransactionCreatedEvent = {
|
const event: ITransactionCreatedEvent = {
|
||||||
transaction,
|
transaction,
|
||||||
card, // Pass card with all relations loaded
|
card,
|
||||||
isTopUp: false, // Card transactions are spending
|
isTopUp: false,
|
||||||
isChildSpending: card.customerType === CustomerType.CHILD,
|
isChildSpending: card.customerType === CustomerType.CHILD,
|
||||||
timestamp: new Date(),
|
timestamp: new Date(),
|
||||||
};
|
};
|
||||||
@ -88,21 +87,17 @@ export class TransactionService {
|
|||||||
const transaction = await this.transactionRepository.createAccountTransaction(account, body);
|
const transaction = await this.transactionRepository.createAccountTransaction(account, body);
|
||||||
await this.accountService.creditAccountBalance(account.accountReference, body.amount);
|
await this.accountService.creditAccountBalance(account.accountReference, body.amount);
|
||||||
|
|
||||||
// Get card for notification system by account ID
|
|
||||||
// Account transactions are top-ups, so we get the first card associated with the account
|
|
||||||
const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId);
|
const accountWithCards = await this.accountService.getAccountByAccountNumber(body.accountId);
|
||||||
const card = accountWithCards.cards?.[0]
|
const card = accountWithCards.cards?.[0]
|
||||||
? await this.cardService.getCardById(accountWithCards.cards[0].id)
|
? await this.cardService.getCardById(accountWithCards.cards[0].id)
|
||||||
: null;
|
: null;
|
||||||
|
|
||||||
// Only emit event if card exists (we need card for user info)
|
|
||||||
if (card) {
|
if (card) {
|
||||||
// Emit event for notification system
|
|
||||||
const event: ITransactionCreatedEvent = {
|
const event: ITransactionCreatedEvent = {
|
||||||
transaction,
|
transaction,
|
||||||
card, // Pass card with all relations loaded
|
card,
|
||||||
isTopUp: true, // Account transactions are top-ups
|
isTopUp: true,
|
||||||
isChildSpending: false, // Top-ups are typically not from children
|
isChildSpending: false,
|
||||||
timestamp: new Date(),
|
timestamp: new Date(),
|
||||||
};
|
};
|
||||||
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
||||||
@ -115,12 +110,11 @@ export class TransactionService {
|
|||||||
const card = await this.cardService.getCardById(cardId);
|
const card = await this.cardService.getCardById(cardId);
|
||||||
const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount);
|
const transaction = await this.transactionRepository.createInternalChildTransaction(card, amount);
|
||||||
|
|
||||||
// Emit event for notification system
|
|
||||||
const event: ITransactionCreatedEvent = {
|
const event: ITransactionCreatedEvent = {
|
||||||
transaction,
|
transaction,
|
||||||
card, // Pass card with all relations loaded
|
card,
|
||||||
isTopUp: true, // Internal child transaction is a top-up to child's card
|
isTopUp: true,
|
||||||
isChildSpending: true, // Child's card is being topped up
|
isChildSpending: true,
|
||||||
timestamp: new Date(),
|
timestamp: new Date(),
|
||||||
};
|
};
|
||||||
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
this.eventEmitter.emit(NOTIFICATION_EVENTS.TRANSACTION_CREATED, event);
|
||||||
|
|||||||
@ -13,4 +13,29 @@ export enum NotificationScope {
|
|||||||
// Transaction notifications - Spending
|
// Transaction notifications - Spending
|
||||||
CHILD_SPENDING = 'CHILD_SPENDING',
|
CHILD_SPENDING = 'CHILD_SPENDING',
|
||||||
PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT',
|
PARENT_SPENDING_ALERT = 'PARENT_SPENDING_ALERT',
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Critical notification scopes that require guaranteed delivery
|
||||||
|
* These will use RabbitMQ/Kafka instead of Redis PubSub when configured
|
||||||
|
*
|
||||||
|
* Add scopes here when you need guaranteed delivery for specific notification types
|
||||||
|
* Examples:
|
||||||
|
* - ACCOUNT_LOCKED
|
||||||
|
* - SUSPICIOUS_ACTIVITY
|
||||||
|
* - LARGE_TRANSACTION_ALERT
|
||||||
|
* - PAYMENT_FAILED
|
||||||
|
*/
|
||||||
|
export const CRITICAL_NOTIFICATION_SCOPES = new Set<NotificationScope>([
|
||||||
|
// Add critical scopes here as needed
|
||||||
|
// Example: NotificationScope.ACCOUNT_LOCKED,
|
||||||
|
]);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a notification scope requires guaranteed delivery
|
||||||
|
* @param scope - Notification scope to check
|
||||||
|
* @returns true if the scope requires guaranteed delivery
|
||||||
|
*/
|
||||||
|
export function requiresGuaranteedDelivery(scope: NotificationScope): boolean {
|
||||||
|
return CRITICAL_NOTIFICATION_SCOPES.has(scope);
|
||||||
}
|
}
|
||||||
@ -1,2 +1,3 @@
|
|||||||
export * from './notification-page-meta.interface';
|
export * from './notification-page-meta.interface';
|
||||||
export * from './notification-events.interface';
|
export * from './notification-events.interface';
|
||||||
|
export * from './messaging-system.interface';
|
||||||
@ -0,0 +1,25 @@
|
|||||||
|
/**
|
||||||
|
* Interface for messaging systems (Redis PubSub, RabbitMQ, Kafka, etc.)
|
||||||
|
* Allows switching between different messaging systems based on notification requirements
|
||||||
|
*/
|
||||||
|
export interface IMessagingSystem {
|
||||||
|
/**
|
||||||
|
* Publish a notification event
|
||||||
|
* @param channel - Channel/topic name
|
||||||
|
* @param payload - Notification payload
|
||||||
|
*/
|
||||||
|
publish(channel: string, payload: any): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Subscribe to a channel
|
||||||
|
* @param channel - Channel/topic name
|
||||||
|
* @param handler - Message handler function
|
||||||
|
*/
|
||||||
|
subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get system name (for logging)
|
||||||
|
*/
|
||||||
|
getName(): string;
|
||||||
|
}
|
||||||
|
|
||||||
@ -45,16 +45,12 @@ export class TransactionNotificationListener {
|
|||||||
`isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}`
|
`isTopUp: ${isTopUp}, isChildSpending: ${isChildSpending}`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Notify the transaction owner (child or parent)
|
|
||||||
await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending);
|
await this.notifyTransactionOwner(transaction, card, isTopUp, isChildSpending);
|
||||||
|
|
||||||
// If child transaction, also notify parent
|
|
||||||
if (isChildSpending) {
|
if (isChildSpending) {
|
||||||
if (isTopUp) {
|
if (isTopUp) {
|
||||||
// Parent topped up child's card - send confirmation to parent
|
|
||||||
await this.notifyParentOfTopUp(transaction, card);
|
await this.notifyParentOfTopUp(transaction, card);
|
||||||
} else {
|
} else {
|
||||||
// Child spent money - send spending alert to parent
|
|
||||||
await this.notifyParentOfChildSpending(transaction, card);
|
await this.notifyParentOfChildSpending(transaction, card);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,7 +63,6 @@ export class TransactionNotificationListener {
|
|||||||
`Failed to process transaction notification: ${error?.message || 'Unknown error'}`,
|
`Failed to process transaction notification: ${error?.message || 'Unknown error'}`,
|
||||||
error?.stack
|
error?.stack
|
||||||
);
|
);
|
||||||
// Don't throw - notification failures should not break the main flow
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -82,27 +77,22 @@ export class TransactionNotificationListener {
|
|||||||
isChildSpending: boolean
|
isChildSpending: boolean
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
// Extract user from card
|
|
||||||
const user = card?.customer?.user;
|
const user = card?.customer?.user;
|
||||||
if (!user) {
|
if (!user) {
|
||||||
this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`);
|
this.logger.warn(`No user found for transaction ${transaction.id}, skipping notification`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Determine the scope based on transaction type
|
|
||||||
const scope = isTopUp
|
const scope = isTopUp
|
||||||
? NotificationScope.CHILD_TOP_UP
|
? NotificationScope.CHILD_TOP_UP
|
||||||
: NotificationScope.CHILD_SPENDING;
|
: NotificationScope.CHILD_SPENDING;
|
||||||
|
|
||||||
// Construct title
|
const title = isTopUp ? 'Card Topped Up' : 'Purchase Successful';
|
||||||
const title = isTopUp ? '💰 Card Topped Up' : '💳 Purchase Successful';
|
|
||||||
|
|
||||||
// Extract data
|
|
||||||
const amount = transaction.transactionAmount;
|
const amount = transaction.transactionAmount;
|
||||||
const merchant = transaction.merchantName || 'merchant';
|
const merchant = transaction.merchantName || 'merchant';
|
||||||
const balance = card.account?.balance || 0;
|
const balance = card.account?.balance || 0;
|
||||||
|
|
||||||
// Construct message
|
|
||||||
const message = isTopUp
|
const message = isTopUp
|
||||||
? `Your card has been topped up with $${amount.toFixed(2)}`
|
? `Your card has been topped up with $${amount.toFixed(2)}`
|
||||||
: `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`;
|
: `You spent $${amount.toFixed(2)} at ${merchant}. Balance: $${balance.toFixed(2)}`;
|
||||||
@ -111,7 +101,6 @@ export class TransactionNotificationListener {
|
|||||||
`Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}`
|
`Notifying transaction owner (user ${user.id}) - Amount: $${amount}, Merchant: ${merchant}`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send notification
|
|
||||||
await this.notificationFactory.send({
|
await this.notificationFactory.send({
|
||||||
userId: user.id,
|
userId: user.id,
|
||||||
title,
|
title,
|
||||||
@ -147,7 +136,6 @@ export class TransactionNotificationListener {
|
|||||||
try {
|
try {
|
||||||
this.logger.debug(`Checking for parent to notify about child spending`);
|
this.logger.debug(`Checking for parent to notify about child spending`);
|
||||||
|
|
||||||
// Check if child has guardian
|
|
||||||
const customer = card?.customer;
|
const customer = card?.customer;
|
||||||
const parentUser = customer?.junior?.guardian?.customer?.user;
|
const parentUser = customer?.junior?.guardian?.customer?.user;
|
||||||
|
|
||||||
@ -156,7 +144,6 @@ export class TransactionNotificationListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get child info
|
|
||||||
const childUser = customer.user;
|
const childUser = customer.user;
|
||||||
const childName = childUser?.firstName || 'Your child';
|
const childName = childUser?.firstName || 'Your child';
|
||||||
const amount = transaction.transactionAmount;
|
const amount = transaction.transactionAmount;
|
||||||
@ -166,10 +153,9 @@ export class TransactionNotificationListener {
|
|||||||
`Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}`
|
`Notifying parent (user ${parentUser.id}): ${childName} spent $${amount} at ${merchant}`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send notification to parent
|
|
||||||
await this.notificationFactory.send({
|
await this.notificationFactory.send({
|
||||||
userId: parentUser.id,
|
userId: parentUser.id,
|
||||||
title: '🚨 Child Spending Alert',
|
title: 'Child Spending Alert',
|
||||||
message: `${childName} spent $${amount.toFixed(2)} at ${merchant}`,
|
message: `${childName} spent $${amount.toFixed(2)} at ${merchant}`,
|
||||||
scope: NotificationScope.PARENT_SPENDING_ALERT,
|
scope: NotificationScope.PARENT_SPENDING_ALERT,
|
||||||
preferences: this.getUserPreferences(parentUser),
|
preferences: this.getUserPreferences(parentUser),
|
||||||
@ -189,10 +175,9 @@ export class TransactionNotificationListener {
|
|||||||
this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`);
|
this.logger.log(`✅ Notified parent ${parentUser.id} about child spending`);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
`Failed to notify parent of child spending: ${error?.message || 'Unknown error'}`,
|
`Failed to notify parent of child spending: ${ error?.message || 'Unknown error'}`,
|
||||||
error?.stack
|
error?.stack
|
||||||
);
|
);
|
||||||
// Don't throw - parent notification failure should not break child notification
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,7 +189,6 @@ export class TransactionNotificationListener {
|
|||||||
try {
|
try {
|
||||||
this.logger.debug(`Checking for parent to notify about top-up`);
|
this.logger.debug(`Checking for parent to notify about top-up`);
|
||||||
|
|
||||||
// Check if child has guardian
|
|
||||||
const customer = card?.customer;
|
const customer = card?.customer;
|
||||||
const parentUser = customer?.junior?.guardian?.customer?.user;
|
const parentUser = customer?.junior?.guardian?.customer?.user;
|
||||||
|
|
||||||
@ -213,7 +197,6 @@ export class TransactionNotificationListener {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get child info
|
|
||||||
const childUser = customer.user;
|
const childUser = customer.user;
|
||||||
const childName = childUser?.firstName || 'Your child';
|
const childName = childUser?.firstName || 'Your child';
|
||||||
const amount = transaction.transactionAmount;
|
const amount = transaction.transactionAmount;
|
||||||
@ -223,10 +206,9 @@ export class TransactionNotificationListener {
|
|||||||
`Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}`
|
`Notifying parent (user ${parentUser.id}): Topped up ${childName}'s card with $${amount}`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Send notification to parent
|
|
||||||
await this.notificationFactory.send({
|
await this.notificationFactory.send({
|
||||||
userId: parentUser.id,
|
userId: parentUser.id,
|
||||||
title: '✅ Top-Up Confirmation',
|
title: 'Top-Up Confirmation',
|
||||||
message: `You topped up ${childName}'s card with $${amount.toFixed(2)}. New balance: $${balance.toFixed(2)}`,
|
message: `You topped up ${childName}'s card with $${amount.toFixed(2)}. New balance: $${balance.toFixed(2)}`,
|
||||||
scope: NotificationScope.PARENT_TOP_UP_CONFIRMATION,
|
scope: NotificationScope.PARENT_TOP_UP_CONFIRMATION,
|
||||||
preferences: this.getUserPreferences(parentUser),
|
preferences: this.getUserPreferences(parentUser),
|
||||||
@ -245,10 +227,9 @@ export class TransactionNotificationListener {
|
|||||||
this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`);
|
this.logger.log(`✅ Notified parent ${parentUser.id} about top-up`);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
this.logger.error(
|
this.logger.error(
|
||||||
`Failed to notify parent of top-up: ${error?.message || 'Unknown error'}`,
|
`Failed to notify parent of top-up: ${ error?.message || 'Unknown error'}`,
|
||||||
error?.stack
|
error?.stack
|
||||||
);
|
);
|
||||||
// Don't throw - parent notification failure should not break child notification
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -11,6 +11,7 @@ import { Notification } from './entities';
|
|||||||
import { NotificationCreatedListener, TransactionNotificationListener } from './listeners';
|
import { NotificationCreatedListener, TransactionNotificationListener } from './listeners';
|
||||||
import { NotificationsRepository } from './repositories';
|
import { NotificationsRepository } from './repositories';
|
||||||
import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services';
|
import { FirebaseService, NotificationFactory, NotificationsService, TwilioService } from './services';
|
||||||
|
import { MessagingSystemFactory, RedisPubSubMessagingService } from './services/messaging';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@ -34,6 +35,8 @@ import { FirebaseService, NotificationFactory, NotificationsService, TwilioServi
|
|||||||
TwilioService,
|
TwilioService,
|
||||||
NotificationCreatedListener,
|
NotificationCreatedListener,
|
||||||
TransactionNotificationListener,
|
TransactionNotificationListener,
|
||||||
|
RedisPubSubMessagingService,
|
||||||
|
MessagingSystemFactory,
|
||||||
],
|
],
|
||||||
exports: [NotificationsService, NotificationFactory, NotificationCreatedListener],
|
exports: [NotificationsService, NotificationFactory, NotificationCreatedListener],
|
||||||
controllers: [NotificationsController],
|
controllers: [NotificationsController],
|
||||||
|
|||||||
@ -0,0 +1,3 @@
|
|||||||
|
export * from './redis-pubsub-messaging.service';
|
||||||
|
export * from './messaging-system-factory.service';
|
||||||
|
|
||||||
@ -0,0 +1,58 @@
|
|||||||
|
import { Injectable, Logger, Optional } from '@nestjs/common';
|
||||||
|
import { NotificationScope, requiresGuaranteedDelivery } from '../../enums/notification-scope.enum';
|
||||||
|
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
|
||||||
|
import { RedisPubSubMessagingService } from './redis-pubsub-messaging.service';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Messaging System Factory
|
||||||
|
*
|
||||||
|
* Determines which messaging system to use based on notification requirements.
|
||||||
|
*
|
||||||
|
* - Regular notifications → Redis PubSub (fast, 2-5ms)
|
||||||
|
* - Critical notifications → RabbitMQ/Kafka (guaranteed delivery, 20-50ms)
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* ```typescript
|
||||||
|
* const system = factory.getMessagingSystem(NotificationScope.CHILD_SPENDING);
|
||||||
|
* await system.publish('NOTIFICATION_CREATED', payload);
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class MessagingSystemFactory {
|
||||||
|
private readonly logger = new Logger(MessagingSystemFactory.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly redisPubSubService: RedisPubSubMessagingService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the appropriate messaging system based on notification scope
|
||||||
|
*
|
||||||
|
* @param scope - Notification scope
|
||||||
|
* @returns Messaging system to use
|
||||||
|
*/
|
||||||
|
getMessagingSystem(scope: NotificationScope): IMessagingSystem {
|
||||||
|
const needsGuaranteedDelivery = requiresGuaranteedDelivery(scope);
|
||||||
|
|
||||||
|
if (needsGuaranteedDelivery) {
|
||||||
|
this.logger.warn(
|
||||||
|
`[Factory] Critical notification ${scope} requires guaranteed delivery, ` +
|
||||||
|
`but RabbitMQ not configured. Falling back to Redis PubSub.`
|
||||||
|
);
|
||||||
|
return this.redisPubSubService;
|
||||||
|
} else {
|
||||||
|
this.logger.debug(`[Factory] Using Redis PubSub for notification: ${scope}`);
|
||||||
|
return this.redisPubSubService;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get default messaging system (Redis PubSub)
|
||||||
|
*
|
||||||
|
* @returns Default messaging system
|
||||||
|
*/
|
||||||
|
getDefaultMessagingSystem(): IMessagingSystem {
|
||||||
|
return this.redisPubSubService;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@ -0,0 +1,55 @@
|
|||||||
|
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||||
|
import { RedisClientType } from '@keyv/redis';
|
||||||
|
import { IMessagingSystem } from '../../interfaces/messaging-system.interface';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Redis PubSub Messaging System Implementation
|
||||||
|
*
|
||||||
|
* Fast, real-time messaging for regular notifications.
|
||||||
|
* Uses Redis PubSub for 2-5ms latency.
|
||||||
|
*
|
||||||
|
* Note: Messages are not persisted (fire-and-forget).
|
||||||
|
* Suitable for notifications that are already saved in PostgreSQL.
|
||||||
|
*/
|
||||||
|
@Injectable()
|
||||||
|
export class RedisPubSubMessagingService implements IMessagingSystem {
|
||||||
|
private readonly logger = new Logger(RedisPubSubMessagingService.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@Inject('REDIS_PUBLISHER') private readonly publisher: RedisClientType,
|
||||||
|
@Inject('REDIS_SUBSCRIBER') private readonly subscriber: RedisClientType,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
getName(): string {
|
||||||
|
return 'Redis PubSub';
|
||||||
|
}
|
||||||
|
|
||||||
|
async publish(channel: string, payload: any): Promise<void> {
|
||||||
|
try {
|
||||||
|
const message = JSON.stringify(payload);
|
||||||
|
const subscriberCount = await this.publisher.publish(channel, message);
|
||||||
|
|
||||||
|
this.logger.debug(
|
||||||
|
`[Redis PubSub] Published to ${channel}, ${subscriberCount} subscriber(s) received`
|
||||||
|
);
|
||||||
|
} catch (error: any) {
|
||||||
|
this.logger.error(`[Redis PubSub] Failed to publish to ${channel}: ${error?.message}`);
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async subscribe(channel: string, handler: (message: any) => Promise<void>): Promise<void> {
|
||||||
|
await this.subscriber.subscribe(channel, async (message) => {
|
||||||
|
try {
|
||||||
|
const data = JSON.parse(message);
|
||||||
|
await handler(data);
|
||||||
|
} catch (error: any) {
|
||||||
|
this.logger.error(
|
||||||
|
`[Redis PubSub] Failed to process message from ${channel}: ${error?.message}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.logger.log(`[Redis PubSub] Subscribed to channel: ${channel}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -89,7 +89,6 @@ export class NotificationFactory {
|
|||||||
try {
|
try {
|
||||||
this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`);
|
this.logger.log(`Sending notification to user ${payload.userId} - ${payload.title}`);
|
||||||
|
|
||||||
// Use provided preferences or default to push-only
|
|
||||||
const preferences = payload.preferences || {
|
const preferences = payload.preferences || {
|
||||||
isPushEnabled: true,
|
isPushEnabled: true,
|
||||||
isEmailEnabled: false,
|
isEmailEnabled: false,
|
||||||
@ -98,8 +97,6 @@ export class NotificationFactory {
|
|||||||
|
|
||||||
const promises: Promise<any>[] = [];
|
const promises: Promise<any>[] = [];
|
||||||
|
|
||||||
// Route to enabled channels based on preferences
|
|
||||||
// Currently only PUSH is implemented (extensible for EMAIL, SMS later)
|
|
||||||
if (preferences.isPushEnabled) {
|
if (preferences.isPushEnabled) {
|
||||||
this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`);
|
this.logger.debug(`Routing to PUSH channel for user ${payload.userId}`);
|
||||||
promises.push(
|
promises.push(
|
||||||
@ -107,23 +104,6 @@ export class NotificationFactory {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Future: Add EMAIL channel
|
|
||||||
// if (preferences.isEmailEnabled) {
|
|
||||||
// this.logger.debug(`Routing to EMAIL channel for user ${payload.userId}`);
|
|
||||||
// promises.push(
|
|
||||||
// this.sendToChannel(payload, NotificationChannel.EMAIL)
|
|
||||||
// );
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Future: Add SMS channel
|
|
||||||
// if (preferences.isSmsEnabled) {
|
|
||||||
// this.logger.debug(`Routing to SMS channel for user ${payload.userId}`);
|
|
||||||
// promises.push(
|
|
||||||
// this.sendToChannel(payload, NotificationChannel.SMS)
|
|
||||||
// );
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Send all notificaetions in parallel
|
|
||||||
await Promise.all(promises);
|
await Promise.all(promises);
|
||||||
|
|
||||||
this.logger.log(
|
this.logger.log(
|
||||||
|
|||||||
@ -8,6 +8,7 @@ import { SendEmailRequestDto } from '../dtos/request';
|
|||||||
import { Notification } from '../entities';
|
import { Notification } from '../entities';
|
||||||
import { EventType, NotificationChannel, NotificationScope } from '../enums';
|
import { EventType, NotificationChannel, NotificationScope } from '../enums';
|
||||||
import { NotificationsRepository } from '../repositories';
|
import { NotificationsRepository } from '../repositories';
|
||||||
|
import { MessagingSystemFactory } from './messaging/messaging-system-factory.service';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class NotificationsService {
|
export class NotificationsService {
|
||||||
@ -17,6 +18,7 @@ export class NotificationsService {
|
|||||||
|
|
||||||
@Inject(forwardRef(() => RedisPubSubService))
|
@Inject(forwardRef(() => RedisPubSubService))
|
||||||
private readonly redisPubSubService: RedisPubSubService,
|
private readonly redisPubSubService: RedisPubSubService,
|
||||||
|
private readonly messagingSystemFactory: MessagingSystemFactory,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) {
|
async getNotifications(userId: string, pageOptionsDto: PageOptionsRequestDto) {
|
||||||
@ -31,9 +33,29 @@ export class NotificationsService {
|
|||||||
return { notifications, count, unreadCount };
|
return { notifications, count, unreadCount };
|
||||||
}
|
}
|
||||||
|
|
||||||
createNotification(notification: Partial<Notification>) {
|
async createNotification(notification: Partial<Notification>) {
|
||||||
this.logger.log(`Creating notification for user ${notification.userId}`);
|
this.logger.log(`Creating notification for user ${notification.userId}`);
|
||||||
return this.notificationRepository.createNotification(notification);
|
const savedNotification = await this.notificationRepository.createNotification(notification);
|
||||||
|
|
||||||
|
const scope = notification.scope || NotificationScope.USER_REGISTERED;
|
||||||
|
const messagingSystem = this.messagingSystemFactory.getMessagingSystem(scope);
|
||||||
|
|
||||||
|
this.logger.log(
|
||||||
|
`Publishing ${EventType.NOTIFICATION_CREATED} event to ${messagingSystem.getName()}`
|
||||||
|
);
|
||||||
|
|
||||||
|
messagingSystem.publish(EventType.NOTIFICATION_CREATED, {
|
||||||
|
...savedNotification,
|
||||||
|
data: notification.data || savedNotification.data,
|
||||||
|
}).catch((error) => {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to publish notification ${savedNotification.id} to ${messagingSystem.getName()}: ` +
|
||||||
|
`${error?.message || 'Unknown error'}`,
|
||||||
|
error?.stack
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
return savedNotification;
|
||||||
}
|
}
|
||||||
|
|
||||||
markAsRead(userId: string) {
|
markAsRead(userId: string) {
|
||||||
@ -42,34 +64,25 @@ export class NotificationsService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async sendEmailAsync(data: SendEmailRequestDto) {
|
async sendEmailAsync(data: SendEmailRequestDto) {
|
||||||
this.logger.log(`emitting ${EventType.NOTIFICATION_CREATED} event`);
|
this.logger.log(`Creating email notification for ${data.to}`);
|
||||||
const notification = await this.createNotification({
|
await this.createNotification({
|
||||||
recipient: data.to,
|
recipient: data.to,
|
||||||
title: data.subject,
|
title: data.subject,
|
||||||
message: '',
|
message: '',
|
||||||
scope: NotificationScope.USER_INVITED,
|
scope: NotificationScope.USER_INVITED,
|
||||||
channel: NotificationChannel.EMAIL,
|
channel: NotificationChannel.EMAIL,
|
||||||
});
|
data: data.data,
|
||||||
// return this.redisPubSubService.emit(EventType.NOTIFICATION_CREATED, notification, data.data);
|
|
||||||
this.redisPubSubService.publishEvent(EventType.NOTIFICATION_CREATED, {
|
|
||||||
...notification,
|
|
||||||
data,
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) {
|
async sendOtpNotification(sendOtpRequest: ISendOtp, otp: string) {
|
||||||
this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`);
|
this.logger.log(`Sending OTP to ${sendOtpRequest.recipient}`);
|
||||||
const notification = await this.createNotification({
|
return this.createNotification({
|
||||||
recipient: sendOtpRequest.recipient,
|
recipient: sendOtpRequest.recipient,
|
||||||
title: OTP_TITLE,
|
title: OTP_TITLE,
|
||||||
message: OTP_BODY.replace('{otp}', otp),
|
message: OTP_BODY.replace('{otp}', otp),
|
||||||
scope: NotificationScope.OTP,
|
scope: NotificationScope.OTP,
|
||||||
channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS,
|
channel: sendOtpRequest.otpType === OtpType.EMAIL ? NotificationChannel.EMAIL : NotificationChannel.SMS,
|
||||||
});
|
|
||||||
|
|
||||||
this.logger.log(`emitting ${EventType.NOTIFICATION_CREATED} event`);
|
|
||||||
return this.redisPubSubService.publishEvent(EventType.NOTIFICATION_CREATED, {
|
|
||||||
...notification,
|
|
||||||
data: { otp },
|
data: { otp },
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -32,7 +32,11 @@ export class RedisModule {
|
|||||||
},
|
},
|
||||||
RedisPubSubService,
|
RedisPubSubService,
|
||||||
],
|
],
|
||||||
exports: [RedisPubSubService],
|
exports: [
|
||||||
|
RedisPubSubService,
|
||||||
|
'REDIS_PUBLISHER',
|
||||||
|
'REDIS_SUBSCRIBER',
|
||||||
|
],
|
||||||
imports: [NotificationModule],
|
imports: [NotificationModule],
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user