feat: add monitor manager and runner

This commit is contained in:
moonrailgun 2023-10-05 01:56:33 +08:00
parent 07199f0acc
commit a92dd513f1
9 changed files with 265 additions and 45 deletions

View File

@ -48,6 +48,7 @@
"openbadge": "^1.0.4",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",
"ping": "^0.4.4",
"pretty-ms": "7.0.1",
"react": "^18.2.0",
"react-dom": "^18.2.0",
@ -76,6 +77,7 @@
"@types/nodemailer": "^6.4.11",
"@types/passport": "^1.0.12",
"@types/passport-jwt": "^3.0.9",
"@types/ping": "^0.4.2",
"@types/react": "^18.2.21",
"@types/react-dom": "^18.2.7",
"@types/request-ip": "^0.0.38",

View File

@ -106,6 +106,9 @@ dependencies:
passport-jwt:
specifier: ^4.0.1
version: 4.0.1
ping:
specifier: ^0.4.4
version: 0.4.4
pretty-ms:
specifier: 7.0.1
version: 7.0.1
@ -186,6 +189,9 @@ devDependencies:
'@types/passport-jwt':
specifier: ^3.0.9
version: 3.0.9
'@types/ping':
specifier: ^0.4.2
version: 0.4.2
'@types/react':
specifier: ^18.2.21
version: 18.2.21
@ -2074,6 +2080,10 @@ packages:
'@types/express': 4.17.17
dev: true
/@types/ping@0.4.2:
resolution: {integrity: sha512-5WAzkazMQP7EBDfGtfOV8Tkh7SoXhyo8UYXsR7G7RCRihyM5Ds2zbxgrS+eI0bMqL/GPRO7FcJCK7czoxQmXNw==}
dev: true
/@types/prop-types@15.7.5:
resolution: {integrity: sha512-JCB8C6SnDoQf0cNycqd/35A7MjcnK+ZTqE7judS6o7utxUCg6imJg3QK2qzHKszlTjcj2cn+NwMB2i96ubpj7w==}
@ -4381,6 +4391,11 @@ packages:
engines: {node: '>=0.10.0'}
dev: true
/ping@0.4.4:
resolution: {integrity: sha512-56ZMC0j7SCsMMLdOoUg12VZCfj/+ZO+yfOSjaNCRrmZZr6GLbN2X/Ui56T15dI8NhiHckaw5X2pvyfAomanwqQ==}
engines: {node: '>=4.0.0'}
dev: false
/pirates@4.0.6:
resolution: {integrity: sha512-saLsH7WeYYPiD25LDuLRRY/i+6HaPYr6G1OUlN39otzkSTxKnubR9RTxS3/Kk50s1g2JTgFwWQDQyplC5/SHZg==}
engines: {node: '>= 6'}

View File

@ -123,7 +123,7 @@ model WebsiteEvent {
createdAt DateTime? @default(now()) @db.Timestamptz(6)
eventData WebsiteEventData[]
session WebsiteSession @relation(fields: [sessionId], references: [id],onUpdate: Cascade, onDelete: Cascade)
session WebsiteSession @relation(fields: [sessionId], references: [id], onUpdate: Cascade, onDelete: Cascade)
@@index([createdAt])
@@index([sessionId])
@ -234,8 +234,9 @@ model Monitor {
type String @db.VarChar(100)
active Boolean @default(true) @db.Boolean
interval Int @default(20) @db.Integer
maxRetry Int @default(0) @db.Integer
retryInterval Int @default(0) @db.Integer
// TODO
// maxRetry Int @default(0) @db.Integer
// retryInterval Int @default(0) @db.Integer
payload Json @db.Json
createdAt DateTime? @default(now()) @db.Timestamptz(6)
@ -252,7 +253,7 @@ model MonitorEvent {
id String @id @default(uuid()) @db.Uuid
message String @db.VarChar(500)
monitorId String @db.Uuid
type String @db.VarChar(100) // Up or Down
type String @db.VarChar(100) // UP or DOWN
createdAt DateTime? @default(now()) @db.Timestamptz(6)
monitor Monitor @relation(fields: [monitorId], references: [id], onUpdate: Cascade, onDelete: Cascade)

View File

@ -14,6 +14,7 @@ import { trpcExpressMiddleware } from './trpc';
import { initUdpServer } from './udp/server';
import { createServer } from 'http';
import { initSocketio } from './ws';
import { monitorManager } from './model/monitor';
const port = Number(process.env.PORT || 12345);
@ -24,6 +25,8 @@ initUdpServer(port);
initSocketio(httpServer);
monitorManager.startAll();
app.use(compression());
app.use(express.json());
app.use(passport.initialize());

View File

@ -0,0 +1,170 @@
import { Monitor } from '@prisma/client';
import { prisma } from '../_client';
import { monitorProviders } from './provider';
export type MonitorUpsertData = Pick<
Monitor,
'workspaceId' | 'name' | 'type' | 'interval'
> & { id?: string; active?: boolean; payload: Record<string, any> };
class MonitorManager {
private monitorRunner: Record<string, MonitorRunner> = {};
private isStarted = false;
/**
* create or update
*/
async upsert(data: MonitorUpsertData): Promise<Monitor> {
let monitor: Monitor;
if (data.id) {
// update
monitor = await prisma.monitor.update({
where: {
id: data.id,
},
data: { ...data },
});
return monitor;
} else {
// create
monitor = await prisma.monitor.create({
data: { ...data },
});
}
if (this.monitorRunner[monitor.id]) {
// Stop and remove old
this.monitorRunner[monitor.id].stopMonitor();
delete this.monitorRunner[monitor.id];
}
const runner = (this.monitorRunner[monitor.id] = new MonitorRunner(
monitor
));
runner.startMonitor();
return monitor;
}
/**
* Get and start all monitors
*/
async startAll() {
if (this.isStarted === true) {
console.warn('MonitorManager.startAll should only call once, skipped.');
return;
}
this.isStarted = true;
const monitors = await prisma.monitor.findMany({
where: {
active: true,
},
});
Promise.all(
monitors.map(async (m) => {
try {
const runner = new MonitorRunner(m);
this.monitorRunner[m.id] = runner;
await runner.startMonitor();
} catch (err) {
console.error('Start monitor error:', err);
}
})
).then(() => {
console.log('All monitor has been begin.');
});
}
}
class MonitorRunner {
isStopped = false;
timer: NodeJS.Timeout | null = null;
constructor(public monitor: Monitor) {}
/**
* Start single monitor
*/
async startMonitor() {
const monitor = this.monitor;
const { type, interval } = monitor;
const provider = monitorProviders[type];
if (!provider) {
throw new Error(`Unknown monitor type: ${type}`);
}
let currentStatus: 'UP' | 'DOWN' = 'UP';
const nextAction = () => {
if (this.isStopped === true) {
return;
}
this.timer = setTimeout(() => {
run();
}, interval);
};
async function run() {
const value = await provider.run(monitor);
// check event update
if (value < 0 && currentStatus === 'UP') {
await prisma.monitorEvent.create({
data: {
message: `Monitor ${monitor.name} has been down`,
monitorId: monitor.id,
type: 'DOWN',
},
});
} else if (value > 0 && currentStatus === 'DOWN') {
await prisma.monitorEvent.create({
data: {
message: `Monitor ${monitor.name} has been up`,
monitorId: monitor.id,
type: 'UP',
},
});
}
// insert into data
await prisma.monitorData.create({
data: {
monitorId: monitor.id,
value,
},
});
// Run next loop
nextAction();
}
nextAction();
console.log(`Start monitor ${monitor.name}(${monitor.id})`);
}
stopMonitor() {
const monitor = this.monitor;
this.isStopped = true;
if (this.timer) {
clearTimeout(this.timer);
this.timer = null;
}
console.log(`Stop monitor ${monitor.name}(${monitor.id})`);
}
async restartMonitor() {
this.stopMonitor();
this.startMonitor();
}
}
export const monitorManager = new MonitorManager();

View File

@ -0,0 +1,6 @@
import { ping } from './ping';
import type { MonitorProvider } from './type';
export const monitorProviders: Record<string, MonitorProvider> = {
ping,
};

View File

@ -0,0 +1,48 @@
import { MonitorProvider } from './type';
import pingUtils from 'ping';
export const ping: MonitorProvider = {
run: async (monitor) => {
if (typeof monitor.payload !== 'object') {
throw new Error('monitor.payload should be object');
}
const { hostname } = monitor.payload as any;
const res = await pingAction(hostname);
if (res === 'unknown') {
return -1;
}
return res;
},
};
const isWindows = /^win/.test(process.platform);
function pingAction(hostname: string, packetSize = 56) {
return new Promise<number | 'unknown'>((resolve, reject) => {
pingUtils.promise
.probe(hostname, {
min_reply: 1,
deadline: 10,
packetSize,
})
.then((res) => {
// If ping failed, it will set field to unknown
if (res.alive) {
resolve(res.time);
} else {
if (isWindows) {
reject(new Error(exports.convertToUTF8(res.output)));
} else {
reject(new Error(res.output));
}
}
})
.catch((err) => {
reject(err);
});
});
}

View File

@ -0,0 +1,5 @@
import { Monitor } from '@prisma/client';
export interface MonitorProvider {
run: (monitor: Monitor) => Promise<number>;
}

View File

@ -1,6 +1,7 @@
import { router, workspaceOwnerProcedure, workspaceProcedure } from '../trpc';
import { prisma } from '../../model/_client';
import { z } from 'zod';
import { monitorManager } from '../../model/monitor';
export const monitorRouter = router({
all: workspaceProcedure.query(async ({ input }) => {
@ -38,53 +39,22 @@ export const monitorRouter = router({
type: z.string(),
active: z.boolean().default(true),
interval: z.number().int().default(20),
maxRetry: z.number().int().default(0),
retryInterval: z.number().int().default(0),
payload: z.object({}).passthrough(),
})
)
.mutation(async ({ input }) => {
const {
id,
workspaceId,
name,
type,
active,
interval,
maxRetry,
retryInterval,
payload,
} = input;
const { id, workspaceId, name, type, active, interval, payload } = input;
if (id) {
return prisma.monitor.update({
data: {
name,
type,
active,
interval,
maxRetry,
retryInterval,
payload,
},
where: {
const monitor = await monitorManager.upsert({
id,
workspaceId,
},
});
} else {
return prisma.monitor.create({
data: {
workspaceId,
name,
type,
active,
interval,
maxRetry,
retryInterval,
payload,
},
});
}
return monitor;
}),
});