Skip to content

[PDR-16375][feat(sdk)]: 采集框架相关功能点补充 #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: sdk-v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions app/java/src/main/java/io/qiniu/StartUpApplicationRunner.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package io.qiniu;

import com.qiniu.pandora.common.QiniuException;
import com.qiniu.pandora.util.StringUtils;
import io.qiniu.configuration.PandoraProperties;
import io.qiniu.service.PandoraService;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
Expand All @@ -20,6 +21,7 @@
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

@Component
@Order(1)
Expand Down Expand Up @@ -94,11 +96,9 @@ private void setId() throws IOException {
Path idPath = Paths.get(ID_PATH);

String id = properties.getId();
BufferedReader reader = Files.newBufferedReader(idPath);
String content = reader.readLine();

if (id != null) {
content = id;
File file = idPath.toFile();
if (file.exists() && ObjectUtils.isEmpty(id)) {
id = StringUtils.utf8String(Files.readAllBytes(idPath));
}

// generate a new id and save it to file
Expand All @@ -112,7 +112,7 @@ private void setId() throws IOException {
properties.getServerAddress(),
properties.getServerPort(),
properties.getPandoraToken(),
content);
id);

if (id == null) {
throw new QiniuException("start failed, worker id is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CollectorTask {

private Map<String, Object> config; // collector config

private Map<String, Object> meta; // ex. runner meta data
private String meta; // ex. runner meta data

private long createTime;

Expand Down Expand Up @@ -61,7 +61,7 @@ public CollectorTask(
boolean enabled,
boolean status,
Map<String, Object> config,
Map<String, Object> meta,
String meta,
long createTime,
long updateTime) {
this.id = id;
Expand Down Expand Up @@ -174,11 +174,11 @@ public void setConfig(Map<String, Object> config) {
this.config = config;
}

public Map<String, Object> getMeta() {
public String getMeta() {
return meta;
}

public void setMeta(Map<String, Object> meta) {
public void setMeta(String meta) {
this.meta = meta;
}

Expand Down Expand Up @@ -237,7 +237,7 @@ public CollectorTask toCollectorTask() throws ParseException {
task.setStatus(status == 1);
task.setConfig(JsonHelper.readValueAsMap(config.getBytes()));
if (this.meta != null) {
task.setMeta(JsonHelper.readValueAsMap(meta.getBytes()));
task.setMeta(meta);
}
task.setCreateTime(df.parse(createTime).getTime());
task.setUpdateTime(df.parse(updateTime).getTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.qiniu.configuration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
@Configuration
@ConfigurationProperties(prefix = "collector")
public class CollectorProperties {

@Value("${collector.meta_path:}")
private String metaPath;

public String getMetaPath() {
return metaPath;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.qiniu.configuration;

import io.qiniu.common.Constant;
import io.qiniu.common.entity.pandora.PandoraMode;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
Expand Down Expand Up @@ -27,10 +28,10 @@ public class PandoraProperties {
@Value("${server.port:8088}")
private String serverPort;

@Value("${pandora.app:demo}")
@Value("${pandora.app:" + Constant.APP_NAME + "}")
private String appName;

@Value("${pandora.service:demo}")
@Value("${pandora.service:" + Constant.APP_NAME + "}")
private String serviceName;

@Value("${pandora.id:}")
Expand Down
14 changes: 14 additions & 0 deletions app/java/src/main/java/io/qiniu/service/PandoraService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.qiniu.pandora.DefaultPandoraClient;
import com.qiniu.pandora.service.customservice.CustomService;
import com.qiniu.pandora.service.storage.StorageService;
import com.qiniu.pandora.service.token.TokenService;
import com.qiniu.pandora.service.upload.PostDataService;
import io.qiniu.configuration.PandoraProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -13,12 +15,16 @@ public class PandoraService {
private final DefaultPandoraClient client;
private final CustomService customService;
private final StorageService storageService;
private final TokenService tokenService;
private final PostDataService postDataService;

@Autowired
public PandoraService(PandoraProperties properties) {
this.client = new DefaultPandoraClient(properties.getPandoraUrl());
this.customService = client.NewCustomService();
this.storageService = client.NewStorageService();
this.tokenService = client.NewTokenService();
this.postDataService = client.NewPostDataService(tokenService, properties.getPandoraToken());
}

public DefaultPandoraClient getClient() {
Expand All @@ -32,4 +38,12 @@ public CustomService getCustomService() {
public StorageService getStorageService() {
return storageService;
}

public TokenService getTokenService() {
return tokenService;
}

public PostDataService getPostDataService() {
return postDataService;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package io.qiniu.service.collector;

import com.qiniu.pandora.collect.Collector;
import com.qiniu.pandora.collect.CollectorConfig;
import com.qiniu.pandora.collect.CollectorContext;
import com.qiniu.pandora.collect.DefaultCollector;
import com.qiniu.pandora.collect.State;
import com.qiniu.pandora.collect.runner.config.CollectorConfig;
import com.qiniu.pandora.collect.runner.config.RunnerConfig;
import com.qiniu.pandora.common.QiniuException;
import com.qiniu.pandora.common.QiniuExceptions;
import io.qiniu.common.entity.collector.CollectorTask;
import io.qiniu.configuration.CollectorProperties;
import io.qiniu.configuration.PandoraProperties;
import io.qiniu.dao.collector.ICollectorTaskDao;
import io.qiniu.service.PandoraService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -33,13 +37,17 @@ public class CollectorTaskService {
@Autowired
public CollectorTaskService(
PandoraProperties properties,
ICollectorTaskDao collectorTaskDao,
PandoraProperties pandoraProperties) {
CollectorProperties collectorProperties,
PandoraService pandoraService,
ICollectorTaskDao collectorTaskDao) {
this.workerId = properties.getId();
this.collectorTaskDao = collectorTaskDao;
this.properties = pandoraProperties;

this.collector = new DefaultCollector();
this.properties = properties;
this.collector =
new DefaultCollector(
new CollectorConfig(collectorProperties.getMetaPath()),
new CollectorContext(
pandoraService.getTokenService(), pandoraService.getPostDataService()));
}

@PostConstruct
Expand All @@ -50,7 +58,7 @@ public void init() {

public List<CollectorTask> queryTasks() throws QiniuException {
List<CollectorTask> tasks = new ArrayList<>();
for (CollectorConfig config : this.collector.getAllRunners()) {
for (RunnerConfig config : this.collector.getAllRunners()) {
CollectorTask task = convertConfigToTask(config);
task.setWorkerId(this.workerId);
tasks.add(task);
Expand All @@ -60,7 +68,7 @@ public List<CollectorTask> queryTasks() throws QiniuException {

public List<CollectorTask> queryTasks(List<String> taskIds) throws QiniuException {
List<CollectorTask> tasks = new ArrayList<>();
for (CollectorConfig config : this.collector.getRunners(taskIds)) {
for (RunnerConfig config : this.collector.getRunners(taskIds)) {
CollectorTask task = convertConfigToTask(config);
task.setWorkerId(this.workerId);
tasks.add(task);
Expand All @@ -69,7 +77,7 @@ public List<CollectorTask> queryTasks(List<String> taskIds) throws QiniuExceptio
}

public CollectorTask queryTask(String taskId) throws QiniuException {
CollectorConfig config = this.collector.getRunner(taskId);
RunnerConfig config = this.collector.getRunner(taskId);
if (config == null) {
return null;
}
Expand Down Expand Up @@ -173,15 +181,16 @@ private void recoveryTask() {
}
}

private static CollectorConfig convertTaskToConfig(CollectorTask task) throws QiniuException {
return new CollectorConfig(
private static RunnerConfig convertTaskToConfig(CollectorTask task) throws QiniuException {
return new RunnerConfig(
task.getTaskId(),
task.getName(),
task.isEnabled() ? State.STARTED : State.STOPPED,
task.getMeta(),
convertConfigToProperties(task.getConfig()));
}

private static CollectorTask convertConfigToTask(CollectorConfig config) throws QiniuException {
private static CollectorTask convertConfigToTask(RunnerConfig config) throws QiniuException {
CollectorTask task = new CollectorTask();

task.setTaskId(config.getId());
Expand Down
46 changes: 46 additions & 0 deletions sdk/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
Pandora-Java-SDK
=============

提供访问Pandora平台所需的接口以及服务端采集基础框架

平台接口
=============

1. app相关
2. 元数据管理
3. Token管理
4. 数据传输

服务端采集框架
=============
改造自flume EmbeddedAgent框架:
1. 增加自定义source、sink
2. 增加source meta管理

采集配置样例:
```json
{
"source.type":"file",
"source.input.file":"test.log",
"channel.type":"memory",
"channel.capacity": "200",
"sinks":"sink1",
"sink1.type":"pandora",
"sink1.source_type": "json",
"sink1.repo": "test",
"processor.type": "default"
}
```

采集配置介绍:
1. source数据来源: file
1. file: 文件按行采集,暂不支持增量采集
2. channel缓存管道: memory, file
1. memory: 内存管道,数据暂存内存中,性能较高但程序崩溃时会导致数据丢失
2. file: 使用本地文件作为缓存管道
3. sink发送源: pandora
1. pandora: 将数据发往pandora平台
4. processor类型: default、load_balance、failover
1. default: 默认类型,不支持多sink。
2. load_balance: 负载均衡,有round_robin(轮询)或random(随机)两种选择机制,默认round_robin。
3. failover: 维护sink优先级列表,支持故障转移
56 changes: 53 additions & 3 deletions sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring-version>1.5.4.RELEASE</spring-version>
<spring-version>2.6.3</spring-version>
<spring-boot-starter-log4j-version>1.3.8.RELEASE</spring-boot-starter-log4j-version>
<jackson-version>2.12.3</jackson-version>
<log4j-version>2.17.1</log4j-version>
<http-client-version>4.5.13</http-client-version>
<http-core-version>4.4.9</http-core-version>
<flume-version>1.9.0</flume-version>
<junit-version>4.13.1</junit-version>
<mockwebserver-version>4.9.3</mockwebserver-version>
</properties>

<build>
Expand Down Expand Up @@ -155,10 +158,38 @@
</build>

<dependencies>
<!-- springboot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<!--log4j-slf4j-impl与 logback-classic包不兼容,删除这个包 -->
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j</artifactId>
<version>${spring-boot-starter-log4j-version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
Expand Down Expand Up @@ -200,19 +231,38 @@
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
<version>${flume-version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-node</artifactId>
<version>1.9.0</version>
<version>${flume-version}</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- unit test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>${mockwebserver-version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading