Skip to content

Commit 04ed093

Browse files
Add SignalR client log for stream item binding failure (#60857)
1 parent bc231d9 commit 04ed093

File tree

7 files changed

+105
-7
lines changed

7 files changed

+105
-7
lines changed

src/SignalR/clients/csharp/Client.Core/src/HubConnection.Log.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,5 +337,8 @@ public static void ErrorHandshakeTimedOut(ILogger logger, TimeSpan handshakeTime
337337

338338
[LoggerMessage(93, LogLevel.Debug, "HubProtocol '{Protocol} v{Version}' does not support Stateful Reconnect. Disabling the feature.", EventName = "DisablingReconnect")]
339339
public static partial void DisablingReconnect(ILogger logger, string protocol, int version);
340+
341+
[LoggerMessage(94, LogLevel.Error, "Failed to bind argument received in stream '{StreamId}'.", EventName = "StreamBindingFailure")]
342+
public static partial void StreamBindingFailure(ILogger logger, string? streamId, Exception exception);
340343
}
341344
}

src/SignalR/clients/csharp/Client.Core/src/HubConnection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1307,6 +1307,10 @@ private async Task SendWithLock(ConnectionState expectedConnectionState, HubMess
13071307
await SendWithLock(connectionState, CompletionMessage.WithError(bindingFailure.InvocationId, "Client failed to parse argument(s)."), cancellationToken: default).ConfigureAwait(false);
13081308
}
13091309
break;
1310+
case StreamBindingFailureMessage bindingFailure:
1311+
// The server can't receive a response, so we just drop the message and log
1312+
Log.StreamBindingFailure(_logger, bindingFailure.Id, bindingFailure.BindingFailure.SourceException);
1313+
break;
13101314
case InvocationMessage invocation:
13111315
Log.ReceivedInvocation(_logger, invocation.InvocationId, invocation.Target, invocation.Arguments);
13121316
await invocationMessageWriter.WriteAsync(invocation).ConfigureAwait(false);

src/SignalR/clients/csharp/Client/test/UnitTests/HubConnectionTests.Protocol.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,38 @@ public async Task StreamYieldsItemsAsTheyArrive()
409409
}
410410
}
411411

412+
[Fact]
413+
public async Task StreamBindingErrorLogsError()
414+
{
415+
using (StartVerifiableLog(expectedErrorsFilter: w => w.EventId.Name == "StreamBindingFailure"))
416+
{
417+
var connection = new TestConnection();
418+
var hubConnection = CreateHubConnection(connection, loggerFactory: LoggerFactory);
419+
try
420+
{
421+
await hubConnection.StartAsync().DefaultTimeout();
422+
423+
var channel = await hubConnection.StreamAsChannelAsync<string>("Foo").DefaultTimeout();
424+
425+
// Expects string, send int
426+
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = 1 }).DefaultTimeout();
427+
// Check that connection is still active, i.e. we ignore stream failures and keep things working.
428+
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 2, item = "1" }).DefaultTimeout();
429+
await connection.ReceiveJsonMessage(new { invocationId = "1", type = 3 }).DefaultTimeout();
430+
431+
var notifications = await channel.ReadAndCollectAllAsync().DefaultTimeout();
432+
433+
Assert.Contains(TestSink.Writes, w => w.EventId.Name == "StreamBindingFailure");
434+
Assert.Equal(["1"], notifications.ToArray());
435+
}
436+
finally
437+
{
438+
await hubConnection.DisposeAsync().DefaultTimeout();
439+
await connection.DisposeAsync().DefaultTimeout();
440+
}
441+
}
442+
}
443+
412444
[Fact]
413445
public async Task HandlerRegisteredWithOnIsFiredWhenInvocationReceived()
414446
{

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/GsonHubProtocol.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,26 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
9292
error = reader.nextString();
9393
break;
9494
case "result":
95-
case "item":
9695
if (invocationId == null || binder.getReturnType(invocationId) == null) {
9796
resultToken = JsonParser.parseReader(reader);
9897
} else {
9998
result = gson.fromJson(reader, binder.getReturnType(invocationId));
10099
}
101100
break;
101+
case "item":
102+
if (invocationId == null || binder.getReturnType(invocationId) == null) {
103+
resultToken = JsonParser.parseReader(reader);
104+
} else {
105+
try {
106+
result = gson.fromJson(reader, binder.getReturnType(invocationId));
107+
} catch (Exception ex) {
108+
argumentBindingException = ex;
109+
// Since we failed to parse the value, tell the reader to skip the failed item
110+
// so it can successfully continue reading
111+
reader.skipValue();
112+
}
113+
}
114+
break;
102115
case "arguments":
103116
if (target != null) {
104117
boolean startedArray = false;
@@ -167,9 +180,17 @@ public List<HubMessage> parseMessages(ByteBuffer payload, InvocationBinder binde
167180
case STREAM_ITEM:
168181
if (resultToken != null) {
169182
Type returnType = binder.getReturnType(invocationId);
170-
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
183+
try {
184+
result = gson.fromJson(resultToken, returnType != null ? returnType : Object.class);
185+
} catch (Exception ex) {
186+
argumentBindingException = ex;
187+
}
188+
}
189+
if (argumentBindingException != null) {
190+
hubMessages.add(new StreamBindingFailureMessage(invocationId, argumentBindingException));
191+
} else {
192+
hubMessages.add(new StreamItem(null, invocationId, result));
171193
}
172-
hubMessages.add(new StreamItem(null, invocationId, result));
173194
break;
174195
case STREAM_INVOCATION:
175196
case CANCEL_INVOCATION:

src/SignalR/clients/java/signalr/core/src/main/java/com/microsoft/signalr/HubConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,11 @@ private void ReceiveLoop(ByteBuffer payload)
483483
null, "Client failed to parse argument(s)."));
484484
}
485485
break;
486+
case STREAM_BINDING_FAILURE:
487+
// The server can't receive a response, so we just drop the message and log
488+
StreamBindingFailureMessage streamError = (StreamBindingFailureMessage)message;
489+
logger.error("Failed to bind argument received in stream '{}'.", streamError.getInvocationId(), streamError.getException());
490+
break;
486491
case INVOCATION:
487492
InvocationMessage invocationMessage = (InvocationMessage) message;
488493
connectionState.dispatchInvocation(invocationMessage);

src/SignalR/clients/java/signalr/test/build.gradle

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,10 @@ dependencies {
1010
implementation 'org.junit.jupiter:junit-jupiter-params:5.11.2'
1111
testImplementation 'org.junit.jupiter:junit-jupiter:5.11.2'
1212
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.11.2'
13-
implementation 'com.google.code.gson:gson:2.8.5'
13+
implementation 'com.google.code.gson:gson:2.8.9'
1414
implementation 'ch.qos.logback:logback-classic:1.2.3'
1515
implementation project(':core')
1616
implementation project(':messagepack')
17-
implementation project(':messagepack')
1817
antJUnit 'org.apache.ant:ant-junit:1.10.15'
1918
}
2019

src/SignalR/clients/java/signalr/test/src/main/java/com/microsoft/signalr/HubConnectionTest.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@
33

44
package com.microsoft.signalr;
55

6-
import static org.junit.jupiter.api.Assertions.*;
6+
import static org.junit.jupiter.api.Assertions.assertEquals;
7+
import static org.junit.jupiter.api.Assertions.assertFalse;
8+
import static org.junit.jupiter.api.Assertions.assertNull;
9+
import static org.junit.jupiter.api.Assertions.assertThrows;
10+
import static org.junit.jupiter.api.Assertions.assertTrue;
711

812
import java.lang.reflect.Type;
913
import java.nio.ByteBuffer;
@@ -17,7 +21,6 @@
1721
import java.util.concurrent.atomic.AtomicReference;
1822

1923
import org.junit.jupiter.api.Test;
20-
import org.junit.jupiter.api.Disabled;
2124
import org.junit.jupiter.api.extension.ExtendWith;
2225

2326
import ch.qos.logback.classic.spi.ILoggingEvent;
@@ -1097,6 +1100,37 @@ public void checkStreamCompletionError() {
10971100
assertEquals("There was an error", exception.getMessage());
10981101
}
10991102

1103+
@Test
1104+
public void checkStreamItemBindingFailure() {
1105+
try (TestLogger logger = new TestLogger()) {
1106+
MockTransport mockTransport = new MockTransport();
1107+
HubConnection hubConnection = TestUtils.createHubConnection("http://example.com", mockTransport);
1108+
1109+
hubConnection.start().timeout(30, TimeUnit.SECONDS).blockingAwait();
1110+
1111+
AtomicBoolean onNextCalled = new AtomicBoolean();
1112+
Observable<Integer> result = hubConnection.stream(Integer.class, "echo", "message");
1113+
result.subscribe((item) -> onNextCalled.set(true),
1114+
(error) -> {},
1115+
() -> {});
1116+
1117+
assertEquals("{\"type\":4,\"invocationId\":\"1\",\"target\":\"echo\",\"arguments\":[\"message\"]}" + RECORD_SEPARATOR,
1118+
TestUtils.byteBufferToString(mockTransport.getSentMessages()[1]));
1119+
assertFalse(onNextCalled.get());
1120+
1121+
mockTransport.receiveMessage("{\"type\":2,\"invocationId\":\"1\",\"item\":\"str\"}" + RECORD_SEPARATOR);
1122+
1123+
assertFalse(onNextCalled.get());
1124+
1125+
mockTransport.receiveMessage("{\"type\":3,\"invocationId\":\"1\",\"result\":1}" + RECORD_SEPARATOR);
1126+
1127+
assertEquals(1, result.timeout(30, TimeUnit.SECONDS).blockingFirst());
1128+
1129+
ILoggingEvent log = logger.assertLog("Failed to bind argument received in stream '1'.");
1130+
assertTrue(log.getThrowableProxy().getClassName().contains("gson.JsonSyntaxException"));
1131+
}
1132+
}
1133+
11001134
@Test
11011135
public void checkStreamCompletionErrorWithMessagePack() {
11021136
MockTransport mockTransport = new MockTransport();

0 commit comments

Comments
 (0)