Add Tuya WebSocket configuration and credentials

This commit is contained in:
faris Aljohari
2024-05-26 00:36:37 +03:00
parent 7946c5673c
commit 853251304f
5 changed files with 308 additions and 1 deletions

View File

@ -1,3 +1,6 @@
import emailConfig from './email.config';
import superAdminConfig from './super.admin.config';
export default [emailConfig, superAdminConfig];
import tuyaConfig from './tuya.config';
import oneSignalConfig from './onesignal.config';
export default [emailConfig, superAdminConfig, tuyaConfig, oneSignalConfig];

View File

@ -0,0 +1,30 @@
export enum TuyaRegionConfigEnum {
CN = 'wss://mqe.tuyacn.com:8285/',
US = 'wss://mqe.tuyaus.com:8285/',
EU = 'wss://mqe.tuyaeu.com:8285/',
IN = 'wss://mqe.tuyain.com:8285/',
}
export enum TUYA_PASULAR_ENV {
PROD = 'prod',
TEST = 'test',
}
export const TuyaEnvConfig = Object.freeze({
[TUYA_PASULAR_ENV.PROD]: {
name: TUYA_PASULAR_ENV.PROD,
value: 'event',
desc: 'online environment',
},
[TUYA_PASULAR_ENV.TEST]: {
name: TUYA_PASULAR_ENV.TEST,
value: 'event-test',
desc: 'test environment',
},
});
type IEnvConfig = typeof TuyaEnvConfig;
export function getTuyaEnvConfig<K extends keyof IEnvConfig>(
env: TUYA_PASULAR_ENV,
): IEnvConfig[K] {
return TuyaEnvConfig[env];
}

View File

@ -0,0 +1,214 @@
import { EventEmitter } from 'events';
import { WebSocket } from 'ws';
import {
TUYA_PASULAR_ENV,
getTuyaEnvConfig,
TuyaRegionConfigEnum,
} from './config';
import { getTopicUrl, buildQuery, buildPassword, decrypt } from './utils';
type LoggerLevel = 'INFO' | 'ERROR';
interface IConfig {
accessId: string;
accessKey: string;
env: TUYA_PASULAR_ENV;
url: TuyaRegionConfigEnum;
timeout?: number;
maxRetryTimes?: number;
retryTimeout?: number;
logger?: (level: LoggerLevel, ...args: any) => void;
}
class TuyaMessageSubscribeWebsocket {
static URL = TuyaRegionConfigEnum;
static env = TUYA_PASULAR_ENV;
static data = 'TUTA_DATA';
static error = 'TUYA_ERROR';
static open = 'TUYA_OPEN';
static close = 'TUYA_CLOSE';
static reconnect = 'TUYA_RECONNECT';
static ping = 'TUYA_PING';
static pong = 'TUYA_PONG';
private config: IConfig;
private server?: WebSocket;
private timer: any;
private retryTimes: number;
private event: EventEmitter;
constructor(config: IConfig) {
this.config = Object.assign(
{
ackTimeoutMillis: 3000,
subscriptionType: 'Failover',
retryTimeout: 1000,
maxRetryTimes: 100,
timeout: 30000,
logger: console.log,
},
config,
);
this.event = new EventEmitter();
this.retryTimes = 0;
}
public start() {
this.server = this._connect();
}
public open(cb: (ws: WebSocket) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.open, cb);
}
public message(cb: (ws: WebSocket, message: any) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.data, cb);
}
public ping(cb: (ws: WebSocket) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.ping, cb);
}
public pong(cb: (ws: WebSocket) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.pong, cb);
}
public reconnect(cb: (ws: WebSocket) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.reconnect, cb);
}
public ackMessage(messageId: string) {
this.server && this.server.send(JSON.stringify({ messageId }));
}
public error(cb: (ws: WebSocket, error: any) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.error, cb);
}
public close(cb: (ws: WebSocket) => void) {
this.event.on(TuyaMessageSubscribeWebsocket.close, cb);
}
private _reconnect() {
if (
this.config.maxRetryTimes &&
this.retryTimes < this.config.maxRetryTimes
) {
const timer = setTimeout(() => {
clearTimeout(timer);
this.retryTimes++;
this._connect(false);
}, this.config.retryTimeout);
}
}
private _connect(isInit = true) {
const { accessId, accessKey, env, url } = this.config;
const topicUrl = getTopicUrl(
url,
accessId,
getTuyaEnvConfig(env).value,
`?${buildQuery({ subscriptionType: 'Failover', ackTimeoutMillis: 30000 })}`,
);
const password = buildPassword(accessId, accessKey);
this.server = new WebSocket(topicUrl, {
rejectUnauthorized: false,
headers: { username: accessId, password },
});
this.subOpen(this.server, isInit);
this.subMessage(this.server);
this.subPing(this.server);
this.subPong(this.server);
this.subError(this.server);
this.subClose(this.server);
return this.server;
}
private subOpen(server: WebSocket, isInit = true) {
server.on('open', () => {
if (server.readyState === server.OPEN) {
this.retryTimes = 0;
}
this.keepAlive(server);
this.event.emit(
isInit
? TuyaMessageSubscribeWebsocket.open
: TuyaMessageSubscribeWebsocket.reconnect,
this.server,
);
});
}
private subPing(server: WebSocket) {
server.on('ping', () => {
this.event.emit(TuyaMessageSubscribeWebsocket.ping, this.server);
this.keepAlive(server);
server.pong(this.config.accessId);
});
}
private subPong(server: WebSocket) {
server.on('pong', () => {
this.keepAlive(server);
this.event.emit(TuyaMessageSubscribeWebsocket.pong, this.server);
});
}
private subMessage(server: WebSocket) {
server.on('message', (data: any) => {
try {
this.keepAlive(server);
const start = Date.now();
const obj = this.handleMessage(data);
this.event.emit(TuyaMessageSubscribeWebsocket.data, this.server, obj);
const end = Date.now();
} catch (e) {
this.logger('ERROR', e);
this.event.emit(TuyaMessageSubscribeWebsocket.error, e);
}
});
}
private subClose(server: WebSocket) {
server.on('close', (...data) => {
this._reconnect();
this.clearKeepAlive();
this.event.emit(TuyaMessageSubscribeWebsocket.close, ...data);
});
}
private subError(server: WebSocket) {
server.on('error', (e) => {
this.event.emit(TuyaMessageSubscribeWebsocket.error, this.server, e);
});
}
private clearKeepAlive() {
clearTimeout(this.timer);
}
private keepAlive(server: WebSocket) {
this.clearKeepAlive();
this.timer = setTimeout(() => {
server.ping(this.config.accessId);
}, this.config.timeout);
}
private handleMessage(data: string) {
const { payload, ...others } = JSON.parse(data);
const pStr = Buffer.from(payload, 'base64').toString('utf-8');
const pJson = JSON.parse(pStr);
pJson.data = decrypt(pJson.data, this.config.accessKey);
return { payload: pJson, ...others };
}
private logger(level: LoggerLevel, ...info: any) {
const realInfo = `${Date.now()} `;
this.config.logger && this.config.logger(level, realInfo, ...info);
}
}
export default TuyaMessageSubscribeWebsocket;

View File

@ -0,0 +1,51 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-module-boundary-types */
import { MD5, AES, enc, mode, pad } from 'crypto-js';
export function getTopicUrl(
websocketUrl: string,
accessId: string,
env: string,
query: string,
) {
return `${websocketUrl}ws/v2/consumer/persistent/${accessId}/out/${env}/${accessId}-sub${query}`;
}
export function buildQuery(query: { [key: string]: number | string }) {
return Object.keys(query)
.map((key) => `${key}=${encodeURIComponent(query[key])}`)
.join('&');
}
export function buildPassword(accessId: string, accessKey: string) {
const key = MD5(accessKey).toString();
return MD5(`${accessId}${key}`).toString().substr(8, 16);
}
export function decrypt(data: string, accessKey: string) {
try {
const realKey = enc.Utf8.parse(accessKey.substring(8, 24));
const json = AES.decrypt(data, realKey, {
mode: mode.ECB,
padding: pad.Pkcs7,
});
const dataStr = enc.Utf8.stringify(json).toString();
return JSON.parse(dataStr);
} catch (e) {
return '';
}
}
export function encrypt(data: any, accessKey: string) {
try {
const realKey = enc.Utf8.parse(accessKey.substring(8, 24));
const realData = JSON.stringify(data);
const retData = AES.encrypt(realData, realKey, {
mode: mode.ECB,
padding: pad.Pkcs7,
}).toString();
return retData;
} catch (e) {
return '';
}
}

View File

@ -0,0 +1,9 @@
import { registerAs } from '@nestjs/config';
export default registerAs(
'tuya-config',
(): Record<string, any> => ({
TUYA_ACCESS_ID: process.env.TUYA_ACCESS_ID,
TUYA_ACCESS_KEY: process.env.TUYA_ACCESS_KEY,
}),
);