这是一个基于TCP协议的ClickHouse Java连接驱动项目,提供了高性能的ClickHouse数据库操作接口。该客户端通过原生TCP协议与ClickHouse服务器通信,支持连接池管理、数据块操作和批量数据处理。
- 🔌 原生TCP协议: 直接通过TCP协议与ClickHouse服务器通信,性能优异
- 🏊 连接池管理: 内置连接池,支持连接复用和自动管理
- 📦 数据块操作: 基于Block的数据传输和处理机制
- 🔧 类型安全: 完整的ClickHouse数据类型支持
- 🚀 高性能: 优化的序列化和反序列化机制
- 🔒 SSL支持: 支持SSL安全连接
- 📊 批量操作: 支持批量数据插入和查询
ClickHouseClientConfig 是客户端的配置类,用于设置连接参数和客户端行为。
| 配置项 | 类型 | 默认值 | 说明 |
|---|---|---|---|
host |
String | "127.0.0.1" | ClickHouse服务器主机地址 |
port |
int | 9000 | ClickHouse服务器端口 |
database |
String | "default" | 数据库名称 |
user |
String | "default" | 用户名 |
password |
String | "" | 密码 |
queryTimeout |
Duration | Duration.ZERO | 查询超时时间 |
connectTimeout |
Duration | Duration.ZERO | 连接超时时间 |
ssl |
boolean | false | 是否启用SSL |
sslMode |
String | "disabled" | SSL模式 |
charset |
Charset | UTF-8 | 字符集 |
tcpKeepAlive |
boolean | false | TCP保持连接 |
clientName |
String | "ClickHouse client" | 客户端名称 |
// 基础配置
ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
.host("127.0.0.1")
.port(9000)
.database("my_database")
.user("default")
.password("password")
.build();
// 高级配置
ClickHouseClientConfig advancedConfig = ClickHouseClientConfig.Builder.builder()
.host("clickhouse.example.com")
.port(9000)
.database("analytics")
.user("analytics_user")
.password("secure_password")
.queryTimeout(Duration.ofMinutes(5))
.connectTimeout(Duration.ofSeconds(30))
.ssl(true)
.sslMode("require")
.charset(StandardCharsets.UTF_8)
.tcpKeepAlive(true)
.clientName("MyAnalyticsClient")
.connectionPoolTotal(20)
.connectionPoolMaxIdle(10)
.connectionPooMinIdle(5)
.build();ClickHouseClient 是主要的客户端类,负责管理连接池和提供数据操作接口。
| 方法 | 返回类型 | 说明 |
|---|---|---|
createBlock(String tableName) |
Block | 为指定表创建数据块 |
insert(Block block, boolean clean) |
void | 插入数据块 |
insert(Block block) |
void | 插入数据块并自动清理 |
cleanBlock(Block block) |
void | 清理数据块资源 |
close() |
void | 关闭客户端,释放资源 |
// 创建客户端
ClickHouseClient client = new ClickHouseClient.Builder()
.config(config)
.build();
try {
// 为表创建数据块
Block block = client.createBlock("users");
// 插入数据
client.insert(block, true);
} finally {
// 关闭客户端
client.close();
}Block 是数据传输的核心组件,代表ClickHouse中的一个数据块,包含列和行的集合。
| 方法 | 返回类型 | 说明 |
|---|---|---|
getColumn(String columnName) |
IColumn | 根据列名获取列 |
getColumn(int columnIdx) |
IColumn | 根据列索引获取列 |
setObject(int columnIdx, Object object) |
void | 设置列值 |
appendRow() |
void | 添加一行数据 |
rowCnt() |
int | 获取行数 |
columnCnt() |
int | 获取列数 |
cleanup() |
void | 清理资源 |
clear() |
void | 清空数据 |
// 创建数据块
Block block = client.createBlock("users");
// 获取列并写入数据
IColumn idColumn = block.getColumn("id");
IColumn nameColumn = block.getColumn("name");
IColumn ageColumn = block.getColumn("age");
// 写入第一行数据
idColumn.write(1);
nameColumn.write("Alice");
ageColumn.write(25);
block.appendRow();
// 写入第二行数据
idColumn.write(2);
nameColumn.write("Bob");
ageColumn.write(30);
block.appendRow();
// 插入数据
client.insert(block, true);IColumn 是列的接口,定义了列的基本操作。所有列类型都必须实现该接口。
| 方法 | 返回类型 | 说明 |
|---|---|---|
name() |
String | 获取列名 |
type() |
IDataType<?> | 获取数据类型 |
value(int idx) |
Object | 获取指定索引的值 |
write(Object object) |
void | 写入对象值 |
write(byte[] bytes, int offset, int length) |
void | 写入字节数组 |
writeInt(byte[] bytes, int offset, int length, boolean isLittleEndian) |
void | 写入整数字节 |
rowCnt() |
int | 获取行数 |
addRowCnt() |
int | 增加行数 |
clear() |
void | 清空列数据 |
values() |
Object[] | 获取所有值 |
- 基本类型: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
- 浮点类型: Float32, Float64
- 字符串类型: String, FixedString, LowCardinality
- 日期时间: Date, DateTime, DateTime64
- 复杂类型: Array, Map, Tuple, Nullable
- 其他类型: UUID, Decimal, Enum
// 获取不同类型的列
IColumn idColumn = block.getColumn("id"); // Int32
IColumn nameColumn = block.getColumn("name"); // String
IColumn ageColumn = block.getColumn("age"); // UInt8
IColumn salaryColumn = block.getColumn("salary"); // Float64
IColumn activeColumn = block.getColumn("is_active"); // Bool
IColumn createdColumn = block.getColumn("created_at"); // DateTime
IColumn tagsColumn = block.getColumn("tags"); // Array(String)
IColumn metadataColumn = block.getColumn("metadata"); // Map(String, String)
// 写入基本类型数据
idColumn.write(1);
nameColumn.write("John Doe");
ageColumn.write((byte) 25);
salaryColumn.write(75000.50);
activeColumn.write(true);
createdColumn.write(LocalDateTime.now());
// 写入数组数据
tagsColumn.write(new String[]{"java", "developer", "backend"});
// 写入映射数据
Map<String, String> metadata = new HashMap<>();
metadata.put("department", "engineering");
metadata.put("level", "senior");
metadataColumn.write(metadata);import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.time.Duration;
import java.nio.charset.StandardCharsets;
public class BasicInsertExample {
public static void main(String[] args) throws Exception {
// 1. 创建配置
ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
.host("127.0.0.1")
.port(9000)
.database("test_db")
.user("default")
.password("")
.queryTimeout(Duration.ofMinutes(5))
.connectTimeout(Duration.ofSeconds(30))
.charset(StandardCharsets.UTF_8)
.build();
// 2. 创建客户端
ClickHouseClient client = new ClickHouseClient.Builder()
.config(config)
.build();
try {
// 3. 创建数据块
Block block = client.createBlock("users");
// 4. 获取列
IColumn idColumn = block.getColumn("id");
IColumn nameColumn = block.getColumn("name");
IColumn emailColumn = block.getColumn("email");
IColumn ageColumn = block.getColumn("age");
IColumn createdColumn = block.getColumn("created_at");
// 5. 写入数据
for (int i = 1; i <= 1000; i++) {
idColumn.write(i);
nameColumn.write("User" + i);
emailColumn.write("user" + i + "@example.com");
ageColumn.write(20 + (i % 50));
createdColumn.write(System.currentTimeMillis());
block.appendRow();
}
// 6. 插入数据
client.insert(block, true);
System.out.println("成功插入1000条数据");
} finally {
// 7. 关闭客户端
client.close();
}
}
}import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.time.Duration;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class BatchProcessingExample {
public static void main(String[] args) throws Exception {
// 配置和客户端创建
ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
.host("127.0.0.1")
.port(9000)
.database("analytics")
.user("default")
.password("")
.connectionPoolTotal(20)
.connectionPoolMaxIdle(10)
.build();
ClickHouseClient client = new ClickHouseClient.Builder()
.config(config)
.build();
try {
// 批量处理数据
processBatchData(client, "events", 10000, 1000);
} finally {
client.close();
}
}
private static void processBatchData(ClickHouseClient client, String tableName,
int totalRecords, int batchSize) throws Exception {
for (int batch = 0; batch < totalRecords; batch += batchSize) {
Block block = client.createBlock(tableName);
IColumn timestampColumn = block.getColumn("timestamp");
IColumn userIdColumn = block.getColumn("user_id");
IColumn eventTypeColumn = block.getColumn("event_type");
IColumn propertiesColumn = block.getColumn("properties");
int currentBatchSize = Math.min(batchSize, totalRecords - batch);
for (int i = 0; i < currentBatchSize; i++) {
int recordId = batch + i;
timestampColumn.write(System.currentTimeMillis());
userIdColumn.write(recordId % 1000);
eventTypeColumn.write("click");
Map<String, String> properties = Map.of(
"page", "/products/" + (recordId % 100),
"source", "web",
"session_id", "session_" + (recordId % 10000)
);
propertiesColumn.write(properties);
block.appendRow();
}
client.insert(block, true);
System.out.println("插入批次 " + (batch / batchSize + 1) +
", 记录数: " + currentBatchSize);
}
}
}import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.UUID;
public class ComplexDataTypesExample {
public static void main(String[] args) throws Exception {
ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
.host("127.0.0.1")
.port(9000)
.database("test_db")
.user("default")
.password("")
.build();
ClickHouseClient client = new ClickHouseClient.Builder()
.config(config)
.build();
try {
Block block = client.createBlock("complex_data_types");
// 获取各种复杂类型的列
IColumn idColumn = block.getColumn("id");
IColumn nameColumn = block.getColumn("name");
IColumn tagsColumn = block.getColumn("tags");
IColumn metadataColumn = block.getColumn("metadata");
IColumn coordinatesColumn = block.getColumn("coordinates");
IColumn priceColumn = block.getColumn("price");
IColumn uuidColumn = block.getColumn("uuid");
IColumn birthDateColumn = block.getColumn("birth_date");
IColumn lastLoginColumn = block.getColumn("last_login");
// 写入复杂数据类型
for (int i = 1; i <= 100; i++) {
idColumn.write(i);
nameColumn.write("Product" + i);
// 数组类型
tagsColumn.write(new String[]{"electronics", "gadget", "tech"});
// 映射类型
Map<String, String> metadata = Map.of(
"brand", "Brand" + (i % 10),
"category", "Category" + (i % 5),
"rating", String.valueOf(4 + (i % 2))
);
metadataColumn.write(metadata);
// 元组类型 (坐标)
coordinatesColumn.write(new Object[]{40.7128 + (i * 0.001), -74.0060 + (i * 0.001)});
// Decimal类型
priceColumn.write(BigDecimal.valueOf(99.99 + i));
// UUID类型
uuidColumn.write(UUID.randomUUID());
// Date类型
birthDateColumn.write(LocalDate.now().minusDays(i * 365));
// DateTime类型
lastLoginColumn.write(ZonedDateTime.now().minusHours(i));
block.appendRow();
}
client.insert(block, true);
System.out.println("成功插入复杂数据类型数据");
} finally {
client.close();
}
}
}ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
.host("127.0.0.1")
.port(9000)
.database("my_database")
.user("default")
.password("")
// 连接池优化配置
.connectionPoolTotal(50) // 总连接数
.connectionPoolMaxIdle(20) // 最大空闲连接数
.connectionPooMinIdle(10) // 最小空闲连接数
.build();// 使用合适的批次大小
private static final int OPTIMAL_BATCH_SIZE = 10000;
// 批量处理数据
for (int offset = 0; offset < totalRecords; offset += OPTIMAL_BATCH_SIZE) {
Block block = client.createBlock("table_name");
// ... 填充数据
client.insert(block, true); // 自动清理
}// 及时清理不需要的数据块
try {
Block block = client.createBlock("table_name");
// ... 使用数据块
client.insert(block, true); // 插入后自动清理
} catch (Exception e) {
// 异常情况下手动清理
if (block != null) {
client.cleanBlock(block);
}
throw e;
}try {
ClickHouseClient client = new ClickHouseClient.Builder()
.config(config)
.build();
Block block = client.createBlock("table_name");
// ... 操作数据块
client.insert(block, true);
} catch (SQLException e) {
System.err.println("SQL错误: " + e.getMessage());
// 处理SQL相关错误
} catch (IOException e) {
System.err.println("IO错误: " + e.getMessage());
// 处理网络或文件IO错误
} catch (Exception e) {
System.err.println("未知错误: " + e.getMessage());
// 处理其他错误
} finally {
if (client != null) {
try {
client.close();
} catch (Exception e) {
System.err.println("关闭客户端时发生错误: " + e.getMessage());
}
}
}项目使用Maven进行依赖管理,主要依赖包括:
<dependencies>
<!-- 压缩库 -->
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>0.21</version>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.12.0</version>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>- Java版本: JDK 11 或更高版本
- ClickHouse版本: 支持ClickHouse 20.8及以上版本
- 网络: 需要能够访问ClickHouse服务器的TCP端口(默认9000)
本项目采用开源许可证,具体许可证信息请查看项目根目录下的LICENSE文件。
欢迎提交Issue和Pull Request来改进这个项目。在提交代码前,请确保:
- 代码符合项目的编码规范
- 添加了必要的测试用例
- 更新了相关文档
如有问题或建议,请通过以下方式联系:
- 提交GitHub Issue
- 发送邮件至项目维护者
注意: 这是一个基于TCP协议的高性能ClickHouse客户端,适用于需要高性能数据操作的场景。对于简单的查询操作,建议使用官方的JDBC驱动。