fix: enhance device status handling by integrating device cache for improved performance

This commit is contained in:
faris Aljohari
2025-06-25 08:03:23 -06:00
parent 324661e1ee
commit c0a069b460
4 changed files with 128 additions and 84 deletions

View File

@ -69,32 +69,23 @@ export class DeviceStatusFirebaseService {
}
async addBatchDeviceStatusToOurDb(
batch: { deviceTuyaUuid: string; status: any; log: any }[],
deviceCache: Map<string, any>,
): Promise<void> {
const allLogs = [];
const deviceMap = new Map<string, any>();
console.log(
`🧠 Starting device lookups for batch of ${batch.length} items...`,
`🧠 Preparing logs from batch of ${batch.length} items using cached devices only...`,
);
// Step 1: Parallel device fetching
await Promise.all(
batch.map(async (item) => {
if (!deviceMap.has(item.deviceTuyaUuid)) {
const device = await this.getDeviceByDeviceTuyaUuid(
item.deviceTuyaUuid,
);
device?.uuid && deviceMap.set(item.deviceTuyaUuid, device);
}
}),
);
console.log(`🔍 Found ${deviceMap.size} devices from batch`);
// Step 2: Prepare logs and updates
for (const item of batch) {
const device = deviceMap.get(item.deviceTuyaUuid);
if (!device?.uuid) continue;
const device = deviceCache.get(item.deviceTuyaUuid);
if (!device?.uuid) {
console.log(
`⛔ Ignored unknown device in batch: ${item.deviceTuyaUuid}`,
);
continue;
}
const logs = item.log.properties.map((property) =>
this.deviceStatusLogRepository.create({
@ -112,59 +103,53 @@ export class DeviceStatusFirebaseService {
}
console.log(`📝 Total logs to insert: ${allLogs.length}`);
// Step 3: Insert logs in chunks with ON CONFLICT DO NOTHING
const insertLogsPromise = (async () => {
const chunkSize = 300;
let insertedCount = 0;
const chunkSize = 300;
let insertedCount = 0;
for (let i = 0; i < allLogs.length; i += chunkSize) {
const chunk = allLogs.slice(i, i + chunkSize);
try {
const result = await this.deviceStatusLogRepository
.createQueryBuilder()
.insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
for (let i = 0; i < allLogs.length; i += chunkSize) {
const chunk = allLogs.slice(i, i + chunkSize);
try {
const result = await this.deviceStatusLogRepository
.createQueryBuilder()
.insert()
.into('device-status-log') // or use DeviceStatusLogEntity
.values(chunk)
.orIgnore() // skip duplicates
.execute();
insertedCount += result.identifiers.length;
console.log(
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
);
} catch (error) {
console.error('❌ Insert error (skipped chunk):', error.message);
}
insertedCount += result.identifiers.length;
console.log(
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
);
} catch (error) {
console.error('❌ Insert error (skipped chunk):', error.message);
}
}
console.log(
`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`,
);
})();
// Step 5: Wait for both insert and post-processing to finish
await Promise.all([insertLogsPromise]);
console.log(`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`);
}
async addDeviceStatusToFirebase(
addDeviceStatusDto: AddDeviceStatusDto,
deviceCache: Map<string, any>,
): Promise<AddDeviceStatusDto | null> {
try {
const device = await this.getDeviceByDeviceTuyaUuid(
addDeviceStatusDto.deviceTuyaUuid,
);
if (device?.uuid) {
return await this.createDeviceStatusFirebase({
deviceUuid: device.uuid,
...addDeviceStatusDto,
productType: device.productDevice.prodType,
});
const device = deviceCache.get(addDeviceStatusDto.deviceTuyaUuid);
if (!device?.uuid) {
console.log(
`⛔ Skipping Firebase update for unknown device: ${addDeviceStatusDto.deviceTuyaUuid}`,
);
return null;
}
// Return null if device not found or no UUID
return null;
// Ensure product info and uuid are attached
addDeviceStatusDto.deviceUuid = device.uuid;
addDeviceStatusDto.productUuid = device.productDevice?.uuid;
addDeviceStatusDto.productType = device.productDevice?.prodType;
return await this.createDeviceStatusFirebase(addDeviceStatusDto);
} catch (error) {
// Handle the error silently, perhaps log it internally or ignore it
console.error('❌ Error in addDeviceStatusToFirebase:', error);
return null;
}
}

View File

@ -8,7 +8,10 @@ import { TuyaWebSocketService } from './services/tuya.web.socket.service';
import { OneSignalService } from './services/onesignal.service';
import { DeviceMessagesService } from './services/device.messages.service';
import { DeviceRepositoryModule } from '../modules/device/device.repository.module';
import { DeviceNotificationRepository } from '../modules/device/repositories';
import {
DeviceNotificationRepository,
DeviceRepository,
} from '../modules/device/repositories';
import { DeviceStatusFirebaseModule } from '../firebase/devices-status/devices-status.module';
import { CommunityPermissionService } from './services/community.permission.service';
import { CommunityRepository } from '../modules/community/repositories';
@ -27,6 +30,7 @@ import { SosHandlerService } from './services/sos.handler.service';
DeviceNotificationRepository,
CommunityRepository,
SosHandlerService,
DeviceRepository,
],
exports: [
HelperHashService,

View File

@ -16,35 +16,53 @@ export class SosHandlerService {
);
}
async handleSosEventFirebase(devId: string, logData: any): Promise<void> {
async handleSosEventFirebase(
devId: string,
logData: any,
deviceCache: Map<string, any>,
): Promise<void> {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
});
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase(
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
},
]);
deviceCache,
);
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
[
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: true }],
log: logData,
},
],
deviceCache,
);
setTimeout(async () => {
try {
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }],
log: logData,
});
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb([
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase(
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }],
log: logData,
},
]);
deviceCache,
);
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
[
{
deviceTuyaUuid: devId,
status: [{ code: 'sos', value: false }],
log: logData,
},
],
deviceCache,
);
} catch (err) {
this.logger.error('Failed to send SOS false value', err);
}

View File

@ -3,6 +3,7 @@ import TuyaWebsocket from '../../config/tuya-web-socket-config';
import { ConfigService } from '@nestjs/config';
import { DeviceStatusFirebaseService } from '@app/common/firebase/devices-status/services/devices-status.service';
import { SosHandlerService } from './sos.handler.service';
import { DeviceRepository } from '@app/common/modules/device/repositories';
@Injectable()
export class TuyaWebSocketService {
@ -16,11 +17,13 @@ export class TuyaWebSocketService {
}[] = [];
private isProcessing = false;
private deviceCache: Map<string, any> = new Map();
constructor(
private readonly configService: ConfigService,
private readonly deviceStatusFirebaseService: DeviceStatusFirebaseService,
private readonly sosHandlerService: SosHandlerService,
private readonly deviceRepository: DeviceRepository,
) {
this.isDevEnv =
this.configService.get<string>('NODE_ENV') === 'development';
@ -33,6 +36,11 @@ export class TuyaWebSocketService {
maxRetryTimes: 100,
});
this.loadAllActiveDevices();
// Reload device cache every 1 hour
setInterval(() => this.loadAllActiveDevices(), 60 * 60 * 1000);
if (this.configService.get<string>('tuya-config.TRUN_ON_TUYA_SOCKET')) {
this.setupEventHandlers();
this.client.start();
@ -42,6 +50,22 @@ export class TuyaWebSocketService {
setInterval(() => this.processQueue(), 15000);
}
private async loadAllActiveDevices(): Promise<void> {
const devices = await this.deviceRepository.find({
where: { isActive: true },
relations: ['productDevice'],
});
this.deviceCache.clear();
devices.forEach((device) => {
this.deviceCache.set(device.deviceTuyaUuid, device);
});
console.log(
`🔄 Device cache reloaded: ${this.deviceCache.size} active devices at ${new Date().toISOString()}`,
);
}
private setupEventHandlers() {
// Event handlers
this.client.open(() => {
@ -51,18 +75,30 @@ export class TuyaWebSocketService {
this.client.message(async (ws: WebSocket, message: any) => {
try {
const { devId, status, logData } = this.extractMessageData(message);
if (!Array.isArray(logData?.properties)) {
if (!Array.isArray(logData?.properties)) return;
const device = this.deviceCache.get(devId);
if (!device) {
// console.log(`⛔ Ignored unknown device: ${devId}`);
return;
}
if (this.sosHandlerService.isSosTriggered(status)) {
await this.sosHandlerService.handleSosEventFirebase(devId, logData);
await this.sosHandlerService.handleSosEventFirebase(
devId,
logData,
this.deviceCache,
);
} else {
// Firebase real-time update
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase({
deviceTuyaUuid: devId,
status: status,
log: logData,
});
await this.deviceStatusFirebaseService.addDeviceStatusToFirebase(
{
deviceTuyaUuid: devId,
status,
log: logData,
},
this.deviceCache,
);
}
// Push to internal queue
@ -111,11 +147,12 @@ export class TuyaWebSocketService {
try {
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
batch?.map((item) => ({
batch.map((item) => ({
deviceTuyaUuid: item.devId,
status: item.status,
log: item.logData,
})),
this.deviceCache,
);
} catch (error) {
console.error('❌ Error processing batch:', error);