Skip to content

Commit a665f40

Browse files
committed
时间增量抽取,格式化时间字符串空格被替换成%的问题修复;顺便修复一下DB2按照OWNERTYPE获取schemas不准确的问题,修改为直接排除系统schemas
1 parent 3ea9a5b commit a665f40

File tree

4 files changed

+69
-31
lines changed

4 files changed

+69
-31
lines changed

datax-admin/src/main/java/com/wugui/datax/admin/tool/meta/DB2DatabaseMeta.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public String getSQLQueryColumns(String... args) {
4747

4848
@Override
4949
public String getSQLQueryTableSchema(String... args) {
50-
return "SELECT SCHEMANAME FROM SYSCAT.SCHEMATA WHERE OWNERTYPE = 'U'";
50+
return "SELECT SCHEMANAME FROM SYSCAT.SCHEMATA WHERE SCHEMANAME NOT IN('NULLID','ROOT','SQLJ','SYSCAT','SYSFUN','SYSIBM','SYSIBMADM','SYSIBMINTERNAL','SYSIBMTS','SYSPROC','SYSPUBLIC','SYSSTAT','SYSTOOLS')";
5151
}
5252

5353
@Override

datax-executor/src/main/java/com/wugui/datax/executor/service/command/BuildCommand.java

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@
1010

1111
import java.io.File;
1212
import java.text.SimpleDateFormat;
13-
import java.util.ArrayList;
14-
import java.util.Arrays;
15-
import java.util.Date;
16-
import java.util.List;
13+
import java.util.*;
1714

1815
import static com.wugui.datatx.core.util.Constants.SPLIT_COMMA;
1916
import static com.wugui.datax.executor.service.jobhandler.DataXConstant.*;
@@ -53,54 +50,64 @@ public static String[] buildDataXExecutorCmd(TriggerParam tgParam, String tmpFil
5350
private static String buildDataXParam(TriggerParam tgParam) {
5451
StringBuilder doc = new StringBuilder();
5552
String jvmParam = StringUtils.isNotBlank(tgParam.getJvmParam()) ? tgParam.getJvmParam().trim() : tgParam.getJvmParam();
56-
String partitionStr = tgParam.getPartitionInfo();
5753
if (StringUtils.isNotBlank(jvmParam)) {
5854
doc.append(JVM_CM).append(TRANSFORM_QUOTES).append(jvmParam).append(TRANSFORM_QUOTES);
5955
}
56+
return doc.toString();
57+
}
6058

59+
public static HashMap<String, String> buildDataXParamToMap(TriggerParam tgParam) {
60+
String partitionStr = tgParam.getPartitionInfo();
6161
Integer incrementType = tgParam.getIncrementType();
6262
String replaceParam = StringUtils.isNotBlank(tgParam.getReplaceParam()) ? tgParam.getReplaceParam().trim() : null;
63-
6463
if (incrementType != null && replaceParam != null) {
65-
66-
if (IncrementTypeEnum.TIME.getCode() == incrementType) {
67-
if (doc.length() > 0) doc.append(SPLIT_SPACE);
64+
if (IncrementTypeEnum.ID.getCode() == incrementType) {
65+
long startId = tgParam.getStartId();
66+
long endId = tgParam.getEndId();
67+
String formatParam = String.format(replaceParam, startId, endId);
68+
return getKeyValue(formatParam);
69+
} else if (IncrementTypeEnum.TIME.getCode() == incrementType) {
6870
String replaceParamType = tgParam.getReplaceParamType();
69-
70-
if (StringUtils.isBlank(replaceParamType) || replaceParamType.equals("Timestamp")) {
71+
if (StringUtils.isBlank(replaceParamType) || "Timestamp".equals(replaceParamType)) {
7172
long startTime = tgParam.getStartTime().getTime() / 1000;
7273
long endTime = tgParam.getTriggerTime().getTime() / 1000;
73-
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startTime, endTime));
74+
String formatParam = String.format(replaceParam, startTime, endTime);
75+
return getKeyValue(formatParam);
7476
} else {
7577
SimpleDateFormat sdf = new SimpleDateFormat(replaceParamType);
76-
String endTime = sdf.format(tgParam.getTriggerTime()).replaceAll(SPLIT_SPACE, PERCENT);
77-
String startTime = sdf.format(tgParam.getStartTime()).replaceAll(SPLIT_SPACE, PERCENT);
78-
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startTime, endTime));
79-
}
80-
//buildPartitionCM(doc, partitionStr);
81-
doc.append(TRANSFORM_QUOTES);
78+
String endTime = sdf.format(tgParam.getTriggerTime());
79+
String startTime = sdf.format(tgParam.getStartTime());
8280

83-
} else if (IncrementTypeEnum.ID.getCode() == incrementType) {
84-
long startId = tgParam.getStartId();
85-
long endId = tgParam.getEndId();
86-
if (doc.length() > 0) doc.append(SPLIT_SPACE);
87-
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startId, endId));
88-
doc.append(TRANSFORM_QUOTES);
81+
String formatParam = String.format(replaceParam, startTime, endTime);
82+
return getKeyValue(formatParam);
83+
}
8984
}
9085
}
9186

9287
if (incrementType != null && IncrementTypeEnum.PARTITION.getCode() == incrementType) {
9388
if (StringUtils.isNotBlank(partitionStr)) {
9489
List<String> partitionInfo = Arrays.asList(partitionStr.split(SPLIT_COMMA));
95-
if (doc.length() > 0) doc.append(SPLIT_SPACE);
96-
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo))).append(TRANSFORM_QUOTES);
90+
String formatParam = String.format(PARAMS_CM_V_PT, buildPartition(partitionInfo));
91+
return getKeyValue(formatParam);
9792
}
98-
}
9993

100-
JobLogger.log("------------------Command parameters:" + doc);
101-
return doc.toString();
94+
}
95+
return null;
10296
}
10397

98+
private static HashMap<String, String> getKeyValue(String formatParam) {
99+
String[] paramArr = formatParam.split(PARAMS_SYSTEM);
100+
HashMap<String, String> map = new HashMap<String, String>();
101+
102+
for (String param : paramArr) {
103+
if (StringUtils.isNotBlank(param)) {
104+
param = param.trim();
105+
String[] keyValue = param.split(PARAMS_EQUALS);
106+
map.put(keyValue[0], keyValue[1]);
107+
}
108+
}
109+
return map;
110+
}
104111

105112
private void buildPartitionCM(StringBuilder doc, String partitionStr) {
106113
if (StringUtils.isNotBlank(partitionStr)) {

datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/DataXConstant.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ public class DataXConstant {
1919

2020
public static final String PARAMS_CM = "-p";
2121

22+
public static final String PARAMS_SYSTEM = "-D";
23+
public static final String PARAMS_EQUALS = "=";
24+
2225
public static final String PARAMS_CM_V_PT = "-Dpartition=%s";
2326

2427
public static final String DEFAULT_JSON = "jsons";

datax-executor/src/main/java/com/wugui/datax/executor/service/jobhandler/ExecutorJobHandler.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717
import org.springframework.stereotype.Component;
1818

1919
import java.io.*;
20+
import java.util.HashMap;
21+
import java.util.Map;
2022
import java.util.concurrent.FutureTask;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
2125

2226
import static com.wugui.datax.executor.service.command.BuildCommand.buildDataXExecutorCmd;
27+
import static com.wugui.datax.executor.service.command.BuildCommand.buildDataXParamToMap;
2328
import static com.wugui.datax.executor.service.jobhandler.DataXConstant.DEFAULT_JSON;
2429
import static com.wugui.datax.executor.service.logparse.AnalysisStatistics.analysisStatisticsLog;
2530

@@ -39,6 +44,7 @@ public class ExecutorJobHandler extends IJobHandler {
3944
@Value("${datax.pypath}")
4045
private String dataXPyPath;
4146

47+
private static final Pattern VARIABLE_PATTERN = Pattern.compile("(\\$)\\{?(\\w+)\\}?");
4248

4349
@Override
4450
public ReturnT<String> execute(TriggerParam trigger) {
@@ -47,8 +53,12 @@ public ReturnT<String> execute(TriggerParam trigger) {
4753
Thread errThread = null;
4854
String tmpFilePath;
4955
LogStatistics logStatistics = null;
56+
57+
HashMap<String, String> keyValueMap = buildDataXParamToMap(trigger);
58+
String jobJson = replaceVariable(trigger.getJobJson(),keyValueMap);
59+
5060
//Generate JSON temporary file
51-
tmpFilePath = generateTemJsonFile(trigger.getJobJson());
61+
tmpFilePath = generateTemJsonFile(jobJson);
5262

5363
try {
5464
String[] cmdarrayFinal = buildDataXExecutorCmd(trigger, tmpFilePath,dataXPyPath);
@@ -97,7 +107,25 @@ public ReturnT<String> execute(TriggerParam trigger) {
97107
}
98108
}
99109

110+
public static String replaceVariable(final String param,HashMap<String, String> variableMap) {
111+
Map<String, String> mapping = new HashMap<String, String>();
100112

113+
Matcher matcher = VARIABLE_PATTERN.matcher(param);
114+
while (matcher.find()) {
115+
String variable = matcher.group(2);
116+
String value = variableMap.get(variable);
117+
if (StringUtils.isBlank(value)) {
118+
value = matcher.group();
119+
}
120+
mapping.put(matcher.group(), value);
121+
}
122+
123+
String retString = param;
124+
for (final String key : mapping.keySet()) {
125+
retString = retString.replace(key, mapping.get(key));
126+
}
127+
return retString;
128+
}
101129

102130
private String generateTemJsonFile(String jobJson) {
103131
String tmpFilePath;

0 commit comments

Comments
 (0)