multithreading - Java - Define a timeout for a Callable within an ExecutorCompletionService


I've got the following problem using ExecutorCompletionService. I want to call a lot of Callables in different threads. These Callables don't share any information with each other. I need to define a timeout for each Callable, e.g. it should not run longer than 5 seconds. Each Callable can start at a different time, which I don't know in advance. After the timeout the thread should be stopped/killed, and its result is no longer of interest to me. The other, 'normal' running threads should not be influenced.

So let's take the following example: a simple Callable and my current Java code.

    import java.util.Date;
    import java.util.concurrent.Callable;

    public class Job implements Callable<Integer> {

        int returnValue = 0;
        long millis = 0;

        public Job(long millis, int value) {
            this.millis = millis;
            this.returnValue = value;
        }

        @Override
        public Integer call() throws Exception, InterruptedException {
            try {
                System.out.println(new Date() + " " + returnValue + " started");
                Thread.sleep(millis);
                System.out.println(new Date() + " " + returnValue + " finished");
                return returnValue;
            } catch (InterruptedException e) {
                System.out.println(new Date() + " " + returnValue + " interrupted");
                throw e;
            }
        }
    }

And the other class, where the Callable is used:

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.concurrent.*;

    public class CallableTest {

        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
            CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(newFixedThreadPool);

            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                pool.submit(job);
            }

            ArrayList<Integer> results = new ArrayList<Integer>();
            for (int i = 1; i < 11; ++i) {
                try {
                    // take() only returns futures that are already completed
                    Future<Integer> future = pool.take();
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            newFixedThreadPool.shutdownNow();

            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }

The output looks like this:

    Sun Jun 29 08:01:00 CEST 2014 10 started
    Sun Jun 29 08:01:00 CEST 2014 9 started
    Sun Jun 29 08:01:09 CEST 2014 9 finished
    Sun Jun 29 08:01:09 CEST 2014 added 9
    Sun Jun 29 08:01:09 CEST 2014 8 started
    Sun Jun 29 08:01:10 CEST 2014 10 finished
    Sun Jun 29 08:01:10 CEST 2014 7 started
    Sun Jun 29 08:01:10 CEST 2014 added 10
    Sun Jun 29 08:01:17 CEST 2014 7 finished
    Sun Jun 29 08:01:17 CEST 2014 6 started
    Sun Jun 29 08:01:17 CEST 2014 added 7
    Sun Jun 29 08:01:17 CEST 2014 8 finished
    Sun Jun 29 08:01:17 CEST 2014 added 8
    Sun Jun 29 08:01:17 CEST 2014 5 started
    Sun Jun 29 08:01:22 CEST 2014 5 finished
    Sun Jun 29 08:01:22 CEST 2014 added 5
    Sun Jun 29 08:01:22 CEST 2014 4 started
    Sun Jun 29 08:01:23 CEST 2014 6 finished
    Sun Jun 29 08:01:23 CEST 2014 3 started
    Sun Jun 29 08:01:23 CEST 2014 added 6
    Sun Jun 29 08:01:26 CEST 2014 3 finished
    Sun Jun 29 08:01:26 CEST 2014 2 started
    Sun Jun 29 08:01:26 CEST 2014 added 3
    Sun Jun 29 08:01:26 CEST 2014 4 finished
    Sun Jun 29 08:01:26 CEST 2014 1 started
    Sun Jun 29 08:01:26 CEST 2014 added 4
    Sun Jun 29 08:01:27 CEST 2014 1 finished
    Sun Jun 29 08:01:27 CEST 2014 added 1
    Sun Jun 29 08:01:28 CEST 2014 2 finished
    Sun Jun 29 08:01:28 CEST 2014 added 2
    Sun Jun 29 08:01:28 CEST 2014 results:
    Sun Jun 29 08:01:28 CEST 2014 9
    Sun Jun 29 08:01:28 CEST 2014 10
    Sun Jun 29 08:01:28 CEST 2014 7
    Sun Jun 29 08:01:28 CEST 2014 8
    Sun Jun 29 08:01:28 CEST 2014 5
    Sun Jun 29 08:01:28 CEST 2014 6
    Sun Jun 29 08:01:28 CEST 2014 3
    Sun Jun 29 08:01:28 CEST 2014 4
    Sun Jun 29 08:01:28 CEST 2014 1
    Sun Jun 29 08:01:28 CEST 2014 2

That does not work the way I would like it to. I want every Callable that runs longer than 5 seconds to be terminated/ended/interrupted, and every Callable that runs less than 5 seconds to give me a valid result.

I also tried it without ExecutorCompletionService:

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;

    public class CallableTest2 {
        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();

            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                futures.add(newFixedThreadPool.submit(job));
            }

            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                try {
                    // waits at most 5 seconds from this call, not from the job's start
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            newFixedThreadPool.shutdownNow();

            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }

With the results:

    Sun Jun 29 08:33:19 CEST 2014 9 started
    Sun Jun 29 08:33:19 CEST 2014 10 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:28 CEST 2014 9 finished
    Sun Jun 29 08:33:28 CEST 2014 8 started
    Sun Jun 29 08:33:28 CEST 2014 added 9
    Sun Jun 29 08:33:29 CEST 2014 10 finished
    Sun Jun 29 08:33:29 CEST 2014 7 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:36 CEST 2014 7 finished
    Sun Jun 29 08:33:36 CEST 2014 added 7
    Sun Jun 29 08:33:36 CEST 2014 6 started
    Sun Jun 29 08:33:36 CEST 2014 8 finished
    Sun Jun 29 08:33:36 CEST 2014 5 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(Sun Jun 29 08:33:41 CEST 2014 5 finished
    FutureTask.java:228)
    Sun Jun 29 08:33:41 CEST 2014 added 5
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    Sun Jun 29 08:33:41 CEST 2014 4 started
        at callabletest.CallableTest2.main(CallableTest2.java:29)
    Sun Jun 29 08:33:42 CEST 2014 6 finished
    Sun Jun 29 08:33:42 CEST 2014 3 started
    Sun Jun 29 08:33:45 CEST 2014 3 finished
    Sun Jun 29 08:33:45 CEST 2014 2 started
    Sun Jun 29 08:33:45 CEST 2014 4 finished
    Sun Jun 29 08:33:45 CEST 2014 added 4
    Sun Jun 29 08:33:45 CEST 2014 added 3
    Sun Jun 29 08:33:45 CEST 2014 1 started
    Sun Jun 29 08:33:46 CEST 2014 1 finished
    Sun Jun 29 08:33:47 CEST 2014 2 finished
    Sun Jun 29 08:33:47 CEST 2014 added 2
    Sun Jun 29 08:33:47 CEST 2014 added 1
    Sun Jun 29 08:33:47 CEST 2014 results:
    Sun Jun 29 08:33:47 CEST 2014 9
    Sun Jun 29 08:33:47 CEST 2014 7
    Sun Jun 29 08:33:47 CEST 2014 5
    Sun Jun 29 08:33:47 CEST 2014 4
    Sun Jun 29 08:33:47 CEST 2014 3
    Sun Jun 29 08:33:47 CEST 2014 2
    Sun Jun 29 08:33:47 CEST 2014 1

Now I get TimeoutExceptions, but not where I expect them. E.g. the Callables running 9 and 7 seconds do not throw an exception!

What do I have to change in the code to get the results of the short-running threads and kill the long-running ones? In the example, that means getting results 1-5 without 6-10.

I've tested a lot of things but can't get it to work. Please help.


This is in answer to the post of bstar55 about using ScheduledExecutorService.

I changed my code according to that hint to:

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;

    public class CallableTest3 {

        public static void main(String[] args) {
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();

            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                final Future<Integer> handler = executor.submit(job);
                final int x = i;
                // the cancel task runs on the same 2-thread pool as the jobs
                executor.schedule(new Runnable() {

                    public void run() {
                        boolean cancel = handler.cancel(true);
                        if (cancel) {
                            System.out.println(new Date() + " job " + x + " cancelled");
                        } else {
                            System.out.println(new Date() + " job " + x + " not cancelled");
                        }
                    }
                }, 5000, TimeUnit.MILLISECONDS);
                futures.add(handler);
            }

            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                try {
                    Integer content = future.get(5, TimeUnit.SECONDS);
                    results.add(content);
                    System.out.println(new Date() + " added " + content);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            executor.shutdown();

            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " --- " + j);
            }
        }
    }

But it does not work as expected. The result:

    Sun Jun 29 10:27:41 CEST 2014 9 started
    Sun Jun 29 10:27:41 CEST 2014 10 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:27:50 CEST 2014 9 finished
    Sun Jun 29 10:27:50 CEST 2014 added 9
    Sun Jun 29 10:27:50 CEST 2014 8 started
    Sun Jun 29 10:27:51 CEST 2014 10 finished
    Sun Jun 29 10:27:51 CEST 2014 7 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:27:58 CEST 2014 8 finished
    Sun Jun 29 10:27:58 CEST 2014 6 started
    Sun Jun 29 10:27:58 CEST 2014 7 finished
    Sun Jun 29 10:27:58 CEST 2014 5 started
    Sun Jun 29 10:27:58 CEST 2014 added 7
    Sun Jun 29 10:28:03 CEST 2014 5 finished
    Sun Jun 29 10:28:03 CEST 2014 4 started
    java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    Sun Jun 29 10:28:03 CEST 2014 added 5
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:28:04 CEST 2014 6 finished
    Sun Jun 29 10:28:04 CEST 2014 3 started
    Sun Jun 29 10:28:07 CEST 2014 3 finished
    Sun Jun 29 10:28:07 CEST 2014 2 started
    Sun Jun 29 10:28:07 CEST 2014 4 finished
    Sun Jun 29 10:28:07 CEST 2014 added 4
    Sun Jun 29 10:28:07 CEST 2014 added 3
    Sun Jun 29 10:28:07 CEST 2014 1 started
    java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:230)
        at java.util.concurrent.FutureTask.get(FutureTask.java:91)
        at callabletest.CallableTest3.main(CallableTest3.java:43)
    Sun Jun 29 10:28:08 CEST 2014 1 finished
    Sun Jun 29 10:28:08 CEST 2014 job 10 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 9 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 8 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 7 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 6 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 5 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 4 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 job 3 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 2 interrupted
    Sun Jun 29 10:28:08 CEST 2014 job 1 not cancelled
    Sun Jun 29 10:28:08 CEST 2014 added 1
    Sun Jun 29 10:28:08 CEST 2014 results:
    Sun Jun 29 10:28:08 CEST 2014 --- 9
    Sun Jun 29 10:28:08 CEST 2014 --- 7
    Sun Jun 29 10:28:08 CEST 2014 --- 5
    Sun Jun 29 10:28:08 CEST 2014 --- 4
    Sun Jun 29 10:28:08 CEST 2014 --- 3
    Sun Jun 29 10:28:08 CEST 2014 --- 1
    Sun Jun 29 10:28:08 CEST 2014 job 2 cancelled

But instead, job 2 is the one that gets cancelled!
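A likely reading of the output above: the cancel tasks are scheduled on the same two-thread pool as the jobs, and their 5-second delay counts from scheduling, not from the moment a job starts. They therefore only get a thread once the jobs have been dequeued, by which time only job 2 is still running. The following is a minimal sketch of one possible alternative, not a definitive implementation: a separate timer thread interrupts a job's worker thread a fixed time after the job actually starts. The names `StartTimeTimeout`, `withTimeout` and `run` are hypothetical, and the durations are shortened to milliseconds for the demonstration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class StartTimeTimeout {

    /** Wraps a task so its worker thread is interrupted `timeout` after the task STARTS. */
    static <T> Callable<T> withTimeout(Callable<T> task, ScheduledExecutorService timer,
                                       long timeout, TimeUnit unit) {
        return () -> {
            final Thread worker = Thread.currentThread();
            final boolean[] done = {false}; // guarded by synchronized (done)
            ScheduledFuture<?> killer = timer.schedule(() -> {
                synchronized (done) {
                    if (!done[0]) {
                        worker.interrupt(); // only while this task still owns the thread
                    }
                }
            }, timeout, unit);
            try {
                return task.call();
            } finally {
                synchronized (done) {
                    done[0] = true;
                }
                killer.cancel(false);
                Thread.interrupted(); // clear a late interrupt before the thread is reused
            }
        };
    }

    /** Runs 4 jobs (300..1200 ms) on 2 threads with a 750 ms per-job timeout. */
    static List<Integer> run() {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletionService<Integer> pool = new ExecutorCompletionService<>(workers);
        int jobs = 4;
        for (int i = jobs; i > 0; i--) {
            final int value = i;
            Callable<Integer> job = () -> {
                Thread.sleep(value * 300L);
                return value;
            };
            pool.submit(withTimeout(job, timer, 750, TimeUnit.MILLISECONDS));
        }
        List<Integer> results = new ArrayList<>();
        try {
            for (int n = 0; n < jobs; n++) {
                try {
                    results.add(pool.take().get());
                } catch (ExecutionException e) {
                    // the job was interrupted by its timeout (or failed): skip it
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            workers.shutdown();
            timer.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println("results: " + run());
    }
}
```

This only helps for jobs that respond to interruption, as `Thread.sleep` does. A job that never checks its interrupt status keeps running regardless.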


I suggest you divide your problem into 2 separate ones:

  1. Run on multiple threads
  2. Use a timeout for each operation

For the first (multithreading), you already use an executor service that manages this on 2 threads: Executors.newFixedThreadPool(2). If you apply the timeout here, it acts on the run of all the tasks together, whereas you need a timeout for each job.
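To illustrate the point, here is a small sketch (the class name `GetTimeoutDemo` is made up for this example) showing that `Future.get(timeout, unit)` only limits how long the caller waits. When it times out, the task itself keeps running until someone calls `cancel(true)` on the future.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetTimeoutDemo {

    /** Returns true if the task was still running after get() had timed out. */
    static boolean taskSurvivesGetTimeout() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Integer> future = pool.submit(() -> {
            Thread.sleep(2000); // stands in for a long-running job
            return 42;
        });
        try {
            future.get(100, TimeUnit.MILLISECONDS); // bounds the wait only
            return false; // not expected: the task needs 2 seconds
        } catch (TimeoutException e) {
            boolean stillRunning = !future.isDone(); // the timeout did not stop the task
            future.cancel(true);                     // this is what interrupts it
            return stillRunning && future.isCancelled();
        } catch (InterruptedException | ExecutionException e) {
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task survived the get() timeout: " + taskSurvivesGetTimeout());
    }
}
```

With a queue of jobs on 2 threads, the wait in `get` also starts when `get` is called, not when the job starts, which would explain why the exceptions in the question do not line up with the jobs' run times.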

For the timeout issue, you can manage it with a new executor service per job, in a class: JobManager.

    package com.stackoverflow.q24473796;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class JobManager implements Callable<Integer> {

        protected long timeout;
        protected TimeUnit timeUnit;
        protected Callable<Integer> job;

        public JobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.job = job;
        }

        @Override
        public Integer call() {
            Integer result = -1; // default, to be adapted
            ExecutorService exec = Executors.newSingleThreadExecutor();

            try {
                result = exec.submit(job).get(timeout, timeUnit);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                // do whatever you want
                if (e instanceof TimeoutException) {
                    System.out.println("timeout " + job.toString());
                } else {
                    System.out.println("exception " + job.toString() + " : " + e.getMessage());
                }
            }
            // shutdown() does not interrupt a job that is still running
            exec.shutdown();
            return result;
        }
    }

Then, you can call your tasks from the main thread as follows:

    Job job = new Job(i * 1000, i);
    Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));

I adapted your CallableTest:

    package com.stackoverflow.q24473796;

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class CallableTest {

        public static void main(String[] args) {
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

            List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
            for (int i = 10; i > 0; i--) {
                Job job = new Job(i * 1000, i);
                Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));
                futures.add(future);
            }

            ArrayList<Integer> results = new ArrayList<Integer>();
            for (Future<Integer> future : futures) {
                Integer result = -1;
                try {
                    result = future.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                // -1 is the JobManager default for a job that timed out or failed
                if (result != -1) {
                    results.add(result);
                }
            }

            newFixedThreadPool.shutdown();

            try {
                newFixedThreadPool.awaitTermination(60, TimeUnit.SECONDS); // global timeout
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println(new Date() + " results:");
            for (int j : results) {
                System.out.println(new Date() + " " + j);
            }
        }
    }

And you'll get the following output:

    Wed Apr 29 10:51:02 CEST 2015 10 started
    Wed Apr 29 10:51:02 CEST 2015 9 started
    timeout com.stackoverflow.q24473796.Job@249fe45c
    timeout com.stackoverflow.q24473796.Job@249fe45c
    Wed Apr 29 10:51:07 CEST 2015 8 started
    Wed Apr 29 10:51:07 CEST 2015 7 started
    Wed Apr 29 10:51:11 CEST 2015 9 finished
    timeout com.stackoverflow.q24473796.Job@3cd4c5a0
    timeout com.stackoverflow.q24473796.Job@3cd4c5a0
    Wed Apr 29 10:51:12 CEST 2015 6 started
    Wed Apr 29 10:51:12 CEST 2015 5 started
    Wed Apr 29 10:51:12 CEST 2015 10 finished
    Wed Apr 29 10:51:14 CEST 2015 7 finished
    Wed Apr 29 10:51:15 CEST 2015 8 finished
    Wed Apr 29 10:51:17 CEST 2015 5 finished
    Wed Apr 29 10:51:17 CEST 2015 4 started
    timeout com.stackoverflow.q24473796.Job@2a0fded2
    Wed Apr 29 10:51:17 CEST 2015 3 started
    Wed Apr 29 10:51:18 CEST 2015 6 finished
    Wed Apr 29 10:51:20 CEST 2015 3 finished
    Wed Apr 29 10:51:20 CEST 2015 2 started
    Wed Apr 29 10:51:21 CEST 2015 4 finished
    Wed Apr 29 10:51:21 CEST 2015 1 started
    Wed Apr 29 10:51:22 CEST 2015 1 finished
    Wed Apr 29 10:51:22 CEST 2015 2 finished
    Wed Apr 29 10:51:22 CEST 2015 results:
    Wed Apr 29 10:51:22 CEST 2015 5
    Wed Apr 29 10:51:22 CEST 2015 4
    Wed Apr 29 10:51:22 CEST 2015 3
    Wed Apr 29 10:51:22 CEST 2015 2
    Wed Apr 29 10:51:22 CEST 2015 1
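Note that in this output the timed-out jobs (10, 9, 8, 7, 6) still print "finished": their results are dropped, but they keep running in the background. If they should also be stopped, one possible variant is sketched below. It is an assumption layered on top of the JobManager idea, with the hypothetical name `CancellingJobManager`: it keeps the inner future and calls `cancel(true)` on timeout, which interrupts the job's thread. The `demo` method exists only to exercise the class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CancellingJobManager implements Callable<Integer> {

    private final long timeout;
    private final TimeUnit timeUnit;
    private final Callable<Integer> job;

    CancellingJobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.job = job;
    }

    @Override
    public Integer call() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<Integer> future = exec.submit(job);
        try {
            return future.get(timeout, timeUnit);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the job instead of letting it run on
            return -1;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt
            return -1;
        } catch (ExecutionException e) {
            return -1; // the job itself failed
        } finally {
            exec.shutdownNow();
        }
    }

    /** Self-check: a slow job is interrupted and yields -1, a fast job yields its value. */
    static boolean demo() {
        CountDownLatch interrupted = new CountDownLatch(1);
        Callable<Integer> slow = () -> {
            try {
                Thread.sleep(2000);
                return 7;
            } catch (InterruptedException e) {
                interrupted.countDown(); // records that the job really was interrupted
                throw e;
            }
        };
        Callable<Integer> fast = () -> 5;
        int slowResult = new CancellingJobManager(200, TimeUnit.MILLISECONDS, slow).call();
        int fastResult = new CancellingJobManager(2, TimeUnit.SECONDS, fast).call();
        boolean wasInterrupted;
        try {
            wasInterrupted = interrupted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return slowResult == -1 && fastResult == 5 && wasInterrupted;
    }

    public static void main(String[] args) {
        System.out.println("demo ok: " + demo());
    }
}
```

It can be submitted to the outer pool exactly like JobManager. As with any interrupt-based cancellation, a job that ignores interruption will not stop, and creating one executor per job has a cost that may matter with many jobs.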
