mirror of
https://github.com/HamzaSha1/zod-backend.git
synced 2026-03-10 18:41:46 +00:00
feat(allowance): enhance logging in allowance queue and worker services
- Added detailed logging for enqueueing allowance jobs, processing jobs, and handling errors. - Improved validation checks with warnings for invalid payloads and schedules. - Enhanced logging for credit creation and transfer processes, including success and failure scenarios. - Updated cron job logging to track the processing of due schedules and completion status.
This commit is contained in:
@ -78,7 +78,9 @@ export class AllowanceQueueService implements OnModuleDestroy {
|
|||||||
contentType: 'application/json',
|
contentType: 'application/json',
|
||||||
};
|
};
|
||||||
|
|
||||||
|
this.logger.log(`Enqueueing allowance job - scheduleId: ${scheduleId}, runAt: ${runAt.toISOString()}`);
|
||||||
await this.channel.sendToQueue(this.queueName, Buffer.from(JSON.stringify(payload)), options);
|
await this.channel.sendToQueue(this.queueName, Buffer.from(JSON.stringify(payload)), options);
|
||||||
|
this.logger.log(`Allowance job enqueued successfully - messageId: ${messageId}`);
|
||||||
}
|
}
|
||||||
|
|
||||||
async onModuleDestroy() {
|
async onModuleDestroy() {
|
||||||
|
|||||||
@ -107,39 +107,64 @@ export class AllowanceWorkerService implements OnModuleInit, OnModuleDestroy {
|
|||||||
private async processAllowanceJob(payload: AllowanceQueuePayload): Promise<void> {
|
private async processAllowanceJob(payload: AllowanceQueuePayload): Promise<void> {
|
||||||
const runAt = new Date(payload.runAt);
|
const runAt = new Date(payload.runAt);
|
||||||
if (!payload.scheduleId || Number.isNaN(runAt.getTime())) {
|
if (!payload.scheduleId || Number.isNaN(runAt.getTime())) {
|
||||||
|
this.logger.warn(`Invalid payload - scheduleId: ${payload.scheduleId}, runAt: ${payload.runAt}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.logger.log(`Processing allowance job - scheduleId: ${payload.scheduleId}, runAt: ${runAt.toISOString()}`);
|
||||||
|
|
||||||
const schedule = await this.allowanceScheduleRepository.findById(payload.scheduleId);
|
const schedule = await this.allowanceScheduleRepository.findById(payload.scheduleId);
|
||||||
if (!schedule) {
|
if (!schedule) {
|
||||||
|
this.logger.warn(`Schedule not found: ${payload.scheduleId}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.logger.debug(`Schedule found - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, status: ${schedule.status}, nextRunAt: ${schedule.nextRunAt}`);
|
||||||
|
|
||||||
if (schedule.status !== AllowanceScheduleStatus.ON) {
|
if (schedule.status !== AllowanceScheduleStatus.ON) {
|
||||||
|
this.logger.warn(`Schedule ${payload.scheduleId} is not ON (status: ${schedule.status}). Skipping.`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schedule.nextRunAt > runAt) {
|
if (schedule.nextRunAt > runAt) {
|
||||||
|
this.logger.warn(`Schedule ${payload.scheduleId} nextRunAt (${schedule.nextRunAt}) > runAt (${runAt}). Skipping.`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Convert amount from decimal string to number
|
||||||
|
const amount = Number(schedule.amount);
|
||||||
|
if (isNaN(amount) || amount <= 0) {
|
||||||
|
this.logger.error(`Invalid amount for schedule ${payload.scheduleId}: ${schedule.amount}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.logger.log(`Creating allowance credit - scheduleId: ${payload.scheduleId}, amount: ${amount}`);
|
||||||
|
|
||||||
let credit = null;
|
let credit = null;
|
||||||
try {
|
try {
|
||||||
credit = await this.allowanceCreditRepository.createCredit(schedule.id, schedule.amount, runAt);
|
credit = await this.allowanceCreditRepository.createCredit(schedule.id, amount, runAt);
|
||||||
|
this.logger.log(`Credit created: ${credit.id}`);
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
if (error?.code === '23505') {
|
if (error?.code === '23505') {
|
||||||
|
this.logger.warn(`Credit already exists for schedule ${payload.scheduleId} at ${runAt.toISOString()} (idempotency check)`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.cardService.transferToChild(schedule.juniorId, schedule.amount);
|
this.logger.log(`Transferring ${amount} to junior ${schedule.juniorId}`);
|
||||||
|
await this.cardService.transferToChild(schedule.juniorId, amount);
|
||||||
|
this.logger.log(`Transfer successful for junior ${schedule.juniorId}`);
|
||||||
|
|
||||||
const nextRunAt = this.computeNextRunAt(schedule.frequency);
|
const nextRunAt = this.computeNextRunAt(schedule.frequency);
|
||||||
await this.allowanceScheduleRepository.updateScheduleRun(schedule.id, runAt, nextRunAt);
|
await this.allowanceScheduleRepository.updateScheduleRun(schedule.id, runAt, nextRunAt);
|
||||||
|
this.logger.log(`Schedule ${payload.scheduleId} updated - lastRunAt: ${runAt.toISOString()}, nextRunAt: ${nextRunAt.toISOString()}`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
this.logger.error(`Transfer failed for schedule ${payload.scheduleId}: ${error instanceof Error ? error.message : error}`);
|
||||||
if (credit) {
|
if (credit) {
|
||||||
await this.allowanceCreditRepository.deleteById(credit.id);
|
await this.allowanceCreditRepository.deleteById(credit.id);
|
||||||
|
this.logger.log(`Credit ${credit.id} deleted due to transfer failure`);
|
||||||
}
|
}
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,11 +20,15 @@ export class AllowanceScheduleCron {
|
|||||||
|
|
||||||
@Cron(CronExpression.EVERY_5_MINUTES)
|
@Cron(CronExpression.EVERY_5_MINUTES)
|
||||||
async enqueueDueSchedules() {
|
async enqueueDueSchedules() {
|
||||||
|
this.logger.log('Starting allowance schedule cron job');
|
||||||
|
|
||||||
const hasLock = await this.baseCronService.acquireLock(this.lockKey, 240);
|
const hasLock = await this.baseCronService.acquireLock(this.lockKey, 240);
|
||||||
if (!hasLock) {
|
if (!hasLock) {
|
||||||
|
this.logger.warn('Could not acquire lock for allowance cron job - another instance may be running');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.logger.log('Lock acquired, starting to process due schedules');
|
||||||
const cronRun = await this.cronRunService.start(this.jobName);
|
const cronRun = await this.cronRunService.start(this.jobName);
|
||||||
let processedCount = 0;
|
let processedCount = 0;
|
||||||
try {
|
try {
|
||||||
@ -34,10 +38,17 @@ export class AllowanceScheduleCron {
|
|||||||
|
|
||||||
while (processedBatches < 50) {
|
while (processedBatches < 50) {
|
||||||
const schedules = await this.allowanceScheduleRepository.findDueSchedulesBatch(batchSize, cursor);
|
const schedules = await this.allowanceScheduleRepository.findDueSchedulesBatch(batchSize, cursor);
|
||||||
|
this.logger.log(`Found ${schedules.length} due schedules in batch ${processedBatches + 1}`);
|
||||||
|
|
||||||
if (!schedules.length) {
|
if (!schedules.length) {
|
||||||
|
this.logger.log('No more due schedules to process');
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (const schedule of schedules) {
|
||||||
|
this.logger.debug(`Enqueueing schedule ${schedule.id} - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, nextRunAt: ${schedule.nextRunAt}`);
|
||||||
|
}
|
||||||
|
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
schedules.map((schedule) => this.allowanceQueueService.enqueueSchedule(schedule.id, schedule.nextRunAt)),
|
schedules.map((schedule) => this.allowanceQueueService.enqueueSchedule(schedule.id, schedule.nextRunAt)),
|
||||||
);
|
);
|
||||||
@ -51,6 +62,8 @@ export class AllowanceScheduleCron {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.logger.log(`Allowance cron job completed - processed ${processedCount} schedules`);
|
||||||
await this.cronRunService.success(cronRun.id, processedCount);
|
await this.cronRunService.success(cronRun.id, processedCount);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const stack = error instanceof Error ? error.stack : undefined;
|
const stack = error instanceof Error ? error.stack : undefined;
|
||||||
|
|||||||
Reference in New Issue
Block a user