diff --git a/libs/common/src/helper/services/tuya.web.socket.service.ts b/libs/common/src/helper/services/tuya.web.socket.service.ts index 5a810ab..dcf07b9 100644 --- a/libs/common/src/helper/services/tuya.web.socket.service.ts +++ b/libs/common/src/helper/services/tuya.web.socket.service.ts @@ -9,6 +9,14 @@ export class TuyaWebSocketService { private client: any; private readonly isDevEnv: boolean; + private messageQueue: { + devId: string; + status: any; + logData: any; + }[] = []; + + private isProcessing = false; + constructor( private readonly configService: ConfigService, private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService, @@ -26,12 +34,12 @@ export class TuyaWebSocketService { }); if (this.configService.get('tuya-config.TRUN_ON_TUYA_SOCKET')) { - // Set up event handlers this.setupEventHandlers(); - - // Start receiving messages this.client.start(); } + + // Trigger the queue processor every 2 seconds + setInterval(() => this.processQueue(), 10000); } private setupEventHandlers() { @@ -44,19 +52,13 @@ export class TuyaWebSocketService { try { const { devId, status, logData } = this.extractMessageData(message); - if (this.sosHandlerService.isSosTriggered(status)) { - await this.sosHandlerService.handleSosEvent(devId, logData); - } else { - await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ - deviceTuyaUuid: devId, - status: status, - log: logData, - }); - } + // Push to internal queue + this.messageQueue.push({ devId, status, logData }); + // Acknowledge the message this.client.ackMessage(message.messageId); } catch (error) { - console.error('Error processing message:', error); + console.error('Error receiving message:', error); } }); @@ -80,6 +82,35 @@ export class TuyaWebSocketService { console.error('WebSocket error:', error); }); } + private async processQueue() { + if (this.isProcessing || this.messageQueue.length === 0) return; + + this.isProcessing = true; + + const batch = [...this.messageQueue]; + this.messageQueue = []; + + try { + for (const item of batch) { + if (this.sosHandlerService.isSosTriggered(item.status)) { + await this.sosHandlerService.handleSosEvent(item.devId, item.logData); + } else { + await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({ + deviceTuyaUuid: item.devId, + status: item.status, + log: item.logData, + }); + } + } + } catch (error) { + console.error('Error processing batch:', error); + // Re-add the batch to the queue for retry + this.messageQueue.unshift(...batch); + } finally { + this.isProcessing = false; + } + } + private extractMessageData(message: any): { devId: string; status: any;