Skip to content

Commit e563326

Browse files
committed
Polish ReactiveAdapterRegistry
Issue: SPR-14902
1 parent 2e7d16d commit e563326

File tree

2 files changed

+89
-87
lines changed

2 files changed

+89
-87
lines changed

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 72 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -67,99 +67,81 @@ public ReactiveAdapterRegistry() {
6767

6868
// Flux and Mono ahead of Publisher...
6969

70-
registerMonoAdapter(Mono.class,
71-
source -> (Mono<?>) source, source -> source,
72-
ReactiveTypeDescriptor.singleOptionalValue(Mono.class));
70+
registerReactiveType(
71+
ReactiveTypeDescriptor.singleOptionalValue(Mono.class),
72+
source -> (Mono<?>) source,
73+
source -> source
74+
);
7375

74-
registerFluxAdapter(Flux.class,
75-
source -> (Flux<?>) source, source -> source);
76+
registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class),
77+
source -> (Flux<?>) source,
78+
source -> source);
7679

77-
registerFluxAdapter(Publisher.class,
78-
source -> Flux.from((Publisher<?>) source), source -> source);
80+
registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class),
81+
source -> Flux.from((Publisher<?>) source),
82+
source -> source);
7983

80-
registerMonoAdapter(CompletableFuture.class,
81-
source -> Mono.fromFuture((CompletableFuture<?>) source), Mono::toFuture,
82-
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class)
84+
registerReactiveType(
85+
ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class),
86+
source -> Mono.fromFuture((CompletableFuture<?>) source),
87+
source -> Mono.from(source).toFuture()
8388
);
8489

8590
if (rxJava1Present && rxJava1Adapter) {
86-
new RxJava1AdapterRegistrar().register(this);
91+
new RxJava1Registrar().registerAdapters(this);
8792
}
8893
if (rxJava2Present) {
89-
new RxJava2AdapterRegistrar().register(this);
94+
new RxJava2Registrar().registerAdapters(this);
9095
}
9196
}
9297

9398

9499
/**
95-
* Register an adapter for adapting to and from a {@link Mono}.
96-
* <p>The provided functions can assume that input will never be {@code null}
97-
* and also that any {@link Optional} wrapper is unwrapped.
100+
* Register a reactive type along with functions to adapt to and from a
101+
* Reactive Streams {@link Publisher}. The functions can assume their
102+
* input is never be {@code null} nor {@link Optional}.
98103
*/
99-
public void registerMonoAdapter(Class<?> reactiveType, Function<Object, Mono<?>> toAdapter,
100-
Function<Mono<?>, Object> fromAdapter, ReactiveTypeDescriptor descriptor) {
104+
public void registerReactiveType(ReactiveTypeDescriptor descriptor,
105+
Function<Object, Publisher<?>> toAdapter, Function<Publisher<?>, Object> fromAdapter) {
101106

102-
this.adapters.add(new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
103-
}
107+
ReactiveAdapter adapter = (descriptor.isMultiValue() ?
108+
new FluxReactiveAdapter(toAdapter, fromAdapter, descriptor) :
109+
new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor));
104110

105-
/**
106-
* Register an adapter for adapting to and from a {@link Flux}.
107-
* <p>The provided functions can assume that input will never be {@code null}
108-
* and also that any {@link Optional} wrapper is unwrapped.
109-
*/
110-
public void registerFluxAdapter(Class<?> reactiveType, Function<Object, Flux<?>> toAdapter,
111-
Function<Flux<?>, Object> fromAdapter) {
112-
113-
this.adapters.add(new FluxReactiveAdapter(toAdapter, fromAdapter,
114-
ReactiveTypeDescriptor.multiValue(reactiveType)));
111+
this.adapters.add(adapter);
115112
}
116113

117-
118114
/**
119-
* Get the adapter for the given reactive type to adapt from.
115+
* Get the adapter to use to adapt from the given reactive type.
120116
*/
121117
public ReactiveAdapter getAdapterFrom(Class<?> reactiveType) {
122118
return getAdapterFrom(reactiveType, null);
123119
}
124120

125121
/**
126-
* Get the adapter for the given reactive type to adapt from.
127-
* If the instance is not {@code null} its actual type is used to check.
122+
* Get the adapter to use to adapt from the given reactive type. Or if the
123+
* "source" object is not {@code null} its actual type is used instead.
128124
*/
129-
public ReactiveAdapter getAdapterFrom(Class<?> reactiveType, Object adaptee) {
130-
Class<?> actualType = getActualType(reactiveType, adaptee);
131-
return getAdapterInternal(supportedType -> supportedType.isAssignableFrom(actualType));
125+
public ReactiveAdapter getAdapterFrom(Class<?> reactiveType, Object source) {
126+
source = unwrapOptional(source);
127+
Class<?> clazz = (source != null ? source.getClass() : reactiveType);
128+
return getAdapter(type -> type.isAssignableFrom(clazz));
132129
}
133130

134131
/**
135132
* Get the adapter for the given reactive type to adapt to.
136133
*/
137134
public ReactiveAdapter getAdapterTo(Class<?> reactiveType) {
138-
return getAdapterTo(reactiveType, null);
139-
}
140-
141-
/**
142-
* Get the adapter for the given reactive type to adapt to.
143-
* If the instance is not {@code null} its actual type is used to check.
144-
*/
145-
public ReactiveAdapter getAdapterTo(Class<?> reactiveType, Object adaptee) {
146-
Class<?> actualType = getActualType(reactiveType, adaptee);
147-
return getAdapterInternal(supportedType -> supportedType.equals(actualType));
135+
return getAdapter(reactiveType::equals);
148136
}
149137

150-
private ReactiveAdapter getAdapterInternal(Predicate<Class<?>> predicate) {
138+
private ReactiveAdapter getAdapter(Predicate<Class<?>> predicate) {
151139
return this.adapters.stream()
152140
.filter(adapter -> predicate.test(adapter.getDescriptor().getReactiveType()))
153141
.findFirst()
154142
.orElse(null);
155143
}
156144

157-
158-
private static Class<?> getActualType(Class<?> reactiveType, Object adaptee) {
159-
adaptee = unwrapOptional(adaptee);
160-
return (adaptee != null ? adaptee.getClass() : reactiveType);
161-
}
162-
163145
private static Object unwrapOptional(Object value) {
164146
return (value instanceof Optional ? ((Optional<?>) value).orElse(null) : value);
165147
}
@@ -168,14 +150,14 @@ private static Object unwrapOptional(Object value) {
168150
@SuppressWarnings("unchecked")
169151
private static class MonoReactiveAdapter implements ReactiveAdapter {
170152

171-
private final Function<Object, Mono<?>> toAdapter;
153+
private final Function<Object, Publisher<?>> toAdapter;
172154

173-
private final Function<Mono<?>, Object> fromAdapter;
155+
private final Function<Publisher<?>, Object> fromAdapter;
174156

175157
private final ReactiveTypeDescriptor descriptor;
176158

177159

178-
MonoReactiveAdapter(Function<Object, Mono<?>> to, Function<Mono<?>, Object> from,
160+
MonoReactiveAdapter(Function<Object, Publisher<?>> to, Function<Publisher<?>, Object> from,
179161
ReactiveTypeDescriptor descriptor) {
180162

181163
this.toAdapter = to;
@@ -194,7 +176,7 @@ public <T> Mono<T> toMono(Object source) {
194176
if (source == null) {
195177
return Mono.empty();
196178
}
197-
return (Mono<T>) this.toAdapter.apply(source);
179+
return (Mono<T>) Mono.from(this.toAdapter.apply(source));
198180
}
199181

200182
@Override
@@ -203,7 +185,7 @@ public <T> Flux<T> toFlux(Object source) {
203185
if (source == null) {
204186
return Flux.empty();
205187
}
206-
return (Flux<T>) this.toMono(source).flux();
188+
return (Flux<T>) toMono(source).flux();
207189
}
208190

209191
@Override
@@ -213,21 +195,21 @@ public <T> Publisher<T> toPublisher(Object source) {
213195

214196
@Override
215197
public Object fromPublisher(Publisher<?> source) {
216-
return (source != null ? this.fromAdapter.apply((Mono<?>) source) : null);
198+
return (source != null ? this.fromAdapter.apply(source) : null);
217199
}
218200
}
219201

220202
@SuppressWarnings("unchecked")
221203
private static class FluxReactiveAdapter implements ReactiveAdapter {
222204

223-
private final Function<Object, Flux<?>> toAdapter;
205+
private final Function<Object, Publisher<?>> toAdapter;
224206

225-
private final Function<Flux<?>, Object> fromAdapter;
207+
private final Function<Publisher<?>, Object> fromAdapter;
226208

227209
private final ReactiveTypeDescriptor descriptor;
228210

229211

230-
FluxReactiveAdapter(Function<Object, Flux<?>> to, Function<Flux<?>, Object> from,
212+
FluxReactiveAdapter(Function<Object, Publisher<?>> to, Function<Publisher<?>, Object> from,
231213
ReactiveTypeDescriptor descriptor) {
232214

233215
this.descriptor = descriptor;
@@ -246,7 +228,7 @@ public <T> Mono<T> toMono(Object source) {
246228
if (source == null) {
247229
return Mono.empty();
248230
}
249-
return (Mono<T>) this.toAdapter.apply(source).next();
231+
return (Mono<T>) toFlux(source).next();
250232
}
251233

252234
@Override
@@ -255,7 +237,7 @@ public <T> Flux<T> toFlux(Object source) {
255237
if (source == null) {
256238
return Flux.empty();
257239
}
258-
return (Flux<T>) this.toAdapter.apply(source);
240+
return (Flux<T>) Flux.from(this.toAdapter.apply(source));
259241
}
260242

261243
@Override
@@ -265,56 +247,59 @@ public <T> Publisher<T> toPublisher(Object source) {
265247

266248
@Override
267249
public Object fromPublisher(Publisher<?> source) {
268-
return (source != null ? this.fromAdapter.apply((Flux<?>) source) : null);
250+
return (source != null ? this.fromAdapter.apply(source) : null);
269251
}
270252
}
271253

272254

273-
private static class RxJava1AdapterRegistrar {
255+
private static class RxJava1Registrar {
274256

275-
public void register(ReactiveAdapterRegistry registry) {
276-
registry.registerFluxAdapter(rx.Observable.class,
257+
public void registerAdapters(ReactiveAdapterRegistry registry) {
258+
registry.registerReactiveType(
259+
ReactiveTypeDescriptor.multiValue(rx.Observable.class),
277260
source -> Flux.from(RxReactiveStreams.toPublisher((rx.Observable<?>) source)),
278261
RxReactiveStreams::toObservable
279262
);
280-
registry.registerMonoAdapter(rx.Single.class,
263+
registry.registerReactiveType(
264+
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
281265
source -> Mono.from(RxReactiveStreams.toPublisher((rx.Single<?>) source)),
282-
RxReactiveStreams::toSingle,
283-
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class)
266+
RxReactiveStreams::toSingle
284267
);
285-
registry.registerMonoAdapter(rx.Completable.class,
268+
registry.registerReactiveType(
269+
ReactiveTypeDescriptor.noValue(rx.Completable.class),
286270
source -> Mono.from(RxReactiveStreams.toPublisher((rx.Completable) source)),
287-
RxReactiveStreams::toCompletable,
288-
ReactiveTypeDescriptor.noValue(rx.Completable.class)
271+
RxReactiveStreams::toCompletable
289272
);
290273
}
291274
}
292275

293-
private static class RxJava2AdapterRegistrar {
276+
private static class RxJava2Registrar {
294277

295-
public void register(ReactiveAdapterRegistry registry) {
296-
registry.registerFluxAdapter(Flowable.class,
278+
public void registerAdapters(ReactiveAdapterRegistry registry) {
279+
registry.registerReactiveType(
280+
ReactiveTypeDescriptor.multiValue(Flowable.class),
297281
source -> Flux.from((Flowable<?>) source),
298282
source-> Flowable.fromPublisher(source)
299283
);
300-
registry.registerFluxAdapter(io.reactivex.Observable.class,
284+
registry.registerReactiveType(
285+
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class),
301286
source -> Flux.from(((io.reactivex.Observable<?>) source).toFlowable(BackpressureStrategy.BUFFER)),
302287
source -> Flowable.fromPublisher(source).toObservable()
303288
);
304-
registry.registerMonoAdapter(io.reactivex.Single.class,
289+
registry.registerReactiveType(
290+
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
305291
source -> Mono.from(((io.reactivex.Single<?>) source).toFlowable()),
306-
source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(),
307-
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class)
292+
source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle()
308293
);
309-
registry.registerMonoAdapter(Maybe.class,
294+
registry.registerReactiveType(
295+
ReactiveTypeDescriptor.singleOptionalValue(Maybe.class),
310296
source -> Mono.from(((Maybe<?>) source).toFlowable()),
311-
source -> Flowable.fromPublisher(source).toObservable().singleElement(),
312-
ReactiveTypeDescriptor.singleOptionalValue(Maybe.class)
297+
source -> Flowable.fromPublisher(source).toObservable().singleElement()
313298
);
314-
registry.registerMonoAdapter(io.reactivex.Completable.class,
299+
registry.registerReactiveType(
300+
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class),
315301
source -> Mono.from(((io.reactivex.Completable) source).toFlowable()),
316-
source -> Flowable.fromPublisher(source).toObservable().ignoreElements(),
317-
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class)
302+
source -> Flowable.fromPublisher(source).toObservable().ignoreElements()
318303
);
319304
}
320305
}

spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,23 @@ public boolean isNoValue() {
8585
}
8686

8787

88+
@Override
89+
public boolean equals(Object other) {
90+
if (this == other) {
91+
return true;
92+
}
93+
if (other == null || getClass() != other.getClass()) {
94+
return false;
95+
}
96+
return this.reactiveType.equals(((ReactiveTypeDescriptor) other).reactiveType);
97+
}
98+
99+
@Override
100+
public int hashCode() {
101+
return this.reactiveType.hashCode();
102+
}
103+
104+
88105
/**
89106
* Descriptor for a reactive type that can produce 0..N values.
90107
*/

0 commit comments

Comments
 (0)