Skip to content

Commit 847404f

Browse files
committed
Use properties instead of customizers
1 parent eab3600 commit 847404f

File tree

4 files changed

+113
-156
lines changed

4 files changed

+113
-156
lines changed

Transformations.java

Lines changed: 103 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1-
// camel-k: language=java property-file=transformation.properties dependency=camel:jacksonxml dependency=camel:http dependency=camel:gson
2-
// camel-k: source=customizers/CSVCustomizer.java source=customizers/PostgreSQLCustomizer.java
1+
// camel-k: language=java property-file=transformation.properties
2+
// camel-k: dependency=camel:jacksonxml
3+
// camel-k: dependency=camel:http
4+
// camel-k: dependency=camel:gson
5+
// camel-k: dependency=camel:jdbc
6+
// camel-k: dependency=camel:csv
7+
// camel-k: dependency=mvn:org.postgresql:postgresql:jar:42.2.13
8+
// camel-k: dependency=mvn:org.apache.commons:commons-dbcp2:jar:2.7.0
39

410
import java.util.ArrayList;
511
import java.util.HashMap;
@@ -15,148 +21,139 @@
1521

1622
public class Transformations extends RouteBuilder {
1723

18-
@Override
24+
@Override
1925
public void configure() throws Exception {
2026

21-
//The following processors store relevant info as properties
27+
// The following processors store relevant info as properties
2228
Processor processCsv = new CSVProcessor();
2329
Processor processXML = new XMLProcessor();
2430

25-
//Preparing properties to build a GeoJSON Feature
31+
// Preparing properties to build a GeoJSON Feature
2632
Processor processDB = new DBProcessor();
2733

28-
//Just collects all features in a collection for the final GeoJSON
34+
// Just collects all features in a collection for the final GeoJSON
2935
Processor buildGeoJSON = new GeoJSONProcessor();
3036

31-
//Aggregate all messages into one message with the list of bodies
37+
// Aggregate all messages into one message with the list of bodies
3238
AggregationStrategy aggregationStrategy = new CollectToListStrategy();
3339

34-
//This is the actual route
40+
// This is the actual route
3541
from("timer:java?period=100000")
3642
// Reference URL for air quality e-Reporting on EEA
3743
// https://www.eea.europa.eu/data-and-maps/data/aqereporting-2
3844

39-
//We start by reading our data.csv file, looping on each row
40-
.to("{{source.csv}}")
41-
.unmarshal("customCSV")
42-
.split(body()).streaming()
45+
// We start by reading our data.csv file, looping on each row
46+
.to("{{source.csv}}").unmarshal("customCSV").split(body()).streaming()
4347

44-
//we store on exchange properties all the data we are interested in
48+
// we store on exchange properties all the data we are interested in
4549
.process(processCsv)
4650

47-
//on each row, we query an XML API service
48-
.setBody().constant("")
49-
.setHeader(Exchange.HTTP_METHOD, constant("GET"))
51+
// on each row, we query an XML API service
52+
.setBody().constant("").setHeader(Exchange.HTTP_METHOD, constant("GET"))
5053
.setHeader(Exchange.HTTP_QUERY, simple("lat=${exchangeProperty.lat}&lon=${exchangeProperty.lon}&format=xml"))
51-
.to("https://nominatim.openstreetmap.org/reverse")
52-
.unmarshal().jacksonxml()
54+
.to("https://nominatim.openstreetmap.org/reverse").unmarshal().jacksonxml()
5355

54-
//we store on exchange properties all the data we are interested in
56+
// we store on exchange properties all the data we are interested in
5557
.process(processXML)
5658

57-
//now we query the postgres database for more data
59+
// now we query the postgres database for more data
5860
.setBody().simple("SELECT info FROM descriptions WHERE id like '${exchangeProperty.pollutant}'")
5961
.to("jdbc:postgresBean?readSize=1")
60-
61-
//we store on exchange properties all the data we are interested in
62+
63+
// we store on exchange properties all the data we are interested in
6264
.process(processDB)
6365

64-
//we collect all rows into one message
65-
.aggregate(constant(true), aggregationStrategy)
66-
.completionSize(5)
67-
.process(buildGeoJSON)
68-
.marshal().json(JsonLibrary.Gson)
66+
// we collect all rows into one message
67+
.aggregate(constant(true), aggregationStrategy).completionSize(5).process(buildGeoJSON).marshal()
68+
.json(JsonLibrary.Gson)
6969

7070
.to("log:info?showBody=true")
71-
//and finally store the result on the postgres database
72-
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')"))
73-
.to("jdbc:postgresBean")
74-
75-
//Write some log to know it finishes properly
71+
// and finally store the result on the postgres database
72+
.setBody(simple("INSERT INTO measurements (geojson) VALUES ('${body}')")).to("jdbc:postgresBean")
73+
74+
// Write some log to know it finishes properly
7675
.log("Information stored");
7776
}
7877

79-
private final class CollectToListStrategy
80-
extends AbstractListAggregationStrategy<Object> {
81-
@Override
82-
public Object getValue(Exchange exchange) {
83-
return exchange.getMessage().getBody();
84-
}
85-
}
78+
private final class CollectToListStrategy extends AbstractListAggregationStrategy<Object> {
79+
@Override
80+
public Object getValue(Exchange exchange) {
81+
return exchange.getMessage().getBody();
82+
}
83+
}
8684

87-
private final class GeoJSONProcessor implements Processor {
88-
@Override
89-
public void process(Exchange exchange) throws Exception {
90-
Map<String, Object> res = new HashMap<String, Object>();
91-
res.put("features", exchange.getMessage().getBody());
92-
res.put("type", "FeatureCollection");
93-
exchange.getIn().setBody(res);
85+
private final class GeoJSONProcessor implements Processor {
86+
@Override
87+
public void process(Exchange exchange) throws Exception {
88+
Map<String, Object> res = new HashMap<String, Object>();
89+
res.put("features", exchange.getMessage().getBody());
90+
res.put("type", "FeatureCollection");
91+
exchange.getIn().setBody(res);
92+
}
93+
}
94+
95+
private final class DBProcessor implements Processor {
96+
@Override
97+
public void process(Exchange exchange) throws Exception {
98+
@SuppressWarnings("unchecked")
99+
List<Object> body = exchange.getMessage().getBody(List.class);
100+
101+
Map<String, Object> outputBody = new HashMap<String, Object>();
102+
outputBody.put("unit", exchange.getProperty("unit"));
103+
outputBody.put("level", exchange.getProperty("level"));
104+
outputBody.put("pollutant", exchange.getProperty("pollutant"));
105+
outputBody.put("address", exchange.getProperty("address"));
106+
107+
// If we got any response from the DB, add it
108+
if (body.size() > 0) {
109+
outputBody.put("info", body.get(0).toString());
94110
}
95-
}
96111

97-
private final class DBProcessor implements Processor {
98-
@Override
99-
public void process(Exchange exchange) throws Exception {
100-
@SuppressWarnings("unchecked")
101-
List<Object> body = exchange.getMessage().getBody(List.class);
102-
103-
Map<String, Object> outputBody = new HashMap<String, Object>();
104-
outputBody.put("unit", exchange.getProperty("unit"));
105-
outputBody.put("level", exchange.getProperty("level"));
106-
outputBody.put("pollutant", exchange.getProperty("pollutant"));
107-
outputBody.put("address", exchange.getProperty("address"));
108-
109-
//If we got any response from the DB, add it
110-
if(body.size() > 0) {
111-
outputBody.put("info", body.get(0).toString());
112-
}
113-
114-
List<String> coordinates = new ArrayList<String>();
115-
coordinates.add(exchange.getProperty("lat", "").toString());
116-
coordinates.add(exchange.getProperty("lon", "").toString());
117-
118-
Map<String, Object> geometry = new HashMap<String, Object>();
119-
geometry.put("type", "Point");
120-
geometry.put("coordinates", coordinates);
121-
122-
Map<String, Object> res = new HashMap<String, Object>();
123-
res.put("geometry", geometry);
124-
res.put("properties", outputBody);
125-
res.put("type", "Feature");
126-
127-
exchange.getIn().setBody(res);
128-
}
129-
}
130-
131-
private final class XMLProcessor implements Processor {
132-
@Override
112+
List<String> coordinates = new ArrayList<String>();
113+
coordinates.add(exchange.getProperty("lat", "").toString());
114+
coordinates.add(exchange.getProperty("lon", "").toString());
115+
116+
Map<String, Object> geometry = new HashMap<String, Object>();
117+
geometry.put("type", "Point");
118+
geometry.put("coordinates", coordinates);
119+
120+
Map<String, Object> res = new HashMap<String, Object>();
121+
res.put("geometry", geometry);
122+
res.put("properties", outputBody);
123+
res.put("type", "Feature");
124+
125+
exchange.getIn().setBody(res);
126+
}
127+
}
128+
129+
private final class XMLProcessor implements Processor {
130+
@Override
133131
public void process(Exchange exchange) throws Exception {
134132
@SuppressWarnings("unchecked")
135133
Map<String, String> body = exchange.getIn().getBody(Map.class);
136134
exchange.setProperty("address", body.get("addressparts"));
137135
}
138-
}
136+
}
137+
138+
private final class CSVProcessor implements Processor {
139+
@Override
140+
public void process(Exchange exchange) throws Exception {
141+
@SuppressWarnings("unchecked")
142+
Map<String, String> body = exchange.getIn().getBody(Map.class);
143+
144+
if (body != null) {
145+
extractValue(exchange, body, "Latitude of station", "lat");
146+
extractValue(exchange, body, "Longitude of station", "lon");
147+
extractValue(exchange, body, "Unit", "unit");
148+
extractValue(exchange, body, "Air pollution level", "level");
149+
extractValue(exchange, body, "Air pollutant", "pollutant");
150+
}
151+
}
139152

140-
private final class CSVProcessor implements Processor {
141-
@Override
142-
public void process(Exchange exchange) throws Exception {
143-
@SuppressWarnings("unchecked")
144-
Map<String, String> body = exchange.getIn().getBody(Map.class);
145-
146-
if (body != null) {
147-
extractValue(exchange, body, "Latitude of station", "lat");
148-
extractValue(exchange, body, "Longitude of station", "lon");
149-
extractValue(exchange, body, "Unit", "unit");
150-
extractValue(exchange, body, "Air pollution level", "level");
151-
extractValue(exchange, body, "Air pollutant", "pollutant");
152-
}
153-
}
154-
155-
private void extractValue(Exchange exchange, Map<String, String> body,
156-
String param, String keyName) {
157-
if (body.containsKey(param)) {
158-
exchange.setProperty(keyName, body.get(param));
159-
}
160-
}
161-
}
153+
private void extractValue(Exchange exchange, Map<String, String> body, String param, String keyName) {
154+
if (body.containsKey(param)) {
155+
exchange.setProperty(keyName, body.get(param));
156+
}
157+
}
158+
}
162159
}

customizers/CSVCustomizer.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

customizers/PostgreSQLCustomizer.java

Lines changed: 0 additions & 30 deletions
This file was deleted.

transformation.properties

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,13 @@ postgresql.host=mypostgres
88
postgresql.database=example
99

1010
# CSV dataformat settings
11-
camel.dataformat.csv.allow-missing-column-names=true
12-
camel.dataformat.csv.use-maps=true
11+
camel.beans.customCSV = #class:org.apache.camel.model.dataformat.CsvDataFormat
12+
camel.beans.customCSV.allow-missing-column-names = true
13+
camel.beans.customCSV.use-maps = true
14+
15+
# JDBC dataformat settings
16+
camel.beans.postgresBean = #class:org.apache.commons.dbcp2.BasicDataSource
17+
camel.beans.postgresBean.url = jdbc:postgresql://{{postgresql.host}}:5432/{{postgresql.database}}
18+
camel.beans.postgresBean.username = {{postgresql.user}}
19+
camel.beans.postgresBean.password = {{postgresql.password}}
20+
camel.beans.postgresBean.validation-query = SELECT 1

0 commit comments

Comments
 (0)