Skip to content

Commit 6e56688

Browse files
authored
[Optimize]统一DB元信息更新格式-Part1 (didi#1125)
1、引入KafkaMetaService; 2、将Connector的更新按照KafkaMetaService进行更新; 3、简化Connect-MirrorMaker的关联逻辑; 4、GroupService创建的AdminClient中的ClientID增加时间戳,减少Mbean冲突;
1 parent a6abfb3 commit 6e56688

File tree

12 files changed

+652
-538
lines changed

12 files changed

+652
-538
lines changed

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/connector/impl/ConnectorManagerImpl.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
1313
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
1414
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
15+
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
1516
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
1617
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
1718
import org.apache.kafka.connect.runtime.AbstractStatus;
@@ -30,6 +31,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
3031
@Autowired
3132
private ConnectorService connectorService;
3233

34+
@Autowired
35+
private OpConnectorService opConnectorService;
36+
3337
@Autowired
3438
private WorkerConnectorService workerConnectorService;
3539

@@ -44,37 +48,37 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto
4448
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
4549
}
4650

47-
return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
51+
return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
4852
}
4953

5054
@Override
5155
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
5256
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
5357

54-
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
58+
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
5559
if (createResult.failed()) {
5660
return Result.buildFromIgnoreData(createResult);
5761
}
5862

59-
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
63+
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
6064
if (ksConnectorResult.failed()) {
6165
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
6266
}
6367

64-
connectorService.addNewToDB(ksConnectorResult.getData());
68+
opConnectorService.addNewToDB(ksConnectorResult.getData());
6569
return Result.buildSuc();
6670
}
6771

6872
@Override
6973
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
7074
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());
7175

72-
Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
76+
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
7377
if (createResult.failed()) {
7478
return Result.buildFromIgnoreData(createResult);
7579
}
7680

77-
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
81+
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
7882
if (ksConnectorResult.failed()) {
7983
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
8084
}
@@ -83,7 +87,7 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName
8387
connector.setCheckpointConnectorName(checkpointName);
8488
connector.setHeartbeatConnectorName(heartbeatName);
8589

86-
connectorService.addNewToDB(connector);
90+
opConnectorService.addNewToDB(connector);
8791
return Result.buildSuc();
8892
}
8993

km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/connect/mm2/impl/MirrorMakerManagerImpl.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
3838
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
3939
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
40+
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
4041
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
4142
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
4243
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
@@ -67,6 +68,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
6768
@Autowired
6869
private ConnectorService connectorService;
6970

71+
@Autowired
72+
private OpConnectorService opConnectorService;
73+
7074
@Autowired
7175
private WorkerConnectorService workerConnectorService;
7276

@@ -156,20 +160,20 @@ public Result<Void> deleteMirrorMaker(Long connectClusterId, String sourceConnec
156160

157161
Result<Void> rv = Result.buildSuc();
158162
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
159-
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
163+
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
160164
}
161165
if (rv.failed()) {
162166
return rv;
163167
}
164168

165169
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
166-
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
170+
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
167171
}
168172
if (rv.failed()) {
169173
return rv;
170174
}
171175

172-
return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
176+
return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
173177
}
174178

175179
@Override
@@ -181,20 +185,20 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope
181185

182186
Result<Void> rv = Result.buildSuc();
183187
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) {
184-
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
188+
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
185189
}
186190
if (rv.failed()) {
187191
return rv;
188192
}
189193

190194
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) {
191-
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
195+
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
192196
}
193197
if (rv.failed()) {
194198
return rv;
195199
}
196200

197-
return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
201+
return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
198202
}
199203

200204
@Override
@@ -206,20 +210,20 @@ public Result<Void> restartMirrorMaker(Long connectClusterId, String sourceConne
206210

207211
Result<Void> rv = Result.buildSuc();
208212
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
209-
rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
213+
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
210214
}
211215
if (rv.failed()) {
212216
return rv;
213217
}
214218

215219
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
216-
rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
220+
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
217221
}
218222
if (rv.failed()) {
219223
return rv;
220224
}
221225

222-
return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
226+
return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
223227
}
224228

225229
@Override
@@ -231,20 +235,20 @@ public Result<Void> stopMirrorMaker(Long connectClusterId, String sourceConnecto
231235

232236
Result<Void> rv = Result.buildSuc();
233237
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
234-
rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
238+
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
235239
}
236240
if (rv.failed()) {
237241
return rv;
238242
}
239243

240244
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
241-
rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
245+
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
242246
}
243247
if (rv.failed()) {
244248
return rv;
245249
}
246250

247-
return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
251+
return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
248252
}
249253

250254
@Override
@@ -256,20 +260,20 @@ public Result<Void> resumeMirrorMaker(Long connectClusterId, String sourceConnec
256260

257261
Result<Void> rv = Result.buildSuc();
258262
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
259-
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
263+
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
260264
}
261265
if (rv.failed()) {
262266
return rv;
263267
}
264268

265269
if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
266-
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
270+
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
267271
}
268272
if (rv.failed()) {
269273
return rv;
270274
}
271275

272-
return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
276+
return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
273277
}
274278

275279
@Override

km-collector/src/main/java/com/xiaojukeji/know/streaming/km/collector/metric/connect/ConnectConnectorMetricCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public List<ConnectorMetrics> collectConnectMetrics(ConnectCluster connectCluste
4444
Long connectClusterId = connectCluster.getId();
4545

4646
List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
47-
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectClusterId);
47+
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectCluster);
4848

4949
FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(connectClusterId);
5050

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.xiaojukeji.know.streaming.km.common.bean.entity.meta;
2+
3+
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
4+
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
5+
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
6+
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
7+
8+
import java.util.ArrayList;
9+
import java.util.HashSet;
10+
import java.util.List;
11+
import java.util.Set;
12+
13+
/**
14+
* Kafka元信息服务接口
15+
*/
16+
public interface KafkaMetaService<T> {
17+
/**
18+
* 从Kafka中获取数据
19+
* @param connectCluster connect集群
20+
* @return 全部资源列表, 成功的资源列表
21+
*/
22+
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ConnectCluster connectCluster) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }
23+
24+
/**
25+
* 从Kafka中获取数据
26+
* @param clusterPhy kafka集群
27+
* @return 全部资源集合, 成功的资源列表
28+
*/
29+
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }
30+
31+
/**
32+
* 元信息同步至DB中
33+
* @param clusterId 集群ID
34+
* @param fullNameSet 全部资源列表
35+
* @param dataList 成功的资源列表
36+
*/
37+
default void writeToDB(Long clusterId, Set<String> fullNameSet, List<T> dataList) {}
38+
39+
/**
40+
* 依据kafka集群ID删除数据
41+
* @param clusterPhyId kafka集群ID
42+
*/
43+
default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; }
44+
}

km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/ConnectConverter.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
1717
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
1818
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
19+
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
20+
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
1921

2022
import java.util.ArrayList;
2123
import java.util.HashMap;
@@ -24,6 +26,9 @@
2426
import java.util.function.Function;
2527
import java.util.stream.Collectors;
2628

29+
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
30+
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
31+
2732
public class ConnectConverter {
2833
public static ConnectorBasicCombineExistVO convert2BasicVO(ConnectCluster connectCluster, ConnectorPO connectorPO) {
2934
ConnectorBasicCombineExistVO vo = new ConnectorBasicCombineExistVO();
@@ -153,6 +158,66 @@ public static KSConnector convert2KSConnector(Long kafkaClusterPhyId, Long conne
153158
return ksConnector;
154159
}
155160

161+
public static List<KSConnector> convertAndSupplyMirrorMakerInfo(ConnectCluster connectCluster, List<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> connectorFullInfoList) {
162+
// <connectorName, targetBootstrapServers + "@" + sourceBootstrapServers>
163+
Map<String, String> sourceMap = new HashMap<>();
164+
165+
// <targetBootstrapServers + "@" + sourceBootstrapServers, connectorName>
166+
Map<String, String> heartbeatMap = new HashMap<>();
167+
Map<String, String> checkpointMap = new HashMap<>();
168+
169+
// 获取每个类型的connector的map信息
170+
connectorFullInfoList.forEach(connector -> {
171+
Map<String, String> mm2Map = null;
172+
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
173+
mm2Map = sourceMap;
174+
} else if (KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
175+
mm2Map = heartbeatMap;
176+
} else if (KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
177+
mm2Map = checkpointMap;
178+
}
179+
180+
String targetBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
181+
String sourceBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
182+
183+
if (ValidateUtils.anyBlank(targetBootstrapServers, sourceBootstrapServers) || mm2Map == null) {
184+
return;
185+
}
186+
187+
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
188+
// source 类型的格式和 heartbeat & checkpoint 的不一样
189+
mm2Map.put(connector.v1().getName(), targetBootstrapServers + "@" + sourceBootstrapServers);
190+
} else {
191+
mm2Map.put(targetBootstrapServers + "@" + sourceBootstrapServers, connector.v1().getName());
192+
}
193+
});
194+
195+
196+
List<KSConnector> connectorList = new ArrayList<>();
197+
connectorFullInfoList.forEach(connector -> {
198+
// 转化并添加到list中
199+
KSConnector ksConnector = ConnectConverter.convert2KSConnector(
200+
connectCluster.getKafkaClusterPhyId(),
201+
connectCluster.getId(),
202+
connector.v1(),
203+
connector.v3(),
204+
connector.v2()
205+
);
206+
connectorList.add(ksConnector);
207+
208+
// 补充mm2信息
209+
String targetAndSource = sourceMap.get(ksConnector.getConnectorName());
210+
if (ValidateUtils.isBlank(targetAndSource)) {
211+
return;
212+
}
213+
214+
ksConnector.setHeartbeatConnectorName(heartbeatMap.getOrDefault(targetAndSource, ""));
215+
ksConnector.setCheckpointConnectorName(checkpointMap.getOrDefault(targetAndSource, ""));
216+
});
217+
218+
return connectorList;
219+
}
220+
156221
private static String genConnectorKey(Long connectorId, String connectorName){
157222
return connectorId + "#" + connectorName;
158223
}

0 commit comments

Comments
 (0)