From 43ab0030f0f70338363edfd01d5dacbbf76239c1 Mon Sep 17 00:00:00 2001 From: faris Aljohari <83524184+farisaljohari@users.noreply.github.com> Date: Wed, 25 Jun 2025 03:20:12 -0600 Subject: [PATCH] refactor: clean up unused services and optimize batch processing in DeviceStatusFirebaseService --- .../devices-status/devices-status.module.ts | 16 -- .../services/devices-status.service.ts | 166 ++++++++---------- .../services/tuya.web.socket.service.ts | 40 ++--- 3 files changed, 95 insertions(+), 127 deletions(-) diff --git a/libs/common/src/firebase/devices-status/devices-status.module.ts b/libs/common/src/firebase/devices-status/devices-status.module.ts index 52f6123..54d5cfa 100644 --- a/libs/common/src/firebase/devices-status/devices-status.module.ts +++ b/libs/common/src/firebase/devices-status/devices-status.module.ts @@ -3,28 +3,12 @@ import { DeviceStatusFirebaseController } from './controllers/devices-status.con import { DeviceStatusFirebaseService } from './services/devices-status.service'; import { DeviceRepository } from '@app/common/modules/device/repositories'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository'; -import { PowerClampService } from '@app/common/helper/services/power.clamp.service'; -import { - PowerClampHourlyRepository, - PowerClampDailyRepository, - PowerClampMonthlyRepository, -} from '@app/common/modules/power-clamp/repositories'; -import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service'; -import { OccupancyService } from '@app/common/helper/services/occupancy.service'; -import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; @Module({ providers: [ DeviceStatusFirebaseService, DeviceRepository, DeviceStatusLogRepository, - PowerClampService, - PowerClampHourlyRepository, - PowerClampDailyRepository, - PowerClampMonthlyRepository, - SqlLoaderService, - OccupancyService, - AqiDataService, ], controllers: [DeviceStatusFirebaseController], exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository], diff --git a/libs/common/src/firebase/devices-status/services/devices-status.service.ts b/libs/common/src/firebase/devices-status/services/devices-status.service.ts index 695022b..3fae855 100644 --- a/libs/common/src/firebase/devices-status/services/devices-status.service.ts +++ b/libs/common/src/firebase/devices-status/services/devices-status.service.ts @@ -18,12 +18,6 @@ import { runTransaction, } from 'firebase/database'; import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories'; -import { ProductType } from '@app/common/constants/product-type.enum'; -import { PowerClampService } from '@app/common/helper/services/power.clamp.service'; -import { PowerClampEnergyEnum } from '@app/common/constants/power.clamp.enargy.enum'; -import { PresenceSensorEnum } from '@app/common/constants/presence.sensor.enum'; -import { OccupancyService } from '@app/common/helper/services/occupancy.service'; -import { AqiDataService } from '@app/common/helper/services/aqi.data.service'; @Injectable() export class DeviceStatusFirebaseService { private tuya: TuyaContext; @@ -31,9 +25,6 @@ export class DeviceStatusFirebaseService { constructor( private readonly configService: ConfigService, private readonly deviceRepository: DeviceRepository, - private readonly powerClampService: PowerClampService, - private readonly occupancyService: OccupancyService, - private readonly aqiDataService: AqiDataService, private deviceStatusLogRepository: DeviceStatusLogRepository, ) { const accessKey = this.configService.get('auth-config.ACCESS_KEY'); @@ -76,28 +67,85 @@ export class DeviceStatusFirebaseService { ); } } - async addDeviceStatusToOurDb( - addDeviceStatusDto: AddDeviceStatusDto, - ): Promise { - try { - const device = await this.getDeviceByDeviceTuyaUuid( - addDeviceStatusDto.deviceTuyaUuid, - ); + async addBatchDeviceStatusToOurDb( + batch: { deviceTuyaUuid: string; status: any; log: any }[], + ): Promise { + const allLogs = []; + const deviceMap = new Map(); - if (device?.uuid) { - return await this.createDeviceStatusInOurDb({ - deviceUuid: device.uuid, - ...addDeviceStatusDto, - productType: device.productDevice.prodType, - }); - } - // Return null if device not found or no UUID - return null; - } catch (error) { - // Handle the error silently, perhaps log it internally or ignore it - return null; + console.log( + `🧠 Starting device lookups for batch of ${batch.length} items...`, + ); + + // Step 1: Parallel device fetching + await Promise.all( + batch.map(async (item) => { + if (!deviceMap.has(item.deviceTuyaUuid)) { + const device = await this.getDeviceByDeviceTuyaUuid( + item.deviceTuyaUuid, + ); + device?.uuid && deviceMap.set(item.deviceTuyaUuid, device); + } + }), + ); + + console.log(`🔍 Found ${deviceMap.size} devices from batch`); + + // Step 2: Prepare logs and updates + for (const item of batch) { + const device = deviceMap.get(item.deviceTuyaUuid); + if (!device?.uuid) continue; + + const logs = item.log.properties.map((property) => + this.deviceStatusLogRepository.create({ + deviceId: device.uuid, + deviceTuyaId: item.deviceTuyaUuid, + productId: item.log.productId, + log: item.log, + code: property.code, + value: property.value, + eventId: item.log.dataId, + eventTime: new Date(property.time).toISOString(), + }), + ); + allLogs.push(...logs); } + + console.log(`📝 Total logs to insert: ${allLogs.length}`); + // Step 3: Insert logs in chunks with ON CONFLICT DO NOTHING + const insertLogsPromise = (async () => { + const chunkSize = 300; + let insertedCount = 0; + + for (let i = 0; i < allLogs.length; i += chunkSize) { + const chunk = allLogs.slice(i, i + chunkSize); + try { + const result = await this.deviceStatusLogRepository + .createQueryBuilder() + .insert() + .into('device-status-log') // or use DeviceStatusLogEntity + .values(chunk) + .orIgnore() // skip duplicates + .execute(); + + insertedCount += result.identifiers.length; + console.log( + `✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, + ); + } catch (error) { + console.error('❌ Insert error (skipped chunk):', error.message); + } + } + + console.log( + `✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`, + ); + })(); + + // Step 5: Wait for both insert and post-processing to finish + await Promise.all([insertLogsPromise]); } + async addDeviceStatusToFirebase( addDeviceStatusDto: AddDeviceStatusDto, ): Promise { @@ -237,66 +285,4 @@ export class DeviceStatusFirebaseService { const snapshot: DataSnapshot = await get(dataRef); return snapshot.val(); } - async createDeviceStatusInOurDb( - addDeviceStatusDto: AddDeviceStatusDto, - ): Promise { - // Save logs to your repository - const newLogs = addDeviceStatusDto.log.properties.map((property) => { - return this.deviceStatusLogRepository.create({ - deviceId: addDeviceStatusDto.deviceUuid, - deviceTuyaId: addDeviceStatusDto.deviceTuyaUuid, - productId: addDeviceStatusDto.log.productId, - log: addDeviceStatusDto.log, - code: property.code, - value: property.value, - eventId: addDeviceStatusDto.log.dataId, - eventTime: new Date(property.time).toISOString(), - }); - }); - await this.deviceStatusLogRepository.save(newLogs); - - if (addDeviceStatusDto.productType === ProductType.PC) { - const energyCodes = new Set([ - PowerClampEnergyEnum.ENERGY_CONSUMED, - PowerClampEnergyEnum.ENERGY_CONSUMED_A, - PowerClampEnergyEnum.ENERGY_CONSUMED_B, - PowerClampEnergyEnum.ENERGY_CONSUMED_C, - ]); - - const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) => - energyCodes.has(status.code), - ); - - if (energyStatus) { - await this.powerClampService.updateEnergyConsumedHistoricalData( - addDeviceStatusDto.deviceUuid, - ); - } - } - - if ( - addDeviceStatusDto.productType === ProductType.CPS || - addDeviceStatusDto.productType === ProductType.WPS - ) { - const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]); - - const occupancyStatus = addDeviceStatusDto?.log?.properties?.find( - (status) => occupancyCodes.has(status.code), - ); - - if (occupancyStatus) { - await this.occupancyService.updateOccupancySensorHistoricalData( - addDeviceStatusDto.deviceUuid, - ); - await this.occupancyService.updateOccupancySensorHistoricalDurationData( - addDeviceStatusDto.deviceUuid, - ); - } - } - if (addDeviceStatusDto.productType === ProductType.AQI) { - await this.aqiDataService.updateAQISensorHistoricalData( - addDeviceStatusDto.deviceUuid, - ); - } - } } diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 0c32c04..3f56a63 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -38,8 +38,8 @@ export class TuyaWebSocketService { this.client.start(); } - // Trigger the queue processor every 2 seconds - setInterval(() => this.processQueue(), 10000); + // Trigger the queue processor every 15 seconds + setInterval(() => this.processQueue(), 15000); } private setupEventHandlers() { @@ -93,32 +93,30 @@ export class TuyaWebSocketService { }); } private async processQueue() { - if (this.isProcessing || this.messageQueue.length === 0) return; + if (this.isProcessing) { + console.log('⏳ Skipping: still processing previous batch'); + return; + } + + if (this.messageQueue.length === 0) return; this.isProcessing = true; - const batch = [...this.messageQueue]; this.messageQueue = []; + console.log(`🔁 Processing batch of size: ${batch.length}`); + try { - for (const item of batch) { - if (this.sosHandlerService.isSosTriggered(item.status)) { - await this.sosHandlerService.handleSosEventOurDb( - item.devId, - item.logData, - ); - } else { - await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({ - deviceTuyaUuid: item.devId, - status: item.status, - log: item.logData, - }); - } - } + await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( + batch.map((item) => ({ + deviceTuyaUuid: item.devId, + status: item.status, + log: item.logData, + })), + ); } catch (error) { - console.error('Error processing batch:', error); - // Re-add the batch to the queue for retry - this.messageQueue.unshift(...batch); + console.error('❌ Error processing batch:', error); + this.messageQueue.unshift(...batch); // retry } finally { this.isProcessing = false; }