17
17
18
18
import java .util .Queue ;
19
19
import java .util .concurrent .ConcurrentLinkedQueue ;
20
+ import java .util .concurrent .atomic .AtomicBoolean ;
20
21
import java .util .concurrent .atomic .AtomicLong ;
21
22
22
23
import rx .Observable .Operator ;
23
24
import rx .Producer ;
24
25
import rx .Subscriber ;
26
+ import rx .exceptions .MissingBackpressureException ;
27
+ import rx .functions .Action0 ;
25
28
26
29
public class OperatorOnBackpressureBuffer <T > implements Operator <T , T > {
27
30
28
31
private final NotificationLite <T > on = NotificationLite .instance ();
29
32
33
+ private final Long capacity ;
34
+ private final Action0 onOverflow ;
35
+
36
+ public OperatorOnBackpressureBuffer () {
37
+ this .capacity = null ;
38
+ this .onOverflow = null ;
39
+ }
40
+
41
+ public OperatorOnBackpressureBuffer (long capacity ) {
42
+ this (capacity , null );
43
+ }
44
+
45
+ public OperatorOnBackpressureBuffer (long capacity , Action0 onOverflow ) {
46
+ if (capacity <= 0 ) {
47
+ throw new IllegalArgumentException ("Buffer capacity must be > 0" );
48
+ }
49
+ this .capacity = capacity ;
50
+ this .onOverflow = onOverflow ;
51
+ }
52
+
30
53
@ Override
31
54
public Subscriber <? super T > call (final Subscriber <? super T > child ) {
32
55
// TODO get a different queue implementation
33
- // TODO start with size hint
34
56
final ConcurrentLinkedQueue <Object > queue = new ConcurrentLinkedQueue <Object >();
57
+ final AtomicLong capacity = (this .capacity == null ) ? null : new AtomicLong (this .capacity );
35
58
final AtomicLong wip = new AtomicLong ();
36
59
final AtomicLong requested = new AtomicLong ();
37
60
@@ -40,37 +63,71 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
40
63
@ Override
41
64
public void request (long n ) {
42
65
if (requested .getAndAdd (n ) == 0 ) {
43
- pollQueue (wip , requested , queue , child );
66
+ pollQueue (wip , requested , capacity , queue , child );
44
67
}
45
68
}
46
69
47
70
});
48
71
// don't pass through subscriber as we are async and doing queue draining
49
72
// a parent being unsubscribed should not affect the children
50
73
Subscriber <T > parent = new Subscriber <T >() {
74
+
75
+ private AtomicBoolean saturated = new AtomicBoolean (false );
76
+
51
77
@ Override
52
78
public void onStart () {
53
79
request (Long .MAX_VALUE );
54
80
}
55
81
56
82
@ Override
57
83
public void onCompleted () {
58
- queue .offer (on .completed ());
59
- pollQueue (wip , requested , queue , child );
84
+ if (!saturated .get ()) {
85
+ queue .offer (on .completed ());
86
+ pollQueue (wip , requested , capacity , queue , child );
87
+ }
60
88
}
61
89
62
90
@ Override
63
91
public void onError (Throwable e ) {
64
- queue .offer (on .error (e ));
65
- pollQueue (wip , requested , queue , child );
92
+ if (!saturated .get ()) {
93
+ queue .offer (on .error (e ));
94
+ pollQueue (wip , requested , capacity , queue , child );
95
+ }
66
96
}
67
97
68
98
@ Override
69
99
public void onNext (T t ) {
100
+ if (!assertCapacity ()) {
101
+ return ;
102
+ }
70
103
queue .offer (on .next (t ));
71
- pollQueue (wip , requested , queue , child );
104
+ pollQueue (wip , requested , capacity , queue , child );
72
105
}
73
106
107
+ private boolean assertCapacity () {
108
+ if (capacity == null ) {
109
+ return true ;
110
+ }
111
+
112
+ long currCapacity ;
113
+ do {
114
+ currCapacity = capacity .get ();
115
+ if (currCapacity <= 0 ) {
116
+ if (saturated .compareAndSet (false , true )) {
117
+ unsubscribe ();
118
+ child .onError (new MissingBackpressureException (
119
+ "Overflowed buffer of "
120
+ + OperatorOnBackpressureBuffer .this .capacity ));
121
+ if (onOverflow != null ) {
122
+ onOverflow .call ();
123
+ }
124
+ }
125
+ return false ;
126
+ }
127
+ // ensure no other thread stole our slot, or retry
128
+ } while (!capacity .compareAndSet (currCapacity , currCapacity - 1 ));
129
+ return true ;
130
+ }
74
131
};
75
132
76
133
// if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -79,7 +136,7 @@ public void onNext(T t) {
79
136
return parent ;
80
137
}
81
138
82
- private void pollQueue (AtomicLong wip , AtomicLong requested , Queue <Object > queue , Subscriber <? super T > child ) {
139
+ private void pollQueue (AtomicLong wip , AtomicLong requested , AtomicLong capacity , Queue <Object > queue , Subscriber <? super T > child ) {
83
140
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
84
141
if (requested .get () > 0 ) {
85
142
// only one draining at a time
@@ -96,6 +153,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
96
153
requested .incrementAndGet ();
97
154
return ;
98
155
}
156
+ if (capacity != null ) { // it's bounded
157
+ capacity .incrementAndGet ();
158
+ }
99
159
on .accept (child , o );
100
160
} else {
101
161
// we hit the end ... so increment back to 0 again
0 commit comments