5 changes: 3 additions & 2 deletions docker/Dockerfile
@@ -31,7 +31,8 @@ RUN rm -rf /app/redis-manager/redis-manager-dashboard/src/main/resources/templat

FROM openjdk:8-jre-alpine
WORKDIR /app/redis-manager
COPY --from=dashboard_package /app/lib/ /app/redis-manager/lib/
COPY --from=dashboard_package /app/redis-manager/* /app/redis-manager/

COPY --from=dashboard_package /app/redis-manager /app/redis-manager
COPY --from=dashboard_package /app/lib /app/redis-manager/lib

ENTRYPOINT ["sh","./redis-manager-start.sh"]
AnalyzerStatusThread.java
@@ -10,6 +10,7 @@
import com.newegg.ec.redis.plugin.rct.cache.AppCache;
import com.newegg.ec.redis.plugin.rct.report.EmailSendReport;
import com.newegg.ec.redis.service.impl.RdbAnalyzeResultService;
import com.newegg.ec.redis.service.impl.RdbAnalyzeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
@@ -26,17 +27,17 @@
*/
public class AnalyzerStatusThread implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(AnalyzerStatusThread.class);
private List<ScheduleDetail> scheduleDetails = new ArrayList<>();
private List<AnalyzeInstance> analyzeInstances = new ArrayList<>();
private List<ScheduleDetail> scheduleDetails;
private List<AnalyzeInstance> analyzeInstances;
private RestTemplate restTemplate;
private RDBAnalyze rdbAnalyze;
private RCTConfig.Email emailInfo;
private RdbAnalyzeResultService rdbAnalyzeResultService;
private RdbAnalyzeService rdbAnalyzeService;

public AnalyzerStatusThread(List<AnalyzeInstance> analyzeInstances, RestTemplate restTemplate,
RDBAnalyze rdbAnalyze, RCTConfig.Email emailInfo, RdbAnalyzeResultService rdbAnalyzeResultService) {
this.scheduleDetails = AppCache.scheduleDetailMap.get(rdbAnalyze.getId());
this.analyzeInstances = analyzeInstances;
public AnalyzerStatusThread(RdbAnalyzeService rdbAnalyzeService, RestTemplate restTemplate,
RDBAnalyze rdbAnalyze, RCTConfig.Email emailInfo, RdbAnalyzeResultService rdbAnalyzeResultService) {
this.rdbAnalyzeService = rdbAnalyzeService;
this.restTemplate = restTemplate;
this.rdbAnalyze = rdbAnalyze;
this.emailInfo = emailInfo;
@@ -45,6 +46,18 @@ public AnalyzerStatusThread(List<AnalyzeInstance> analyzeInstances, RestTemplate

@Override
public void run() {
JSONObject res = rdbAnalyzeService.assignAnalyzeJob(rdbAnalyze);

if(res.containsKey("status") && !res.getBoolean("status")){
LOG.warn("Assign job fail.");
return;
}
this.analyzeInstances = (List<AnalyzeInstance>)res.get("needAnalyzeInstances");
if(analyzeInstances == null || analyzeInstances.isEmpty()){
LOG.warn("Analyze instances is empty.");
return;
}
scheduleDetails = AppCache.scheduleDetailMap.get(rdbAnalyze.getId());
// Poll the running status of every analyzer
while (AppCache.isNeedAnalyzeStastus(rdbAnalyze.getId())) {

@@ -64,6 +77,14 @@ public void run() {
}
}

AppCache.scheduleDetailMap.get(rdbAnalyze.getId()).forEach(s -> {
if(AnalyzeStatus.ERROR.equals(s.getStatus())){
rdbAnalyzeService.deleteResult(rdbAnalyze, s.getScheduleID());
return;
}
});


// Once all analyzers have finished, collect every analyzer's report analysis results
if (AppCache.isAnalyzeComplete(rdbAnalyze)) {
Map<String, Set<String>> reportData = new HashMap<>();
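For readers following the refactor: job assignment has moved out of the service's allocationRDBAnalyzeJob and into the monitoring thread's run() method. The thread now asks RdbAnalyzeService.assignAnalyzeJob for the job, aborts early if the returned status flag is false or no analyze instances come back, and only then begins tracking analyzer status. Below is a minimal standalone sketch of that control flow, assuming nothing beyond the JDK; the JobService and StatusMonitor types and their method names are illustrative stand-ins, not the project's real classes.

import java.util.List;
import java.util.Map;

/** Illustrative stand-in for the service that assigns the analysis job. */
interface JobService {
    /** Returns a map holding a "status" flag and the instances that need analyzing. */
    Map<String, Object> assignAnalyzeJob(long jobId);
}

/** Sketch of the "assign first, then monitor" flow used by the new run() method. */
class StatusMonitor implements Runnable {
    private final JobService service;
    private final long jobId;

    StatusMonitor(JobService service, long jobId) {
        this.service = service;
        this.jobId = jobId;
    }

    @Override
    public void run() {
        Map<String, Object> res = service.assignAnalyzeJob(jobId);

        // Abort early when assignment reports failure, mirroring the status check in the diff.
        if (Boolean.FALSE.equals(res.get("status"))) {
            System.err.println("Assign job failed; nothing to monitor.");
            return;
        }

        @SuppressWarnings("unchecked")
        List<String> instances = (List<String>) res.get("needAnalyzeInstances");
        if (instances == null || instances.isEmpty()) {
            System.err.println("No analyze instances returned; nothing to monitor.");
            return;
        }

        // Only now does the thread start tracking each instance's status.
        for (String instance : instances) {
            System.out.println("Monitoring " + instance);
        }
    }
}

The practical effect, visible in the service diff below, is that allocationRDBAnalyzeJob can start the thread and return immediately, while assignment failures are handled inside the thread instead of blocking the caller.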
RdbAnalyzeService.java
@@ -2,7 +2,6 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import com.newegg.ec.redis.client.RedisClient;
import com.newegg.ec.redis.client.RedisClientFactory;
import com.newegg.ec.redis.config.RCTConfig;
@@ -13,7 +12,6 @@
import com.newegg.ec.redis.plugin.rct.thread.AnalyzerStatusThread;
import com.newegg.ec.redis.schedule.RDBScheduleJob;
import com.newegg.ec.redis.service.IRdbAnalyzeService;

import com.newegg.ec.redis.util.EurekaUtil;
import org.apache.commons.lang.StringUtils;
import org.quartz.SchedulerException;
@@ -29,15 +27,8 @@
import redis.clients.jedis.Jedis;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.rmi.server.ExportException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;


@Service
@@ -91,6 +82,19 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
responseResult.put("message", "There is a task in progress!");
return responseResult;
}

// Start the status-monitoring thread, which checks each analyzer's status and progress; once analysis completes it writes results to the database and sends the report email
Thread analyzerStatusThread = new Thread(new AnalyzerStatusThread(this, restTemplate,
rdbAnalyze, config.getEmail(), rdbAnalyzeResultService));
analyzerStatusThread.start();

responseResult.put("status", true);
return responseResult;
}

// Dispatch the RCT analysis job
public JSONObject assignAnalyzeJob(RDBAnalyze rdbAnalyze){
JSONObject responseResult = new JSONObject();
int[] analyzer = null;
if (rdbAnalyze.getAnalyzer().contains(",")) {
String[] str = rdbAnalyze.getAnalyzer().split(",");
@@ -109,7 +113,9 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
String redisHost = cluster.getNodes().split(",")[0].split(":")[0];
String port = cluster.getNodes().split(",")[0].split(":")[1];

// All hosts in the cluster
Map<String, String> clusterNodesIP = new HashMap<>();
// Hosts and ports that ultimately need to be analyzed
Map<String, Set<String>> generateRule = new HashMap<>();
try {
if (config.isDevEnable()) {
@@ -119,11 +125,13 @@
generateRule.put(InetAddress.getLocalHost().getHostAddress(), set);
rdbAnalyze.setDataPath(config.getDevRDBPath());
} else {
// Determine which nodes in the cluster will be analyzed
if (StringUtils.isNotBlank(cluster.getRedisPassword())) {
redisClient = RedisClientFactory.buildRedisClient(new RedisNode(redisHost,Integer.parseInt(port)),cluster.getRedisPassword());
} else {
redisClient = RedisClientFactory.buildRedisClient(new RedisNode(redisHost,Integer.parseInt(port)),null);
}
// Check whether specific nodes in the cluster were designated for analysis
if(rdbAnalyze.getNodes() != null && !"-1".equalsIgnoreCase(rdbAnalyze.getNodes().get(0))){
// Analyze only the designated nodes
List<String> nodeList = rdbAnalyze.getNodes();
@@ -169,18 +177,20 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
AnalyzeInstance analyzeInstance = analyzeInstancesMap.get(host);
if (analyzeInstance == null) {
LOG.error("analyzeInstance inactive. ip:{}", host);
responseResult.put("status", false);
responseResult.put("status", Boolean.FALSE);
responseResult.put("message", host + " analyzeInstance inactive!");
return responseResult;
}
needAnalyzeInstances.add(analyzeInstance);
}
// Persist the analysis job to the database
saveToResult(rdbAnalyze,scheduleID);
for (String host : clusterNodesIP.keySet()) {
responseResult.put("needAnalyzeInstances",needAnalyzeInstances);

for (String host : clusterNodesIP.keySet()) {
// Handle the case where no RDB backup policy is configured
if ((!config.isDevEnable()) && config.isRdbGenerateEnable()
&& generateRule.containsKey(host)) {
&& generateRule.containsKey(host)) {

Set<String> ports = generateRule.get(host);
ports.forEach(p -> {
@@ -211,14 +221,13 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
}

// Skip any host that has not been assigned an analysis node
if(generateRule.containsKey(host)){
if(!generateRule.containsKey(host)){
continue;
}

AnalyzeInstance analyzeInstance = analyzeInstancesMap.get(host);

String url = "http://" + host + ":" + analyzeInstance.getPort() + "/receivedSchedule";

// String url = "http://127.0.0.1:8082/receivedSchedule";
ScheduleInfo scheduleInfo = new ScheduleInfo(scheduleID, rdbAnalyze.getDataPath(), rdbAnalyze.getPrefixes(),
generateRule.get(host), analyzer);
Expand All @@ -239,14 +248,14 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
LOG.error("allocation {} scheduleJob response error. responseMessage:{}",
analyzeInstance.toString(), responseMessage.toJSONString());
deleteResult(rdbAnalyze,scheduleID);
scheduleResult.put("status", false);
responseResult.put("status", false);
scheduleResult.put("status", Boolean.FALSE);
responseResult.put("status", Boolean.FALSE);
responseResult.put("message", "allocation " + analyzeInstance.getHost()
+ " scheduleJob response error:" + responseMessage.toJSONString());

return responseResult;
} else {
scheduleResult.put("status", true);
scheduleResult.put("status", Boolean.TRUE);
}

} catch (RestClientException e) {
@@ -274,32 +283,36 @@ public JSONObject allocationRDBAnalyzeJob(RDBAnalyze rdbAnalyze) {
}
AppCache.scheduleDetailMap.put(rdbAnalyze.getId(), scheduleDetails);

// Start the status-monitoring thread, which checks each analyzer's status and progress; once analysis completes it writes results to the database and sends the report email
Thread analyzerStatusThread = new Thread(new AnalyzerStatusThread(needAnalyzeInstances, restTemplate,
rdbAnalyze, config.getEmail(), rdbAnalyzeResultService));
analyzerStatusThread.start();
if (!config.isDevEnable()) {
for (Entry<String, Set<String>> map : generateRule.entrySet()) {
String ip = map.getKey();
for (String ports : map.getValue()) {
Jedis jedis = null;
try {
jedis = new Jedis(ip, Integer.parseInt(ports));
AppCache.keyCountMap.put(ip + ":" + ports, Float.parseFloat(String.valueOf(jedis.dbSize())));
} catch (Exception e) {
LOG.error("jedis get db size has error!", e);
} finally {
if (jedis != null) {
jedis.close();
}
// Cache the DB key counts
cacheDBSize(generateRule);
}
return responseResult;
}

// Write the key count of every Redis instance to be analyzed into the cache
private void cacheDBSize(Map<String, Set<String>> generateRule){
for (Entry<String, Set<String>> map : generateRule.entrySet()) {
String ip = map.getKey();
for (String ports : map.getValue()) {
Jedis jedis = null;
try {
jedis = new Jedis(ip, Integer.parseInt(ports));
AppCache.keyCountMap.put(ip + ":" + ports, Float.parseFloat(String.valueOf(jedis.dbSize())));
} catch (Exception e) {
LOG.error("jedis get db size has error!", e);
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
responseResult.put("status", true);
return responseResult;
}




private void saveToResult(RDBAnalyze rdbAnalyze,Long scheduleId){
RDBAnalyzeResult rdbAnalyzeResult = new RDBAnalyzeResult();
rdbAnalyzeResult.setAnalyzeConfig(JSONObject.toJSONString(rdbAnalyze));
@@ -310,7 +323,7 @@ private void saveToResult(RDBAnalyze rdbAnalyze,Long scheduleId){
rdbAnalyzeResultService.add(rdbAnalyzeResult);
}

private void deleteResult(RDBAnalyze rdbAnalyze,Long scheduleId){
public void deleteResult(RDBAnalyze rdbAnalyze,Long scheduleId){
Map<String,Long> map = new HashMap<>();
map.put("cluster_id",rdbAnalyze.getClusterId());
map.put("schedule_id",scheduleId);
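One small design note on the extracted cacheDBSize helper above: it keeps the original manual close() in a finally block. Since Jedis implements Closeable, the same loop can be written with try-with-resources. The sketch below is an alternative form, not part of this commit; keyCountMap stands in for AppCache.keyCountMap, and the map layout mirrors generateRule from the diff.

import java.util.Map;
import java.util.Set;
import redis.clients.jedis.Jedis;

class DbSizeCache {
    /**
     * Records the key count of every Redis instance that will be analyzed,
     * equivalent to cacheDBSize in the diff but closing connections via try-with-resources.
     */
    static void cacheDbSize(Map<String, Set<String>> generateRule, Map<String, Float> keyCountMap) {
        for (Map.Entry<String, Set<String>> entry : generateRule.entrySet()) {
            String ip = entry.getKey();
            for (String port : entry.getValue()) {
                // Jedis implements Closeable, so the connection is released automatically.
                try (Jedis jedis = new Jedis(ip, Integer.parseInt(port))) {
                    keyCountMap.put(ip + ":" + port, (float) jedis.dbSize());
                } catch (Exception e) {
                    System.err.println("jedis get db size has error: " + e.getMessage());
                }
            }
        }
    }
}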

(The remaining files in this pull request are not shown here: two large diffs are not rendered by default, three binary files are not shown, and four files were deleted.)