diff --git a/package.json b/package.json index 79cf40a..e4a8ac4 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ "dayjs": "^1.11.9", "detect-browser": "^5.3.0", "dotenv": "^16.3.1", + "eventemitter-strict": "^1.0.1", "express": "^4.18.2", "express-async-errors": "^3.1.1", "express-validator": "^7.0.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 05ac570..54797c6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -58,6 +58,9 @@ dependencies: dotenv: specifier: ^16.3.1 version: 16.3.1 + eventemitter-strict: + specifier: ^1.0.1 + version: 1.0.1 express: specifier: ^4.18.2 version: 4.18.2 @@ -3135,6 +3138,10 @@ packages: engines: {node: '>= 0.6'} dev: false + /eventemitter-strict@1.0.1: + resolution: {integrity: sha512-zbePIHR/HVKUpbKSKGxtAVhHjpuiLNp/s0ZYElSvxujE2jNb505+Wv+uUWbRi/bmsbgjvvFImS8HhhNMVHgJdg==} + dev: false + /eventemitter3@4.0.7: resolution: {integrity: sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==} dev: false diff --git a/src/client/api/socketio.ts b/src/client/api/socketio.ts new file mode 100644 index 0000000..2abef5a --- /dev/null +++ b/src/client/api/socketio.ts @@ -0,0 +1,106 @@ +import { io, Socket } from 'socket.io-client'; +import { getJWT } from './auth'; +import type { SubscribeEventMap, SocketEventMap } from '../../server/ws/shared'; +import { create } from 'zustand'; +import { useEvent } from '../hooks/useEvent'; +import { useEffect } from 'react'; + +const useSocketStore = create<{ + socket: Socket | null; +}>(() => ({ + socket: null, +})); + +export function createSocketIOClient(workspaceId: string) { + const token = getJWT(); + const socket = io(`/${workspaceId}`, { + transports: ['websocket'], + reconnectionDelayMax: 10000, + auth: { + token, + }, + forceNew: true, + }); + + useSocketStore.setState({ + socket, + }); +} + +type SocketEventData = Parameters< + SocketEventMap[Name] +>[0]; + +type SocketEventRet = Parameters< + Parameters[2] +>[0]; + +type SubscribeEventData = Parameters< + SubscribeEventMap[Name] +>[1]; + +export function useSocket() { + const socket = useSocketStore((state) => state.socket); + + const emit = useEvent( + async ( + eventName: T, + eventData: SocketEventData + ): Promise> => { + if (!socket) { + throw new Error('Socketio not init'); + } + + return await socket.emitWithAck(eventName, eventData); + } + ); + + const subscribe = useEvent( + ( + name: T, + onData: (data: SubscribeEventData) => void + ) => { + if (!socket) { + throw new Error('Socketio not init'); + } + + const p = emit('$subscribe', { name }); + + const receiveDataUpdate = (data: any) => { + onData(data); + }; + + const unsubscribe = () => { + Promise.resolve(p).then((cursor) => { + emit('$unsubscribe', { name, cursor }); + socket.off(`${name}#${cursor}` as string, receiveDataUpdate); + }); + }; + + Promise.resolve(p).then((cursor) => { + console.log('aaa'); + socket.on(`${name}#${cursor}` as string, receiveDataUpdate); + }); + + return unsubscribe; + } + ); + + return { emit, subscribe }; +} + +export function useSocketSubscribe( + name: T, + onData: (data: SubscribeEventData) => void +) { + const { subscribe } = useSocket(); + const cb = useEvent(onData); + + useEffect(() => { + const unsubscribe = subscribe(name, cb); + + return () => { + unsubscribe(); + }; + }, [name]); +} diff --git a/src/client/store/user.ts b/src/client/store/user.ts index 3a28a78..c0d89a5 100644 --- a/src/client/store/user.ts +++ b/src/client/store/user.ts @@ -1,5 +1,6 @@ import { create } from 'zustand'; import { UserLoginInfo } from '../api/model/user'; +import { createSocketIOClient } from '../api/socketio'; interface UserState { info: UserLoginInfo | null; @@ -21,6 +22,9 @@ export function setUserInfo(info: UserLoginInfo) { useUserStore.setState({ info, }); + + // create socketio after login + createSocketIOClient(info.currentWorkspace.id); } export function useCurrentWorkspaceId() { diff --git a/src/server/main.ts b/src/server/main.ts index 4e1e23f..e91796b 100644 --- a/src/server/main.ts +++ b/src/server/main.ts @@ -10,22 +10,24 @@ import { userRouter } from './router/user'; import { websiteRouter } from './router/website'; import { workspaceRouter } from './router/workspace'; import { telemetryRouter } from './router/telemetry'; -import { initSocketio } from './ws'; import { trpcExpressMiddleware } from './trpc'; import { initUdpServer } from './udp/server'; +import { createServer } from 'http'; +import { initSocketio } from './ws'; const port = Number(process.env.PORT || 12345); const app = express(); +const httpServer = createServer(app); initUdpServer(port); -initSocketio(app); +initSocketio(httpServer); app.use(compression()); app.use(express.json()); app.use(passport.initialize()); -app.use(morgan('tiny')); +// app.use(morgan('tiny')); // http://expressjs.com/en/advanced/best-practice-security.html#at-a-minimum-disable-x-powered-by-header app.disable('x-powered-by'); @@ -42,7 +44,9 @@ app.use((err: any, req: any, res: any, next: any) => { res.status(500).json({ message: err.message }); }); -ViteExpress.listen(app, port, () => { - console.log(`Server is listening on port ${port}...`); - console.log(`Website: http://127.0.0.1:${port}`); +httpServer.listen(port, () => { + ViteExpress.bind(app, httpServer, () => { + console.log(`Server is listening on port ${port}...`); + console.log(`Website: http://127.0.0.1:${port}`); + }); }); diff --git a/src/server/ws/index.ts b/src/server/ws/index.ts index 63bba5b..278a544 100644 --- a/src/server/ws/index.ts +++ b/src/server/ws/index.ts @@ -1,11 +1,60 @@ -import { Server } from 'socket.io'; -import { Express } from 'express'; -import { createServer } from 'http'; +import { Server as SocketIOServer } from 'socket.io'; +import { Server as HTTPServer } from 'http'; +import { jwtVerify } from '../middleware/auth'; +import { socketEventBus } from './shared'; +import { isUuid } from '../utils/common'; -export function initSocketio(app: Express) { - const httpServer = createServer(app); - - const io = new Server(httpServer, { - // transports: ['websocket'], +export function initSocketio(httpServer: HTTPServer) { + const io = new SocketIOServer(httpServer, { + transports: ['websocket'], + serveClient: false, + cors: { + origin: '*', + methods: ['GET', 'POST'], + }, }); + + io.of((name, auth, next) => { + const workspaceId = name.replace(/^\//, ''); + + next(null, isUuid(workspaceId)); // or false, when the creation is denied + }) + .use(async (socket, next) => { + // Auth + try { + const token = socket.handshake.auth['token']; + if (typeof token !== 'string') { + throw new Error('Token cannot be empty'); + } + + try { + const user = jwtVerify(token); + + console.log('[Socket] Authenticated via JWT:', user.username); + + socket.data.user = user; + socket.data.token = token; + + const workspaceId = socket.nsp.name.replace(/^\//, ''); + socket.data.workspaceId = workspaceId; + + next(); + } catch (err) { + console.error(err); + next(new Error('TokenInvalid')); + } + } catch (err: any) { + next(err); + } + }) + .on('connection', (socket) => { + if (!socket.data.user) { + return; + } + + socket.onAny((eventName, eventData, callback) => { + console.log('[Socket] receive:', { eventName, eventData }); + socketEventBus.emit(eventName, eventData, socket, callback); + }); + }); } diff --git a/src/server/ws/shared.ts b/src/server/ws/shared.ts new file mode 100644 index 0000000..fd70a0a --- /dev/null +++ b/src/server/ws/shared.ts @@ -0,0 +1,55 @@ +import { EventEmitter } from 'eventemitter-strict'; +import { Socket } from 'socket.io'; + +type SubscribeEventFn = (workspaceId: string, eventData: T) => void; + +export interface SubscribeEventMap { + __test: SubscribeEventFn; +} + +type SocketEventFn = ( + eventData: T, + socket: Socket, + callback: (payload: U) => void +) => void; + +export interface SocketEventMap { + // test: SocketEventFn + $subscribe: SocketEventFn<{ name: keyof SubscribeEventMap }, number>; + $unsubscribe: SocketEventFn< + { name: keyof SubscribeEventMap; cursor: number }, + void + >; +} + +export const socketEventBus = new EventEmitter(); +export const subscribeEventBus = new EventEmitter(); + +let i = 0; +const subscribeFnMap: Record> = {}; +socketEventBus.on('$subscribe', (eventData, socket, callback) => { + const _workspaceId = socket.data.workspaceId; + const { name } = eventData; + + const cursor = i++; + const fn: SubscribeEventMap[typeof name] = (workspaceId, data) => { + if (workspaceId === '*' || _workspaceId === workspaceId) { + socket.emit(`${name}#${cursor}`, data); + } + }; + + subscribeEventBus.on(name, fn); + + subscribeFnMap[`${name}#${cursor}`] = fn; + + callback(cursor); +}); +socketEventBus.on('$unsubscribe', (eventData, socket, callback) => { + const { name, cursor } = eventData; + + const fn = subscribeFnMap[`${name}#${cursor}`]; + if (fn) { + delete subscribeFnMap[`${name}#${cursor}`]; + subscribeEventBus.off(name, fn); + } +});