Skip to content

Commit 18381ca

Browse files
committed
HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1145525 13f79535-47bb-0310-9956-ffa450edef68
1 parent a127653 commit 18381ca

File tree

11 files changed

+1081
-13
lines changed

11 files changed

+1081
-13
lines changed

common/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ Trunk (unreleased changes)
1212

1313
NEW FEATURES
1414

15+
HADOOP-7324. Ganglia plugins for metrics v2. (Priyo Mustafi via llu)
16+
1517
HADOOP-7342. Add an utility API in FileUtil for JDK File.list
1618
avoid NPEs on File.list() (Bharath Mundlapudi via mattf)
1719

common/conf/hadoop-metrics2.properties

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,33 @@
2525

2626
#reducetask.sink.file.filename=reducetask-metrics.out
2727

28+
29+
#
30+
# Below are for sending metrics to Ganglia
31+
#
32+
# for Ganglia 3.0 support
33+
# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink30
34+
#
35+
# for Ganglia 3.1 support
36+
# *.sink.ganglia.class=org.apache.hadoop.metrics2.sink.ganglia.GangliaSink31
37+
38+
# *.sink.ganglia.period=10
39+
40+
# default for supportsparse is false
41+
# *.sink.ganglia.supportsparse=true
42+
43+
#*.sink.ganglia.slope=jvm.metrics.gcCount=zero,jvm.metrics.memHeapUsedM=both
44+
#*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40
45+
46+
#namenode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
47+
48+
#datanode.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
49+
50+
#jobtracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
51+
52+
#tasktracker.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
53+
54+
#maptask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
55+
56+
#reducetask.sink.ganglia.servers=yourgangliahost_1:8649,yourgangliahost_2:8649
57+
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.metrics2.sink.ganglia;
20+
21+
import java.io.IOException;
22+
import java.net.DatagramPacket;
23+
import java.net.DatagramSocket;
24+
import java.net.SocketAddress;
25+
import java.net.SocketException;
26+
import java.net.UnknownHostException;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
import org.apache.commons.configuration.SubsetConfiguration;
32+
import org.apache.commons.logging.Log;
33+
import org.apache.commons.logging.LogFactory;
34+
import org.apache.hadoop.metrics2.MetricsSink;
35+
import org.apache.hadoop.metrics2.util.Servers;
36+
import org.apache.hadoop.net.DNS;
37+
38+
/**
39+
* This the base class for Ganglia sink classes using metrics2. Lot of the code
40+
* has been derived from org.apache.hadoop.metrics.ganglia.GangliaContext.
41+
* As per the documentation, sink implementations doesn't have to worry about
42+
* thread safety. Hence the code wasn't written for thread safety and should
43+
* be modified in case the above assumption changes in the future.
44+
*/
45+
public abstract class AbstractGangliaSink implements MetricsSink {
46+
47+
public final Log LOG = LogFactory.getLog(this.getClass());
48+
49+
/*
50+
* Output of "gmetric --help" showing allowable values
51+
* -t, --type=STRING
52+
* Either string|int8|uint8|int16|uint16|int32|uint32|float|double
53+
* -u, --units=STRING Unit of measure for the value e.g. Kilobytes, Celcius
54+
* (default='')
55+
* -s, --slope=STRING Either zero|positive|negative|both
56+
* (default='both')
57+
* -x, --tmax=INT The maximum time in seconds between gmetric calls
58+
* (default='60')
59+
*/
60+
public static final String DEFAULT_UNITS = "";
61+
public static final int DEFAULT_TMAX = 60;
62+
public static final int DEFAULT_DMAX = 0;
63+
public static final GangliaSlope DEFAULT_SLOPE = GangliaSlope.both;
64+
public static final int DEFAULT_PORT = 8649;
65+
public static final String SERVERS_PROPERTY = "servers";
66+
public static final int BUFFER_SIZE = 1500; // as per libgmond.c
67+
public static final String SUPPORT_SPARSE_METRICS_PROPERTY = "supportsparse";
68+
public static final boolean SUPPORT_SPARSE_METRICS_DEFAULT = false;
69+
public static final String EQUAL = "=";
70+
71+
private String hostName = "UNKNOWN.example.com";
72+
private DatagramSocket datagramSocket;
73+
private List<? extends SocketAddress> metricsServers;
74+
private byte[] buffer = new byte[BUFFER_SIZE];
75+
private int offset;
76+
private boolean supportSparseMetrics = SUPPORT_SPARSE_METRICS_DEFAULT;
77+
78+
/**
79+
* Used for visiting Metrics
80+
*/
81+
protected final GangliaMetricVisitor gangliaMetricVisitor =
82+
new GangliaMetricVisitor();
83+
84+
private SubsetConfiguration conf;
85+
private Map<String, GangliaConf> gangliaConfMap;
86+
private GangliaConf DEFAULT_GANGLIA_CONF = new GangliaConf();
87+
88+
/**
89+
* ganglia slope values which equal the ordinal
90+
*/
91+
public enum GangliaSlope {
92+
zero, // 0
93+
positive, // 1
94+
negative, // 2
95+
both // 3
96+
};
97+
98+
/**
99+
* define enum for various type of conf
100+
*/
101+
public enum GangliaConfType {
102+
slope, units, dmax, tmax
103+
};
104+
105+
/*
106+
* (non-Javadoc)
107+
*
108+
* @see
109+
* org.apache.hadoop.metrics2.MetricsPlugin#init(org.apache.commons.configuration
110+
* .SubsetConfiguration)
111+
*/
112+
public void init(SubsetConfiguration conf) {
113+
LOG.debug("Initializing the GangliaSink for Ganglia metrics.");
114+
115+
this.conf = conf;
116+
117+
// Take the hostname from the DNS class.
118+
if (conf.getString("slave.host.name") != null) {
119+
hostName = conf.getString("slave.host.name");
120+
} else {
121+
try {
122+
hostName = DNS.getDefaultHost(
123+
conf.getString("dfs.datanode.dns.interface", "default"),
124+
conf.getString("dfs.datanode.dns.nameserver", "default"));
125+
} catch (UnknownHostException uhe) {
126+
LOG.error(uhe);
127+
hostName = "UNKNOWN.example.com";
128+
}
129+
}
130+
131+
// load the gannglia servers from properties
132+
metricsServers = Servers.parse(conf.getString(SERVERS_PROPERTY),
133+
DEFAULT_PORT);
134+
135+
// extract the Ganglia conf per metrics
136+
gangliaConfMap = new HashMap<String, GangliaConf>();
137+
loadGangliaConf(GangliaConfType.units);
138+
loadGangliaConf(GangliaConfType.tmax);
139+
loadGangliaConf(GangliaConfType.dmax);
140+
loadGangliaConf(GangliaConfType.slope);
141+
142+
try {
143+
datagramSocket = new DatagramSocket();
144+
} catch (SocketException se) {
145+
LOG.error(se);
146+
}
147+
148+
// see if sparseMetrics is supported. Default is false
149+
supportSparseMetrics = conf.getBoolean(SUPPORT_SPARSE_METRICS_PROPERTY,
150+
SUPPORT_SPARSE_METRICS_DEFAULT);
151+
}
152+
153+
/*
154+
* (non-Javadoc)
155+
*
156+
* @see org.apache.hadoop.metrics2.MetricsSink#flush()
157+
*/
158+
public void flush() {
159+
// nothing to do as we are not buffering data
160+
}
161+
162+
// Load the configurations for a conf type
163+
private void loadGangliaConf(GangliaConfType gtype) {
164+
String propertyarr[] = conf.getStringArray(gtype.name());
165+
if (propertyarr != null && propertyarr.length > 0) {
166+
for (String metricNValue : propertyarr) {
167+
String metricNValueArr[] = metricNValue.split(EQUAL);
168+
if (metricNValueArr.length != 2 || metricNValueArr[0].length() == 0) {
169+
LOG.error("Invalid propertylist for " + gtype.name());
170+
}
171+
172+
String metricName = metricNValueArr[0].trim();
173+
String metricValue = metricNValueArr[1].trim();
174+
GangliaConf gconf = gangliaConfMap.get(metricName);
175+
if (gconf == null) {
176+
gconf = new GangliaConf();
177+
gangliaConfMap.put(metricName, gconf);
178+
}
179+
180+
switch (gtype) {
181+
case units:
182+
gconf.setUnits(metricValue);
183+
break;
184+
case dmax:
185+
gconf.setDmax(Integer.parseInt(metricValue));
186+
break;
187+
case tmax:
188+
gconf.setTmax(Integer.parseInt(metricValue));
189+
break;
190+
case slope:
191+
gconf.setSlope(GangliaSlope.valueOf(metricValue));
192+
break;
193+
}
194+
}
195+
}
196+
}
197+
198+
/**
199+
* Lookup GangliaConf from cache. If not found, return default values
200+
*
201+
* @param metricName
202+
* @return looked up GangliaConf
203+
*/
204+
protected GangliaConf getGangliaConfForMetric(String metricName) {
205+
GangliaConf gconf = gangliaConfMap.get(metricName);
206+
207+
return gconf != null ? gconf : DEFAULT_GANGLIA_CONF;
208+
}
209+
210+
/**
211+
* @return the hostName
212+
*/
213+
protected String getHostName() {
214+
return hostName;
215+
}
216+
217+
/**
218+
* Puts a string into the buffer by first writing the size of the string as an
219+
* int, followed by the bytes of the string, padded if necessary to a multiple
220+
* of 4.
221+
* @param s the string to be written to buffer at offset location
222+
*/
223+
protected void xdr_string(String s) {
224+
byte[] bytes = s.getBytes();
225+
int len = bytes.length;
226+
xdr_int(len);
227+
System.arraycopy(bytes, 0, buffer, offset, len);
228+
offset += len;
229+
pad();
230+
}
231+
232+
// Pads the buffer with zero bytes up to the nearest multiple of 4.
233+
private void pad() {
234+
int newOffset = ((offset + 3) / 4) * 4;
235+
while (offset < newOffset) {
236+
buffer[offset++] = 0;
237+
}
238+
}
239+
240+
/**
241+
* Puts an integer into the buffer as 4 bytes, big-endian.
242+
*/
243+
protected void xdr_int(int i) {
244+
buffer[offset++] = (byte) ((i >> 24) & 0xff);
245+
buffer[offset++] = (byte) ((i >> 16) & 0xff);
246+
buffer[offset++] = (byte) ((i >> 8) & 0xff);
247+
buffer[offset++] = (byte) (i & 0xff);
248+
}
249+
250+
/**
251+
* Sends Ganglia Metrics to the configured hosts
252+
* @throws IOException
253+
*/
254+
protected void emitToGangliaHosts() throws IOException {
255+
try {
256+
for (SocketAddress socketAddress : metricsServers) {
257+
DatagramPacket packet =
258+
new DatagramPacket(buffer, offset, socketAddress);
259+
datagramSocket.send(packet);
260+
}
261+
} finally {
262+
// reset the buffer for the next metric to be built
263+
offset = 0;
264+
}
265+
}
266+
267+
/**
268+
* Reset the buffer for the next metric to be built
269+
*/
270+
void resetBuffer() {
271+
offset = 0;
272+
}
273+
274+
/**
275+
* @return whether sparse metrics are supported
276+
*/
277+
protected boolean isSupportSparseMetrics() {
278+
return supportSparseMetrics;
279+
}
280+
281+
/**
282+
* Used only by unit test
283+
* @param datagramSocket the datagramSocket to set.
284+
*/
285+
void setDatagramSocket(DatagramSocket datagramSocket) {
286+
this.datagramSocket = datagramSocket;
287+
}
288+
}

0 commit comments

Comments
 (0)