Compare commits

..

16 Commits

Author SHA1 Message Date
f80d097ff8 refactor: optimize log insertion and clean up device cache handling in TuyaWebSocketService 2025-06-25 18:57:56 -06:00
04bd156df1 Merge branch 'dev' into add-check-log-to-trace-the-map-issue 2025-06-25 18:42:43 -06:00
731819aeaa feat: enhance device status handling with caching and batch processing improvements 2025-06-25 18:37:46 -06:00
68d2d3b53d fix: improve device retrieval logic in addDeviceStatusToFirebase method 2025-06-25 08:13:02 -06:00
3fcfe2d92f Merge pull request #438 from SyncrowIOT/temp-fix-to-check
fix: enhance device status handling by integrating device cache for improved performance
2025-06-25 08:06:29 -06:00
c0a069b460 fix: enhance device status handling by integrating device cache for improved performance 2025-06-25 08:03:23 -06:00
30724d7d37 Merge pull request #436 from SyncrowIOT/add-check-log-to-trace-the-map-issue
fix: add validation for missing properties in device status logs
2025-06-25 05:32:50 -06:00
324661e1ee fix: add missing check for device UUID in batch processing logs 2025-06-25 05:30:15 -06:00
a83424f45b fix: remove unnecessary validation for missing properties in device status logs 2025-06-25 05:29:28 -06:00
71f6ccb4db fix: add validation for missing properties in device status logs 2025-06-25 05:20:26 -06:00
68692b7c8b increase rate limit to 100 per minute for each IP (#435) 2025-06-25 13:50:38 +03:00
4d60c1ed54 Merge pull request #434 from SyncrowIOT/fix-time-out-connections-db
Fix-time-out-connections-db
2025-06-25 04:47:59 -06:00
27dbe04299 fix: remove unnecessary comment from ScheduleModule import in scheduler module 2025-06-25 04:47:38 -06:00
9bebcb2f3e feat: implement scheduler for periodic data updates and optimize database procedures
- Added SchedulerModule and SchedulerService to handle hourly data updates for AQI, occupancy, and energy consumption.
- Refactored existing services to remove unused device repository dependencies and streamline procedure execution.
- Updated SQL procedures to use correct parameter indexing.
- Enhanced error handling and logging for scheduled tasks.
- Integrated new repositories for presence sensor and AQI pollutant stats across multiple modules.
- Added NestJS schedule package for task scheduling capabilities.
2025-06-25 03:20:25 -06:00
43ab0030f0 refactor: clean up unused services and optimize batch processing in DeviceStatusFirebaseService 2025-06-25 03:20:12 -06:00
c48adb73b5 Merge pull request #433 from SyncrowIOT/DATA-adjust-remaining-procedures
DATA-adjust-remaining-procedures
2025-06-25 01:55:12 -06:00
27 changed files with 554 additions and 274 deletions

View File

@ -126,7 +126,7 @@ import { VisitorPasswordEntity } from '../modules/visitor-password/entities';
extra: { extra: {
charset: 'utf8mb4', charset: 'utf8mb4',
max: 100, // set pool max size max: 100, // set pool max size
idleTimeoutMillis: 5000, // close idle clients after 5 second idleTimeoutMillis: 3000, // close idle clients after 5 second
connectionTimeoutMillis: 12_000, // return an error after 11 second if connection could not be established connectionTimeoutMillis: 12_000, // return an error after 11 second if connection could not be established
maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion) maxUses: 7500, // close (and replace) a connection after it has been used 7500 times (see below for discussion)
}, },

View File

@ -3,28 +3,12 @@ import { DeviceStatusFirebaseController } from './controllers/devices-status.con
import { DeviceStatusFirebaseService } from './services/devices-status.service'; import { DeviceStatusFirebaseService } from './services/devices-status.service';
import { DeviceRepository } from '@app/common/modules/device/repositories'; import { DeviceRepository } from '@app/common/modules/device/repositories';
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
import {
PowerClampHourlyRepository,
PowerClampDailyRepository,
PowerClampMonthlyRepository,
} from '@app/common/modules/power-clamp/repositories';
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
@Module({ @Module({
providers: [ providers: [
DeviceStatusFirebaseService, DeviceStatusFirebaseService,
DeviceRepository, DeviceRepository,
DeviceStatusLogRepository, DeviceStatusLogRepository,
PowerClampService,
PowerClampHourlyRepository,
PowerClampDailyRepository,
PowerClampMonthlyRepository,
SqlLoaderService,
OccupancyService,
AqiDataService,
], ],
controllers: [DeviceStatusFirebaseController], controllers: [DeviceStatusFirebaseController],
exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository], exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository],

View File

@ -18,12 +18,6 @@ import {
runTransaction, runTransaction,
} from 'firebase/database'; } from 'firebase/database';
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories';
import { ProductType } from '@app/common/constants/product-type.enum';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
import { PowerClampEnergyEnum } from '@app/common/constants/power.clamp.enargy.enum';
import { PresenceSensorEnum } from '@app/common/constants/presence.sensor.enum';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
@Injectable() @Injectable()
export class DeviceStatusFirebaseService { export class DeviceStatusFirebaseService {
private tuya: TuyaContext; private tuya: TuyaContext;
@ -31,9 +25,6 @@ export class DeviceStatusFirebaseService {
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
private readonly deviceRepository: DeviceRepository, private readonly deviceRepository: DeviceRepository,
private readonly powerClampService: PowerClampService,
private readonly occupancyService: OccupancyService,
private readonly aqiDataService: AqiDataService,
private deviceStatusLogRepository: DeviceStatusLogRepository, private deviceStatusLogRepository: DeviceStatusLogRepository,
) { ) {
const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY'); const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY');
@ -76,47 +67,94 @@ export class DeviceStatusFirebaseService {
); );
} }
} }
async addDeviceStatusToOurDb( async addBatchDeviceStatusToOurDb(
addDeviceStatusDto: AddDeviceStatusDto, batch: {
): Promise<AddDeviceStatusDto | null> { deviceTuyaUuid: string;
try { status: any;
const device = await this.getDeviceByDeviceTuyaUuid( log: any;
addDeviceStatusDto.deviceTuyaUuid, device: any;
); }[],
): Promise<void> {
const allLogs = [];
if (device?.uuid) { console.log(`🔁 Preparing logs from batch of ${batch.length} items...`);
return await this.createDeviceStatusInOurDb({
deviceUuid: device.uuid, for (const item of batch) {
...addDeviceStatusDto, const device = item.device;
productType: device.productDevice.prodType, if (!device?.uuid) {
}); console.log(`⛔ Skipped unknown device: ${item.deviceTuyaUuid}`);
continue;
} }
// Return null if device not found or no UUID
return null; const logs = item.log.properties.map((property) =>
} catch (error) { this.deviceStatusLogRepository.create({
// Handle the error silently, perhaps log it internally or ignore it deviceId: device.uuid,
return null; deviceTuyaId: item.deviceTuyaUuid,
productId: item.log.productId,
log: item.log,
code: property.code,
value: property.value,
eventId: item.log.dataId,
eventTime: new Date(property.time).toISOString(),
}),
);
allLogs.push(...logs);
} }
console.log(`📝 Total logs to insert: ${allLogs.length}`);
const insertLogsPromise = (async () => {
const chunkSize = 300;
let insertedCount = 0;
for (let i = 0; i < allLogs.length; i += chunkSize) {
const chunk = allLogs.slice(i, i + chunkSize);
try {
const result = await this.deviceStatusLogRepository
.createQueryBuilder()
.insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
insertedCount += result.identifiers.length;
console.log(
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
);
} catch (error) {
console.error('❌ Insert error (skipped chunk):', error.message);
}
}
console.log(
`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`,
);
})();
await insertLogsPromise;
} }
async addDeviceStatusToFirebase( async addDeviceStatusToFirebase(
addDeviceStatusDto: AddDeviceStatusDto, addDeviceStatusDto: AddDeviceStatusDto & { device?: any },
): Promise<AddDeviceStatusDto | null> { ): Promise<AddDeviceStatusDto | null> {
try { try {
const device = await this.getDeviceByDeviceTuyaUuid( let device = addDeviceStatusDto.device;
addDeviceStatusDto.deviceTuyaUuid, if (!device) {
); device = await this.getDeviceByDeviceTuyaUuid(
addDeviceStatusDto.deviceTuyaUuid,
);
}
if (device?.uuid) { if (device?.uuid) {
return await this.createDeviceStatusFirebase({ return await this.createDeviceStatusFirebase({
deviceUuid: device.uuid, deviceUuid: device.uuid,
...addDeviceStatusDto, ...addDeviceStatusDto,
productType: device.productDevice.prodType, productType: device.productDevice?.prodType,
}); });
} }
// Return null if device not found or no UUID // Return null if device not found or no UUID
return null; return null;
} catch (error) { } catch (error) {
// Handle the error silently, perhaps log it internally or ignore it
return null; return null;
} }
} }
@ -130,6 +168,15 @@ export class DeviceStatusFirebaseService {
relations: ['productDevice'], relations: ['productDevice'],
}); });
} }
async getAllDevices() {
return await this.deviceRepository.find({
where: {
isActive: true,
},
relations: ['productDevice'],
});
}
async getDevicesInstructionStatus(deviceUuid: string) { async getDevicesInstructionStatus(deviceUuid: string) {
try { try {
const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid); const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid);
@ -237,66 +284,4 @@ export class DeviceStatusFirebaseService {
const snapshot: DataSnapshot = await get(dataRef); const snapshot: DataSnapshot = await get(dataRef);
return snapshot.val(); return snapshot.val();
} }
async createDeviceStatusInOurDb(
addDeviceStatusDto: AddDeviceStatusDto,
): Promise<any> {
// Save logs to your repository
const newLogs = addDeviceStatusDto.log.properties.map((property) => {
return this.deviceStatusLogRepository.create({
deviceId: addDeviceStatusDto.deviceUuid,
deviceTuyaId: addDeviceStatusDto.deviceTuyaUuid,
productId: addDeviceStatusDto.log.productId,
log: addDeviceStatusDto.log,
code: property.code,
value: property.value,
eventId: addDeviceStatusDto.log.dataId,
eventTime: new Date(property.time).toISOString(),
});
});
await this.deviceStatusLogRepository.save(newLogs);
if (addDeviceStatusDto.productType === ProductType.PC) {
const energyCodes = new Set([
PowerClampEnergyEnum.ENERGY_CONSUMED,
PowerClampEnergyEnum.ENERGY_CONSUMED_A,
PowerClampEnergyEnum.ENERGY_CONSUMED_B,
PowerClampEnergyEnum.ENERGY_CONSUMED_C,
]);
const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) =>
energyCodes.has(status.code),
);
if (energyStatus) {
await this.powerClampService.updateEnergyConsumedHistoricalData(
addDeviceStatusDto.deviceUuid,
);
}
}
if (
addDeviceStatusDto.productType === ProductType.CPS ||
addDeviceStatusDto.productType === ProductType.WPS
) {
const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]);
const occupancyStatus = addDeviceStatusDto?.log?.properties?.find(
(status) => occupancyCodes.has(status.code),
);
if (occupancyStatus) {
await this.occupancyService.updateOccupancySensorHistoricalData(
addDeviceStatusDto.deviceUuid,
);
await this.occupancyService.updateOccupancySensorHistoricalDurationData(
addDeviceStatusDto.deviceUuid,
);
}
}
if (addDeviceStatusDto.productType === ProductType.AQI) {
await this.aqiDataService.updateAQISensorHistoricalData(
addDeviceStatusDto.deviceUuid,
);
}
}
} }

View File

@ -8,7 +8,10 @@ import { TuyaWebSocketService } from './services/tuya.web.socket.service';
import { OneSignalService } from './services/onesignal.service'; import { OneSignalService } from './services/onesignal.service';
import { DeviceMessagesService } from './services/device.messages.service'; import { DeviceMessagesService } from './services/device.messages.service';
import { DeviceRepositoryModule } from '../modules/device/device.repository.module'; import { DeviceRepositoryModule } from '../modules/device/device.repository.module';
import { DeviceNotificationRepository } from '../modules/device/repositories'; import {
DeviceNotificationRepository,
DeviceRepository,
} from '../modules/device/repositories';
import { DeviceStatusFirebaseModule } from '../firebase/devices-status/devices-status.module'; import { DeviceStatusFirebaseModule } from '../firebase/devices-status/devices-status.module';
import { CommunityPermissionService } from './services/community.permission.service'; import { CommunityPermissionService } from './services/community.permission.service';
import { CommunityRepository } from '../modules/community/repositories'; import { CommunityRepository } from '../modules/community/repositories';
@ -27,6 +30,7 @@ import { SosHandlerService } from './services/sos.handler.service';
DeviceNotificationRepository, DeviceNotificationRepository,
CommunityRepository, CommunityRepository,
SosHandlerService, SosHandlerService,
DeviceRepository,
], ],
exports: [ exports: [
HelperHashService, HelperHashService,

View File

@ -1,44 +1,63 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class AqiDataService { export class AqiDataService {
constructor( constructor(
private readonly sqlLoader: SqlLoaderService, private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {} ) {}
async updateAQISensorHistoricalData(deviceUuid: string): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure( async updateAQISensorHistoricalData(): Promise<void> {
'fact_daily_space_aqi', try {
'proceduce_update_daily_space_aqi', const { dateStr } = this.getFormattedDates();
[dateStr, device.spaceDevice?.uuid],
); // Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'proceduce_update_daily_space_aqi',
[dateStr],
'fact_daily_space_aqi',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update aqi data:', err); console.error('Failed to update AQI sensor historical data:', err);
throw err; throw err;
} }
} }
private getFormattedDates(): { dateStr: string } {
private async executeProcedure( const now = new Date();
procedureFolderName: string, return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
};
}
private async executeProcedureWithRetry(
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName); try {
await this.dataSource.query(query, params); const query = this.loadQuery(folderName, procedureFileName);
console.log(`Procedure ${procedureFileName} executed successfully.`); await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,65 +1,68 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class OccupancyService { export class OccupancyService {
constructor( constructor(
private readonly sqlLoader: SqlLoaderService, private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {} ) {}
async updateOccupancySensorHistoricalDurationData(
deviceUuid: string,
): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure( async updateOccupancyDataProcedures(): Promise<void> {
'fact_daily_space_occupancy_duration', try {
'procedure_update_daily_space_occupancy_duration', const { dateStr } = this.getFormattedDates();
[dateStr, device.spaceDevice?.uuid],
); // Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'procedure_update_fact_space_occupancy',
[dateStr],
'fact_space_occupancy_count',
),
this.executeProcedureWithRetry(
'procedure_update_daily_space_occupancy_duration',
[dateStr],
'fact_daily_space_occupancy_duration',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update occupancy duration data:', err); console.error('Failed to update occupancy data:', err);
throw err; throw err;
} }
} }
async updateOccupancySensorHistoricalData(deviceUuid: string): Promise<void> { private getFormattedDates(): { dateStr: string } {
try { const now = new Date();
const now = new Date(); return {
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
const device = await this.deviceRepository.findOne({ };
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure(
'fact_space_occupancy_count',
'procedure_update_fact_space_occupancy',
[dateStr, device.spaceDevice?.uuid],
);
} catch (err) {
console.error('Failed to insert or update occupancy data:', err);
throw err;
}
} }
private async executeProcedureWithRetry(
private async executeProcedure(
procedureFolderName: string,
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName); try {
await this.dataSource.query(query, params); const query = this.loadQuery(folderName, procedureFileName);
console.log(`Procedure ${procedureFileName} executed successfully.`); await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm'; import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path'; import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable() @Injectable()
export class PowerClampService { export class PowerClampService {
@ -10,48 +10,72 @@ export class PowerClampService {
private readonly dataSource: DataSource, private readonly dataSource: DataSource,
) {} ) {}
async updateEnergyConsumedHistoricalData(deviceUuid: string): Promise<void> { async updateEnergyConsumedHistoricalData(): Promise<void> {
try { try {
const now = new Date(); const { dateStr, monthYear } = this.getFormattedDates();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const hour = now.getHours();
const monthYear = now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'); // MM-YYYY
await this.executeProcedure( // Execute all procedures in parallel
'fact_hourly_device_energy_consumed_procedure', await Promise.all([
[deviceUuid, dateStr, hour], this.executeProcedureWithRetry(
); 'fact_hourly_device_energy_consumed_procedure',
[dateStr],
await this.executeProcedure( 'fact_device_energy_consumed',
'fact_daily_device_energy_consumed_procedure', ),
[deviceUuid, dateStr], this.executeProcedureWithRetry(
); 'fact_daily_device_energy_consumed_procedure',
[dateStr],
await this.executeProcedure( 'fact_device_energy_consumed',
'fact_monthly_device_energy_consumed_procedure', ),
[deviceUuid, monthYear], this.executeProcedureWithRetry(
); 'fact_monthly_device_energy_consumed_procedure',
[monthYear],
'fact_device_energy_consumed',
),
]);
} catch (err) { } catch (err) {
console.error('Failed to insert or update energy data:', err); console.error('Failed to update energy consumption data:', err);
throw err; throw err;
} }
} }
private async executeProcedure( private getFormattedDates(): { dateStr: string; monthYear: string } {
const now = new Date();
return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
monthYear: now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'), // MM-YYYY
};
}
private async executeProcedureWithRetry(
procedureFileName: string, procedureFileName: string,
params: (string | number | null)[], params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> { ): Promise<void> {
const query = this.loadQuery( try {
'fact_device_energy_consumed', const query = this.loadQuery(folderName, procedureFileName);
procedureFileName, await this.dataSource.query(query, params);
); console.log(`Procedure ${procedureFileName} executed successfully.`);
await this.dataSource.query(query, params); } catch (err) {
console.log(`Procedure ${procedureFileName} executed successfully.`); if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
} }
private loadQuery(folderName: string, fileName: string): string { private loadQuery(folderName: string, fileName: string): string {

View File

@ -16,45 +16,46 @@ export class SosHandlerService {
); );
} }
async handleSosEventFirebase(devId: string, logData: any): Promise<void> { async handleSosEventFirebase(device: any, logData: any): Promise<void> {
const sosTrueStatus = [{ code: 'sos', value: true }];
const sosFalseStatus = [{ code: 'sos', value: false }];
try { try {
// ✅ Send true status
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId, deviceTuyaUuid: device.deviceTuyaUuid,
status: [{ code: 'sos', value: true }], status: sosTrueStatus,
log: logData, log: logData,
device,
}); });
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
{
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosTrueStatus,
log: logData,
device,
},
]);
// ✅ Schedule false status
setTimeout(async () => { setTimeout(async () => {
try { try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId, deviceTuyaUuid: device.deviceTuyaUuid,
status: [{ code: 'sos', value: false }], status: sosFalseStatus,
log: logData, log: logData,
device,
}); });
} catch (err) {
this.logger.error('Failed to send SOS false value', err);
}
}, 2000);
} catch (err) {
this.logger.error('Failed to send SOS true value', err);
}
}
async handleSosEventOurDb(devId: string, logData: any): Promise<void> { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
try { {
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({ deviceTuyaUuid: device.deviceTuyaUuid,
deviceTuyaUuid: devId, status: sosFalseStatus,
status: [{ code: 'sos', value: true }], log: logData,
log: logData, device,
}); },
]);
setTimeout(async () => {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }],
log: logData,
});
} catch (err) { } catch (err) {
this.logger.error('Failed to send SOS false value', err); this.logger.error('Failed to send SOS false value', err);
} }

View File

@ -1,18 +1,21 @@
import { Injectable } from '@nestjs/common'; import { Injectable, OnModuleInit } from '@nestjs/common';
import TuyaWebsocket from '../../config/tuya-web-socket-config'; import TuyaWebsocket from '../../config/tuya-web-socket-config';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service'; import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
import { SosHandlerService } from './sos.handler.service'; import { SosHandlerService } from './sos.handler.service';
import * as NodeCache from 'node-cache';
@Injectable() @Injectable()
export class TuyaWebSocketService { export class TuyaWebSocketService implements OnModuleInit {
private client: any; private client: any;
private readonly isDevEnv: boolean; private readonly isDevEnv: boolean;
private readonly deviceCache = new NodeCache({ stdTTL: 7200 }); // TTL = 2 hour
private messageQueue: { private messageQueue: {
devId: string; devId: string;
status: any; status: any;
logData: any; logData: any;
device: any;
}[] = []; }[] = [];
private isProcessing = false; private isProcessing = false;
@ -38,12 +41,32 @@ export class TuyaWebSocketService {
this.client.start(); this.client.start();
} }
// Trigger the queue processor every 2 seconds // Run the queue processor every 15 seconds
setInterval(() => this.processQueue(), 10000); setInterval(() => this.processQueue(), 15000);
// Refresh the cache every 1 hour
setInterval(() => this.initializeDeviceCache(), 30 * 60 * 1000); // 30 minutes
}
async onModuleInit() {
await this.initializeDeviceCache();
}
private async initializeDeviceCache() {
try {
const allDevices = await this.deviceStatusFirebaseService.getAllDevices();
allDevices.forEach((device) => {
if (device.deviceTuyaUuid) {
this.deviceCache.set(device.deviceTuyaUuid, device);
}
});
console.log(`✅ Refreshed cache with ${allDevices.length} devices.`);
} catch (error) {
console.error('❌ Failed to initialize device cache:', error);
}
} }
private setupEventHandlers() { private setupEventHandlers() {
// Event handlers
this.client.open(() => { this.client.open(() => {
console.log('open'); console.log('open');
}); });
@ -51,27 +74,38 @@ export class TuyaWebSocketService {
this.client.message(async (ws: WebSocket, message: any) => { this.client.message(async (ws: WebSocket, message: any) => {
try { try {
const { devId, status, logData } = this.extractMessageData(message); const { devId, status, logData } = this.extractMessageData(message);
if (!Array.isArray(logData?.properties)) {
this.client.ackMessage(message.messageId);
return;
}
const device = this.deviceCache.get(devId);
if (!device) {
// console.log(⛔ Unknown device: ${devId}, message ignored.);
this.client.ackMessage(message.messageId);
return;
}
if (this.sosHandlerService.isSosTriggered(status)) { if (this.sosHandlerService.isSosTriggered(status)) {
await this.sosHandlerService.handleSosEventFirebase(devId, logData); await this.sosHandlerService.handleSosEventFirebase(devId, logData);
} else { } else {
// Firebase real-time update
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId, deviceTuyaUuid: devId,
status: status, status,
log: logData, log: logData,
device,
}); });
} }
// Push to internal queue // Push to internal queue
this.messageQueue.push({ devId, status, logData }); this.messageQueue.push({ devId, status, logData, device });
// Acknowledge the message // Acknowledge the message
this.client.ackMessage(message.messageId); this.client.ackMessage(message.messageId);
} catch (error) { } catch (error) {
console.error('Error receiving message:', error); console.error('Error receiving message:', error);
} }
}); });
this.client.reconnect(() => { this.client.reconnect(() => {
console.log('reconnect'); console.log('reconnect');
}); });
@ -93,32 +127,31 @@ export class TuyaWebSocketService {
}); });
} }
private async processQueue() { private async processQueue() {
if (this.isProcessing || this.messageQueue.length === 0) return; if (this.isProcessing) {
console.log('⏳ Skipping: still processing previous batch');
return;
}
if (this.messageQueue.length === 0) return;
this.isProcessing = true; this.isProcessing = true;
const batch = [...this.messageQueue]; const batch = [...this.messageQueue];
this.messageQueue = []; this.messageQueue = [];
console.log(`🔁 Processing batch of size: ${batch.length}`);
try { try {
for (const item of batch) { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
if (this.sosHandlerService.isSosTriggered(item.status)) { batch.map((item) => ({
await this.sosHandlerService.handleSosEventOurDb( deviceTuyaUuid: item.devId,
item.devId, status: item.status,
item.logData, log: item.logData,
); device: item.device,
} else { })),
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({ );
deviceTuyaUuid: item.devId,
status: item.status,
log: item.logData,
});
}
}
} catch (error) { } catch (error) {
console.error('Error processing batch:', error); console.error('Error processing batch:', error);
// Re-add the batch to the queue for retry this.messageQueue.unshift(...batch); // retry
this.messageQueue.unshift(...batch);
} finally { } finally {
this.isProcessing = false; this.isProcessing = false;
} }

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::date AS target_date $1::date AS target_date
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::date AS target_date $1::date AS target_date
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

View File

@ -1,6 +1,6 @@
WITH params AS ( WITH params AS (
SELECT SELECT
$2::text AS target_month -- Format should match 'MM-YYYY' $1::text AS target_month -- Format should match 'MM-YYYY'
), ),
total_energy AS ( total_energy AS (
SELECT SELECT

64
package-lock.json generated
View File

@ -18,6 +18,7 @@
"@nestjs/jwt": "^10.2.0", "@nestjs/jwt": "^10.2.0",
"@nestjs/passport": "^10.0.3", "@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.0.0", "@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^6.0.0",
"@nestjs/swagger": "^7.3.0", "@nestjs/swagger": "^7.3.0",
"@nestjs/terminus": "^11.0.0", "@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.4.0", "@nestjs/throttler": "^6.4.0",
@ -38,6 +39,7 @@
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"morgan": "^1.10.0", "morgan": "^1.10.0",
"nest-winston": "^1.10.2", "nest-winston": "^1.10.2",
"node-cache": "^5.1.2",
"nodemailer": "^6.9.10", "nodemailer": "^6.9.10",
"onesignal-node": "^3.4.0", "onesignal-node": "^3.4.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",
@ -2538,6 +2540,19 @@
"@nestjs/core": "^10.0.0" "@nestjs/core": "^10.0.0"
} }
}, },
"node_modules/@nestjs/schedule": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/@nestjs/schedule/-/schedule-6.0.0.tgz",
"integrity": "sha512-aQySMw6tw2nhitELXd3EiRacQRgzUKD9mFcUZVOJ7jPLqIBvXOyvRWLsK9SdurGA+jjziAlMef7iB5ZEFFoQpw==",
"license": "MIT",
"dependencies": {
"cron": "4.3.0"
},
"peerDependencies": {
"@nestjs/common": "^10.0.0 || ^11.0.0",
"@nestjs/core": "^10.0.0 || ^11.0.0"
}
},
"node_modules/@nestjs/schematics": { "node_modules/@nestjs/schematics": {
"version": "10.2.3", "version": "10.2.3",
"resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz", "resolved": "https://registry.npmjs.org/@nestjs/schematics/-/schematics-10.2.3.tgz",
@ -3215,6 +3230,12 @@
"@types/node": "*" "@types/node": "*"
} }
}, },
"node_modules/@types/luxon": {
"version": "3.6.2",
"resolved": "https://registry.npmjs.org/@types/luxon/-/luxon-3.6.2.tgz",
"integrity": "sha512-R/BdP7OxEMc44l2Ex5lSXHoIXTB2JLNa3y2QISIbr58U/YcsffyQrYW//hZSdrfxrjRZj3GcUoxMPGdO8gSYuw==",
"license": "MIT"
},
"node_modules/@types/methods": { "node_modules/@types/methods": {
"version": "1.1.4", "version": "1.1.4",
"resolved": "https://registry.npmjs.org/@types/methods/-/methods-1.1.4.tgz", "resolved": "https://registry.npmjs.org/@types/methods/-/methods-1.1.4.tgz",
@ -5426,6 +5447,19 @@
"devOptional": true, "devOptional": true,
"license": "MIT" "license": "MIT"
}, },
"node_modules/cron": {
"version": "4.3.0",
"resolved": "https://registry.npmjs.org/cron/-/cron-4.3.0.tgz",
"integrity": "sha512-ciiYNLfSlF9MrDqnbMdRWFiA6oizSF7kA1osPP9lRzNu0Uu+AWog1UKy7SkckiDY2irrNjeO6qLyKnXC8oxmrw==",
"license": "MIT",
"dependencies": {
"@types/luxon": "~3.6.0",
"luxon": "~3.6.0"
},
"engines": {
"node": ">=18.x"
}
},
"node_modules/cross-spawn": { "node_modules/cross-spawn": {
"version": "7.0.6", "version": "7.0.6",
"resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz",
@ -9777,6 +9811,15 @@
"yallist": "^3.0.2" "yallist": "^3.0.2"
} }
}, },
"node_modules/luxon": {
"version": "3.6.1",
"resolved": "https://registry.npmjs.org/luxon/-/luxon-3.6.1.tgz",
"integrity": "sha512-tJLxrKJhO2ukZ5z0gyjY1zPh3Rh88Ej9P7jNrZiHMUXHae1yvI2imgOZtL1TO8TW6biMMKfTtAOoEJANgtWBMQ==",
"license": "MIT",
"engines": {
"node": ">=12"
}
},
"node_modules/magic-string": { "node_modules/magic-string": {
"version": "0.30.8", "version": "0.30.8",
"resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz", "resolved": "https://registry.npmjs.org/magic-string/-/magic-string-0.30.8.tgz",
@ -10142,6 +10185,27 @@
"node": "^18 || ^20 || >= 21" "node": "^18 || ^20 || >= 21"
} }
}, },
"node_modules/node-cache": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/node-cache/-/node-cache-5.1.2.tgz",
"integrity": "sha512-t1QzWwnk4sjLWaQAS8CHgOJ+RAfmHpxFWmc36IWTiWHQfs0w5JDMBS1b1ZxQteo0vVVuWJvIUKHDkkeK7vIGCg==",
"license": "MIT",
"dependencies": {
"clone": "2.x"
},
"engines": {
"node": ">= 8.0.0"
}
},
"node_modules/node-cache/node_modules/clone": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/clone/-/clone-2.1.2.tgz",
"integrity": "sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==",
"license": "MIT",
"engines": {
"node": ">=0.8"
}
},
"node_modules/node-emoji": { "node_modules/node-emoji": {
"version": "1.11.0", "version": "1.11.0",
"resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz", "resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz",

View File

@ -30,6 +30,7 @@
"@nestjs/jwt": "^10.2.0", "@nestjs/jwt": "^10.2.0",
"@nestjs/passport": "^10.0.3", "@nestjs/passport": "^10.0.3",
"@nestjs/platform-express": "^10.0.0", "@nestjs/platform-express": "^10.0.0",
"@nestjs/schedule": "^6.0.0",
"@nestjs/swagger": "^7.3.0", "@nestjs/swagger": "^7.3.0",
"@nestjs/terminus": "^11.0.0", "@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.4.0", "@nestjs/throttler": "^6.4.0",
@ -50,6 +51,7 @@
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"morgan": "^1.10.0", "morgan": "^1.10.0",
"nest-winston": "^1.10.2", "nest-winston": "^1.10.2",
"node-cache": "^5.1.2",
"nodemailer": "^6.9.10", "nodemailer": "^6.9.10",
"onesignal-node": "^3.4.0", "onesignal-node": "^3.4.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",

View File

@ -42,13 +42,15 @@ import { winstonLoggerOptions } from '../libs/common/src/logger/services/winston
import { AqiModule } from './aqi/aqi.module'; import { AqiModule } from './aqi/aqi.module';
import { OccupancyModule } from './occupancy/occupancy.module'; import { OccupancyModule } from './occupancy/occupancy.module';
import { WeatherModule } from './weather/weather.module'; import { WeatherModule } from './weather/weather.module';
import { ScheduleModule as NestScheduleModule } from '@nestjs/schedule';
import { SchedulerModule } from './scheduler/scheduler.module';
@Module({ @Module({
imports: [ imports: [
ConfigModule.forRoot({ ConfigModule.forRoot({
load: config, load: config,
}), }),
ThrottlerModule.forRoot({ ThrottlerModule.forRoot({
throttlers: [{ ttl: 60000, limit: 30 }], throttlers: [{ ttl: 60000, limit: 100 }],
generateKey: (context) => { generateKey: (context) => {
const req = context.switchToHttp().getRequest(); const req = context.switchToHttp().getRequest();
console.log('Real IP:', req.headers['x-forwarded-for']); console.log('Real IP:', req.headers['x-forwarded-for']);
@ -94,6 +96,8 @@ import { WeatherModule } from './weather/weather.module';
OccupancyModule, OccupancyModule,
WeatherModule, WeatherModule,
AqiModule, AqiModule,
SchedulerModule,
NestScheduleModule.forRoot(),
], ],
providers: [ providers: [
{ {

View File

@ -30,6 +30,8 @@ import { PowerClampService } from '@app/common/helper/services/power.clamp.servi
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service'; import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, SpaceRepositoryModule], imports: [ConfigModule, SpaceRepositoryModule],
@ -59,6 +61,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [], exports: [],
}) })

View File

@ -64,6 +64,8 @@ import {
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service'; import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, SpaceRepositoryModule, UserRepositoryModule], imports: [ConfigModule, SpaceRepositoryModule, UserRepositoryModule],
@ -118,6 +120,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [CommunityService, SpacePermissionService], exports: [CommunityService, SpacePermissionService],
}) })

View File

@ -30,6 +30,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule], imports: [ConfigModule, DeviceRepositoryModule],
controllers: [DoorLockController], controllers: [DoorLockController],
@ -58,6 +60,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [DoorLockService], exports: [DoorLockService],
}) })

View File

@ -28,6 +28,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule], imports: [ConfigModule, DeviceRepositoryModule],
controllers: [GroupController], controllers: [GroupController],
@ -55,6 +57,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [GroupService], exports: [GroupService],
}) })

View File

@ -82,6 +82,8 @@ import { SubspaceProductAllocationService } from 'src/space/services/subspace/su
import { TagService as NewTagService } from 'src/tags/services'; import { TagService as NewTagService } from 'src/tags/services';
import { UserDevicePermissionService } from 'src/user-device-permission/services'; import { UserDevicePermissionService } from 'src/user-device-permission/services';
import { UserService, UserSpaceService } from 'src/users/services'; import { UserService, UserSpaceService } from 'src/users/services';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, InviteUserRepositoryModule, CommunityModule], imports: [ConfigModule, InviteUserRepositoryModule, CommunityModule],
@ -150,6 +152,8 @@ import { UserService, UserSpaceService } from 'src/users/services';
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [InviteUserService], exports: [InviteUserService],
}) })

View File

@ -60,6 +60,8 @@ import { SubspaceProductAllocationService } from 'src/space/services/subspace/su
import { TagService } from 'src/tags/services'; import { TagService } from 'src/tags/services';
import { PowerClampController } from './controllers'; import { PowerClampController } from './controllers';
import { PowerClampService as PowerClamp } from './services/power-clamp.service'; import { PowerClampService as PowerClamp } from './services/power-clamp.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule], imports: [ConfigModule],
controllers: [PowerClampController], controllers: [PowerClampController],
@ -109,6 +111,8 @@ import { PowerClampService as PowerClamp } from './services/power-clamp.service'
SubspaceModelProductAllocationRepoitory, SubspaceModelProductAllocationRepoitory,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [PowerClamp], exports: [PowerClamp],
}) })

View File

@ -67,6 +67,8 @@ import { ProjectUserController } from './controllers/project-user.controller';
import { CreateOrphanSpaceHandler } from './handler'; import { CreateOrphanSpaceHandler } from './handler';
import { ProjectService } from './services'; import { ProjectService } from './services';
import { ProjectUserService } from './services/project-user.service'; import { ProjectUserService } from './services/project-user.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
const CommandHandlers = [CreateOrphanSpaceHandler]; const CommandHandlers = [CreateOrphanSpaceHandler];
@ -124,6 +126,8 @@ const CommandHandlers = [CreateOrphanSpaceHandler];
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [ProjectService, CqrsModule], exports: [ProjectService, CqrsModule],
}) })

View File

@ -0,0 +1,25 @@
import { DatabaseModule } from '@app/common/database/database.module';
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { SchedulerService } from './scheduler.service';
import { ScheduleModule as NestScheduleModule } from '@nestjs/schedule';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
@Module({
imports: [
NestScheduleModule.forRoot(),
TypeOrmModule.forFeature([]),
DatabaseModule,
],
providers: [
SchedulerService,
SqlLoaderService,
PowerClampService,
OccupancyService,
AqiDataService,
],
})
export class SchedulerModule {}

View File

@ -0,0 +1,92 @@
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { Injectable } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
@Injectable()
export class SchedulerService {
constructor(
private readonly powerClampService: PowerClampService,
private readonly occupancyService: OccupancyService,
private readonly aqiDataService: AqiDataService,
) {
console.log('SchedulerService initialized!');
}
@Cron(CronExpression.EVERY_HOUR)
async runHourlyProcedures() {
console.log('\n======== Starting Procedures ========');
console.log(new Date().toISOString(), 'Scheduler running...');
try {
const results = await Promise.allSettled([
this.executeTask(
() => this.powerClampService.updateEnergyConsumedHistoricalData(),
'Energy Consumption',
),
this.executeTask(
() => this.occupancyService.updateOccupancyDataProcedures(),
'Occupancy Data',
),
this.executeTask(
() => this.aqiDataService.updateAQISensorHistoricalData(),
'AQI Data',
),
]);
this.logResults(results);
} catch (error) {
console.error('MAIN SCHEDULER ERROR:', error);
if (error.stack) {
console.error('Error stack:', error.stack);
}
}
}
private async executeTask(
task: () => Promise<void>,
name: string,
): Promise<{ name: string; status: string }> {
try {
console.log(`[${new Date().toISOString()}] Starting ${name} task...`);
await task();
console.log(
`[${new Date().toISOString()}] ${name} task completed successfully`,
);
return { name, status: 'success' };
} catch (error) {
console.error(
`[${new Date().toISOString()}] ${name} task failed:`,
error.message,
);
if (error.stack) {
console.error('Task error stack:', error.stack);
}
return { name, status: 'failed' };
}
}
private logResults(results: PromiseSettledResult<any>[]) {
const successCount = results.filter((r) => r.status === 'fulfilled').length;
const failedCount = results.length - successCount;
console.log('\n======== Task Results ========');
console.log(`Successful tasks: ${successCount}`);
console.log(`Failed tasks: ${failedCount}`);
if (failedCount > 0) {
console.log('\n======== Failed Tasks Details ========');
results.forEach((result, index) => {
if (result.status === 'rejected') {
console.error(`Task ${index + 1} failed:`, result.reason);
if (result.reason.stack) {
console.error('Error stack:', result.reason.stack);
}
}
});
}
console.log('\n======== Scheduler Completed ========\n');
}
}

View File

@ -63,6 +63,8 @@ import {
import { SpaceModelService, SubSpaceModelService } from './services'; import { SpaceModelService, SubSpaceModelService } from './services';
import { SpaceModelProductAllocationService } from './services/space-model-product-allocation.service'; import { SpaceModelProductAllocationService } from './services/space-model-product-allocation.service';
import { SubspaceModelProductAllocationService } from './services/subspace/subspace-model-product-allocation.service'; import { SubspaceModelProductAllocationService } from './services/subspace/subspace-model-product-allocation.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
const CommandHandlers = [ const CommandHandlers = [
PropogateUpdateSpaceModelHandler, PropogateUpdateSpaceModelHandler,
@ -120,6 +122,8 @@ const CommandHandlers = [
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [CqrsModule, SpaceModelService], exports: [CqrsModule, SpaceModelService],
}) })

View File

@ -88,6 +88,8 @@ import {
} from './services'; } from './services';
import { SpaceProductAllocationService } from './services/space-product-allocation.service'; import { SpaceProductAllocationService } from './services/space-product-allocation.service';
import { SubspaceProductAllocationService } from './services/subspace/subspace-product-allocation.service'; import { SubspaceProductAllocationService } from './services/subspace/subspace-product-allocation.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
export const CommandHandlers = [DisableSpaceHandler]; export const CommandHandlers = [DisableSpaceHandler];
@ -161,6 +163,8 @@ export const CommandHandlers = [DisableSpaceHandler];
SqlLoaderService, SqlLoaderService,
OccupancyService, OccupancyService,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [SpaceService], exports: [SpaceService],
}) })

View File

@ -32,6 +32,8 @@ import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service
import { OccupancyService } from '@app/common/helper/services/occupancy.service'; import { OccupancyService } from '@app/common/helper/services/occupancy.service';
import { CommunityRepository } from '@app/common/modules/community/repositories'; import { CommunityRepository } from '@app/common/modules/community/repositories';
import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
import { PresenceSensorDailySpaceRepository } from '@app/common/modules/presence-sensor/repositories';
import { AqiSpaceDailyPollutantStatsRepository } from '@app/common/modules/aqi/repositories';
@Module({ @Module({
imports: [ConfigModule, DeviceRepositoryModule, DoorLockModule], imports: [ConfigModule, DeviceRepositoryModule, DoorLockModule],
controllers: [VisitorPasswordController], controllers: [VisitorPasswordController],
@ -61,6 +63,8 @@ import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
OccupancyService, OccupancyService,
CommunityRepository, CommunityRepository,
AqiDataService, AqiDataService,
PresenceSensorDailySpaceRepository,
AqiSpaceDailyPollutantStatsRepository,
], ],
exports: [VisitorPasswordService], exports: [VisitorPasswordService],
}) })