Skip to content

Commit b8012e9

Browse files
robashtonrainliu
authored andcommitted
Reduce the lifetime of the guard on the rtp read channel, convert a Mutex into a RwLock
1 parent dd5666c commit b8012e9

File tree

1 file changed

+64
-58
lines changed
  • src/rtp_transceiver/rtp_receiver

1 file changed

+64
-58
lines changed

src/rtp_transceiver/rtp_receiver/mod.rs

Lines changed: 64 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use crate::track::{TrackStream, TrackStreams};
1919
use interceptor::stream_info::RTPHeaderExtension;
2020
use interceptor::{Attributes, Interceptor};
2121
use std::sync::Arc;
22-
use tokio::sync::{mpsc, Mutex, Notify};
22+
use tokio::sync::{mpsc, Mutex, Notify, RwLock};
2323

2424
pub struct RTPReceiverInternal {
2525
pub(crate) kind: RTPCodecType,
26-
tracks: Mutex<Vec<TrackStreams>>,
26+
tracks: RwLock<Vec<TrackStreams>>,
2727
closed_rx: Arc<Notify>,
2828
received_rx: Mutex<mpsc::Receiver<()>>,
2929

@@ -37,65 +37,71 @@ pub struct RTPReceiverInternal {
3737
impl RTPReceiverInternal {
3838
/// read reads incoming RTCP for this RTPReceiver
3939
async fn read(&self, b: &mut [u8]) -> Result<(usize, Attributes)> {
40-
let mut received_rx = self.received_rx.lock().await;
41-
42-
tokio::select! {
43-
_ = received_rx.recv() =>{
44-
let tracks = self.tracks.lock().await;
45-
if let Some(t) = tracks.first(){
46-
if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor{
47-
let a = Attributes::new();
48-
tokio::select! {
49-
_ = self.closed_rx.notified() => {
50-
Err(Error::ErrClosedPipe)
51-
}
52-
result = rtcp_interceptor.read(b, &a) => {
53-
Ok(result?)
54-
}
55-
}
56-
}else{
57-
Err(Error::ErrInterceptorNotBind)
58-
}
59-
}else{
60-
Err(Error::ErrExistingTrack)
40+
{
41+
let mut received_rx = self.received_rx.lock().await;
42+
tokio::select! {
43+
_ = received_rx.recv() =>{
44+
// Drop into the below code and don't hold this lock any longer
45+
}
46+
_ = self.closed_rx.notified() => {
47+
return Err(Error::ErrClosedPipe);
6148
}
6249
}
63-
_ = self.closed_rx.notified() => {
64-
Err(Error::ErrClosedPipe)
50+
}
51+
52+
let tracks = self.tracks.read().await;
53+
if let Some(t) = tracks.first() {
54+
if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
55+
let a = Attributes::new();
56+
tokio::select! {
57+
_ = self.closed_rx.notified() => {
58+
Err(Error::ErrClosedPipe)
59+
}
60+
result = rtcp_interceptor.read(b, &a) => {
61+
Ok(result?)
62+
}
63+
}
64+
} else {
65+
Err(Error::ErrInterceptorNotBind)
6566
}
67+
} else {
68+
Err(Error::ErrExistingTrack)
6669
}
6770
}
6871

6972
/// read_simulcast reads incoming RTCP for this RTPReceiver for given rid
7073
async fn read_simulcast(&self, b: &mut [u8], rid: &str) -> Result<(usize, Attributes)> {
71-
let mut received_rx = self.received_rx.lock().await;
72-
73-
tokio::select! {
74-
_ = received_rx.recv() =>{
75-
let tracks = self.tracks.lock().await;
76-
for t in &*tracks{
77-
if t.track.rid() == rid {
78-
if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor{
79-
let a = Attributes::new();
80-
tokio::select! {
81-
_ = self.closed_rx.notified() => {
82-
return Err(Error::ErrClosedPipe);
83-
}
84-
result = rtcp_interceptor.read(b, &a) => {
85-
return Ok(result?);
86-
}
87-
}
88-
}else{
89-
return Err(Error::ErrInterceptorNotBind);
74+
{
75+
let mut received_rx = self.received_rx.lock().await;
76+
tokio::select! {
77+
_ = received_rx.recv() =>{
78+
// Drop into the below code and don't hold this lock any longer
79+
}
80+
_ = self.closed_rx.notified() => {
81+
return Err(Error::ErrClosedPipe);
82+
}
83+
}
84+
}
85+
86+
let tracks = self.tracks.read().await;
87+
for t in &*tracks {
88+
if t.track.rid() == rid {
89+
if let Some(rtcp_interceptor) = &t.stream.rtcp_interceptor {
90+
let a = Attributes::new();
91+
tokio::select! {
92+
_ = self.closed_rx.notified() => {
93+
return Err(Error::ErrClosedPipe);
94+
}
95+
result = rtcp_interceptor.read(b, &a) => {
96+
return Ok(result?);
9097
}
9198
}
99+
} else {
100+
return Err(Error::ErrInterceptorNotBind);
92101
}
93-
Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound)
94-
}
95-
_ = self.closed_rx.notified() => {
96-
Err(Error::ErrClosedPipe)
97102
}
98103
}
104+
Err(Error::ErrRTPReceiverForRIDTrackStreamNotFound)
99105
}
100106

101107
/// read_rtcp is a convenience method that wraps Read and unmarshal for you.
@@ -138,7 +144,7 @@ impl RTPReceiverInternal {
138144
let mut rtp_interceptor = None;
139145
//let mut ssrc = 0;
140146
{
141-
let tracks = self.tracks.lock().await;
147+
let tracks = self.tracks.read().await;
142148
for t in &*tracks {
143149
if t.track.tid() == tid {
144150
rtp_interceptor = t.stream.rtp_interceptor.clone();
@@ -246,7 +252,7 @@ impl RTCRtpReceiver {
246252
internal: Arc::new(RTPReceiverInternal {
247253
kind,
248254

249-
tracks: Mutex::new(vec![]),
255+
tracks: RwLock::new(vec![]),
250256
transport,
251257
media_engine,
252258
interceptor,
@@ -296,7 +302,7 @@ impl RTCRtpReceiver {
296302
});
297303
}
298304

299-
let mut tracks = self.internal.tracks.lock().await;
305+
let mut tracks = self.internal.tracks.write().await;
300306
for (idx, codec) in params.codecs.iter().enumerate() {
301307
let t = &mut tracks[idx];
302308
if let Some(stream_info) = &mut t.stream.stream_info {
@@ -311,18 +317,18 @@ impl RTCRtpReceiver {
311317

312318
/// track returns the RtpTransceiver TrackRemote
313319
pub async fn track(&self) -> Option<Arc<TrackRemote>> {
314-
let tracks = self.internal.tracks.lock().await;
320+
let tracks = self.internal.tracks.read().await;
315321
if tracks.len() != 1 {
316322
None
317323
} else {
318324
tracks.first().map(|t| Arc::clone(&t.track))
319325
}
320326
}
321327

322-
/// tracks returns the RtpTransceiver tracks
328+
/// tracks returns the RtpTransceiver traclockks
323329
/// A RTPReceiver to support Simulcast may now have multiple tracks
324330
pub async fn tracks(&self) -> Vec<Arc<TrackRemote>> {
325-
let tracks = self.internal.tracks.lock().await;
331+
let tracks = self.internal.tracks.read().await;
326332
tracks.iter().map(|t| Arc::clone(&t.track)).collect()
327333
}
328334

@@ -406,7 +412,7 @@ impl RTCRtpReceiver {
406412
};
407413

408414
{
409-
let mut tracks = self.internal.tracks.lock().await;
415+
let mut tracks = self.internal.tracks.write().await;
410416
tracks.push(t);
411417
};
412418

@@ -517,7 +523,7 @@ impl RTCRtpReceiver {
517523

518524
let mut errs = vec![];
519525
if received_tx_is_none {
520-
let tracks = self.internal.tracks.lock().await;
526+
let tracks = self.internal.tracks.write().await;
521527
for t in &*tracks {
522528
if let Some(rtcp_read_stream) = &t.stream.rtcp_read_stream {
523529
if let Err(err) = rtcp_read_stream.close().await {
@@ -575,7 +581,7 @@ impl RTCRtpReceiver {
575581
params: RTCRtpParameters,
576582
stream: TrackStream,
577583
) -> Result<Arc<TrackRemote>> {
578-
let mut tracks = self.internal.tracks.lock().await;
584+
let mut tracks = self.internal.tracks.write().await;
579585
for t in &mut *tracks {
580586
if t.track.rid() == rid {
581587
t.track.set_kind(self.kind);
@@ -602,7 +608,7 @@ impl RTCRtpReceiver {
602608
rsid: String,
603609
repair_stream: TrackStream,
604610
) -> Result<()> {
605-
let mut tracks = self.internal.tracks.lock().await;
611+
let mut tracks = self.internal.tracks.write().await;
606612
let l = tracks.len();
607613
for t in &mut *tracks {
608614
if (ssrc != 0 && l == 1) || t.track.rid() == rsid {

0 commit comments

Comments
 (0)