multithreading - Java - Define a timeout for Callable within a ExecutorCompletionService -
i've got following problem using executorcompletionservice. want call lot of callable in different threads. these callable don't share information each other. need define timeout each callable, eg. not run longer 5 seconds. each callable can run in different time not know when starting. after timeout thread should stopped/killed , result not interesting more me. other 'normal' running threads should not infuenced.
so lets take following example simple callable , current java code.
import java.util.date; import java.util.concurrent.callable; public class job implements callable<integer> { int returnvalue = 0; long millis = 0; public job(long millis, int value) { this.millis = millis; this.returnvalue = value; } @override public integer call() throws exception, interruptedexception { try { system.out.println(new date() + " " + returnvalue + " started"); thread.sleep(millis); system.out.println(new date() + " " + returnvalue + " finished"); return returnvalue; } catch (interruptedexception e) { system.out.println(new date() + " " + returnvalue + " interrupted"); throw e; } } }
and other class callable used.
import java.util.arraylist; import java.util.date; import java.util.concurrent.*; public class callabletest { public static void main(string[] args) { executorservice newfixedthreadpool = executors.newfixedthreadpool(2); completionservice<integer> pool = new executorcompletionservice<integer>(newfixedthreadpool); (int = 10; > 0; i--) { job job = new job(i * 1000, i); pool.submit(job); } arraylist<integer> results = new arraylist<integer>(); (int = 1; < 11; ++i) { try { future<integer> future = pool.take(); integer content = future.get(5, timeunit.seconds); results.add(content); system.out.println(new date() + " added " + content); } catch (interruptedexception e) { e.printstacktrace(); } catch (timeoutexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } catch (exception e) { e.printstacktrace(); } } newfixedthreadpool.shutdownnow(); system.out.println(new date() + " results:"); (int j : results) { system.out.println(new date() + " " + j); } } }
the ouput like:
sun jun 29 08:01:00 cest 2014 10 started sun jun 29 08:01:00 cest 2014 9 started sun jun 29 08:01:09 cest 2014 9 finished sun jun 29 08:01:09 cest 2014 added 9 sun jun 29 08:01:09 cest 2014 8 started sun jun 29 08:01:10 cest 2014 10 finished sun jun 29 08:01:10 cest 2014 7 started sun jun 29 08:01:10 cest 2014 added 10 sun jun 29 08:01:17 cest 2014 7 finished sun jun 29 08:01:17 cest 2014 6 started sun jun 29 08:01:17 cest 2014 added 7 sun jun 29 08:01:17 cest 2014 8 finished sun jun 29 08:01:17 cest 2014 added 8 sun jun 29 08:01:17 cest 2014 5 started sun jun 29 08:01:22 cest 2014 5 finished sun jun 29 08:01:22 cest 2014 added 5 sun jun 29 08:01:22 cest 2014 4 started sun jun 29 08:01:23 cest 2014 6 finished sun jun 29 08:01:23 cest 2014 3 started sun jun 29 08:01:23 cest 2014 added 6 sun jun 29 08:01:26 cest 2014 3 finished sun jun 29 08:01:26 cest 2014 2 started sun jun 29 08:01:26 cest 2014 added 3 sun jun 29 08:01:26 cest 2014 4 finished sun jun 29 08:01:26 cest 2014 1 started sun jun 29 08:01:26 cest 2014 added 4 sun jun 29 08:01:27 cest 2014 1 finished sun jun 29 08:01:27 cest 2014 added 1 sun jun 29 08:01:28 cest 2014 2 finished sun jun 29 08:01:28 cest 2014 added 2 sun jun 29 08:01:28 cest 2014 results: sun jun 29 08:01:28 cest 2014 9 sun jun 29 08:01:28 cest 2014 10 sun jun 29 08:01:28 cest 2014 7 sun jun 29 08:01:28 cest 2014 8 sun jun 29 08:01:28 cest 2014 5 sun jun 29 08:01:28 cest 2014 6 sun jun 29 08:01:28 cest 2014 3 sun jun 29 08:01:28 cest 2014 4 sun jun 29 08:01:28 cest 2014 1 sun jun 29 08:01:28 cest 2014 2
that not work like have it. want each callable running longer 5 seconds should terminated/ended/interruped , callable running lower 5 seconds give me valid result.
i tried without executorcompletionservice
public class callabletest2 { public static void main(string[] args) { executorservice newfixedthreadpool = executors.newfixedthreadpool(2); list<future<integer>> futures = new arraylist<future<integer>>(); (int = 10; > 0; i--) { job job = new job(i * 1000, i); futures.add(newfixedthreadpool.submit(job)); } arraylist<integer> results = new arraylist<integer>(); (future<integer> future: futures) { try { integer content = future.get(5, timeunit.seconds); results.add(content); system.out.println(new date() + " added " + content); } catch (interruptedexception e) { e.printstacktrace(); } catch (timeoutexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } catch (exception e) { e.printstacktrace(); } } newfixedthreadpool.shutdownnow(); system.out.println(new date() + " results:"); (int j : results) { system.out.println(new date() + " " + j); } } }
with results:
sun jun 29 08:33:19 cest 2014 9 started sun jun 29 08:33:19 cest 2014 10 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:228) @ java.util.concurrent.futuretask.get(futuretask.java:91) @ callabletest.callabletest2.main(callabletest2.java:29) sun jun 29 08:33:28 cest 2014 9 finished sun jun 29 08:33:28 cest 2014 8 started sun jun 29 08:33:28 cest 2014 added 9 sun jun 29 08:33:29 cest 2014 10 finished sun jun 29 08:33:29 cest 2014 7 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:228) @ java.util.concurrent.futuretask.get(futuretask.java:91) @ callabletest.callabletest2.main(callabletest2.java:29) sun jun 29 08:33:36 cest 2014 7 finished sun jun 29 08:33:36 cest 2014 added 7 sun jun 29 08:33:36 cest 2014 6 started sun jun 29 08:33:36 cest 2014 8 finished sun jun 29 08:33:36 cest 2014 5 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(sun jun 29 08:33:41 cest 2014 5 finished futuretask.java:228) sun jun 29 08:33:41 cest 2014 added 5 @ java.util.concurrent.futuretask.get(futuretask.java:91) sun jun 29 08:33:41 cest 2014 4 started @ callabletest.callabletest2.main(callabletest2.java:29) sun jun 29 08:33:42 cest 2014 6 finished sun jun 29 08:33:42 cest 2014 3 started sun jun 29 08:33:45 cest 2014 3 finished sun jun 29 08:33:45 cest 2014 2 started sun jun 29 08:33:45 cest 2014 4 finished sun jun 29 08:33:45 cest 2014 added 4 sun jun 29 08:33:45 cest 2014 added 3 sun jun 29 08:33:45 cest 2014 1 started sun jun 29 08:33:46 cest 2014 1 finished sun jun 29 08:33:47 cest 2014 2 finished sun jun 29 08:33:47 cest 2014 added 2 sun jun 29 08:33:47 cest 2014 added 1 sun jun 29 08:33:47 cest 2014 results: sun jun 29 08:33:47 cest 2014 9 sun jun 29 08:33:47 cest 2014 7 sun jun 29 08:33:47 cest 2014 5 sun jun 29 08:33:47 cest 2014 4 sun jun 29 08:33:47 cest 2014 3 sun jun 29 08:33:47 cest 2014 2 sun jun 29 08:33:47 cest 2014 1
now timeoutexceptions, not expect them. eg. callable running 9 , 7 seconds not throw exception!
what have change in code, results of short running threads , kill long running ones. in example results 1-5 without 6-10.
i've tested lot of things can't work. please help
this answer post of bstar55 using scheduledexecutorservice.
i changed code regarding hint to:
public class callabletest3 { public static void main(string[] args) { scheduledexecutorservice executor = executors.newscheduledthreadpool(2); list<future<integer>> futures = new arraylist<future<integer>>(); (int = 10; > 0; i--) { job job = new job(i * 1000, i); final future handler = executor.submit(job); final int x = i; executor.schedule(new runnable() { public void run() { boolean cancel = handler.cancel(true); if(cancel){ system.out.println(new date() + " job " + x + " cancelled"); }else{ system.out.println(new date() + " job " + x + " not cancelled"); } } }, 5000, timeunit.milliseconds); futures.add(handler); } arraylist<integer> results = new arraylist<integer>(); (future<integer> future : futures) { try { integer content = future.get(5, timeunit.seconds); results.add(content); system.out.println(new date() + " added " + content); } catch (interruptedexception e) { e.printstacktrace(); } catch (timeoutexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } catch (exception e) { e.printstacktrace(); } } executor.shutdown(); system.out.println(new date() + " results:"); (int j : results) { system.out.println(new date() + " --- " + j); } } }
but not work expected. result:
sun jun 29 10:27:41 cest 2014 9 started sun jun 29 10:27:41 cest 2014 10 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:228) @ java.util.concurrent.futuretask.get(futuretask.java:91) @ callabletest.callabletest3.main(callabletest3.java:43) sun jun 29 10:27:50 cest 2014 9 finished sun jun 29 10:27:50 cest 2014 added 9 sun jun 29 10:27:50 cest 2014 8 started sun jun 29 10:27:51 cest 2014 10 finished sun jun 29 10:27:51 cest 2014 7 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:228) @ java.util.concurrent.futuretask.get(futuretask.java:91) @ callabletest.callabletest3.main(callabletest3.java:43) sun jun 29 10:27:58 cest 2014 8 finished sun jun 29 10:27:58 cest 2014 6 started sun jun 29 10:27:58 cest 2014 7 finished sun jun 29 10:27:58 cest 2014 5 started sun jun 29 10:27:58 cest 2014 added 7 sun jun 29 10:28:03 cest 2014 5 finished sun jun 29 10:28:03 cest 2014 4 started java.util.concurrent.timeoutexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:228) @ java.util.concurrent.futuretask.get(futuretask.java:91) sun jun 29 10:28:03 cest 2014 added 5 @ callabletest.callabletest3.main(callabletest3.java:43) sun jun 29 10:28:04 cest 2014 6 finished sun jun 29 10:28:04 cest 2014 3 started sun jun 29 10:28:07 cest 2014 3 finished sun jun 29 10:28:07 cest 2014 2 started sun jun 29 10:28:07 cest 2014 4 finished sun jun 29 10:28:07 cest 2014 added 4 sun jun 29 10:28:07 cest 2014 added 3 sun jun 29 10:28:07 cest 2014 1 started java.util.concurrent.cancellationexception @ java.util.concurrent.futuretask$sync.innerget(futuretask.java:230) @ java.util.concurrent.futuretask.get(futuretask.java:91) @ callabletest.callabletest3.main(callabletest3.java:43) sun jun 29 10:28:08 cest 2014 1 finished sun jun 29 10:28:08 cest 2014 job 10 not cancelled sun jun 29 10:28:08 cest 2014 job 9 not cancelled sun jun 29 10:28:08 cest 2014 job 8 not cancelled sun jun 29 10:28:08 cest 2014 job 7 not cancelled sun jun 29 10:28:08 cest 2014 job 6 not cancelled sun jun 29 10:28:08 cest 2014 job 5 not cancelled sun jun 29 10:28:08 cest 2014 job 4 not cancelled sun jun 29 10:28:08 cest 2014 job 3 not cancelled sun jun 29 10:28:08 cest 2014 2 interrupted sun jun 29 10:28:08 cest 2014 job 1 not cancelled sun jun 29 10:28:08 cest 2014 added 1 sun jun 29 10:28:08 cest 2014 results: sun jun 29 10:28:08 cest 2014 --- 9 sun jun 29 10:28:08 cest 2014 --- 7 sun jun 29 10:28:08 cest 2014 --- 5 sun jun 29 10:28:08 cest 2014 --- 4 sun jun 29 10:28:08 cest 2014 --- 3 sun jun 29 10:28:08 cest 2014 --- 1 sun jun 29 10:28:08 cest 2014 job 2 cancelled
but instead job 2 cancelled!
i suggest divide problem 2 separate ones:
- run on multiple threads
- use timeout each operation
for first (multithreading), used service executor can manage on 2 threads : executors.newfixedthreadpool(2)
. if apply timeout here, timeout act run of tasks, need timeout each job.
for timout issue, can manage new service executor per job in class: jobmanager.
package com.stackoverflow.q24473796; import java.util.concurrent.callable; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.timeunit; import java.util.concurrent.timeoutexception; public class jobmanager implements callable<integer> { protected long timeout; protected timeunit timeunit; protected callable<integer> job; public jobmanager(long timeout, timeunit timeunit, callable<integer> job) { this.timeout = timeout; this.timeunit = timeunit; this.job = job; } @override public integer call() { integer result = new integer(-1); // default, adapted executorservice exec = executors.newsinglethreadexecutor(); try { result = exec.submit(job).get(timeout, timeunit); } catch (interruptedexception | executionexception | timeoutexception e) { // whatever want if (e instanceof timeoutexception) { system.out.println("timeout " + job.tostring()); } else { system.out.println("exception " + job.tostring() + " : " + e.getmessage()); } } exec.shutdown(); return result; } }
then, can call tasks main thread following:
job job = new job(i * 1000, i); future<integer> future = newfixedthreadpool.submit(new jobmanager(5, timeunit.seconds, job));
i addapted callabletest: package com.stackoverflow.q24473796;
import java.util.arraylist; import java.util.date; import java.util.list; import java.util.concurrent.executionexception; import java.util.concurrent.executorservice; import java.util.concurrent.executors; import java.util.concurrent.future; import java.util.concurrent.timeunit; public class callabletest { public static void main(string[] args) { executorservice newfixedthreadpool = executors.newfixedthreadpool(2); list<future<integer>> futures = new arraylist<future<integer>>(); (int = 10; > 0; i--) { job job = new job(i * 1000, i); future<integer> future = newfixedthreadpool.submit(new jobmanager(5, timeunit.seconds, job)); futures.add(future); } arraylist<integer> results = new arraylist<integer>(); (future<integer> future : futures) { integer result = new integer(-1); try { result = future.get(); } catch (interruptedexception | executionexception e) { e.printstacktrace(); } if (result != -1) { results.add(result); } } newfixedthreadpool.shutdown(); try { newfixedthreadpool.awaittermination(60, timeunit.seconds); //global timeout } catch (interruptedexception e) { e.printstacktrace(); } system.out.println(new date() + " results:"); (int j : results) { system.out.println(new date() + " " + j); } } }
and you'll following output:
wed apr 29 10:51:02 cest 2015 10 started wed apr 29 10:51:02 cest 2015 9 started timeout com.stackoverflow.q24473796.job@249fe45c timeout com.stackoverflow.q24473796.job@249fe45c wed apr 29 10:51:07 cest 2015 8 started wed apr 29 10:51:07 cest 2015 7 started wed apr 29 10:51:11 cest 2015 9 finished timeout com.stackoverflow.q24473796.job@3cd4c5a0 timeout com.stackoverflow.q24473796.job@3cd4c5a0 wed apr 29 10:51:12 cest 2015 6 started wed apr 29 10:51:12 cest 2015 5 started wed apr 29 10:51:12 cest 2015 10 finished wed apr 29 10:51:14 cest 2015 7 finished wed apr 29 10:51:15 cest 2015 8 finished wed apr 29 10:51:17 cest 2015 5 finished wed apr 29 10:51:17 cest 2015 4 started timeout com.stackoverflow.q24473796.job@2a0fded2 wed apr 29 10:51:17 cest 2015 3 started wed apr 29 10:51:18 cest 2015 6 finished wed apr 29 10:51:20 cest 2015 3 finished wed apr 29 10:51:20 cest 2015 2 started wed apr 29 10:51:21 cest 2015 4 finished wed apr 29 10:51:21 cest 2015 1 started wed apr 29 10:51:22 cest 2015 1 finished wed apr 29 10:51:22 cest 2015 2 finished wed apr 29 10:51:22 cest 2015 results: wed apr 29 10:51:22 cest 2015 5 wed apr 29 10:51:22 cest 2015 4 wed apr 29 10:51:22 cest 2015 3 wed apr 29 10:51:22 cest 2015 2 wed apr 29 10:51:22 cest 2015 1
Comments
Post a Comment