Apache Spark Streaming - Kafka - reading older messages
I am trying to read older messages from Kafka with Spark Streaming. However, I am only able to retrieve messages as they are sent in real time (i.e., if I populate new messages while my Spark program is running, I get those messages).
I am changing my groupId and consumerId to make sure ZooKeeper isn't withholding messages it knows my program has seen before.
Assuming Spark is seeing the offset in ZooKeeper as -1, shouldn't it read all the old messages in the queue? Am I misunderstanding the way a Kafka queue can be used? I'm new to Spark and Kafka, so I can't rule out that I'm misunderstanding something.
package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._

object KafkaStreamingTest {
  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats

  def parser(json: String): String = {
    json
  }

  def main(args: Array[String]) {
    val zkQuorum = "test-spark02:9092"
    val group = "myGroup99"
    val topic = Map("testtopic" -> 1)
    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
    val ssc = new StreamingContext(sparkContext, Seconds(3))
    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    var gp = json_stream.map(_._2).map(parser)
    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
    ssc.start()
  }
}
When running this, I see the following message, so I am confident that it is not failing to see the messages because an offset is already set.
14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092])
Then, if I populate 1000 new messages, I can see those 1000 messages saved in the temp directory. But I don't know how to read the existing messages, which should number (at this point) in the tens of thousands.
Use the alternative factory method on KafkaUtils that lets you provide a configuration for the Kafka consumer:
def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)]
Then build a map with the Kafka configuration and add the parameter "auto.offset.reset" set to "smallest":

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> groupId,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"
)

Provide that config to the factory method above. "auto.offset.reset" -> "smallest" tells the consumer to start from the smallest offset available in the topic. The default is "largest", which is why a consumer with no committed offset (the initOffset -1 in your log) only picks up messages produced after it starts.
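Putting it together, here is a minimal sketch of the receiver-based stream with these parameters. It assumes ZooKeeper is reachable on test-spark02:2181 (your zkQuorum pointed at 9092, which is Kafka's broker port, while "zookeeper.connect" must point at ZooKeeper itself); the group id and topic name are taken from your question, and the object name is just a placeholder.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReadFromBeginning {
  def main(args: Array[String]) {
    val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
    val ssc = new StreamingContext(sparkContext, Seconds(3))

    val kafkaParams = Map[String, String](
      // Assumption: ZooKeeper on its default port, not the broker port 9092.
      "zookeeper.connect" -> "test-spark02:2181",
      "group.id" -> "mygroup99",
      "zookeeper.connection.timeout.ms" -> "10000",
      // Start from the earliest available offset when no offset is committed.
      "auto.offset.reset" -> "smallest"
    )

    // This overload requires explicit key/value types and decoders.
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("testtopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
    ssc.start()
    ssc.awaitTermination()
  }
}

Note that "auto.offset.reset" only applies when the consumer group has no committed offset in ZooKeeper, so using a fresh group id (as you already do) is what makes it take effect.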