diff --git a/libs/common/src/config/index.ts b/libs/common/src/config/index.ts index d4cbbdb..b642e54 100644 --- a/libs/common/src/config/index.ts +++ b/libs/common/src/config/index.ts @@ -1,3 +1,6 @@ import emailConfig from './email.config'; import superAdminConfig from './super.admin.config'; -export default [emailConfig, superAdminConfig]; +import tuyaConfig from './tuya.config'; +import oneSignalConfig from './onesignal.config'; + +export default [emailConfig, superAdminConfig, tuyaConfig, oneSignalConfig]; diff --git a/libs/common/src/config/tuya-web-socket-config/config.ts b/libs/common/src/config/tuya-web-socket-config/config.ts new file mode 100644 index 0000000..5103805 --- /dev/null +++ b/libs/common/src/config/tuya-web-socket-config/config.ts @@ -0,0 +1,30 @@ +export enum TuyaRegionConfigEnum { + CN = 'wss://mqe.tuyacn.com:8285/', + US = 'wss://mqe.tuyaus.com:8285/', + EU = 'wss://mqe.tuyaeu.com:8285/', + IN = 'wss://mqe.tuyain.com:8285/', +} + +export enum TUYA_PASULAR_ENV { + PROD = 'prod', + TEST = 'test', +} + +export const TuyaEnvConfig = Object.freeze({ + [TUYA_PASULAR_ENV.PROD]: { + name: TUYA_PASULAR_ENV.PROD, + value: 'event', + desc: 'online environment', + }, + [TUYA_PASULAR_ENV.TEST]: { + name: TUYA_PASULAR_ENV.TEST, + value: 'event-test', + desc: 'test environment', + }, +}); +type IEnvConfig = typeof TuyaEnvConfig; +export function getTuyaEnvConfig( + env: TUYA_PASULAR_ENV, +): IEnvConfig[K] { + return TuyaEnvConfig[env]; +} diff --git a/libs/common/src/config/tuya-web-socket-config/index.ts b/libs/common/src/config/tuya-web-socket-config/index.ts new file mode 100644 index 0000000..93e15c0 --- /dev/null +++ b/libs/common/src/config/tuya-web-socket-config/index.ts @@ -0,0 +1,214 @@ +import { EventEmitter } from 'events'; +import { WebSocket } from 'ws'; + +import { + TUYA_PASULAR_ENV, + getTuyaEnvConfig, + TuyaRegionConfigEnum, +} from './config'; +import { getTopicUrl, buildQuery, buildPassword, decrypt } from './utils'; + +type LoggerLevel = 'INFO' | 'ERROR'; + +interface IConfig { + accessId: string; + accessKey: string; + env: TUYA_PASULAR_ENV; + url: TuyaRegionConfigEnum; + + timeout?: number; + maxRetryTimes?: number; + retryTimeout?: number; + logger?: (level: LoggerLevel, ...args: any) => void; +} + +class TuyaMessageSubscribeWebsocket { + static URL = TuyaRegionConfigEnum; + static env = TUYA_PASULAR_ENV; + + static data = 'TUTA_DATA'; + static error = 'TUYA_ERROR'; + static open = 'TUYA_OPEN'; + static close = 'TUYA_CLOSE'; + static reconnect = 'TUYA_RECONNECT'; + static ping = 'TUYA_PING'; + static pong = 'TUYA_PONG'; + + private config: IConfig; + private server?: WebSocket; + private timer: any; + private retryTimes: number; + private event: EventEmitter; + + constructor(config: IConfig) { + this.config = Object.assign( + { + ackTimeoutMillis: 3000, + subscriptionType: 'Failover', + retryTimeout: 1000, + maxRetryTimes: 100, + timeout: 30000, + logger: console.log, + }, + config, + ); + this.event = new EventEmitter(); + this.retryTimes = 0; + } + + public start() { + this.server = this._connect(); + } + + public open(cb: (ws: WebSocket) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.open, cb); + } + + public message(cb: (ws: WebSocket, message: any) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.data, cb); + } + + public ping(cb: (ws: WebSocket) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.ping, cb); + } + + public pong(cb: (ws: WebSocket) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.pong, cb); + } + + public reconnect(cb: (ws: WebSocket) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.reconnect, cb); + } + + public ackMessage(messageId: string) { + this.server && this.server.send(JSON.stringify({ messageId })); + } + + public error(cb: (ws: WebSocket, error: any) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.error, cb); + } + + public close(cb: (ws: WebSocket) => void) { + this.event.on(TuyaMessageSubscribeWebsocket.close, cb); + } + + private _reconnect() { + if ( + this.config.maxRetryTimes && + this.retryTimes < this.config.maxRetryTimes + ) { + const timer = setTimeout(() => { + clearTimeout(timer); + this.retryTimes++; + this._connect(false); + }, this.config.retryTimeout); + } + } + + private _connect(isInit = true) { + const { accessId, accessKey, env, url } = this.config; + const topicUrl = getTopicUrl( + url, + accessId, + getTuyaEnvConfig(env).value, + `?${buildQuery({ subscriptionType: 'Failover', ackTimeoutMillis: 30000 })}`, + ); + const password = buildPassword(accessId, accessKey); + this.server = new WebSocket(topicUrl, { + rejectUnauthorized: false, + headers: { username: accessId, password }, + }); + this.subOpen(this.server, isInit); + this.subMessage(this.server); + this.subPing(this.server); + this.subPong(this.server); + this.subError(this.server); + this.subClose(this.server); + return this.server; + } + + private subOpen(server: WebSocket, isInit = true) { + server.on('open', () => { + if (server.readyState === server.OPEN) { + this.retryTimes = 0; + } + this.keepAlive(server); + this.event.emit( + isInit + ? TuyaMessageSubscribeWebsocket.open + : TuyaMessageSubscribeWebsocket.reconnect, + this.server, + ); + }); + } + + private subPing(server: WebSocket) { + server.on('ping', () => { + this.event.emit(TuyaMessageSubscribeWebsocket.ping, this.server); + this.keepAlive(server); + server.pong(this.config.accessId); + }); + } + + private subPong(server: WebSocket) { + server.on('pong', () => { + this.keepAlive(server); + this.event.emit(TuyaMessageSubscribeWebsocket.pong, this.server); + }); + } + + private subMessage(server: WebSocket) { + server.on('message', (data: any) => { + try { + this.keepAlive(server); + const start = Date.now(); + const obj = this.handleMessage(data); + this.event.emit(TuyaMessageSubscribeWebsocket.data, this.server, obj); + const end = Date.now(); + } catch (e) { + this.logger('ERROR', e); + this.event.emit(TuyaMessageSubscribeWebsocket.error, e); + } + }); + } + + private subClose(server: WebSocket) { + server.on('close', (...data) => { + this._reconnect(); + this.clearKeepAlive(); + this.event.emit(TuyaMessageSubscribeWebsocket.close, ...data); + }); + } + + private subError(server: WebSocket) { + server.on('error', (e) => { + this.event.emit(TuyaMessageSubscribeWebsocket.error, this.server, e); + }); + } + + private clearKeepAlive() { + clearTimeout(this.timer); + } + + private keepAlive(server: WebSocket) { + this.clearKeepAlive(); + this.timer = setTimeout(() => { + server.ping(this.config.accessId); + }, this.config.timeout); + } + + private handleMessage(data: string) { + const { payload, ...others } = JSON.parse(data); + const pStr = Buffer.from(payload, 'base64').toString('utf-8'); + const pJson = JSON.parse(pStr); + pJson.data = decrypt(pJson.data, this.config.accessKey); + return { payload: pJson, ...others }; + } + + private logger(level: LoggerLevel, ...info: any) { + const realInfo = `${Date.now()} `; + this.config.logger && this.config.logger(level, realInfo, ...info); + } +} + +export default TuyaMessageSubscribeWebsocket; diff --git a/libs/common/src/config/tuya-web-socket-config/utils.ts b/libs/common/src/config/tuya-web-socket-config/utils.ts new file mode 100644 index 0000000..5c22043 --- /dev/null +++ b/libs/common/src/config/tuya-web-socket-config/utils.ts @@ -0,0 +1,51 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +/* eslint-disable @typescript-eslint/explicit-module-boundary-types */ +import { MD5, AES, enc, mode, pad } from 'crypto-js'; + +export function getTopicUrl( + websocketUrl: string, + accessId: string, + env: string, + query: string, +) { + return `${websocketUrl}ws/v2/consumer/persistent/${accessId}/out/${env}/${accessId}-sub${query}`; +} + +export function buildQuery(query: { [key: string]: number | string }) { + return Object.keys(query) + .map((key) => `${key}=${encodeURIComponent(query[key])}`) + .join('&'); +} + +export function buildPassword(accessId: string, accessKey: string) { + const key = MD5(accessKey).toString(); + return MD5(`${accessId}${key}`).toString().substr(8, 16); +} + +export function decrypt(data: string, accessKey: string) { + try { + const realKey = enc.Utf8.parse(accessKey.substring(8, 24)); + const json = AES.decrypt(data, realKey, { + mode: mode.ECB, + padding: pad.Pkcs7, + }); + const dataStr = enc.Utf8.stringify(json).toString(); + return JSON.parse(dataStr); + } catch (e) { + return ''; + } +} + +export function encrypt(data: any, accessKey: string) { + try { + const realKey = enc.Utf8.parse(accessKey.substring(8, 24)); + const realData = JSON.stringify(data); + const retData = AES.encrypt(realData, realKey, { + mode: mode.ECB, + padding: pad.Pkcs7, + }).toString(); + return retData; + } catch (e) { + return ''; + } +} diff --git a/libs/common/src/config/tuya.config.ts b/libs/common/src/config/tuya.config.ts new file mode 100644 index 0000000..4745c2e --- /dev/null +++ b/libs/common/src/config/tuya.config.ts @@ -0,0 +1,9 @@ +import { registerAs } from '@nestjs/config'; + +export default registerAs( + 'tuya-config', + (): Record => ({ + TUYA_ACCESS_ID: process.env.TUYA_ACCESS_ID, + TUYA_ACCESS_KEY: process.env.TUYA_ACCESS_KEY, + }), +);