Merge branch 'dev' into SP-1754-be-implement-configure-space

This commit is contained in:
faris Aljohari
2025-07-02 02:25:43 -06:00
43 changed files with 1132 additions and 726 deletions

View File

@ -8,7 +8,10 @@ import { TuyaWebSocketService } from './services/tuya.web.socket.service';
import { OneSignalService } from './services/onesignal.service';
import { DeviceMessagesService } from './services/device.messages.service';
import { DeviceRepositoryModule } from '../modules/device/device.repository.module';
import { DeviceNotificationRepository } from '../modules/device/repositories';
import {
DeviceNotificationRepository,
DeviceRepository,
} from '../modules/device/repositories';
import { DeviceStatusFirebaseModule } from '../firebase/devices-status/devices-status.module';
import { CommunityPermissionService } from './services/community.permission.service';
import { CommunityRepository } from '../modules/community/repositories';
@ -27,6 +30,7 @@ import { SosHandlerService } from './services/sos.handler.service';
DeviceNotificationRepository,
CommunityRepository,
SosHandlerService,
DeviceRepository,
],
exports: [
HelperHashService,

View File

@ -1,44 +1,63 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable()
export class AqiDataService {
constructor(
private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {}
async updateAQISensorHistoricalData(deviceUuid: string): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure(
'fact_daily_space_aqi',
'proceduce_update_daily_space_aqi',
[dateStr, device.spaceDevice?.uuid],
);
async updateAQISensorHistoricalData(): Promise<void> {
try {
const { dateStr } = this.getFormattedDates();
// Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'proceduce_update_daily_space_aqi',
[dateStr],
'fact_daily_space_aqi',
),
]);
} catch (err) {
console.error('Failed to insert or update aqi data:', err);
console.error('Failed to update AQI sensor historical data:', err);
throw err;
}
}
private async executeProcedure(
procedureFolderName: string,
private getFormattedDates(): { dateStr: string } {
const now = new Date();
return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
};
}
private async executeProcedureWithRetry(
procedureFileName: string,
params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
try {
const query = this.loadQuery(folderName, procedureFileName);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
}
private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,65 +1,68 @@
import { DeviceRepository } from '@app/common/modules/device/repositories';
import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable()
export class OccupancyService {
constructor(
private readonly sqlLoader: SqlLoaderService,
private readonly dataSource: DataSource,
private readonly deviceRepository: DeviceRepository,
) {}
async updateOccupancySensorHistoricalDurationData(
deviceUuid: string,
): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure(
'fact_daily_space_occupancy_duration',
'procedure_update_daily_space_occupancy_duration',
[dateStr, device.spaceDevice?.uuid],
);
async updateOccupancyDataProcedures(): Promise<void> {
try {
const { dateStr } = this.getFormattedDates();
// Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'procedure_update_fact_space_occupancy',
[dateStr],
'fact_space_occupancy_count',
),
this.executeProcedureWithRetry(
'procedure_update_daily_space_occupancy_duration',
[dateStr],
'fact_daily_space_occupancy_duration',
),
]);
} catch (err) {
console.error('Failed to insert or update occupancy duration data:', err);
console.error('Failed to update occupancy data:', err);
throw err;
}
}
async updateOccupancySensorHistoricalData(deviceUuid: string): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const device = await this.deviceRepository.findOne({
where: { uuid: deviceUuid },
relations: ['spaceDevice'],
});
await this.executeProcedure(
'fact_space_occupancy_count',
'procedure_update_fact_space_occupancy',
[dateStr, device.spaceDevice?.uuid],
);
} catch (err) {
console.error('Failed to insert or update occupancy data:', err);
throw err;
}
private getFormattedDates(): { dateStr: string } {
const now = new Date();
return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
};
}
private async executeProcedure(
procedureFolderName: string,
private async executeProcedureWithRetry(
procedureFileName: string,
params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> {
const query = this.loadQuery(procedureFolderName, procedureFileName);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
try {
const query = this.loadQuery(folderName, procedureFileName);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
}
private loadQuery(folderName: string, fileName: string): string {

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { SqlLoaderService } from './sql-loader.service';
import { DataSource } from 'typeorm';
import { SQL_PROCEDURES_PATH } from '@app/common/constants/sql-query-path';
import { SqlLoaderService } from './sql-loader.service';
@Injectable()
export class PowerClampService {
@ -10,48 +10,72 @@ export class PowerClampService {
private readonly dataSource: DataSource,
) {}
async updateEnergyConsumedHistoricalData(deviceUuid: string): Promise<void> {
async updateEnergyConsumedHistoricalData(): Promise<void> {
try {
const now = new Date();
const dateStr = now.toLocaleDateString('en-CA'); // YYYY-MM-DD
const hour = now.getHours();
const monthYear = now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'); // MM-YYYY
const { dateStr, monthYear } = this.getFormattedDates();
await this.executeProcedure(
'fact_hourly_device_energy_consumed_procedure',
[deviceUuid, dateStr, hour],
);
await this.executeProcedure(
'fact_daily_device_energy_consumed_procedure',
[deviceUuid, dateStr],
);
await this.executeProcedure(
'fact_monthly_device_energy_consumed_procedure',
[deviceUuid, monthYear],
);
// Execute all procedures in parallel
await Promise.all([
this.executeProcedureWithRetry(
'fact_hourly_device_energy_consumed_procedure',
[dateStr],
'fact_device_energy_consumed',
),
this.executeProcedureWithRetry(
'fact_daily_device_energy_consumed_procedure',
[dateStr],
'fact_device_energy_consumed',
),
this.executeProcedureWithRetry(
'fact_monthly_device_energy_consumed_procedure',
[monthYear],
'fact_device_energy_consumed',
),
]);
} catch (err) {
console.error('Failed to insert or update energy data:', err);
console.error('Failed to update energy consumption data:', err);
throw err;
}
}
private async executeProcedure(
private getFormattedDates(): { dateStr: string; monthYear: string } {
const now = new Date();
return {
dateStr: now.toLocaleDateString('en-CA'), // YYYY-MM-DD
monthYear: now
.toLocaleDateString('en-US', {
month: '2-digit',
year: 'numeric',
})
.replace('/', '-'), // MM-YYYY
};
}
private async executeProcedureWithRetry(
procedureFileName: string,
params: (string | number | null)[],
folderName: string,
retries = 3,
): Promise<void> {
const query = this.loadQuery(
'fact_device_energy_consumed',
procedureFileName,
);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
try {
const query = this.loadQuery(folderName, procedureFileName);
await this.dataSource.query(query, params);
console.log(`Procedure ${procedureFileName} executed successfully.`);
} catch (err) {
if (retries > 0) {
const delayMs = 1000 * (4 - retries); // Exponential backoff
console.warn(`Retrying ${procedureFileName} (${retries} retries left)`);
await new Promise((resolve) => setTimeout(resolve, delayMs));
return this.executeProcedureWithRetry(
procedureFileName,
params,
folderName,
retries - 1,
);
}
console.error(`Failed to execute ${procedureFileName}:`, err);
throw err;
}
}
private loadQuery(folderName: string, fileName: string): string {

View File

@ -16,21 +16,46 @@ export class SosHandlerService {
);
}
async handleSosEvent(devId: string, logData: any): Promise<void> {
async handleSosEventFirebase(device: any, logData: any): Promise<void> {
const sosTrueStatus = [{ code: 'sos', value: true }];
const sosFalseStatus = [{ code: 'sos', value: false }];
try {
// ✅ Send true status
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosTrueStatus,
log: logData,
device,
});
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
{
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosTrueStatus,
log: logData,
device,
},
]);
// ✅ Schedule false status
setTimeout(async () => {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }],
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosFalseStatus,
log: logData,
device,
});
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
{
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosFalseStatus,
log: logData,
device,
},
]);
} catch (err) {
this.logger.error('Failed to send SOS false value', err);
}

View File

@ -1,13 +1,24 @@
import { Injectable } from '@nestjs/common';
import { Injectable, OnModuleInit } from '@nestjs/common';
import TuyaWebsocket from '../../config/tuya-web-socket-config';
import { ConfigService } from '@nestjs/config';
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
import { SosHandlerService } from './sos.handler.service';
import * as NodeCache from 'node-cache';
@Injectable()
export class TuyaWebSocketService {
export class TuyaWebSocketService implements OnModuleInit {
private client: any;
private readonly isDevEnv: boolean;
private readonly deviceCache = new NodeCache({ stdTTL: 7200 }); // TTL = 2 hour
private messageQueue: {
devId: string;
status: any;
logData: any;
device: any;
}[] = [];
private isProcessing = false;
constructor(
private readonly configService: ConfigService,
@ -26,16 +37,36 @@ export class TuyaWebSocketService {
});
if (this.configService.get<string>('tuya-config.TRUN_ON_TUYA_SOCKET')) {
// Set up event handlers
this.setupEventHandlers();
// Start receiving messages
this.client.start();
}
// Run the queue processor every 15 seconds
setInterval(() => this.processQueue(), 15000);
// Refresh the cache every 1 hour
setInterval(() => this.initializeDeviceCache(), 30 * 60 * 1000); // 30 minutes
}
async onModuleInit() {
await this.initializeDeviceCache();
}
private async initializeDeviceCache() {
try {
const allDevices = await this.deviceStatusFirebaseService.getAllDevices();
allDevices.forEach((device) => {
if (device.deviceTuyaUuid) {
this.deviceCache.set(device.deviceTuyaUuid, device);
}
});
console.log(`✅ Refreshed cache with ${allDevices.length} devices.`);
} catch (error) {
console.error('❌ Failed to initialize device cache:', error);
}
}
private setupEventHandlers() {
// Event handlers
this.client.open(() => {
console.log('open');
});
@ -43,23 +74,38 @@ export class TuyaWebSocketService {
this.client.message(async (ws: WebSocket, message: any) => {
try {
const { devId, status, logData } = this.extractMessageData(message);
if (!Array.isArray(logData?.properties)) {
this.client.ackMessage(message.messageId);
return;
}
const device = this.deviceCache.get(devId);
if (!device) {
// console.log(⛔ Unknown device: ${devId}, message ignored.);
this.client.ackMessage(message.messageId);
return;
}
if (this.sosHandlerService.isSosTriggered(status)) {
await this.sosHandlerService.handleSosEvent(devId, logData);
await this.sosHandlerService.handleSosEventFirebase(devId, logData);
} else {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: status,
status,
log: logData,
device,
});
}
// Push to internal queue
this.messageQueue.push({ devId, status, logData, device });
// Acknowledge the message
this.client.ackMessage(message.messageId);
} catch (error) {
console.error('Error processing message:', error);
console.error('Error receiving message:', error);
}
});
this.client.reconnect(() => {
console.log('reconnect');
});
@ -80,6 +126,37 @@ export class TuyaWebSocketService {
console.error('WebSocket error:', error);
});
}
private async processQueue() {
if (this.isProcessing) {
console.log('⏳ Skipping: still processing previous batch');
return;
}
if (this.messageQueue.length === 0) return;
this.isProcessing = true;
const batch = [...this.messageQueue];
this.messageQueue = [];
console.log(`🔁 Processing batch of size: ${batch.length}`);
try {
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
batch.map((item) => ({
deviceTuyaUuid: item.devId,
status: item.status,
log: item.logData,
device: item.device,
})),
);
} catch (error) {
console.error('❌ Error processing batch:', error);
this.messageQueue.unshift(...batch); // retry
} finally {
this.isProcessing = false;
}
}
private extractMessageData(message: any): {
devId: string;
status: any;