Kafka-php使用纯粹的PHP编写的kafka客户端,目前支持0.8.x以上版本的Kafka,该项目v0.2.x和v0.1.x不兼容,如果使用原有的v0.1.x的可以参照文档 KafkaPHPv0.1.xDocumet,不过建议切换到v0.2.x上。v0.2.x使用PHP异步执行的方式来和kafkabroker交互,较v0.1.x更加稳定高效,由于使用PHP语言编写所以不用编译任何的扩展就可以使用,降低了接入与维护成本。
安装环境要求PHP版本大于5.5
KafkaServer版本大于0.8.0
消费模块KafkaServer版本需要大于0.9.0
Istallatio使用Composer安装添加composer依赖 mred/kafka-php 到项目的 composer.jso 文件中即可,如:
{"require": {"mred/kafka-php": "0.2.*"}}Produce<?phprequire '../vedor/autoload.php';date_default_timezoe_set('PRC');use Moolog\Logger;use Moolog\Hadler\StdoutHadler;// Create the logger$logger = ew Logger('my_logger');// Now add some hadlers$logger->pushHadler(ew StdoutHadler());// 设置生产相关配置,具体配置参数见 [Cofiguratio](Cofiguratio.md)$cofig = \Kafka\ProducerCofig::getIstace();$cofig->setMetadataRefreshItervalMs(10000);$cofig->setMetadataBrokerList('10.13.4.159:9192');$cofig->setBrokerVersio('0.9.0.1');$cofig->setRequiredAck(1);$cofig->setIsAsy(false);$cofig->setProduceIterval(500);$producer = ew \Kafka\Producer(fuctio() {retur array(array('topic' => 'test','value' => 'test....message.','key' => 'testkey',),);});$producer->setLogger($logger);$producer->success(fuctio($result) {var_dump($result);});$producer->error(fuctio($errorCode, $cotext) {var_dump($errorCode);});$producer->sed();Cosumer<?phprequire '../vedor/autoload.php';date_default_timezoe_set('PRC');use Moolog\Logger;use Moolog\Hadler\StdoutHadler;// Create the logger$logger = ew Logger('my_logger');// Now add some hadlers$logger->pushHadler(ew StdoutHadler());$cofig = \Kafka\CosumerCofig::getIstace();$cofig->setMetadataRefreshItervalMs(10000);$cofig->setMetadataBrokerList('10.13.4.159:9192');$cofig->setGroupId('test');$cofig->setBrokerVersio('0.9.0.1');$cofig->setTopics(array('test'));//$cofig->setOffsetReset('earliest');$cosumer = ew \Kafka\Cosumer();$cosumer->setLogger($logger);$cosumer->start(fuctio($topic, $part, $message) {var_dump($message);});BasicProtocol基础协议API调用方式见 Example










评论