Skip to content

kohakuhubo/clickhouse-tcpc-client

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 

Repository files navigation

ClickHouse TCP 客户端

这是一个基于TCP协议的ClickHouse Java连接驱动项目,提供了高性能的ClickHouse数据库操作接口。该客户端通过原生TCP协议与ClickHouse服务器通信,支持连接池管理、数据块操作和批量数据处理。

项目特性

  • 🔌 原生TCP协议: 直接通过TCP协议与ClickHouse服务器通信,性能优异
  • 🏊 连接池管理: 内置连接池,支持连接复用和自动管理
  • 📦 数据块操作: 基于Block的数据传输和处理机制
  • 🔧 类型安全: 完整的ClickHouse数据类型支持
  • 🚀 高性能: 优化的序列化和反序列化机制
  • 🔒 SSL支持: 支持SSL安全连接
  • 📊 批量操作: 支持批量数据插入和查询

核心类和接口

1. ClickHouseClientConfig

ClickHouseClientConfig 是客户端的配置类,用于设置连接参数和客户端行为。

主要配置项

配置项 类型 默认值 说明
host String "127.0.0.1" ClickHouse服务器主机地址
port int 9000 ClickHouse服务器端口
database String "default" 数据库名称
user String "default" 用户名
password String "" 密码
queryTimeout Duration Duration.ZERO 查询超时时间
connectTimeout Duration Duration.ZERO 连接超时时间
ssl boolean false 是否启用SSL
sslMode String "disabled" SSL模式
charset Charset UTF-8 字符集
tcpKeepAlive boolean false TCP保持连接
clientName String "ClickHouse client" 客户端名称

使用示例

// 基础配置
ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
    .host("127.0.0.1")
    .port(9000)
    .database("my_database")
    .user("default")
    .password("password")
    .build();

// 高级配置
ClickHouseClientConfig advancedConfig = ClickHouseClientConfig.Builder.builder()
    .host("clickhouse.example.com")
    .port(9000)
    .database("analytics")
    .user("analytics_user")
    .password("secure_password")
    .queryTimeout(Duration.ofMinutes(5))
    .connectTimeout(Duration.ofSeconds(30))
    .ssl(true)
    .sslMode("require")
    .charset(StandardCharsets.UTF_8)
    .tcpKeepAlive(true)
    .clientName("MyAnalyticsClient")
    .connectionPoolTotal(20)
    .connectionPoolMaxIdle(10)
    .connectionPooMinIdle(5)
    .build();

2. ClickHouseClient

ClickHouseClient 是主要的客户端类,负责管理连接池和提供数据操作接口。

主要方法

方法 返回类型 说明
createBlock(String tableName) Block 为指定表创建数据块
insert(Block block, boolean clean) void 插入数据块
insert(Block block) void 插入数据块并自动清理
cleanBlock(Block block) void 清理数据块资源
close() void 关闭客户端,释放资源

使用示例

// 创建客户端
ClickHouseClient client = new ClickHouseClient.Builder()
    .config(config)
    .build();

try {
    // 为表创建数据块
    Block block = client.createBlock("users");
    
    // 插入数据
    client.insert(block, true);
    
} finally {
    // 关闭客户端
    client.close();
}

3. Block

Block 是数据传输的核心组件,代表ClickHouse中的一个数据块,包含列和行的集合。

主要方法

方法 返回类型 说明
getColumn(String columnName) IColumn 根据列名获取列
getColumn(int columnIdx) IColumn 根据列索引获取列
setObject(int columnIdx, Object object) void 设置列值
appendRow() void 添加一行数据
rowCnt() int 获取行数
columnCnt() int 获取列数
cleanup() void 清理资源
clear() void 清空数据

使用示例

// 创建数据块
Block block = client.createBlock("users");

// 获取列并写入数据
IColumn idColumn = block.getColumn("id");
IColumn nameColumn = block.getColumn("name");
IColumn ageColumn = block.getColumn("age");

// 写入第一行数据
idColumn.write(1);
nameColumn.write("Alice");
ageColumn.write(25);
block.appendRow();

// 写入第二行数据
idColumn.write(2);
nameColumn.write("Bob");
ageColumn.write(30);
block.appendRow();

// 插入数据
client.insert(block, true);

4. IColumn

IColumn 是列的接口,定义了列的基本操作。所有列类型都必须实现该接口。

主要方法

方法 返回类型 说明
name() String 获取列名
type() IDataType<?> 获取数据类型
value(int idx) Object 获取指定索引的值
write(Object object) void 写入对象值
write(byte[] bytes, int offset, int length) void 写入字节数组
writeInt(byte[] bytes, int offset, int length, boolean isLittleEndian) void 写入整数字节
rowCnt() int 获取行数
addRowCnt() int 增加行数
clear() void 清空列数据
values() Object[] 获取所有值

支持的数据类型

  • 基本类型: Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64
  • 浮点类型: Float32, Float64
  • 字符串类型: String, FixedString, LowCardinality
  • 日期时间: Date, DateTime, DateTime64
  • 复杂类型: Array, Map, Tuple, Nullable
  • 其他类型: UUID, Decimal, Enum

使用示例

// 获取不同类型的列
IColumn idColumn = block.getColumn("id");           // Int32
IColumn nameColumn = block.getColumn("name");       // String
IColumn ageColumn = block.getColumn("age");         // UInt8
IColumn salaryColumn = block.getColumn("salary");   // Float64
IColumn activeColumn = block.getColumn("is_active"); // Bool
IColumn createdColumn = block.getColumn("created_at"); // DateTime
IColumn tagsColumn = block.getColumn("tags");       // Array(String)
IColumn metadataColumn = block.getColumn("metadata"); // Map(String, String)

// 写入基本类型数据
idColumn.write(1);
nameColumn.write("John Doe");
ageColumn.write((byte) 25);
salaryColumn.write(75000.50);
activeColumn.write(true);
createdColumn.write(LocalDateTime.now());

// 写入数组数据
tagsColumn.write(new String[]{"java", "developer", "backend"});

// 写入映射数据
Map<String, String> metadata = new HashMap<>();
metadata.put("department", "engineering");
metadata.put("level", "senior");
metadataColumn.write(metadata);

完整使用示例

1. 基础数据插入

import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.time.Duration;
import java.nio.charset.StandardCharsets;

public class BasicInsertExample {
    public static void main(String[] args) throws Exception {
        // 1. 创建配置
        ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
            .host("127.0.0.1")
            .port(9000)
            .database("test_db")
            .user("default")
            .password("")
            .queryTimeout(Duration.ofMinutes(5))
            .connectTimeout(Duration.ofSeconds(30))
            .charset(StandardCharsets.UTF_8)
            .build();

        // 2. 创建客户端
        ClickHouseClient client = new ClickHouseClient.Builder()
            .config(config)
            .build();

        try {
            // 3. 创建数据块
            Block block = client.createBlock("users");

            // 4. 获取列
            IColumn idColumn = block.getColumn("id");
            IColumn nameColumn = block.getColumn("name");
            IColumn emailColumn = block.getColumn("email");
            IColumn ageColumn = block.getColumn("age");
            IColumn createdColumn = block.getColumn("created_at");

            // 5. 写入数据
            for (int i = 1; i <= 1000; i++) {
                idColumn.write(i);
                nameColumn.write("User" + i);
                emailColumn.write("user" + i + "@example.com");
                ageColumn.write(20 + (i % 50));
                createdColumn.write(System.currentTimeMillis());
                block.appendRow();
            }

            // 6. 插入数据
            client.insert(block, true);
            System.out.println("成功插入1000条数据");

        } finally {
            // 7. 关闭客户端
            client.close();
        }
    }
}

2. 批量数据处理

import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.time.Duration;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class BatchProcessingExample {
    public static void main(String[] args) throws Exception {
        // 配置和客户端创建
        ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
            .host("127.0.0.1")
            .port(9000)
            .database("analytics")
            .user("default")
            .password("")
            .connectionPoolTotal(20)
            .connectionPoolMaxIdle(10)
            .build();

        ClickHouseClient client = new ClickHouseClient.Builder()
            .config(config)
            .build();

        try {
            // 批量处理数据
            processBatchData(client, "events", 10000, 1000);
        } finally {
            client.close();
        }
    }

    private static void processBatchData(ClickHouseClient client, String tableName, 
                                       int totalRecords, int batchSize) throws Exception {
        for (int batch = 0; batch < totalRecords; batch += batchSize) {
            Block block = client.createBlock(tableName);
            
            IColumn timestampColumn = block.getColumn("timestamp");
            IColumn userIdColumn = block.getColumn("user_id");
            IColumn eventTypeColumn = block.getColumn("event_type");
            IColumn propertiesColumn = block.getColumn("properties");

            int currentBatchSize = Math.min(batchSize, totalRecords - batch);
            
            for (int i = 0; i < currentBatchSize; i++) {
                int recordId = batch + i;
                
                timestampColumn.write(System.currentTimeMillis());
                userIdColumn.write(recordId % 1000);
                eventTypeColumn.write("click");
                
                Map<String, String> properties = Map.of(
                    "page", "/products/" + (recordId % 100),
                    "source", "web",
                    "session_id", "session_" + (recordId % 10000)
                );
                propertiesColumn.write(properties);
                
                block.appendRow();
            }

            client.insert(block, true);
            System.out.println("插入批次 " + (batch / batchSize + 1) + 
                             ", 记录数: " + currentBatchSize);
        }
    }
}

3. 复杂数据类型处理

import com.berry.clickhouse.tcp.client.ClickHouseClient;
import com.berry.clickhouse.tcp.client.data.Block;
import com.berry.clickhouse.tcp.client.data.IColumn;
import com.berry.clickhouse.tcp.client.settings.ClickHouseClientConfig;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.UUID;

public class ComplexDataTypesExample {
    public static void main(String[] args) throws Exception {
        ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
            .host("127.0.0.1")
            .port(9000)
            .database("test_db")
            .user("default")
            .password("")
            .build();

        ClickHouseClient client = new ClickHouseClient.Builder()
            .config(config)
            .build();

        try {
            Block block = client.createBlock("complex_data_types");

            // 获取各种复杂类型的列
            IColumn idColumn = block.getColumn("id");
            IColumn nameColumn = block.getColumn("name");
            IColumn tagsColumn = block.getColumn("tags");
            IColumn metadataColumn = block.getColumn("metadata");
            IColumn coordinatesColumn = block.getColumn("coordinates");
            IColumn priceColumn = block.getColumn("price");
            IColumn uuidColumn = block.getColumn("uuid");
            IColumn birthDateColumn = block.getColumn("birth_date");
            IColumn lastLoginColumn = block.getColumn("last_login");

            // 写入复杂数据类型
            for (int i = 1; i <= 100; i++) {
                idColumn.write(i);
                nameColumn.write("Product" + i);
                
                // 数组类型
                tagsColumn.write(new String[]{"electronics", "gadget", "tech"});
                
                // 映射类型
                Map<String, String> metadata = Map.of(
                    "brand", "Brand" + (i % 10),
                    "category", "Category" + (i % 5),
                    "rating", String.valueOf(4 + (i % 2))
                );
                metadataColumn.write(metadata);
                
                // 元组类型 (坐标)
                coordinatesColumn.write(new Object[]{40.7128 + (i * 0.001), -74.0060 + (i * 0.001)});
                
                // Decimal类型
                priceColumn.write(BigDecimal.valueOf(99.99 + i));
                
                // UUID类型
                uuidColumn.write(UUID.randomUUID());
                
                // Date类型
                birthDateColumn.write(LocalDate.now().minusDays(i * 365));
                
                // DateTime类型
                lastLoginColumn.write(ZonedDateTime.now().minusHours(i));
                
                block.appendRow();
            }

            client.insert(block, true);
            System.out.println("成功插入复杂数据类型数据");

        } finally {
            client.close();
        }
    }
}

性能优化建议

1. 连接池配置

ClickHouseClientConfig config = ClickHouseClientConfig.Builder.builder()
    .host("127.0.0.1")
    .port(9000)
    .database("my_database")
    .user("default")
    .password("")
    // 连接池优化配置
    .connectionPoolTotal(50)        // 总连接数
    .connectionPoolMaxIdle(20)      // 最大空闲连接数
    .connectionPooMinIdle(10)       // 最小空闲连接数
    .build();

2. 批量插入优化

// 使用合适的批次大小
private static final int OPTIMAL_BATCH_SIZE = 10000;

// 批量处理数据
for (int offset = 0; offset < totalRecords; offset += OPTIMAL_BATCH_SIZE) {
    Block block = client.createBlock("table_name");
    // ... 填充数据
    client.insert(block, true); // 自动清理
}

3. 内存管理

// 及时清理不需要的数据块
try {
    Block block = client.createBlock("table_name");
    // ... 使用数据块
    client.insert(block, true); // 插入后自动清理
} catch (Exception e) {
    // 异常情况下手动清理
    if (block != null) {
        client.cleanBlock(block);
    }
    throw e;
}

错误处理

try {
    ClickHouseClient client = new ClickHouseClient.Builder()
        .config(config)
        .build();
    
    Block block = client.createBlock("table_name");
    // ... 操作数据块
    client.insert(block, true);
    
} catch (SQLException e) {
    System.err.println("SQL错误: " + e.getMessage());
    // 处理SQL相关错误
} catch (IOException e) {
    System.err.println("IO错误: " + e.getMessage());
    // 处理网络或文件IO错误
} catch (Exception e) {
    System.err.println("未知错误: " + e.getMessage());
    // 处理其他错误
} finally {
    if (client != null) {
        try {
            client.close();
        } catch (Exception e) {
            System.err.println("关闭客户端时发生错误: " + e.getMessage());
        }
    }
}

依赖管理

项目使用Maven进行依赖管理,主要依赖包括:

<dependencies>
    <!-- 压缩库 -->
    <dependency>
        <groupId>io.airlift</groupId>
        <artifactId>aircompressor</artifactId>
        <version>0.21</version>
    </dependency>
    
    <!-- 连接池 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>2.12.0</version>
    </dependency>
    
    <!-- 日志 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

系统要求

  • Java版本: JDK 11 或更高版本
  • ClickHouse版本: 支持ClickHouse 20.8及以上版本
  • 网络: 需要能够访问ClickHouse服务器的TCP端口(默认9000)

许可证

本项目采用开源许可证,具体许可证信息请查看项目根目录下的LICENSE文件。

贡献

欢迎提交Issue和Pull Request来改进这个项目。在提交代码前,请确保:

  1. 代码符合项目的编码规范
  2. 添加了必要的测试用例
  3. 更新了相关文档

联系方式

如有问题或建议,请通过以下方式联系:

  • 提交GitHub Issue
  • 发送邮件至项目维护者

注意: 这是一个基于TCP协议的高性能ClickHouse客户端,适用于需要高性能数据操作的场景。对于简单的查询操作,建议使用官方的JDBC驱动。

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages