From c0a069b4607e91459c07fb07e9d59f7b1a415b8c Mon Sep 17 00:00:00 2001 From: faris Aljohari <83524184+farisaljohari@users.noreply.github.com> Date: Wed, 25 Jun 2025 08:03:23 -0600 Subject: [PATCH] fix: enhance device status handling by integrating device cache for improved performance --- .../services/devices-status.service.ts | 105 ++++++++---------- libs/common/src/helper/helper.module.ts | 6 +- .../helper/services/sos.handler.service.ts | 48 +++++--- .../services/tuya.web.socket.service.ts | 53 +++++++-- 4 files changed, 128 insertions(+), 84 deletions(-) diff --git a/libs/common/src/firebase/devices-status/services/devices-status.service.ts b/libs/common/src/firebase/devices-status/services/devices-status.service.ts index 3fae855..3c431e3 100644 --- a/libs/common/src/firebase/devices-status/services/devices-status.service.ts +++ b/libs/common/src/firebase/devices-status/services/devices-status.service.ts @@ -69,32 +69,23 @@ export class DeviceStatusFirebaseService { } async addBatchDeviceStatusToOurDb( batch: { deviceTuyaUuid: string; status: any; log: any }[], + deviceCache: Map, ): Promise { const allLogs = []; - const deviceMap = new Map(); console.log( - `🧠 Starting device lookups for batch of ${batch.length} items...`, + `🧠 Preparing logs from batch of ${batch.length} items using cached devices only...`, ); - // Step 1: Parallel device fetching - await Promise.all( - batch.map(async (item) => { - if (!deviceMap.has(item.deviceTuyaUuid)) { - const device = await this.getDeviceByDeviceTuyaUuid( - item.deviceTuyaUuid, - ); - device?.uuid && deviceMap.set(item.deviceTuyaUuid, device); - } - }), - ); - - console.log(`🔍 Found ${deviceMap.size} devices from batch`); - - // Step 2: Prepare logs and updates for (const item of batch) { - const device = deviceMap.get(item.deviceTuyaUuid); - if (!device?.uuid) continue; + const device = deviceCache.get(item.deviceTuyaUuid); + + if (!device?.uuid) { + console.log( + `⛔ Ignored unknown device in batch: ${item.deviceTuyaUuid}`, + ); + continue; + } const logs = item.log.properties.map((property) => this.deviceStatusLogRepository.create({ @@ -112,59 +103,53 @@ export class DeviceStatusFirebaseService { } console.log(`📝 Total logs to insert: ${allLogs.length}`); - // Step 3: Insert logs in chunks with ON CONFLICT DO NOTHING - const insertLogsPromise = (async () => { - const chunkSize = 300; - let insertedCount = 0; + const chunkSize = 300; + let insertedCount = 0; - for (let i = 0; i < allLogs.length; i += chunkSize) { - const chunk = allLogs.slice(i, i + chunkSize); - try { - const result = await this.deviceStatusLogRepository - .createQueryBuilder() - .insert() - .into('device-status-log') // or use DeviceStatusLogEntity - .values(chunk) - .orIgnore() // skip duplicates - .execute(); + for (let i = 0; i < allLogs.length; i += chunkSize) { + const chunk = allLogs.slice(i, i + chunkSize); + try { + const result = await this.deviceStatusLogRepository + .createQueryBuilder() + .insert() + .into('device-status-log') // or use DeviceStatusLogEntity + .values(chunk) + .orIgnore() // skip duplicates + .execute(); - insertedCount += result.identifiers.length; - console.log( - `✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, - ); - } catch (error) { - console.error('❌ Insert error (skipped chunk):', error.message); - } + insertedCount += result.identifiers.length; + console.log( + `✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, + ); + } catch (error) { + console.error('❌ Insert error (skipped chunk):', error.message); } + } - console.log( - `✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`, - ); - })(); - - // Step 5: Wait for both insert and post-processing to finish - await Promise.all([insertLogsPromise]); + console.log(`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`); } async addDeviceStatusToFirebase( addDeviceStatusDto: AddDeviceStatusDto, + deviceCache: Map, ): Promise { try { - const device = await this.getDeviceByDeviceTuyaUuid( - addDeviceStatusDto.deviceTuyaUuid, - ); - - if (device?.uuid) { - return await this.createDeviceStatusFirebase({ - deviceUuid: device.uuid, - ...addDeviceStatusDto, - productType: device.productDevice.prodType, - }); + const device = deviceCache.get(addDeviceStatusDto.deviceTuyaUuid); + if (!device?.uuid) { + console.log( + `⛔ Skipping Firebase update for unknown device: ${addDeviceStatusDto.deviceTuyaUuid}`, + ); + return null; } - // Return null if device not found or no UUID - return null; + + // Ensure product info and uuid are attached + addDeviceStatusDto.deviceUuid = device.uuid; + addDeviceStatusDto.productUuid = device.productDevice?.uuid; + addDeviceStatusDto.productType = device.productDevice?.prodType; + + return await this.createDeviceStatusFirebase(addDeviceStatusDto); } catch (error) { - // Handle the error silently, perhaps log it internally or ignore it + console.error('❌ Error in addDeviceStatusToFirebase:', error); return null; } } diff --git a/libs/common/src/helper/helper.module.ts b/libs/common/src/helper/helper.module.ts index df152e2..41992d3 100644 --- a/libs/common/src/helper/helper.module.ts +++ b/libs/common/src/helper/helper.module.ts @@ -8,7 +8,10 @@ import { TuyaWebSocketService } from './services/tuya.web.socket.service'; import { OneSignalService } from './services/onesignal.service'; import { DeviceMessagesService } from './services/device.messages.service'; import { DeviceRepositoryModule } from '../modules/device/device.repository.module'; -import { DeviceNotificationRepository } from '../modules/device/repositories'; +import { + DeviceNotificationRepository, + DeviceRepository, +} from '../modules/device/repositories'; import { DeviceStatusFirebaseModule } from '../firebase/devices-status/devices-status.module'; import { CommunityPermissionService } from './services/community.permission.service'; import { CommunityRepository } from '../modules/community/repositories'; @@ -27,6 +30,7 @@ import { SosHandlerService } from './services/sos.handler.service'; DeviceNotificationRepository, CommunityRepository, SosHandlerService, + DeviceRepository, ], exports: [ HelperHashService, diff --git a/libs/common/src/helper/services/sos.handler.service.ts b/libs/common/src/helper/services/sos.handler.service.ts index dd69f33..e883b62 100644 --- a/libs/common/src/helper/services/sos.handler.service.ts +++ b/libs/common/src/helper/services/sos.handler.service.ts @@ -16,35 +16,53 @@ export class SosHandlerService { ); } - async handleSosEventFirebase(devId: string, logData: any): Promise { + async handleSosEventFirebase( + devId: string, + logData: any, + deviceCache: Map, + ): Promise { try { - await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: true }], - log: logData, - }); - await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([ + await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( { deviceTuyaUuid: devId, status: [{ code: 'sos', value: true }], log: logData, }, - ]); + deviceCache, + ); + + await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( + [ + { + deviceTuyaUuid: devId, + status: [{ code: 'sos', value: true }], + log: logData, + }, + ], + deviceCache, + ); setTimeout(async () => { try { - await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: false }], - log: logData, - }); - await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([ + await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( { deviceTuyaUuid: devId, status: [{ code: 'sos', value: false }], log: logData, }, - ]); + deviceCache, + ); + + await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( + [ + { + deviceTuyaUuid: devId, + status: [{ code: 'sos', value: false }], + log: logData, + }, + ], + deviceCache, + ); } catch (err) { this.logger.error('Failed to send SOS false value', err); } diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 9d56240..e0850be 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -3,6 +3,7 @@ import TuyaWebsocket from '../../config/tuya-web-socket-config'; import { ConfigService } from '@nestjs/config'; import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service'; import { SosHandlerService } from './sos.handler.service'; +import { DeviceRepository } from '@app/common/modules/device/repositories'; @Injectable() export class TuyaWebSocketService { @@ -16,11 +17,13 @@ export class TuyaWebSocketService { }[] = []; private isProcessing = false; + private deviceCache: Map = new Map(); constructor( private readonly configService: ConfigService, private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService, private readonly sosHandlerService: SosHandlerService, + private readonly deviceRepository: DeviceRepository, ) { this.isDevEnv = this.configService.get('NODE_ENV') === 'development'; @@ -33,6 +36,11 @@ export class TuyaWebSocketService { maxRetryTimes: 100, }); + this.loadAllActiveDevices(); + + // Reload device cache every 1 hour + setInterval(() => this.loadAllActiveDevices(), 60 * 60 * 1000); + if (this.configService.get('tuya-config.TRUN_ON_TUYA_SOCKET')) { this.setupEventHandlers(); this.client.start(); @@ -42,6 +50,22 @@ export class TuyaWebSocketService { setInterval(() => this.processQueue(), 15000); } + private async loadAllActiveDevices(): Promise { + const devices = await this.deviceRepository.find({ + where: { isActive: true }, + relations: ['productDevice'], + }); + + this.deviceCache.clear(); + devices.forEach((device) => { + this.deviceCache.set(device.deviceTuyaUuid, device); + }); + + console.log( + `🔄 Device cache reloaded: ${this.deviceCache.size} active devices at ${new Date().toISOString()}`, + ); + } + private setupEventHandlers() { // Event handlers this.client.open(() => { @@ -51,18 +75,30 @@ export class TuyaWebSocketService { this.client.message(async (ws: WebSocket, message: any) => { try { const { devId, status, logData } = this.extractMessageData(message); - if (!Array.isArray(logData?.properties)) { + if (!Array.isArray(logData?.properties)) return; + + const device = this.deviceCache.get(devId); + if (!device) { + // console.log(`⛔ Ignored unknown device: ${devId}`); return; } + if (this.sosHandlerService.isSosTriggered(status)) { - await this.sosHandlerService.handleSosEventFirebase(devId, logData); + await this.sosHandlerService.handleSosEventFirebase( + devId, + logData, + this.deviceCache, + ); } else { // Firebase real-time update - await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: status, - log: logData, - }); + await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( + { + deviceTuyaUuid: devId, + status, + log: logData, + }, + this.deviceCache, + ); } // Push to internal queue @@ -111,11 +147,12 @@ export class TuyaWebSocketService { try { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( - batch?.map((item) => ({ + batch.map((item) => ({ deviceTuyaUuid: item.devId, status: item.status, log: item.logData, })), + this.deviceCache, ); } catch (error) { console.error('❌ Error processing batch:', error);