mirror of
https://github.com/SyncrowIOT/backend.git
synced 2025-07-10 07:07:21 +00:00
feat: enhance device status handling with caching and batch processing improvements
This commit is contained in:
@ -68,33 +68,23 @@ export class DeviceStatusFirebaseService {
|
||||
}
|
||||
}
|
||||
async addBatchDeviceStatusToOurDb(
|
||||
batch: { deviceTuyaUuid: string; status: any; log: any }[],
|
||||
batch: {
|
||||
deviceTuyaUuid: string;
|
||||
status: any;
|
||||
log: any;
|
||||
device: any;
|
||||
}[],
|
||||
): Promise<void> {
|
||||
const allLogs = [];
|
||||
const deviceMap = new Map<string, any>();
|
||||
|
||||
console.log(
|
||||
`🧠 Starting device lookups for batch of ${batch.length} items...`,
|
||||
);
|
||||
console.log(`🔁 Preparing logs from batch of ${batch.length} items...`);
|
||||
|
||||
// Step 1: Parallel device fetching
|
||||
await Promise.all(
|
||||
batch.map(async (item) => {
|
||||
if (!deviceMap.has(item.deviceTuyaUuid)) {
|
||||
const device = await this.getDeviceByDeviceTuyaUuid(
|
||||
item.deviceTuyaUuid,
|
||||
);
|
||||
device?.uuid && deviceMap.set(item.deviceTuyaUuid, device);
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
console.log(`🔍 Found ${deviceMap.size} devices from batch`);
|
||||
|
||||
// Step 2: Prepare logs and updates
|
||||
for (const item of batch) {
|
||||
const device = deviceMap.get(item.deviceTuyaUuid);
|
||||
if (!device?.uuid) continue;
|
||||
const device = item.device;
|
||||
if (!device?.uuid) {
|
||||
console.log(`⛔ Skipped unknown device: ${item.deviceTuyaUuid}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const logs = item.log.properties.map((property) =>
|
||||
this.deviceStatusLogRepository.create({
|
||||
@ -142,23 +132,24 @@ export class DeviceStatusFirebaseService {
|
||||
);
|
||||
})();
|
||||
|
||||
// Step 5: Wait for both insert and post-processing to finish
|
||||
await Promise.all([insertLogsPromise]);
|
||||
await insertLogsPromise;
|
||||
}
|
||||
|
||||
async addDeviceStatusToFirebase(
|
||||
addDeviceStatusDto: AddDeviceStatusDto,
|
||||
addDeviceStatusDto: AddDeviceStatusDto & { device?: any },
|
||||
): Promise<AddDeviceStatusDto | null> {
|
||||
try {
|
||||
const device = await this.getDeviceByDeviceTuyaUuid(
|
||||
addDeviceStatusDto.deviceTuyaUuid,
|
||||
);
|
||||
|
||||
let device = addDeviceStatusDto.device;
|
||||
if (!device) {
|
||||
device = await this.getDeviceByDeviceTuyaUuid(
|
||||
addDeviceStatusDto.deviceTuyaUuid,
|
||||
);
|
||||
}
|
||||
if (device?.uuid) {
|
||||
return await this.createDeviceStatusFirebase({
|
||||
deviceUuid: device.uuid,
|
||||
...addDeviceStatusDto,
|
||||
productType: device.productDevice.prodType,
|
||||
productType: device.productDevice?.prodType,
|
||||
});
|
||||
}
|
||||
// Return null if device not found or no UUID
|
||||
@ -178,6 +169,15 @@ export class DeviceStatusFirebaseService {
|
||||
relations: ['productDevice'],
|
||||
});
|
||||
}
|
||||
async getAllDevices() {
|
||||
return await this.deviceRepository.find({
|
||||
where: {
|
||||
isActive: true,
|
||||
},
|
||||
relations: ['productDevice'],
|
||||
});
|
||||
}
|
||||
|
||||
async getDevicesInstructionStatus(deviceUuid: string) {
|
||||
try {
|
||||
const deviceDetails = await this.getDeviceByDeviceUuid(deviceUuid);
|
||||
|
@ -16,33 +16,44 @@ export class SosHandlerService {
|
||||
);
|
||||
}
|
||||
|
||||
async handleSosEventFirebase(devId: string, logData: any): Promise<void> {
|
||||
async handleSosEventFirebase(device: any, logData: any): Promise<void> {
|
||||
const sosTrueStatus = [{ code: 'sos', value: true }];
|
||||
const sosFalseStatus = [{ code: 'sos', value: false }];
|
||||
|
||||
try {
|
||||
// ✅ Send true status
|
||||
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: true }],
|
||||
deviceTuyaUuid: device.deviceTuyaUuid,
|
||||
status: sosTrueStatus,
|
||||
log: logData,
|
||||
device,
|
||||
});
|
||||
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
|
||||
{
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: true }],
|
||||
deviceTuyaUuid: device.deviceTuyaUuid,
|
||||
status: sosTrueStatus,
|
||||
log: logData,
|
||||
device,
|
||||
},
|
||||
]);
|
||||
|
||||
// ✅ Schedule false status
|
||||
setTimeout(async () => {
|
||||
try {
|
||||
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: false }],
|
||||
deviceTuyaUuid: device.deviceTuyaUuid,
|
||||
status: sosFalseStatus,
|
||||
log: logData,
|
||||
device,
|
||||
});
|
||||
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
|
||||
{
|
||||
deviceTuyaUuid: devId,
|
||||
status: [{ code: 'sos', value: false }],
|
||||
deviceTuyaUuid: device.deviceTuyaUuid,
|
||||
status: sosFalseStatus,
|
||||
log: logData,
|
||||
device,
|
||||
},
|
||||
]);
|
||||
} catch (err) {
|
||||
|
@ -1,18 +1,21 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, OnModuleInit } from '@nestjs/common';
|
||||
import TuyaWebsocket from '../../config/tuya-web-socket-config';
|
||||
import { ConfigService } from '@nestjs/config';
|
||||
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
|
||||
import { SosHandlerService } from './sos.handler.service';
|
||||
import * as NodeCache from 'node-cache';
|
||||
|
||||
@Injectable()
|
||||
export class TuyaWebSocketService {
|
||||
export class TuyaWebSocketService implements OnModuleInit {
|
||||
private client: any;
|
||||
private readonly isDevEnv: boolean;
|
||||
private readonly deviceCache = new NodeCache({ stdTTL: 7200 }); // TTL = 2 hour
|
||||
|
||||
private messageQueue: {
|
||||
devId: string;
|
||||
status: any;
|
||||
logData: any;
|
||||
device: any;
|
||||
}[] = [];
|
||||
|
||||
private isProcessing = false;
|
||||
@ -38,8 +41,29 @@ export class TuyaWebSocketService {
|
||||
this.client.start();
|
||||
}
|
||||
|
||||
// Trigger the queue processor every 15 seconds
|
||||
// Run the queue processor every 15 seconds
|
||||
setInterval(() => this.processQueue(), 15000);
|
||||
|
||||
// Refresh the cache every 1 hour
|
||||
setInterval(() => this.initializeDeviceCache(), 30 * 60 * 1000); // 30 minutes
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
await this.initializeDeviceCache();
|
||||
}
|
||||
|
||||
private async initializeDeviceCache() {
|
||||
try {
|
||||
const allDevices = await this.deviceStatusFirebaseService.getAllDevices();
|
||||
allDevices.forEach((device) => {
|
||||
if (device.deviceTuyaUuid) {
|
||||
this.deviceCache.set(device.deviceTuyaUuid, device);
|
||||
}
|
||||
});
|
||||
console.log(`✅ Refreshed cache with ${allDevices.length} devices.`);
|
||||
} catch (error) {
|
||||
console.error('❌ Failed to initialize device cache:', error);
|
||||
}
|
||||
}
|
||||
|
||||
private setupEventHandlers() {
|
||||
@ -52,6 +76,14 @@ export class TuyaWebSocketService {
|
||||
try {
|
||||
const { devId, status, logData } = this.extractMessageData(message);
|
||||
if (!Array.isArray(logData?.properties)) {
|
||||
this.client.ackMessage(message.messageId);
|
||||
return;
|
||||
}
|
||||
|
||||
const device = this.deviceCache.get(devId);
|
||||
if (!device) {
|
||||
// console.warn(`⚠️ Device not found in cache: ${devId}`);
|
||||
this.client.ackMessage(message.messageId);
|
||||
return;
|
||||
}
|
||||
if (this.sosHandlerService.isSosTriggered(status)) {
|
||||
@ -60,13 +92,14 @@ export class TuyaWebSocketService {
|
||||
// Firebase real-time update
|
||||
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
|
||||
deviceTuyaUuid: devId,
|
||||
status: status,
|
||||
status,
|
||||
log: logData,
|
||||
device,
|
||||
});
|
||||
}
|
||||
|
||||
// Push to internal queue
|
||||
this.messageQueue.push({ devId, status, logData });
|
||||
this.messageQueue.push({ devId, status, logData, device });
|
||||
|
||||
// Acknowledge the message
|
||||
this.client.ackMessage(message.messageId);
|
||||
@ -111,10 +144,11 @@ export class TuyaWebSocketService {
|
||||
|
||||
try {
|
||||
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
|
||||
batch?.map((item) => ({
|
||||
batch.map((item) => ({
|
||||
deviceTuyaUuid: item.devId,
|
||||
status: item.status,
|
||||
log: item.logData,
|
||||
device: item.device,
|
||||
})),
|
||||
);
|
||||
} catch (error) {
|
||||
|
22
package-lock.json
generated
22
package-lock.json
generated
@ -39,6 +39,7 @@
|
||||
"ioredis": "^5.3.2",
|
||||
"morgan": "^1.10.0",
|
||||
"nest-winston": "^1.10.2",
|
||||
"node-cache": "^5.1.2",
|
||||
"nodemailer": "^6.9.10",
|
||||
"onesignal-node": "^3.4.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
@ -10184,6 +10185,27 @@
|
||||
"node": "^18 || ^20 || >= 21"
|
||||
}
|
||||
},
|
||||
"node_modules/node-cache": {
|
||||
"version": "5.1.2",
|
||||
"resolved": "https://registry.npmjs.org/node-cache/-/node-cache-5.1.2.tgz",
|
||||
"integrity": "sha512-t1QzWwnk4sjLWaQAS8CHgOJ+RAfmHpxFWmc36IWTiWHQfs0w5JDMBS1b1ZxQteo0vVVuWJvIUKHDkkeK7vIGCg==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"clone": "2.x"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">= 8.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/node-cache/node_modules/clone": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/clone/-/clone-2.1.2.tgz",
|
||||
"integrity": "sha512-3Pe/CF1Nn94hyhIYpjtiLhdCoEoz0DqQ+988E9gmeEdQZlojxnOb74wctFyuwWQHzqyf9X7C7MG8juUpqBJT8w==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=0.8"
|
||||
}
|
||||
},
|
||||
"node_modules/node-emoji": {
|
||||
"version": "1.11.0",
|
||||
"resolved": "https://registry.npmjs.org/node-emoji/-/node-emoji-1.11.0.tgz",
|
||||
|
@ -51,6 +51,7 @@
|
||||
"ioredis": "^5.3.2",
|
||||
"morgan": "^1.10.0",
|
||||
"nest-winston": "^1.10.2",
|
||||
"node-cache": "^5.1.2",
|
||||
"nodemailer": "^6.9.10",
|
||||
"onesignal-node": "^3.4.0",
|
||||
"passport-jwt": "^4.0.1",
|
||||
|
Reference in New Issue
Block a user