From 6e118129256d964123536ba7d3285a7e8622c899 Mon Sep 17 00:00:00 2001 From: Abdalhamid Alhamad Date: Wed, 4 Feb 2026 11:29:20 +0300 Subject: [PATCH] feat(allowance): enhance logging in allowance queue and worker services - Added detailed logging for enqueueing allowance jobs, processing jobs, and handling errors. - Improved validation checks with warnings for invalid payloads and schedules. - Enhanced logging for credit creation and transfer processes, including success and failure scenarios. - Updated cron job logging to track the processing of due schedules and completion status. --- .../services/allowance-queue.service.ts | 2 ++ .../services/allowance-worker.service.ts | 29 +++++++++++++++++-- src/cron/tasks/allowance-schedule.cron.ts | 13 +++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/allowance/services/allowance-queue.service.ts b/src/allowance/services/allowance-queue.service.ts index dce8948..c88f1a9 100644 --- a/src/allowance/services/allowance-queue.service.ts +++ b/src/allowance/services/allowance-queue.service.ts @@ -78,7 +78,9 @@ export class AllowanceQueueService implements OnModuleDestroy { contentType: 'application/json', }; + this.logger.log(`Enqueueing allowance job - scheduleId: ${scheduleId}, runAt: ${runAt.toISOString()}`); await this.channel.sendToQueue(this.queueName, Buffer.from(JSON.stringify(payload)), options); + this.logger.log(`Allowance job enqueued successfully - messageId: ${messageId}`); } async onModuleDestroy() { diff --git a/src/allowance/services/allowance-worker.service.ts b/src/allowance/services/allowance-worker.service.ts index 558a053..03d1f4b 100644 --- a/src/allowance/services/allowance-worker.service.ts +++ b/src/allowance/services/allowance-worker.service.ts @@ -107,39 +107,64 @@ export class AllowanceWorkerService implements OnModuleInit, OnModuleDestroy { private async processAllowanceJob(payload: AllowanceQueuePayload): Promise { const runAt = new Date(payload.runAt); if (!payload.scheduleId || Number.isNaN(runAt.getTime())) { + this.logger.warn(`Invalid payload - scheduleId: ${payload.scheduleId}, runAt: ${payload.runAt}`); return; } + this.logger.log(`Processing allowance job - scheduleId: ${payload.scheduleId}, runAt: ${runAt.toISOString()}`); + const schedule = await this.allowanceScheduleRepository.findById(payload.scheduleId); if (!schedule) { + this.logger.warn(`Schedule not found: ${payload.scheduleId}`); return; } + this.logger.debug(`Schedule found - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, status: ${schedule.status}, nextRunAt: ${schedule.nextRunAt}`); + if (schedule.status !== AllowanceScheduleStatus.ON) { + this.logger.warn(`Schedule ${payload.scheduleId} is not ON (status: ${schedule.status}). Skipping.`); return; } if (schedule.nextRunAt > runAt) { + this.logger.warn(`Schedule ${payload.scheduleId} nextRunAt (${schedule.nextRunAt}) > runAt (${runAt}). Skipping.`); return; } + // Convert amount from decimal string to number + const amount = Number(schedule.amount); + if (isNaN(amount) || amount <= 0) { + this.logger.error(`Invalid amount for schedule ${payload.scheduleId}: ${schedule.amount}`); + return; + } + + this.logger.log(`Creating allowance credit - scheduleId: ${payload.scheduleId}, amount: ${amount}`); + let credit = null; try { - credit = await this.allowanceCreditRepository.createCredit(schedule.id, schedule.amount, runAt); + credit = await this.allowanceCreditRepository.createCredit(schedule.id, amount, runAt); + this.logger.log(`Credit created: ${credit.id}`); } catch (error: any) { if (error?.code === '23505') { + this.logger.warn(`Credit already exists for schedule ${payload.scheduleId} at ${runAt.toISOString()} (idempotency check)`); return; } throw error; } try { - await this.cardService.transferToChild(schedule.juniorId, schedule.amount); + this.logger.log(`Transferring ${amount} to junior ${schedule.juniorId}`); + await this.cardService.transferToChild(schedule.juniorId, amount); + this.logger.log(`Transfer successful for junior ${schedule.juniorId}`); + const nextRunAt = this.computeNextRunAt(schedule.frequency); await this.allowanceScheduleRepository.updateScheduleRun(schedule.id, runAt, nextRunAt); + this.logger.log(`Schedule ${payload.scheduleId} updated - lastRunAt: ${runAt.toISOString()}, nextRunAt: ${nextRunAt.toISOString()}`); } catch (error) { + this.logger.error(`Transfer failed for schedule ${payload.scheduleId}: ${error instanceof Error ? error.message : error}`); if (credit) { await this.allowanceCreditRepository.deleteById(credit.id); + this.logger.log(`Credit ${credit.id} deleted due to transfer failure`); } throw error; } diff --git a/src/cron/tasks/allowance-schedule.cron.ts b/src/cron/tasks/allowance-schedule.cron.ts index ff4de51..7581652 100644 --- a/src/cron/tasks/allowance-schedule.cron.ts +++ b/src/cron/tasks/allowance-schedule.cron.ts @@ -20,11 +20,15 @@ export class AllowanceScheduleCron { @Cron(CronExpression.EVERY_5_MINUTES) async enqueueDueSchedules() { + this.logger.log('Starting allowance schedule cron job'); + const hasLock = await this.baseCronService.acquireLock(this.lockKey, 240); if (!hasLock) { + this.logger.warn('Could not acquire lock for allowance cron job - another instance may be running'); return; } + this.logger.log('Lock acquired, starting to process due schedules'); const cronRun = await this.cronRunService.start(this.jobName); let processedCount = 0; try { @@ -34,10 +38,17 @@ export class AllowanceScheduleCron { while (processedBatches < 50) { const schedules = await this.allowanceScheduleRepository.findDueSchedulesBatch(batchSize, cursor); + this.logger.log(`Found ${schedules.length} due schedules in batch ${processedBatches + 1}`); + if (!schedules.length) { + this.logger.log('No more due schedules to process'); break; } + for (const schedule of schedules) { + this.logger.debug(`Enqueueing schedule ${schedule.id} - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, nextRunAt: ${schedule.nextRunAt}`); + } + await Promise.all( schedules.map((schedule) => this.allowanceQueueService.enqueueSchedule(schedule.id, schedule.nextRunAt)), ); @@ -51,6 +62,8 @@ export class AllowanceScheduleCron { break; } } + + this.logger.log(`Allowance cron job completed - processed ${processedCount} schedules`); await this.cronRunService.success(cronRun.id, processedCount); } catch (error) { const stack = error instanceof Error ? error.stack : undefined;