Commit 4c974ca2 authored by Simon Rettberg's avatar Simon Rettberg
Browse files

Handle some uncaught runtime errors, fix race condition

parent 9820962a
Showing with 88 additions and 20 deletions
+88 -20
......@@ -67,6 +67,11 @@ public abstract class AbstractTask implements Runnable
* work when we're done.
*/
private volatile FinishCallback finishCallback = null;
/**
* Set to true as soon as we try to start this task using the thread pool executor.
*/
private volatile boolean triedToStart = false;
/**
* Default constructor which should not be overridden
......@@ -252,13 +257,22 @@ public abstract class AbstractTask implements Runnable
*/
public final boolean canStart()
{
if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
if ( this.triedToStart || !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
return false;
}
if ( this.parent == null )
return true;
return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING;
}
/**
* Mark this task as being started to prevent a race condition where
* the task would be submitted to the thread pool more than once.
*/
public final void markAsStarting()
{
this.triedToStart = true;
}
/**
* Checks whether this task can be removed from the task manager.
......@@ -291,7 +305,7 @@ public abstract class AbstractTask implements Runnable
try {
ret = execute();
} catch ( Throwable t ) {
LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() );
LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception", t );
ret = false;
}
if ( ret ) {
......
......@@ -17,8 +17,21 @@ public abstract class SystemCommandTask extends AbstractTask
private Process process = null;
protected int timeoutSeconds = 0;
@Override
protected final boolean execute()
{
try {
return execInternal();
} catch ( Exception e ) {
log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
processStdErrInternal( e.toString() );
return processEnded( -3 );
}
}
private final boolean execInternal()
{
command = initCommandLine();
if ( command == null || command.length == 0 ) {
......@@ -31,7 +44,13 @@ public abstract class SystemCommandTask extends AbstractTask
try {
// Create process
process = pb.start();
try {
process = pb.start();
} catch ( Exception e ) {
log.warn( "Process of task " + getId() + " died." );
processStdErrInternal( e.toString() );
return processEnded( -2 );
}
final Process p = process;
processStarted();
p.getOutputStream();
......@@ -46,10 +65,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdOut( line );
processStdOutInternal( line );
}
}
} catch ( Exception e ) {
e.printStackTrace();
}
}
} );
......@@ -63,10 +83,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdErr( line );
processStdErrInternal( line );
}
}
} catch ( Exception e ) {
e.printStackTrace();
}
}
} );
......@@ -75,7 +96,30 @@ public abstract class SystemCommandTask extends AbstractTask
stderr.start();
// Wait for everything
process.waitFor();
int retval = -1;
if ( this.timeoutSeconds <= 0 ) {
retval = process.waitFor();
} else {
int togo = timeoutSeconds * 10;
while ( togo-- > 0 ) {
try {
retval = process.exitValue();
break;
} catch ( IllegalThreadStateException e1 ) {
// Still running....
try {
Thread.sleep( 100 );
} catch ( Exception e2 ) {
// Bummer....
}
}
}
}
try {
stdout.join( 500 );
stderr.join( 500 );
} catch ( Throwable t ) {
}
try {
process.getErrorStream().close();
} catch ( Throwable t ) {
......@@ -84,22 +128,15 @@ public abstract class SystemCommandTask extends AbstractTask
process.getOutputStream().close();
} catch ( Throwable t ) {
}
stdout.join( 2000 );
stderr.join( 2000 );
return processEnded( process.exitValue() );
synchronized ( p ) {
return processEnded( retval );
}
} catch ( IOException e ) {
log.warn( "Process of task " + getId() + " died." );
processStdErr( e.toString() );
return processEnded( -2 );
} catch ( InterruptedException e ) {
processEnded( -4 );
Thread.currentThread().interrupt();
return false;
} catch ( Exception e ) {
log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
processStdErr( e.toString() );
return processEnded( -3 );
} finally {
if ( process != null )
process.destroy();
......@@ -134,6 +171,24 @@ public abstract class SystemCommandTask extends AbstractTask
return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) );
}
private final void processStdOutInternal( String line )
{
try {
processStdOut( line );
} catch ( Throwable t ) {
log.warn( "processStdOut failed", t );
}
}
private final void processStdErrInternal( String line )
{
try {
processStdErr( line );
} catch ( Throwable t ) {
log.warn( "processStdErr failed", t );
}
}
/**
* Called to get the command line. Each argument should be a separate array
* element. Returning null means the task should not run (as the arguments
......
......@@ -38,9 +38,7 @@ public class Taskmanager implements FinishCallback, Runnable
private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances. The mainloop will call wait() on this and this object
* is notified as soon as the mainloop should check if there is any task available that can be
* run.
* All the running/finished task instances.
*/
private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
......@@ -166,6 +164,7 @@ public class Taskmanager implements FinishCallback, Runnable
if ( task.canStart() ) {
log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
threadPool.execute( task );
task.markAsStarting();
}
}
} catch ( RejectedExecutionException e ) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment