From a92dd513f165a54ec33aa8bbc382d0997aad05a1 Mon Sep 17 00:00:00 2001 From: moonrailgun Date: Thu, 5 Oct 2023 01:56:33 +0800 Subject: [PATCH] feat: add monitor manager and runner --- package.json | 2 + pnpm-lock.yaml | 15 ++ prisma/schema.prisma | 19 +-- src/server/main.ts | 3 + src/server/model/monitor/index.ts | 170 +++++++++++++++++++++ src/server/model/monitor/provider/index.ts | 6 + src/server/model/monitor/provider/ping.ts | 48 ++++++ src/server/model/monitor/provider/type.ts | 5 + src/server/trpc/routers/monitor.ts | 42 +---- 9 files changed, 265 insertions(+), 45 deletions(-) create mode 100644 src/server/model/monitor/index.ts create mode 100644 src/server/model/monitor/provider/index.ts create mode 100644 src/server/model/monitor/provider/ping.ts create mode 100644 src/server/model/monitor/provider/type.ts diff --git a/package.json b/package.json index 6f6cbfc..e81f4d5 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "openbadge": "^1.0.4", "passport": "^0.6.0", "passport-jwt": "^4.0.1", + "ping": "^0.4.4", "pretty-ms": "7.0.1", "react": "^18.2.0", "react-dom": "^18.2.0", @@ -76,6 +77,7 @@ "@types/nodemailer": "^6.4.11", "@types/passport": "^1.0.12", "@types/passport-jwt": "^3.0.9", + "@types/ping": "^0.4.2", "@types/react": "^18.2.21", "@types/react-dom": "^18.2.7", "@types/request-ip": "^0.0.38", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a91bd23..8de1189 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -106,6 +106,9 @@ dependencies: passport-jwt: specifier: ^4.0.1 version: 4.0.1 + ping: + specifier: ^0.4.4 + version: 0.4.4 pretty-ms: specifier: 7.0.1 version: 7.0.1 @@ -186,6 +189,9 @@ devDependencies: '@types/passport-jwt': specifier: ^3.0.9 version: 3.0.9 + '@types/ping': + specifier: ^0.4.2 + version: 0.4.2 '@types/react': specifier: ^18.2.21 version: 18.2.21 @@ -2074,6 +2080,10 @@ packages: '@types/express': 4.17.17 dev: true + /@types/ping@0.4.2: + resolution: {integrity: sha512-5WAzkazMQP7EBDfGtfOV8Tkh7SoXhyo8UYXsR7G7RCRihyM5Ds2zbxgrS+eI0bMqL/GPRO7FcJCK7czoxQmXNw==} + dev: true + /@types/prop-types@15.7.5: resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==} @@ -4381,6 +4391,11 @@ packages: engines: {node: '>=0.10.0'} dev: true + /ping@0.4.4: + resolution: {integrity: sha512-56ZMC0j7SCsMMLdOoUg12VZCfj/+ZO+yfOSjaNCRrmZZr6GLbN2X/Ui56T15dI8NhiHckaw5X2pvyfAomanwqQ==} + engines: {node: '>=4.0.0'} + dev: false + /pirates@4.0.6: resolution: {integrity: sha512-saLsH7WeYYPiD25LDuLRRY/i+6HaPYr6G1OUlN39otzkSTxKnubR9RTxS3/Kk50s1g2JTgFwWQDQyplC5/SHZg==} engines: {node: '>= 6'} diff --git a/prisma/schema.prisma b/prisma/schema.prisma index 929c3cd..908df5c 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -63,9 +63,9 @@ model Website { updatedAt DateTime? @updatedAt @db.Timestamptz(6) deletedAt DateTime? @db.Timestamptz(6) - workspace Workspace @relation(fields: [workspaceId], references: [id], onUpdate: Cascade, onDelete: Cascade) + workspace Workspace @relation(fields: [workspaceId], references: [id], onUpdate: Cascade, onDelete: Cascade) - sessions WebsiteSession[] + sessions WebsiteSession[] eventData WebsiteEventData[] sessionData WebsiteSessionData[] @@ -89,7 +89,7 @@ model WebsiteSession { city String? @db.VarChar(50) createdAt DateTime? @default(now()) @db.Timestamptz(6) - website Website @relation(fields: [websiteId], references: [id], onUpdate: Cascade, onDelete: Cascade) + website Website @relation(fields: [websiteId], references: [id], onUpdate: Cascade, onDelete: Cascade) websiteEvent WebsiteEvent[] sessionData WebsiteSessionData[] @@ -123,7 +123,7 @@ model WebsiteEvent { createdAt DateTime? @default(now()) @db.Timestamptz(6) eventData WebsiteEventData[] - session WebsiteSession @relation(fields: [sessionId], references: [id],onUpdate: Cascade, onDelete: Cascade) + session WebsiteSession @relation(fields: [sessionId], references: [id], onUpdate: Cascade, onDelete: Cascade) @@index([createdAt]) @@index([sessionId]) @@ -234,8 +234,9 @@ model Monitor { type String @db.VarChar(100) active Boolean @default(true) @db.Boolean interval Int @default(20) @db.Integer - maxRetry Int @default(0) @db.Integer - retryInterval Int @default(0) @db.Integer + // TODO + // maxRetry Int @default(0) @db.Integer + // retryInterval Int @default(0) @db.Integer payload Json @db.Json createdAt DateTime? @default(now()) @db.Timestamptz(6) @@ -243,7 +244,7 @@ model Monitor { notifications Notification[] events MonitorEvent[] - datas MonitorData[] + datas MonitorData[] @@index([workspaceId]) } @@ -252,7 +253,7 @@ model MonitorEvent { id String @id @default(uuid()) @db.Uuid message String @db.VarChar(500) monitorId String @db.Uuid - type String @db.VarChar(100) // Up or Down + type String @db.VarChar(100) // UP or DOWN createdAt DateTime? @default(now()) @db.Timestamptz(6) monitor Monitor @relation(fields: [monitorId], references: [id], onUpdate: Cascade, onDelete: Cascade) @@ -261,7 +262,7 @@ model MonitorEvent { model MonitorData { id String @id @default(uuid()) @db.Uuid monitorId String @db.Uuid - value Int @default(0) @db.Integer // -1 means error + value Int @default(0) @db.Integer // -1 means error createdAt DateTime? @default(now()) @db.Timestamptz(6) monitor Monitor @relation(fields: [monitorId], references: [id], onUpdate: Cascade, onDelete: Cascade) diff --git a/src/server/main.ts b/src/server/main.ts index e91796b..30c0130 100644 --- a/src/server/main.ts +++ b/src/server/main.ts @@ -14,6 +14,7 @@ import { trpcExpressMiddleware } from './trpc'; import { initUdpServer } from './udp/server'; import { createServer } from 'http'; import { initSocketio } from './ws'; +import { monitorManager } from './model/monitor'; const port = Number(process.env.PORT || 12345); @@ -24,6 +25,8 @@ initUdpServer(port); initSocketio(httpServer); +monitorManager.startAll(); + app.use(compression()); app.use(express.json()); app.use(passport.initialize()); diff --git a/src/server/model/monitor/index.ts b/src/server/model/monitor/index.ts new file mode 100644 index 0000000..96d0d48 --- /dev/null +++ b/src/server/model/monitor/index.ts @@ -0,0 +1,170 @@ +import { Monitor } from '@prisma/client'; +import { prisma } from '../_client'; +import { monitorProviders } from './provider'; + +export type MonitorUpsertData = Pick< + Monitor, + 'workspaceId' | 'name' | 'type' | 'interval' +> & { id?: string; active?: boolean; payload: Record }; + +class MonitorManager { + private monitorRunner: Record = {}; + private isStarted = false; + + /** + * create or update + */ + async upsert(data: MonitorUpsertData): Promise { + let monitor: Monitor; + if (data.id) { + // update + monitor = await prisma.monitor.update({ + where: { + id: data.id, + }, + data: { ...data }, + }); + + return monitor; + } else { + // create + monitor = await prisma.monitor.create({ + data: { ...data }, + }); + } + + if (this.monitorRunner[monitor.id]) { + // Stop and remove old + this.monitorRunner[monitor.id].stopMonitor(); + delete this.monitorRunner[monitor.id]; + } + + const runner = (this.monitorRunner[monitor.id] = new MonitorRunner( + monitor + )); + runner.startMonitor(); + + return monitor; + } + + /** + * Get and start all monitors + */ + async startAll() { + if (this.isStarted === true) { + console.warn('MonitorManager.startAll should only call once, skipped.'); + return; + } + + this.isStarted = true; + + const monitors = await prisma.monitor.findMany({ + where: { + active: true, + }, + }); + + Promise.all( + monitors.map(async (m) => { + try { + const runner = new MonitorRunner(m); + this.monitorRunner[m.id] = runner; + await runner.startMonitor(); + } catch (err) { + console.error('Start monitor error:', err); + } + }) + ).then(() => { + console.log('All monitor has been begin.'); + }); + } +} + +class MonitorRunner { + isStopped = false; + timer: NodeJS.Timeout | null = null; + + constructor(public monitor: Monitor) {} + + /** + * Start single monitor + */ + async startMonitor() { + const monitor = this.monitor; + const { type, interval } = monitor; + + const provider = monitorProviders[type]; + if (!provider) { + throw new Error(`Unknown monitor type: ${type}`); + } + + let currentStatus: 'UP' | 'DOWN' = 'UP'; + + const nextAction = () => { + if (this.isStopped === true) { + return; + } + + this.timer = setTimeout(() => { + run(); + }, interval); + }; + + async function run() { + const value = await provider.run(monitor); + + // check event update + if (value < 0 && currentStatus === 'UP') { + await prisma.monitorEvent.create({ + data: { + message: `Monitor ${monitor.name} has been down`, + monitorId: monitor.id, + type: 'DOWN', + }, + }); + } else if (value > 0 && currentStatus === 'DOWN') { + await prisma.monitorEvent.create({ + data: { + message: `Monitor ${monitor.name} has been up`, + monitorId: monitor.id, + type: 'UP', + }, + }); + } + + // insert into data + await prisma.monitorData.create({ + data: { + monitorId: monitor.id, + value, + }, + }); + + // Run next loop + nextAction(); + } + + nextAction(); + + console.log(`Start monitor ${monitor.name}(${monitor.id})`); + } + + stopMonitor() { + const monitor = this.monitor; + + this.isStopped = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + + console.log(`Stop monitor ${monitor.name}(${monitor.id})`); + } + + async restartMonitor() { + this.stopMonitor(); + this.startMonitor(); + } +} + +export const monitorManager = new MonitorManager(); diff --git a/src/server/model/monitor/provider/index.ts b/src/server/model/monitor/provider/index.ts new file mode 100644 index 0000000..21e4502 --- /dev/null +++ b/src/server/model/monitor/provider/index.ts @@ -0,0 +1,6 @@ +import { ping } from './ping'; +import type { MonitorProvider } from './type'; + +export const monitorProviders: Record = { + ping, +}; diff --git a/src/server/model/monitor/provider/ping.ts b/src/server/model/monitor/provider/ping.ts new file mode 100644 index 0000000..571dcf8 --- /dev/null +++ b/src/server/model/monitor/provider/ping.ts @@ -0,0 +1,48 @@ +import { MonitorProvider } from './type'; +import pingUtils from 'ping'; + +export const ping: MonitorProvider = { + run: async (monitor) => { + if (typeof monitor.payload !== 'object') { + throw new Error('monitor.payload should be object'); + } + + const { hostname } = monitor.payload as any; + + const res = await pingAction(hostname); + + if (res === 'unknown') { + return -1; + } + + return res; + }, +}; + +const isWindows = /^win/.test(process.platform); + +function pingAction(hostname: string, packetSize = 56) { + return new Promise((resolve, reject) => { + pingUtils.promise + .probe(hostname, { + min_reply: 1, + deadline: 10, + packetSize, + }) + .then((res) => { + // If ping failed, it will set field to unknown + if (res.alive) { + resolve(res.time); + } else { + if (isWindows) { + reject(new Error(exports.convertToUTF8(res.output))); + } else { + reject(new Error(res.output)); + } + } + }) + .catch((err) => { + reject(err); + }); + }); +} diff --git a/src/server/model/monitor/provider/type.ts b/src/server/model/monitor/provider/type.ts new file mode 100644 index 0000000..26af0fc --- /dev/null +++ b/src/server/model/monitor/provider/type.ts @@ -0,0 +1,5 @@ +import { Monitor } from '@prisma/client'; + +export interface MonitorProvider { + run: (monitor: Monitor) => Promise; +} diff --git a/src/server/trpc/routers/monitor.ts b/src/server/trpc/routers/monitor.ts index 4f53442..1415cae 100644 --- a/src/server/trpc/routers/monitor.ts +++ b/src/server/trpc/routers/monitor.ts @@ -1,6 +1,7 @@ import { router, workspaceOwnerProcedure, workspaceProcedure } from '../trpc'; import { prisma } from '../../model/_client'; import { z } from 'zod'; +import { monitorManager } from '../../model/monitor'; export const monitorRouter = router({ all: workspaceProcedure.query(async ({ input }) => { @@ -38,53 +39,22 @@ export const monitorRouter = router({ type: z.string(), active: z.boolean().default(true), interval: z.number().int().default(20), - maxRetry: z.number().int().default(0), - retryInterval: z.number().int().default(0), payload: z.object({}).passthrough(), }) ) .mutation(async ({ input }) => { - const { + const { id, workspaceId, name, type, active, interval, payload } = input; + + const monitor = await monitorManager.upsert({ id, workspaceId, name, type, active, interval, - maxRetry, - retryInterval, payload, - } = input; + }); - if (id) { - return prisma.monitor.update({ - data: { - name, - type, - active, - interval, - maxRetry, - retryInterval, - payload, - }, - where: { - id, - workspaceId, - }, - }); - } else { - return prisma.monitor.create({ - data: { - workspaceId, - name, - type, - active, - interval, - maxRetry, - retryInterval, - payload, - }, - }); - } + return monitor; }), });