Spark Streaming Cache and Transformations
I am new to Spark and I am using Spark Streaming with Kafka.
My streaming batch duration is 1 second.
Assume I get 100 records in the 1st batch, 120 records in the 2nd batch, and 80 records in the 3rd batch:

--> {sec 1: 1,2,...,100} --> {sec 2: 1,2,...,120} --> {sec 3: 1,2,...,80}
I apply my logic to the 1st batch and get a result => result1.
I want to utilize result1 while processing the 2nd batch, so that I have the combined result of result1 and the 120 records of the 2nd batch => result2, and so on.
I tried to cache the result, but I was not able to retrieve the cached result1 in the 2nd batch. Is this possible? Or can someone shed some light on how to accomplish this goal?
JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
I process the messages and find the word counts for each 1-second batch:
if (resultCp != null) {
    resultCp.print();
    // union the previous (cached) result with this batch's counts
    result = resultCp.union(words.mapValues(new Sum()));
} else {
    result = words.mapValues(new Sum());
}
resultCp = result.cache();
When the 2nd batch arrives, resultCp should not be null, but it returns null. At any given time I only have that particular second's data, whereas I want to find the cumulative result. Does anyone know how to do it?
I learnt that once Spark Streaming is started with jssc.start(), control is no longer in our hands; it lies with Spark. So is it possible to send the result of the 1st batch to the 2nd batch to find the accumulated value?
Any help is much appreciated. Thanks in advance.
I think you are looking for updateStateByKey.
It creates a new DStream by applying the provided cumulative function to the DStream and the maintained state. The following example from the Spark examples package covers the case in question.
First, you need an update function that takes the new values and the previously known value:
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
That function is then used to create a DStream that accumulates the values from the source DStream, like this:
// Create a NetworkInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using updateStateByKey;
// this will give a DStream made of state (which is the cumulative count of the words)
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
Source: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
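Since your code uses the Java API, here is a minimal sketch of the same idea in Java. This is under a few assumptions: a receiver-based Spark 1.x setup (where the Java streaming API uses Guava's Optional), a `counts` variable that is a JavaPairDStream<String, Integer> of per-batch counts (e.g. the output of your words.mapValues(new Sum())), and a placeholder checkpoint path. Note that stateful transformations like updateStateByKey require a checkpoint directory to be set:

import java.util.List;
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// Stateful transformations require checkpointing (placeholder path).
jssc.checkpoint("/tmp/spark-checkpoint");

// Merge this batch's counts for a key into its running total.
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunc =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            int currentCount = 0;
            for (Integer v : values) {
                currentCount += v;
            }
            int previousCount = state.or(0);
            return Optional.of(currentCount + previousCount);
        }
    };

// result now carries the accumulated counts across all batches so far.
JavaPairDStream<String, Integer> result = counts.updateStateByKey(updateFunc);
result.print();

Spark keeps the state between batches for you (result1 is effectively folded into result2, and so on), which is exactly what the cache/union attempt was trying to do by hand.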
apache-spark spark-streaming