Skip to content

Commit 5d735b2

Browse files
committed
Merge pull request #237 from Netflix/ISSUE-236
counters are added
2 parents 1c31295 + ac45241 commit 5d735b2

File tree

6 files changed

+36
-2
lines changed

6 files changed

+36
-2
lines changed

suro-core/src/main/java/com/netflix/suro/TagKey.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,5 @@ public class TagKey {
2727
public static final String RETRIED_COUNT = "retriedCount";
2828
public static final String ROUTING_KEY = "routingKey";
2929
public static final String REJECTED_REASON = "rejectedReason";
30+
public static final String ATTEMPTED_COUNT = "attemptedMessageCount";
3031
}

suro-core/src/main/java/com/netflix/suro/routing/MessageRouter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ public void process(SuroInput input, MessageContainer msg) throws Exception {
6464
return; // discard message
6565
}
6666

67+
DynamicCounter.increment(
68+
MonitorConfig
69+
.builder(TagKey.RECV_COUNT)
70+
.withTag("routingKey", msg.getRoutingKey())
71+
.build());
72+
6773
RoutingMap.RoutingInfo info = routingMap.getRoutingInfo(msg.getRoutingKey());
6874

6975
if (info == null) {
@@ -83,6 +89,13 @@ public void process(SuroInput input, MessageContainer msg) throws Exception {
8389
} else {
8490
sink.writeTo(msg);
8591
}
92+
93+
DynamicCounter.increment(
94+
MonitorConfig
95+
.builder(TagKey.ATTEMPTED_COUNT)
96+
.withTag("routingKey", msg.getRoutingKey())
97+
.withTag("sinkId", route.getSink())
98+
.build());
8699
}
87100
}
88101
}

suro-elasticsearch/src/test/java/com/netflix/suro/sink/elasticsearch/TestIndexSuffixFormatter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,19 @@ public void shouldDateTypeReturnsCorrectOne() {
3636
public void shouldThrowExceptionOnUnsupportedType() {
3737
IndexSuffixFormatter formatter = new IndexSuffixFormatter("invalid", null);
3838
}
39+
40+
@Test
41+
public void testWeeklyRepresentation() {
42+
System.setProperty("user.timezone", "GMT");
43+
44+
Properties props = new Properties();
45+
props.put("dateFormat", "YYYYMM_ww");
46+
47+
DateTime dt = new DateTime("2014-10-12T00:00:00.000Z");
48+
IndexSuffixFormatter formatter = new IndexSuffixFormatter("date", props);
49+
IndexInfo info = mock(IndexInfo.class);
50+
doReturn(dt.getMillis()).when(info).getTimestamp();
51+
52+
assertEquals(formatter.format(info), "201410_41");
53+
}
3954
}

suro-kafka-producer/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ apply plugin: 'nebula-test-jar'
33
dependencies {
44
compile project(':suro-core')
55
compile 'com.netflix.rxjava:rxjava-core:0.19.1'
6-
compile 'org.apache.kafka:kafka-clients:0.8.2.0'
6+
compile 'org.apache.kafka:kafka-clients:0.8.2.1'
77

88
testCompile 'junit:junit:4.11'
99
testCompile 'org.apache.curator:curator-test:2.4.2'

suro-kafka-producer/src/main/java/com/netflix/suro/sink/kafka/KafkaRetentionPartitioner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void run() {
3232

3333
this.prng = new Random();
3434
// seed with a random integer
35-
this.indexCache = new ConcurrentHashMap<String, Integer>();
35+
this.indexCache = new ConcurrentHashMap<>();
3636
// increment index every interval
3737
}
3838

suro-kafka-producer/src/main/java/com/netflix/suro/sink/kafka/KafkaSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ public void setRecordCounterListener(Action3 action) {
141141
@Override
142142
public void writeTo(final MessageContainer message) {
143143
queuedRecords.incrementAndGet();
144+
DynamicCounter.increment(
145+
MonitorConfig
146+
.builder("queuedRecord")
147+
.withTag(TagKey.ROUTING_KEY, message.getRoutingKey())
148+
.build());
144149
runRecordCounterListener();
145150

146151
if (metadataFetchedTopicSet.contains(message.getRoutingKey())) {

0 commit comments

Comments
 (0)