From 59bd1c42f7dbee239324ee3ea75e228859dff0b1 Mon Sep 17 00:00:00 2001 From: moonrailgun Date: Mon, 1 Jan 2024 16:37:34 +0800 Subject: [PATCH] refactor: split the monitor's manager and runner into different files --- src/server/model/monitor/index.ts | 260 +--------------------------- src/server/model/monitor/manager.ts | 127 ++++++++++++++ src/server/model/monitor/runner.ts | 135 +++++++++++++++ 3 files changed, 263 insertions(+), 259 deletions(-) create mode 100644 src/server/model/monitor/manager.ts create mode 100644 src/server/model/monitor/runner.ts diff --git a/src/server/model/monitor/index.ts b/src/server/model/monitor/index.ts index 5387c3c..856928d 100644 --- a/src/server/model/monitor/index.ts +++ b/src/server/model/monitor/index.ts @@ -1,261 +1,3 @@ -import { Monitor, Notification } from '@prisma/client'; -import { subscribeEventBus } from '../../ws/shared'; -import { prisma } from '../_client'; -import { monitorProviders } from './provider'; -import { sendNotification } from '../notification'; -import dayjs from 'dayjs'; -import { logger } from '../../utils/logger'; - -export type MonitorUpsertData = Pick< - Monitor, - 'workspaceId' | 'name' | 'type' | 'interval' -> & { - id?: string; - active?: boolean; - notificationIds?: string[]; - payload: Record; -}; - -type MonitorWithNotification = Monitor & { notifications: Notification[] }; - -class MonitorManager { - private monitorRunner: Record = {}; - private isStarted = false; - - /** - * create or update - */ - async upsert(data: MonitorUpsertData): Promise { - let monitor: MonitorWithNotification; - const { id, notificationIds = [], ...others } = data; - if (id) { - // update - monitor = await prisma.monitor.update({ - where: { - id, - }, - data: { - ...others, - notifications: { - set: notificationIds.map((id) => ({ id })), - }, - }, - include: { - notifications: true, - }, - }); - } else { - // create - monitor = await prisma.monitor.create({ - data: { - ...others, - notifications: { - connect: notificationIds.map((id) => ({ id })), - }, - }, - include: { - notifications: true, - }, - }); - } - - if (this.monitorRunner[monitor.id]) { - // Stop and remove old - this.monitorRunner[monitor.id].stopMonitor(); - delete this.monitorRunner[monitor.id]; - } - - const runner = (this.monitorRunner[monitor.id] = new MonitorRunner( - monitor - )); - runner.startMonitor(); - - return monitor; - } - - async delete(workspaceId: string, monitorId: string) { - const runner = this.getRunner(monitorId); - if (!runner) { - throw new Error('This monitor not found'); - } - - runner.stopMonitor(); - delete this.monitorRunner[monitorId]; - - return prisma.monitor.delete({ - where: { - workspaceId, - id: monitorId, - }, - }); - } - - /** - * Get and start all monitors - */ - async startAll() { - if (this.isStarted === true) { - console.warn('MonitorManager.startAll should only call once, skipped.'); - return; - } - - this.isStarted = true; - - const monitors = await prisma.monitor.findMany({ - where: { - active: true, - }, - include: { - notifications: true, - }, - }); - - Promise.all( - monitors.map(async (m) => { - try { - const runner = new MonitorRunner(m); - this.monitorRunner[m.id] = runner; - await runner.startMonitor(); - } catch (err) { - console.error('Start monitor error:', String(err)); - } - }) - ).then(() => { - console.log('All monitor has been begin.'); - }); - } - - getRunner(monitorId: string): MonitorRunner | undefined { - return this.monitorRunner[monitorId]; - } -} - -/** - * Class which actually run monitor data collect - */ -class MonitorRunner { - isStopped = false; - timer: NodeJS.Timeout | null = null; - - constructor(public monitor: Monitor & { notifications: Notification[] }) {} - - /** - * Start single monitor - */ - async startMonitor() { - const monitor = this.monitor; - const { type, interval, workspaceId } = monitor; - - const provider = monitorProviders[type]; - if (!provider) { - throw new Error(`Unknown monitor type: ${type}`); - } - - let currentStatus: 'UP' | 'DOWN' = 'UP'; - - const nextAction = () => { - if (this.isStopped === true) { - return; - } - - this.timer = setTimeout(() => { - run(); - }, interval * 1000); - }; - - const run = async () => { - try { - let value = 0; - try { - value = await provider.run(monitor); - } catch (err) { - logger.error(`[Monitor] (id: ${monitor.id}) run error:`, String(err)); - value = -1; - } - - // check event update - if (value < 0 && currentStatus === 'UP') { - await this.createEvent( - 'DOWN', - `Monitor [${monitor.name}] has been down` - ); - await this.notify( - `[${monitor.name}] 🔴 Down`, - `[${monitor.name}] 🔴 Down\nTime: ${dayjs().format( - 'YYYY-MM-DD HH:mm:ss (z)' - )}` - ); - currentStatus = 'DOWN'; - } else if (value > 0 && currentStatus === 'DOWN') { - await this.createEvent('UP', `Monitor [${monitor.name}] has been up`); - await this.notify( - `[${monitor.name}] ✅ Up`, - `[${monitor.name}] ✅ Up\nTime: ${dayjs().format( - 'YYYY-MM-DD HH:mm:ss (z)' - )}` - ); - currentStatus = 'UP'; - } - - // insert into data - const data = await prisma.monitorData.create({ - data: { - monitorId: monitor.id, - value, - }, - }); - - subscribeEventBus.emit('onMonitorReceiveNewData', workspaceId, data); - - // Run next loop - nextAction(); - } catch (err) { - logger.error('Run monitor error,', monitor.id, String(err)); - } - }; - - run(); - - console.log(`Start monitor ${monitor.name}(${monitor.id})`); - } - - stopMonitor() { - const monitor = this.monitor; - - this.isStopped = true; - if (this.timer) { - clearTimeout(this.timer); - this.timer = null; - } - - console.log(`Stop monitor ${monitor.name}(${monitor.id})`); - } - - async restartMonitor() { - this.stopMonitor(); - this.startMonitor(); - } - - async createEvent(type: 'UP' | 'DOWN', message: string) { - return await prisma.monitorEvent.create({ - data: { - message, - monitorId: this.monitor.id, - type, - }, - }); - } - - async notify(title: string, message: string) { - const notifications = this.monitor.notifications; - await Promise.all( - notifications.map((n) => - sendNotification(n, title, message).catch((err) => { - console.error(err); - }) - ) - ); - } -} +import { MonitorManager } from './manager'; export const monitorManager = new MonitorManager(); diff --git a/src/server/model/monitor/manager.ts b/src/server/model/monitor/manager.ts new file mode 100644 index 0000000..3f0a1ba --- /dev/null +++ b/src/server/model/monitor/manager.ts @@ -0,0 +1,127 @@ +import { Monitor, Notification } from '@prisma/client'; +import { prisma } from '../_client'; +import { MonitorRunner } from './runner'; + +export type MonitorUpsertData = Pick< + Monitor, + 'workspaceId' | 'name' | 'type' | 'interval' +> & { + id?: string; + active?: boolean; + notificationIds?: string[]; + payload: Record; +}; + +type MonitorWithNotification = Monitor & { notifications: Notification[] }; + +export class MonitorManager { + private monitorRunner: Record = {}; + private isStarted = false; + + /** + * create or update + */ + async upsert(data: MonitorUpsertData): Promise { + let monitor: MonitorWithNotification; + const { id, notificationIds = [], ...others } = data; + if (id) { + // update + monitor = await prisma.monitor.update({ + where: { + id, + }, + data: { + ...others, + notifications: { + set: notificationIds.map((id) => ({ id })), + }, + }, + include: { + notifications: true, + }, + }); + } else { + // create + monitor = await prisma.monitor.create({ + data: { + ...others, + notifications: { + connect: notificationIds.map((id) => ({ id })), + }, + }, + include: { + notifications: true, + }, + }); + } + + if (this.monitorRunner[monitor.id]) { + // Stop and remove old + this.monitorRunner[monitor.id].stopMonitor(); + delete this.monitorRunner[monitor.id]; + } + + const runner = (this.monitorRunner[monitor.id] = new MonitorRunner( + monitor + )); + runner.startMonitor(); + + return monitor; + } + + async delete(workspaceId: string, monitorId: string) { + const runner = this.getRunner(monitorId); + if (!runner) { + throw new Error('This monitor not found'); + } + + runner.stopMonitor(); + delete this.monitorRunner[monitorId]; + + return prisma.monitor.delete({ + where: { + workspaceId, + id: monitorId, + }, + }); + } + + /** + * Get and start all monitors + */ + async startAll() { + if (this.isStarted === true) { + console.warn('MonitorManager.startAll should only call once, skipped.'); + return; + } + + this.isStarted = true; + + const monitors = await prisma.monitor.findMany({ + where: { + active: true, + }, + include: { + notifications: true, + }, + }); + + Promise.all( + monitors.map(async (m) => { + try { + const runner = new MonitorRunner(m); + this.monitorRunner[m.id] = runner; + await runner.startMonitor(); + } catch (err) { + console.error('Start monitor error:', String(err)); + } + }) + ).then(() => { + console.log('All monitor has been begin.'); + }); + } + + getRunner(monitorId: string): MonitorRunner | undefined { + return this.monitorRunner[monitorId]; + } +} diff --git a/src/server/model/monitor/runner.ts b/src/server/model/monitor/runner.ts new file mode 100644 index 0000000..0fbc3f9 --- /dev/null +++ b/src/server/model/monitor/runner.ts @@ -0,0 +1,135 @@ +import { Monitor, Notification } from '@prisma/client'; +import { subscribeEventBus } from '../../ws/shared'; +import { prisma } from '../_client'; +import { monitorProviders } from './provider'; +import { sendNotification } from '../notification'; +import dayjs from 'dayjs'; +import { logger } from '../../utils/logger'; + +/** + * Class which actually run monitor data collect + */ +export class MonitorRunner { + isStopped = false; + timer: NodeJS.Timeout | null = null; + + constructor(public monitor: Monitor & { notifications: Notification[] }) {} + + /** + * Start single monitor + */ + async startMonitor() { + const monitor = this.monitor; + const { type, interval, workspaceId } = monitor; + + const provider = monitorProviders[type]; + if (!provider) { + throw new Error(`Unknown monitor type: ${type}`); + } + + let currentStatus: 'UP' | 'DOWN' = 'UP'; + + const nextAction = () => { + if (this.isStopped === true) { + return; + } + + this.timer = setTimeout(() => { + run(); + }, interval * 1000); + }; + + const run = async () => { + try { + let value = 0; + try { + value = await provider.run(monitor); + } catch (err) { + logger.error(`[Monitor] (id: ${monitor.id}) run error:`, String(err)); + value = -1; + } + + // check event update + if (value < 0 && currentStatus === 'UP') { + await this.createEvent( + 'DOWN', + `Monitor [${monitor.name}] has been down` + ); + await this.notify( + `[${monitor.name}] 🔴 Down`, + `[${monitor.name}] 🔴 Down\nTime: ${dayjs().format( + 'YYYY-MM-DD HH:mm:ss (z)' + )}` + ); + currentStatus = 'DOWN'; + } else if (value > 0 && currentStatus === 'DOWN') { + await this.createEvent('UP', `Monitor [${monitor.name}] has been up`); + await this.notify( + `[${monitor.name}] ✅ Up`, + `[${monitor.name}] ✅ Up\nTime: ${dayjs().format( + 'YYYY-MM-DD HH:mm:ss (z)' + )}` + ); + currentStatus = 'UP'; + } + + // insert into data + const data = await prisma.monitorData.create({ + data: { + monitorId: monitor.id, + value, + }, + }); + + subscribeEventBus.emit('onMonitorReceiveNewData', workspaceId, data); + + // Run next loop + nextAction(); + } catch (err) { + logger.error('Run monitor error,', monitor.id, String(err)); + } + }; + + run(); + + console.log(`Start monitor ${monitor.name}(${monitor.id})`); + } + + stopMonitor() { + const monitor = this.monitor; + + this.isStopped = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + + console.log(`Stop monitor ${monitor.name}(${monitor.id})`); + } + + async restartMonitor() { + this.stopMonitor(); + this.startMonitor(); + } + + async createEvent(type: 'UP' | 'DOWN', message: string) { + return await prisma.monitorEvent.create({ + data: { + message, + monitorId: this.monitor.id, + type, + }, + }); + } + + async notify(title: string, message: string) { + const notifications = this.monitor.notifications; + await Promise.all( + notifications.map((n) => + sendNotification(n, title, message).catch((err) => { + console.error(err); + }) + ) + ); + } +}