import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import TuyaWebsocket from '../../config/tuya-web-socket-config';
import { ConfigService } from '@nestjs/config';
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
import { SosHandlerService } from './sos.handler.service';
import * as NodeCache from 'node-cache';

/**
 * A device message that has already been forwarded to Firebase and is waiting
 * to be written to our own database by the periodic batch job.
 */
interface QueuedDeviceMessage {
  devId: string;
  status: any;
  logData: any;
  device: any;
}

/**
 * Bridges the Tuya message websocket to the rest of the application.
 *
 * For every incoming message the service:
 *  1. extracts the device id / status / raw data (layout depends on NODE_ENV),
 *  2. drops (acknowledges and ignores) messages that carry no `properties`
 *     array or that belong to a device missing from the local device cache,
 *  3. forwards the message either to the SOS handler or to the Firebase
 *     device-status service,
 *  4. queues it so that it is persisted to our own database in batches.
 *
 * Two timers are started in the constructor (queue flush and cache refresh)
 * and released again in {@link onModuleDestroy}.
 */
@Injectable()
export class TuyaWebSocketService implements OnModuleInit, OnModuleDestroy {
  /** Lifetime of a cached device entry, in seconds (2 hours). */
  private static readonly DEVICE_CACHE_TTL_SECONDS = 7200;
  /** How often the queued messages are flushed to the database (15 seconds). */
  private static readonly QUEUE_FLUSH_INTERVAL_MS = 15000;
  /** How often the device cache is reloaded (30 minutes). */
  private static readonly CACHE_REFRESH_INTERVAL_MS = 30 * 60 * 1000;

  private client: any;
  private readonly isDevEnv: boolean;
  /** Known devices keyed by their Tuya UUID. */
  private readonly deviceCache = new NodeCache({
    stdTTL: TuyaWebSocketService.DEVICE_CACHE_TTL_SECONDS,
  });
  /** Messages waiting for the next batch write. */
  private messageQueue: QueuedDeviceMessage[] = [];
  /** Guards against two batch writes running at the same time. */
  private isProcessing = false;
  private readonly queueTimer: ReturnType<typeof setInterval>;
  private readonly cacheRefreshTimer: ReturnType<typeof setInterval>;

  constructor(
    private readonly configService: ConfigService,
    private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService,
    private readonly sosHandlerService: SosHandlerService,
  ) {
    this.isDevEnv = this.configService.get('NODE_ENV') === 'development';

    this.client = new TuyaWebsocket({
      accessId: this.configService.get('tuya-config.TUYA_ACCESS_ID'),
      accessKey: this.configService.get('tuya-config.TUYA_ACCESS_KEY'),
      url: TuyaWebsocket.URL.EU,
      env: TuyaWebsocket.env.PROD,
      maxRetryTimes: 100,
    });

    // The socket is only wired up and started when the feature flag is set.
    if (this.configService.get('tuya-config.TRUN_ON_TUYA_SOCKET')) {
      this.setupEventHandlers();
      this.client.start();
    }

    // Run the queue processor every 15 seconds.
    // processQueue() handles its own errors, so the promise is intentionally
    // not awaited here.
    this.queueTimer = setInterval(
      () => void this.processQueue(),
      TuyaWebSocketService.QUEUE_FLUSH_INTERVAL_MS,
    );

    // Refresh the device cache every 30 minutes (well inside the 2 hour TTL).
    this.cacheRefreshTimer = setInterval(
      () => void this.initializeDeviceCache(),
      TuyaWebSocketService.CACHE_REFRESH_INTERVAL_MS,
    );
  }

  /** Loads the device cache once when the module starts. */
  async onModuleInit() {
    await this.initializeDeviceCache();
  }

  /**
   * Stops the periodic timers created in the constructor so they do not keep
   * firing (or keep the process alive) after the module has been destroyed.
   */
  onModuleDestroy() {
    clearInterval(this.queueTimer);
    clearInterval(this.cacheRefreshTimer);
  }

  /**
   * Loads all devices from the Firebase device-status service and stores every
   * device that has a `deviceTuyaUuid` in the local cache, keyed by that UUID.
   * Failures are logged and otherwise ignored; existing cache entries are kept.
   */
  private async initializeDeviceCache() {
    try {
      const allDevices = await this.deviceStatusFirebaseService.getAllDevices();

      for (const device of allDevices) {
        if (device.deviceTuyaUuid) {
          this.deviceCache.set(device.deviceTuyaUuid, device);
        }
      }

      console.log(`✅ Refreshed cache with ${allDevices.length} devices.`);
    } catch (error) {
      console.error('❌ Failed to initialize device cache:', error);
    }
  }

  /** Registers the callbacks for every websocket event exposed by the client. */
  private setupEventHandlers() {
    this.client.open(() => {
      console.log('open');
    });

    this.client.message((ws: WebSocket, message: any) =>
      this.handleMessage(message),
    );

    this.client.reconnect(() => {
      console.log('reconnect');
    });

    this.client.ping(() => {
      console.log('ping');
    });

    this.client.pong(() => {
      console.log('pong');
    });

    this.client.close((ws: WebSocket, ...args: any[]) => {
      console.log('close', ...args);
    });

    this.client.error((ws: WebSocket, error: any) => {
      console.error('WebSocket error:', error);
    });
  }

  /**
   * Handles a single websocket message.
   *
   * Messages without a device id, without a `properties` array, or for a device
   * that is not in the cache are acknowledged and ignored. All other messages
   * are forwarded (SOS handler or Firebase status), queued for the batch write
   * and then acknowledged.
   *
   * If anything throws, the error is logged and the message is NOT
   * acknowledged.
   * NOTE(review): whether the broker redelivers unacknowledged messages is not
   * visible from this file - confirm against the Tuya client.
   *
   * @param message Raw message object delivered by the Tuya websocket client.
   */
  private async handleMessage(message: any): Promise<void> {
    try {
      const { devId, status, logData } = this.extractMessageData(message);

      // A missing devId would make the cache lookup below throw (node-cache
      // rejects non string/number keys), so treat it like any other message
      // we are not interested in.
      if (!devId || !Array.isArray(logData?.properties)) {
        this.client.ackMessage(message.messageId);
        return;
      }

      const device = this.deviceCache.get(devId);
      if (!device) {
        // Unknown device: acknowledge and ignore the message.
        this.client.ackMessage(message.messageId);
        return;
      }

      if (this.sosHandlerService.isSosTriggered(status)) {
        await this.sosHandlerService.handleSosEventFirebase(devId, logData);
      } else {
        await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
          deviceTuyaUuid: devId,
          status,
          log: logData,
          device,
        });
      }

      // Push to internal queue
      this.messageQueue.push({ devId, status, logData, device });

      // Acknowledge the message
      this.client.ackMessage(message.messageId);
    } catch (error) {
      console.error('❌ Error receiving message:', error);
    }
  }

  /**
   * Writes all currently queued messages to our database in one batch.
   *
   * Does nothing when the queue is empty or when a previous batch is still
   * being written. If the write fails, the batch is put back at the front of
   * the queue so it is retried on the next run.
   */
  private async processQueue() {
    if (this.isProcessing) {
      console.log('⏳ Skipping: still processing previous batch');
      return;
    }

    if (this.messageQueue.length === 0) return;

    this.isProcessing = true;

    // Take ownership of the current queue; messages arriving while the write
    // is in flight land in the fresh array.
    const batch = this.messageQueue;
    this.messageQueue = [];

    console.log(`🔁 Processing batch of size: ${batch.length}`);

    try {
      await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
        batch.map((item) => ({
          deviceTuyaUuid: item.devId,
          status: item.status,
          log: item.logData,
          device: item.device,
        })),
      );
    } catch (error) {
      console.error('❌ Error processing batch:', error);
      // Retry: failed batch first, then whatever arrived in the meantime.
      // concat() is used instead of unshift(...batch) because spreading a very
      // large batch as call arguments can throw a RangeError and lose the batch.
      this.messageQueue = batch.concat(this.messageQueue);
    } finally {
      this.isProcessing = false;
    }
  }

  /**
   * Pulls the device id, status and raw data out of a websocket message.
   *
   * In the development environment the values are read from
   * `payload.data.bizData` (`devId`, `properties`); otherwise they are read
   * directly from `payload.data` (`devId`, `status`).
   *
   * @param message Raw message object; must have `payload.data`.
   * @returns The device id, its status list and the object used as log data.
   * @throws TypeError if `message.payload` is missing, or (outside development)
   *   if `message.payload.data` is missing.
   */
  private extractMessageData(message: any): {
    devId: string;
    status: any;
    logData: any;
  } {
    const payloadData = message.payload.data;

    if (this.isDevEnv) {
      return {
        devId: payloadData.bizData?.devId,
        status: payloadData.bizData?.properties,
        logData: payloadData.bizData,
      };
    } else {
      return {
        devId: payloadData.devId,
        status: payloadData.status,
        logData: payloadData,
      };
    }
  }

  // private logDeviceData(devId: string, status: any, logData: any): void {
  //   console.log('Device ID:', devId);
  //   console.log('Status:', status);
  //   console.log('Full Data:', logData);
  // }
}