From 731819aeaa7dbaccfaaf2879e3480616746384f9 Mon Sep 17 00:00:00 2001 From: faris Aljohari <83524184+farisaljohari@users.noreply.github.com> Date: Wed, 25 Jun 2025 18:37:46 -0600 Subject: [PATCH 1/2] feat: enhance device status handling with caching and batch processing improvements --- .../services/devices-status.service.ts | 60 +++++++++---------- .../helper/services/sos.handler.service.ts | 29 ++++++--- .../services/tuya.web.socket.service.ts | 46 ++++++++++++-- package-lock.json | 22 +++++++ package.json | 1 + 5 files changed, 113 insertions(+), 45 deletions(-) diff --git a/libs/common/src/firebase/devices-status/services/devices-status.service.ts b/libs/common/src/firebase/devices-status/services/devices-status.service.ts index 3fae855..6560276 100644 --- a/libs/common/src/firebase/devices-status/services/devices-status.service.ts +++ b/libs/common/src/firebase/devices-status/services/devices-status.service.ts @@ -68,33 +68,23 @@ export class DeviceStatusFirebaseService { } } async addBatchDeviceStatusToOurDb( - batch: { deviceTuyaUuid: string; status: any; log: any }[], + batch: { + deviceTuyaUuid: string; + status: any; + log: any; + device: any; + }[], ): Promise { const allLogs = []; - const deviceMap = new Map(); - console.log( - `🧠 Starting device lookups for batch of ${batch.length} items...`, - ); + console.log(`🔁 Preparing logs from batch of ${batch.length} items...`); - // Step 1: Parallel device fetching - await Promise.all( - batch.map(async (item) => { - if (!deviceMap.has(item.deviceTuyaUuid)) { - const device = await this.getDeviceByDeviceTuyaUuid( - item.deviceTuyaUuid, - ); - device?.uuid && deviceMap.set(item.deviceTuyaUuid, device); - } - }), - ); - - console.log(`🔍 Found ${deviceMap.size} devices from batch`); - - // Step 2: Prepare logs and updates for (const item of batch) { - const device = deviceMap.get(item.deviceTuyaUuid); - if (!device?.uuid) continue; + const device = item.device; + if (!device?.uuid) { + console.log(`⛔ Skipped unknown device: ${item.deviceTuyaUuid}`); + continue; + } const logs = item.log.properties.map((property) => this.deviceStatusLogRepository.create({ @@ -142,23 +132,24 @@ export class DeviceStatusFirebaseService { ); })(); - // Step 5: Wait for both insert and post-processing to finish - await Promise.all([insertLogsPromise]); + await insertLogsPromise; } async addDeviceStatusToFirebase( - addDeviceStatusDto: AddDeviceStatusDto, + addDeviceStatusDto: AddDeviceStatusDto & { device?: any }, ): Promise { try { - const device = await this.getDeviceByDeviceTuyaUuid( - addDeviceStatusDto.deviceTuyaUuid, - ); - + let device = addDeviceStatusDto.device; + if (!device) { + device = await this.getDeviceByDeviceTuyaUuid( + addDeviceStatusDto.deviceTuyaUuid, + ); + } if (device?.uuid) { return await this.createDeviceStatusFirebase({ deviceUuid: device.uuid, ...addDeviceStatusDto, - productType: device.productDevice.prodType, + productType: device.productDevice?.prodType, }); } // Return null if device not found or no UUID @@ -178,6 +169,15 @@ export class DeviceStatusFirebaseService { relations: ['productDevice'], }); } + async getAllDevices() { + return await this.deviceRepository.find({ + where: { + isActive: true, + }, + relations: ['productDevice'], + }); + } + async getDevicesInstructionStatus(deviceUuid: string) { try { const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid); diff --git a/libs/common/src/helper/services/sos.handler.service.ts b/libs/common/src/helper/services/sos.handler.service.ts index dd69f33..55a0daf 100644 --- a/libs/common/src/helper/services/sos.handler.service.ts +++ b/libs/common/src/helper/services/sos.handler.service.ts @@ -16,33 +16,44 @@ export class SosHandlerService { ); } - async handleSosEventFirebase(devId: string, logData: any): Promise { + async handleSosEventFirebase(device: any, logData: any): Promise { + const sosTrueStatus = [{ code: 'sos', value: true }]; + const sosFalseStatus = [{ code: 'sos', value: false }]; + try { + // ✅ Send true status await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: true }], + deviceTuyaUuid: device.deviceTuyaUuid, + status: sosTrueStatus, log: logData, + device, }); + await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([ { - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: true }], + deviceTuyaUuid: device.deviceTuyaUuid, + status: sosTrueStatus, log: logData, + device, }, ]); + // ✅ Schedule false status setTimeout(async () => { try { await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: false }], + deviceTuyaUuid: device.deviceTuyaUuid, + status: sosFalseStatus, log: logData, + device, }); + await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([ { - deviceTuyaUuid: devId, - status: [{ code: 'sos', value: false }], + deviceTuyaUuid: device.deviceTuyaUuid, + status: sosFalseStatus, log: logData, + device, }, ]); } catch (err) { diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 9d56240..be32b1d 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -1,18 +1,21 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, OnModuleInit } from '@nestjs/common'; import TuyaWebsocket from '../../config/tuya-web-socket-config'; import { ConfigService } from '@nestjs/config'; import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service'; import { SosHandlerService } from './sos.handler.service'; +import * as NodeCache from 'node-cache'; @Injectable() -export class TuyaWebSocketService { +export class TuyaWebSocketService implements OnModuleInit { private client: any; private readonly isDevEnv: boolean; + private readonly deviceCache = new NodeCache({ stdTTL: 7200 }); // TTL = 2 hour private messageQueue: { devId: string; status: any; logData: any; + device: any; }[] = []; private isProcessing = false; @@ -38,8 +41,29 @@ export class TuyaWebSocketService { this.client.start(); } - // Trigger the queue processor every 15 seconds + // Run the queue processor every 15 seconds setInterval(() => this.processQueue(), 15000); + + // Refresh the cache every 1 hour + setInterval(() => this.initializeDeviceCache(), 30 * 60 * 1000); // 30 minutes + } + + async onModuleInit() { + await this.initializeDeviceCache(); + } + + private async initializeDeviceCache() { + try { + const allDevices = await this.deviceStatusFirebaseService.getAllDevices(); + allDevices.forEach((device) => { + if (device.deviceTuyaUuid) { + this.deviceCache.set(device.deviceTuyaUuid, device); + } + }); + console.log(`✅ Refreshed cache with ${allDevices.length} devices.`); + } catch (error) { + console.error('❌ Failed to initialize device cache:', error); + } } private setupEventHandlers() { @@ -52,6 +76,14 @@ export class TuyaWebSocketService { try { const { devId, status, logData } = this.extractMessageData(message); if (!Array.isArray(logData?.properties)) { + this.client.ackMessage(message.messageId); + return; + } + + const device = this.deviceCache.get(devId); + if (!device) { + // console.warn(`⚠️ Device not found in cache: ${devId}`); + this.client.ackMessage(message.messageId); return; } if (this.sosHandlerService.isSosTriggered(status)) { @@ -60,13 +92,14 @@ export class TuyaWebSocketService { // Firebase real-time update await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ deviceTuyaUuid: devId, - status: status, + status, log: logData, + device, }); } // Push to internal queue - this.messageQueue.push({ devId, status, logData }); + this.messageQueue.push({ devId, status, logData, device }); // Acknowledge the message this.client.ackMessage(message.messageId); @@ -111,10 +144,11 @@ export class TuyaWebSocketService { try { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( - batch?.map((item) => ({ + batch.map((item) => ({ deviceTuyaUuid: item.devId, status: item.status, log: item.logData, + device: item.device, })), ); } catch (error) { diff --git a/package-lock.json b/package-lock.json index e3305e5..e8718b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39,6 +39,7 @@ "ioredis": "^5.3.2", "morgan": "^1.10.0", "nest-winston": "^1.10.2", + "node-cache": "^5.1.2", "nodemailer": "^6.9.10", "onesignal-node": "^3.4.0", "passport-jwt": "^4.0.1", @@ -10184,6 +10185,27 @@ "node": "^18 || ^20 || >= 21" } }, + "node_modules/node-cache": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/node-cache/-/node-cache-5.1.2.tgz", + "integrity": "sha512-t1QzWwnk4sjLWaQAS8CHgOJ+RAfmHpxFWmc36IWTiWHQfs0w5JDMBS1b1ZxQteo0vVVuWJvIUKHDkkeK7vIGCg==", + "license": "MIT", + "dependencies": { + "clone": "2.x" + }, + "engines": { + "node": ">= 8.0.0" + } + }, + "node_modules/node-cache/node_modules/clone": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/clone/-/clone-2.1.2.tgz", + "integrity": "sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==", + "license": "MIT", + "engines": { + "node": ">=0.8" + } + }, "node_modules/node-emoji": { "version": "1.11.0", "resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz", diff --git a/package.json b/package.json index 55d546b..6e16079 100644 --- a/package.json +++ b/package.json @@ -51,6 +51,7 @@ "ioredis": "^5.3.2", "morgan": "^1.10.0", "nest-winston": "^1.10.2", + "node-cache": "^5.1.2", "nodemailer": "^6.9.10", "onesignal-node": "^3.4.0", "passport-jwt": "^4.0.1", From f80d097ff88b882e1b82b9dd5603289b75357f7e Mon Sep 17 00:00:00 2001 From: faris Aljohari <83524184+farisaljohari@users.noreply.github.com> Date: Wed, 25 Jun 2025 18:57:56 -0600 Subject: [PATCH 2/2] refactor: optimize log insertion and clean up device cache handling in TuyaWebSocketService --- .../services/devices-status.service.ts | 41 ++++++++++--------- .../services/tuya.web.socket.service.ts | 22 ++-------- 2 files changed, 24 insertions(+), 39 deletions(-) diff --git a/libs/common/src/firebase/devices-status/services/devices-status.service.ts b/libs/common/src/firebase/devices-status/services/devices-status.service.ts index 3972b2a..b3ef843 100644 --- a/libs/common/src/firebase/devices-status/services/devices-status.service.ts +++ b/libs/common/src/firebase/devices-status/services/devices-status.service.ts @@ -102,28 +102,30 @@ export class DeviceStatusFirebaseService { } console.log(`📝 Total logs to insert: ${allLogs.length}`); - const chunkSize = 300; - let insertedCount = 0; - for (let i = 0; i < allLogs.length; i += chunkSize) { - const chunk = allLogs.slice(i, i + chunkSize); - try { - const result = await this.deviceStatusLogRepository - .createQueryBuilder() - .insert() - .into('device-status-log') // or use DeviceStatusLogEntity - .values(chunk) - .orIgnore() // skip duplicates - .execute(); + const insertLogsPromise = (async () => { + const chunkSize = 300; + let insertedCount = 0; - insertedCount += result.identifiers.length; - console.log( - `✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, - ); - } catch (error) { - console.error('❌ Insert error (skipped chunk):', error.message); + for (let i = 0; i < allLogs.length; i += chunkSize) { + const chunk = allLogs.slice(i, i + chunkSize); + try { + const result = await this.deviceStatusLogRepository + .createQueryBuilder() + .insert() + .into('device-status-log') // or use DeviceStatusLogEntity + .values(chunk) + .orIgnore() // skip duplicates + .execute(); + + insertedCount += result.identifiers.length; + console.log( + `✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, + ); + } catch (error) { + console.error('❌ Insert error (skipped chunk):', error.message); + } } - } console.log( `✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`, @@ -153,7 +155,6 @@ export class DeviceStatusFirebaseService { // Return null if device not found or no UUID return null; } catch (error) { - console.error('❌ Error in addDeviceStatusToFirebase:', error); return null; } } diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 63de80b..d30200f 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -19,13 +19,11 @@ export class TuyaWebSocketService implements OnModuleInit { }[] = []; private isProcessing = false; - private deviceCache: Map = new Map(); constructor( private readonly configService: ConfigService, private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService, private readonly sosHandlerService: SosHandlerService, - private readonly deviceRepository: DeviceRepository, ) { this.isDevEnv = this.configService.get('NODE_ENV') === 'development'; @@ -38,11 +36,6 @@ export class TuyaWebSocketService implements OnModuleInit { maxRetryTimes: 100, }); - this.loadAllActiveDevices(); - - // Reload device cache every 1 hour - setInterval(() => this.loadAllActiveDevices(), 60 * 60 * 1000); - if (this.configService.get('tuya-config.TRUN_ON_TUYA_SOCKET')) { this.setupEventHandlers(); this.client.start(); @@ -74,7 +67,6 @@ export class TuyaWebSocketService implements OnModuleInit { } private setupEventHandlers() { - // Event handlers this.client.open(() => { console.log('open'); }); @@ -89,19 +81,14 @@ export class TuyaWebSocketService implements OnModuleInit { const device = this.deviceCache.get(devId); if (!device) { - // console.warn(`⚠️ Device not found in cache: ${devId}`); + // console.log(⛔ Unknown device: ${devId}, message ignored.); this.client.ackMessage(message.messageId); return; } if (this.sosHandlerService.isSosTriggered(status)) { - await this.sosHandlerService.handleSosEventFirebase( - devId, - logData, - this.deviceCache, - ); + await this.sosHandlerService.handleSosEventFirebase(devId, logData); } else { - // Firebase real-time update await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ deviceTuyaUuid: devId, status, @@ -116,10 +103,9 @@ export class TuyaWebSocketService implements OnModuleInit { // Acknowledge the message this.client.ackMessage(message.messageId); } catch (error) { - console.error('Error receiving message:', error); + console.error('❌ Error receiving message:', error); } }); - this.client.reconnect(() => { console.log('reconnect'); }); @@ -156,14 +142,12 @@ export class TuyaWebSocketService implements OnModuleInit { try { await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( - batch.map((item) => ({ batch.map((item) => ({ deviceTuyaUuid: item.devId, status: item.status, log: item.logData, device: item.device, })), - this.deviceCache, ); } catch (error) { console.error('❌ Error processing batch:', error);