mirror of
https://github.com/SyncrowIOT/backend.git
synced 2025-07-10 15:17:41 +00:00
refactor: clean up unused services and optimize batch processing in DeviceStatusFirebaseService
This commit is contained in:
@ -3,28 +3,12 @@ import { DeviceStatusFirebaseController } from './controllers/devices-status.con
|
|||||||
import { DeviceStatusFirebaseService } from './services/devices-status.service';
|
import { DeviceStatusFirebaseService } from './services/devices-status.service';
|
||||||
import { DeviceRepository } from '@app/common/modules/device/repositories';
|
import { DeviceRepository } from '@app/common/modules/device/repositories';
|
||||||
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository';
|
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories/device-status.repository';
|
||||||
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
|
|
||||||
import {
|
|
||||||
PowerClampHourlyRepository,
|
|
||||||
PowerClampDailyRepository,
|
|
||||||
PowerClampMonthlyRepository,
|
|
||||||
} from '@app/common/modules/power-clamp/repositories';
|
|
||||||
import { SqlLoaderService } from '@app/common/helper/services/sql-loader.service';
|
|
||||||
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
|
|
||||||
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
|
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
providers: [
|
providers: [
|
||||||
DeviceStatusFirebaseService,
|
DeviceStatusFirebaseService,
|
||||||
DeviceRepository,
|
DeviceRepository,
|
||||||
DeviceStatusLogRepository,
|
DeviceStatusLogRepository,
|
||||||
PowerClampService,
|
|
||||||
PowerClampHourlyRepository,
|
|
||||||
PowerClampDailyRepository,
|
|
||||||
PowerClampMonthlyRepository,
|
|
||||||
SqlLoaderService,
|
|
||||||
OccupancyService,
|
|
||||||
AqiDataService,
|
|
||||||
],
|
],
|
||||||
controllers: [DeviceStatusFirebaseController],
|
controllers: [DeviceStatusFirebaseController],
|
||||||
exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository],
|
exports: [DeviceStatusFirebaseService, DeviceStatusLogRepository],
|
||||||
|
@ -18,12 +18,6 @@ import {
|
|||||||
runTransaction,
|
runTransaction,
|
||||||
} from 'firebase/database';
|
} from 'firebase/database';
|
||||||
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories';
|
import { DeviceStatusLogRepository } from '@app/common/modules/device-status-log/repositories';
|
||||||
import { ProductType } from '@app/common/constants/product-type.enum';
|
|
||||||
import { PowerClampService } from '@app/common/helper/services/power.clamp.service';
|
|
||||||
import { PowerClampEnergyEnum } from '@app/common/constants/power.clamp.enargy.enum';
|
|
||||||
import { PresenceSensorEnum } from '@app/common/constants/presence.sensor.enum';
|
|
||||||
import { OccupancyService } from '@app/common/helper/services/occupancy.service';
|
|
||||||
import { AqiDataService } from '@app/common/helper/services/aqi.data.service';
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class DeviceStatusFirebaseService {
|
export class DeviceStatusFirebaseService {
|
||||||
private tuya: TuyaContext;
|
private tuya: TuyaContext;
|
||||||
@ -31,9 +25,6 @@ export class DeviceStatusFirebaseService {
|
|||||||
constructor(
|
constructor(
|
||||||
private readonly configService: ConfigService,
|
private readonly configService: ConfigService,
|
||||||
private readonly deviceRepository: DeviceRepository,
|
private readonly deviceRepository: DeviceRepository,
|
||||||
private readonly powerClampService: PowerClampService,
|
|
||||||
private readonly occupancyService: OccupancyService,
|
|
||||||
private readonly aqiDataService: AqiDataService,
|
|
||||||
private deviceStatusLogRepository: DeviceStatusLogRepository,
|
private deviceStatusLogRepository: DeviceStatusLogRepository,
|
||||||
) {
|
) {
|
||||||
const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY');
|
const accessKey = this.configService.get<string>('auth-config.ACCESS_KEY');
|
||||||
@ -76,28 +67,85 @@ export class DeviceStatusFirebaseService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async addDeviceStatusToOurDb(
|
async addBatchDeviceStatusToOurDb(
|
||||||
addDeviceStatusDto: AddDeviceStatusDto,
|
batch: { deviceTuyaUuid: string; status: any; log: any }[],
|
||||||
): Promise<AddDeviceStatusDto | null> {
|
): Promise<void> {
|
||||||
try {
|
const allLogs = [];
|
||||||
const device = await this.getDeviceByDeviceTuyaUuid(
|
const deviceMap = new Map<string, any>();
|
||||||
addDeviceStatusDto.deviceTuyaUuid,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (device?.uuid) {
|
console.log(
|
||||||
return await this.createDeviceStatusInOurDb({
|
`🧠 Starting device lookups for batch of ${batch.length} items...`,
|
||||||
deviceUuid: device.uuid,
|
);
|
||||||
...addDeviceStatusDto,
|
|
||||||
productType: device.productDevice.prodType,
|
// Step 1: Parallel device fetching
|
||||||
});
|
await Promise.all(
|
||||||
}
|
batch.map(async (item) => {
|
||||||
// Return null if device not found or no UUID
|
if (!deviceMap.has(item.deviceTuyaUuid)) {
|
||||||
return null;
|
const device = await this.getDeviceByDeviceTuyaUuid(
|
||||||
} catch (error) {
|
item.deviceTuyaUuid,
|
||||||
// Handle the error silently, perhaps log it internally or ignore it
|
);
|
||||||
return null;
|
device?.uuid && deviceMap.set(item.deviceTuyaUuid, device);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
console.log(`🔍 Found ${deviceMap.size} devices from batch`);
|
||||||
|
|
||||||
|
// Step 2: Prepare logs and updates
|
||||||
|
for (const item of batch) {
|
||||||
|
const device = deviceMap.get(item.deviceTuyaUuid);
|
||||||
|
if (!device?.uuid) continue;
|
||||||
|
|
||||||
|
const logs = item.log.properties.map((property) =>
|
||||||
|
this.deviceStatusLogRepository.create({
|
||||||
|
deviceId: device.uuid,
|
||||||
|
deviceTuyaId: item.deviceTuyaUuid,
|
||||||
|
productId: item.log.productId,
|
||||||
|
log: item.log,
|
||||||
|
code: property.code,
|
||||||
|
value: property.value,
|
||||||
|
eventId: item.log.dataId,
|
||||||
|
eventTime: new Date(property.time).toISOString(),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
allLogs.push(...logs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
console.log(`📝 Total logs to insert: ${allLogs.length}`);
|
||||||
|
// Step 3: Insert logs in chunks with ON CONFLICT DO NOTHING
|
||||||
|
const insertLogsPromise = (async () => {
|
||||||
|
const chunkSize = 300;
|
||||||
|
let insertedCount = 0;
|
||||||
|
|
||||||
|
for (let i = 0; i < allLogs.length; i += chunkSize) {
|
||||||
|
const chunk = allLogs.slice(i, i + chunkSize);
|
||||||
|
try {
|
||||||
|
const result = await this.deviceStatusLogRepository
|
||||||
|
.createQueryBuilder()
|
||||||
|
.insert()
|
||||||
|
.into('device-status-log') // or use DeviceStatusLogEntity
|
||||||
|
.values(chunk)
|
||||||
|
.orIgnore() // skip duplicates
|
||||||
|
.execute();
|
||||||
|
|
||||||
|
insertedCount += result.identifiers.length;
|
||||||
|
console.log(
|
||||||
|
`✅ Inserted ${result.identifiers.length} / ${chunk.length} logs (chunk)`,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
console.error('❌ Insert error (skipped chunk):', error.message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(
|
||||||
|
`✅ Total logs inserted: ${insertedCount} / ${allLogs.length}`,
|
||||||
|
);
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Step 5: Wait for both insert and post-processing to finish
|
||||||
|
await Promise.all([insertLogsPromise]);
|
||||||
}
|
}
|
||||||
|
|
||||||
async addDeviceStatusToFirebase(
|
async addDeviceStatusToFirebase(
|
||||||
addDeviceStatusDto: AddDeviceStatusDto,
|
addDeviceStatusDto: AddDeviceStatusDto,
|
||||||
): Promise<AddDeviceStatusDto | null> {
|
): Promise<AddDeviceStatusDto | null> {
|
||||||
@ -237,66 +285,4 @@ export class DeviceStatusFirebaseService {
|
|||||||
const snapshot: DataSnapshot = await get(dataRef);
|
const snapshot: DataSnapshot = await get(dataRef);
|
||||||
return snapshot.val();
|
return snapshot.val();
|
||||||
}
|
}
|
||||||
async createDeviceStatusInOurDb(
|
|
||||||
addDeviceStatusDto: AddDeviceStatusDto,
|
|
||||||
): Promise<any> {
|
|
||||||
// Save logs to your repository
|
|
||||||
const newLogs = addDeviceStatusDto.log.properties.map((property) => {
|
|
||||||
return this.deviceStatusLogRepository.create({
|
|
||||||
deviceId: addDeviceStatusDto.deviceUuid,
|
|
||||||
deviceTuyaId: addDeviceStatusDto.deviceTuyaUuid,
|
|
||||||
productId: addDeviceStatusDto.log.productId,
|
|
||||||
log: addDeviceStatusDto.log,
|
|
||||||
code: property.code,
|
|
||||||
value: property.value,
|
|
||||||
eventId: addDeviceStatusDto.log.dataId,
|
|
||||||
eventTime: new Date(property.time).toISOString(),
|
|
||||||
});
|
|
||||||
});
|
|
||||||
await this.deviceStatusLogRepository.save(newLogs);
|
|
||||||
|
|
||||||
if (addDeviceStatusDto.productType === ProductType.PC) {
|
|
||||||
const energyCodes = new Set([
|
|
||||||
PowerClampEnergyEnum.ENERGY_CONSUMED,
|
|
||||||
PowerClampEnergyEnum.ENERGY_CONSUMED_A,
|
|
||||||
PowerClampEnergyEnum.ENERGY_CONSUMED_B,
|
|
||||||
PowerClampEnergyEnum.ENERGY_CONSUMED_C,
|
|
||||||
]);
|
|
||||||
|
|
||||||
const energyStatus = addDeviceStatusDto?.log?.properties?.find((status) =>
|
|
||||||
energyCodes.has(status.code),
|
|
||||||
);
|
|
||||||
|
|
||||||
if (energyStatus) {
|
|
||||||
await this.powerClampService.updateEnergyConsumedHistoricalData(
|
|
||||||
addDeviceStatusDto.deviceUuid,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (
|
|
||||||
addDeviceStatusDto.productType === ProductType.CPS ||
|
|
||||||
addDeviceStatusDto.productType === ProductType.WPS
|
|
||||||
) {
|
|
||||||
const occupancyCodes = new Set([PresenceSensorEnum.PRESENCE_STATE]);
|
|
||||||
|
|
||||||
const occupancyStatus = addDeviceStatusDto?.log?.properties?.find(
|
|
||||||
(status) => occupancyCodes.has(status.code),
|
|
||||||
);
|
|
||||||
|
|
||||||
if (occupancyStatus) {
|
|
||||||
await this.occupancyService.updateOccupancySensorHistoricalData(
|
|
||||||
addDeviceStatusDto.deviceUuid,
|
|
||||||
);
|
|
||||||
await this.occupancyService.updateOccupancySensorHistoricalDurationData(
|
|
||||||
addDeviceStatusDto.deviceUuid,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (addDeviceStatusDto.productType === ProductType.AQI) {
|
|
||||||
await this.aqiDataService.updateAQISensorHistoricalData(
|
|
||||||
addDeviceStatusDto.deviceUuid,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -38,8 +38,8 @@ export class TuyaWebSocketService {
|
|||||||
this.client.start();
|
this.client.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trigger the queue processor every 2 seconds
|
// Trigger the queue processor every 15 seconds
|
||||||
setInterval(() => this.processQueue(), 10000);
|
setInterval(() => this.processQueue(), 15000);
|
||||||
}
|
}
|
||||||
|
|
||||||
private setupEventHandlers() {
|
private setupEventHandlers() {
|
||||||
@ -93,32 +93,30 @@ export class TuyaWebSocketService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
private async processQueue() {
|
private async processQueue() {
|
||||||
if (this.isProcessing || this.messageQueue.length === 0) return;
|
if (this.isProcessing) {
|
||||||
|
console.log('⏳ Skipping: still processing previous batch');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.messageQueue.length === 0) return;
|
||||||
|
|
||||||
this.isProcessing = true;
|
this.isProcessing = true;
|
||||||
|
|
||||||
const batch = [...this.messageQueue];
|
const batch = [...this.messageQueue];
|
||||||
this.messageQueue = [];
|
this.messageQueue = [];
|
||||||
|
|
||||||
|
console.log(`🔁 Processing batch of size: ${batch.length}`);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (const item of batch) {
|
await this.deviceStatusFirebaseService.addBatchDeviceStatusToOurDb(
|
||||||
if (this.sosHandlerService.isSosTriggered(item.status)) {
|
batch.map((item) => ({
|
||||||
await this.sosHandlerService.handleSosEventOurDb(
|
deviceTuyaUuid: item.devId,
|
||||||
item.devId,
|
status: item.status,
|
||||||
item.logData,
|
log: item.logData,
|
||||||
);
|
})),
|
||||||
} else {
|
);
|
||||||
await this.deviceStatusFirebaseService.addDeviceStatusToOurDb({
|
|
||||||
deviceTuyaUuid: item.devId,
|
|
||||||
status: item.status,
|
|
||||||
log: item.logData,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Error processing batch:', error);
|
console.error('❌ Error processing batch:', error);
|
||||||
// Re-add the batch to the queue for retry
|
this.messageQueue.unshift(...batch); // retry
|
||||||
this.messageQueue.unshift(...batch);
|
|
||||||
} finally {
|
} finally {
|
||||||
this.isProcessing = false;
|
this.isProcessing = false;
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user