diff --git a/reporter/main.go b/reporter/main.go index 1d1d8e7..91663d2 100644 --- a/reporter/main.go +++ b/reporter/main.go @@ -1,10 +1,13 @@ package main import ( + "bytes" "flag" + "fmt" jsoniter "github.com/json-iterator/go" "log" "net" + "net/http" "net/url" "os" "tianji-reporter/utils" @@ -20,6 +23,7 @@ type ReportData struct { } var ( + Mode = flag.String("mode", "http", "The send mode of report data, you can select: `http` or `udp`, default is `http`") Url = flag.String("url", "", "The http url of tianji, for example: https://tianji.msgbyte.com") WorkspaceId = flag.String("workspace", "", "The workspace id for tianji, this should be a uuid") Name = flag.String("name", "", "The identification name for this machine") @@ -27,6 +31,8 @@ var ( IsVnstat = flag.Bool("vnstat", false, "Use vnstat for traffic statistics, linux only") ) +var version = "1.0.0" + func main() { flag.Parse() @@ -56,21 +62,34 @@ func main() { ticker := time.Tick(time.Duration(interval) * time.Second) + log.Println("Start reporting...") + log.Println("Mode:", *Mode) + log.Println("Version:", version) + for { log.Println("Send report data to:", parsedURL.String()) - sendUDPTestPack(*parsedURL, ReportData{ + payload := ReportData{ WorkspaceId: *WorkspaceId, Name: name, Hostname: hostname, Timeout: interval * 2, Payload: utils.GetReportDataPaylod(interval, *IsVnstat), - }) + } + + if *Mode == "udp" { + sendUDPPack(*parsedURL, payload) + } else { + sendHTTPRequest(*parsedURL, payload) + } <-ticker } } -func sendUDPTestPack(url url.URL, payload ReportData) { +/** + * Send UDP Pack to report server data + */ +func sendUDPPack(url url.URL, payload ReportData) { // parse target url addr, err := net.ResolveUDPAddr("udp", url.Hostname()+":"+url.Port()) if err != nil { @@ -103,3 +122,49 @@ func sendUDPTestPack(url url.URL, payload ReportData) { log.Println("Message sent successfully!") } + +/** + * Send HTTP Request to report server data + */ +func sendHTTPRequest(_url url.URL, payload ReportData) { + jsonData, err := jsoniter.Marshal(payload) + if err != nil { + log.Println("Error encoding JSON:", err) + return + } + log.Printf("[Report] %s\n", jsonData) + + reportUrl, err := url.JoinPath(_url.String(), "/serverStatus/report") + if err != nil { + log.Println("Join url error:", err) + return + } + + req, err := http.NewRequest("POST", reportUrl, bytes.NewBuffer(jsonData)) + if err != nil { + log.Println("Create request error:", err) + return + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-tianji-report-version", version) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + fmt.Println("Send request error:", err) + return + } + + defer resp.Body.Close() + + // Read response + body := new(bytes.Buffer) + _, err = body.ReadFrom(resp.Body) + if err != nil { + fmt.Println("Read response error:", err) + return + } + + log.Println("Response:", body) +} diff --git a/src/server/main.ts b/src/server/main.ts index f30caf5..40de856 100644 --- a/src/server/main.ts +++ b/src/server/main.ts @@ -22,6 +22,7 @@ import { monitorManager } from './model/monitor'; import { settings } from './utils/settings'; import { env } from './utils/env'; import cors from 'cors'; +import { serverStatusRouter } from './router/serverStatus'; const port = settings.port; @@ -53,13 +54,15 @@ app.use( app.use('/api/website', websiteRouter); app.use('/api/workspace', workspaceRouter); app.use('/telemetry', telemetryRouter); +app.use('/serverStatus', serverStatusRouter); + +app.use('/trpc', trpcExpressMiddleware); if (env.allowOpenapi) { app.use('/open/_ui', swaggerUI.serve, swaggerUI.setup(trpcOpenapiDocument)); app.use('/open/_document', (req, res) => res.send(trpcOpenapiDocument)); app.use('/open', trpcOpenapiHttpHandler); } -app.use('/trpc', trpcExpressMiddleware); app.use((err: any, req: any, res: any, next: any) => { console.error(err); diff --git a/src/server/model/serverStatus.ts b/src/server/model/serverStatus.ts new file mode 100644 index 0000000..0371f00 --- /dev/null +++ b/src/server/model/serverStatus.ts @@ -0,0 +1,49 @@ +import { ServerStatusInfo } from '../../types'; +import { createSubscribeInitializer, subscribeEventBus } from '../ws/shared'; + +const serverMap: Record< + string, // workspaceId + Record< + string, // nodeName or hostname + ServerStatusInfo + > +> = {}; + +createSubscribeInitializer('onServerStatusUpdate', (workspaceId) => { + if (!serverMap[workspaceId]) { + serverMap[workspaceId] = {}; + } + + return serverMap[workspaceId]; +}); + +export function recordServerStatus(info: ServerStatusInfo) { + const { workspaceId, name, hostname, timeout, payload } = info; + + if (!workspaceId || !name || !hostname) { + console.warn( + '[ServerStatus] lost some necessary params, request will be ignore', + info + ); + return; + } + + if (!serverMap[workspaceId]) { + serverMap[workspaceId] = {}; + } + + serverMap[workspaceId][name || hostname] = { + workspaceId, + name, + hostname, + timeout, + updatedAt: Date.now(), + payload, + }; + + subscribeEventBus.emit( + 'onServerStatusUpdate', + workspaceId, + serverMap[workspaceId] + ); +} diff --git a/src/server/router/serverStatus.ts b/src/server/router/serverStatus.ts new file mode 100644 index 0000000..e899b3f --- /dev/null +++ b/src/server/router/serverStatus.ts @@ -0,0 +1,24 @@ +import { Router } from 'express'; +import { body, header, validate } from '../middleware/validate'; +import { recordServerStatus } from '../model/serverStatus'; + +export const serverStatusRouter = Router(); + +serverStatusRouter.post( + '/report', + validate( + header('x-tianji-report-version').isSemVer(), + body('workspaceId').isString(), + body('name').isString(), + body('hostname').isString(), + body('timeout').optional().isInt(), + body('payload').isObject() + ), + async (req, res) => { + const body = req.body; + + recordServerStatus(body); + + res.send('success'); + } +); diff --git a/src/server/udp/server.ts b/src/server/udp/server.ts index 6169d85..54c6402 100644 --- a/src/server/udp/server.ts +++ b/src/server/udp/server.ts @@ -1,22 +1,5 @@ import dgram from 'dgram'; -import type { ServerStatusInfo } from '../../types'; -import { createSubscribeInitializer, subscribeEventBus } from '../ws/shared'; - -const serverMap: Record< - string, // workspaceId - Record< - string, // nodeName or hostname - ServerStatusInfo - > -> = {}; - -createSubscribeInitializer('onServerStatusUpdate', (workspaceId) => { - if (!serverMap[workspaceId]) { - serverMap[workspaceId] = {}; - } - - return serverMap[workspaceId]; -}); +import { recordServerStatus } from '../model/serverStatus'; export function initUdpServer(port: number) { const server = dgram.createSocket('udp4'); @@ -30,35 +13,10 @@ export function initUdpServer(port: number) { try { const raw = String(msg); const json = JSON.parse(String(msg)); - const { workspaceId, name, hostname, timeout, payload } = json; - if (!workspaceId || !name || !hostname) { - console.warn( - '[UDP] lost some necessary params, request will be ignore', - json - ); - } + console.log('[UDP] recevice tianji report:', raw, 'info', rinfo); - console.log('recevice tianji report:', raw, 'info', rinfo); - - if (!serverMap[workspaceId]) { - serverMap[workspaceId] = {}; - } - - serverMap[workspaceId][name || hostname] = { - workspaceId, - name, - hostname, - timeout, - updatedAt: Date.now(), - payload, - }; - - subscribeEventBus.emit( - 'onServerStatusUpdate', - workspaceId, - serverMap[workspaceId] - ); + recordServerStatus(json); } catch (err) {} });