mirror of
https://github.com/SyncrowIOT/backend.git
synced 2025-11-26 07:34:54 +00:00
feat: implement scheduler for periodic data updates and optimize database procedures
- Added SchedulerModule and SchedulerService to handle hourly data updates for AQI, occupancy, and energy consumption. - Refactored existing services to remove unused device repository dependencies and streamline procedure execution. - Updated SQL procedures to use correct parameter indexing. - Enhanced error handling and logging for scheduled tasks. - Integrated new repositories for presence sensor and AQI pollutant stats across multiple modules. - Added NestJS schedule package for task scheduling capabilities.
This commit is contained in:
@ -126,7 +126,7 @@ import { VisitorPasswordEntity } from '../modules/visitor-password/entities';
|
||||
extra: {
|
||||
charset: 'utf8mb4',
|
||||
max: 100, // set pool max size
|
||||
idleTimeoutMillis: 5000, // close idle clients after 5 second
|
||||
idleTimeoutMillis: 3000, // close idle clients after 5 second
|
||||
connectionTimeoutMillis: 12_000, // return an error after 11 second if connection could not be established
|
||||
maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion)
|
||||
},
|
||||
|
||||
@ -1,44 +1,63 @@
|
||||
import { DeviceRepository } from '@app/common/modules/device/repositories';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
import { DataSource } from 'typeorm';
|
||||
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
|
||||
@Injectable()
|
||||
export class AqiDataService {
|
||||
constructor(
|
||||
private readonly sqlLoader: SqlLoaderService,
|
||||
private readonly dataSource: DataSource,
|
||||
private readonly deviceRepository: DeviceRepository,
|
||||
) {}
|
||||
async updateAQISensorHistoricalData(deviceUuid: string): Promise<void> {
|
||||
try {
|
||||
const now = new Date();
|
||||
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
|
||||
const device = await this.deviceRepository.findOne({
|
||||
where: { uuid: deviceUuid },
|
||||
relations: ['spaceDevice'],
|
||||
});
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_daily_space_aqi',
|
||||
'proceduce_update_daily_space_aqi',
|
||||
[dateStr, device.spaceDevice?.uuid],
|
||||
);
|
||||
async updateAQISensorHistoricalData(): Promise<void> {
|
||||
try {
|
||||
const { dateStr } = this.getFormattedDates();
|
||||
|
||||
// Execute all procedures in parallel
|
||||
await Promise.all([
|
||||
this.executeProcedureWithRetry(
|
||||
'proceduce_update_daily_space_aqi',
|
||||
[dateStr],
|
||||
'fact_daily_space_aqi',
|
||||
),
|
||||
]);
|
||||
} catch (err) {
|
||||
console.error('Failed to insert or update aqi data:', err);
|
||||
console.error('Failed to update AQI sensor historical data:', err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async executeProcedure(
|
||||
procedureFolderName: string,
|
||||
private getFormattedDates(): { dateStr: string } {
|
||||
const now = new Date();
|
||||
return {
|
||||
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
|
||||
};
|
||||
}
|
||||
private async executeProcedureWithRetry(
|
||||
procedureFileName: string,
|
||||
params: (string | number | null)[],
|
||||
folderName: string,
|
||||
retries = 3,
|
||||
): Promise<void> {
|
||||
const query = this.loadQuery(procedureFolderName, procedureFileName);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
try {
|
||||
const query = this.loadQuery(folderName, procedureFileName);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
} catch (err) {
|
||||
if (retries > 0) {
|
||||
const delayMs = 1000 * (4 - retries); // Exponential backoff
|
||||
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
return this.executeProcedureWithRetry(
|
||||
procedureFileName,
|
||||
params,
|
||||
folderName,
|
||||
retries - 1,
|
||||
);
|
||||
}
|
||||
console.error(`Failed to execute ${procedureFileName}:`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private loadQuery(folderName: string, fileName: string): string {
|
||||
|
||||
@ -1,65 +1,68 @@
|
||||
import { DeviceRepository } from '@app/common/modules/device/repositories';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
import { DataSource } from 'typeorm';
|
||||
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
|
||||
@Injectable()
|
||||
export class OccupancyService {
|
||||
constructor(
|
||||
private readonly sqlLoader: SqlLoaderService,
|
||||
private readonly dataSource: DataSource,
|
||||
private readonly deviceRepository: DeviceRepository,
|
||||
) {}
|
||||
async updateOccupancySensorHistoricalDurationData(
|
||||
deviceUuid: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const now = new Date();
|
||||
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
|
||||
const device = await this.deviceRepository.findOne({
|
||||
where: { uuid: deviceUuid },
|
||||
relations: ['spaceDevice'],
|
||||
});
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_daily_space_occupancy_duration',
|
||||
'procedure_update_daily_space_occupancy_duration',
|
||||
[dateStr, device.spaceDevice?.uuid],
|
||||
);
|
||||
async updateOccupancyDataProcedures(): Promise<void> {
|
||||
try {
|
||||
const { dateStr } = this.getFormattedDates();
|
||||
|
||||
// Execute all procedures in parallel
|
||||
await Promise.all([
|
||||
this.executeProcedureWithRetry(
|
||||
'procedure_update_fact_space_occupancy',
|
||||
[dateStr],
|
||||
'fact_space_occupancy_count',
|
||||
),
|
||||
this.executeProcedureWithRetry(
|
||||
'procedure_update_daily_space_occupancy_duration',
|
||||
[dateStr],
|
||||
'fact_daily_space_occupancy_duration',
|
||||
),
|
||||
]);
|
||||
} catch (err) {
|
||||
console.error('Failed to insert or update occupancy duration data:', err);
|
||||
console.error('Failed to update occupancy data:', err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
async updateOccupancySensorHistoricalData(deviceUuid: string): Promise<void> {
|
||||
try {
|
||||
const now = new Date();
|
||||
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
|
||||
const device = await this.deviceRepository.findOne({
|
||||
where: { uuid: deviceUuid },
|
||||
relations: ['spaceDevice'],
|
||||
});
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_space_occupancy_count',
|
||||
'procedure_update_fact_space_occupancy',
|
||||
[dateStr, device.spaceDevice?.uuid],
|
||||
);
|
||||
} catch (err) {
|
||||
console.error('Failed to insert or update occupancy data:', err);
|
||||
throw err;
|
||||
}
|
||||
private getFormattedDates(): { dateStr: string } {
|
||||
const now = new Date();
|
||||
return {
|
||||
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
|
||||
};
|
||||
}
|
||||
|
||||
private async executeProcedure(
|
||||
procedureFolderName: string,
|
||||
private async executeProcedureWithRetry(
|
||||
procedureFileName: string,
|
||||
params: (string | number | null)[],
|
||||
folderName: string,
|
||||
retries = 3,
|
||||
): Promise<void> {
|
||||
const query = this.loadQuery(procedureFolderName, procedureFileName);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
try {
|
||||
const query = this.loadQuery(folderName, procedureFileName);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
} catch (err) {
|
||||
if (retries > 0) {
|
||||
const delayMs = 1000 * (4 - retries); // Exponential backoff
|
||||
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
return this.executeProcedureWithRetry(
|
||||
procedureFileName,
|
||||
params,
|
||||
folderName,
|
||||
retries - 1,
|
||||
);
|
||||
}
|
||||
console.error(`Failed to execute ${procedureFileName}:`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private loadQuery(folderName: string, fileName: string): string {
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
import { DataSource } from 'typeorm';
|
||||
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
|
||||
import { SqlLoaderService } from './sql-loader.service';
|
||||
|
||||
@Injectable()
|
||||
export class PowerClampService {
|
||||
@ -10,48 +10,72 @@ export class PowerClampService {
|
||||
private readonly dataSource: DataSource,
|
||||
) {}
|
||||
|
||||
async updateEnergyConsumedHistoricalData(deviceUuid: string): Promise<void> {
|
||||
async updateEnergyConsumedHistoricalData(): Promise<void> {
|
||||
try {
|
||||
const now = new Date();
|
||||
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
|
||||
const hour = now.getHours();
|
||||
const monthYear = now
|
||||
.toLocaleDateString('en-US', {
|
||||
month: '2-digit',
|
||||
year: 'numeric',
|
||||
})
|
||||
.replace('/', '-'); // MM-YYYY
|
||||
const { dateStr, monthYear } = this.getFormattedDates();
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_hourly_device_energy_consumed_procedure',
|
||||
[deviceUuid, dateStr, hour],
|
||||
);
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_daily_device_energy_consumed_procedure',
|
||||
[deviceUuid, dateStr],
|
||||
);
|
||||
|
||||
await this.executeProcedure(
|
||||
'fact_monthly_device_energy_consumed_procedure',
|
||||
[deviceUuid, monthYear],
|
||||
);
|
||||
// Execute all procedures in parallel
|
||||
await Promise.all([
|
||||
this.executeProcedureWithRetry(
|
||||
'fact_hourly_device_energy_consumed_procedure',
|
||||
[dateStr],
|
||||
'fact_device_energy_consumed',
|
||||
),
|
||||
this.executeProcedureWithRetry(
|
||||
'fact_daily_device_energy_consumed_procedure',
|
||||
[dateStr],
|
||||
'fact_device_energy_consumed',
|
||||
),
|
||||
this.executeProcedureWithRetry(
|
||||
'fact_monthly_device_energy_consumed_procedure',
|
||||
[monthYear],
|
||||
'fact_device_energy_consumed',
|
||||
),
|
||||
]);
|
||||
} catch (err) {
|
||||
console.error('Failed to insert or update energy data:', err);
|
||||
console.error('Failed to update energy consumption data:', err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async executeProcedure(
|
||||
private getFormattedDates(): { dateStr: string; monthYear: string } {
|
||||
const now = new Date();
|
||||
return {
|
||||
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
|
||||
monthYear: now
|
||||
.toLocaleDateString('en-US', {
|
||||
month: '2-digit',
|
||||
year: 'numeric',
|
||||
})
|
||||
.replace('/', '-'), // MM-YYYY
|
||||
};
|
||||
}
|
||||
|
||||
private async executeProcedureWithRetry(
|
||||
procedureFileName: string,
|
||||
params: (string | number | null)[],
|
||||
folderName: string,
|
||||
retries = 3,
|
||||
): Promise<void> {
|
||||
const query = this.loadQuery(
|
||||
'fact_device_energy_consumed',
|
||||
procedureFileName,
|
||||
);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
try {
|
||||
const query = this.loadQuery(folderName, procedureFileName);
|
||||
await this.dataSource.query(query, params);
|
||||
console.log(`Procedure ${procedureFileName} executed successfully.`);
|
||||
} catch (err) {
|
||||
if (retries > 0) {
|
||||
const delayMs = 1000 * (4 - retries); // Exponential backoff
|
||||
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
|
||||
await new Promise((resolve) => setTimeout(resolve, delayMs));
|
||||
return this.executeProcedureWithRetry(
|
||||
procedureFileName,
|
||||
params,
|
||||
folderName,
|
||||
retries - 1,
|
||||
);
|
||||
}
|
||||
console.error(`Failed to execute ${procedureFileName}:`, err);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private loadQuery(folderName: string, fileName: string): string {
|
||||
|
||||
@ -23,6 +23,13 @@ export class SosHandlerService {
|
||||
status: [{ code: 'sos', value: true }],
|
||||
log: logData,
|
||||
});
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
|
||||
{
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: true }],
|
||||
log: logData,
|
||||
},
|
||||
]);
|
||||
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
@ -31,30 +38,13 @@ export class SosHandlerService {
|
||||
status: [{ code: 'sos', value: false }],
|
||||
log: logData,
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to send SOS false value', err);
|
||||
}
|
||||
}, 2000);
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to send SOS true value', err);
|
||||
}
|
||||
}
|
||||
|
||||
async handleSosEventOurDb(devId: string, logData: any): Promise<void> {
|
||||
try {
|
||||
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: true }],
|
||||
log: logData,
|
||||
});
|
||||
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: false }],
|
||||
log: logData,
|
||||
});
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
|
||||
{
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: false }],
|
||||
log: logData,
|
||||
},
|
||||
]);
|
||||
} catch (err) {
|
||||
this.logger.error('Failed to send SOS false value', err);
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ export class TuyaWebSocketService {
|
||||
|
||||
try {
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
|
||||
batch.map((item) => ({
|
||||
batch?.map((item) => ({
|
||||
deviceTuyaUuid: item.devId,
|
||||
status: item.status,
|
||||
log: item.logData,
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
WITH params AS (
|
||||
SELECT
|
||||
$2::date AS target_date
|
||||
$1::date AS target_date
|
||||
),
|
||||
total_energy AS (
|
||||
SELECT
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
WITH params AS (
|
||||
SELECT
|
||||
$2::date AS target_date
|
||||
$1::date AS target_date
|
||||
),
|
||||
total_energy AS (
|
||||
SELECT
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
WITH params AS (
|
||||
SELECT
|
||||
$2::text AS target_month -- Format should match 'MM-YYYY'
|
||||
$1::text AS target_month -- Format should match 'MM-YYYY'
|
||||
),
|
||||
total_energy AS (
|
||||
SELECT
|
||||
|
||||
Reference in New Issue
Block a user