Vue、WebRTC、SocketIO、Node和Redis助力多人视频会议

几个月来疫情持续蔓延,大家使用视频会议应用的次数也水涨船高。无论身处何地,只要有视频应用我们就能与朋友见面。那我们为什么不试着定制一款自己的会议应用,让会话变得更加真实呢? 话不多说,我们直接开始吧!

Zoom截图

实现这个目标不需要我们从头开始。在之前的文章中,我已经介绍过构建具有一对一私人视频功能的应用所需的各个步骤。那么我们就以此为基础对其进行完善,最后添加多对多视频的功能就大功告成了。

(因为下文我们会提到一些在上一篇文章中解释过的概念,如果你之前没有读过,我建议你先移步上一篇文章。)

基于需求,视频会议应具备以下功能:

创建会议的用户自动成为管理员,只有他/她能邀请其他人进入会议;

管理员只能邀请同一公共房间的用户进入会议;

无论是启动会议还是加入会议,用户都不能与其他用户私下交谈;

已经进入某个会议的用户不能进行私聊,也就是说没有人可以和他/她对话;

如果管理员结束会议,所有用户会自动退出会议;

测试视频会议功能

在深入探讨实现细节之前,我们先来了解一下webRTC多方架构的主要方法。

网状结构(Mesh

Mesh是最简单的一种架构。所有端之间都是相互连接的,会直接把自己的媒体发送到其他所有端上。

Mesh webRTC架构

优点:

. 基本和简单的webRTC实现

. 不需要多媒体中心服务器

缺点:

. 加载和带宽消耗(N-1上行和下行链路)过多;

. 不能扩展容纳过多端(最多4-6个端)。

混合和MCU(Multipoint Conference Unit)

每个端将其媒体发送到中心服务器,并从中心服务器接收媒体。MCU作为一个混合点,接收、解码和混合来自所有端的媒体,最后以单一流的形式发送给所有用户。

MCU混合架构

优点:

. 在客户端基本实现webRTC

. 每个端都有1个上行和下行链路

缺点:

. 需要具备强大处理能力的服务器端(解码和编码每个端的媒体)。

比如Kurento提供MCU媒体服务器来实现视频应用(下文提到的SFU拓扑结构除外)。

路由和SFU(Selective Forward Unit)

每个端将自己的媒体发送到中心服务器,并从它那里接收所有其他的媒体流。SFU就像一个媒体的路由器,接收所有用户的媒体流,然后决定将哪些流发送给哪些用户。

SFU路由架构

优点:

. 服务器端计算成本较低(比MCU低)

. 非对称带宽(1个上行链路和N-1个下行链路)即可,适合ADSL连接。

缺点:

. 服务器端设计和实现较复杂。

SFU有三种不同的方法路由媒体:多单播、同播和SVC(可扩展视频编码)。像OpenViduMediasoup等供应商都提供了这几种拓扑。

给予我们的需求,我们决定实现第一种方法——最多支持3个端口的网状拓扑(但还是能扩展到更多用户的)。接下来我们来讨论下实现的细节。

完善措施

在开始讨论构建app的细节前,我们先进一步改进之前的架构,以期获取更简洁、架构更合理的代码。

WebRTC通信机制

如前所述,在mesh架构中,所有的端与端之间都是直接连接的,之前私聊中两端间建立连接的机制和配置都没变。因此,我们在WebRTC.js文件中把这些相关项当作一个mixin单拎出来了:

export const videoConfiguration = {
    data() {
        return {
            constraints: {}, // Media constraints
            configuration: servers, // TURN/STUN ice servers    
            // Offer config
            offerOptions: {
                offerToReceiveAudio: 1,
                offerToReceiveVideo: 1
            },
            // Local video 
            myVideo: undefined,
            localStream: undefined,
            username: ""
        }
    },
    created() {
        this.username = this.$store.state.username
    },
    // Method implementations
    methods: {
        async getUserMedia() { ... },
        getAudioVideo() { ... },
        async setRemoteDescription(remoteDesc, pc) {
            try {
                log(`${this.username} setRemoteDescription: start`)
                await pc.setRemoteDescription(remoteDesc)
                log(`${this.username} setRemoteDescription: finished`)
            } catch (error) {
                log(`Error setting the RemoteDescription in ${this.username}. Error: ${error}`)
            }
        },
        async createOffer(pc, to, room, conference = false) { ... },
        async createAnswer(pc, to, room, conference) { ... },
        async handleAnswer(desc, pc, from, room, conference = false) { ... },
        sendSignalingMessage(desc, offer, to, room, conference) { ... },
        addLocalStream(pc) { ... },
        addCandidate(pc, candidate) { ... },
        onIceCandidates(pc, to, room, conference = false) { ... },
    },
}

(WebRTC mixin)

现在,我们已经找到创建所有RTCPeerConnection之外步骤的通用方法了,接下来就需要适当的组件提供和处理PeerConnection对象和过程中所需信息(远程描述、候选人、请求、回复等等)了。

详见使用 mixins 时所有的合并策略

视频通话和相关控制

考虑到视频相关的情况,我们创建了一个通用的Video.vue组件让视频变得更简单。

<template>
  <div class="video">
    <div class="video__spinner">
      <md-progress-spinner 
        v-if="!videoStream" 
        class="md-accent" 
        md-mode="indeterminate">
      </md-progress-spinner>
    </div>
    <AudioVideoControls 
        v-if="displayControls" 
        :pauseVideo="pauseVideo" 
        :pauseAudio="pauseAudio">
    </AudioVideoControls>
    <video :id="videoId" autoplay="true"> </video>
  </div>
</template>

<script>
export default {
  name: "Video",
  components: { AudioVideoControls },
  props: {
      videoId: String,
      displayControls: Boolean,
      videoStream: MediaStream, 
      pauseVideo: Function, 
      pauseAudio: Function,
  }
}
</script>

(视频组件)

不管是哪种情况,组件都会获取合适的媒体流以及输入值的剩余部分。

pauseVideo和pauseAudio是通用的mixin媒体方法。

此外,我们还添加了视频音频控制(AudioVideControls.vue),这样,每一端都可以分别暂停/恢复或关闭/开启自己的视频和音频,具体操作如下。

// Pause video
this.ls.getVideoTracks().forEach(t => t.enabled = !t.enabled)
// Pause audio
this.ls.getAudioTracks().forEach(t => t.enabled = !t.enabled)

这并不意味着你每次都要重启webRTC连接,你只需启用/禁用音频/视频本地流轨道就可以了。

音频

在我们之前发布的版本中,音频问题较突出。所以这次我们在请求用户媒体时的普通配置中设置约束,从而解决了反馈的问题。

constraints: {
 audio: {
  echoCancellation: true,
  noiseSuppression: true,
  autoGainControl: false
 }
}

MediaTrack API中可查看所有可用的约束条件。

但这还不够。为了消除回声和噪音,我们还需要将本地视频元素的音量静音并设置为0,如下所示:

// Local video
<video id="localVideo" autoplay="true" muted> </video>
// Fix volume to 0
this.myVideo.volume = 0

注:这个操作只静音了本地播放,并不意味着你把正在运行的音频流也静音了。

WebRTC适配器

为了处理可能会出现的浏览器问题,我们引入了WebRTC适配器,把代码和不同的WebRTC浏览器实现连接起来。

import adapter from ‘webrtc-adapter’
console.log(`Browser ${adapter.browserDetails.browser} — version ${adapter.browserDetails.version}`)

用户信息

如上所述,用户无法一边私聊一边参加会议。所以我们在用户进入房间时,就在Redis中存储的对象中添加了一个会议标志(joinRoom web socket服务器监听器),来监测用户状态。

try {
  // add user to the suitable ROOM
  await ChatRedis.addUser(room, userName, { username, status,   
   privateChat: false, 
   conference: false 
  })
  const users = await ChatRedis.getUsers(room)
  // Notify all the users in the same room   
  namespace.in(room).emit(‘newUser’, { users, username })
} catch (error) {
  console.log(error)
}

注:我们更改了redis hash模式中的密钥(从socketId 变成了userName),这使得从web socket连接中获取用户信息更方便了。

会议

某一用户发起会议,会议开始,该用户成为该房间管理员。当开启app时,管理员会自动加入由自己用户名定义的会议室。

注:每个用户名在系统中都是唯一不重复的,所以每个web socket会议室都是独一无二的。

为做到这一点,我们开发了一个新的 joinConference 服务器监听器。

const joinConference = (socket, namespace) => ({ username, room, to, from }) => {
    const admin = username === to
    console.log(admin
        ? `Conference - User "${username}" wants to open a conference room`
        : `Conference - User "${username}" wants to join the "${to}" conference`)

    // Join the room
    socket.join(to, async () => {
        if (!room) return
        try {
            const user = await ChatRedis.getUser(room, username)
            await ChatRedis.setUser(room, username, { ...user, conference: to })
            namespace.in(to).emit('joinConference', { username, to, room, from })
        } catch (error) {
            console.log(error)
        }
    })
}

借助 conference: to同一会议内的所有端用户都拥有相同的会议标志值,即管理员用户名。

而在前端方面,新的Conference.vue组件也会持有相关功能。

<template>
<div class="conference-container">
    <div class="conference-container__header">
      <h3>Private conference (up to 3)</h3>
      <md-menu>
        <md-button 
          class="md-icon-button page-container-logout" 
          md-menu-trigger
          :disabled="peersLength === 2 || users.length === 1"
          v-if="conference.admin">
          <md-icon>group_add</md-icon>
        </md-button>
        <md-menu-content>
          <div v-for="user in users" :key="user.username">
            <md-menu-item 
              v-if="user.username !== $store.state.username && !peers[user.username]" 
              @click="invitate(user.username)">
                <md-icon>person_add</md-icon>
                <span>{{user.username}}</span>
            </md-menu-item>
          </div>
        </md-menu-content>
      </md-menu>
    </div>
    
    <div class="conference-container__videos">
      <div class="video">
        <Video
          videoId="localVideo"
          :displayControls="true"
          :videoStream="localStream"
          :pauseVideo="pauseVideo"
          :pauseAudio="pauseAudio"
          :muted="true">
        </Video>
      </div>
      <!-- Peers video elements -->
      <div class="conference-container__videos--remote">
        <div v-for="(item, key) in peers" :key="key" class="video">
            <Video
              :videoId="key"
              :displayControls="false"
              :videoStream="peers[key].peerStream"
              :muted="false">
            </Video>
        </div>
      </div>
    </div>
  </div>
</template>

<script>
export default {
  props: {
    conference: Object,
    users: Array // Users within the same public room
  },
  mixins:[videoConfiguration], // WebRTC mixin
  components: { Video },
  data: () => ({
    peers: {}, // Peers connected to the conference
    peersLength: 0
  }),
  async mounted() {
    this.myVideo = document.getElementById("localVideo")
    // Admin join the room
    if (this.conference.admin) {
      await this.getUserMedia()
      this.$socket.emit(WS_EVENTS.joinConference, { ...this.$store.state,
        to: this.username
      })
    }
  },
  methods: {
    invitate(user) {
      this.$socket.emit(WS_EVENTS.conferenceInvitation, {
        room: this.$store.state.room,
        to: user,
        from: this.username
      })
    },
  }
}
</script>

peers对象会监测会议中的所有用户。

peers: {
  userA: {
    username: // Peer username
    pc: // RTCPeerConnection 
    peerStream: //Peer media stream
    peerVideo: // Peer video element
  },
  userB: { .. },
  ..
}

我们会储存每个用户的用户名、端连接对象(RTCPeerConnection)、为该端定位的视频元素以及连接成功建立后获得的视频流。

为完成上述操作,我们本应该使用Map作为数据结构的。但对于构建视频会议app,一个对象结构就够了。但Map在存储大数据集(O(1))时的表现往往更好,所以对于大型会议来说,Map可能是个更好的选择。

当管理员开启会议、会议组件实例化后,我们(与会者)就能获取媒体流并自动加入会议室了。若有新的与会者想加入,管理员会通过 conferenceInvitation 事件向每个新加入的端发送邀请。

const conferenceInvitation = (namespace) => async ({ room, to, from }) => {
    console.log(`Conference - Invitation from "${from}" to "${to}" in room ${room}`)
    try {
        const { privateChat, conference } = await ChatRedis.getUser(room, to)
        // User already talking
        if (privateChat || conference) {
            console.log(`Conference - User "${to}" is already talking. PrivateChat: ${privateChat} - Conference: ${conference}`)
            return namespace.to(from).emit('conferenceInvitation', { message: `User ${to} is already talking`, from })
        }
        namespace.in(room).emit('conferenceInvitation', { room, to, from })
    } catch (error) {
        console.log(error)
    }
}

(会议邀请的web socket服务器监听器)

简而言之,管理员和初次加入的与会者间的交互机制可以概括为以下几点:

1.        管理员(A)开启会议并加入会议室(即joinConference socket事件);

2.        A向用户B发送邀请(即conferenceInvitation socket事件);

3.        B收到邀请,加入会议(即joinConference事件);

4.        A收到邀请确认后,添加B为会议成员(即添加到自己的端对象中),创建并发送给B请求;

5.        B收到请求后,添加A为会议成员(即添加到自己的端对象中),创建并将请求再发回给A

就像我们之前所做的那样,我们已经用Chat.vue视图父组件中实现了用合适的FE socket监听器来处理邀请和确认事件。

除对端对象进行相应管理之外,webRTC机制在第4、第5点中的作用与上一篇文章中所述完全相同。但如今会议室会成为会议中所有与会者的信令机制。

这样安排目的还是为会议所有端提供交流方式,以交换通信所需的元数据。为了简化操作,我们定义了另一个信令socket事件PCSignalingConference,并在Chat.vue视图中定义了相关的监听器方法:

PCSignalingConference: function({ desc, from, to, candidate }) {
 
 // Rule out message when needed
 if (from === this.$store.state.username || 
    (!!to && to !==  this.$store.state.username)) return
 if (desc) {
   // Offer
   if (desc.type === DESCRIPTION_TYPE.offer)
     this.conference = { 
       …this.conference, 
       offer: { from, desc }, 
       open: true 
     }
   // Answer
   else if (desc.type === DESCRIPTION_TYPE.answer)
     this.conference = { 
       …this.conference, 
       answer: { from, desc } 
     }
 } else if (candidate) {
   // Candidate
   this.conference = { 
     …this.conference, 
    candidate: { from, candidate } 
   }
 }
}

和之前一样,我们根据信令通道发送的信息区分出请求、答复或者加入连接的新与会者。

但即使按照同样的流程,管理员A和用户B到底要如何操作呢?

B确认到达后,管理员才会激活会议,所以我们要在Conference.vue组件中添加以下内容:

<script>
export default {
  methods: {
    initWebRTC(user, desc) {
      // Add user
      this.$set(this.peers, user, {
        username: user,
        pc: new RTCPeerConnection(this.configuration),
        peerStream: undefined,
        peerVideo: undefined
      })
      this.addLocalStream(this.peers[user].pc)
      this.onIceCandidates(this.peers[user].pc, user, this.conference.room, true)
      this.onAddStream(this.peers[user], user)
      
      // Act accordingly
      desc 
        ? this.handleAnswer(desc, this.peers[user].pc, user, this.conference.room, true)
        : this.createOffer(this.peers[user].pc, user, this.conference.room, true)
    },
  },
  watch: {
    conference: function({ user, answer, candidate, userLeft, offer }, oldVal) {
      // New user
      if(user && user !== oldVal.user) {
        this.initWebRTC(user)
        this.peersLength++
      }
    }
  }
}
</script>

(和与会的新用户交互)

每当新用户入会,管理员会通过 initWebRTC 方法启动 webRTC 机制,依照我们在私人会话中的方式(只不过现在是使用 mixin )创建邀请。

需要注意的是,由于在实例初始化后检测属性添加或删除时的限制,我们使用this.$set添加一个新端到端对象中。点击此处获取更多有关反应性的细节。

另一方面,用户B在获取管理员允许前不能发起会议。所以我们也要在Conference.vue组件中进行相应修改。

<script>
export default {
  async mounted() {
    this.myVideo = document.getElementById("localVideo")
    // New user gets the offer
    if(this.conference.offer) {
      const { offer: { from, desc } } = this.conference
      this.init(from, desc)
    }
  },
  methods: {
    async init(offer, desc) {
      await this.getUserMedia()
      this.initWebRTC(offer, desc)
    },
    initWebRTC(user, desc) { ... },
  }
}
</script>

(新的与会者收到邀请)

在收到邀请后,新的段会通过上述同样逻辑的initWebRTC创建并发送回复。

如果会议中已经有两个用户,而管理员又邀请了第三个用户呢?

同上述流程相同,只是扩展到了更多用户。

访客B加入已存在两个用户的会议

如图所示,在管理员邀请(1)之后,会议内所有人在得到确认(2)后会立即向新的端发出邀请(3)。之后,新的端会对每个用户进行回复(4),从而建立两个连接(N-1上行和下行链路)。

相信现在你也看到了,如果我们想把该操作扩展到更多的用户,就会变得有点麻烦。

请记住,虽然这两个邀请不会同时发生,也就是说它们不能以任何特定的顺序与新端交互。所以用户B需要在我们的Conference.vue组件中处理缺乏同步性的问题。

watch: {
   conference: function({ offer }, oldVal) {
     // New offer
     if(offer && offer !== oldVal.offer && !!oldVal.offer){
       const { from, desc } = offer
       this.init(from, desc)
     }
   }
}

如前所述,第一次请求确认后,会议会被激活,所以我们也需要定位之后出现的新请求。但我们的项目只需借props conference对象,一次只处理一个请求。所以在处理几个连续请求时,我们要注意不误导对象引用。如此看来,对于上述这两种情况,检索请求信息时都要创建常量。

会议对象指的是包含正确请求、答复、ice candidates、远程用户等信息的对象。

另外,会议结束后我们会退出会议、停止所有媒体流。

beforeDestroy() {
  // Close all peer connections
  Object.values(this.peers).forEach(peer => peer.pc.close())
  this.peers = {}
  // Leave conference
  this.$socket.emit(WS_EVENTS.leaveConference, { 
    …this.$store.state,
    from: this.username,
    conferenceRoom: this.conference.room
  })
},

每个端都会重置所有对等端连接,通过 leaveConference 事件离开会议,重置会议标志。

const leaveConference = (socket, namespace) => async ({ room, from, conferenceRoom }) => {
    console.log(`Conference - User "${from}" wants to leave the conference room ${room}`)

    try {
        const user = await ChatRedis.getUser(room, from)
        await ChatRedis.setUser(room, from, { ...user, conference: false })
        socket.leave(conferenceRoom, () => {
            namespace.to(conferenceRoom).emit('leaveConference', { room, from })
        })
    } catch (error) {
        console.log(error)
    }
}

(离开会议的网络socket服务器监听器)

媒体方面,webRTC mixin会在销毁前重置本地媒体流。

beforeDestroy() {
  this.localStream.getTracks().forEach(track => track.stop())
}

整合上述所有操作,我们就可以试运行了!

为进行测试,我们为每个用户都创建了一个应用实例。所以我们会借docker-compose.yml文件在配置中添加第三个应用副本。

# Copy 3
chat3:
 build:
  context: .
  args:
   VUE_APP_SOCKET_HOST: localhost
   VUE_APP_SOCKET_PORT: 3002
 ports:
  - 3002:3002
 networks:
  - video-chat
 depends_on:
  - redis
 environment:
  PORT: 3002
  REDIS_HOST: redis
  REDIS_PORT: 6379

这样我们就得到了如下所示的本地测试环境。

本地测试环境

现在,我们只需要通过docker-compose来构建和运行应用就可以了!

进行含3个对等端视频会议的本地测试(每个用户一个实例)

我们安排了三个用户连接到不同的实例,并通过端对端连接进行包含这三个用户的会议。

有时webRTC应用中的错误追寻和解决操作有点复杂。这时,你可以使用Firefox检查about:webrtc的页面。它将为你提供关于SDP会话、ICE candidates等及时信息。

注:WebRTC使用ICE框架来克服网络的复杂性。

在本地环境下做测试应该能顺利进行。在本地测试中,对等端会通过host候选来交换网络信息,也就是说ip地址就是远程对等端(同一网络内的所有对等端)的真实地址。请看一个UDP请求的小例子。

a=candidate:0 1 UDP 2122121471 198.167.1.138 54056 typ host
a=candidate:6 1 UDP 2122252543 fd8b:15c5:43b9:9m00:1c89:1vvc:2592:c9c6 54057 typ host
a=candidate:18 1 TCP 2105393407 192.168.1.130 9 typ host tcptype active

TCP candidate仅在 UDP 不可用或受到限制而不适合媒体流时使用

在实际环境中,信息交换通常通过来自STUN服务器的srflx和prflx候选者来完成,两个对等端会发现他们的公共IP地址和他们所处的NAT类型。大多数情况下,该交换只在连接设置过程中出现。因为只要建立了连接,媒体流会直接在对等端和视频网关之间流动。

a=candidate:1 1 UDP 1685921791 212.194.185.191 47355 typ srflx raddr 192.168.1.130 rport 54056
WebRTC网络架构

但在其他一些情况下,比如远程对等端的网络受限,这就需要使用TURN服务器和relay候选者。relay候选者的IP地址是TURN服务器在直接连接失败时用来转发两个对等端之间媒体的地址。

a=candidate:3 1 UDP 92086015 133.244.182.3 60795 typ relay raddr 134.219.114.1 rport 60795

由于其特性,你会发现实际操作中有很多公共STUN服务器(比如该列表中的服务器),因为媒体流通过服务器就意味着带宽消耗。

所以请记住,如果你没有提供合适的TURN服务器配置,所有在受限网络的连接都会失败(不论何种情况)。点击此处,你可以检查任何服务器的连接情况。

如果你对TURN服务器感兴趣,有一些类似coturn的开源工具可以帮你创建自己的服务器。

总结

上述所讲案例的潜力非常大,其中囊括了使用mesh结构建立多对多视频会议的所有必要步骤。这种拓扑结构对于创建简单案例(理想情况下,用户数少于4个)来说是个不错的选择,如果你真的想建立大规模的服务,MCU和SFU才是正确方法。你需要将大部分精力集中在服务器的实现上。

点击此处,你可以在这个库里找到所需的所有源代码。

文章地址:https://levelup.gitconnected.com/multiparty-video-conference-using-vue-webrtc-socketio-node-redis-e8c5a059d332

原文作者:Adrián García Diéguez

填写常用邮箱,接收社区更新

WebRTC 中文社区由

运营