描述:
语法:
kafka_open(filename或fileObject, topic,...,partitionSize)
备注:
外部库函数,外部库的使用请参考《外部库使用指南》。
连接server服务器,将属性参数properties存放到以后缀名为”. properties”的配置文件中。
参数:
topic |
查询具体的某个topic或多个topic。 |
filename |
以后缀名为”. properties”的属性参数文件,其中涉及发送时的键、值编码与接收时的解码,且它们需要对应。 |
fileObject |
file文件对象。 |
partitionSize |
topic的分区数,若已经存在对应的主题,则忽略此参数,即不支持修改主题分区大小。 |
返回值:
连接对象
示例:
|
A |
|
1 |
=kafka_open("D://kafka.properties","topic-test") |
用topic为topic-test的配置文件kafka.propertie连接kafka server。 |
2 |
=kafka_open(file("D://kafka.properties"),"topic-test",3) |
以file对象方式连接kafka server,其余同上。 |
3 |
=kafka_close(A1) |
|
其中D://kafka.properties内容为:
##produce
bootstrap.servers=192.168.0.1:9092
producer.type=sync
request.required.acks=1
serializer.class=kafka.serializer.DefaultEncoder
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
##consume
group.id=test
zookeeper.session.timeout.ms=1000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=500
#手动模式,此时的auto.commit.interval.ms无效
enable.auto.commit=false
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
自定义编码、解码使用说明:
将包含编码、解码的jar文件放到exlib\KafkaCil目录下,如类似下图所示:
在scu-kaufa-cil-2.10.jar中,编码文件为EncodeingSequence.class,解码文件为DecodeingSequence.class, 对应的消息体对象为序列Sequnce。
在.properties文件中配置自定义的编码与解码:
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.scudata.lib.kafka.EncodeingSequence
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.scudata.lib.kafka.DecodeingSequence