Spark Streaming Cache and Transformations

I am new to Spark, and I am using Spark Streaming with Kafka.

My batch duration is 1 second.

Assume I get 100 records in the 1st batch, 120 records in the 2nd batch, and 80 records in the 3rd batch:

--> {sec 1: 1,2,...,100} --> {sec 2: 1,2,...,120} --> {sec 3: 1,2,...,80}

I apply my logic to the 1st batch and get a result => result1.

I want to utilize result1 while processing the 2nd batch, so that I get a combined result of result1 and the 120 records of the 2nd batch => result2.

I tried to cache the result, but I am not able to get the cached result1 in the 2nd batch. Is this possible? Or can someone shed some light on how to accomplish my goal here?

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class,
        kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

I process the messages and find the word counts for each 1-second batch:

if (resultCp != null) {
    resultCp.print();
    result = resultCp.union(words.mapValues(new Sum()));
} else {
    result = words.mapValues(new Sum());
}
resultCp = result.cache();

In the 2nd batch, resultCp should not be null, but it returns a null value. At any given time I have only that particular second's data, whereas I want to find the cumulative result. Does anyone know how to do it?

I learnt that once Spark Streaming is started with the jssc.start() command, control is no longer at our end; it lies with Spark. So is it possible to send the result of the 1st batch to the 2nd batch to find the accumulated value?

Any help is much appreciated. Thanks in advance.

I think you are looking for updateStateByKey, which creates a new DStream by applying a cumulative function to the provided DStream and the state. This example from the Spark examples package covers the case in the question.

First, you need an update function that takes the new values and the previously known value:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
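For example, if a given word occurred three times in the current batch (values = Seq(1, 1, 1)) and its saved state is Some(7), the function returns Some(10), which becomes the new state for that word.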

That function is then used to create a DStream that accumulates the values of the source DStream, like this:

// Create a NetworkInputDStream on the target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

// Update the cumulative count using updateStateByKey.
// This will give a DStream made of state (which is the cumulative count of the words).
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)

Source: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
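Since the question uses the Java API, here is a minimal sketch of the same idea with updateStateByKey on a JavaPairDStream (Spark 1.x Java API, matching the createStream call above). The messages and jssc variables come from the question; the checkpoint path and the word-splitting logic are assumptions for illustration. Note that updateStateByKey requires a checkpoint directory to be set before the context is started.

import java.util.Arrays;
import java.util.List;

import com.google.common.base.Optional; // Spark 2.x uses org.apache.spark.api.java.Optional instead
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Stateful operations such as updateStateByKey require a checkpoint
// directory (the path here is a placeholder).
jssc.checkpoint("/tmp/spark-checkpoint");

// Split each Kafka message value into words.
JavaDStream<String> words = messages.flatMap(
    new FlatMapFunction<Tuple2<String, String>, String>() {
      @Override
      public Iterable<String> call(Tuple2<String, String> message) {
        return Arrays.asList(message._2().split(" "));
      }
    });

// Pair each word with a count of 1, as in a word count.
JavaPairDStream<String, Integer> pairs = words.mapToPair(
    new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
      }
    });

// Same update function as the Scala example: add this batch's counts
// to the running state for each word.
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunc =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      @Override
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
        int currentCount = 0;
        for (Integer v : values) {
          currentCount += v;
        }
        return Optional.of(currentCount + state.or(0));
      }
    };

// stateDstream carries the cumulative count across batches
// (result1 combined with the 2nd batch, and so on).
JavaPairDStream<String, Integer> stateDstream = pairs.updateStateByKey(updateFunc);
stateDstream.print();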

apache-spark spark-streaming
