Commit a16dee65 authored by Egon Nathan Bittencourt Araujo's avatar Egon Nathan Bittencourt Araujo
Browse files

Merge branch 'merge' into 'develop'

Merge with germany



See merge request !2
parents 8fbf952b d61df947
Showing with 236 additions and 123 deletions
+236 -123
......@@ -21,19 +21,19 @@ public abstract class AbstractTask implements Runnable
* The id of the task instance.
*/
@Expose
private String id = null;
private volatile String id = null;
/**
* Parent task. This task won't be started as long as the parent task currently
* waiting for execution or is being executed. Otherwise this task is available for execution.
* Note that MAX_INSTANCES is still being taken into account. Set to null to ignore.
*/
@Expose
private String parentTask = null;
private volatile String parentTask = null;
/**
* If the parent task failed to execute, don't run this task and fail immediately.
*/
@Expose
private boolean failOnParentFail = true;
private volatile boolean failOnParentFail = true;
/*
* Variables we're working with - these should never be set from incoming (json) data
......@@ -57,11 +57,21 @@ public abstract class AbstractTask implements Runnable
/**
* Status of Task
*/
private TaskStatus status = TaskStatus.ts_waiting;
private volatile TaskStatus status = TaskStatus.ts_waiting;
/**
* Reference to parent task
*/
private AbstractTask parent = null;
private volatile AbstractTask parent = null;
/**
* Reference to "owner" so we can tell it to check for more
* work when we're done.
*/
private volatile FinishCallback finishCallback = null;
/**
* Set to true as soon as we try to start this task using the thread pool executor.
*/
private volatile boolean triedToStart = false;
/**
* Default constructor which should not be overridden
......@@ -79,12 +89,24 @@ public abstract class AbstractTask implements Runnable
* Initialize the task; method used by the {@link Taskmanager}.
* Put your own initialization code in initTask()
*/
public final boolean init( AbstractTask parent )
public final boolean init( AbstractTask parent, FinishCallback finishCallback )
{
if ( this.initDone ) {
LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() );
System.exit( 1 );
}
if ( this.parentTask != null && this.parentTask.isEmpty() )
this.parentTask = null;
if ( this.parentTask != null && parent == null ) {
if ( this.failOnParentFail ) {
this.status = new TaskStatus( StatusCode.PARENT_FAILED, this.id );
return false;
}
this.parentTask = null;
}
if ( this.parentTask == null && parent != null )
parent = null;
this.finishCallback = finishCallback;
this.parent = parent;
this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id );
this.initDone = true;
......@@ -116,6 +138,19 @@ public abstract class AbstractTask implements Runnable
*/
protected abstract boolean execute();
/**
* This is called when a client requests the status of this task. In case you
* want to return complex structures like Lists, which are not thread safe, you
* might want to keep that list outside the status class you return, and only
* create a copy of it for the status class in this function.
* If you only return more or less atomic data, you don't need to override
* this function
*/
protected void updateStatus()
{
}
/*
* Final methods
*/
......@@ -126,7 +161,7 @@ public abstract class AbstractTask implements Runnable
* @return id of parent task, null if no parent set
*
*/
public String getParentTaskId()
public final String getParentTaskId()
{
return this.parentTask;
}
......@@ -151,7 +186,7 @@ public abstract class AbstractTask implements Runnable
*/
public final TaskStatus getStatus()
{
if ( this.initDone && this.parentTask != null ) {
if ( this.initDone && this.parent != null ) {
final StatusCode parentStatus = parent.getStatusCode();
switch ( parentStatus ) {
case DUPLICATE_ID:
......@@ -160,8 +195,10 @@ public abstract class AbstractTask implements Runnable
case NO_SUCH_CONSTRUCTOR:
case PARENT_FAILED:
case TASK_ERROR:
if ( this.failOnParentFail )
if ( this.failOnParentFail ) {
this.status.statusCode = StatusCode.PARENT_FAILED;
LOG.debug( "Parent " + this.parentTask + " of " + this.id + " failed." );
}
this.parentTask = null;
break;
default:
......@@ -220,13 +257,22 @@ public abstract class AbstractTask implements Runnable
*/
public final boolean canStart()
{
if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
if ( this.triedToStart || !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
return false;
}
if ( this.parent == null )
return true;
return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING;
}
/**
* Mark this task as being started to prevent a race condition where
* the task would be submitted to the thread pool more than once.
*/
public final void markAsStarting()
{
this.triedToStart = true;
}
/**
* Checks whether this task can be removed from the task manager.
......@@ -259,7 +305,7 @@ public abstract class AbstractTask implements Runnable
try {
ret = execute();
} catch ( Throwable t ) {
LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() );
LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception", t );
ret = false;
}
if ( ret ) {
......@@ -267,18 +313,8 @@ public abstract class AbstractTask implements Runnable
} else {
this.status.statusCode = StatusCode.TASK_ERROR;
}
}
/**
* This is called when a client requests the status of this task. In case you
* want to return complex structures like Lists, which are not thread safe, you
* might want to keep that list outside the status class you return, and only
* create a copy of it for the status class in this function.
* If you only return more or less atomic data, you don't need to override
* this function
*/
protected void updateStatus()
{
LOG.debug( "Finished task " + this.getClass().getSimpleName() + ": " + this.status.statusCode.toString() + " (" + this.id + ")" );
if ( this.finishCallback != null )
this.finishCallback.taskFinished();
}
}
package org.openslx.taskmanager.api;
public interface FinishCallback
{
/**
* Called by a task after execution finished or failed.
*/
public void taskFinished();
}
......@@ -14,12 +14,24 @@ public abstract class SystemCommandTask extends AbstractTask
private static final Logger log = Logger.getLogger( SystemCommandTask.class );
private String[] command = null;
private Process process = null;
protected int timeoutSeconds = 0;
@Override
protected final boolean execute()
{
try {
return execInternal();
} catch ( Exception e ) {
log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
processStdErrInternal( e.toString() );
return processEnded( -3 );
}
}
private final boolean execInternal()
{
command = initCommandLine();
if ( command == null || command.length == 0 ) {
......@@ -32,7 +44,13 @@ public abstract class SystemCommandTask extends AbstractTask
try {
// Create process
process = pb.start();
try {
process = pb.start();
} catch ( Exception e ) {
log.warn( "Process of task " + getId() + " died." );
processStdErrInternal( e.toString() );
return processEnded( -2 );
}
final Process p = process;
processStarted();
p.getOutputStream();
......@@ -47,10 +65,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdOut( line );
processStdOutInternal( line );
}
}
} catch ( IOException e ) {
} catch ( Exception e ) {
e.printStackTrace();
}
}
} );
......@@ -64,10 +83,11 @@ public abstract class SystemCommandTask extends AbstractTask
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdErr( line );
processStdErrInternal( line );
}
}
} catch ( IOException e ) {
} catch ( Exception e ) {
e.printStackTrace();
}
}
} );
......@@ -76,36 +96,60 @@ public abstract class SystemCommandTask extends AbstractTask
stderr.start();
// Wait for everything
stdout.join();
stderr.join();
process.waitFor();
return processEnded( process.exitValue() );
int retval = 124; // Default to 124, which is what the timeout util does
if ( this.timeoutSeconds <= 0 ) {
retval = process.waitFor();
} else {
int togo = timeoutSeconds * 10;
while ( togo-- > 0 ) {
try {
retval = process.exitValue();
break;
} catch ( IllegalThreadStateException e1 ) {
// Still running....
try {
Thread.sleep( 100 );
} catch ( Exception e2 ) {
// Bummer....
}
}
}
}
try {
stdout.join( 500 );
stderr.join( 500 );
} catch ( Throwable t ) {
}
try {
process.getErrorStream().close();
} catch ( Throwable t ) {
}
try {
process.getOutputStream().close();
} catch ( Throwable t ) {
}
synchronized ( p ) {
return processEnded( retval );
}
} catch ( IOException e ) {
log.warn( "Process of task " + getId() + " died." );
processStdErr( e.toString() );
return processEnded( -2 );
} catch ( InterruptedException e ) {
processEnded( -4 );
Thread.currentThread().interrupt();
return false;
} catch ( Exception e ) {
log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
processStdErr( e.toString() );
return processEnded( -3 );
} finally {
if ( process != null )
process.destroy();
}
}
/**
* Write data to the process's stdin.
*
* @param data stuff to write
* @return success or failure mapped to a boolean in a really complicated way
*/
protected final boolean toStdIn(byte[] data)
protected final boolean toStdIn( byte[] data )
{
try {
process.getOutputStream().write( data );
......@@ -122,11 +166,29 @@ public abstract class SystemCommandTask extends AbstractTask
* @param text stuff to write
* @return success or failure mapped to a boolean in a really complicated way
*/
protected final boolean toStdIn(String text)
protected final boolean toStdIn( String text )
{
return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) );
}
private final void processStdOutInternal( String line )
{
try {
processStdOut( line );
} catch ( Throwable t ) {
log.warn( "processStdOut failed", t );
}
}
private final void processStdErrInternal( String line )
{
try {
processStdErr( line );
} catch ( Throwable t ) {
log.warn( "processStdErr failed", t );
}
}
/**
* Called to get the command line. Each argument should be a separate array
* element. Returning null means the task should not run (as the arguments
......@@ -147,7 +209,7 @@ public abstract class SystemCommandTask extends AbstractTask
* Called when the process has finished running
*
* @param exitCode the process' exit code
* @return
* @return
*/
protected abstract boolean processEnded( int exitCode );
......
......@@ -26,11 +26,11 @@ public final class TaskStatus
/**
* Overall status of the task. Only set by base methods of the AbstractTask class.
*/
protected StatusCode statusCode;
protected volatile StatusCode statusCode;
/**
* Custom data a task might want to return on status requests.
*/
private Object data = null;
private volatile Object data = null;
@SuppressWarnings( "unused" )
private final String id;
......
......@@ -3,10 +3,13 @@ package org.openslx.taskmanager;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.network.NetworkHandler;
import org.openslx.taskmanager.network.RequestParser;
import org.openslx.taskmanager.util.ClassLoaderHack;
/**
......@@ -16,7 +19,7 @@ import org.openslx.taskmanager.util.ClassLoaderHack;
public class App
{
public static void main( String[] args ) throws SocketException
public static void main( String[] args ) throws SocketException, InterruptedException
{
BasicConfigurator.configure();
// Load all task plugins
......@@ -38,9 +41,18 @@ public class App
}
}
Environment.load( "config/environment" );
NetworkHandler.init();
Taskmanager.run();
List<Thread> threads = new ArrayList<>();
Taskmanager tm = new Taskmanager();
RequestParser parser = new RequestParser( tm );
NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser );
threads.add( new Thread( tm ) );
threads.add( new Thread( nh ) );
// Wait for everything
NetworkHandler.join();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
}
......@@ -2,18 +2,17 @@ package org.openslx.taskmanager.main;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;
......@@ -21,32 +20,29 @@ import org.openslx.taskmanager.util.Util;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
public class Taskmanager
public class Taskmanager implements FinishCallback, Runnable
{
private static final Logger log = Logger.getLogger( Taskmanager.class );
private static final ExecutorService threadPool = Executors.newCachedThreadPool();
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 16 ) );
/**
* Static gson object for (de)serialization
* gson object for (de)serialization
*/
private static final Gson gson = Util.explicitGsonInstance();
private final Gson gson = Util.explicitGsonInstance();
/**
* Cache of known tasks
*/
private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances. The mainloop will call wait() on this and this object
* is notified as soon as the mainloop should check if there is any task available that can be
* run.
* All the running/finished task instances.
*/
private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private static final Lock workLock = new ReentrantLock();
private static final Condition doCheckForWork = workLock.newCondition();
private final Semaphore doCheckForWork = new Semaphore( 0 );
/*
* Static methods
......@@ -61,7 +57,7 @@ public class Taskmanager
* @param taskId - ID of the task to retrieve the status of
* @return TaskStatus
*/
public static TaskStatus getTaskStatus( final String taskId )
public TaskStatus getTaskStatus( final String taskId )
{
AbstractTask task = instances.get( taskId );
if ( task == null )
......@@ -81,7 +77,7 @@ public class Taskmanager
* @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
* there is no task registered under the given name.
*/
public static TaskStatus submitTask( final String task, final String jsonData )
public TaskStatus submitTask( final String task, final String jsonData )
{
// Get task class
Class<? extends AbstractTask> taskClass;
......@@ -108,7 +104,7 @@ public class Taskmanager
log.warn( task + " exists, but could not be instanciated!" );
return TaskStatus.ts_noSuchConstructor;
}
if ( taskInstance.getId() == null ) {
if ( taskInstance.getId() == null || taskInstance.getId().isEmpty() ) {
log.warn( "Tried to launch " + task + " with null-id" );
return TaskStatus.ts_noSuchConstructor;
}
......@@ -123,13 +119,12 @@ public class Taskmanager
AbstractTask parent = null;
if ( taskInstance.getParentTaskId() != null )
parent = instances.get( taskInstance.getParentTaskId() );
if ( taskInstance.init( parent ) ) {
checkForWork();
}
taskInstance.init( parent, this );
checkForWork();
return taskInstance.getStatus();
}
public static void releaseTask( String taskId )
public void releaseTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task != null )
......@@ -140,26 +135,24 @@ public class Taskmanager
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
*/
protected static void checkForWork()
protected void checkForWork()
{
workLock.lock();
try {
doCheckForWork.signalAll();
} finally {
workLock.unlock();
}
doCheckForWork.release();
}
public static void run()
@Override
public void taskFinished()
{
checkForWork();
}
@Override
public void run()
{
try {
while ( !Global.doShutdown ) {
workLock.lock();
try {
doCheckForWork.await( 1, TimeUnit.MINUTES );
} finally {
workLock.unlock();
}
doCheckForWork.tryAcquire( 1, TimeUnit.MINUTES );
doCheckForWork.drainPermits();
try {
for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
AbstractTask task = it.next();
......@@ -169,8 +162,9 @@ public class Taskmanager
continue;
}
if ( task.canStart() ) {
threadPool.execute( task );
log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
threadPool.execute( task );
task.markAsStarting();
}
}
} catch ( RejectedExecutionException e ) {
......@@ -194,6 +188,7 @@ public class Taskmanager
} catch ( InterruptedException e ) {
log.info( "Interrupted!" );
}
System.exit( 0 );
}
}
......
......@@ -3,6 +3,7 @@ package org.openslx.taskmanager.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
......@@ -23,46 +24,33 @@ public class NetworkHandler implements Runnable
// Static part
private static Thread recvThread = null;
private static Thread sendThread = null;
private Thread sendThread = null;
/**
* Sender instance (Runnable handling outgoing packets)
*/
private static Sender sender = null;
private final Sender sender;
/**
* UDP socket for sending and receiving.
*/
private static DatagramSocket socket;
private final DatagramSocket socket;
private final RequestParser parser;
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
public static void init() throws SocketException
public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
{
if ( recvThread != null )
throw new RuntimeException( "Already initialized" );
socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
recvThread = new Thread( new NetworkHandler() );
recvThread.start();
socket = new DatagramSocket( port, listenAddress );
sendThread = new Thread( sender = new Sender() );
sendThread.start();
this.parser = parser;
}
public static void shutdown()
public void shutdown()
{
socket.close();
}
public static void join()
{
try {
recvThread.join();
sendThread.join();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
}
// Class part
/**
......@@ -97,6 +85,7 @@ public class NetworkHandler implements Runnable
{
byte readBuffer[] = new byte[ 66000 ];
try {
sendThread.start();
while ( !Global.doShutdown ) {
DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
try {
......@@ -112,11 +101,12 @@ public class NetworkHandler implements Runnable
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
try {
byte[] reply = RequestParser.handle( payload );
byte[] reply = parser.handle( payload );
if ( reply != null )
send( packet.getSocketAddress(), reply );
} catch ( Throwable t ) {
log.error( "Exception in RequestParser: " + t.getMessage() );
log.error( "Exception in RequestParser: " + t.toString() );
log.error( "Payload was: " + payload );
t.printStackTrace();
}
}
......@@ -124,6 +114,7 @@ public class NetworkHandler implements Runnable
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
sendThread.interrupt();
log.info( "UDP receiver finished." );
}
}
......@@ -132,7 +123,7 @@ public class NetworkHandler implements Runnable
* Private sending thread.
* Use blocking queue, wait for packet to be added to it, then try to send.
*/
static class Sender implements Runnable
private class Sender implements Runnable
{
/**
......
......@@ -15,15 +15,21 @@ public class RequestParser
/**
* Our very own gson instance (for serializing replies)
*/
private static final Gson sendGson = new Gson();
private final Gson sendGson = new Gson();
private final Taskmanager taskManager;
public RequestParser( Taskmanager tm )
{
this.taskManager = tm;
}
/**
* Handle the given unparsed request.
*
* @param source source of the request, where the reply will be send to (if any)
* @param payload Packet data received from network, already converted to a string
*/
public static byte[] handle( String payload )
public byte[] handle( String payload )
{
String[] parts = payload.split( " *, *", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
......@@ -34,21 +40,21 @@ public class RequestParser
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = Taskmanager.getTaskStatus( parts[2] );
TaskStatus status = taskManager.getTaskStatus( parts[2] );
return serialize( parts[0], status );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
Taskmanager.releaseTask( parts[2] );
taskManager.releaseTask( parts[2] );
return null;
}
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] );
TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
return serialize( parts[0], status );
}
private static byte[] serialize( String messageId, TaskStatus status )
private byte[] serialize( String messageId, TaskStatus status )
{
String data;
try {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment