Skip to content

Commit 89285ce

Browse files
authored
Send side Simulcast/Track encodings (webrtc-rs#578)
* track_local: Add ability to set RTP stream ID This change makes it possible to set the RTP stream ID to allow forwarding and production of simulcast streams. * peer_connection: Bolt on send side simulcast Introduces add_encoding method in RTP sender to add simulcast encodings. * rtp_sender: Add some missing tests Ported over from pion * peer_connection: Add Simulcast send test This is a port from a pion test * track_local: Insert mid and rid RTP header extension When MID and RTP stream ID header extension are enabled all RTP packets get corresponding MID and RID added as extension headers. * peer_connection: Handle Simulcast Offer with one Media Section This is ported from Pion
1 parent 1ee5f79 commit 89285ce

File tree

12 files changed

+911
-295
lines changed

12 files changed

+911
-295
lines changed

webrtc/src/api/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,11 @@ impl API {
157157
transport: Arc<RTCDtlsTransport>,
158158
interceptor: Arc<dyn Interceptor + Send + Sync>,
159159
) -> RTCRtpSender {
160+
let kind = track.as_ref().map(|t| t.kind()).unwrap_or_default();
160161
RTCRtpSender::new(
161162
self.setting_engine.get_receive_mtu(),
162163
track,
164+
kind,
163165
transport,
164166
Arc::clone(&self.media_engine),
165167
interceptor,

webrtc/src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ pub enum Error {
199199
#[error("new track must be of the same kind as previous")]
200200
ErrRTPSenderNewTrackHasIncorrectKind,
201201

202+
/// ErrRTPSenderNewTrackHasIncorrectEnvelope indicates that the new track has a different envelope than the previous/original
203+
#[error("new track must have the same envelope as previous")]
204+
ErrRTPSenderNewTrackHasIncorrectEnvelope,
205+
202206
/// ErrRTPSenderDataSent indicates that the sequence number transformer tries to be enabled after the data sending began
203207
#[error("Sequence number transformer must be enabled before sending data")]
204208
ErrRTPSenderDataSent,
@@ -328,6 +332,20 @@ pub enum Error {
328332
},
329333
#[error("Track must not be nil")]
330334
ErrRTPSenderTrackNil,
335+
#[error("Sender has already been stopped")]
336+
ErrRTPSenderStopped,
337+
#[error("Sender Track has been removed or replaced to nil")]
338+
ErrRTPSenderTrackRemoved,
339+
#[error("Sender cannot add encoding as rid is empty")]
340+
ErrRTPSenderRidNil,
341+
#[error("Sender cannot add encoding as there is no base track")]
342+
ErrRTPSenderNoBaseEncoding,
343+
#[error("Sender cannot add encoding as provided track does not match base track")]
344+
ErrRTPSenderBaseEncodingMismatch,
345+
#[error("Sender cannot encoding due to RID collision")]
346+
ErrRTPSenderRIDCollision,
347+
#[error("Sender does not have track for RID")]
348+
ErrRTPSenderNoTrackForRID,
331349
#[error("RTPSender must not be nil")]
332350
ErrRTPSenderNil,
333351
#[error("RTPReceiver must not be nil")]

webrtc/src/peer_connection/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1410,6 +1410,7 @@ impl RTCPeerConnection {
14101410
RTCRtpSender::new(
14111411
receive_mtu,
14121412
None,
1413+
kind,
14131414
Arc::clone(&self.internal.dtls_transport),
14141415
Arc::clone(&self.internal.media_engine),
14151416
Arc::clone(&self.interceptor),
@@ -1608,7 +1609,10 @@ impl RTCPeerConnection {
16081609
let current_transceivers = self.internal.rtp_transceivers.lock().await;
16091610
for transceiver in &*current_transceivers {
16101611
let sender = transceiver.sender().await;
1611-
if sender.is_negotiated() && !sender.has_sent() {
1612+
if !sender.track_encodings.lock().await.is_empty()
1613+
&& sender.is_negotiated()
1614+
&& !sender.has_sent()
1615+
{
16121616
sender.send(&sender.get_parameters().await).await?;
16131617
}
16141618
}

webrtc/src/peer_connection/peer_connection_internal.rs

Lines changed: 81 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::stats::{
1414
InboundRTPStats, OutboundRTPStats, RTCStatsType, RemoteInboundRTPStats, RemoteOutboundRTPStats,
1515
StatsReportType,
1616
};
17+
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
1718
use crate::track::TrackStream;
1819
use crate::SDP_ATTRIBUTE_RID;
1920

@@ -442,44 +443,60 @@ impl PeerConnectionInternal {
442443
.map(|value| value.direction)
443444
.unwrap_or(RTCRtpTransceiverDirection::Sendrecv);
444445

445-
if direction == RTCRtpTransceiverDirection::Unspecified {
446-
return Err(Error::ErrPeerConnAddTransceiverFromKindSupport);
447-
}
448-
449-
let interceptor = self
450-
.interceptor
451-
.upgrade()
452-
.ok_or(Error::ErrInterceptorNotBind)?;
453-
let receiver = Arc::new(RTCRtpReceiver::new(
454-
self.setting_engine.get_receive_mtu(),
455-
kind,
456-
Arc::clone(&self.dtls_transport),
457-
Arc::clone(&self.media_engine),
458-
Arc::clone(&interceptor),
459-
));
446+
let t = match direction {
447+
RTCRtpTransceiverDirection::Sendonly | RTCRtpTransceiverDirection::Sendrecv => {
448+
let codec = self
449+
.media_engine
450+
.get_codecs_by_kind(kind)
451+
.first()
452+
.map(|c| c.capability.clone())
453+
.ok_or(Error::ErrNoCodecsAvailable)?;
454+
let track = Arc::new(TrackLocalStaticSample::new(
455+
codec,
456+
math_rand_alpha(16),
457+
math_rand_alpha(16),
458+
));
459+
self.new_transceiver_from_track(direction, track).await?
460+
}
461+
RTCRtpTransceiverDirection::Recvonly => {
462+
let interceptor = self
463+
.interceptor
464+
.upgrade()
465+
.ok_or(Error::ErrInterceptorNotBind)?;
466+
let receiver = Arc::new(RTCRtpReceiver::new(
467+
self.setting_engine.get_receive_mtu(),
468+
kind,
469+
Arc::clone(&self.dtls_transport),
470+
Arc::clone(&self.media_engine),
471+
Arc::clone(&interceptor),
472+
));
460473

461-
let sender = Arc::new(
462-
RTCRtpSender::new(
463-
self.setting_engine.get_receive_mtu(),
464-
None,
465-
Arc::clone(&self.dtls_transport),
466-
Arc::clone(&self.media_engine),
467-
interceptor,
468-
false,
469-
)
470-
.await,
471-
);
474+
let sender = Arc::new(
475+
RTCRtpSender::new(
476+
self.setting_engine.get_receive_mtu(),
477+
None,
478+
kind,
479+
Arc::clone(&self.dtls_transport),
480+
Arc::clone(&self.media_engine),
481+
interceptor,
482+
false,
483+
)
484+
.await,
485+
);
472486

473-
let t = RTCRtpTransceiver::new(
474-
receiver,
475-
sender,
476-
direction,
477-
kind,
478-
vec![],
479-
Arc::clone(&self.media_engine),
480-
Some(Box::new(self.make_negotiation_needed_trigger())),
481-
)
482-
.await;
487+
RTCRtpTransceiver::new(
488+
receiver,
489+
sender,
490+
direction,
491+
kind,
492+
vec![],
493+
Arc::clone(&self.media_engine),
494+
Some(Box::new(self.make_negotiation_needed_trigger())),
495+
)
496+
.await
497+
}
498+
_ => return Err(Error::ErrPeerConnAddTransceiverFromKindSupport),
499+
};
483500

484501
self.add_rtp_transceiver(Arc::clone(&t)).await;
485502

@@ -512,6 +529,7 @@ impl PeerConnectionInternal {
512529
RTCRtpSender::new(
513530
self.setting_engine.get_receive_mtu(),
514531
Some(Arc::clone(&track)),
532+
track.kind(),
515533
Arc::clone(&self.dtls_transport),
516534
Arc::clone(&self.media_engine),
517535
Arc::clone(&interceptor),
@@ -858,6 +876,8 @@ impl PeerConnectionInternal {
858876
let only_media_section = &remote_description.media_descriptions[0];
859877
let mut stream_id = "";
860878
let mut id = "";
879+
let mut has_rid = false;
880+
let mut has_ssrc = false;
861881

862882
for a in &only_media_section.attributes {
863883
match a.key.as_str() {
@@ -870,12 +890,18 @@ impl PeerConnectionInternal {
870890
}
871891
}
872892
}
873-
ATTR_KEY_SSRC => return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC),
874-
SDP_ATTRIBUTE_RID => return Ok(false),
893+
ATTR_KEY_SSRC => has_ssrc = true,
894+
SDP_ATTRIBUTE_RID => has_rid = true,
875895
_ => {}
876896
};
877897
}
878898

899+
if has_rid {
900+
return Ok(false);
901+
} else if has_ssrc {
902+
return Err(Error::ErrPeerConnSingleMediaSectionHasExplicitSSRC);
903+
}
904+
879905
let mut incoming = TrackDetails {
880906
ssrcs: vec![ssrc],
881907
kind: RTPCodecType::Video,
@@ -1343,32 +1369,29 @@ impl PeerConnectionInternal {
13431369
}
13441370
let mut track_infos = vec![];
13451371
for transceiver in transceivers {
1346-
let sender = transceiver.sender().await;
1347-
13481372
let mid = match transceiver.mid() {
13491373
Some(mid) => mid,
13501374
None => continue,
13511375
};
13521376

1353-
let track = match sender.track().await {
1354-
Some(track) => track,
1355-
None => continue,
1356-
};
1357-
1358-
let track_id = track.id().to_string();
1359-
let kind = match track.kind() {
1360-
RTPCodecType::Unspecified => continue,
1361-
RTPCodecType::Audio => "audio",
1362-
RTPCodecType::Video => "video",
1363-
};
1377+
let sender = transceiver.sender().await;
1378+
let track_encodings = sender.track_encodings.lock().await;
1379+
for encoding in track_encodings.iter() {
1380+
let track_id = encoding.track.id().to_string();
1381+
let kind = match encoding.track.kind() {
1382+
RTPCodecType::Unspecified => continue,
1383+
RTPCodecType::Audio => "audio",
1384+
RTPCodecType::Video => "video",
1385+
};
13641386

1365-
track_infos.push(TrackInfo {
1366-
track_id,
1367-
ssrc: sender.ssrc,
1368-
mid,
1369-
rid: None,
1370-
kind,
1371-
});
1387+
track_infos.push(TrackInfo {
1388+
track_id,
1389+
ssrc: encoding.ssrc,
1390+
mid: mid.to_owned(),
1391+
rid: encoding.track.rid().map(Into::into),
1392+
kind,
1393+
});
1394+
}
13721395
}
13731396

13741397
let stream_stats = self

webrtc/src/peer_connection/peer_connection_test.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use crate::ice_transport::ice_server::RTCIceServer;
1919
use crate::peer_connection::configuration::RTCConfiguration;
2020
use crate::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
2121
use crate::stats::StatsReportType;
22+
use crate::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
2223
use crate::track::track_local::track_local_static_sample::TrackLocalStaticSample;
2324
use crate::Error;
2425

@@ -513,3 +514,127 @@ async fn peer() -> Result<()> {
513514

514515
Ok(())
515516
}
517+
518+
pub(crate) fn on_connected() -> (OnPeerConnectionStateChangeHdlrFn, mpsc::Receiver<()>) {
519+
let (done_tx, done_rx) = mpsc::channel::<()>(1);
520+
let done_tx = Arc::new(Mutex::new(Some(done_tx)));
521+
let hdlr_fn: OnPeerConnectionStateChangeHdlrFn =
522+
Box::new(move |state: RTCPeerConnectionState| {
523+
let done_tx_clone = Arc::clone(&done_tx);
524+
Box::pin(async move {
525+
if state == RTCPeerConnectionState::Connected {
526+
let mut tx = done_tx_clone.lock().await;
527+
tx.take();
528+
}
529+
})
530+
});
531+
(hdlr_fn, done_rx)
532+
}
533+
534+
// Everytime we receive a new SSRC we probe it and try to determine the proper way to handle it.
535+
// In most cases a Track explicitly declares a SSRC and a OnTrack is fired. In two cases we don't
536+
// know the SSRC ahead of time
537+
// * Undeclared SSRC in a single media section
538+
// * Simulcast
539+
//
540+
// The Undeclared SSRC processing code would run before Simulcast. If a Simulcast Offer/Answer only
541+
// contained one Media Section we would never fire the OnTrack. We would assume it was a failed
542+
// Undeclared SSRC processing. This test asserts that we properly handled this.
543+
#[tokio::test]
544+
async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> {
545+
let mut m = MediaEngine::default();
546+
for ext in [
547+
::sdp::extmap::SDES_MID_URI,
548+
::sdp::extmap::SDES_RTP_STREAM_ID_URI,
549+
] {
550+
m.register_header_extension(
551+
RTCRtpHeaderExtensionCapability {
552+
uri: ext.to_owned(),
553+
},
554+
RTPCodecType::Video,
555+
None,
556+
)?;
557+
}
558+
m.register_default_codecs()?;
559+
let api = APIBuilder::new().with_media_engine(m).build();
560+
561+
let (mut pc_send, mut pc_recv) = new_pair(&api).await?;
562+
let (send_notifier, mut send_connected) = on_connected();
563+
let (recv_notifier, mut recv_connected) = on_connected();
564+
pc_send.on_peer_connection_state_change(send_notifier);
565+
pc_recv.on_peer_connection_state_change(recv_notifier);
566+
let (track_tx, mut track_rx) = mpsc::unbounded_channel();
567+
pc_recv.on_track(Box::new(move |t, _, _| {
568+
let rid = t.rid().to_owned();
569+
let _ = track_tx.send(rid);
570+
Box::pin(async move {})
571+
}));
572+
573+
let id = "video";
574+
let stream_id = "webrtc-rs";
575+
let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
576+
RTCRtpCodecCapability {
577+
mime_type: MIME_TYPE_VP8.to_owned(),
578+
..Default::default()
579+
},
580+
id.to_owned(),
581+
"a".to_owned(),
582+
stream_id.to_owned(),
583+
));
584+
let track_a = Arc::clone(&track);
585+
let transceiver = pc_send.add_transceiver_from_track(track, None).await?;
586+
let sender = transceiver.sender().await;
587+
588+
let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
589+
RTCRtpCodecCapability {
590+
mime_type: MIME_TYPE_VP8.to_owned(),
591+
..Default::default()
592+
},
593+
id.to_owned(),
594+
"b".to_owned(),
595+
stream_id.to_owned(),
596+
));
597+
let track_b = Arc::clone(&track);
598+
sender.add_encoding(track).await?;
599+
600+
let track = Arc::new(TrackLocalStaticRTP::new_with_rid(
601+
RTCRtpCodecCapability {
602+
mime_type: MIME_TYPE_VP8.to_owned(),
603+
..Default::default()
604+
},
605+
id.to_owned(),
606+
"c".to_owned(),
607+
stream_id.to_owned(),
608+
));
609+
let track_c = Arc::clone(&track);
610+
sender.add_encoding(track).await?;
611+
612+
// signaling
613+
signal_pair(&mut pc_send, &mut pc_recv).await?;
614+
let _ = send_connected.recv().await;
615+
let _ = recv_connected.recv().await;
616+
617+
for sequence_number in [0; 100] {
618+
let pkt = rtp::packet::Packet {
619+
header: rtp::header::Header {
620+
version: 2,
621+
sequence_number,
622+
payload_type: 96,
623+
..Default::default()
624+
},
625+
payload: Bytes::from_static(&[0; 2]),
626+
};
627+
628+
track_a.write_rtp_with_extensions(&pkt, &[]).await?;
629+
track_b.write_rtp_with_extensions(&pkt, &[]).await?;
630+
track_c.write_rtp_with_extensions(&pkt, &[]).await?;
631+
}
632+
633+
assert_eq!(track_rx.recv().await.unwrap(), "a".to_owned());
634+
assert_eq!(track_rx.recv().await.unwrap(), "b".to_owned());
635+
assert_eq!(track_rx.recv().await.unwrap(), "c".to_owned());
636+
637+
close_pair_now(&pc_send, &pc_recv).await;
638+
639+
Ok(())
640+
}

0 commit comments

Comments
 (0)