Merge pull request #434 from SyncrowIOT/fix-time-out-connections-db

Fix-time-out-connections-db
This commit is contained in:
faljawhary
2025-06-25 04:47:59 -06:00
committed by GitHub
26 changed files with 462 additions and 254 deletions

View File

@ -126,7 +126,7 @@ import { VisitorPasswordEntity } from '../modules/visitor-password/entities';
extra: { extra: {
charset: 'utf8mb4', charset: 'utf8mb4',
max: 100, // set pool max size max: 100, // set pool max size
idleTimeoutMillis: 5000, // close idle clients after 5 second idleTimeoutMillis: 3000, // close idle clients after 5 second
connectionTimeoutMillis: 12_000, // return an error after 11 second if connection could not be established connectionTimeoutMillis: 12_000, // return an error after 11 second if connection could not be established
maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion) maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion)
}, },

View File

@ -3,28 +3,12 @@ import { DeviceStatusFirebaseController } from './controllers/devices-status.con
import { DeviceStatusFirebaseService } from './services/devices-status.service'; import { DeviceStatusFirebaseService } from './services/devices-status.service';
import { DeviceRepository } from '@app/common/modules/device/repositories'; import { DeviceRepository } from '@app/common/modules/device/repositories';
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
import {
PowerClampHourlyRepository,
PowerClampDailyRepository,
PowerClampMonthlyRepository,
} from '@app/common/modules/power-clamp/repositories';
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
@Module({ @Module({
providers: [ providers: [
DeviceStatusFirebaseService, DeviceStatusFirebaseService,
DeviceRepository, DeviceRepository,
DeviceStatusLogRepository, DeviceStatusLogRepository,
PowerClampService,
PowerClampHourlyRepository,
PowerClampDailyRepository,
PowerClampMonthlyRepository,
SqlLoaderService,
OccupancyService,
AqiDataService,
], ],
controllers: [DeviceStatusFirebaseController], controllers: [DeviceStatusFirebaseController],
exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository], exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository],

View File

@ -18,12 +18,6 @@ import {
runTransaction, runTransaction,
} from 'firebase/database'; } from 'firebase/database';
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories';
import { ProductType } from '@app/common/constants/product-type.enum';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
import { PowerClampEnergyEnum } from '@app/common/constants/power.clamp.enargy.enum';
import { PresenceSensorEnum } from '@app/common/constants/presence.sensor.enum';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
@Injectable() @Injectable()
export class DeviceStatusFirebaseService { export class DeviceStatusFirebaseService {
private tuya: TuyaContext; private tuya: TuyaContext;
@ -31,9 +25,6 @@ export class DeviceStatusFirebaseService {
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
private readonly deviceRepository: DeviceRepository, private readonly deviceRepository: DeviceRepository,
private readonly powerClampService: PowerClampService,
private readonly occupancyService: OccupancyService,
private readonly aqiDataService: AqiDataService,
private deviceStatusLogRepository: DeviceStatusLogRepository, private deviceStatusLogRepository: DeviceStatusLogRepository,
) { ) {
const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY'); const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY');
@ -76,28 +67,85 @@ export class DeviceStatusFirebaseService {
); );
} }
} }
async addDeviceStatusToOurDb( async addBatchDeviceStatusToOurDb(
addDeviceStatusDto: AddDeviceStatusDto, batch: { deviceTuyaUuid: string; status: any; log: any }[],
): Promise<AddDeviceStatusDto | null> { ): Promise<void> {
try { const allLogs = [];
const device = await this.getDeviceByDeviceTuyaUuid( const deviceMap = new Map<string, any>();
addDeviceStatusDto.deviceTuyaUuid,
console.log(
`🧠 Starting device lookups for batch of ${batch.length} items...`,
); );
if (device?.uuid) { // Step 1: Parallel device fetching
return await this.createDeviceStatusInOurDb({ await Promise.all(
deviceUuid: device.uuid, batch.map(async (item) => {
...addDeviceStatusDto, if (!deviceMap.has(item.deviceTuyaUuid)) {
productType: device.productDevice.prodType, const device = await this.getDeviceByDeviceTuyaUuid(
}); item.deviceTuyaUuid,
);
device?.uuid && deviceMap.set(item.deviceTuyaUuid, device);
} }
// Return null if device not found or no UUID }),
return null; );
console.log(`🔍 Found ${deviceMap.size} devices from batch`);
// Step 2: Prepare logs and updates
for (const item of batch) {
const device = deviceMap.get(item.deviceTuyaUuid);
if (!device?.uuid) continue;
const logs = item.log.properties.map((property) =>
this.deviceStatusLogRepository.create({
deviceId: device.uuid,
deviceTuyaId: item.deviceTuyaUuid,
productId: item.log.productId,
log: item.log,
code: property.code,
value: property.value,
eventId: item.log.dataId,
eventTime: new Date(property.time).toISOString(),
}),
);
allLogs.push(...logs);
}
console.log(`📝 Total logs to insert: ${allLogs.length}`);
// Step 3: Insert logs in chunks with ON CONFLICT DO NOTHING
const insertLogsPromise = (async () => {
const chunkSize = 300;
let insertedCount = 0;
for (let i = 0; i < allLogs.length; i += chunkSize) {
const chunk = allLogs.slice(i, i + chunkSize);
try {
const result = await this.deviceStatusLogRepository
.createQueryBuilder()
.insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
insertedCount += result.identifiers.length;
console.log(
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
);
} catch (error) { } catch (error) {
// Handle the error silently, perhaps log it internally or ignore it console.error('❌ Insert error (skipped chunk):', error.message);
return null;
} }
} }
console.log(
`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`,
);
})();
// Step 5: Wait for both insert and post-processing to finish
await Promise.all([insertLogsPromise]);
}
async addDeviceStatusToFirebase( async addDeviceStatusToFirebase(
addDeviceStatusDto: AddDeviceStatusDto, addDeviceStatusDto: AddDeviceStatusDto,
): Promise<AddDeviceStatusDto | null> { ): Promise<AddDeviceStatusDto | null> {
@ -237,66 +285,4 @@ export class DeviceStatusFirebaseService {
const snapshot: DataSnapshot = await get(dataRef); const snapshot: DataSnapshot = await get(dataRef);
return snapshot.val(); return snapshot.val();
} }
async createDeviceStatusInOurDb(
addDeviceStatusDto: AddDeviceStatusDto,
): Promise<any> {
// Save logs to your repository
const newLogs = addDeviceStatusDto.log.properties.map((property) => {
return this.deviceStatusLogRepository.create({
deviceId: addDeviceStatusDto.deviceUuid,
deviceTuyaId: addDeviceStatusDto.deviceTuyaUuid,
productId: addDeviceStatusDto.log.productId,
log: addDeviceStatusDto.log,
code: property.code,
value: property.value,
eventId: addDeviceStatusDto.log.dataId,
eventTime: new Date(property.time).toISOString(),
});
});
await this.deviceStatusLogRepository.save(newLogs);
if (addDeviceStatusDto.productType === ProductType.PC) {
const energyCodes = new Set([
PowerClampEnergyEnum.ENERGY_CONSUMED,
PowerClampEnergyEnum.ENERGY_CONSUMED_A,
PowerClampEnergyEnum.ENERGY_CONSUMED_B,
PowerClampEnergyEnum.ENERGY_CONSUMED_C,
]);
const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) =>
energyCodes.has(status.code),
);
if (energyStatus) {
await this.powerClampService.updateEnergyConsumedHistoricalData(
addDeviceStatusDto.deviceUuid,
);
}
}
if (
addDeviceStatusDto.productType === ProductType.CPS ||
addDeviceStatusDto.productType === ProductType.WPS
) {
const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]);
const occupancyStatus = addDeviceStatusDto?.log?.properties?.find(
(status) => occupancyCodes.has(status.code),
);
if (occupancyStatus) {
await this.occupancyService.updateOccupancySensorHistoricalData(
addDeviceStatusDto.deviceUuid,
);
await this.occupancyService.updateOccupancySensorHistoricalDurationData(
addDeviceStatusDto.deviceUuid,
);
}
}
if (addDeviceStatusDto.productType === ProductType.AQI) {
await this.aqiDataService.updateAQISensorHistoricalData(
addDeviceStatusDto.deviceUuid,
);
}
}
} }

View File

@ -1,44 +1,63 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class AqiDataService { export class AqiDataService {
constructor( constructor(
private readonly sqlLoader: SqlLoaderService, private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {} ) {}
async updateAQISensorHistoricalData(deviceUuid: string): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure( async updateAQISensorHistoricalData(): Promise<void> {
'fact_daily_space_aqi', try {
const { dateStr } = this.getFormattedDates();
// Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'proceduce_update_daily_space_aqi', 'proceduce_update_daily_space_aqi',
[dateStr, device.spaceDevice?.uuid], [dateStr],
); 'fact_daily_space_aqi',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update aqi data:', err); console.error('Failed to update AQI sensor historical data:', err);
throw err; throw err;
} }
} }
private getFormattedDates(): { dateStr: string } {
private async executeProcedure( const now = new Date();
procedureFolderName: string, return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
};
}
private async executeProcedureWithRetry(
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName); try {
const query = this.loadQuery(folderName, procedureFileName);
await this.dataSource.query(query, params); await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`); console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,65 +1,68 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class OccupancyService { export class OccupancyService {
constructor( constructor(
private readonly sqlLoader: SqlLoaderService, private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {} ) {}
async updateOccupancySensorHistoricalDurationData(
deviceUuid: string,
): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure( async updateOccupancyDataProcedures(): Promise<void> {
'fact_daily_space_occupancy_duration',
'procedure_update_daily_space_occupancy_duration',
[dateStr, device.spaceDevice?.uuid],
);
} catch (err) {
console.error('Failed to insert or update occupancy duration data:', err);
throw err;
}
}
async updateOccupancySensorHistoricalData(deviceUuid: string): Promise<void> {
try { try {
const now = new Date(); const { dateStr } = this.getFormattedDates();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure( // Execute all procedures in parallel
'fact_space_occupancy_count', await Promise.all([
this.executeProcedureWithRetry(
'procedure_update_fact_space_occupancy', 'procedure_update_fact_space_occupancy',
[dateStr, device.spaceDevice?.uuid], [dateStr],
); 'fact_space_occupancy_count',
),
this.executeProcedureWithRetry(
'procedure_update_daily_space_occupancy_duration',
[dateStr],
'fact_daily_space_occupancy_duration',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update occupancy data:', err); console.error('Failed to update occupancy data:', err);
throw err; throw err;
} }
} }
private getFormattedDates(): { dateStr: string } {
private async executeProcedure( const now = new Date();
procedureFolderName: string, return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
};
}
private async executeProcedureWithRetry(
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName); try {
const query = this.loadQuery(folderName, procedureFileName);
await this.dataSource.query(query, params); await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`); console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class PowerClampService { export class PowerClampService {
@ -10,48 +10,72 @@ export class PowerClampService {
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
) {} ) {}
async updateEnergyConsumedHistoricalData(deviceUuid: string): Promise<void> { async updateEnergyConsumedHistoricalData(): Promise<void> {
try { try {
const now = new Date(); const { dateStr, monthYear } = this.getFormattedDates();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const hour = now.getHours();
const monthYear = now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'); // MM-YYYY
await this.executeProcedure( // Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'fact_hourly_device_energy_consumed_procedure', 'fact_hourly_device_energy_consumed_procedure',
[deviceUuid, dateStr, hour], [dateStr],
); 'fact_device_energy_consumed',
),
await this.executeProcedure( this.executeProcedureWithRetry(
'fact_daily_device_energy_consumed_procedure', 'fact_daily_device_energy_consumed_procedure',
[deviceUuid, dateStr], [dateStr],
); 'fact_device_energy_consumed',
),
await this.executeProcedure( this.executeProcedureWithRetry(
'fact_monthly_device_energy_consumed_procedure', 'fact_monthly_device_energy_consumed_procedure',
[deviceUuid, monthYear], [monthYear],
); 'fact_device_energy_consumed',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update energy data:', err); console.error('Failed to update energy consumption data:', err);
throw err; throw err;
} }
} }
private async executeProcedure( private getFormattedDates(): { dateStr: string; monthYear: string } {
const now = new Date();
return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
monthYear: now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'), // MM-YYYY
};
}
private async executeProcedureWithRetry(
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery( try {
'fact_device_energy_consumed', const query = this.loadQuery(folderName, procedureFileName);
procedureFileName,
);
await this.dataSource.query(query, params); await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`); console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -23,6 +23,13 @@ export class SosHandlerService {
status: [{ code: 'sos', value: true }], status: [{ code: 'sos', value: true }],
log: logData, log: logData,
}); });
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
},
]);
setTimeout(async () => { setTimeout(async () => {
try { try {
@ -31,30 +38,13 @@ export class SosHandlerService {
status: [{ code: 'sos', value: false }], status: [{ code: 'sos', value: false }],
log: logData, log: logData,
}); });
} catch (err) { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
this.logger.error('Failed to send SOS false value', err); {
}
}, 2000);
} catch (err) {
this.logger.error('Failed to send SOS true value', err);
}
}
async handleSosEventOurDb(devId: string, logData: any): Promise<void> {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
});
setTimeout(async () => {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
deviceTuyaUuid: devId, deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }], status: [{ code: 'sos', value: false }],
log: logData, log: logData,
}); },
]);
} catch (err) { } catch (err) {
this.logger.error('Failed to send SOS false value', err); this.logger.error('Failed to send SOS false value', err);
} }

View File

@ -38,8 +38,8 @@ export class TuyaWebSocketService {
this.client.start(); this.client.start();
} }
// Trigger the queue processor every 2 seconds // Trigger the queue processor every 15 seconds
setInterval(() => this.processQueue(), 10000); setInterval(() => this.processQueue(), 15000);
} }
private setupEventHandlers() { private setupEventHandlers() {
@ -93,32 +93,30 @@ export class TuyaWebSocketService {
}); });
} }
private async processQueue() { private async processQueue() {
if (this.isProcessing || this.messageQueue.length === 0) return; if (this.isProcessing) {
console.log('⏳ Skipping: still processing previous batch');
return;
}
if (this.messageQueue.length === 0) return;
this.isProcessing = true; this.isProcessing = true;
const batch = [...this.messageQueue]; const batch = [...this.messageQueue];
this.messageQueue = []; this.messageQueue = [];
console.log(`🔁 Processing batch of size: ${batch.length}`);
try { try {
for (const item of batch) { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
if (this.sosHandlerService.isSosTriggered(item.status)) { batch?.map((item) => ({
await this.sosHandlerService.handleSosEventOurDb(
item.devId,
item.logData,
);
} else {
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
deviceTuyaUuid: item.devId, deviceTuyaUuid: item.devId,
status: item.status, status: item.status,
log: item.logData, log: item.logData,
}); })),
} );
}
} catch (error) { } catch (error) {
console.error('Error processing batch:', error); console.error('Error processing batch:', error);
// Re-add the batch to the queue for retry this.messageQueue.unshift(...batch); // retry
this.messageQueue.unshift(...batch);
} finally { } finally {
this.isProcessing = false; this.isProcessing = false;
} }

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::date AS target_date $1::date AS target_date
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::date AS target_date $1::date AS target_date
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::text AS target_month -- Format should match 'MM-YYYY' $1::text AS target_month -- Format should match 'MM-YYYY'
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

42
package-lock.json generated
View File

@ -18,6 +18,7 @@
"@nestjs/jwt": "^10.2.0", "@nestjs/jwt": "^10.2.0",
"@nestjs/passport": "^10.0.3", "@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.0.0", "@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^6.0.0",
"@nestjs/swagger": "^7.3.0", "@nestjs/swagger": "^7.3.0",
"@nestjs/terminus": "^11.0.0", "@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.4.0", "@nestjs/throttler": "^6.4.0",
@ -2538,6 +2539,19 @@
"@nestjs/core": "^10.0.0" "@nestjs/core": "^10.0.0"
} }
}, },
"node_modules/@nestjs/schedule": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-6.0.0.tgz",
"integrity": "sha512-aQySMw6tw2nhitELXd3EiRacQRgzUKD9mFcUZVOJ7jPLqIBvXOyvRWLsK9SdurGA+jjziAlMef7iB5ZEFFoQpw==",
"license": "MIT",
"dependencies": {
"cron": "4.3.0"
},
"peerDependencies": {
"@nestjs/common": "^10.0.0 || ^11.0.0",
"@nestjs/core": "^10.0.0 || ^11.0.0"
}
},
"node_modules/@nestjs/schematics": { "node_modules/@nestjs/schematics": {
"version": "10.2.3", "version": "10.2.3",
"resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz", "resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz",
@ -3215,6 +3229,12 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"node_modules/@types/luxon": {
"version": "3.6.2",
"resolved": "https://registry.npmjs.org/@types/luxon/-/luxon-3.6.2.tgz",
"integrity": "sha512-R/BdP7OxEMc44l2Ex5lSXHoIXTB2JLNa3y2QISIbr58U/YcsffyQrYW//hZSdrfxrjRZj3GcUoxMPGdO8gSYuw==",
"license": "MIT"
},
"node_modules/@types/methods": { "node_modules/@types/methods": {
"version": "1.1.4", "version": "1.1.4",
"resolved": "https://registry.npmjs.org/@types/methods/-/methods-1.1.4.tgz", "resolved": "https://registry.npmjs.org/@types/methods/-/methods-1.1.4.tgz",
@ -5426,6 +5446,19 @@
"devOptional": true, "devOptional": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/cron": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/cron/-/cron-4.3.0.tgz",
"integrity": "sha512-ciiYNLfSlF9MrDqnbMdRWFiA6oizSF7kA1osPP9lRzNu0Uu+AWog1UKy7SkckiDY2irrNjeO6qLyKnXC8oxmrw==",
"license": "MIT",
"dependencies": {
"@types/luxon": "~3.6.0",
"luxon": "~3.6.0"
},
"engines": {
"node": ">=18.x"
}
},
"node_modules/cross-spawn": { "node_modules/cross-spawn": {
"version": "7.0.6", "version": "7.0.6",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz",
@ -9777,6 +9810,15 @@
"yallist": "^3.0.2" "yallist": "^3.0.2"
} }
}, },
"node_modules/luxon": {
"version": "3.6.1",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.6.1.tgz",
"integrity": "sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==",
"license": "MIT",
"engines": {
"node": ">=12"
}
},
"node_modules/magic-string": { "node_modules/magic-string": {
"version": "0.30.8", "version": "0.30.8",
"resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz",

View File

@ -30,6 +30,7 @@
"@nestjs/jwt": "^10.2.0", "@nestjs/jwt": "^10.2.0",
"@nestjs/passport": "^10.0.3", "@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.0.0", "@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^6.0.0",
"@nestjs/swagger": "^7.3.0", "@nestjs/swagger": "^7.3.0",
"@nestjs/terminus": "^11.0.0", "@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.4.0", "@nestjs/throttler": "^6.4.0",

View File

@ -42,6 +42,8 @@ import { winstonLoggerOptions } from '../libs/common/src/logger/services/winston
import { AqiModule } from './aqi/aqi.module'; import { AqiModule } from './aqi/aqi.module';
import { OccupancyModule } from './occupancy/occupancy.module'; import { OccupancyModule } from './occupancy/occupancy.module';
import { WeatherModule } from './weather/weather.module'; import { WeatherModule } from './weather/weather.module';
import { ScheduleModule as NestScheduleModule } from '@nestjs/schedule';
import { SchedulerModule } from './scheduler/scheduler.module';
@Module({ @Module({
imports: [ imports: [
ConfigModule.forRoot({ ConfigModule.forRoot({
@ -94,6 +96,8 @@ import { WeatherModule } from './weather/weather.module';
OccupancyModule, OccupancyModule,
WeatherModule, WeatherModule,
AqiModule, AqiModule,
SchedulerModule,
NestScheduleModule.forRoot(),
], ],
providers: [ providers: [
{ {

View File

@ -30,6 +30,8 @@ import { PowerClampService } from '@app/common/helper/services/power.clamp.servi
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service'; import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, SpaceRepositoryModule], imports: [ConfigModule, SpaceRepositoryModule],
@ -59,6 +61,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [], exports: [],
}) })

View File

@ -64,6 +64,8 @@ import {
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service'; import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, SpaceRepositoryModule, UserRepositoryModule], imports: [ConfigModule, SpaceRepositoryModule, UserRepositoryModule],
@ -118,6 +120,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [CommunityService, SpacePermissionService], exports: [CommunityService, SpacePermissionService],
}) })

View File

@ -30,6 +30,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule], imports: [ConfigModule, DeviceRepositoryModule],
controllers: [DoorLockController], controllers: [DoorLockController],
@ -58,6 +60,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [DoorLockService], exports: [DoorLockService],
}) })

View File

@ -28,6 +28,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule], imports: [ConfigModule, DeviceRepositoryModule],
controllers: [GroupController], controllers: [GroupController],
@ -55,6 +57,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [GroupService], exports: [GroupService],
}) })

View File

@ -82,6 +82,8 @@ import { SubspaceProductAllocationService } from 'src/space/services/subspace/su
import { TagService as NewTagService } from 'src/tags/services'; import { TagService as NewTagService } from 'src/tags/services';
import { UserDevicePermissionService } from 'src/user-device-permission/services'; import { UserDevicePermissionService } from 'src/user-device-permission/services';
import { UserService, UserSpaceService } from 'src/users/services'; import { UserService, UserSpaceService } from 'src/users/services';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, InviteUserRepositoryModule, CommunityModule], imports: [ConfigModule, InviteUserRepositoryModule, CommunityModule],
@ -150,6 +152,8 @@ import { UserService, UserSpaceService } from 'src/users/services';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [InviteUserService], exports: [InviteUserService],
}) })

View File

@ -60,6 +60,8 @@ import { SubspaceProductAllocationService } from 'src/space/services/subspace/su
import { TagService } from 'src/tags/services'; import { TagService } from 'src/tags/services';
import { PowerClampController } from './controllers'; import { PowerClampController } from './controllers';
import { PowerClampService as PowerClamp } from './services/power-clamp.service'; import { PowerClampService as PowerClamp } from './services/power-clamp.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule], imports: [ConfigModule],
controllers: [PowerClampController], controllers: [PowerClampController],
@ -109,6 +111,8 @@ import { PowerClampService as PowerClamp } from './services/power-clamp.service'
SubspaceModelProductAllocationRepoitory, SubspaceModelProductAllocationRepoitory,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [PowerClamp], exports: [PowerClamp],
}) })

View File

@ -67,6 +67,8 @@ import { ProjectUserController } from './controllers/project-user.controller';
import { CreateOrphanSpaceHandler } from './handler'; import { CreateOrphanSpaceHandler } from './handler';
import { ProjectService } from './services'; import { ProjectService } from './services';
import { ProjectUserService } from './services/project-user.service'; import { ProjectUserService } from './services/project-user.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
const CommandHandlers = [CreateOrphanSpaceHandler]; const CommandHandlers = [CreateOrphanSpaceHandler];
@ -124,6 +126,8 @@ const CommandHandlers = [CreateOrphanSpaceHandler];
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [ProjectService, CqrsModule], exports: [ProjectService, CqrsModule],
}) })

View File

@ -0,0 +1,25 @@
import { DatabaseModule } from '@app/common/database/database.module';
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { SchedulerService } from './scheduler.service';
import { ScheduleModule as NestScheduleModule } from '@nestjs/schedule';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
@Module({
imports: [
NestScheduleModule.forRoot(),
TypeOrmModule.forFeature([]),
DatabaseModule,
],
providers: [
SchedulerService,
SqlLoaderService,
PowerClampService,
OccupancyService,
AqiDataService,
],
})
export class SchedulerModule {}

View File

@ -0,0 +1,92 @@
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
@Injectable()
export class SchedulerService {
constructor(
private readonly powerClampService: PowerClampService,
private readonly occupancyService: OccupancyService,
private readonly aqiDataService: AqiDataService,
) {
console.log('SchedulerService initialized!');
}
@Cron(CronExpression.EVERY_HOUR)
async runHourlyProcedures() {
console.log('\n======== Starting Procedures ========');
console.log(new Date().toISOString(), 'Scheduler running...');
try {
const results = await Promise.allSettled([
this.executeTask(
() => this.powerClampService.updateEnergyConsumedHistoricalData(),
'Energy Consumption',
),
this.executeTask(
() => this.occupancyService.updateOccupancyDataProcedures(),
'Occupancy Data',
),
this.executeTask(
() => this.aqiDataService.updateAQISensorHistoricalData(),
'AQI Data',
),
]);
this.logResults(results);
} catch (error) {
console.error('MAIN SCHEDULER ERROR:', error);
if (error.stack) {
console.error('Error stack:', error.stack);
}
}
}
private async executeTask(
task: () => Promise<void>,
name: string,
): Promise<{ name: string; status: string }> {
try {
console.log(`[${new Date().toISOString()}] Starting ${name} task...`);
await task();
console.log(
`[${new Date().toISOString()}] ${name} task completed successfully`,
);
return { name, status: 'success' };
} catch (error) {
console.error(
`[${new Date().toISOString()}] ${name} task failed:`,
error.message,
);
if (error.stack) {
console.error('Task error stack:', error.stack);
}
return { name, status: 'failed' };
}
}
private logResults(results: PromiseSettledResult<any>[]) {
const successCount = results.filter((r) => r.status === 'fulfilled').length;
const failedCount = results.length - successCount;
console.log('\n======== Task Results ========');
console.log(`Successful tasks: ${successCount}`);
console.log(`Failed tasks: ${failedCount}`);
if (failedCount > 0) {
console.log('\n======== Failed Tasks Details ========');
results.forEach((result, index) => {
if (result.status === 'rejected') {
console.error(`Task ${index + 1} failed:`, result.reason);
if (result.reason.stack) {
console.error('Error stack:', result.reason.stack);
}
}
});
}
console.log('\n======== Scheduler Completed ========\n');
}
}

View File

@ -63,6 +63,8 @@ import {
import { SpaceModelService, SubSpaceModelService } from './services'; import { SpaceModelService, SubSpaceModelService } from './services';
import { SpaceModelProductAllocationService } from './services/space-model-product-allocation.service'; import { SpaceModelProductAllocationService } from './services/space-model-product-allocation.service';
import { SubspaceModelProductAllocationService } from './services/subspace/subspace-model-product-allocation.service'; import { SubspaceModelProductAllocationService } from './services/subspace/subspace-model-product-allocation.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
const CommandHandlers = [ const CommandHandlers = [
PropogateUpdateSpaceModelHandler, PropogateUpdateSpaceModelHandler,
@ -120,6 +122,8 @@ const CommandHandlers = [
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [CqrsModule, SpaceModelService], exports: [CqrsModule, SpaceModelService],
}) })

View File

@ -88,6 +88,8 @@ import {
} from './services'; } from './services';
import { SpaceProductAllocationService } from './services/space-product-allocation.service'; import { SpaceProductAllocationService } from './services/space-product-allocation.service';
import { SubspaceProductAllocationService } from './services/subspace/subspace-product-allocation.service'; import { SubspaceProductAllocationService } from './services/subspace/subspace-product-allocation.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
export const CommandHandlers = [DisableSpaceHandler]; export const CommandHandlers = [DisableSpaceHandler];
@ -161,6 +163,8 @@ export const CommandHandlers = [DisableSpaceHandler];
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [SpaceService], exports: [SpaceService],
}) })

View File

@ -32,6 +32,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule, DoorLockModule], imports: [ConfigModule, DeviceRepositoryModule, DoorLockModule],
controllers: [VisitorPasswordController], controllers: [VisitorPasswordController],
@ -61,6 +63,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [VisitorPasswordService], exports: [VisitorPasswordService],
}) })