java - How to implement custom job listener/tracker in Spark?


I have the class below, and when I run it from the command line I want to see its progress status, something like:

    10% completed...
    30% completed...
    100% completed...job done!

I am using Spark 1.0 on YARN with the Java API.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;

    import scala.Tuple2;

    public class MyJavaWordCount {
        public static void main(String[] args) throws Exception {
            if (args.length < 2) {
                System.err.println("Usage: MyJavaWordCount <master> <file>");
                System.exit(1);
            }
            System.out.println("args[0]: <master>=" + args[0]);
            System.out.println("args[1]: <file>=" + args[1]);

            JavaSparkContext ctx = new JavaSparkContext(
                    args[0],
                    "MyJavaWordCount",
                    System.getenv("SPARK_HOME"),
                    System.getenv("SPARK_EXAMPLES_JAR"));
            JavaRDD<String> lines = ctx.textFile(args[1], 1);

            // FlatMapFunction<input, output>: split each line into words
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterable<String> call(String s) {
                    return Arrays.asList(s.split(" "));
                }
            });

            // PairFunction<input, K, V>: map each word to the pair (word, 1)
            JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });

            // Sum the counts for each word
            JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<String, Integer> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }
            System.exit(0);
        }
    }

If you are using Spark from Scala, you can do this by adding a SparkListener.

Create the SparkContext:

    val sc = new SparkContext(sparkConf)

Now you can add a SparkListener to the SparkContext:

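    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}

    sc.addSparkListener(new SparkListener() {
      // Called when the application starts
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        println("Spark ApplicationStart: " + applicationStart.appName)
      }

      // Called when the application ends
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        println("Spark ApplicationEnd: " + applicationEnd.time)
      }
    })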

The SparkListener interface lists the events from the Spark scheduler that you can listen for.
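To get from listener events to the "N% completed..." output asked for in the question, something has to count finished tasks against the total. The following is a minimal Java sketch of that bookkeeping only, with no Spark dependency. ProgressTracker and its methods are hypothetical names, not part of Spark; the assumption is that you would call stageSubmitted from the listener's onStageSubmitted callback (with the stage's task count) and taskEnded from onTaskEnd. The wiring itself is not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper (not part of Spark): tracks how many tasks have
 * finished out of the number of tasks announced so far.
 */
class ProgressTracker {
    private final AtomicInteger totalTasks = new AtomicInteger();
    private final AtomicInteger finishedTasks = new AtomicInteger();

    /** Call when a stage is submitted, passing that stage's task count. */
    void stageSubmitted(int numTasks) {
        totalTasks.addAndGet(numTasks);
    }

    /** Call when a task ends; returns the status line to print. */
    String taskEnded() {
        finishedTasks.incrementAndGet();
        return percent() + "% completed...";
    }

    /** Percentage of announced tasks that have finished; 0 if none announced. */
    int percent() {
        int total = totalTasks.get();
        return total == 0 ? 0 : (int) (100L * finishedTasks.get() / total);
    }
}

class ProgressTrackerDemo {
    public static void main(String[] args) {
        ProgressTracker tracker = new ProgressTracker();
        tracker.stageSubmitted(4); // simulated stage with 4 tasks
        for (int i = 0; i < 4; i++) {
            System.out.println(tracker.taskEnded());
        }
        System.out.println("job done!");
    }
}
```

Note that the total is only known as stages are submitted, so with this approach the percentage can drop when a later stage is announced. Counters are atomic because listener callbacks and the code that reads the progress may run on different threads.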

