Skip to content

Commit 7da26be

Browse files
spencergibbDave Syer
authored andcommitted
Instrument spring integration.
Via a global channel interceptor that adds headers to messages. fixes spring-cloudgh-14
1 parent de60266 commit 7da26be

File tree

5 files changed

+334
-3
lines changed

5 files changed

+334
-3
lines changed

spring-cloud-sleuth-core/pom.xml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,8 @@
4444
<optional>true</optional>
4545
</dependency>
4646
<dependency>
47-
<groupId>org.springframework.integration</groupId>
48-
<artifactId>spring-integration-core</artifactId>
49-
<version>${spring-integration.version}</version>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-starter-integration</artifactId>
5049
<optional>true</optional>
5150
</dependency>
5251
<dependency>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2013-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.instrument.integration;
18+
19+
import static org.springframework.cloud.sleuth.Trace.PARENT_ID_NAME;
20+
import static org.springframework.cloud.sleuth.Trace.PROCESS_ID_NAME;
21+
import static org.springframework.cloud.sleuth.Trace.SPAN_ID_NAME;
22+
import static org.springframework.cloud.sleuth.Trace.SPAN_NAME_NAME;
23+
import static org.springframework.cloud.sleuth.Trace.TRACE_ID_NAME;
24+
import static org.springframework.cloud.sleuth.TraceContextHolder.getCurrentSpan;
25+
import static org.springframework.cloud.sleuth.TraceContextHolder.setCurrentSpan;
26+
27+
import java.util.HashMap;
28+
import java.util.Map;
29+
30+
import org.springframework.aop.support.AopUtils;
31+
import org.springframework.cloud.sleuth.Span;
32+
import org.springframework.cloud.sleuth.event.ClientReceivedEvent;
33+
import org.springframework.cloud.sleuth.event.ClientSentEvent;
34+
import org.springframework.context.ApplicationEvent;
35+
import org.springframework.context.ApplicationEventPublisher;
36+
import org.springframework.context.ApplicationEventPublisherAware;
37+
import org.springframework.integration.channel.DirectChannel;
38+
import org.springframework.messaging.Message;
39+
import org.springframework.messaging.MessageChannel;
40+
import org.springframework.messaging.MessageHandler;
41+
import org.springframework.messaging.MessageHeaders;
42+
import org.springframework.messaging.support.ChannelInterceptorAdapter;
43+
import org.springframework.messaging.support.ExecutorChannelInterceptor;
44+
import org.springframework.util.Assert;
45+
46+
/**
47+
* The {@link ExecutorChannelInterceptor} implementation responsible for
48+
* the {@link Span} propagation from one message flow's thread to another
49+
* through the {@link MessageChannel}s involved in the flow.
50+
* <p>
51+
* In addition this interceptor cleans up (restores) the {@link Span}
52+
* in the containers Threads for channels like
53+
* {@link org.springframework.integration.channel.ExecutorChannel}
54+
* and {@link org.springframework.integration.channel.QueueChannel}.
55+
* @author Spencer Gibb
56+
* @since 1.0
57+
*/
58+
public class TraceContextPropagationChannelInterceptor
59+
extends ChannelInterceptorAdapter implements ExecutorChannelInterceptor, ApplicationEventPublisherAware {
60+
61+
private final static ThreadLocal<Span> ORIGINAL_CONTEXT = new ThreadLocal<>();
62+
63+
private ApplicationEventPublisher publisher;
64+
65+
@Override
66+
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
67+
this.publisher = publisher;
68+
}
69+
70+
@Override
71+
public final Message<?> preSend(Message<?> message, MessageChannel channel) {
72+
if (DirectChannel.class.isAssignableFrom(AopUtils.getTargetClass(channel))) {
73+
return message;
74+
}
75+
76+
//TODO: start span from headers?
77+
Span span = getCurrentSpan();
78+
79+
if (span != null) {
80+
publish(new ClientSentEvent(this, span));
81+
return new MessageWithSpan(message, span);
82+
}
83+
else {
84+
return message;
85+
}
86+
}
87+
88+
@Override
89+
@SuppressWarnings("unchecked")
90+
public final Message<?> postReceive(Message<?> message, MessageChannel channel) {
91+
if (message instanceof MessageWithSpan) {
92+
MessageWithSpan messageWithSpan = (MessageWithSpan) message;
93+
Message<?> messageToHandle = messageWithSpan.message;
94+
populatePropagatedContext(messageWithSpan.span, messageToHandle, channel);
95+
96+
publish(new ClientReceivedEvent(this, messageWithSpan.span));
97+
return message;
98+
}
99+
return message;
100+
}
101+
102+
@Override
103+
public void afterMessageHandled(Message<?> message, MessageChannel channel, MessageHandler handler, Exception ex) {
104+
Span originalContext = ORIGINAL_CONTEXT.get();
105+
try {
106+
if (originalContext == null) {
107+
setCurrentSpan(null);
108+
ORIGINAL_CONTEXT.remove();
109+
}
110+
else {
111+
setCurrentSpan(originalContext);
112+
}
113+
}
114+
catch (Throwable t) {//NOSONAR
115+
setCurrentSpan(null);
116+
}
117+
}
118+
119+
@Override
120+
public final Message<?> beforeHandle(Message<?> message, MessageChannel channel, MessageHandler handler) {
121+
return postReceive(message, channel);
122+
}
123+
124+
private void publish(ApplicationEvent event) {
125+
if (this.publisher !=null) {
126+
this.publisher.publishEvent(event);
127+
}
128+
}
129+
130+
private String getParentId(Span span) {
131+
return span.getParents() != null && !span.getParents().isEmpty() ? span
132+
.getParents().get(0) : null;
133+
}
134+
135+
protected void populatePropagatedContext(Span span, Message<?> message,
136+
MessageChannel channel) {
137+
if (span != null) {
138+
Span currentContext = getCurrentSpan();
139+
140+
ORIGINAL_CONTEXT.set(currentContext);
141+
142+
setCurrentSpan(span);
143+
}
144+
}
145+
146+
private class MessageWithSpan implements Message<Object> {
147+
148+
private final Message<?> message;
149+
150+
private final Span span;
151+
152+
private final MessageHeaders messageHeaders;
153+
154+
public MessageWithSpan(Message<?> message, Span span) {
155+
Assert.notNull(message, "message can not be null");
156+
Assert.notNull(span, "span can not be null");
157+
this.message = message;
158+
this.span = span;
159+
160+
Map<String, Object> headers = new HashMap<>();
161+
headers.putAll(message.getHeaders());
162+
163+
setHeader(headers, SPAN_ID_NAME, this.span.getSpanId());
164+
setHeader(headers, TRACE_ID_NAME, this.span.getTraceId());
165+
setHeader(headers, SPAN_NAME_NAME, this.span.getName());
166+
String parentId = getParentId(getCurrentSpan());
167+
if (parentId != null) {
168+
setHeader(headers, PARENT_ID_NAME, parentId);
169+
}
170+
String processId = this.span.getProcessId();
171+
if (processId != null) {
172+
setHeader(headers, PROCESS_ID_NAME, processId);
173+
}
174+
this.messageHeaders = new MessageHeaders(headers);
175+
}
176+
177+
public void setHeader(Map<String, Object> headers, String name, String value) {
178+
if (!headers.containsKey(name)) {
179+
headers.put(name, value);
180+
}
181+
}
182+
183+
@Override
184+
public Object getPayload() {
185+
return this.message.getPayload();
186+
}
187+
188+
@Override
189+
public MessageHeaders getHeaders() {
190+
return this.messageHeaders;
191+
}
192+
193+
@Override
194+
public String toString() {
195+
return "MessageWithThreadState{" +
196+
"message=" + message +
197+
", span=" + span +
198+
", messageHeaders=" + messageHeaders +
199+
'}';
200+
}
201+
202+
}
203+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2013-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.instrument.integration;
18+
19+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
20+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.integration.config.GlobalChannelInterceptor;
24+
25+
/**
26+
* @author Spencer Gibb
27+
*/
28+
@Configuration
29+
@ConditionalOnClass(GlobalChannelInterceptor.class)
30+
public class TraceSpringIntegrationAutoConfiguration {
31+
32+
@Bean
33+
@GlobalChannelInterceptor
34+
@ConditionalOnProperty(value = "spring.cloud.sleuth.instrument.integration.globalChannelInterceptor.enabled", matchIfMissing = true)
35+
public TraceContextPropagationChannelInterceptor traceContextPropagationChannelInterceptor() {
36+
return new TraceContextPropagationChannelInterceptor();
37+
}
38+
}

spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
33
org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration,\
44
org.springframework.cloud.sleuth.log.SleuthLogAutoConfiguration,\
5+
org.springframework.cloud.sleuth.instrument.integration.TraceSpringIntegrationAutoConfiguration,\
56
org.springframework.cloud.sleuth.instrument.scheduling.TraceSchedulingAutoConfiguration,\
67
org.springframework.cloud.sleuth.instrument.web.TraceWebAutoConfiguration,\
78
org.springframework.cloud.sleuth.instrument.web.client.TraceWebClientAutoConfiguration
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2013-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.sleuth.instrument.integration;
18+
19+
import static org.junit.Assert.assertNotNull;
20+
import static org.springframework.cloud.sleuth.Trace.SPAN_ID_NAME;
21+
import static org.springframework.cloud.sleuth.Trace.TRACE_ID_NAME;
22+
23+
import org.junit.After;
24+
import org.junit.Test;
25+
import org.springframework.boot.SpringApplication;
26+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
27+
import org.springframework.cloud.sleuth.Trace;
28+
import org.springframework.cloud.sleuth.TraceScope;
29+
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
30+
import org.springframework.context.ConfigurableApplicationContext;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.integration.annotation.MessageEndpoint;
34+
import org.springframework.integration.channel.QueueChannel;
35+
import org.springframework.integration.config.EnableIntegration;
36+
import org.springframework.integration.support.MessageBuilder;
37+
import org.springframework.messaging.Message;
38+
import org.springframework.messaging.PollableChannel;
39+
import org.springframework.test.context.ContextConfiguration;
40+
41+
/**
42+
* @author Spencer Gibb
43+
*/
44+
@ContextConfiguration(classes = TraceContextPropagationChannelInterceptorTests.App.class)
45+
public class TraceContextPropagationChannelInterceptorTests {
46+
private ConfigurableApplicationContext context;
47+
48+
@After
49+
public void close() {
50+
if (context != null) {
51+
context.close();
52+
}
53+
}
54+
55+
@Test
56+
public void testSpanPropagation() {
57+
context = SpringApplication.run(App.class);
58+
59+
PollableChannel channel = context.getBean("channel", PollableChannel.class);
60+
61+
Trace trace = context.getBean(Trace.class);
62+
63+
TraceScope traceScope = trace.startSpan("testSendMessage", new AlwaysSampler(), null);
64+
channel.send(MessageBuilder.withPayload("hi").build());
65+
traceScope.close();
66+
67+
Message<?> message = channel.receive(0);
68+
69+
assertNotNull("message was null", message);
70+
71+
String spanId = message.getHeaders().get(SPAN_ID_NAME, String.class);
72+
assertNotNull("spanId was null", spanId);
73+
74+
String traceId = message.getHeaders().get(TRACE_ID_NAME, String.class);
75+
assertNotNull("traceId was null", traceId);
76+
}
77+
78+
@Configuration
79+
@EnableAutoConfiguration
80+
@MessageEndpoint
81+
@EnableIntegration
82+
static class App {
83+
84+
@Bean
85+
public QueueChannel channel() {
86+
return new QueueChannel();
87+
}
88+
89+
}
90+
}

0 commit comments

Comments
 (0)