phpClickHouse

phpClickHouse

PHP客户端库实现ClickHouse数据库高效操作

phpClickHouse是一个用于操作ClickHouse数据库的PHP客户端库。该项目支持异步查询、批量插入和HTTP压缩,无需额外依赖。通过简洁的API,开发者可高效地与ClickHouse交互,进行数据分析和处理。phpClickHouse适用于各种规模的项目,提供稳定可靠的性能。

PHPClickHouse数据库查询异步Github开源项目

PHP ClickHouse 封装

下载量 Packagist 许可证

特性

  • 无依赖,仅需 Curl (支持 php >=7.1)
  • 支持并行查询 (异步)
  • 从 CSV 文件异步批量插入
  • 支持 Http 压缩 (Gzip),用于批量插入
  • 查找活跃主机,检查集群
  • 支持 SELECT WHERE IN ( 本地 csv 文件 )
  • SQL 条件和模板
  • 获取表大小和数据库大小
  • 列出分区
  • 在集群中截断表
  • 插入数组作为列
  • 获取集群中的主节点副本
  • 获取所有节点的表大小
  • 异步获取 ClickHouse 进度函数
  • 流式读/写和闭包函数

俄文文章 habr.com 1 habr.com 2

使用 composer 安装

composer require smi2/phpclickhouse

在 PHP 中使用

// 引入自动加载 $db = new ClickHouseDB\Client(['配置数组']); if (!$db->ping()) echo '连接错误';

最新稳定版本支持

  • php 5.6 <= 1.1.2
  • php 7.2 <= 1.3.10
  • php 7.3 >= 1.4.x

Packagist

开始使用

连接并选择数据库:

$config = [ 'host' => '192.168.1.1', 'port' => '8123', 'username' => 'default', 'password' => '', 'https' => true ]; $db = new ClickHouseDB\Client($config); $db->database('default'); $db->setTimeout(1.5); // 1 秒,仅支持整数值 $db->setTimeout(10); // 10 秒 $db->setConnectTimeOut(5); // 5 秒 $db->ping(true); // 如果无法连接则抛出异常

显示表:

print_r($db->showTables());

创建表:

$db->write(' CREATE TABLE IF NOT EXISTS summing_url_views ( event_date Date DEFAULT toDate(event_time), event_time DateTime, site_id Int32, site_key String, views Int32, v_00 Int32, v_55 Int32 ) ENGINE = SummingMergeTree(event_date, (site_id, site_key, event_time, event_date), 8192) ');

显示创建表语句:

echo $db->showCreateTable('summing_url_views');

插入数据:

$stat = $db->insert('summing_url_views', [ [time(), 'HASH1', 2345, 22, 20, 2], [time(), 'HASH2', 2345, 12, 9, 3], [time(), 'HASH3', 5345, 33, 33, 0], [time(), 'HASH3', 5345, 55, 0, 55], ], ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55'] );

如果需要插入 UInt64 值,可以使用 ClickHouseDB\Type\UInt64 DTO 包装该值。

$statement = $db->insert('table_name', [ [time(), UInt64::fromString('18446744073709551615')], ], ['event_time', 'uint64_type_column'] ); UInt64::fromString('18446744073709551615')

查询:

$statement = $db->select('SELECT * FROM summing_url_views LIMIT 2');

使用 Statement:

// 获取选择的行数 $statement->count(); // 获取总行数 $statement->countAll(); // 获取一行 $statement->fetchOne(); // 获取极值最小值 print_r($statement->extremesMin()); // 获取总计行 print_r($statement->totals()); // 获取所有结果 print_r($statement->rows()); // 获取总请求时间 print_r($statement->totalTimeRequest()); // 获取原始 JsonDecode 数组,以节省内存 print_r($statement->rawData()); // 获取原始 curl_info 响应 print_r($statement->responseInfo()); // 获取人类可读的大小信息 print_r($statement->info()); // 如果 clickhouse-server 版本 >= 54011 $db->settings()->set('output_format_write_statistics',true); print_r($statement->statistics()); // Statement 迭代器 $state=$this->client->select('SELECT (number+1) as nnums FROM system.numbers LIMIT 5'); foreach ($state as $key=>$value) { echo $value['nnums']; }

将查询结果作为树形结构:

$statement = $db->select(' SELECT event_date, site_key, sum(views), avg(views) FROM summing_url_views WHERE site_id < 3333 GROUP BY event_date, url_hash WITH TOTALS '); print_r($statement->rowsAsTree('event_date.site_key')); /* ( [2016-07-18] => Array ( [HASH2] => Array ( [event_date] => 2016-07-18 [url_hash] => HASH2 [sum(views)] => 12 [avg(views)] => 12 ) [HASH1] => Array ( [event_date] => 2016-07-18 [url_hash] => HASH1 [sum(views)] => 22 [avg(views)] => 22 ) ) ) */

删除表:

$db->write('DROP TABLE IF EXISTS summing_url_views');

特性

并行查询 (异步)

$state1 = $db->selectAsync('SELECT 1 as ping'); $state2 = $db->selectAsync('SELECT 2 as ping'); // 执行 $db->executeAsync(); // 结果 print_r($state1->rows()); print_r($state2->fetchOne('ping'));

从 CSV 文件并行批量插入

$file_data_names = [ '/tmp/clickHouseDB_test.1.data', '/tmp/clickHouseDB_test.2.data', '/tmp/clickHouseDB_test.3.data', '/tmp/clickHouseDB_test.4.data', '/tmp/clickHouseDB_test.5.data', ]; // 插入所有文件 $stat = $db->insertBatchFiles( 'summing_url_views', $file_data_names, ['event_time', 'site_key', 'site_id', 'views', 'v_00', 'v_55'] );

并行错误处理

不使用 executeAsync 的 selectAsync

$select = $db->selectAsync('SELECT * FROM summing_url_views LIMIT 1'); $insert = $db->insertBatchFiles('summing_url_views', ['/tmp/clickHouseDB_test.1.data'], ['event_time']); // 抛出 'Exception',错误信息为 'Queue must be empty, before insertBatch, need executeAsync'

参见 example/exam5_error_async.php

Gzip 和 enable_http_compression

即时读取 CSV 文件并使用 zlib.deflate 压缩。

$db->settings()->max_execution_time(200); $db->enableHttpCompression(true); $result_insert = $db->insertBatchFiles('summing_url_views', $file_data_names, [...]); foreach ($result_insert as $fileName => $state) { echo $fileName . ' => ' . json_encode($state->info_upload()) . PHP_EOL; }

参见速度测试 example/exam08_http_gzip_batch_insert.php

最大执行时间

$db->settings()->max_execution_time(200); // 秒

不带端口的连接

$config['host']='blabla.com'; $config['port']=0; // getUri() === 'http://blabla.com' $config['host']='blabla.com/urls'; $config['port']=8765; // getUri() === 'http://blabla.com/urls' $config['host']='blabla.com:2224'; $config['port']=1234; // getUri() === 'http://blabla.com:2224'

表大小和数据库大小

结果以人类可读的格式显示

print_r($db->databaseSize()); print_r($db->tablesSize()); print_r($db->tableSize('summing_partions_views'));

分区

$count_result = 2; print_r($db->partitions('summing_partions_views', $count_result));

使用本地 CSV 文件进行 SELECT WHERE IN

$file_name_data1 = '/tmp/temp_csv.txt'; // 两列文件 [int,string] $whereIn = new \ClickHouseDB\Query\WhereInFile(); $whereIn->attachFile($file_name_data1, 'namex', ['site_id' => 'Int32', 'site_hash' => 'String'], \ClickHouseDB\Query\WhereInFile::FORMAT_CSV); $result = $db->select($sql, [], $whereIn); // 参见 example/exam7_where_in.php

绑定

绑定:

$date1 = new DateTime("now"); // DateTimeInterface $Bindings = [ 'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'], 'datetime'=>$date, 'limit' => 5, 'from_table' => 'table' ]; $statement = $db->selectAsync("SELECT FROM {from_table} WHERE datetime=:datetime limit {limit}", $Bindings); // 在 {KEY} 中双重绑定 $keys=[ 'A'=>'{B}', 'B'=>':C', 'C'=>123, 'Z'=>[':C',':B',':C'] ]; $this->client->selectAsync('{A} :Z', $keys)->sql() // == "123 ':C',':B',':C' FORMAT JSON",

简单的 SQL 条件和模板

条件已被弃用,如需使用: $db->enableQueryConditions();

使用 QueryConditions 的示例:

$db->enableQueryConditions(); $input_params = [ 'select_date' => ['2000-10-10', '2000-10-11', '2000-10-12'], 'limit' => 5, 'from_table' => 'table' ]; $select = ' SELECT * FROM {from_table} WHERE {if select_date} event_date IN (:select_date) {else} event_date=today() {/if} {if limit} LIMIT {limit} {/if} '; $statement = $db->selectAsync($select, $input_params); echo $statement->sql(); /* SELECT * FROM table WHERE event_date IN ('2000-10-10','2000-10-11','2000-10-12') LIMIT 5 FORMAT JSON */ $input_params['select_date'] = false; $statement = $db->selectAsync($select, $input_params); echo $statement->sql(); /* SELECT * FROM table WHERE event_date=today() LIMIT 5 FORMAT JSON */ $state1 = $db->selectAsync( 'SELECT 1 as {key} WHERE {key} = :value', ['key' => 'ping', 'value' => 1] ); // SELECT 1 as ping WHERE ping = "1"

自定义查询退化示例见 exam16_custom_degeneration.php

SELECT {ifint VAR} result_if_intval_NON_ZERO{/if}
SELECT {ifint VAR} result_if_intval_NON_ZERO {else} BLA BLA{/if}

设置

设置任何参数的三种方式

// 在配置数组中 $config = [ 'host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x', 'settings' => ['max_execution_time' => 100] ]; $db = new ClickHouseDB\Client($config); // 通过构造函数设置 $config = [ 'host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x' ]; $db = new ClickHouseDB\Client($config, ['max_execution_time' => 100]); // 使用 set 方法 $config = [ 'host' => 'x', 'port' => '8123', 'username' => 'x', 'password' => 'x' ]; $db = new ClickHouseDB\Client($config); $db->settings()->set('max_execution_time', 100); // 使用 apply 数组方法 $db->settings()->apply([ 'max_execution_time' => 100, 'max_block_size' => 12345 ]); // 检查 if ($db->settings()->getSetting('max_execution_time') !== 100) { throw new Exception('设置工作异常'); } // 参

streamRead 类似于 WriteToFile

$stream = fopen('php://memory','r+'); $streamRead=new ClickHouseDB\Transport\StreamRead($stream); $r=$client->streamRead($streamRead,'SELECT sin(number) as sin,cos(number) as cos FROM {table_name} LIMIT 4 FORMAT JSONEachRow', ['table_name'=>'system.numbers']); rewind($stream); while (($buffer = fgets($stream, 4096)) !== false) { echo ">>> ".$buffer; } fclose($stream); // 需要关闭流 // 发送到闭包 $stream = fopen('php://memory','r+'); $streamRead=new ClickHouseDB\Transport\StreamRead($stream); $callable = function ($ch, $string) use ($stream) { // 对 _BLOCK_ 数据进行一些魔法处理 fwrite($stream, str_ireplace('"sin"','"max"',$string)); return strlen($string); }; $streamRead->closure($callable); $r=$client->streamRead($streamRead,'SELECT sin(number) as sin,cos(number) as cos FROM {table_name} LIMIT 44 FORMAT JSONEachRow', ['table_name'=>'system.numbers']);

插入关联批量数据

$oneRow = [ 'one' => 1, 'two' => 2, 'thr' => 3, ]; $failRow = [ 'two' => 2, 'one' => 1, 'thr' => 3, ]; $db->insertAssocBulk([$oneRow, $oneRow, $failRow])

认证方法

   AUTH_METHOD_HEADER       = 1;
   AUTH_METHOD_QUERY_STRING = 2;
   AUTH_METHOD_BASIC_AUTH   = 3;

在配置中设置 auth_method

$config=[ 'host'=>'host.com', //... 'auth_method'=>1, ];

进度函数

// 应用函数 $db->progressFunction(function ($data) { echo "调用函数:".json_encode($data)."\n"; }); $st=$db->select('SELECT number,sleep(0.2) FROM system.numbers limit 5'); // 打印 // ... // 调用函数:{"read_rows":"2","read_bytes":"16","total_rows":"0"} // 调用函数:{"read_rows":"3","read_bytes":"24","total_rows":"0"} // ...

ssl CA

$config = [ 'host' => 'cluster.clickhouse.dns.com', // 集群中任意节点名称 'port' => '8123', 'sslCA' => '...', ];

集群

$config = [ 'host' => 'cluster.clickhouse.dns.com', // 集群中任意节点名称 'port' => '8123', 'username' => 'default', // 所有节点使用相同的登录名和密码 'password' => '' ]; // 客户端首先通过DNS连接第一个节点,读取IP列表,然后连接所有节点以检查是否正常 $cl = new ClickHouseDB\Cluster($config); $cl->setScanTimeOut(2.5); // 2500毫秒,每个节点的最大连接时间 // 检查复制状态是否正常 if (!$cl->isReplicasIsOk()) { throw new Exception('复制状态异常,错误='.$cl->getError()); } // 获取节点数组和集群 print_r($cl->getNodes()); print_r($cl->getClusterList()); // 通过集群获取节点 $name='some_cluster_name'; print_r($cl->getClusterNodes($name)); // 获取数量 echo "> 分片数 = ".$cl->getClusterCountShard($name)."\n"; echo "> 副本数 = ".$cl->getClusterCountReplica($name)."\n"; // 通过表获取节点并打印每个节点的大小 $nodes=$cl->getNodesByTable('shara.adpreview_body_views_sharded'); foreach ($nodes as $node) { echo "$node > \n"; // 选择一个节点 print_r($cl->client($node)->tableSize('adpreview_body_views_sharded')); print_r($cl->client($node)->showCreateTable('shara.adpreview_body_views')); } // 操作单个节点 // 通过IP选择,如 "*.248*" = `123.123.123.248`,分隔符 ';',如果未找到则选择第一个节点 $cli=$cl->clientLike($name,'.298;.964'); // 首先查找 .298 然后 .964,结果是 ClickHouseDB\Client $cli->ping(); // 在集群上截断表 $result=$cl->truncateTable('dbNane.TableName_sharded'); // 获取一个活跃节点(随机) $cl->activeClient()->setTimeout(500); $cl->activeClient()->write("DROP TABLE IF EXISTS default.asdasdasd ON CLUSTER cluster2"); // 查找 `is_leader` 节点 $cl->getMasterNodeForTable('dbNane.TableName_sharded'); // 错误 var_dump($cl->getError());

返回极值

$db->enableExtremes(true);

启用查询日志

您可以在ClickHouse中记录所有查询

$db->enableLogQueries(); $db->select('SELECT 1 as p'); print_r($db->select('SELECT * FROM system.query_log')->rows());

检查是否存在

$db->isExists($database,$table);

调试和详细输出

$db->verbose();

详细输出到文件|流:

// 初始化客户端 $cli = new Client($config); $cli->verbose(); // 临时流 $stream = fopen('php://memory', 'r+'); // 设置流到curl $cli->transport()->setStdErrOut($stream); // 执行curl $st=$cli->select('SElect 1 as ppp'); $st->rows(); // 倒回 fseek($stream,0,SEEK_SET); // 输出 echo stream_get_contents($stream);

开发和PHPUnit测试

  • 不要忘记运行composer install。它应该设置PSR-4自动加载。
  • 然后您可以简单地运行vendor/bin/phpunit,它应该输出以下内容
cp phpunit.xml.dist phpunit.xml mcedit phpunit.xml

在phpunit.xml中编辑常量:

<php> <env name="CLICKHOUSE_HOST" value="127.0.0.1" /> <env name="CLICKHOUSE_PORT" value="8123" /> <env name="CLICKHOUSE_USER" value="default" /> <env name="CLICKHOUSE_DATABASE" value="phpChTestDefault" /> <env name="CLICKHOUSE_PASSWORD" value="" /> <env name="CLICKHOUSE_TMPPATH" value="/tmp" /> </php>

运行docker ClickHouse服务器

cd ./tests
docker-compose up

运行测试

./vendor/bin/phpunit ./vendor/bin/phpunit --group ClientTest ./vendor/bin/phpunit --group ClientTest --filter testInsertNestedArray ./vendor/bin/phpunit --group ConditionsTest

运行PHPStan

# 主要
./vendor/bin/phpstan analyse src tests --level 7
# 仅src
./vendor/bin/phpstan analyse src --level 7

# 示例
./vendor/bin/phpstan analyse example -a ./example/Helper.php

许可证

MIT

更新日志

请参阅 changeLog.md

编辑推荐精选

咔�片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

AI助手热门AI工具AI创作AI辅助写作讯飞绘文内容运营个性化文章多平台分发
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

Hunyuan3D-2

Hunyuan3D-2

高分辨率纹理 3D 资产生成

Hunyuan3D-2 是腾讯开发的用于 3D 资产生成的强大工具,支持从文本描述、单张图片或多视角图片生成 3D 模型,具备快速形状生成能力,可生成带纹理的高质量 3D 模型,适用于多个领域,为 3D 创作提供了高效解决方案。

3FS

3FS

一个具备存储、管理和客户端操作等多种功能的分布式文件系统相关项目。

3FS 是一个功能强大的分布式文件系统项目,涵盖了存储引擎、元数据管理、客户端工具等多个模块。它支持多种文件操作,如创建文件和目录、设置布局等,同时具备高效的事件循环、节点选择和协程池管理等特性。适用于需要大规模数据存储和管理的场景,能够提高系统的性能和可靠性,是分布式存储领域的优质解决方案。

TRELLIS

TRELLIS

用于可扩展和多功能 3D 生成的结构化 3D 潜在表示

TRELLIS 是一个专注于 3D 生成的项目,它利用结构化 3D 潜在表示技术,实现了可扩展且多功能的 3D 生成。项目提供了多种 3D 生成的方法和工具,包括文本到 3D、图像到 3D 等,并且支持多种输出格式,如 3D 高斯、辐射场和网格等。通过 TRELLIS,用户可以根据文本描述或图像输入快速生成高质量的 3D 资产,适用于游戏开发、动画制作、虚拟现实等多个领域。

ai-agents-for-beginners

ai-agents-for-beginners

10 节课教你开启构建 AI 代理所需的一切知识

AI Agents for Beginners 是一个专为初学者打造的课程项目,提供 10 节课程,涵盖构建 AI 代理的必备知识,支持多种语言,包含规划设计、工具使用、多代理等丰富内容,助您快速入门 AI 代理领域。

AEE

AEE

AI Excel全自动制表工具

AEE 在线 AI 全自动 Excel 编辑器,提供智能录入、自动公式、数据整理、图表生成等功能,高效处理 Excel 任务,提升办公效率。支持自动高亮数据、批量计算、不规则数据录入,适用于企业、教育、金融等多场景。

UI-TARS-desktop

UI-TARS-desktop

基于 UI-TARS 视觉语言模型的桌面应用,可通过自然语言控制计算机进行多模态操作。

UI-TARS-desktop 是一款功能强大的桌面应用,基于 UI-TARS(视觉语言模型)构建。它具备自然语言控制、截图与视觉识别、精确的鼠标键盘控制等功能,支持跨平台使用(Windows/MacOS),能提供实时反馈和状态显示,且数据完全本地处理,保障隐私安全。该应用集成了多种大语言模型和搜索方式,还可进行文件系统操作。适用于需要智能交互和自动化任务的场景,如信息检索、文件管理等。其提供了详细的文档,包括快速启动、部署、贡献指南和 SDK 使用说明等,方便开发者使用和扩展。

下拉加载更多