From 0bd98adf96fde351cda499ca215cedf36d5ec724 Mon Sep 17 00:00:00 2001 From: moonrailgun Date: Tue, 27 Feb 2024 20:36:56 +0800 Subject: [PATCH] feat: add telemetry trpc feature --- src/server/model/_schema/filter.ts | 19 +-- src/server/model/telemetry.ts | 175 ++++++++++++++++++++++++++- src/server/model/website.ts | 49 +++++--- src/server/model/workspace.ts | 9 +- src/server/trpc/routers/telemetry.ts | 165 ++++++++++++++++++++++++- src/server/trpc/routers/website.ts | 21 ++-- src/server/utils/prisma.ts | 81 +++++++++++-- 7 files changed, 465 insertions(+), 54 deletions(-) diff --git a/src/server/model/_schema/filter.ts b/src/server/model/_schema/filter.ts index f21810f..2cce966 100644 --- a/src/server/model/_schema/filter.ts +++ b/src/server/model/_schema/filter.ts @@ -1,18 +1,23 @@ import { z } from 'zod'; -export const websiteFilterSchema = z.object({ - timezone: z.string(), +export const baseFilterSchema = z.object({ url: z.string(), - referrer: z.string(), - title: z.string(), - os: z.string(), - browser: z.string(), - device: z.string(), country: z.string(), region: z.string(), city: z.string(), }); +export const websiteFilterSchema = baseFilterSchema.merge( + z.object({ + timezone: z.string(), + referrer: z.string(), + title: z.string(), + os: z.string(), + browser: z.string(), + device: z.string(), + }) +); + const websiteStatsItemType = z.object({ value: z.number(), prev: z.number(), diff --git a/src/server/model/telemetry.ts b/src/server/model/telemetry.ts index d4baaec..091ff5c 100644 --- a/src/server/model/telemetry.ts +++ b/src/server/model/telemetry.ts @@ -1,8 +1,15 @@ -import { TelemetrySession } from '@prisma/client'; +import { Prisma, Telemetry, TelemetrySession } from '@prisma/client'; import { Request } from 'express'; import { hashUuid } from '../utils/common'; import { getRequestInfo } from '../utils/detect'; import { prisma } from './_client'; +import { + BaseQueryFilters, + getDateQuery, + getTimestampIntervalQuery, + parseTelemetryFilters, +} from '../utils/prisma'; +import { SESSION_COLUMNS } from '../utils/const'; export async function recordTelemetryEvent(req: Request) { const { url = req.headers.referer, name, ...others } = req.query; @@ -135,3 +142,169 @@ async function loadSession( return session; } + +export async function loadTelemetry( + telemetryId: string +): Promise { + const telemetry = await prisma.telemetry.findUnique({ + where: { + id: telemetryId, + }, + }); + + if (!telemetry || telemetry.deletedAt) { + return null; + } + + return telemetry; +} + +export async function getTelemetryPageview( + telemetryId: string, + filters: BaseQueryFilters +) { + const { timezone = 'utc', unit = 'day' } = filters; + const { filterQuery, joinSession, params } = await parseTelemetryFilters( + telemetryId, + { + ...filters, + } + ); + + return prisma.$queryRaw` + select + ${getDateQuery('"TelemetryEvent"."createdAt"', unit, timezone)} x, + count(1) y + from "TelemetryEvent" + ${joinSession} + where "TelemetryEvent"."telemetryId" = ${params.telemetryId} + and "TelemetryEvent"."createdAt" between ${ + params.startDate + }::timestamptz and ${params.endDate}::timestamptz + ${filterQuery} + group by 1 + `; +} + +export async function getTelemetrySession( + telemetryId: string, + filters: BaseQueryFilters +) { + const { timezone = 'utc', unit = 'day' } = filters; + const { filterQuery, joinSession, params } = await parseTelemetryFilters( + telemetryId, + { + ...filters, + } + ); + + return prisma.$queryRaw` + select + ${getDateQuery('"TelemetryEvent"."createdAt"', unit, timezone)} x, + count(distinct "TelemetryEvent"."sessionId") y + from "TelemetryEvent" + ${joinSession} + where "TelemetryEvent"."telemetryId" = ${params.telemetryId} + and "TelemetryEvent"."createdAt" between ${ + params.startDate + }::timestamptz and ${params.endDate}::timestamptz + ${filterQuery} + group by 1 + `; +} + +export async function getTelemetryStats( + telemetryId: string, + filters: BaseQueryFilters +): Promise { + const { filterQuery, joinSession, params } = await parseTelemetryFilters( + telemetryId, + { + ...filters, + } + ); + + return prisma.$queryRaw` + select + sum(t.c) as "pageviews", + count(distinct t."sessionId") as "uniques" + from ( + select + "TelemetryEvent"."sessionId", + ${getDateQuery('"TelemetryEvent"."createdAt"', 'hour')} + from "TelemetryEvent" + join "Telemetry" + on "TelemetryEvent"."telemetryId" = "Telemetry"."id" + ${joinSession} + where "Telemetry"."id" = ${params.telemetryId} + and "TelemetryEvent"."createdAt" between ${ + params.startDate + }::timestamptz and ${params.endDate}::timestamptz + ${filterQuery} + group by 1, 2 + ) as t + `; +} + +export async function getTelemetrySessionMetrics( + telemetryId: string, + column: string, + filters: BaseQueryFilters +): Promise<{ x: string; y: number }[]> { + const { filterQuery, joinSession, params } = await parseTelemetryFilters( + telemetryId, + { + ...filters, + }, + { + joinSession: SESSION_COLUMNS.includes(column), + } + ); + const includeCountry = column === 'city' || column === 'subdivision1'; + + return prisma.$queryRaw`select + ${Prisma.sql([`"${column}"`])} x, + count(distinct "TelemetryEvent"."sessionId") y + ${includeCountry ? Prisma.sql([', country']) : Prisma.empty} + from "TelemetryEvent" + ${joinSession} + where "TelemetryEvent"."telemetryId" = ${telemetryId} + and "TelemetryEvent"."createdAt" + between ${params.startDate}::timestamptz and ${ + params.endDate + }::timestamptz + ${filterQuery} + group by 1 + ${includeCountry ? Prisma.sql([', 3']) : Prisma.empty} + order by 2 desc + limit 100`; +} + +export async function getTelemetryPageviewMetrics( + telemetryId: string, + column: string, + filters: BaseQueryFilters +): Promise<{ x: string; y: number }[]> { + const { filterQuery, joinSession, params } = await parseTelemetryFilters( + telemetryId, + { + ...filters, + }, + { joinSession: SESSION_COLUMNS.includes(column) } + ); + + return prisma.$queryRaw` + select ${Prisma.sql([`"${column}"`])} x, count(*) y + from "TelemetryEvent" + ${joinSession} + where "TelemetryEvent"."telemetryId" = ${telemetryId} + and "TelemetryEvent"."createdAt" + between ${params.startDate}::timestamptz and ${ + params.endDate + }::timestamptz + ${filterQuery} + group by 1 + order by 2 desc + limit 100 + `; +} diff --git a/src/server/model/website.ts b/src/server/model/website.ts index 530dd00..fea2300 100644 --- a/src/server/model/website.ts +++ b/src/server/model/website.ts @@ -13,10 +13,10 @@ import { import type { DynamicData } from '../utils/types'; import dayjs from 'dayjs'; import { - QueryFilters, + WebsiteQueryFilters, getDateQuery, getTimestampIntervalQuery, - parseFilters, + parseWebsiteFilters, } from '../utils/prisma'; export interface WebsiteEventPayload { @@ -277,12 +277,12 @@ export async function getWebsiteOnlineUserCount( return Number(res?.[0].x ?? 0); } -export async function getSessionMetrics( +export async function getWebsiteSessionMetrics( websiteId: string, column: string, - filters: QueryFilters + filters: WebsiteQueryFilters ): Promise<{ x: string; y: number }[]> { - const { filterQuery, joinSession, params } = await parseFilters( + const { filterQuery, joinSession, params } = await parseWebsiteFilters( websiteId, { ...filters, @@ -312,14 +312,14 @@ export async function getSessionMetrics( limit 100`; } -export async function getPageviewMetrics( +export async function getWebsitePageviewMetrics( websiteId: string, column: string, - filters: QueryFilters + filters: WebsiteQueryFilters ): Promise<{ x: string; y: number }[]> { const eventType = column === 'eventName' ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView; - const { filterQuery, joinSession, params } = await parseFilters( + const { filterQuery, joinSession, params } = await parseWebsiteFilters( websiteId, { ...filters, @@ -352,12 +352,15 @@ export async function getPageviewMetrics( export async function getWorkspaceWebsitePageview( websiteId: string, - filters: QueryFilters + filters: WebsiteQueryFilters ) { const { timezone = 'utc', unit = 'day' } = filters; - const { filterQuery, joinSession, params } = await parseFilters(websiteId, { - ...filters, - }); + const { filterQuery, joinSession, params } = await parseWebsiteFilters( + websiteId, + { + ...filters, + } + ); return prisma.$queryRaw` select @@ -377,12 +380,15 @@ export async function getWorkspaceWebsitePageview( export async function getWorkspaceWebsiteSession( websiteId: string, - filters: QueryFilters + filters: WebsiteQueryFilters ) { const { timezone = 'utc', unit = 'day' } = filters; - const { filterQuery, joinSession, params } = await parseFilters(websiteId, { - ...filters, - }); + const { filterQuery, joinSession, params } = await parseWebsiteFilters( + websiteId, + { + ...filters, + } + ); return prisma.$queryRaw` select @@ -402,11 +408,14 @@ export async function getWorkspaceWebsiteSession( export async function getWorkspaceWebsiteStats( websiteId: string, - filters: QueryFilters + filters: WebsiteQueryFilters ): Promise { - const { filterQuery, joinSession, params } = await parseFilters(websiteId, { - ...filters, - }); + const { filterQuery, joinSession, params } = await parseWebsiteFilters( + websiteId, + { + ...filters, + } + ); return prisma.$queryRaw` select diff --git a/src/server/model/workspace.ts b/src/server/model/workspace.ts index e792849..9b22885 100644 --- a/src/server/model/workspace.ts +++ b/src/server/model/workspace.ts @@ -1,10 +1,5 @@ import { prisma } from './_client'; -import { - QueryFilters, - parseFilters, - getDateQuery, - getTimestampIntervalQuery, -} from '../utils/prisma'; +import { parseWebsiteFilters } from '../utils/prisma'; import { DEFAULT_RESET_DATE, EVENT_TYPE } from '../utils/const'; export async function getWorkspaceUser(workspaceId: string, userId: string) { @@ -67,7 +62,7 @@ export async function deleteWorkspaceWebsite( } export async function getWorkspaceWebsiteDateRange(websiteId: string) { - const { params } = await parseFilters(websiteId, { + const { params } = await parseWebsiteFilters(websiteId, { startDate: new Date(DEFAULT_RESET_DATE), }); diff --git a/src/server/trpc/routers/telemetry.ts b/src/server/trpc/routers/telemetry.ts index 917ab0e..ed4e16d 100644 --- a/src/server/trpc/routers/telemetry.ts +++ b/src/server/trpc/routers/telemetry.ts @@ -5,10 +5,23 @@ import { workspaceOwnerProcedure, workspaceProcedure, } from '../trpc'; -import { OPENAPI_TAG } from '../../utils/const'; +import { + EVENT_COLUMNS, + FILTER_COLUMNS, + OPENAPI_TAG, + SESSION_COLUMNS, +} from '../../utils/const'; import { prisma } from '../../model/_client'; import { TelemetryModelSchema } from '../../prisma/zod'; import { OpenApiMeta } from 'trpc-openapi'; +import { baseFilterSchema } from '../../model/_schema/filter'; +import { + getTelemetryPageview, + getTelemetryPageviewMetrics, + getTelemetrySession, + getTelemetrySessionMetrics, +} from '../../model/telemetry'; +import { BaseQueryFilters } from '../../utils/prisma'; export const telemetryRouter = router({ all: workspaceProcedure @@ -94,6 +107,156 @@ export const telemetryRouter = router({ }); } }), + pageviews: workspaceProcedure + .meta( + buildTelemetryOpenapi({ + method: 'GET', + path: '/pageviews', + }) + ) + .input( + z + .object({ + telemetryId: z.string(), + startAt: z.number(), + endAt: z.number(), + unit: z.string().optional(), + }) + .merge(baseFilterSchema.partial()) + ) + .output(z.object({ pageviews: z.any(), sessions: z.any() })) + .query(async ({ input }) => { + const { telemetryId, startAt, endAt, url, country, region, city } = input; + + const startDate = new Date(startAt); + const endDate = new Date(endAt); + + // const { startDate, endDate, unit } = await parseDateRange({ + // websiteId, + // startAt: Number(startAt), + // endAt: Number(endAt), + // unit: String(input.unit), + // }); + + const filters = { + startDate, + endDate, + unit: input.unit, + url, + country, + region, + city, + }; + + const [pageviews, sessions] = await Promise.all([ + getTelemetryPageview(telemetryId, filters as BaseQueryFilters), + getTelemetrySession(telemetryId, filters as BaseQueryFilters), + ]); + + return { + pageviews, + sessions, + }; + }), + metrics: workspaceProcedure + .meta( + buildTelemetryOpenapi({ + method: 'GET', + path: '/metrics', + }) + ) + .input( + z + .object({ + websiteId: z.string(), + type: z.enum([ + 'url', + 'language', + 'referrer', + 'browser', + 'os', + 'device', + 'country', + 'event', + ]), + startAt: z.number(), + endAt: z.number(), + }) + .merge(baseFilterSchema.partial()) + ) + .output( + z.array( + z.object({ + x: z.string().nullable(), + y: z.number(), + }) + ) + ) + .query(async ({ input }) => { + const { websiteId, type, startAt, endAt, url, country, region, city } = + input; + + const startDate = new Date(startAt); + const endDate = new Date(endAt); + + // const { startDate, endDate } = await parseDateRange({ + // websiteId, + // startAt, + // endAt, + // }); + + const filters = { + startDate, + endDate, + url, + country, + region, + city, + }; + + const column = FILTER_COLUMNS[type] || type; + + if (SESSION_COLUMNS.includes(type)) { + const data = await getTelemetrySessionMetrics( + websiteId, + column, + filters + ); + + if (type === 'language') { + const combined: Record = {}; + + for (const { x, y } of data) { + const key = String(x).toLowerCase().split('-')[0]; + + if (combined[key] === undefined) { + combined[key] = { x: key, y }; + } else { + combined[key].y += y; + } + } + + return Object.values(combined).map((d) => ({ + x: d.x, + y: Number(d.y), + })); + } + + return data.map((d) => ({ x: d.x, y: Number(d.y) })); + } + + if (EVENT_COLUMNS.includes(type)) { + const data = await getTelemetryPageviewMetrics( + websiteId, + column, + filters + ); + + return data.map((d) => ({ x: d.x, y: Number(d.y) })); + } + + return []; + }), }); function buildTelemetryOpenapi(meta: OpenApiMetaInfo): OpenApiMeta { diff --git a/src/server/trpc/routers/website.ts b/src/server/trpc/routers/website.ts index bf8c027..6b4c7c4 100644 --- a/src/server/trpc/routers/website.ts +++ b/src/server/trpc/routers/website.ts @@ -19,7 +19,10 @@ import { SESSION_COLUMNS, } from '../../utils/const'; import { parseDateRange } from '../../utils/common'; -import { getSessionMetrics, getPageviewMetrics } from '../../model/website'; +import { + getWebsiteSessionMetrics, + getWebsitePageviewMetrics, +} from '../../model/website'; import { websiteInfoSchema } from '../../model/_schema'; import { OpenApiMeta } from 'trpc-openapi'; import { hostnameRegex } from '@tianji/shared'; @@ -28,7 +31,7 @@ import { websiteStatsSchema, } from '../../model/_schema/filter'; import dayjs from 'dayjs'; -import { QueryFilters } from '../../utils/prisma'; +import { WebsiteQueryFilters } from '../../utils/prisma'; const websiteNameSchema = z.string().max(100); const websiteDomainSchema = z.union([ @@ -163,7 +166,7 @@ export const websiteRouter = router({ country, region, city, - } as QueryFilters; + } as WebsiteQueryFilters; const [metrics, prevPeriod] = await Promise.all([ getWorkspaceWebsiteStats(websiteId, { @@ -301,8 +304,8 @@ export const websiteRouter = router({ }; const [pageviews, sessions] = await Promise.all([ - getWorkspaceWebsitePageview(websiteId, filters as QueryFilters), - getWorkspaceWebsiteSession(websiteId, filters as QueryFilters), + getWorkspaceWebsitePageview(websiteId, filters as WebsiteQueryFilters), + getWorkspaceWebsiteSession(websiteId, filters as WebsiteQueryFilters), ]); return { @@ -397,7 +400,7 @@ export const websiteRouter = router({ const column = FILTER_COLUMNS[type] || type; if (SESSION_COLUMNS.includes(type)) { - const data = await getSessionMetrics(websiteId, column, filters); + const data = await getWebsiteSessionMetrics(websiteId, column, filters); if (type === 'language') { const combined: Record = {}; @@ -422,7 +425,11 @@ export const websiteRouter = router({ } if (EVENT_COLUMNS.includes(type)) { - const data = await getPageviewMetrics(websiteId, column, filters); + const data = await getWebsitePageviewMetrics( + websiteId, + column, + filters + ); return data.map((d) => ({ x: d.x, y: Number(d.y) })); } diff --git a/src/server/utils/prisma.ts b/src/server/utils/prisma.ts index 463e2d5..0aab955 100644 --- a/src/server/utils/prisma.ts +++ b/src/server/utils/prisma.ts @@ -4,6 +4,7 @@ import _ from 'lodash'; import { loadWebsite } from '../model/website'; import { maxDate } from './common'; import { FILTER_COLUMNS, OPERATORS, SESSION_COLUMNS } from './const'; +import { loadTelemetry } from '../model/telemetry'; const POSTGRESQL_DATE_FORMATS = { minute: 'YYYY-MM-DD HH24:MI:00', @@ -13,22 +14,25 @@ const POSTGRESQL_DATE_FORMATS = { year: 'YYYY-01-01', }; -export interface QueryFilters { +export interface BaseQueryFilters { startDate?: Date; endDate?: Date; timezone?: string; unit?: keyof typeof POSTGRESQL_DATE_FORMATS; - eventType?: number; url?: string; + country?: string; + region?: string; + city?: string; +} + +export interface WebsiteQueryFilters extends BaseQueryFilters { + eventType?: number; referrer?: string; title?: string; query?: string; os?: string; browser?: string; device?: string; - country?: string; - region?: string; - city?: string; language?: string; event?: string; } @@ -38,9 +42,9 @@ export interface QueryOptions { columns?: { [key: string]: string }; } -export async function parseFilters( +export async function parseWebsiteFilters( websiteId: string, - filters: QueryFilters = {}, + filters: WebsiteQueryFilters = {}, options: QueryOptions = {} ) { const website = await loadWebsite(websiteId); @@ -62,7 +66,7 @@ export async function parseFilters( `inner join "WebsiteSession" on "WebsiteEvent"."sessionId" = "WebsiteSession"."id"`, ]) : Prisma.empty, - filterQuery: getFilterQuery(filters, options, websiteDomain), + filterQuery: getWebsiteFilterQuery(filters, options, websiteDomain), params: { ...normalizeFilters(filters), websiteId, @@ -77,6 +81,40 @@ export async function parseFilters( }; } +export async function parseTelemetryFilters( + telemetryId: string, + filters: BaseQueryFilters = {}, + options: QueryOptions = {} +) { + const telemetry = await loadTelemetry(telemetryId); + + if (!telemetry) { + throw new Error('Not found telemetry'); + } + + return { + joinSession: + options?.joinSession || + Object.entries(filters).find( + ([key, value]) => + typeof value !== 'undefined' && SESSION_COLUMNS.includes(key) + ) + ? Prisma.sql([ + `inner join "WebsiteSession" on "WebsiteEvent"."sessionId" = "WebsiteSession"."id"`, + ]) + : Prisma.empty, + filterQuery: getTelemetryFilterQuery(filters, options), + params: { + ...normalizeFilters(filters), + telemetryId, + startDate: dayjs(filters.startDate).toISOString(), + endDate: filters.endDate + ? dayjs(filters.endDate).toISOString() + : undefined, + }, + }; +} + function normalizeFilters(filters: Record = {}) { return Object.keys(filters).reduce((obj, key) => { const value = filters[key]; @@ -87,13 +125,13 @@ function normalizeFilters(filters: Record = {}) { }, {} as Record); } -export function getFilterQuery( - filters: QueryFilters = {}, +export function getWebsiteFilterQuery( + filters: WebsiteQueryFilters = {}, options: QueryOptions = {}, websiteDomain: string | null = null ) { const query = Object.keys(filters).reduce((arr, name) => { - const value: any = filters[name as keyof QueryFilters]; + const value: any = filters[name as keyof WebsiteQueryFilters]; const operator = value?.filter ?? OPERATORS.equals; const column = _.get(FILTER_COLUMNS, name, options?.columns?.[name]); @@ -115,6 +153,27 @@ export function getFilterQuery( return Prisma.sql([query.join('\n')]); } +export function getTelemetryFilterQuery( + filters: BaseQueryFilters = {}, + options: QueryOptions = {} +) { + const query = Object.keys(filters).reduce((arr, name) => { + const value: any = filters[name as keyof BaseQueryFilters]; + const operator = value?.filter ?? OPERATORS.equals; + const column = _.get(FILTER_COLUMNS, name, options?.columns?.[name]); + + // TODO + + if (value !== undefined && column) { + arr.push(`AND ${mapFilter(column, operator, name)}`); + } + + return arr; + }, []); + + return Prisma.sql([query.join('\n')]); +} + function mapFilter( column: string, operator: (typeof OPERATORS)[keyof typeof OPERATORS],