feat: add telemetry trpc feature

This commit is contained in:
moonrailgun 2024-02-27 20:36:56 +08:00
parent b50de6b4e0
commit 0bd98adf96
7 changed files with 465 additions and 54 deletions

View File

@ -1,17 +1,22 @@
import { z } from 'zod';
export const websiteFilterSchema = z.object({
timezone: z.string(),
export const baseFilterSchema = z.object({
url: z.string(),
country: z.string(),
region: z.string(),
city: z.string(),
});
export const websiteFilterSchema = baseFilterSchema.merge(
z.object({
timezone: z.string(),
referrer: z.string(),
title: z.string(),
os: z.string(),
browser: z.string(),
device: z.string(),
country: z.string(),
region: z.string(),
city: z.string(),
});
})
);
const websiteStatsItemType = z.object({
value: z.number(),

View File

@ -1,8 +1,15 @@
import { TelemetrySession } from '@prisma/client';
import { Prisma, Telemetry, TelemetrySession } from '@prisma/client';
import { Request } from 'express';
import { hashUuid } from '../utils/common';
import { getRequestInfo } from '../utils/detect';
import { prisma } from './_client';
import {
BaseQueryFilters,
getDateQuery,
getTimestampIntervalQuery,
parseTelemetryFilters,
} from '../utils/prisma';
import { SESSION_COLUMNS } from '../utils/const';
export async function recordTelemetryEvent(req: Request) {
const { url = req.headers.referer, name, ...others } = req.query;
@ -135,3 +142,169 @@ async function loadSession(
return session;
}
export async function loadTelemetry(
telemetryId: string
): Promise<Telemetry | null> {
const telemetry = await prisma.telemetry.findUnique({
where: {
id: telemetryId,
},
});
if (!telemetry || telemetry.deletedAt) {
return null;
}
return telemetry;
}
export async function getTelemetryPageview(
telemetryId: string,
filters: BaseQueryFilters
) {
const { timezone = 'utc', unit = 'day' } = filters;
const { filterQuery, joinSession, params } = await parseTelemetryFilters(
telemetryId,
{
...filters,
}
);
return prisma.$queryRaw`
select
${getDateQuery('"TelemetryEvent"."createdAt"', unit, timezone)} x,
count(1) y
from "TelemetryEvent"
${joinSession}
where "TelemetryEvent"."telemetryId" = ${params.telemetryId}
and "TelemetryEvent"."createdAt" between ${
params.startDate
}::timestamptz and ${params.endDate}::timestamptz
${filterQuery}
group by 1
`;
}
export async function getTelemetrySession(
telemetryId: string,
filters: BaseQueryFilters
) {
const { timezone = 'utc', unit = 'day' } = filters;
const { filterQuery, joinSession, params } = await parseTelemetryFilters(
telemetryId,
{
...filters,
}
);
return prisma.$queryRaw`
select
${getDateQuery('"TelemetryEvent"."createdAt"', unit, timezone)} x,
count(distinct "TelemetryEvent"."sessionId") y
from "TelemetryEvent"
${joinSession}
where "TelemetryEvent"."telemetryId" = ${params.telemetryId}
and "TelemetryEvent"."createdAt" between ${
params.startDate
}::timestamptz and ${params.endDate}::timestamptz
${filterQuery}
group by 1
`;
}
export async function getTelemetryStats(
telemetryId: string,
filters: BaseQueryFilters
): Promise<any> {
const { filterQuery, joinSession, params } = await parseTelemetryFilters(
telemetryId,
{
...filters,
}
);
return prisma.$queryRaw`
select
sum(t.c) as "pageviews",
count(distinct t."sessionId") as "uniques"
from (
select
"TelemetryEvent"."sessionId",
${getDateQuery('"TelemetryEvent"."createdAt"', 'hour')}
from "TelemetryEvent"
join "Telemetry"
on "TelemetryEvent"."telemetryId" = "Telemetry"."id"
${joinSession}
where "Telemetry"."id" = ${params.telemetryId}
and "TelemetryEvent"."createdAt" between ${
params.startDate
}::timestamptz and ${params.endDate}::timestamptz
${filterQuery}
group by 1, 2
) as t
`;
}
export async function getTelemetrySessionMetrics(
telemetryId: string,
column: string,
filters: BaseQueryFilters
): Promise<{ x: string; y: number }[]> {
const { filterQuery, joinSession, params } = await parseTelemetryFilters(
telemetryId,
{
...filters,
},
{
joinSession: SESSION_COLUMNS.includes(column),
}
);
const includeCountry = column === 'city' || column === 'subdivision1';
return prisma.$queryRaw`select
${Prisma.sql([`"${column}"`])} x,
count(distinct "TelemetryEvent"."sessionId") y
${includeCountry ? Prisma.sql([', country']) : Prisma.empty}
from "TelemetryEvent"
${joinSession}
where "TelemetryEvent"."telemetryId" = ${telemetryId}
and "TelemetryEvent"."createdAt"
between ${params.startDate}::timestamptz and ${
params.endDate
}::timestamptz
${filterQuery}
group by 1
${includeCountry ? Prisma.sql([', 3']) : Prisma.empty}
order by 2 desc
limit 100`;
}
export async function getTelemetryPageviewMetrics(
telemetryId: string,
column: string,
filters: BaseQueryFilters
): Promise<{ x: string; y: number }[]> {
const { filterQuery, joinSession, params } = await parseTelemetryFilters(
telemetryId,
{
...filters,
},
{ joinSession: SESSION_COLUMNS.includes(column) }
);
return prisma.$queryRaw`
select ${Prisma.sql([`"${column}"`])} x, count(*) y
from "TelemetryEvent"
${joinSession}
where "TelemetryEvent"."telemetryId" = ${telemetryId}
and "TelemetryEvent"."createdAt"
between ${params.startDate}::timestamptz and ${
params.endDate
}::timestamptz
${filterQuery}
group by 1
order by 2 desc
limit 100
`;
}

View File

@ -13,10 +13,10 @@ import {
import type { DynamicData } from '../utils/types';
import dayjs from 'dayjs';
import {
QueryFilters,
WebsiteQueryFilters,
getDateQuery,
getTimestampIntervalQuery,
parseFilters,
parseWebsiteFilters,
} from '../utils/prisma';
export interface WebsiteEventPayload {
@ -277,12 +277,12 @@ export async function getWebsiteOnlineUserCount(
return Number(res?.[0].x ?? 0);
}
export async function getSessionMetrics(
export async function getWebsiteSessionMetrics(
websiteId: string,
column: string,
filters: QueryFilters
filters: WebsiteQueryFilters
): Promise<{ x: string; y: number }[]> {
const { filterQuery, joinSession, params } = await parseFilters(
const { filterQuery, joinSession, params } = await parseWebsiteFilters(
websiteId,
{
...filters,
@ -312,14 +312,14 @@ export async function getSessionMetrics(
limit 100`;
}
export async function getPageviewMetrics(
export async function getWebsitePageviewMetrics(
websiteId: string,
column: string,
filters: QueryFilters
filters: WebsiteQueryFilters
): Promise<{ x: string; y: number }[]> {
const eventType =
column === 'eventName' ? EVENT_TYPE.customEvent : EVENT_TYPE.pageView;
const { filterQuery, joinSession, params } = await parseFilters(
const { filterQuery, joinSession, params } = await parseWebsiteFilters(
websiteId,
{
...filters,
@ -352,12 +352,15 @@ export async function getPageviewMetrics(
export async function getWorkspaceWebsitePageview(
websiteId: string,
filters: QueryFilters
filters: WebsiteQueryFilters
) {
const { timezone = 'utc', unit = 'day' } = filters;
const { filterQuery, joinSession, params } = await parseFilters(websiteId, {
const { filterQuery, joinSession, params } = await parseWebsiteFilters(
websiteId,
{
...filters,
});
}
);
return prisma.$queryRaw`
select
@ -377,12 +380,15 @@ export async function getWorkspaceWebsitePageview(
export async function getWorkspaceWebsiteSession(
websiteId: string,
filters: QueryFilters
filters: WebsiteQueryFilters
) {
const { timezone = 'utc', unit = 'day' } = filters;
const { filterQuery, joinSession, params } = await parseFilters(websiteId, {
const { filterQuery, joinSession, params } = await parseWebsiteFilters(
websiteId,
{
...filters,
});
}
);
return prisma.$queryRaw`
select
@ -402,11 +408,14 @@ export async function getWorkspaceWebsiteSession(
export async function getWorkspaceWebsiteStats(
websiteId: string,
filters: QueryFilters
filters: WebsiteQueryFilters
): Promise<any> {
const { filterQuery, joinSession, params } = await parseFilters(websiteId, {
const { filterQuery, joinSession, params } = await parseWebsiteFilters(
websiteId,
{
...filters,
});
}
);
return prisma.$queryRaw`
select

View File

@ -1,10 +1,5 @@
import { prisma } from './_client';
import {
QueryFilters,
parseFilters,
getDateQuery,
getTimestampIntervalQuery,
} from '../utils/prisma';
import { parseWebsiteFilters } from '../utils/prisma';
import { DEFAULT_RESET_DATE, EVENT_TYPE } from '../utils/const';
export async function getWorkspaceUser(workspaceId: string, userId: string) {
@ -67,7 +62,7 @@ export async function deleteWorkspaceWebsite(
}
export async function getWorkspaceWebsiteDateRange(websiteId: string) {
const { params } = await parseFilters(websiteId, {
const { params } = await parseWebsiteFilters(websiteId, {
startDate: new Date(DEFAULT_RESET_DATE),
});

View File

@ -5,10 +5,23 @@ import {
workspaceOwnerProcedure,
workspaceProcedure,
} from '../trpc';
import { OPENAPI_TAG } from '../../utils/const';
import {
EVENT_COLUMNS,
FILTER_COLUMNS,
OPENAPI_TAG,
SESSION_COLUMNS,
} from '../../utils/const';
import { prisma } from '../../model/_client';
import { TelemetryModelSchema } from '../../prisma/zod';
import { OpenApiMeta } from 'trpc-openapi';
import { baseFilterSchema } from '../../model/_schema/filter';
import {
getTelemetryPageview,
getTelemetryPageviewMetrics,
getTelemetrySession,
getTelemetrySessionMetrics,
} from '../../model/telemetry';
import { BaseQueryFilters } from '../../utils/prisma';
export const telemetryRouter = router({
all: workspaceProcedure
@ -94,6 +107,156 @@ export const telemetryRouter = router({
});
}
}),
pageviews: workspaceProcedure
.meta(
buildTelemetryOpenapi({
method: 'GET',
path: '/pageviews',
})
)
.input(
z
.object({
telemetryId: z.string(),
startAt: z.number(),
endAt: z.number(),
unit: z.string().optional(),
})
.merge(baseFilterSchema.partial())
)
.output(z.object({ pageviews: z.any(), sessions: z.any() }))
.query(async ({ input }) => {
const { telemetryId, startAt, endAt, url, country, region, city } = input;
const startDate = new Date(startAt);
const endDate = new Date(endAt);
// const { startDate, endDate, unit } = await parseDateRange({
// websiteId,
// startAt: Number(startAt),
// endAt: Number(endAt),
// unit: String(input.unit),
// });
const filters = {
startDate,
endDate,
unit: input.unit,
url,
country,
region,
city,
};
const [pageviews, sessions] = await Promise.all([
getTelemetryPageview(telemetryId, filters as BaseQueryFilters),
getTelemetrySession(telemetryId, filters as BaseQueryFilters),
]);
return {
pageviews,
sessions,
};
}),
metrics: workspaceProcedure
.meta(
buildTelemetryOpenapi({
method: 'GET',
path: '/metrics',
})
)
.input(
z
.object({
websiteId: z.string(),
type: z.enum([
'url',
'language',
'referrer',
'browser',
'os',
'device',
'country',
'event',
]),
startAt: z.number(),
endAt: z.number(),
})
.merge(baseFilterSchema.partial())
)
.output(
z.array(
z.object({
x: z.string().nullable(),
y: z.number(),
})
)
)
.query(async ({ input }) => {
const { websiteId, type, startAt, endAt, url, country, region, city } =
input;
const startDate = new Date(startAt);
const endDate = new Date(endAt);
// const { startDate, endDate } = await parseDateRange({
// websiteId,
// startAt,
// endAt,
// });
const filters = {
startDate,
endDate,
url,
country,
region,
city,
};
const column = FILTER_COLUMNS[type] || type;
if (SESSION_COLUMNS.includes(type)) {
const data = await getTelemetrySessionMetrics(
websiteId,
column,
filters
);
if (type === 'language') {
const combined: Record<string, any> = {};
for (const { x, y } of data) {
const key = String(x).toLowerCase().split('-')[0];
if (combined[key] === undefined) {
combined[key] = { x: key, y };
} else {
combined[key].y += y;
}
}
return Object.values(combined).map((d) => ({
x: d.x,
y: Number(d.y),
}));
}
return data.map((d) => ({ x: d.x, y: Number(d.y) }));
}
if (EVENT_COLUMNS.includes(type)) {
const data = await getTelemetryPageviewMetrics(
websiteId,
column,
filters
);
return data.map((d) => ({ x: d.x, y: Number(d.y) }));
}
return [];
}),
});
function buildTelemetryOpenapi(meta: OpenApiMetaInfo): OpenApiMeta {

View File

@ -19,7 +19,10 @@ import {
SESSION_COLUMNS,
} from '../../utils/const';
import { parseDateRange } from '../../utils/common';
import { getSessionMetrics, getPageviewMetrics } from '../../model/website';
import {
getWebsiteSessionMetrics,
getWebsitePageviewMetrics,
} from '../../model/website';
import { websiteInfoSchema } from '../../model/_schema';
import { OpenApiMeta } from 'trpc-openapi';
import { hostnameRegex } from '@tianji/shared';
@ -28,7 +31,7 @@ import {
websiteStatsSchema,
} from '../../model/_schema/filter';
import dayjs from 'dayjs';
import { QueryFilters } from '../../utils/prisma';
import { WebsiteQueryFilters } from '../../utils/prisma';
const websiteNameSchema = z.string().max(100);
const websiteDomainSchema = z.union([
@ -163,7 +166,7 @@ export const websiteRouter = router({
country,
region,
city,
} as QueryFilters;
} as WebsiteQueryFilters;
const [metrics, prevPeriod] = await Promise.all([
getWorkspaceWebsiteStats(websiteId, {
@ -301,8 +304,8 @@ export const websiteRouter = router({
};
const [pageviews, sessions] = await Promise.all([
getWorkspaceWebsitePageview(websiteId, filters as QueryFilters),
getWorkspaceWebsiteSession(websiteId, filters as QueryFilters),
getWorkspaceWebsitePageview(websiteId, filters as WebsiteQueryFilters),
getWorkspaceWebsiteSession(websiteId, filters as WebsiteQueryFilters),
]);
return {
@ -397,7 +400,7 @@ export const websiteRouter = router({
const column = FILTER_COLUMNS[type] || type;
if (SESSION_COLUMNS.includes(type)) {
const data = await getSessionMetrics(websiteId, column, filters);
const data = await getWebsiteSessionMetrics(websiteId, column, filters);
if (type === 'language') {
const combined: Record<string, any> = {};
@ -422,7 +425,11 @@ export const websiteRouter = router({
}
if (EVENT_COLUMNS.includes(type)) {
const data = await getPageviewMetrics(websiteId, column, filters);
const data = await getWebsitePageviewMetrics(
websiteId,
column,
filters
);
return data.map((d) => ({ x: d.x, y: Number(d.y) }));
}

View File

@ -4,6 +4,7 @@ import _ from 'lodash';
import { loadWebsite } from '../model/website';
import { maxDate } from './common';
import { FILTER_COLUMNS, OPERATORS, SESSION_COLUMNS } from './const';
import { loadTelemetry } from '../model/telemetry';
const POSTGRESQL_DATE_FORMATS = {
minute: 'YYYY-MM-DD HH24:MI:00',
@ -13,22 +14,25 @@ const POSTGRESQL_DATE_FORMATS = {
year: 'YYYY-01-01',
};
export interface QueryFilters {
export interface BaseQueryFilters {
startDate?: Date;
endDate?: Date;
timezone?: string;
unit?: keyof typeof POSTGRESQL_DATE_FORMATS;
eventType?: number;
url?: string;
country?: string;
region?: string;
city?: string;
}
export interface WebsiteQueryFilters extends BaseQueryFilters {
eventType?: number;
referrer?: string;
title?: string;
query?: string;
os?: string;
browser?: string;
device?: string;
country?: string;
region?: string;
city?: string;
language?: string;
event?: string;
}
@ -38,9 +42,9 @@ export interface QueryOptions {
columns?: { [key: string]: string };
}
export async function parseFilters(
export async function parseWebsiteFilters(
websiteId: string,
filters: QueryFilters = {},
filters: WebsiteQueryFilters = {},
options: QueryOptions = {}
) {
const website = await loadWebsite(websiteId);
@ -62,7 +66,7 @@ export async function parseFilters(
`inner join "WebsiteSession" on "WebsiteEvent"."sessionId" = "WebsiteSession"."id"`,
])
: Prisma.empty,
filterQuery: getFilterQuery(filters, options, websiteDomain),
filterQuery: getWebsiteFilterQuery(filters, options, websiteDomain),
params: {
...normalizeFilters(filters),
websiteId,
@ -77,6 +81,40 @@ export async function parseFilters(
};
}
export async function parseTelemetryFilters(
telemetryId: string,
filters: BaseQueryFilters = {},
options: QueryOptions = {}
) {
const telemetry = await loadTelemetry(telemetryId);
if (!telemetry) {
throw new Error('Not found telemetry');
}
return {
joinSession:
options?.joinSession ||
Object.entries(filters).find(
([key, value]) =>
typeof value !== 'undefined' && SESSION_COLUMNS.includes(key)
)
? Prisma.sql([
`inner join "WebsiteSession" on "WebsiteEvent"."sessionId" = "WebsiteSession"."id"`,
])
: Prisma.empty,
filterQuery: getTelemetryFilterQuery(filters, options),
params: {
...normalizeFilters(filters),
telemetryId,
startDate: dayjs(filters.startDate).toISOString(),
endDate: filters.endDate
? dayjs(filters.endDate).toISOString()
: undefined,
},
};
}
function normalizeFilters(filters: Record<string, any> = {}) {
return Object.keys(filters).reduce((obj, key) => {
const value = filters[key];
@ -87,13 +125,13 @@ function normalizeFilters(filters: Record<string, any> = {}) {
}, {} as Record<string, any>);
}
export function getFilterQuery(
filters: QueryFilters = {},
export function getWebsiteFilterQuery(
filters: WebsiteQueryFilters = {},
options: QueryOptions = {},
websiteDomain: string | null = null
) {
const query = Object.keys(filters).reduce<string[]>((arr, name) => {
const value: any = filters[name as keyof QueryFilters];
const value: any = filters[name as keyof WebsiteQueryFilters];
const operator = value?.filter ?? OPERATORS.equals;
const column = _.get(FILTER_COLUMNS, name, options?.columns?.[name]);
@ -115,6 +153,27 @@ export function getFilterQuery(
return Prisma.sql([query.join('\n')]);
}
export function getTelemetryFilterQuery(
filters: BaseQueryFilters = {},
options: QueryOptions = {}
) {
const query = Object.keys(filters).reduce<string[]>((arr, name) => {
const value: any = filters[name as keyof BaseQueryFilters];
const operator = value?.filter ?? OPERATORS.equals;
const column = _.get(FILTER_COLUMNS, name, options?.columns?.[name]);
// TODO
if (value !== undefined && column) {
arr.push(`AND ${mapFilter(column, operator, name)}`);
}
return arr;
}, []);
return Prisma.sql([query.join('\n')]);
}
function mapFilter(
column: string,
operator: (typeof OPERATORS)[keyof typeof OPERATORS],