Skip to content

add webrtc support #1284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 28 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15,330 changes: 15,136 additions & 194 deletions assets/schemas.json

Large diffs are not rendered by default.

4,686 changes: 1,907 additions & 2,779 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
"husky": "^9.1.7",
"prettier": "^3.5.3",
"pretty-quick": "^4.1.1",
"spacebar-webrtc-types": "github:dank074/spacebar-webrtc-types",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once this is merged, it'd be best for the org to take over this repo

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, can we somehow get this on npm? some deployment methods dont support using github uris in nodejs packages

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that it should be an org repo. Once merged repo will be transferred and it can be published under a spacebar npm account

"typescript": "^5.8.3"
},
"dependencies": {
Expand Down Expand Up @@ -118,7 +119,8 @@
"@spacebar/api": "dist/api",
"@spacebar/cdn": "dist/cdn",
"@spacebar/gateway": "dist/gateway",
"@spacebar/util": "dist/util"
"@spacebar/util": "dist/util",
"@spacebar/webrtc": "dist/webrtc"
},
"optionalDependencies": {
"@yukikaze-bot/erlpack": "^1.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ router.patch(
voice_state.save(),
emitEvent({
event: "VOICE_STATE_UPDATE",
data: voice_state,
data: voice_state.toPublicVoiceState(),
guild_id,
} as VoiceStateUpdateEvent),
]);
Expand Down
15 changes: 14 additions & 1 deletion src/bundle/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ process.on("uncaughtException", console.error);
import http from "http";
import * as Api from "@spacebar/api";
import * as Gateway from "@spacebar/gateway";
import * as Webrtc from "@spacebar/webrtc";
import { CDNServer } from "@spacebar/cdn";
import express from "express";
import { green, bold } from "picocolors";
Expand All @@ -30,18 +31,25 @@ import { Config, initDatabase, Sentry } from "@spacebar/util";
const app = express();
const server = http.createServer();
const port = Number(process.env.PORT) || 3001;
const wrtcWsPort = Number(process.env.WRTC_WS_PORT) || 3004;
const production = process.env.NODE_ENV == "development" ? false : true;
server.on("request", app);

const api = new Api.SpacebarServer({ server, port, production, app });
const cdn = new CDNServer({ server, port, production, app });
const gateway = new Gateway.Server({ server, port, production });
const webrtc = new Webrtc.Server({
server: undefined,
port: wrtcWsPort,
production,
});

process.on("SIGTERM", async () => {
console.log("Shutting down due to SIGTERM");
await gateway.stop();
await cdn.stop();
await api.stop();
await webrtc.stop();
server.close();
Sentry.close();
});
Expand All @@ -54,7 +62,12 @@ async function main() {
await new Promise((resolve) =>
server.listen({ port }, () => resolve(undefined)),
);
await Promise.all([api.start(), cdn.start(), gateway.start()]);
await Promise.all([
api.start(),
cdn.start(),
gateway.start(),
webrtc.start(),
]);

Sentry.errorHandler(app);

Expand Down
35 changes: 35 additions & 0 deletions src/gateway/events/Close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
Session,
SessionsReplace,
User,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";

export async function Close(this: WebSocket, code: number, reason: Buffer) {
Expand All @@ -36,6 +38,39 @@ export async function Close(this: WebSocket, code: number, reason: Buffer) {

if (this.session_id) {
await Session.delete({ session_id: this.session_id });

const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});

// clear the voice state for this session if user was in voice channel
if (
voiceState &&
voiceState.session_id === this.session_id &&
voiceState.channel_id
) {
const prevGuildId = voiceState.guild_id;
const prevChannelId = voiceState.channel_id;

// @ts-expect-error channel_id is nullable
voiceState.channel_id = null;
// @ts-expect-error guild_id is nullable
voiceState.guild_id = null;
voiceState.self_stream = false;
voiceState.self_video = false;
await voiceState.save();

// let the users in previous guild/channel know that user disconnected
await emitEvent({
event: "VOICE_STATE_UPDATE",
data: {
...voiceState.toPublicVoiceState(),
guild_id: prevGuildId, // have to send the previous guild_id because that's what client expects for disconnect messages
},
guild_id: prevGuildId,
channel_id: prevChannelId,
} as VoiceStateUpdateEvent);
}
}

if (this.user_id) {
Expand Down
15 changes: 14 additions & 1 deletion src/gateway/opcodes/Identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
"guild.emojis",
"guild.roles",
"guild.stickers",
"guild.voice_states",
"roles",

// For these entities, `user` is always just the logged in user we fetched above
Expand Down Expand Up @@ -485,6 +486,18 @@ export async function onIdentify(this: WebSocket, data: Payload) {
}),
);

const readySupplementalGuilds = (
guilds.filter((guild) => !guild.unavailable) as Guild[]
).map((guild) => {
return {
voice_states: guild.voice_states.map((state) =>
state.toPublicVoiceState(),
),
id: guild.id,
embedded_activities: [],
};
});

// TODO: ready supplemental
await Send(this, {
op: OPCodes.DISPATCH,
Expand All @@ -498,7 +511,7 @@ export async function onIdentify(this: WebSocket, data: Payload) {
// these merged members seem to be all users currently in vc in your guilds
merged_members: [],
lazy_private_channels: [],
guilds: [], // { voice_states: [], id: string, embedded_activities: [] }
guilds: readySupplementalGuilds, // { voice_states: [], id: string, embedded_activities: [] }
// embedded_activities are users currently in an activity?
disclose: [], // Config.get().general.uniqueUsernames ? ["pomelo"] : []
},
Expand Down
131 changes: 131 additions & 0 deletions src/gateway/opcodes/StreamCreate.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import {
genVoiceToken,
Payload,
WebSocket,
generateStreamKey,
} from "@spacebar/gateway";
import {
Channel,
Config,
emitEvent,
Member,
Region,
Snowflake,
Stream,
StreamCreateEvent,
StreamCreateSchema,
StreamServerUpdateEvent,
StreamSession,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf";

export async function onStreamCreate(this: WebSocket, data: Payload) {
check.call(this, StreamCreateSchema, data.d);
const body = data.d as StreamCreateSchema;

if (body.channel_id.trim().length === 0) return;

// first check if we are in a voice channel already. cannot create a stream if there's no existing voice connection
const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});

if (!voiceState || !voiceState.channel_id) return;

if (body.guild_id) {
voiceState.member = await Member.findOneOrFail({
where: { id: voiceState.user_id, guild_id: voiceState.guild_id },
relations: ["user", "roles"],
});
}

// TODO: permissions check - if it's a guild, check if user is allowed to create stream in this guild

const channel = await Channel.findOne({
where: { id: body.channel_id },
});

if (
!channel ||
(body.type === "guild" && channel.guild_id != body.guild_id)
)
return this.close(4000, "invalid channel");

// TODO: actually apply preferred_region from the event payload
const regions = Config.get().regions;
const guildRegion = regions.available.filter(
(r) => r.id === regions.default,
)[0];

// first make sure theres no other streams for this user that somehow didnt get cleared
await Stream.delete({
owner_id: this.user_id,
});

// create a new entry in db containing the token for authenticating user in stream gateway IDENTIFY
const stream = Stream.create({
id: Snowflake.generate(),
owner_id: this.user_id,
channel_id: body.channel_id,
endpoint: guildRegion.endpoint,
});

await stream.save();

const token = genVoiceToken();

const streamSession = StreamSession.create({
stream_id: stream.id,
user_id: this.user_id,
session_id: this.session_id,
token,
});

await streamSession.save();

const streamKey = generateStreamKey(
body.type,
body.guild_id,
body.channel_id,
this.user_id,
);

await emitEvent({
event: "STREAM_CREATE",
data: {
stream_key: streamKey,
rtc_server_id: stream.id, // for voice connections in guilds it is guild_id, for dm voice calls it seems to be DM channel id, for GoLive streams a generated number
viewer_ids: [],
region: guildRegion.name,
paused: false,
},
user_id: this.user_id,
} as StreamCreateEvent);

await emitEvent({
event: "STREAM_SERVER_UPDATE",
data: {
token: streamSession.token,
stream_key: streamKey,
guild_id: null, // not sure why its always null
endpoint: stream.endpoint,
},
user_id: this.user_id,
} as StreamServerUpdateEvent);

voiceState.self_stream = true;
await voiceState.save();

await emitEvent({
event: "VOICE_STATE_UPDATE",
data: voiceState.toPublicVoiceState(),
guild_id: voiceState.guild_id,
channel_id: voiceState.channel_id,
} as VoiceStateUpdateEvent);
}

//stream key:
// guild:${guild_id}:${channel_id}:${user_id}
// call:${channel_id}:${user_id}
76 changes: 76 additions & 0 deletions src/gateway/opcodes/StreamDelete.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { parseStreamKey, Payload, WebSocket } from "@spacebar/gateway";
import {
emitEvent,
Stream,
StreamDeleteEvent,
StreamDeleteSchema,
VoiceState,
VoiceStateUpdateEvent,
} from "@spacebar/util";
import { check } from "./instanceOf";

export async function onStreamDelete(this: WebSocket, data: Payload) {
check.call(this, StreamDeleteSchema, data.d);
const body = data.d as StreamDeleteSchema;

let parsedKey: {
type: "guild" | "call";
channelId: string;
guildId?: string;
userId: string;
};

try {
parsedKey = parseStreamKey(body.stream_key);
} catch (e) {
return this.close(4000, "Invalid stream key");
}

const { userId, channelId, guildId, type } = parsedKey;

// when a user selects to stop watching another user stream, this event gets triggered
// just disconnect user without actually deleting stream
if (this.user_id !== userId) {
await emitEvent({
event: "STREAM_DELETE",
data: {
stream_key: body.stream_key,
},
user_id: this.user_id,
} as StreamDeleteEvent);
return;
}

const stream = await Stream.findOne({
where: { channel_id: channelId, owner_id: userId },
});

if (!stream) return;

await stream.remove();

const voiceState = await VoiceState.findOne({
where: { user_id: this.user_id },
});

if (voiceState) {
voiceState.self_stream = false;
await voiceState.save();

await emitEvent({
event: "VOICE_STATE_UPDATE",
data: voiceState.toPublicVoiceState(),
guild_id: guildId,
channel_id: channelId,
} as VoiceStateUpdateEvent);
}

await emitEvent({
event: "STREAM_DELETE",
data: {
stream_key: body.stream_key,
},
guild_id: guildId,
channel_id: channelId,
} as StreamDeleteEvent);
}
Loading
Loading