19
19
import java .util .List ;
20
20
import java .util .concurrent .TimeUnit ;
21
21
22
- import org .openjdk .jmh .annotations .BenchmarkMode ;
23
22
import org .openjdk .jmh .annotations .Benchmark ;
23
+ import org .openjdk .jmh .annotations .BenchmarkMode ;
24
24
import org .openjdk .jmh .annotations .Mode ;
25
25
import org .openjdk .jmh .annotations .OutputTimeUnit ;
26
26
import org .openjdk .jmh .annotations .Param ;
39
39
@ OutputTimeUnit (TimeUnit .SECONDS )
40
40
public class OperatorMergePerf {
41
41
42
- @ State (Scope .Thread )
43
- public static class Input extends InputWithIncrementingInteger {
44
-
45
- @ Param ({ "1" , "1000" })
46
- public int size ;
42
+ // flatMap
43
+ @ Benchmark
44
+ public void oneStreamOfNthatMergesIn1 (final InputMillion input ) throws InterruptedException {
45
+ Observable <Observable <Integer >> os = Observable .range (1 , input .size ).map (new Func1 <Integer , Observable <Integer >>() {
47
46
48
- @ Override
49
- public int getSize ( ) {
50
- return size ;
51
- }
47
+ @ Override
48
+ public Observable < Integer > call ( Integer i ) {
49
+ return Observable . just ( i ) ;
50
+ }
52
51
52
+ });
53
+ LatchedObserver <Integer > o = input .newLatchedObserver ();
54
+ Observable .merge (os ).subscribe (o );
55
+ o .latch .await ();
53
56
}
54
57
58
+ // flatMap
55
59
@ Benchmark
56
- public void merge1SyncStreamOfN (final Input input ) throws InterruptedException {
60
+ public void merge1SyncStreamOfN (final InputMillion input ) throws InterruptedException {
57
61
Observable <Observable <Integer >> os = Observable .just (1 ).map (new Func1 <Integer , Observable <Integer >>() {
58
62
59
63
@ Override
@@ -66,9 +70,9 @@ public Observable<Integer> call(Integer i) {
66
70
Observable .merge (os ).subscribe (o );
67
71
o .latch .await ();
68
72
}
69
-
73
+
70
74
@ Benchmark
71
- public void mergeNSyncStreamsOfN (final Input input ) throws InterruptedException {
75
+ public void mergeNSyncStreamsOfN (final InputThousand input ) throws InterruptedException {
72
76
Observable <Observable <Integer >> os = input .observable .map (new Func1 <Integer , Observable <Integer >>() {
73
77
74
78
@ Override
@@ -83,7 +87,7 @@ public Observable<Integer> call(Integer i) {
83
87
}
84
88
85
89
@ Benchmark
86
- public void mergeNAsyncStreamsOfN (final Input input ) throws InterruptedException {
90
+ public void mergeNAsyncStreamsOfN (final InputThousand input ) throws InterruptedException {
87
91
Observable <Observable <Integer >> os = input .observable .map (new Func1 <Integer , Observable <Integer >>() {
88
92
89
93
@ Override
@@ -98,7 +102,7 @@ public Observable<Integer> call(Integer i) {
98
102
}
99
103
100
104
@ Benchmark
101
- public void mergeTwoAsyncStreamsOfN (final Input input ) throws InterruptedException {
105
+ public void mergeTwoAsyncStreamsOfN (final InputThousand input ) throws InterruptedException {
102
106
LatchedObserver <Integer > o = input .newLatchedObserver ();
103
107
Observable <Integer > ob = Observable .range (0 , input .size ).subscribeOn (Schedulers .computation ());
104
108
Observable .merge (ob , ob ).subscribe (o );
@@ -115,6 +119,7 @@ public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedE
115
119
@ State (Scope .Thread )
116
120
public static class InputForMergeN {
117
121
@ Param ({ "1" , "100" , "1000" })
122
+ // @Param({ "1000" })
118
123
public int size ;
119
124
120
125
private Blackhole bh ;
@@ -134,4 +139,31 @@ public LatchedObserver<Integer> newLatchedObserver() {
134
139
}
135
140
}
136
141
142
+ @ State (Scope .Thread )
143
+ public static class InputMillion extends InputWithIncrementingInteger {
144
+
145
+ @ Param ({ "1" , "1000" , "1000000" })
146
+ // @Param({ "1000" })
147
+ public int size ;
148
+
149
+ @ Override
150
+ public int getSize () {
151
+ return size ;
152
+ }
153
+
154
+ }
155
+
156
+ @ State (Scope .Thread )
157
+ public static class InputThousand extends InputWithIncrementingInteger {
158
+
159
+ @ Param ({ "1" , "1000" })
160
+ // @Param({ "1000" })
161
+ public int size ;
162
+
163
+ @ Override
164
+ public int getSize () {
165
+ return size ;
166
+ }
167
+
168
+ }
137
169
}
0 commit comments