KafkaBridge Apache Kafka 客户端 SDK开源项目

我要开发同款
匿名用户2018年10月16日
126阅读

技术信息

开源地址
https://github.com/Qihoo360/kafkabridge
授权协议
MIT

作品详情

KafkaBridge是奇虎360开源的Kafka客户端SDK,底层基于librdkafka,与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的Kafka系统细节,只需调用极少量的接口,就可完成消息的生产和消费。此外,针对使用者比较关心的消息生产的可靠性,作了近一步的提升。

特点

支持多种语言:c++/c、php、pytho、golag,且各语言接口完全统一;

接口少,简单易用;

针对高级用户,支持通过配置文件调整所有的librdkafka的配置;

在非按key写入数据的情况下,尽最大努力将消息成功写入;

支持同步和异步两种数据写入方式;

在消费时,除默认自动提交offset外,允许用户通过配置手动提交offset;

在php-fpm场景中,复用长连接生产消息,避免频繁创建断开连接的开销;

编译

依赖liblog4cplus,boost,swig-3.0.12,cmake

gitcloe

gitcloe--recursive https://github.com/Qihoo360/kafkabridge.git

cxx/c

进入cxx/c目录,执行build.sh-release,在./lib/release下会产生libqbus.so。

go

进入go目录,执行build.sh,在gopath/src/qbus目录下生成qbus.go和libQBus_go.so。

pytho

进入pytho目录,执行build.sh,在当前目录生成qbus.py和_qbus.so。编译脚本提供了选项,可以通过-h查看。可以通过-s选项传递pytho相关头文件路径。默认-s/usr/local/pytho2.7/iclude/pytho2.7

php

进入pytho目录,执行build.sh,当前目录生成扩展qbus.so和qbus.php。编译脚本提供了选项,可以通过-h查看。可以通过s选项传递php相关头文件路径,可以通过-v传递php的版本。默认选项-s/usr/local/php-vphp。

使用数据生产

在非按key写入的情况下,sdk尽最大努力提交每一条消息,只要Kafka集群存有一台broker正常,就会重试发送;

每次写入数据只需要调用produce接口,在异步发送的场景下,通过返回值可以判断发送队列是否填满,发送队列可通过配置文件调整;

在同步发送的场景中,produce接口返回当前消息是否写入成功,但是写入性能会有所下降,CPU使用率会有所上升,推荐还是使用异步写入方式;。

下面是生产接口,以c++为例:

bool QbusProducer::iit(cost strig& broker_list, cost strig& log_path, cost strig& cofig_path, cost strig& topic)bool QbusProducer::produce(cost char* data, size_t data_le, cost std::strig& key)void QbusProducer::uiit()

c++sdk的使用范例:

#iclude <strig>#iclude <iostream>#iclude "qbus_producer.h"it mai(it argc, cost char* argv[]) {    qbus::QbusProducer qbus_producer;    if (!qbus_producer.iit("127.0.0.1:9092",                    "./log",                    "./cofig",                    "topic_test")) {        std::cout << "Failed to iit" << std::edl;        retur 0;    }    std::strig msg("test\");    if (!qbus_producer.produce(msg.c_str(), msg.legth(), "key")) {        std::cout << "Failed to produce" << std::edl;    }    qbus_producer.uiit();    retur 0;}数据消费

消费只需调用subscribeOe订阅topic(也支持同时订阅多个topic),然后执行start就开始消费,当前进程非阻塞,每条消息通过callback接口回调给使用者;

sdk还支持用户手动提交offset方式,用户可以通过callback中返回的消息体,在代码其他逻辑中进行提交。

下面是消费接口,以c++为例:

bool QbusCosumer::iit(strig broker_list, strig log_path, strig cofig_path, QbusCosumerCallback& callback)bool QbusCosumer::subscribeOe(strig group, strig topic)bool QbusCosumer::start()void QbusCosumer::stop()

c++sdk的使用范例:

#iclude <iostream>#iclude "qbus_cosumer.h"qbus::QbusCosumer qbus_cosumer;class MyCallback: public qbus::QbusCosumerCallback {    public:        virtual void deliveryMsg(cost std::strig& topic,                    cost char* msg,                    cost size_t msg_le) cost {            std::cout << "topic: " << topic << " | msg: " << std::strig(msg, msg_le) << std::edl;        }};it mai(it argc, char* argv[]) {    MyCallback my_callback;    if (qbus_cosumer.iit("127.0.0.1:9092",                    "log",                    "cofig",                    my_callback)) {        if (qbus_cosumer.subscribeOe("groupid_test", "topic_test")) {            if (!qbus_cosumer.start()) {                std::cout << "Failed to start" << std::edl;                retur NULL;            }            while (1) sleep(1);  //可以执行其他业务逻辑            qbus_cosumer.stop();        } else {            std::cout << "Failed subscribe" << std::edl;        }    } else {        std::cout << "Failed iit" << std::edl;    }    retur 0;}

功能介绍

KafkaBridge 是奇虎 360 开源的 Kafka 客户端 SDK ,底层基于 librdkafka ,与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的 Kafka 系统细节...

声明:本文仅代表作者观点,不代表本站立场。如果侵犯到您的合法权益,请联系我们删除侵权资源!如果遇到资源链接失效,请您通过评论或工单的方式通知管理员。未经允许,不得转载,本站所有资源文章禁止商业使用运营!
下载安装【程序员客栈】APP
实时对接需求、及时收发消息、丰富的开放项目需求、随时随地查看项目状态

评论