Merge pull request #97 from Zod-Alkhair/feature/allowance-scheduling-cron-queue

feat(allowance): enhance logging in allowance queue and worker services
This commit is contained in:
Majdalkilany0
2026-02-04 11:45:45 +03:00
committed by GitHub
3 changed files with 42 additions and 2 deletions

View File

@ -78,7 +78,9 @@ export class AllowanceQueueService implements OnModuleDestroy {
contentType: 'application/json',
};
this.logger.log(`Enqueueing allowance job - scheduleId: ${scheduleId}, runAt: ${runAt.toISOString()}`);
await this.channel.sendToQueue(this.queueName, Buffer.from(JSON.stringify(payload)), options);
this.logger.log(`Allowance job enqueued successfully - messageId: ${messageId}`);
}
async onModuleDestroy() {

View File

@ -107,39 +107,64 @@ export class AllowanceWorkerService implements OnModuleInit, OnModuleDestroy {
private async processAllowanceJob(payload: AllowanceQueuePayload): Promise<void> {
const runAt = new Date(payload.runAt);
if (!payload.scheduleId || Number.isNaN(runAt.getTime())) {
this.logger.warn(`Invalid payload - scheduleId: ${payload.scheduleId}, runAt: ${payload.runAt}`);
return;
}
this.logger.log(`Processing allowance job - scheduleId: ${payload.scheduleId}, runAt: ${runAt.toISOString()}`);
const schedule = await this.allowanceScheduleRepository.findById(payload.scheduleId);
if (!schedule) {
this.logger.warn(`Schedule not found: ${payload.scheduleId}`);
return;
}
this.logger.debug(`Schedule found - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, status: ${schedule.status}, nextRunAt: ${schedule.nextRunAt}`);
if (schedule.status !== AllowanceScheduleStatus.ON) {
this.logger.warn(`Schedule ${payload.scheduleId} is not ON (status: ${schedule.status}). Skipping.`);
return;
}
if (schedule.nextRunAt > runAt) {
this.logger.warn(`Schedule ${payload.scheduleId} nextRunAt (${schedule.nextRunAt}) > runAt (${runAt}). Skipping.`);
return;
}
// Convert amount from decimal string to number
const amount = Number(schedule.amount);
if (isNaN(amount) || amount <= 0) {
this.logger.error(`Invalid amount for schedule ${payload.scheduleId}: ${schedule.amount}`);
return;
}
this.logger.log(`Creating allowance credit - scheduleId: ${payload.scheduleId}, amount: ${amount}`);
let credit = null;
try {
credit = await this.allowanceCreditRepository.createCredit(schedule.id, schedule.amount, runAt);
credit = await this.allowanceCreditRepository.createCredit(schedule.id, amount, runAt);
this.logger.log(`Credit created: ${credit.id}`);
} catch (error: any) {
if (error?.code === '23505') {
this.logger.warn(`Credit already exists for schedule ${payload.scheduleId} at ${runAt.toISOString()} (idempotency check)`);
return;
}
throw error;
}
try {
await this.cardService.transferToChild(schedule.juniorId, schedule.amount);
this.logger.log(`Transferring ${amount} to junior ${schedule.juniorId}`);
await this.cardService.transferToChild(schedule.juniorId, amount);
this.logger.log(`Transfer successful for junior ${schedule.juniorId}`);
const nextRunAt = this.computeNextRunAt(schedule.frequency);
await this.allowanceScheduleRepository.updateScheduleRun(schedule.id, runAt, nextRunAt);
this.logger.log(`Schedule ${payload.scheduleId} updated - lastRunAt: ${runAt.toISOString()}, nextRunAt: ${nextRunAt.toISOString()}`);
} catch (error) {
this.logger.error(`Transfer failed for schedule ${payload.scheduleId}: ${error instanceof Error ? error.message : error}`);
if (credit) {
await this.allowanceCreditRepository.deleteById(credit.id);
this.logger.log(`Credit ${credit.id} deleted due to transfer failure`);
}
throw error;
}

View File

@ -20,11 +20,15 @@ export class AllowanceScheduleCron {
@Cron(CronExpression.EVERY_5_MINUTES)
async enqueueDueSchedules() {
this.logger.log('Starting allowance schedule cron job');
const hasLock = await this.baseCronService.acquireLock(this.lockKey, 240);
if (!hasLock) {
this.logger.warn('Could not acquire lock for allowance cron job - another instance may be running');
return;
}
this.logger.log('Lock acquired, starting to process due schedules');
const cronRun = await this.cronRunService.start(this.jobName);
let processedCount = 0;
try {
@ -34,10 +38,17 @@ export class AllowanceScheduleCron {
while (processedBatches < 50) {
const schedules = await this.allowanceScheduleRepository.findDueSchedulesBatch(batchSize, cursor);
this.logger.log(`Found ${schedules.length} due schedules in batch ${processedBatches + 1}`);
if (!schedules.length) {
this.logger.log('No more due schedules to process');
break;
}
for (const schedule of schedules) {
this.logger.debug(`Enqueueing schedule ${schedule.id} - juniorId: ${schedule.juniorId}, amount: ${schedule.amount}, nextRunAt: ${schedule.nextRunAt}`);
}
await Promise.all(
schedules.map((schedule) => this.allowanceQueueService.enqueueSchedule(schedule.id, schedule.nextRunAt)),
);
@ -51,6 +62,8 @@ export class AllowanceScheduleCron {
break;
}
}
this.logger.log(`Allowance cron job completed - processed ${processedCount} schedules`);
await this.cronRunService.success(cronRun.id, processedCount);
} catch (error) {
const stack = error instanceof Error ? error.stack : undefined;