kafka_open ()

阅读(36) 标签: 连接, 对象,

描述:

连接kafka server服务器。

语法:  

kafka_open(filenamefileObject; topic,...; partitionSize)

备注:

外部库函数,连接server服务器,将属性参数properties存放到以后缀名为”. properties”的配置文件中。支持单机模式与群集模式。群集连接服务器推荐使用leader所在的设备。

参数:

topic

查询具体的某个topic或多个topic

filename

以后缀名为”. properties”的属性参数文件,其中涉及发送时的键、值编码与接收时的解码,且它们需要对应。

fileObject

file文件对象

partitionSize

topic的分区数量,可以省略;在群集下创建topic时才有效。

选项:

@c

群集选项,支持对群集的操作

 

返回值:

consumer对象

示例:

 

A

 

1

=kafka_open("D://kafka.properties";"topic-test")

topictopic-test的配置文件kafka.propertie连接kafka server

2

=kafka_open@c(file("D://kafka.properties");"topic-test";3)

file对象方式连接kafka server,其余同上

3

=kafka_close(A1)

 

 

其中D://kafka.properties内容为:

##produce 

bootstrap.servers=192.168.0.1:9092

producer.type=sync 

request.required.acks=1 

serializer.class=kafka.serializer.DefaultEncoder 

key.serializer=org.apache.kafka.common.serialization.StringSerializer 

value.serializer=org.apache.kafka.common.serialization.StringSerializer 

 

##consume 

group.id=test

zookeeper.session.timeout.ms=1000 

zookeeper.sync.time.ms=200 

auto.commit.interval.ms=500 

auto.offset.reset=earliest

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

自定义编码、解码使用说明:
将包含编码、解码的jar文件放到exlib\KafkaCil目录下,如类似下图所示:

scu-kaufa-cil-2.11.jar中,编码文件为EncodeingSequence.class,解码文件为DecodeingSequence.class, 对应的消息体对象为序列Sequnce

.properties文件中配置自定义的编码与解码:
key.serializer=org.apache.kafka.common.serialization.StringSerializer 
value.serializer=com.raqsoft.lib.kafka.EncodeingSequence

 

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer 

value.deserializer=com.raqsoft.lib.kafka.DecodeingSequence