Skip to content

Commit 8d26c73

Browse files
committed
Polishing
This commit polishes previous one by also accepting generic types explicitly declared with a class that extends DataBuffer allowing to write Flux<DefaultDataBuffer> for example. Issue: SPR-14952
1 parent a98be03 commit 8d26c73

File tree

14 files changed

+77
-56
lines changed

14 files changed

+77
-56
lines changed

spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public class MockServerHttpResponse implements ServerHttpResponse {
5050

5151
private final MultiValueMap<String, ResponseCookie> cookies = new LinkedMultiValueMap<>();
5252

53-
private Publisher<DataBuffer> body;
53+
private Publisher<? extends DataBuffer> body;
5454

55-
private Publisher<? extends Publisher<DataBuffer>> bodyWithFlushes;
55+
private Publisher<? extends Publisher<? extends DataBuffer>> bodyWithFlushes;
5656

5757
private DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
5858

@@ -77,22 +77,22 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
7777
return this.cookies;
7878
}
7979

80-
public Publisher<DataBuffer> getBody() {
80+
public Publisher<? extends DataBuffer> getBody() {
8181
return this.body;
8282
}
8383

84-
public Publisher<? extends Publisher<DataBuffer>> getBodyWithFlush() {
84+
public Publisher<? extends Publisher<? extends DataBuffer>> getBodyWithFlush() {
8585
return this.bodyWithFlushes;
8686
}
8787

8888
@Override
89-
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
89+
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
9090
this.body = body;
9191
return Flux.from(this.body).then();
9292
}
9393

9494
@Override
95-
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
95+
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
9696
this.bodyWithFlushes = body;
9797
return Flux.from(this.bodyWithFlushes).then();
9898
}

spring-web/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
5555
* @param body the body content publisher
5656
* @return a {@link Mono} that indicates completion or error
5757
*/
58-
Mono<Void> writeWith(Publisher<DataBuffer> body);
58+
Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
5959

6060
/**
6161
* Use the given {@link Publisher} of {@code Publishers} to write the body of the
@@ -64,7 +64,7 @@ public interface ReactiveHttpOutputMessage extends HttpMessage {
6464
* @param body the body content publisher
6565
* @return a {@link Mono} that indicates completion or error
6666
*/
67-
Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body);
67+
Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);
6868

6969
/**
7070
* Indicate that message handling is complete, allowing for any cleanup or

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,20 +74,20 @@ public URI getURI() {
7474
}
7575

7676
@Override
77-
public Mono<Void> writeWith(Publisher<DataBuffer> body) {
77+
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
7878
return applyBeforeCommit().then(this.httpRequest
7979
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)));
8080
}
8181

8282
@Override
83-
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
83+
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
8484
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
8585
map(ReactorClientHttpRequest::toByteBufs);
8686
return applyBeforeCommit().then(this.httpRequest
8787
.sendGroups(byteBufs));
8888
}
8989

90-
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {
90+
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
9191
return Flux.from(dataBuffers).
9292
map(NettyDataBufferFactory::toByteBuf);
9393
}

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) {
4343

4444

4545
@Override
46-
protected final Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
46+
protected final Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
4747
return writeAndFlushWithInternal(Mono.just(body));
4848
}
4949

5050
@Override
51-
protected final Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> body) {
51+
protected final Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) {
5252
if (this.writeCalled.compareAndSet(false, true)) {
53-
Processor<Publisher<DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
53+
Processor<? super Publisher<? extends DataBuffer>, Void> bodyProcessor = createBodyFlushProcessor();
5454
return Mono.from(subscriber -> {
5555
body.subscribe(bodyProcessor);
5656
bodyProcessor.subscribe(subscriber);
@@ -67,6 +67,6 @@ protected final Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publish
6767
* that will write the response body with flushes to the underlying output. Called from
6868
* {@link #writeAndFlushWithInternal(Publisher)}.
6969
*/
70-
protected abstract Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor();
70+
protected abstract Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor();
7171

7272
}

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* @see UndertowHttpHandlerAdapter
4242
* @see ServerHttpResponse#writeAndFlushWith(Publisher)
4343
*/
44-
abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher<DataBuffer>, Void> {
44+
abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher<? extends DataBuffer>, Void> {
4545

4646
protected final Log logger = LogFactory.getLog(getClass());
4747

@@ -65,7 +65,7 @@ public final void onSubscribe(Subscription subscription) {
6565
}
6666

6767
@Override
68-
public final void onNext(Publisher<DataBuffer> publisher) {
68+
public final void onNext(Publisher<? extends DataBuffer> publisher) {
6969
if (logger.isTraceEnabled()) {
7070
logger.trace(this.state + " onNext: " + publisher);
7171
}
@@ -100,7 +100,7 @@ public final void subscribe(Subscriber<? super Void> subscriber) {
100100
/**
101101
* Creates a new processor for subscribing to a body chunk.
102102
*/
103-
protected abstract Processor<DataBuffer, Void> createBodyProcessor();
103+
protected abstract Processor<? super DataBuffer, Void> createBodyProcessor();
104104

105105
/**
106106
* Flushes the output.
@@ -144,9 +144,9 @@ public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscripti
144144
REQUESTED {
145145

146146
@Override
147-
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> chunk) {
147+
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<? extends DataBuffer> chunk) {
148148
if (processor.changeState(this, RECEIVED)) {
149-
Processor<DataBuffer, Void> chunkProcessor = processor.createBodyProcessor();
149+
Processor<? super DataBuffer, Void> chunkProcessor = processor.createBodyProcessor();
150150
chunk.subscribe(chunkProcessor);
151151
chunkProcessor.subscribe(new WriteSubscriber(processor));
152152
}
@@ -192,7 +192,7 @@ public void onComplete(AbstractResponseBodyFlushProcessor processor) {
192192

193193
@Override
194194
public void onNext(AbstractResponseBodyFlushProcessor processor,
195-
Publisher<DataBuffer> publisher) {
195+
Publisher<? extends DataBuffer> publisher) {
196196
// ignore
197197

198198
}
@@ -212,7 +212,7 @@ public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscripti
212212
subscription.cancel();
213213
}
214214

215-
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<DataBuffer> publisher) {
215+
public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher<? extends DataBuffer> publisher) {
216216
throw new IllegalStateException(toString());
217217
}
218218

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,13 @@ public void beforeCommit(Supplier<? extends Mono<Void>> action) {
125125
}
126126

127127
@Override
128-
public final Mono<Void> writeWith(Publisher<DataBuffer> body) {
128+
public final Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
129129
return new ChannelSendOperator<>(body,
130130
writePublisher -> doCommit(() -> writeWithInternal(writePublisher)));
131131
}
132132

133133
@Override
134-
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<DataBuffer>> body) {
134+
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
135135
return new ChannelSendOperator<>(body,
136136
writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher)));
137137
}
@@ -186,14 +186,14 @@ protected Mono<Void> doCommit(Supplier<? extends Mono<Void>> writeAction) {
186186
* Implement this method to write to the underlying the response.
187187
* @param body the publisher to write with
188188
*/
189-
protected abstract Mono<Void> writeWithInternal(Publisher<DataBuffer> body);
189+
protected abstract Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body);
190190

191191
/**
192192
* Implement this method to write to the underlying the response, and flush after
193193
* each {@code Publisher<DataBuffer>}.
194194
* @param body the publisher to write and flush with
195195
*/
196-
protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> body);
196+
protected abstract Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body);
197197

198198
/**
199199
* Implement this method to write the status code to the underlying response.

spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ protected void applyStatusCode() {
7070
}
7171

7272
@Override
73-
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
73+
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) {
7474
Publisher<ByteBuf> body = toByteBufs(publisher);
7575
return this.response.send(body);
7676
}
7777

7878
@Override
79-
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<DataBuffer>> publisher) {
79+
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
8080
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
8181
.map(ReactorServerHttpResponse::toByteBufs);
8282
return this.response.sendGroups(body);
@@ -117,7 +117,7 @@ public Mono<Void> writeWith(File file, long position, long count) {
117117
return doCommit(() -> this.response.sendFile(file, position, count));
118118
}
119119

120-
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {
120+
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
121121
return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf);
122122
}
123123

spring-web/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected void applyStatusCode() {
7171
}
7272

7373
@Override
74-
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
74+
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
7575
Observable<ByteBuf> content = RxReactiveStreams.toObservable(body)
7676
.map(NettyDataBufferFactory::toByteBuf);
7777
return Flux.from(RxReactiveStreams.toPublisher(this.response.write(content)))
@@ -80,7 +80,7 @@ protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
8080

8181
@Override
8282
protected Mono<Void> writeAndFlushWithInternal(
83-
Publisher<? extends Publisher<DataBuffer>> body) {
83+
Publisher<? extends Publisher<? extends DataBuffer>> body) {
8484
Flux<ByteBuf> bodyWithFlushSignals = Flux.from(body).
8585
flatMap(publisher -> Flux.from(publisher).
8686
map(NettyDataBufferFactory::toByteBuf).

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected void applyCookies() {
122122
}
123123

124124
@Override
125-
protected Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor() {
125+
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
126126
ResponseBodyFlushProcessor processor = new ResponseBodyFlushProcessor();
127127
this.bodyFlushProcessor = processor;
128128
return processor;
@@ -261,7 +261,7 @@ public void onError(Throwable ex) {
261261
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
262262

263263
@Override
264-
protected Processor<DataBuffer, Void> createBodyProcessor() {
264+
protected Processor<? super DataBuffer, Void> createBodyProcessor() {
265265
try {
266266
bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize);
267267
return bodyProcessor;

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.undertow.server.handlers.CookieImpl;
3030
import io.undertow.util.HttpString;
3131
import org.reactivestreams.Processor;
32+
import org.reactivestreams.Publisher;
3233
import org.xnio.ChannelListener;
3334
import org.xnio.channels.StreamSinkChannel;
3435
import reactor.core.publisher.Mono;
@@ -123,7 +124,7 @@ protected void applyCookies() {
123124
}
124125

125126
@Override
126-
protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() {
127+
protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlushProcessor() {
127128
return new ResponseBodyFlushProcessor();
128129
}
129130

@@ -209,7 +210,7 @@ public void handleEvent(StreamSinkChannel channel) {
209210
private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor {
210211

211212
@Override
212-
protected Processor<DataBuffer, Void> createBodyProcessor() {
213+
protected Processor<? super DataBuffer, Void> createBodyProcessor() {
213214
return UndertowServerHttpResponse.this.createBodyProcessor();
214215
}
215216

spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageWriterTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public void encodeServerSentEvent() {
7272
messageWriter.write(source, ResolvableType.forClass(ServerSentEvent.class),
7373
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
7474

75-
Publisher<? extends Publisher<DataBuffer>> result = Flux.from(outputMessage.getBodyWithFlush());
75+
Publisher<? extends Publisher<? extends DataBuffer>> result = Flux.from(outputMessage.getBodyWithFlush());
7676
StepVerifier.create(result)
7777
.consumeNextWith(sseConsumer("id:c42\n" + "event:foo\n" + "retry:123\n" +
7878
":bla\n:bla bla\n:bla bla bla\n" + "data:bar\n"))
@@ -87,7 +87,7 @@ public void encodeString() {
8787
messageWriter.write(source, ResolvableType.forClass(String.class),
8888
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
8989

90-
Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
90+
Publisher<? extends Publisher<? extends DataBuffer>> result = outputMessage.getBodyWithFlush();
9191
StepVerifier.create(result)
9292
.consumeNextWith(sseConsumer("data:foo\n"))
9393
.consumeNextWith(sseConsumer("data:bar\n"))
@@ -102,7 +102,7 @@ public void encodeMultiLineString() {
102102
messageWriter.write(source, ResolvableType.forClass(String.class),
103103
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
104104

105-
Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
105+
Publisher<? extends Publisher<? extends DataBuffer>> result = outputMessage.getBodyWithFlush();
106106
StepVerifier.create(result)
107107
.consumeNextWith(sseConsumer("data:foo\ndata:bar\n"))
108108
.consumeNextWith(sseConsumer("data:foo\ndata:baz\n"))
@@ -118,7 +118,7 @@ public void encodePojo() {
118118
messageWriter.write(source, ResolvableType.forClass(Pojo.class),
119119
new MediaType("text", "event-stream"), outputMessage, Collections.emptyMap());
120120

121-
Publisher<? extends Publisher<DataBuffer>> result = outputMessage.getBodyWithFlush();
121+
Publisher<? extends Publisher<? extends DataBuffer>> result = outputMessage.getBodyWithFlush();
122122
StepVerifier.create(result)
123123
.consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoo\",\"bar\":\"barbar\"}", "\n"))
124124
.consumeNextWith(sseConsumer("data:", "{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}", "\n"))
@@ -127,7 +127,7 @@ public void encodePojo() {
127127
}
128128

129129

130-
private Consumer<Publisher<DataBuffer>> sseConsumer(String... expected) {
130+
private Consumer<Publisher<? extends DataBuffer>> sseConsumer(String... expected) {
131131
return publisher -> {
132132
StepVerifier.Step<DataBuffer> builder = StepVerifier.create(publisher);
133133
for (String value : expected) {

spring-web/src/test/java/org/springframework/http/server/reactive/ServerHttpResponseTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import reactor.core.publisher.Mono;
2828

2929
import org.springframework.core.io.buffer.DataBuffer;
30+
import org.springframework.core.io.buffer.DefaultDataBuffer;
3031
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3132
import org.springframework.http.ResponseCookie;
3233

@@ -56,6 +57,20 @@ public void writeWith() throws Exception {
5657
assertEquals("c", new String(response.body.get(2).asByteBuffer().array(), StandardCharsets.UTF_8));
5758
}
5859

60+
@Test // SPR-14952
61+
public void writeAndFlushWithFluxOfDefaultDataBuffer() throws Exception {
62+
TestServerHttpResponse response = new TestServerHttpResponse();
63+
Flux<Flux<DefaultDataBuffer>> flux = Flux.just(Flux.just(wrap("foo")));
64+
response.writeAndFlushWith(flux).block();
65+
66+
assertTrue(response.statusCodeWritten);
67+
assertTrue(response.headersWritten);
68+
assertTrue(response.cookiesWritten);
69+
70+
assertEquals(1, response.body.size());
71+
assertEquals("foo", new String(response.body.get(0).asByteBuffer().array(), StandardCharsets.UTF_8));
72+
}
73+
5974
@Test
6075
public void writeWithError() throws Exception {
6176
TestServerHttpResponse response = new TestServerHttpResponse();
@@ -119,7 +134,7 @@ public void beforeCommitActionWithSetComplete() throws Exception {
119134

120135

121136

122-
private DataBuffer wrap(String a) {
137+
private DefaultDataBuffer wrap(String a) {
123138
return new DefaultDataBufferFactory().wrap(ByteBuffer.wrap(a.getBytes(StandardCharsets.UTF_8)));
124139
}
125140

@@ -157,7 +172,7 @@ protected void applyCookies() {
157172
}
158173

159174
@Override
160-
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
175+
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
161176
return Flux.from(body).map(b -> {
162177
this.body.add(b);
163178
return b;
@@ -166,8 +181,13 @@ protected Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
166181

167182
@Override
168183
protected Mono<Void> writeAndFlushWithInternal(
169-
Publisher<? extends Publisher<DataBuffer>> body) {
170-
return Mono.error(new UnsupportedOperationException());
184+
Publisher<? extends Publisher<? extends DataBuffer>> bodyWithFlush) {
185+
return Flux.from(bodyWithFlush).flatMap(body ->
186+
Flux.from(body).map(b -> {
187+
this.body.add(b);
188+
return b;
189+
})
190+
).then();
171191
}
172192
}
173193

0 commit comments

Comments
 (0)