feat: add socketio support

This commit is contained in:
moonrailgun 2023-10-03 19:47:17 +08:00
parent d20aedda3f
commit 46c83fcb01
7 changed files with 240 additions and 14 deletions

View File

@ -32,6 +32,7 @@
"dayjs": "^1.11.9",
"detect-browser": "^5.3.0",
"dotenv": "^16.3.1",
"eventemitter-strict": "^1.0.1",
"express": "^4.18.2",
"express-async-errors": "^3.1.1",
"express-validator": "^7.0.1",

View File

@ -58,6 +58,9 @@ dependencies:
dotenv:
specifier: ^16.3.1
version: 16.3.1
eventemitter-strict:
specifier: ^1.0.1
version: 1.0.1
express:
specifier: ^4.18.2
version: 4.18.2
@ -3135,6 +3138,10 @@ packages:
engines: {node: '>= 0.6'}
dev: false
/eventemitter-strict@1.0.1:
resolution: {integrity: sha512-zbePIHR/HVKUpbKSKGxtAVhHjpuiLNp/s0ZYElSvxujE2jNb505+Wv+uUWbRi/bmsbgjvvFImS8HhhNMVHgJdg==}
dev: false
/eventemitter3@4.0.7:
resolution: {integrity: sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==}
dev: false

106
src/client/api/socketio.ts Normal file
View File

@ -0,0 +1,106 @@
import { io, Socket } from 'socket.io-client';
import { getJWT } from './auth';
import type { SubscribeEventMap, SocketEventMap } from '../../server/ws/shared';
import { create } from 'zustand';
import { useEvent } from '../hooks/useEvent';
import { useEffect } from 'react';
const useSocketStore = create<{
socket: Socket | null;
}>(() => ({
socket: null,
}));
export function createSocketIOClient(workspaceId: string) {
const token = getJWT();
const socket = io(`/${workspaceId}`, {
transports: ['websocket'],
reconnectionDelayMax: 10000,
auth: {
token,
},
forceNew: true,
});
useSocketStore.setState({
socket,
});
}
type SocketEventData<Name extends keyof SocketEventMap> = Parameters<
SocketEventMap[Name]
>[0];
type SocketEventRet<Name extends keyof SocketEventMap> = Parameters<
Parameters<SocketEventMap[Name]>[2]
>[0];
type SubscribeEventData<Name extends keyof SubscribeEventMap> = Parameters<
SubscribeEventMap[Name]
>[1];
export function useSocket() {
const socket = useSocketStore((state) => state.socket);
const emit = useEvent(
async <T extends keyof SocketEventMap>(
eventName: T,
eventData: SocketEventData<T>
): Promise<SocketEventRet<T>> => {
if (!socket) {
throw new Error('Socketio not init');
}
return await socket.emitWithAck(eventName, eventData);
}
);
const subscribe = useEvent(
<T extends keyof SubscribeEventMap>(
name: T,
onData: (data: SubscribeEventData<T>) => void
) => {
if (!socket) {
throw new Error('Socketio not init');
}
const p = emit('$subscribe', { name });
const receiveDataUpdate = (data: any) => {
onData(data);
};
const unsubscribe = () => {
Promise.resolve(p).then((cursor) => {
emit('$unsubscribe', { name, cursor });
socket.off(`${name}#${cursor}` as string, receiveDataUpdate);
});
};
Promise.resolve(p).then((cursor) => {
console.log('aaa');
socket.on(`${name}#${cursor}` as string, receiveDataUpdate);
});
return unsubscribe;
}
);
return { emit, subscribe };
}
export function useSocketSubscribe<T extends keyof SubscribeEventMap>(
name: T,
onData: (data: SubscribeEventData<T>) => void
) {
const { subscribe } = useSocket();
const cb = useEvent(onData);
useEffect(() => {
const unsubscribe = subscribe(name, cb);
return () => {
unsubscribe();
};
}, [name]);
}

View File

@ -1,5 +1,6 @@
import { create } from 'zustand';
import { UserLoginInfo } from '../api/model/user';
import { createSocketIOClient } from '../api/socketio';
interface UserState {
info: UserLoginInfo | null;
@ -21,6 +22,9 @@ export function setUserInfo(info: UserLoginInfo) {
useUserStore.setState({
info,
});
// create socketio after login
createSocketIOClient(info.currentWorkspace.id);
}
export function useCurrentWorkspaceId() {

View File

@ -10,22 +10,24 @@ import { userRouter } from './router/user';
import { websiteRouter } from './router/website';
import { workspaceRouter } from './router/workspace';
import { telemetryRouter } from './router/telemetry';
import { initSocketio } from './ws';
import { trpcExpressMiddleware } from './trpc';
import { initUdpServer } from './udp/server';
import { createServer } from 'http';
import { initSocketio } from './ws';
const port = Number(process.env.PORT || 12345);
const app = express();
const httpServer = createServer(app);
initUdpServer(port);
initSocketio(app);
initSocketio(httpServer);
app.use(compression());
app.use(express.json());
app.use(passport.initialize());
app.use(morgan('tiny'));
// app.use(morgan('tiny'));
// http://expressjs.com/en/advanced/best-practice-security.html#at-a-minimum-disable-x-powered-by-header
app.disable('x-powered-by');
@ -42,7 +44,9 @@ app.use((err: any, req: any, res: any, next: any) => {
res.status(500).json({ message: err.message });
});
ViteExpress.listen(app, port, () => {
httpServer.listen(port, () => {
ViteExpress.bind(app, httpServer, () => {
console.log(`Server is listening on port ${port}...`);
console.log(`Website: http://127.0.0.1:${port}`);
});
});

View File

@ -1,11 +1,60 @@
import { Server } from 'socket.io';
import { Express } from 'express';
import { createServer } from 'http';
import { Server as SocketIOServer } from 'socket.io';
import { Server as HTTPServer } from 'http';
import { jwtVerify } from '../middleware/auth';
import { socketEventBus } from './shared';
import { isUuid } from '../utils/common';
export function initSocketio(app: Express) {
const httpServer = createServer(app);
export function initSocketio(httpServer: HTTPServer) {
const io = new SocketIOServer(httpServer, {
transports: ['websocket'],
serveClient: false,
cors: {
origin: '*',
methods: ['GET', 'POST'],
},
});
const io = new Server(httpServer, {
// transports: ['websocket'],
io.of((name, auth, next) => {
const workspaceId = name.replace(/^\//, '');
next(null, isUuid(workspaceId)); // or false, when the creation is denied
})
.use(async (socket, next) => {
// Auth
try {
const token = socket.handshake.auth['token'];
if (typeof token !== 'string') {
throw new Error('Token cannot be empty');
}
try {
const user = jwtVerify(token);
console.log('[Socket] Authenticated via JWT:', user.username);
socket.data.user = user;
socket.data.token = token;
const workspaceId = socket.nsp.name.replace(/^\//, '');
socket.data.workspaceId = workspaceId;
next();
} catch (err) {
console.error(err);
next(new Error('TokenInvalid'));
}
} catch (err: any) {
next(err);
}
})
.on('connection', (socket) => {
if (!socket.data.user) {
return;
}
socket.onAny((eventName, eventData, callback) => {
console.log('[Socket] receive:', { eventName, eventData });
socketEventBus.emit(eventName, eventData, socket, callback);
});
});
}

55
src/server/ws/shared.ts Normal file
View File

@ -0,0 +1,55 @@
import { EventEmitter } from 'eventemitter-strict';
import { Socket } from 'socket.io';
type SubscribeEventFn<T> = (workspaceId: string, eventData: T) => void;
export interface SubscribeEventMap {
__test: SubscribeEventFn<number>;
}
type SocketEventFn<T, U = unknown> = (
eventData: T,
socket: Socket,
callback: (payload: U) => void
) => void;
export interface SocketEventMap {
// test: SocketEventFn<number, number>
$subscribe: SocketEventFn<{ name: keyof SubscribeEventMap }, number>;
$unsubscribe: SocketEventFn<
{ name: keyof SubscribeEventMap; cursor: number },
void
>;
}
export const socketEventBus = new EventEmitter<SocketEventMap>();
export const subscribeEventBus = new EventEmitter<SubscribeEventMap>();
let i = 0;
const subscribeFnMap: Record<string, SubscribeEventFn<any>> = {};
socketEventBus.on('$subscribe', (eventData, socket, callback) => {
const _workspaceId = socket.data.workspaceId;
const { name } = eventData;
const cursor = i++;
const fn: SubscribeEventMap[typeof name] = (workspaceId, data) => {
if (workspaceId === '*' || _workspaceId === workspaceId) {
socket.emit(`${name}#${cursor}`, data);
}
};
subscribeEventBus.on(name, fn);
subscribeFnMap[`${name}#${cursor}`] = fn;
callback(cursor);
});
socketEventBus.on('$unsubscribe', (eventData, socket, callback) => {
const { name, cursor } = eventData;
const fn = subscribeFnMap[`${name}#${cursor}`];
if (fn) {
delete subscribeFnMap[`${name}#${cursor}`];
subscribeEventBus.off(name, fn);
}
});