Skip to content

Commit 999987d

Browse files
committed
Support docAsUpsert as option
1 parent dc531e0 commit 999987d

File tree

2 files changed

+9
-4
lines changed

2 files changed

+9
-4
lines changed

src/main/java/org/xbib/elasticsearch/common/util/PlainKeyValueStreamListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3838

3939
public class PlainKeyValueStreamListener<K, V> implements KeyValueStreamListener<K, V> {
40-
private final static String ValueSeperators = ",|;~!@#$%^&";
40+
private final static String ValueSeperators = ",;|/~!@#$%^&";
4141
private final static Pattern p = Pattern.compile("^(.*)\\[(.*?)\\]$");
4242

4343
/**
@@ -291,6 +291,7 @@ protected Map<String, Object> merge(Map<String, Object> map, Object k, Object va
291291
if (index == null || index.isEmpty()) {
292292
map.put(head, new Values(map.get(head), value, isSequence));
293293
} else if (index.length() == 1 && ValueSeperators.contains(index)) {
294+
// Use difference seperator, pass it as Pattern.quote(index) since Values use String.split
294295
map.put(head, new Values(map.get(head), value, isSequence, Pattern.quote(index)));
295296
} else {
296297
if (!map.containsKey(head)) {

src/main/java/org/xbib/elasticsearch/jdbc/strategy/standard/StandardSink.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public class StandardSink<C extends StandardContext> implements Sink<C> {
6767
protected String type;
6868

6969
protected String id;
70+
71+
protected boolean docAsUpsert;
7072

7173
private final static SinkMetric sinkMetric = new SinkMetric().start();
7274

@@ -96,6 +98,7 @@ public synchronized void beforeFetch() throws IOException {
9698
Settings settings = context.getSettings();
9799
String index = settings.get("index", "jdbc");
98100
String type = settings.get("type", "jdbc");
101+
this.docAsUpsert = settings.getAsBoolean("docAsUpsert", false);
99102
if (clientAPI == null) {
100103
clientAPI = createClient(settings);
101104
if (clientAPI.client() != null) {
@@ -276,10 +279,11 @@ public void update(IndexableObject object) throws IOException {
276279
setId(object.id());
277280
}
278281
if (getId() == null) {
279-
return; // skip if no doc is specified to delete
282+
return; // skip if no doc is specified to update
280283
}
281284
UpdateRequest request = new UpdateRequest().index(this.index).type(this.type).id(getId()).doc(object.source());
282-
request.docAsUpsert(true);
285+
if (this.docAsUpsert)
286+
request.docAsUpsert(true);
283287

284288
if (object.meta(ControlKeys._version.name()) != null) {
285289
request.versionType(VersionType.EXTERNAL)
@@ -292,7 +296,7 @@ public void update(IndexableObject object) throws IOException {
292296
request.parent(object.meta(ControlKeys._parent.name()));
293297
}
294298
if (logger.isTraceEnabled()) {
295-
logger.trace("adding bulk update action {}/{}/{}", request.index(), request.type(), request.id());
299+
logger.trace("adding bulk update action {}/{}/{} docAsUpsert {}", request.index(), request.type(), request.id(), this.docAsUpsert);
296300
}
297301
clientAPI.bulkUpdate(request);
298302
}

0 commit comments

Comments
 (0)