Merge pull request #439 from SyncrowIOT/add-check-log-to-trace-the-map-issue

feat: enhance device status handling with caching and batch processin…
This commit is contained in:
faljawhary
2025-06-25 18:59:37 -06:00
committed by GitHub
5 changed files with 151 additions and 134 deletions

View File

@ -68,22 +68,21 @@ export class DeviceStatusFirebaseService {
} }
} }
async addBatchDeviceStatusToOurDb( async addBatchDeviceStatusToOurDb(
batch: { deviceTuyaUuid: string; status: any; log: any }[], batch: {
deviceCache: Map<string, any>, deviceTuyaUuid: string;
status: any;
log: any;
device: any;
}[],
): Promise<void> { ): Promise<void> {
const allLogs = []; const allLogs = [];
console.log( console.log(`🔁 Preparing logs from batch of ${batch.length} items...`);
`🧠 Preparing logs from batch of ${batch.length} items using cached devices only...`,
);
for (const item of batch) { for (const item of batch) {
const device = deviceCache.get(item.deviceTuyaUuid); const device = item.device;
if (!device?.uuid) { if (!device?.uuid) {
console.log( console.log(`⛔ Skipped unknown device: ${item.deviceTuyaUuid}`);
`⛔ Ignored unknown device in batch: ${item.deviceTuyaUuid}`,
);
continue; continue;
} }
@ -103,61 +102,59 @@ export class DeviceStatusFirebaseService {
} }
console.log(`📝 Total logs to insert: ${allLogs.length}`); console.log(`📝 Total logs to insert: ${allLogs.length}`);
const chunkSize = 300;
let insertedCount = 0;
for (let i = 0; i < allLogs.length; i += chunkSize) { const insertLogsPromise = (async () => {
const chunk = allLogs.slice(i, i + chunkSize); const chunkSize = 300;
try { let insertedCount = 0;
const result = await this.deviceStatusLogRepository
.createQueryBuilder()
.insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
insertedCount += result.identifiers.length; for (let i = 0; i < allLogs.length; i += chunkSize) {
console.log( const chunk = allLogs.slice(i, i + chunkSize);
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`, try {
); const result = await this.deviceStatusLogRepository
} catch (error) { .createQueryBuilder()
console.error('❌ Insert error (skipped chunk):', error.message); .insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
insertedCount += result.identifiers.length;
console.log(
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
);
} catch (error) {
console.error('❌ Insert error (skipped chunk):', error.message);
}
} }
}
console.log(`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`); console.log(
`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`,
);
})();
await insertLogsPromise;
} }
async addDeviceStatusToFirebase( async addDeviceStatusToFirebase(
addDeviceStatusDto: AddDeviceStatusDto, addDeviceStatusDto: AddDeviceStatusDto & { device?: any },
deviceCache?: Map<string, any>,
): Promise<AddDeviceStatusDto | null> { ): Promise<AddDeviceStatusDto | null> {
try { try {
let device; let device = addDeviceStatusDto.device;
if (!device) {
if (deviceCache) {
device = deviceCache.get(addDeviceStatusDto.deviceTuyaUuid);
} else {
device = await this.getDeviceByDeviceTuyaUuid( device = await this.getDeviceByDeviceTuyaUuid(
addDeviceStatusDto.deviceTuyaUuid, addDeviceStatusDto.deviceTuyaUuid,
); );
} }
if (device?.uuid) {
if (!device?.uuid) { return await this.createDeviceStatusFirebase({
console.log( deviceUuid: device.uuid,
`⛔ Skipping Firebase update for unknown device: ${addDeviceStatusDto.deviceTuyaUuid}`, ...addDeviceStatusDto,
); productType: device.productDevice?.prodType,
return null; });
} }
// Return null if device not found or no UUID
addDeviceStatusDto.deviceUuid = device.uuid; return null;
addDeviceStatusDto.productUuid = device.productDevice?.uuid;
addDeviceStatusDto.productType = device.productDevice?.prodType;
return await this.createDeviceStatusFirebase(addDeviceStatusDto);
} catch (error) { } catch (error) {
console.error('❌ Error in addDeviceStatusToFirebase:', error);
return null; return null;
} }
} }
@ -171,6 +168,15 @@ export class DeviceStatusFirebaseService {
relations: ['productDevice'], relations: ['productDevice'],
}); });
} }
async getAllDevices() {
return await this.deviceRepository.find({
where: {
isActive: true,
},
relations: ['productDevice'],
});
}
async getDevicesInstructionStatus(deviceUuid: string) { async getDevicesInstructionStatus(deviceUuid: string) {
try { try {
const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid); const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid);

View File

@ -16,53 +16,46 @@ export class SosHandlerService {
); );
} }
async handleSosEventFirebase( async handleSosEventFirebase(device: any, logData: any): Promise<void> {
devId: string, const sosTrueStatus = [{ code: 'sos', value: true }];
logData: any, const sosFalseStatus = [{ code: 'sos', value: false }];
deviceCache: Map<string, any>,
): Promise<void> {
try { try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( // ✅ Send true status
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: device.deviceTuyaUuid,
status: sosTrueStatus,
log: logData,
device,
});
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
{ {
deviceTuyaUuid: devId, deviceTuyaUuid: device.deviceTuyaUuid,
status: [{ code: 'sos', value: true }], status: sosTrueStatus,
log: logData, log: logData,
device,
}, },
deviceCache, ]);
);
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
[
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
},
],
deviceCache,
);
// ✅ Schedule false status
setTimeout(async () => { setTimeout(async () => {
try { try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
{ deviceTuyaUuid: device.deviceTuyaUuid,
deviceTuyaUuid: devId, status: sosFalseStatus,
status: [{ code: 'sos', value: false }], log: logData,
log: logData, device,
}, });
deviceCache,
);
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb( await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
[ {
{ deviceTuyaUuid: device.deviceTuyaUuid,
deviceTuyaUuid: devId, status: sosFalseStatus,
status: [{ code: 'sos', value: false }], log: logData,
log: logData, device,
}, },
], ]);
deviceCache,
);
} catch (err) { } catch (err) {
this.logger.error('Failed to send SOS false value', err); this.logger.error('Failed to send SOS false value', err);
} }

View File

@ -1,29 +1,29 @@
import { Injectable } from '@nestjs/common'; import { Injectable, OnModuleInit } from '@nestjs/common';
import TuyaWebsocket from '../../config/tuya-web-socket-config'; import TuyaWebsocket from '../../config/tuya-web-socket-config';
import { ConfigService } from '@nestjs/config'; import { ConfigService } from '@nestjs/config';
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service'; import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
import { SosHandlerService } from './sos.handler.service'; import { SosHandlerService } from './sos.handler.service';
import { DeviceRepository } from '@app/common/modules/device/repositories'; import * as NodeCache from 'node-cache';
@Injectable() @Injectable()
export class TuyaWebSocketService { export class TuyaWebSocketService implements OnModuleInit {
private client: any; private client: any;
private readonly isDevEnv: boolean; private readonly isDevEnv: boolean;
private readonly deviceCache = new NodeCache({ stdTTL: 7200 }); // TTL = 2 hour
private messageQueue: { private messageQueue: {
devId: string; devId: string;
status: any; status: any;
logData: any; logData: any;
device: any;
}[] = []; }[] = [];
private isProcessing = false; private isProcessing = false;
private deviceCache: Map<string, any> = new Map();
constructor( constructor(
private readonly configService: ConfigService, private readonly configService: ConfigService,
private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService, private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService,
private readonly sosHandlerService: SosHandlerService, private readonly sosHandlerService: SosHandlerService,
private readonly deviceRepository: DeviceRepository,
) { ) {
this.isDevEnv = this.isDevEnv =
this.configService.get<string>('NODE_ENV') === 'development'; this.configService.get<string>('NODE_ENV') === 'development';
@ -36,38 +36,37 @@ export class TuyaWebSocketService {
maxRetryTimes: 100, maxRetryTimes: 100,
}); });
this.loadAllActiveDevices();
// Reload device cache every 1 hour
setInterval(() => this.loadAllActiveDevices(), 60 * 60 * 1000);
if (this.configService.get<string>('tuya-config.TRUN_ON_TUYA_SOCKET')) { if (this.configService.get<string>('tuya-config.TRUN_ON_TUYA_SOCKET')) {
this.setupEventHandlers(); this.setupEventHandlers();
this.client.start(); this.client.start();
} }
// Trigger the queue processor every 15 seconds // Run the queue processor every 15 seconds
setInterval(() => this.processQueue(), 15000); setInterval(() => this.processQueue(), 15000);
// Refresh the cache every 1 hour
setInterval(() => this.initializeDeviceCache(), 30 * 60 * 1000); // 30 minutes
} }
private async loadAllActiveDevices(): Promise<void> { async onModuleInit() {
const devices = await this.deviceRepository.find({ await this.initializeDeviceCache();
where: { isActive: true }, }
relations: ['productDevice'],
});
this.deviceCache.clear(); private async initializeDeviceCache() {
devices.forEach((device) => { try {
this.deviceCache.set(device.deviceTuyaUuid, device); const allDevices = await this.deviceStatusFirebaseService.getAllDevices();
}); allDevices.forEach((device) => {
if (device.deviceTuyaUuid) {
console.log( this.deviceCache.set(device.deviceTuyaUuid, device);
`🔄 Device cache reloaded: ${this.deviceCache.size} active devices at ${new Date().toISOString()}`, }
); });
console.log(`✅ Refreshed cache with ${allDevices.length} devices.`);
} catch (error) {
console.error('❌ Failed to initialize device cache:', error);
}
} }
private setupEventHandlers() { private setupEventHandlers() {
// Event handlers
this.client.open(() => { this.client.open(() => {
console.log('open'); console.log('open');
}); });
@ -75,42 +74,38 @@ export class TuyaWebSocketService {
this.client.message(async (ws: WebSocket, message: any) => { this.client.message(async (ws: WebSocket, message: any) => {
try { try {
const { devId, status, logData } = this.extractMessageData(message); const { devId, status, logData } = this.extractMessageData(message);
if (!Array.isArray(logData?.properties)) return; if (!Array.isArray(logData?.properties)) {
this.client.ackMessage(message.messageId);
return;
}
const device = this.deviceCache.get(devId); const device = this.deviceCache.get(devId);
if (!device) { if (!device) {
// console.log(`Ignored unknown device: ${devId}`); // console.log(⛔ Unknown device: ${devId}, message ignored.);
this.client.ackMessage(message.messageId);
return; return;
} }
if (this.sosHandlerService.isSosTriggered(status)) { if (this.sosHandlerService.isSosTriggered(status)) {
await this.sosHandlerService.handleSosEventFirebase( await this.sosHandlerService.handleSosEventFirebase(devId, logData);
devId,
logData,
this.deviceCache,
);
} else { } else {
// Firebase real-time update await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase( deviceTuyaUuid: devId,
{ status,
deviceTuyaUuid: devId, log: logData,
status, device,
log: logData, });
},
this.deviceCache,
);
} }
// Push to internal queue // Push to internal queue
this.messageQueue.push({ devId, status, logData }); this.messageQueue.push({ devId, status, logData, device });
// Acknowledge the message // Acknowledge the message
this.client.ackMessage(message.messageId); this.client.ackMessage(message.messageId);
} catch (error) { } catch (error) {
console.error('Error receiving message:', error); console.error('Error receiving message:', error);
} }
}); });
this.client.reconnect(() => { this.client.reconnect(() => {
console.log('reconnect'); console.log('reconnect');
}); });
@ -151,8 +146,8 @@ export class TuyaWebSocketService {
deviceTuyaUuid: item.devId, deviceTuyaUuid: item.devId,
status: item.status, status: item.status,
log: item.logData, log: item.logData,
device: item.device,
})), })),
this.deviceCache,
); );
} catch (error) { } catch (error) {
console.error('❌ Error processing batch:', error); console.error('❌ Error processing batch:', error);

22
package-lock.json generated
View File

@ -39,6 +39,7 @@
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"morgan": "^1.10.0", "morgan": "^1.10.0",
"nest-winston": "^1.10.2", "nest-winston": "^1.10.2",
"node-cache": "^5.1.2",
"nodemailer": "^6.9.10", "nodemailer": "^6.9.10",
"onesignal-node": "^3.4.0", "onesignal-node": "^3.4.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",
@ -10184,6 +10185,27 @@
"node": "^18 || ^20 || >= 21" "node": "^18 || ^20 || >= 21"
} }
}, },
"node_modules/node-cache": {
"version": "5.1.2",
"resolved": "https://registry.npmjs.org/node-cache/-/node-cache-5.1.2.tgz",
"integrity": "sha512-t1QzWwnk4sjLWaQAS8CHgOJ+RAfmHpxFWmc36IWTiWHQfs0w5JDMBS1b1ZxQteo0vVVuWJvIUKHDkkeK7vIGCg==",
"license": "MIT",
"dependencies": {
"clone": "2.x"
},
"engines": {
"node": ">= 8.0.0"
}
},
"node_modules/node-cache/node_modules/clone": {
"version": "2.1.2",
"resolved": "https://registry.npmjs.org/clone/-/clone-2.1.2.tgz",
"integrity": "sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==",
"license": "MIT",
"engines": {
"node": ">=0.8"
}
},
"node_modules/node-emoji": { "node_modules/node-emoji": {
"version": "1.11.0", "version": "1.11.0",
"resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz", "resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz",

View File

@ -51,6 +51,7 @@
"ioredis": "^5.3.2", "ioredis": "^5.3.2",
"morgan": "^1.10.0", "morgan": "^1.10.0",
"nest-winston": "^1.10.2", "nest-winston": "^1.10.2",
"node-cache": "^5.1.2",
"nodemailer": "^6.9.10", "nodemailer": "^6.9.10",
"onesignal-node": "^3.4.0", "onesignal-node": "^3.4.0",
"passport-jwt": "^4.0.1", "passport-jwt": "^4.0.1",