Skip to content

Commit cfd6fe6

Browse files
author
Dave Syer
committed
Some tidying of context propagation in Spring Integration
The client recv/send events have no meaning in a messaging context, so we don't need to send them. Also added a new protected method as the inverse of populatePropergatedContext. Rebased on master and fixed conflicts. See spring-cloudgh-15
1 parent 7da26be commit cfd6fe6

File tree

2 files changed

+34
-55
lines changed

2 files changed

+34
-55
lines changed

spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/integration/TraceContextPropagationChannelInterceptor.java

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,6 @@
2929

3030
import org.springframework.aop.support.AopUtils;
3131
import org.springframework.cloud.sleuth.Span;
32-
import org.springframework.cloud.sleuth.event.ClientReceivedEvent;
33-
import org.springframework.cloud.sleuth.event.ClientSentEvent;
34-
import org.springframework.context.ApplicationEvent;
35-
import org.springframework.context.ApplicationEventPublisher;
36-
import org.springframework.context.ApplicationEventPublisherAware;
3732
import org.springframework.integration.channel.DirectChannel;
3833
import org.springframework.messaging.Message;
3934
import org.springframework.messaging.MessageChannel;
@@ -44,40 +39,31 @@
4439
import org.springframework.util.Assert;
4540

4641
/**
47-
* The {@link ExecutorChannelInterceptor} implementation responsible for
48-
* the {@link Span} propagation from one message flow's thread to another
49-
* through the {@link MessageChannel}s involved in the flow.
42+
* The {@link ExecutorChannelInterceptor} implementation responsible for the {@link Span}
43+
* propagation from one message flow's thread to another through the
44+
* {@link MessageChannel}s involved in the flow.
5045
* <p>
51-
* In addition this interceptor cleans up (restores) the {@link Span}
52-
* in the containers Threads for channels like
53-
* {@link org.springframework.integration.channel.ExecutorChannel}
54-
* and {@link org.springframework.integration.channel.QueueChannel}.
46+
* In addition this interceptor cleans up (restores) the {@link Span} in the containers
47+
* Threads for channels like
48+
* {@link org.springframework.integration.channel.ExecutorChannel} and
49+
* {@link org.springframework.integration.channel.QueueChannel}.
5550
* @author Spencer Gibb
5651
* @since 1.0
5752
*/
58-
public class TraceContextPropagationChannelInterceptor
59-
extends ChannelInterceptorAdapter implements ExecutorChannelInterceptor, ApplicationEventPublisherAware {
53+
public class TraceContextPropagationChannelInterceptor extends ChannelInterceptorAdapter
54+
implements ExecutorChannelInterceptor {
6055

6156
private final static ThreadLocal<Span> ORIGINAL_CONTEXT = new ThreadLocal<>();
6257

63-
private ApplicationEventPublisher publisher;
64-
65-
@Override
66-
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
67-
this.publisher = publisher;
68-
}
69-
7058
@Override
7159
public final Message<?> preSend(Message<?> message, MessageChannel channel) {
7260
if (DirectChannel.class.isAssignableFrom(AopUtils.getTargetClass(channel))) {
7361
return message;
7462
}
7563

76-
//TODO: start span from headers?
7764
Span span = getCurrentSpan();
7865

7966
if (span != null) {
80-
publish(new ClientSentEvent(this, span));
8167
return new MessageWithSpan(message, span);
8268
}
8369
else {
@@ -86,63 +72,59 @@ public final Message<?> preSend(Message<?> message, MessageChannel channel) {
8672
}
8773

8874
@Override
89-
@SuppressWarnings("unchecked")
9075
public final Message<?> postReceive(Message<?> message, MessageChannel channel) {
9176
if (message instanceof MessageWithSpan) {
9277
MessageWithSpan messageWithSpan = (MessageWithSpan) message;
9378
Message<?> messageToHandle = messageWithSpan.message;
9479
populatePropagatedContext(messageWithSpan.span, messageToHandle, channel);
9580

96-
publish(new ClientReceivedEvent(this, messageWithSpan.span));
9781
return message;
9882
}
9983
return message;
10084
}
10185

10286
@Override
103-
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
104-
Span originalContext = ORIGINAL_CONTEXT.get();
105-
try {
106-
if (originalContext == null) {
107-
setCurrentSpan(null);
108-
ORIGINAL_CONTEXT.remove();
109-
}
110-
else {
111-
setCurrentSpan(originalContext);
112-
}
113-
}
114-
catch (Throwable t) {//NOSONAR
115-
setCurrentSpan(null);
116-
}
87+
public void afterMessageHandled(Message<?> message, MessageChannel channel,
88+
MessageHandler handler, Exception ex) {
89+
resetPropagatedContext();
11790
}
11891

11992
@Override
120-
public final Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
93+
public final Message<?> beforeHandle(Message<?> message, MessageChannel channel,
94+
MessageHandler handler) {
12195
return postReceive(message, channel);
12296
}
12397

124-
private void publish(ApplicationEvent event) {
125-
if (this.publisher !=null) {
126-
this.publisher.publishEvent(event);
127-
}
128-
}
129-
13098
private String getParentId(Span span) {
13199
return span.getParents() != null && !span.getParents().isEmpty() ? span
132100
.getParents().get(0) : null;
133101
}
134102

135103
protected void populatePropagatedContext(Span span, Message<?> message,
136-
MessageChannel channel) {
104+
MessageChannel channel) {
137105
if (span != null) {
138106
Span currentContext = getCurrentSpan();
139-
140107
ORIGINAL_CONTEXT.set(currentContext);
141-
142108
setCurrentSpan(span);
143109
}
144110
}
145111

112+
protected void resetPropagatedContext() {
113+
Span originalContext = ORIGINAL_CONTEXT.get();
114+
try {
115+
if (originalContext == null) {
116+
setCurrentSpan(null);
117+
ORIGINAL_CONTEXT.remove();
118+
}
119+
else {
120+
setCurrentSpan(originalContext);
121+
}
122+
}
123+
catch (Throwable t) {// NOSONAR
124+
setCurrentSpan(null);
125+
}
126+
}
127+
146128
private class MessageWithSpan implements Message<Object> {
147129

148130
private final Message<?> message;
@@ -192,11 +174,8 @@ public MessageHeaders getHeaders() {
192174

193175
@Override
194176
public String toString() {
195-
return "MessageWithThreadState{" +
196-
"message=" + message +
197-
", span=" + span +
198-
", messageHeaders=" + messageHeaders +
199-
'}';
177+
return "MessageWithThreadState{" + "message=" + this.message + ", span="
178+
+ this.span + ", messageHeaders=" + this.messageHeaders + '}';
200179
}
201180

202181
}

spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/integration/TraceSpringIntegrationAutoConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class TraceSpringIntegrationAutoConfiguration {
3131

3232
@Bean
3333
@GlobalChannelInterceptor
34-
@ConditionalOnProperty(value = "spring.cloud.sleuth.instrument.integration.globalChannelInterceptor.enabled", matchIfMissing = true)
34+
@ConditionalOnProperty(value = "spring.sleuth.integration.enabled", matchIfMissing = true)
3535
public TraceContextPropagationChannelInterceptor traceContextPropagationChannelInterceptor() {
3636
return new TraceContextPropagationChannelInterceptor();
3737
}

0 commit comments

Comments
 (0)