Apache Spark Streaming - Kafka - reading older messages


I am trying to read older messages from Kafka with Spark Streaming. However, I am only able to retrieve messages as they are sent in real time (i.e., if I populate new messages while my Spark program is running, then I get those messages).

I am changing my groupID and consumerID to make sure ZooKeeper isn't just withholding messages it knows my program has seen before.

Assuming Spark is seeing the offset in ZooKeeper as -1, shouldn't it read all the old messages in the queue? Am I just misunderstanding the way a Kafka queue can be used? I'm new to Spark and Kafka, so I can't rule out that I'm misunderstanding something.

package com.kibblesandbits

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import net.liftweb.json._

object KafkaStreamingTest {

  val cfg = new ConfigLoader().load
  val zookeeperHost = cfg.zookeeper.host
  val zookeeperPort = cfg.zookeeper.port
  val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot

  implicit val formats = DefaultFormats

  def parser(json: String): String = {
    json
  }

  def main(args: Array[String]) {
    val zkQuorum = "test-spark02:9092"
    val group = "mygroup99"
    val topic = Map("testtopic" -> 1)
    val sparkContext = new SparkContext("local[3]", "kafkaconsumer1_new")
    val ssc = new StreamingContext(sparkContext, Seconds(3))

    val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
    val gp = json_stream.map(_._2).map(parser)  // keep only the message value

    gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
    ssc.start()
    ssc.awaitTermination()  // block so the streaming job keeps running
  }
}

When running this, I see the following message, so I am confident that it's not failing to see the messages because an offset is already set:

14/12/05 13:34:08 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047] Added fetcher for partitions ArrayBuffer([[testtopic,0], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] , [[testtopic,4], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092] )

Then, if I populate 1000 new messages, I can see those 1000 messages saved in the temp directory. But I don't know how to read the existing messages, which should number (at this point) in the tens of thousands.

Use the alternative factory method on KafkaUtils that lets you provide configuration to the Kafka consumer. The initOffset -1 in your log means that no offset has been committed yet for this consumer group; in that case the consumer falls back to its auto.offset.reset policy, which defaults to largest, i.e. it only picks up messages produced after it starts:

def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Map[String, Int],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[(K, V)]

Then build a map with your Kafka configuration and add the parameter 'auto.offset.reset' set to 'smallest':

val kafkaParams = Map[String, String](
      "zookeeper.connect" -> zkQuorum,
      "group.id" -> groupId,
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest"
)

Provide that config to the factory method above. "auto.offset.reset" -> "smallest" tells the consumer to start from the smallest available offset in the topic.
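For completeness, here is a minimal sketch of how the pieces fit together, reusing ssc and the topic map from your code. The StringDecoder type parameters and the storage level are assumptions based on the standard Spark 1.x receiver API, not something taken from your program:

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel

val groupId = "mygroup99"
val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> groupId,
  "zookeeper.connection.timeout.ms" -> "10000",
  "auto.offset.reset" -> "smallest"  // replay from the beginning of the topic
)

// The typed overload takes key/value types plus their decoders.
val json_stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc,                                // the StreamingContext from the question
  kafkaParams,
  Map("testtopic" -> 1),              // topic -> number of receiver threads
  StorageLevel.MEMORY_AND_DISK_SER_2
)

Note that auto.offset.reset only kicks in when the group has no committed offset in ZooKeeper, which is why switching to a fresh group.id (as you are already doing) matters: an existing group would keep resuming from its stored offset regardless of this setting.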

