kafka_subscribe()

阅读(401) 标签: 连接, 对象,

描述:

连接kafka server服务器。

语法:

kafka_subscribe (prop:value,....;topic,...;keyType, valueType)   

kafka_subscribe (filenamefileObject; topic,...; keyType, valueType)

备注:

连接server服务器, 将属性参数propertiesk:v形式输入,也可以将属性参数properties存放到以后缀名为”. properties”的配置文件中。

参数:

prop:value

properties属性参数,以k:v形式输入,其属性参数有多个,可参考kafka相关文档

topic

查询具体的某个topic或多个topic

keyType

consumerkey数据类型,缺省为string, 支持类型有int, interger, float, long, string

filename

以后缀名为”. properties”的属性参数文件

fileObject

file文件对象

valueType

consumerkey数据类型,缺省为string, 支持类型有int, interger,float, long, string

返回值:

consumer对象

示例:

 

A

 

1

=kafka_subscribe("D:/kafka_string.properties";"test1";"String","byte[]")

topictest1keyString, valuebyte[]的配置文件kafka_string.propertie连接kafka server

2

=kafka_subscribe(file("D:/kafka_string.properties");"test1";"String","byte[]")

以配置文件对象方式连接kafka server,其余同上

3

=kafka_subscribe("bootstrap.servers":"192.168.0.144:9092","zookeeper.connect":"192.168.0.144:2181",

"group.id":"test","zookeeper.session.timeout.ms":"4000","zookeeper.sync.time.ms":"200",

"auto.commit.interval.ms":"500","auto.offset.reset":"latest",

"serializer.class":"kafka.serializer.StringEncoder",

"key.deserializer":"org.apache.kafka.common.serialization.StringDeserializer",

"value.deserializer":"org.apache.kafka.common.serialization.ByteArrayDeserializer";

"test1";"String","byte[]")

以参数方式连接kafaka server, 其余同上