Skip to content

Commit 9292c88

Browse files
author
James Tyack
committed
Amended functions to make use of new AstyanaxClientFactory so that only one instance is created for each Cassandra cluster.
1 parent 1e3857a commit 9292c88

File tree

2 files changed

+25
-15
lines changed

2 files changed

+25
-15
lines changed

src/main/java/com/hmsonline/storm/cassandra/trident/TridentCassandraLookupFunction.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.hmsonline.storm.cassandra.bolt.mapper.TridentColumnMapper;
1919
import com.hmsonline.storm.cassandra.bolt.mapper.TridentTupleMapper;
2020
import com.hmsonline.storm.cassandra.client.AstyanaxClient;
21+
import com.hmsonline.storm.cassandra.client.AstyanaxClientFactory;
2122

2223
public class TridentCassandraLookupFunction<K, C, V> implements Function {
2324
private static final long serialVersionUID = 12132012L;
@@ -27,24 +28,35 @@ public class TridentCassandraLookupFunction<K, C, V> implements Function {
2728
private TridentColumnMapper<K, C, V> columnsMapper;
2829
private TridentTupleMapper<K, C, V> tupleMapper;
2930
private AstyanaxClient<K, C, V> client;
30-
private String clientConfigKey;
31+
private String cassandraClusterId;
3132

3233
private Filter tupleFilter = null; // used to prevent processing for tuples
3334
// that should be skipped by the lookup
3435
private int numberOfOutputFields = 1; // used to emit when the incoming
3536
// tuple doesn't pass the filter check
3637
private boolean emitEmptyOnFailure = false;
3738

38-
public TridentCassandraLookupFunction(String clientConfigKey, TridentTupleMapper<K, C, V> tupleMapper,
39+
/**
40+
* @param cassandraClusterId Unique identifier for the Cassandra cluster
41+
* @param tupleMapper
42+
* @param columnMapper
43+
*/
44+
public TridentCassandraLookupFunction(String cassandraClusterId, TridentTupleMapper<K, C, V> tupleMapper,
3945
TridentColumnMapper<K, C, V> columnMapper) {
4046
this.columnsMapper = columnMapper;
4147
this.tupleMapper = tupleMapper;
42-
this.clientConfigKey = clientConfigKey;
48+
this.cassandraClusterId = cassandraClusterId;
4349
}
4450

45-
public TridentCassandraLookupFunction(String clientConfigKey, TridentTupleMapper<K, C, V> tupleMapper,
51+
/**
52+
* @param cassandraClusterId Unique identifier for the Cassandra cluster
53+
* @param tupleMapper
54+
* @param columnMapper
55+
* @param emitEmptyOnFailure
56+
*/
57+
public TridentCassandraLookupFunction(String cassandraClusterId, TridentTupleMapper<K, C, V> tupleMapper,
4658
TridentColumnMapper<K, C, V> columnMapper, boolean emitEmptyOnFailure) {
47-
this(clientConfigKey, tupleMapper, columnMapper);
59+
this(cassandraClusterId, tupleMapper, columnMapper);
4860
this.emitEmptyOnFailure = emitEmptyOnFailure;
4961
}
5062

@@ -63,10 +75,8 @@ public void setEmitEmptyOnFailure(boolean emitEmptyOnFailure) {
6375
@Override
6476
@SuppressWarnings({ "unchecked", "rawtypes" })
6577
public void prepare(Map stormConf, TridentOperationContext context) {
66-
Map<String, Object> config = (Map<String, Object>) stormConf.get(this.clientConfigKey);
67-
68-
this.client = new AstyanaxClient<K, C, V>();
69-
this.client.start(config);
78+
Map<String, Object> config = (Map<String, Object>) stormConf.get(this.cassandraClusterId);
79+
this.client = AstyanaxClientFactory.getInstance(cassandraClusterId, config);
7080
}
7181

7282
@Override

src/main/java/com/hmsonline/storm/cassandra/trident/TridentCassandraWriteFunction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import com.hmsonline.storm.cassandra.bolt.mapper.TridentTupleMapper;
1616
import com.hmsonline.storm.cassandra.client.AstyanaxClient;
17+
import com.hmsonline.storm.cassandra.client.AstyanaxClientFactory;
1718
import com.hmsonline.storm.cassandra.exceptions.StormCassandraException;
1819
import com.hmsonline.storm.cassandra.exceptions.TupleMappingException;
1920

@@ -24,16 +25,16 @@ public class TridentCassandraWriteFunction<K, C, V> implements Function {
2425
protected TridentTupleMapper<K, C, V> tupleMapper;
2526
private AstyanaxClient<K, C, V> client;
2627

27-
private String clientConfigKey;
28+
private String cassandraClusterId;
2829
private Object valueToEmit;
2930

3031
public void setValueToEmitAfterWrite(Object valueToEmit) {
3132
this.valueToEmit = valueToEmit;
3233
}
3334

34-
public TridentCassandraWriteFunction(String clientConfigKey, TridentTupleMapper<K, C, V> tupleMapper) {
35+
public TridentCassandraWriteFunction(String cassandraClusterId, TridentTupleMapper<K, C, V> tupleMapper) {
3536
this.tupleMapper = tupleMapper;
36-
this.clientConfigKey = clientConfigKey;
37+
this.cassandraClusterId = cassandraClusterId;
3738
this.valueToEmit = null;
3839
}
3940
public TridentCassandraWriteFunction(String clientConfigKey, TridentTupleMapper<K, C, V> tupleMapper,
@@ -45,9 +46,8 @@ public TridentCassandraWriteFunction(String clientConfigKey, TridentTupleMapper<
4546
@Override
4647
@SuppressWarnings({ "rawtypes", "unchecked" })
4748
public void prepare(Map stormConf, TridentOperationContext context) {
48-
Map<String, Object> config = (Map<String, Object>) stormConf.get(this.clientConfigKey);
49-
client = new AstyanaxClient<K, C, V>();
50-
client.start(config);
49+
Map<String, Object> config = (Map<String, Object>) stormConf.get(this.cassandraClusterId);
50+
client = AstyanaxClientFactory.getInstance(cassandraClusterId, config);
5151
}
5252

5353
@Override

0 commit comments

Comments
 (0)