feat: add feed event notification with event and daily
This commit is contained in:
parent
ab179e9af6
commit
7bfd92be0b
1948
pnpm-lock.yaml
1948
pnpm-lock.yaml
File diff suppressed because it is too large
Load Diff
53
src/server/cache/index.ts
vendored
Normal file
53
src/server/cache/index.ts
vendored
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
import { caching, MemoryCache } from 'cache-manager';
|
||||||
|
import _ from 'lodash';
|
||||||
|
|
||||||
|
let _cacheManager: MemoryCache;
|
||||||
|
export async function getCacheManager() {
|
||||||
|
if (_cacheManager) {
|
||||||
|
return _cacheManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
const cacheManager = await caching('memory', {
|
||||||
|
max: 100,
|
||||||
|
ttl: 10 * 60 * 1000 /*milliseconds*/,
|
||||||
|
});
|
||||||
|
|
||||||
|
_cacheManager = cacheManager;
|
||||||
|
|
||||||
|
return cacheManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildQueryWithCache<T, Args extends any[]>(
|
||||||
|
fetchFn: (...args: Args) => Promise<T>
|
||||||
|
) {
|
||||||
|
const id = _.uniqueId('cache-query');
|
||||||
|
|
||||||
|
const get = async (...args: Args): Promise<T> => {
|
||||||
|
const key = [id, ...args.map((a) => JSON.stringify(a))].join('|');
|
||||||
|
const cacheManager = await getCacheManager();
|
||||||
|
|
||||||
|
const cachedValue = await cacheManager.get(key);
|
||||||
|
if (cachedValue) {
|
||||||
|
try {
|
||||||
|
return JSON.parse(String(cachedValue));
|
||||||
|
} catch (err) {
|
||||||
|
console.error(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const realValue = await fetchFn(...args);
|
||||||
|
|
||||||
|
await cacheManager.set(key, JSON.stringify(cachedValue));
|
||||||
|
|
||||||
|
return realValue;
|
||||||
|
};
|
||||||
|
|
||||||
|
const del = async (...args: Args) => {
|
||||||
|
const cacheManager = await getCacheManager();
|
||||||
|
const key = [id, ...args.map((a) => JSON.stringify(a))].join('|');
|
||||||
|
|
||||||
|
await cacheManager.del(key);
|
||||||
|
};
|
||||||
|
|
||||||
|
return { get, del };
|
||||||
|
}
|
@ -2,11 +2,13 @@ import { Cron } from 'croner';
|
|||||||
import { logger } from '../utils/logger';
|
import { logger } from '../utils/logger';
|
||||||
import { prisma } from '../model/_client';
|
import { prisma } from '../model/_client';
|
||||||
import dayjs from 'dayjs';
|
import dayjs from 'dayjs';
|
||||||
import { Prisma } from '@prisma/client';
|
import { FeedChannelNotifyFrequency, Prisma } from '@prisma/client';
|
||||||
import { env } from '../utils/env';
|
import { env } from '../utils/env';
|
||||||
import { sendNotification } from '../model/notification';
|
import { sendNotification } from '../model/notification';
|
||||||
import { token } from '../model/notification/token';
|
import { token } from '../model/notification/token';
|
||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
|
import pMap from 'p-map';
|
||||||
|
import { sendFeedEventsNotify } from '../model/feed/event';
|
||||||
|
|
||||||
type WebsiteEventCountSqlReturn = {
|
type WebsiteEventCountSqlReturn = {
|
||||||
workspace_id: string;
|
workspace_id: string;
|
||||||
@ -24,6 +26,7 @@ export function initCronjob() {
|
|||||||
clearMonitorEventDaily().catch(logger.error),
|
clearMonitorEventDaily().catch(logger.error),
|
||||||
clearAuditLogDaily().catch(logger.error),
|
clearAuditLogDaily().catch(logger.error),
|
||||||
dailyHTTPCertCheckNotify().catch(logger.error),
|
dailyHTTPCertCheckNotify().catch(logger.error),
|
||||||
|
checkFeedEventsNotify(FeedChannelNotifyFrequency.day),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
logger.info('Daily cronjob completed');
|
logger.info('Daily cronjob completed');
|
||||||
@ -32,6 +35,8 @@ export function initCronjob() {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// TODO: add more cronjob
|
||||||
|
|
||||||
logger.info('Daily job will start at:', dailyJob.nextRun()?.toISOString());
|
logger.info('Daily job will start at:', dailyJob.nextRun()?.toISOString());
|
||||||
|
|
||||||
return { dailyJob };
|
return { dailyJob };
|
||||||
@ -288,3 +293,54 @@ async function dailyHTTPCertCheckNotify() {
|
|||||||
`[dailyHTTPCertCheckNotify] run completed, send ${sendCount} notifications, time usage: ${Date.now() - start}ms`
|
`[dailyHTTPCertCheckNotify] run completed, send ${sendCount} notifications, time usage: ${Date.now() - start}ms`
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function checkFeedEventsNotify(
|
||||||
|
notifyFrequency: FeedChannelNotifyFrequency
|
||||||
|
) {
|
||||||
|
logger.info(
|
||||||
|
'[checkFeedEventsNotify] Start run checkFeedEventsNotify with:',
|
||||||
|
notifyFrequency
|
||||||
|
);
|
||||||
|
|
||||||
|
const channels = await prisma.feedChannel.findMany({
|
||||||
|
where: {
|
||||||
|
notifyFrequency,
|
||||||
|
},
|
||||||
|
include: {
|
||||||
|
notifications: true,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
let startDate = dayjs().subtract(1, 'day').toDate();
|
||||||
|
|
||||||
|
if (notifyFrequency === FeedChannelNotifyFrequency.month) {
|
||||||
|
startDate = dayjs().subtract(1, 'month').toDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (notifyFrequency === FeedChannelNotifyFrequency.week) {
|
||||||
|
startDate = dayjs().subtract(1, 'week').toDate();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(`[checkFeedEventsNotify] find ${channels.length} channel`);
|
||||||
|
|
||||||
|
await pMap(
|
||||||
|
channels,
|
||||||
|
async (channel) => {
|
||||||
|
const events = await prisma.feedEvent.findMany({
|
||||||
|
where: {
|
||||||
|
channelId: channel.id,
|
||||||
|
createdAt: {
|
||||||
|
gte: startDate,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
sendFeedEventsNotify(channel.notifications, events);
|
||||||
|
},
|
||||||
|
{
|
||||||
|
concurrency: 5,
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
logger.info(`[checkFeedEventsNotify] completed.`);
|
||||||
|
}
|
||||||
|
@ -1,7 +1,35 @@
|
|||||||
import { Prisma } from '@prisma/client';
|
import {
|
||||||
|
FeedChannelNotifyFrequency,
|
||||||
|
FeedEvent,
|
||||||
|
Notification,
|
||||||
|
Prisma,
|
||||||
|
} from '@prisma/client';
|
||||||
import { subscribeEventBus } from '../../ws/shared';
|
import { subscribeEventBus } from '../../ws/shared';
|
||||||
import { prisma } from '../_client';
|
import { prisma } from '../_client';
|
||||||
import { serializeJSON } from '../../utils/json';
|
import { serializeJSON } from '../../utils/json';
|
||||||
|
import { buildQueryWithCache } from '../../cache';
|
||||||
|
import { sendNotification } from '../notification';
|
||||||
|
import { token } from '../notification/token';
|
||||||
|
|
||||||
|
const { get: getFeedEventNotify, del: delFeedEventNotifyCache } =
|
||||||
|
buildQueryWithCache(async (channelId: string) => {
|
||||||
|
const channel = await prisma.feedChannel.findFirst({
|
||||||
|
where: {
|
||||||
|
id: channelId,
|
||||||
|
},
|
||||||
|
include: {
|
||||||
|
notifications: true,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!channel) {
|
||||||
|
return [null, []] as const;
|
||||||
|
}
|
||||||
|
|
||||||
|
return [channel.notifyFrequency, channel.notifications] as const;
|
||||||
|
});
|
||||||
|
|
||||||
|
export { delFeedEventNotifyCache };
|
||||||
|
|
||||||
export async function createFeedEvent(
|
export async function createFeedEvent(
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
@ -15,4 +43,33 @@ export async function createFeedEvent(
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
serializeJSON(event)
|
serializeJSON(event)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
if (event.channelId) {
|
||||||
|
const [notify, notifications] = await getFeedEventNotify(event.channelId);
|
||||||
|
|
||||||
|
if (notify === FeedChannelNotifyFrequency.event) {
|
||||||
|
// send notify every event
|
||||||
|
sendFeedEventsNotify(notifications, [event]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function sendFeedEventsNotify(
|
||||||
|
notifications: Notification[],
|
||||||
|
events: FeedEvent[]
|
||||||
|
) {
|
||||||
|
const eventTokens = events
|
||||||
|
.map((event) => [
|
||||||
|
token.text(
|
||||||
|
`[${event.eventName}] ${event.senderName}: ${event.eventContent}`
|
||||||
|
),
|
||||||
|
token.newline(),
|
||||||
|
])
|
||||||
|
.flat();
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
notifications.map((notification) =>
|
||||||
|
sendNotification(notification, 'Feed Report', eventTokens)
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@tianji/server",
|
"name": "@tianji/server",
|
||||||
"private": true,
|
"private": true,
|
||||||
"description": "",
|
"description": "",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
@ -28,6 +28,7 @@
|
|||||||
"axios": "^1.5.0",
|
"axios": "^1.5.0",
|
||||||
"badge-maker": "^3.3.1",
|
"badge-maker": "^3.3.1",
|
||||||
"bcryptjs": "^2.4.3",
|
"bcryptjs": "^2.4.3",
|
||||||
|
"cache-manager": "^5.7.2",
|
||||||
"chardet": "^2.0.0",
|
"chardet": "^2.0.0",
|
||||||
"compose-middleware": "^5.0.1",
|
"compose-middleware": "^5.0.1",
|
||||||
"compression": "^1.7.4",
|
"compression": "^1.7.4",
|
||||||
|
@ -16,6 +16,7 @@ import { prisma } from '../../../model/_client';
|
|||||||
import _ from 'lodash';
|
import _ from 'lodash';
|
||||||
import { buildFeedPublicOpenapi, feedIntegrationRouter } from './integration';
|
import { buildFeedPublicOpenapi, feedIntegrationRouter } from './integration';
|
||||||
import { fetchDataByCursor } from '../../../utils/prisma';
|
import { fetchDataByCursor } from '../../../utils/prisma';
|
||||||
|
import { delFeedEventNotifyCache } from '../../../model/feed/event';
|
||||||
|
|
||||||
export const feedRouter = router({
|
export const feedRouter = router({
|
||||||
channels: workspaceProcedure
|
channels: workspaceProcedure
|
||||||
@ -149,6 +150,8 @@ export const feedRouter = router({
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
delFeedEventNotifyCache(channelId);
|
||||||
|
|
||||||
if (!channel) {
|
if (!channel) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user