diff --git a/libs/common/src/firebase/devices-status/services/devices-status.service.ts b/libs/common/src/firebase/devices-status/services/devices-status.service.ts index 477df26..4b0b0f7 100644 --- a/libs/common/src/firebase/devices-status/services/devices-status.service.ts +++ b/libs/common/src/firebase/devices-status/services/devices-status.service.ts @@ -226,49 +226,49 @@ export class DeviceStatusFirebaseService { }); await this.deviceStatusLogRepository.save(newLogs); - // if (addDeviceStatusDto.productType === ProductType.PC) { - // const energyCodes = new Set([ - // PowerClampEnergyEnum.ENERGY_CONSUMED, - // PowerClampEnergyEnum.ENERGY_CONSUMED_A, - // PowerClampEnergyEnum.ENERGY_CONSUMED_B, - // PowerClampEnergyEnum.ENERGY_CONSUMED_C, - // ]); + if (addDeviceStatusDto.productType === ProductType.PC) { + const energyCodes = new Set([ + PowerClampEnergyEnum.ENERGY_CONSUMED, + PowerClampEnergyEnum.ENERGY_CONSUMED_A, + PowerClampEnergyEnum.ENERGY_CONSUMED_B, + PowerClampEnergyEnum.ENERGY_CONSUMED_C, + ]); - // const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) => - // energyCodes.has(status.code), - // ); + const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) => + energyCodes.has(status.code), + ); - // if (energyStatus) { - // await this.powerClampService.updateEnergyConsumedHistoricalData( - // addDeviceStatusDto.deviceUuid, - // ); - // } - // } + if (energyStatus) { + await this.powerClampService.updateEnergyConsumedHistoricalData( + addDeviceStatusDto.deviceUuid, + ); + } + } - // if ( - // addDeviceStatusDto.productType === ProductType.CPS || - // addDeviceStatusDto.productType === ProductType.WPS - // ) { - // const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]); + if ( + addDeviceStatusDto.productType === ProductType.CPS || + addDeviceStatusDto.productType === ProductType.WPS + ) { + const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]); - // const occupancyStatus = addDeviceStatusDto?.log?.properties?.find( - // (status) => occupancyCodes.has(status.code), - // ); + const occupancyStatus = addDeviceStatusDto?.log?.properties?.find( + (status) => occupancyCodes.has(status.code), + ); - // if (occupancyStatus) { - // await this.occupancyService.updateOccupancySensorHistoricalData( - // addDeviceStatusDto.deviceUuid, - // ); - // await this.occupancyService.updateOccupancySensorHistoricalDurationData( - // addDeviceStatusDto.deviceUuid, - // ); - // } - // } - // if (addDeviceStatusDto.productType === ProductType.AQI) { - // await this.aqiDataService.updateAQISensorHistoricalData( - // addDeviceStatusDto.deviceUuid, - // ); - // } + if (occupancyStatus) { + await this.occupancyService.updateOccupancySensorHistoricalData( + addDeviceStatusDto.deviceUuid, + ); + await this.occupancyService.updateOccupancySensorHistoricalDurationData( + addDeviceStatusDto.deviceUuid, + ); + } + } + if (addDeviceStatusDto.productType === ProductType.AQI) { + await this.aqiDataService.updateAQISensorHistoricalData( + addDeviceStatusDto.deviceUuid, + ); + } // Return the updated data const snapshot: DataSnapshot = await get(dataRef); return snapshot.val(); diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 5a810ab..dcf07b9 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -9,6 +9,14 @@ export class TuyaWebSocketService { private client: any; private readonly isDevEnv: boolean; + private messageQueue: { + devId: string; + status: any; + logData: any; + }[] = []; + + private isProcessing = false; + constructor( private readonly configService: ConfigService, private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService, @@ -26,12 +34,12 @@ export class TuyaWebSocketService { }); if (this.configService.get('tuya-config.TRUN_ON_TUYA_SOCKET')) { - // Set up event handlers this.setupEventHandlers(); - - // Start receiving messages this.client.start(); } + + // Trigger the queue processor every 2 seconds + setInterval(() => this.processQueue(), 10000); } private setupEventHandlers() { @@ -44,19 +52,13 @@ export class TuyaWebSocketService { try { const { devId, status, logData } = this.extractMessageData(message); - if (this.sosHandlerService.isSosTriggered(status)) { - await this.sosHandlerService.handleSosEvent(devId, logData); - } else { - await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: status, - log: logData, - }); - } + // Push to internal queue + this.messageQueue.push({ devId, status, logData }); + // Acknowledge the message this.client.ackMessage(message.messageId); } catch (error) { - console.error('Error processing message:', error); + console.error('Error receiving message:', error); } }); @@ -80,6 +82,35 @@ export class TuyaWebSocketService { console.error('WebSocket error:', error); }); } + private async processQueue() { + if (this.isProcessing || this.messageQueue.length === 0) return; + + this.isProcessing = true; + + const batch = [...this.messageQueue]; + this.messageQueue = []; + + try { + for (const item of batch) { + if (this.sosHandlerService.isSosTriggered(item.status)) { + await this.sosHandlerService.handleSosEvent(item.devId, item.logData); + } else { + await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ + deviceTuyaUuid: item.devId, + status: item.status, + log: item.logData, + }); + } + } + } catch (error) { + console.error('Error processing batch:', error); + // Re-add the batch to the queue for retry + this.messageQueue.unshift(...batch); + } finally { + this.isProcessing = false; + } + } + private extractMessageData(message: any): { devId: string; status: any;