Commit fd3c703e authored by Simon Rettberg's avatar Simon Rettberg
Browse files

Make taskmanager an instance, add callback for finished jobs, improve parent dependency handling

2 merge requests!3Develop,!2Merge with germany
Showing with 128 additions and 81 deletions
+128 -81
......@@ -21,19 +21,19 @@ public abstract class AbstractTask implements Runnable
* The id of the task instance.
*/
@Expose
private String id = null;
private volatile String id = null;
/**
* Parent task. This task won't be started as long as the parent task currently
* waiting for execution or is being executed. Otherwise this task is available for execution.
* Note that MAX_INSTANCES is still being taken into account. Set to null to ignore.
*/
@Expose
private String parentTask = null;
private volatile String parentTask = null;
/**
* If the parent task failed to execute, don't run this task and fail immediately.
*/
@Expose
private boolean failOnParentFail = true;
private volatile boolean failOnParentFail = true;
/*
* Variables we're working with - these should never be set from incoming (json) data
......@@ -57,11 +57,16 @@ public abstract class AbstractTask implements Runnable
/**
* Status of Task
*/
private TaskStatus status = TaskStatus.ts_waiting;
private volatile TaskStatus status = TaskStatus.ts_waiting;
/**
* Reference to parent task
*/
private AbstractTask parent = null;
private volatile AbstractTask parent = null;
/**
* Reference to "owner" so we can tell it to check for more
* work when we're done.
*/
private volatile FinishCallback finishCallback = null;
/**
* Default constructor which should not be overridden
......@@ -79,12 +84,24 @@ public abstract class AbstractTask implements Runnable
* Initialize the task; method used by the {@link Taskmanager}.
* Put your own initialization code in initTask()
*/
public final boolean init( AbstractTask parent )
public final boolean init( AbstractTask parent, FinishCallback finishCallback )
{
if ( this.initDone ) {
LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() );
System.exit( 1 );
}
if ( this.parentTask != null && this.parentTask.isEmpty() )
this.parentTask = null;
if ( this.parentTask != null && parent == null ) {
if ( this.failOnParentFail ) {
this.status = new TaskStatus( StatusCode.PARENT_FAILED, this.id );
return false;
}
this.parentTask = null;
}
if ( this.parentTask == null && parent != null )
parent = null;
this.finishCallback = finishCallback;
this.parent = parent;
this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id );
this.initDone = true;
......@@ -116,6 +133,19 @@ public abstract class AbstractTask implements Runnable
*/
protected abstract boolean execute();
/**
* This is called when a client requests the status of this task. In case you
* want to return complex structures like Lists, which are not thread safe, you
* might want to keep that list outside the status class you return, and only
* create a copy of it for the status class in this function.
* If you only return more or less atomic data, you don't need to override
* this function
*/
protected void updateStatus()
{
}
/*
* Final methods
*/
......@@ -126,7 +156,7 @@ public abstract class AbstractTask implements Runnable
* @return id of parent task, null if no parent set
*
*/
public String getParentTaskId()
public final String getParentTaskId()
{
return this.parentTask;
}
......@@ -151,7 +181,7 @@ public abstract class AbstractTask implements Runnable
*/
public final TaskStatus getStatus()
{
if ( this.initDone && this.parentTask != null ) {
if ( this.initDone && this.parent != null ) {
final StatusCode parentStatus = parent.getStatusCode();
switch ( parentStatus ) {
case DUPLICATE_ID:
......@@ -160,8 +190,10 @@ public abstract class AbstractTask implements Runnable
case NO_SUCH_CONSTRUCTOR:
case PARENT_FAILED:
case TASK_ERROR:
if ( this.failOnParentFail )
if ( this.failOnParentFail ) {
this.status.statusCode = StatusCode.PARENT_FAILED;
LOG.debug( "Parent " + this.parentTask + " of " + this.id + " failed." );
}
this.parentTask = null;
break;
default:
......@@ -267,18 +299,7 @@ public abstract class AbstractTask implements Runnable
} else {
this.status.statusCode = StatusCode.TASK_ERROR;
}
}
/**
* This is called when a client requests the status of this task. In case you
* want to return complex structures like Lists, which are not thread safe, you
* might want to keep that list outside the status class you return, and only
* create a copy of it for the status class in this function.
* If you only return more or less atomic data, you don't need to override
* this function
*/
protected void updateStatus()
{
if ( this.finishCallback != null )
this.finishCallback.taskFinished();
}
}
package org.openslx.taskmanager.api;
public interface FinishCallback
{
/**
* Called by a task after execution finished or failed.
*/
public void taskFinished();
}
......@@ -26,11 +26,11 @@ public final class TaskStatus
/**
* Overall status of the task. Only set by base methods of the AbstractTask class.
*/
protected StatusCode statusCode;
protected volatile StatusCode statusCode;
/**
* Custom data a task might want to return on status requests.
*/
private Object data = null;
private volatile Object data = null;
@SuppressWarnings( "unused" )
private final String id;
......
......@@ -3,12 +3,13 @@ package org.openslx.taskmanager;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import junit.runner.ClassPathTestCollector;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.network.NetworkHandler;
import org.openslx.taskmanager.network.RequestParser;
import org.openslx.taskmanager.util.ClassLoaderHack;
/**
......@@ -18,7 +19,7 @@ import org.openslx.taskmanager.util.ClassLoaderHack;
public class App
{
public static void main( String[] args ) throws SocketException
public static void main( String[] args ) throws SocketException, InterruptedException
{
BasicConfigurator.configure();
// Load all task plugins
......@@ -40,9 +41,18 @@ public class App
}
}
Environment.load( "config/environment" );
NetworkHandler.init();
Taskmanager.run();
List<Thread> threads = new ArrayList<>();
Taskmanager tm = new Taskmanager();
RequestParser parser = new RequestParser( tm );
NetworkHandler nh = new NetworkHandler( Global.LISTEN_PORT, Global.LISTEN_ADDRESS, parser );
threads.add( new Thread( tm ) );
threads.add( new Thread( nh ) );
// Wait for everything
NetworkHandler.join();
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
}
}
......@@ -2,10 +2,10 @@ package org.openslx.taskmanager.main;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
......@@ -14,6 +14,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.FinishCallback;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;
......@@ -21,32 +22,32 @@ import org.openslx.taskmanager.util.Util;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
public class Taskmanager
public class Taskmanager implements FinishCallback, Runnable
{
private static final Logger log = Logger.getLogger( Taskmanager.class );
private static final ExecutorService threadPool = Executors.newCachedThreadPool();
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor( 4, 16, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>( 16 ) );
/**
* Static gson object for (de)serialization
* gson object for (de)serialization
*/
private static final Gson gson = Util.explicitGsonInstance();
private final Gson gson = Util.explicitGsonInstance();
/**
* Cache of known tasks
*/
private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
private final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances. The mainloop will call wait() on this and this object
* is notified as soon as the mainloop should check if there is any task available that can be
* run.
*/
private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private static final Lock workLock = new ReentrantLock();
private static final Condition doCheckForWork = workLock.newCondition();
private final Lock workLock = new ReentrantLock();
private final Condition doCheckForWork = workLock.newCondition();
/*
* Static methods
......@@ -61,7 +62,7 @@ public class Taskmanager
* @param taskId - ID of the task to retrieve the status of
* @return TaskStatus
*/
public static TaskStatus getTaskStatus( final String taskId )
public TaskStatus getTaskStatus( final String taskId )
{
AbstractTask task = instances.get( taskId );
if ( task == null )
......@@ -81,7 +82,7 @@ public class Taskmanager
* @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
* there is no task registered under the given name.
*/
public static TaskStatus submitTask( final String task, final String jsonData )
public TaskStatus submitTask( final String task, final String jsonData )
{
// Get task class
Class<? extends AbstractTask> taskClass;
......@@ -108,7 +109,7 @@ public class Taskmanager
log.warn( task + " exists, but could not be instanciated!" );
return TaskStatus.ts_noSuchConstructor;
}
if ( taskInstance.getId() == null ) {
if ( taskInstance.getId() == null || taskInstance.getId().isEmpty() ) {
log.warn( "Tried to launch " + task + " with null-id" );
return TaskStatus.ts_noSuchConstructor;
}
......@@ -123,13 +124,12 @@ public class Taskmanager
AbstractTask parent = null;
if ( taskInstance.getParentTaskId() != null )
parent = instances.get( taskInstance.getParentTaskId() );
if ( taskInstance.init( parent ) ) {
checkForWork();
}
taskInstance.init( parent, this );
checkForWork();
return taskInstance.getStatus();
}
public static void releaseTask( String taskId )
public void releaseTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task != null )
......@@ -140,7 +140,7 @@ public class Taskmanager
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
*/
protected static void checkForWork()
protected void checkForWork()
{
workLock.lock();
try {
......@@ -150,7 +150,14 @@ public class Taskmanager
}
}
public static void run()
@Override
public void taskFinished()
{
checkForWork();
}
@Override
public void run()
{
try {
while ( !Global.doShutdown ) {
......@@ -194,6 +201,7 @@ public class Taskmanager
} catch ( InterruptedException e ) {
log.info( "Interrupted!" );
}
System.exit( 0 );
}
}
......
......@@ -3,6 +3,7 @@ package org.openslx.taskmanager.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
......@@ -23,46 +24,33 @@ public class NetworkHandler implements Runnable
// Static part
private static Thread recvThread = null;
private static Thread sendThread = null;
private Thread sendThread = null;
/**
* Sender instance (Runnable handling outgoing packets)
*/
private static Sender sender = null;
private final Sender sender;
/**
* UDP socket for sending and receiving.
*/
private static DatagramSocket socket;
private final DatagramSocket socket;
private final RequestParser parser;
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
public static void init() throws SocketException
public NetworkHandler( int port, InetAddress listenAddress, RequestParser parser ) throws SocketException
{
if ( recvThread != null )
throw new RuntimeException( "Already initialized" );
socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
recvThread = new Thread( new NetworkHandler() );
recvThread.start();
socket = new DatagramSocket( port, listenAddress );
sendThread = new Thread( sender = new Sender() );
sendThread.start();
this.parser = parser;
}
public static void shutdown()
public void shutdown()
{
socket.close();
}
public static void join()
{
try {
recvThread.join();
sendThread.join();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
}
// Class part
/**
......@@ -97,6 +85,7 @@ public class NetworkHandler implements Runnable
{
byte readBuffer[] = new byte[ 66000 ];
try {
sendThread.start();
while ( !Global.doShutdown ) {
DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
try {
......@@ -112,11 +101,12 @@ public class NetworkHandler implements Runnable
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
try {
byte[] reply = RequestParser.handle( payload );
byte[] reply = parser.handle( payload );
if ( reply != null )
send( packet.getSocketAddress(), reply );
} catch ( Throwable t ) {
log.error( "Exception in RequestParser: " + t.getMessage() );
log.error( "Exception in RequestParser: " + t.toString() );
log.error( "Payload was: " + payload );
t.printStackTrace();
}
}
......@@ -124,6 +114,7 @@ public class NetworkHandler implements Runnable
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
sendThread.interrupt();
log.info( "UDP receiver finished." );
}
}
......@@ -132,7 +123,7 @@ public class NetworkHandler implements Runnable
* Private sending thread.
* Use blocking queue, wait for packet to be added to it, then try to send.
*/
static class Sender implements Runnable
private class Sender implements Runnable
{
/**
......
......@@ -15,15 +15,21 @@ public class RequestParser
/**
* Our very own gson instance (for serializing replies)
*/
private static final Gson sendGson = new Gson();
private final Gson sendGson = new Gson();
private final Taskmanager taskManager;
public RequestParser( Taskmanager tm )
{
this.taskManager = tm;
}
/**
* Handle the given unparsed request.
*
* @param source source of the request, where the reply will be send to (if any)
* @param payload Packet data received from network, already converted to a string
*/
public static byte[] handle( String payload )
public byte[] handle( String payload )
{
String[] parts = payload.split( " *, *", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
......@@ -34,21 +40,21 @@ public class RequestParser
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = Taskmanager.getTaskStatus( parts[2] );
TaskStatus status = taskManager.getTaskStatus( parts[2] );
return serialize( parts[0], status );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
Taskmanager.releaseTask( parts[2] );
taskManager.releaseTask( parts[2] );
return null;
}
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] );
TaskStatus status = taskManager.submitTask( parts[1], parts[2] );
return serialize( parts[0], status );
}
private static byte[] serialize( String messageId, TaskStatus status )
private byte[] serialize( String messageId, TaskStatus status )
{
String data;
try {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment