kafka_open ()

阅读(190) 标签: 连接, 对象,

描述:

连接kafka server服务器。

语法:  

kafka_open(filenamefileObject, topic,...,partitionSize)

备注:

外部库函数,连接server服务器,将属性参数properties存放到以后缀名为”. properties”的配置文件中。

参数:

topic

查询具体的某个topic或多个topic

filename

以后缀名为”. properties”的属性参数文件,其中涉及发送时的键、值编码与接收时的解码,且它们需要对应。

fileObject

file文件对象

partitionSize

topic的分区数,若已经存在对应的主题,则忽略此参数,即不支持修改主题分区大小。

 

返回值:

连接对象

示例:

 

A

 

1

=kafka_open("D://kafka.properties","topic-test")

topictopic-test的配置文件kafka.propertie连接kafka server

2

=kafka_open(file("D://kafka.properties"),"topic-test",3)

file对象方式连接kafka server,其余同上

3

=kafka_close(A1)

 

 

其中D://kafka.properties内容为:

##produce 

bootstrap.servers=192.168.0.1:9092

producer.type=sync 

request.required.acks=1 

serializer.class=kafka.serializer.DefaultEncoder 

key.serializer=org.apache.kafka.common.serialization.StringSerializer 

value.serializer=org.apache.kafka.common.serialization.StringSerializer 

 

##consume 

group.id=test

zookeeper.session.timeout.ms=1000 

zookeeper.sync.time.ms=200 

auto.commit.interval.ms=500 

#手动模式,此时的auto.commit.interval.ms无效

enable.auto.commit=false

auto.offset.reset=earliest

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

自定义编码、解码使用说明:
将包含编码、解码的jar文件放到exlib\KafkaCil目录下,如类似下图所示:

scu-kaufa-cil-2.11.jar中,编码文件为EncodeingSequence.class,解码文件为DecodeingSequence.class, 对应的消息体对象为序列Sequnce

.properties文件中配置自定义的编码与解码:
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.raqsoft.lib.kafka.EncodeingSequence

 

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

value.deserializer=com.raqsoft.lib.kafka.DecodeingSequence