Commit efb5ad9f authored by Simon Rettberg's avatar Simon Rettberg
Browse files

Initial commit

parents
No related merge requests found
Showing with 1482 additions and 0 deletions
+1482 -0
.project
.settings/
.classpath
*.swp
*~
*.tmp
*.class
target/
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.openslx.taskmanager</groupId>
<artifactId>taskmanager-api</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>taskmanager-api</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
package org.openslx.taskmanager.api;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.TaskStatus.StatusCode;
import com.google.gson.annotations.Expose;
public abstract class AbstractTask implements Runnable
{
private static final long RELEASE_DELAY = 5l * 60l * 1000l;
private static final Logger LOG = Logger.getLogger( AbstractTask.class );
/*
* To be set from task invocation (json data)
*/
/**
* The id of the task instance.
*/
@Expose
private String id = null;
/**
* Parent task. This task won't be started as long as the parent task currently
* waiting for execution or is being executed. Otherwise this task is available for execution.
* Note that MAX_INSTANCES is still being taken into account. Set to null to ignore.
*/
@Expose
private String parentTask = null;
/**
* If the parent task failed to execute, don't run this task and fail immediately.
*/
@Expose
private boolean failOnParentFail = true;
/*
* Variables we're working with - these should never be set from incoming (json) data
*/
/**
* Maximum age of a task, if it's not freed explicitly.
*/
private static final long MAX_TASK_AGE = 24l * 3600l * 1000l;
/**
* timeMillis when this task will be removed. This will be set automatically to something
* reasonable like 24 hours, once the job has finished. This is to prevent clogging the task
* manager with finished jobs over time. Note that you can explicitly remove a job using the
* "release" command.
*/
private volatile long removalDeadline = System.currentTimeMillis() + MAX_TASK_AGE;
/**
* True if init() has been called. Task will not start if this is false.
*/
private volatile boolean initDone = false;
/**
* Status of Task
*/
private TaskStatus status = TaskStatus.ts_waiting;
/**
* Reference to parent task
*/
private AbstractTask parent = null;
/**
* Default constructor which should not be overridden
*/
public AbstractTask()
{
this.id = UUID.randomUUID().toString();
}
/*
* Overridable methods
*/
/**
* Initialize the task; method used by the {@link Taskmanager}.
* Put your own initialization code in initTask()
*/
public final boolean init( AbstractTask parent )
{
if ( this.initDone ) {
LOG.fatal( "init() called twice on " + this.getClass().getSimpleName() );
System.exit( 1 );
}
this.parent = parent;
this.status = new TaskStatus( StatusCode.TASK_WAITING, this.id );
this.initDone = true;
boolean ret;
try {
ret = this.initTask();
} catch ( Throwable t ) {
ret = false;
}
if ( !ret ) {
this.status.statusCode = StatusCode.TASK_ERROR;
}
return ret;
}
/**
* Your own initialization code. This is run synchronously within the network
* handling thread, so do NOT put anything here that might take longer than a few milliseconds!
* You should usually only validate the input to the task here, and return false if it is
* invalid.
*
* @return - true if the task should be executed by the scheduler (ie init was successful)
* - false if init was not successful, or you don't need to do any more processing
*/
protected abstract boolean initTask();
/**
* This is where you put your huge work stuff that takes ages.
*/
protected abstract boolean execute();
/*
* Final methods
*/
/**
* Get id of parent task.
*
* @return id of parent task, null if no parent set
*
*/
public String getParentTaskId()
{
return this.parentTask;
}
/**
* Set the custom status data object to be returned on status requests.
* For simple tasks it should be sufficient to set this once before executing
* the task starts, and then just update fields in that class while the
* task is running. For cases where you have complicated data structures or
* multiple values that need to stay in sync you could create a new instance
* every time, fill it with values, and then call this method.
*
* @param obj the object containing you specific task data
*/
protected final void setStatusObject( Object obj )
{
status.setStatusObject( obj );
}
/**
* Get current status of task.
*/
public final TaskStatus getStatus()
{
if ( this.initDone && this.parentTask != null ) {
final StatusCode parentStatus = parent.getStatusCode();
switch ( parentStatus ) {
case DUPLICATE_ID:
case NO_SUCH_INSTANCE:
case NO_SUCH_TASK:
case NO_SUCH_CONSTRUCTOR:
case PARENT_FAILED:
case TASK_ERROR:
if ( this.failOnParentFail )
this.status.statusCode = StatusCode.PARENT_FAILED;
this.parentTask = null;
break;
default:
break;
}
}
return this.status;
}
/**
* Get status code if task.
*
* @return
*/
public final StatusCode getStatusCode()
{
return getStatus().getStatusCode();
}
/**
* Get id of task
*/
public final String getId()
{
return this.id;
}
/**
* Getter for failOnParentTask
*/
public final boolean getFailOnParentFail()
{
return this.failOnParentFail;
}
/**
* Release the task: If the task is still pending/running, it will be removed from the task
* manager as soon as it finishes. So you can't query for its result later. If the task has
* already finished (either successful or failed), it will be removed (almost) immediately.
*/
public final void release()
{
this.removalDeadline = Math.min( this.removalDeadline, System.currentTimeMillis() + RELEASE_DELAY );
}
/**
* Can this task be started? This checks all the conditions:
* - Has this task been initialized yet?
* - Is it actually waiting for execution?
* - Does it depend on a parent task?
* -- If so, did the parent finish?
* -- Or did it fail?
* --- If so, should this task only run if it didn't fail?
*
* @return true iff this task can be started
*/
public final boolean canStart()
{
if ( !this.initDone || this.id == null || this.getStatus().getStatusCode() != StatusCode.TASK_WAITING ) {
return false;
}
if ( this.parent == null )
return true;
return parent.getStatusCode() != StatusCode.TASK_WAITING && parent.getStatusCode() != StatusCode.TASK_PROCESSING;
}
/**
* Checks whether this task can be removed from the task manager.
* A task can be removed if it is not executing or waiting for execution, and
* its removal deadline has been reached.
*
* @return true if it can be released
*/
public final boolean canBeReleased()
{
if ( this.status.statusCode == StatusCode.TASK_WAITING || this.status.statusCode == StatusCode.TASK_PROCESSING )
return false;
return this.removalDeadline != 0 && System.currentTimeMillis() > this.removalDeadline;
}
/**
* Execute the task, wrapped in some sanity checks.
*/
@Override
public final void run()
{
synchronized ( this ) {
if ( this.status.statusCode != StatusCode.TASK_WAITING )
throw new RuntimeException( "Tried to launch task " + this.getClass().getSimpleName() + " twice!" );
if ( !this.initDone )
throw new RuntimeException( "Tried to launch " + this.getClass().getSimpleName() + " without initializing it!" );
this.status.statusCode = StatusCode.TASK_PROCESSING;
}
boolean ret;
try {
ret = execute();
} catch ( Throwable t ) {
LOG.warn( "Task " + this.getClass().getSimpleName() + " failed with uncaught exception: " + t.toString() );
ret = false;
}
if ( ret ) {
this.status.statusCode = StatusCode.TASK_FINISHED;
} else {
this.status.statusCode = StatusCode.TASK_ERROR;
}
}
/**
* This is called when a client requests the status of this task. In case you
* want to return complex structures like Lists, which are not thread safe, you
* might want to keep that list outside the status class you return, and only
* create a copy of it for the status class in this function.
* If you only return more or less atomic data, you don't need to override
* this function
*/
protected void updateStatus()
{
}
}
package org.openslx.taskmanager.api;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.log4j.Logger;
public abstract class SystemCommandTask extends AbstractTask
{
private static final Logger log = Logger.getLogger( SystemCommandTask.class );
private String[] command = null;
private Process process = null;
@Override
protected final boolean execute()
{
command = initCommandLine();
if ( command == null || command.length == 0 ) {
return processEnded( -1 );
}
ProcessBuilder pb = new ProcessBuilder( command );
pb.directory( new File( "/" ) );
try {
// Create process
process = pb.start();
final Process p = process;
processStarted();
p.getOutputStream();
// Read its stdout
Thread stdout = new Thread( new Runnable() {
@Override
public void run()
{
try {
BufferedReader reader = new BufferedReader( new InputStreamReader( p.getInputStream() ) );
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdOut( line );
}
}
} catch ( IOException e ) {
}
}
} );
// Read its stderr
Thread stderr = new Thread( new Runnable() {
@Override
public void run()
{
try {
BufferedReader reader = new BufferedReader( new InputStreamReader( p.getErrorStream() ) );
String line;
while ( ( line = reader.readLine() ) != null ) {
synchronized ( p ) {
processStdErr( line );
}
}
} catch ( IOException e ) {
}
}
} );
stdout.start();
stderr.start();
// Wait for everything
stdout.join();
stderr.join();
process.waitFor();
return processEnded( process.exitValue() );
} catch ( IOException e ) {
log.warn( "Process of task " + getId() + " died." );
processStdErr( e.toString() );
return processEnded( -2 );
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
return false;
} catch ( Exception e ) {
log.warn( "Unexpected exception when executing " + getId() + ": " + e.toString() );
processStdErr( e.toString() );
return processEnded( -3 );
} finally {
if ( process != null )
process.destroy();
}
}
/**
* Write data to the process's stdin.
*
* @param data stuff to write
* @return success or failure mapped to a boolean in a really complicated way
*/
protected final boolean toStdIn(byte[] data)
{
try {
process.getOutputStream().write( data );
} catch ( IOException e ) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Write text to the process's stdin.
*
* @param text stuff to write
* @return success or failure mapped to a boolean in a really complicated way
*/
protected final boolean toStdIn(String text)
{
return toStdIn( text.getBytes( StandardCharsets.UTF_8 ) );
}
/**
* Called to get the command line. Each argument should be a separate array
* element. Returning null means the task should not run (as the arguments
* were probably faulty).
*
* @return List of arguments. First element is the command itself.
*/
protected abstract String[] initCommandLine();
/**
* Called when the process has been successfully started.
*/
protected void processStarted()
{
}
/**
* Called when the process has finished running
*
* @param exitCode the process' exit code
* @return
*/
protected abstract boolean processEnded( int exitCode );
/**
* Called when a line has been read from the process' stdout.
*
* @param line The line read from the process, without any newline characters
*/
protected abstract void processStdOut( String line );
/**
* Called when a line has been read from the process' stderr.
* Trailing newline is removed.
*
* @param line The line read from the process, without any newline characters
*/
protected abstract void processStdErr( String line );
}
package org.openslx.taskmanager.api;
/**
* This is what is returned on a status request.
* To return custom data for your task, call {@link AbstractTask#setStatusObject(Object)} from your
* Task.
* This class is serialized entirely, not using the Exposed annotation.
*/
public final class TaskStatus
{
public enum StatusCode
{
TASK_WAITING,
TASK_PROCESSING,
TASK_FINISHED,
TASK_ERROR,
NO_SUCH_INSTANCE,
NO_SUCH_TASK,
NO_SUCH_CONSTRUCTOR,
DUPLICATE_ID,
PARENT_FAILED,
JSON_ERROR
}
/**
* Overall status of the task. Only set by base methods of the AbstractTask class.
*/
protected StatusCode statusCode;
/**
* Custom data a task might want to return on status requests.
*/
private Object data = null;
@SuppressWarnings( "unused" )
private final String id;
/*
* Static members
*/
/**
* Create a single "duplicate id" status we return if trying to launch a task with an id already
* in use.
*/
public static final TaskStatus ts_duplicateId = new TaskStatus( StatusCode.DUPLICATE_ID );
/**
* Create a single "no such constructor" status we return if a task could be found, but not
* instantiated.
*/
public static final TaskStatus ts_noSuchConstructor = new TaskStatus( StatusCode.NO_SUCH_CONSTRUCTOR );
/**
* Create a single "no such task" status we return if a task should be invoked that
* doesn't actually exist.
*/
public static final TaskStatus ts_noSuchTask = new TaskStatus( StatusCode.NO_SUCH_TASK );
/**
* Create a single "no such task" status we return if an action on a task instance is requested
* that doesn't actually exist.
*/
public static final TaskStatus ts_noSuchInstance = new TaskStatus( StatusCode.NO_SUCH_INSTANCE );
/**
* Create a single "parent failed" status we return as status for a task which depends on another
* task, and that other task failed to execute, or failed during execution.
*/
public static final TaskStatus ts_parentFailed = new TaskStatus( StatusCode.PARENT_FAILED );
/**
* Create a single "task waiting" status we return as status for a task that is waiting for
* execution.
*/
public static final TaskStatus ts_waiting = new TaskStatus( StatusCode.TASK_WAITING );
/**
* Create a single "task error" status we can use everywhere.
*/
public static final TaskStatus ts_error = new TaskStatus( StatusCode.TASK_ERROR );
/**
* Create a single "json error" status we can use everywhere.
*/
public static final TaskStatus ts_jsonError = new TaskStatus( StatusCode.JSON_ERROR );
/**
* Create new TaskStatus with given initial status code
* and id.
*
* @param status The status code to initialize the TaskStatus with
* @param id id of task this status belongs to
*/
public TaskStatus( final StatusCode status, final String id )
{
this.statusCode = status;
this.id = id;
}
/**
* Create new TaskStatus with given initial status code.
*
* @param status The status code to initialize the TaskStatus with
*/
public TaskStatus( final StatusCode status )
{
this( status, null );
}
/**
* Get the status code of this TaskStatus
*
* @return
*/
public final StatusCode getStatusCode()
{
return this.statusCode;
}
/**
* Set the custom status data.
*
* @param obj custom status object
*/
protected void setStatusObject( Object obj )
{
this.data = obj;
}
public String getStatusObjectClassName()
{
if ( this.data == null ) return "(null)";
return this.data.getClass().getSimpleName();
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.openslx.taskmanager</groupId>
<artifactId>taskmanager-daemon</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>taskmanager-daemon</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>org.openslx.taskmanager.App</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openslx.taskmanager</groupId>
<artifactId>taskmanager-api</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
package org.openslx.taskmanager;
import java.io.File;
import java.io.IOException;
import java.net.SocketException;
import junit.runner.ClassPathTestCollector;
import org.apache.log4j.BasicConfigurator;
import org.openslx.taskmanager.main.Taskmanager;
import org.openslx.taskmanager.network.NetworkHandler;
import org.openslx.taskmanager.util.ClassLoaderHack;
/**
* Hello world!
*
*/
public class App
{
public static void main( String[] args ) throws SocketException
{
// Load all task plugins
File folder = new File( "./plugins" );
if ( !folder.exists() ) {
System.out.println( "No plugin folder found - nothing to do." );
System.exit( 1 );
}
for ( File file : folder.listFiles() ) {
if ( !file.isFile() || !file.toString().endsWith( ".jar" ) )
continue;
try {
ClassLoaderHack.addFile( file );
} catch ( IOException e ) {
e.printStackTrace();
System.out.println( "Could not add plugin: " + file.toString() );
System.exit( 1 );
}
}
BasicConfigurator.configure();
Environment.load( "config/environment" );
NetworkHandler.init();
Taskmanager.run();
// Wait for everything
NetworkHandler.join();
}
}
package org.openslx.taskmanager;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.log4j.Logger;
/**
* Holds the environment that tasks running a system command *should*
* use. The environment is read from a config file.
*/
public class Environment
{
private static final Logger log = Logger.getLogger( Environment.class );
private static Map<String, String> env = new LinkedHashMap<>();
public static boolean load( String fileName )
{
try {
FileReader fileReader = new FileReader( fileName );
BufferedReader bufferedReader = new BufferedReader( fileReader );
Map<String, String> env = new LinkedHashMap<>();
String line = null;
while ( ( line = bufferedReader.readLine() ) != null ) {
if ( !line.matches( "^[a-zA-Z0-9_]+=" ) )
continue;
String[] part = line.split( "=", 2 );
env.put( part[0], part[1] );
}
bufferedReader.close();
Environment.env = env;
log.info( "Loaded " + env.size() + " environment lines." );
} catch ( IOException e ) {
log.info( "Could not load environment definition from " + fileName + ". Processes might use the same environment as this thread." );
return false;
}
return true;
}
public static void set( Map<String, String> environment )
{
environment.clear();
environment.putAll( env );
}
public static String[] get()
{
// Get reference to env so it doesn't change while in this function (load() from other thread)
Map<String, String> env = Environment.env;
String ret[] = new String[ env.size() ];
int i = 0;
for ( Entry<String, String> it : env.entrySet() ) {
ret[i++] = it.getKey() + "=" + it.getValue();
}
return ret;
}
}
package org.openslx.taskmanager;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class Global
{
public static final int LISTEN_PORT = 9215;
public static final String TASK_PACKAGE_NAME = "org.openslx.taskmanager.tasks";
public static final long MAX_TASK_AGE = 24l * 3600l * 1000l;
public static final InetAddress LISTEN_ADDRESS;
public static volatile boolean doShutdown = false;
static
{
InetAddress la;
try {
la = Inet4Address.getByName( "127.0.0.1" );
} catch ( UnknownHostException e ) {
la = null;
e.printStackTrace();
}
LISTEN_ADDRESS = la;
}
}
package org.openslx.taskmanager.main;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
import org.openslx.taskmanager.api.AbstractTask;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.util.ClassLoaderHack;
import org.openslx.taskmanager.util.Util;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
public class Taskmanager
{
private static final Logger log = Logger.getLogger( Taskmanager.class );
private static final ExecutorService threadPool = Executors.newCachedThreadPool();
/**
* Static gson object for (de)serialization
*/
private static final Gson gson = Util.explicitGsonInstance();
/**
* Cache of known tasks
*/
private static final Map<String, Class<? extends AbstractTask>> tasks = new ConcurrentHashMap<>();
/**
* All the running/finished task instances. The mainloop will call wait() on this and this object
* is notified as soon as the mainloop should check if there is any task available that can be
* run.
*/
private static final Map<String, AbstractTask> instances = new ConcurrentHashMap<>();
private static final Lock workLock = new ReentrantLock();
private static final Condition doCheckForWork = workLock.newCondition();
/*
* Static methods
*/
/**
* Return the status of the task with the given ID. If the task does not
* exist, a pseudo-status instance is returned, with a status code of
* NO_SUCH_TASK. This means it is guaranteed that this function never returns
* null.
*
* @param taskId - ID of the task to retrieve the status of
* @return TaskStatus
*/
public static TaskStatus getTaskStatus( final String taskId )
{
AbstractTask task = instances.get( taskId );
if ( task == null )
return TaskStatus.ts_noSuchInstance;
return task.getStatus();
}
/**
* Run the requested method as a new task. The json data may contain an explicit id for that
* task, otherwise a random id is generated. If there's already a task running with the desired
* id, an error is returned, and no new task will be created. The desired id has to be added to
* the json data, as a field called "id".
*
* @param task - The task name to be executed.
* @param jsonData - JsonData to be passed to the task. All fields except "id" are ignored by the
* task manager.
* @return the TaskStatus returned by the newly created task, or a NO_SUCH_TASK TaskStatus if
* there is no task registered under the given name.
*/
public static TaskStatus submitTask( final String task, final String jsonData )
{
// Get task class
Class<? extends AbstractTask> taskClass;
synchronized ( tasks ) {
taskClass = tasks.get( task );
if ( taskClass == null ) { // Not in map; either never called yet, or doesn't exist
taskClass = ClassLoaderHack.getClass( Global.TASK_PACKAGE_NAME, task, AbstractTask.class );
if ( taskClass == null ) { // Simply doesn't exist
log.warn( "Could not find " + task + " in " + Global.TASK_PACKAGE_NAME );
return TaskStatus.ts_noSuchTask;
}
tasks.put( task, taskClass ); // Cache for all future calls
}
}
// Instantiate using Gson
final AbstractTask taskInstance;
try {
taskInstance = gson.fromJson( jsonData, taskClass );
} catch ( JsonSyntaxException e ) {
log.warn( "Invocation request for " + task + " with invalid json: " + jsonData );
return TaskStatus.ts_jsonError;
}
if ( taskInstance == null ) {
log.warn( task + " exists, but could not be instanciated!" );
return TaskStatus.ts_noSuchConstructor;
}
if ( taskInstance.getId() == null ) {
log.warn( "Tried to launch " + task + " with null-id" );
return TaskStatus.ts_noSuchConstructor;
}
// Now check for id collision
synchronized ( instances ) {
if ( instances.containsKey( taskInstance.getId() ) ) {
log.info( "Ignoring task invocation of " + task + ": Duplicate ID: " + taskInstance.getId() );
return TaskStatus.ts_duplicateId;
}
instances.put( taskInstance.getId(), taskInstance );
}
AbstractTask parent = null;
if ( taskInstance.getParentTaskId() != null )
parent = instances.get( taskInstance.getParentTaskId() );
if ( taskInstance.init( parent ) ) {
checkForWork();
}
return taskInstance.getStatus();
}
public static void releaseTask( String taskId )
{
final AbstractTask task = instances.get( taskId );
if ( task != null )
task.release();
}
/**
* Wakes up the Taskmanager's mainloop so it will check if any of the current task instances
* is waiting for execution.
*/
protected static void checkForWork()
{
workLock.lock();
try {
doCheckForWork.signalAll();
} finally {
workLock.unlock();
}
}
public static void run()
{
try {
while ( !Global.doShutdown ) {
workLock.lock();
try {
doCheckForWork.await( 1, TimeUnit.MINUTES );
} finally {
workLock.unlock();
}
try {
for ( Iterator<AbstractTask> it = instances.values().iterator(); it.hasNext(); ) {
AbstractTask task = it.next();
if ( task.canBeReleased() ) {
it.remove();
log.debug( "Released task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
continue;
}
if ( task.canStart() ) {
threadPool.execute( task );
log.debug( "Started Task " + task.getClass().getSimpleName() + " (" + task.getId() + ")" );
}
}
} catch ( RejectedExecutionException e ) {
log.warn( "ThreadPool rejected a task (" + e.getMessage() + ")" );
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
log.info( "Taskmanager mainloop finished." );
Global.doShutdown = true;
log.info( "Shutting down worker thread pool...." );
threadPool.shutdown();
try {
if ( threadPool.awaitTermination( 5, TimeUnit.MINUTES ) ) {
log.info( "Thread pool shut down!" );
} else {
log.info( "Trying to kill still running tasks...." );
threadPool.shutdownNow();
}
} catch ( InterruptedException e ) {
log.info( "Interrupted!" );
}
}
}
}
package org.openslx.taskmanager.network;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.Global;
/**
* The network listener that will receive incoming UDP packets, try to process
* them, and then send a reply.
*/
public class NetworkHandler implements Runnable
{
private static final Logger log = Logger.getLogger( NetworkHandler.class );
// Static part
private static Thread recvThread = null;
private static Thread sendThread = null;
/**
* Sender instance (Runnable handling outgoing packets)
*/
private static Sender sender = null;
/**
* UDP socket for sending and receiving.
*/
private static DatagramSocket socket;
/**
* Initialize the NetworkHandler by starting threads and opening the socket.
*/
public static void init() throws SocketException
{
if ( recvThread != null )
throw new RuntimeException( "Already initialized" );
socket = new DatagramSocket( Global.LISTEN_PORT, Global.LISTEN_ADDRESS );
recvThread = new Thread( new NetworkHandler() );
recvThread.start();
sendThread = new Thread( sender = new Sender() );
sendThread.start();
}
public static void shutdown()
{
socket.close();
}
public static void join()
{
try {
recvThread.join();
sendThread.join();
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
}
}
// Class part
/**
* Prepare and enqueue reply for client request.
* Only ever to be called from the receiving thread. The reply message is crafted
* and then handed over to the sending thread.
*
* @param destination SocketAddress of the client
* @param messageId The same ID the client used in it's request.
* It's echoed back to the client to enable request bursts, and has no meaning for the
* server.
* @param status A TaskStatus instance to be serialized to json and sent to the client.
*/
private void send( SocketAddress destination, byte[] buffer )
{
final DatagramPacket packet;
try {
packet = new DatagramPacket( buffer, buffer.length, destination );
} catch ( SocketException e ) {
log.warn( "Could not construct datagram packet for target " + destination.toString() );
e.printStackTrace();
return;
}
sender.send( packet );
}
/**
* Main loop of receiving thread - wait until a packet arrives, then try to handle/decode
*/
@Override
public void run()
{
byte readBuffer[] = new byte[ 66000 ];
try {
while ( !Global.doShutdown ) {
DatagramPacket packet = new DatagramPacket( readBuffer, readBuffer.length );
try {
socket.receive( packet );
} catch ( IOException e ) {
log.info( "IOException on UDP socket when reading: " + e.getMessage() );
Thread.sleep( 100 );
continue;
}
if ( packet.getLength() < 2 ) {
log.debug( "Message too short" );
continue;
}
String payload = new String( readBuffer, 0, packet.getLength(), StandardCharsets.UTF_8 );
try {
byte[] reply = RequestParser.handle( payload );
if ( reply != null )
send( packet.getSocketAddress(), reply );
} catch ( Throwable t ) {
log.error( "Exception in RequestParser: " + t.getMessage() );
t.printStackTrace();
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
log.info( "UDP receiver finished." );
}
}
/**
* Private sending thread.
* Use blocking queue, wait for packet to be added to it, then try to send.
*/
static class Sender implements Runnable
{
/**
* Queue to stuff outgoing packets into.
*/
private final BlockingQueue<DatagramPacket> queue = new LinkedBlockingQueue<>( 128 );
/**
* Wait until something is put into the queue, then send it.
*/
@Override
public void run()
{
try {
while ( !Global.doShutdown ) {
final DatagramPacket packet;
packet = queue.take();
try {
socket.send( packet );
} catch ( IOException e ) {
log.debug( "Could not send UDP packet to " + packet.getAddress().getHostAddress().toString() );
}
}
} catch ( InterruptedException e ) {
Thread.currentThread().interrupt();
} finally {
Global.doShutdown = true;
log.info( "UDP sender finished." );
}
}
/**
* Add something to the outgoing packet queue.
* Called from the receiving thread.
*/
public void send( DatagramPacket packet )
{
if ( queue.offer( packet ) )
return;
log.warn( "Could not add packet to queue: Full" );
}
}
}
package org.openslx.taskmanager.network;
import java.nio.charset.StandardCharsets;
import org.apache.log4j.Logger;
import org.openslx.taskmanager.api.TaskStatus;
import org.openslx.taskmanager.main.Taskmanager;
import com.google.gson.Gson;
public class RequestParser
{
private static final Logger log = Logger.getLogger( RequestParser.class );
/**
* Our very own gson instance (for serializing replies)
*/
private static final Gson sendGson = new Gson();
/**
* Handle the given unparsed request.
*
* @param source source of the request, where the reply will be send to (if any)
* @param payload Packet data received from network, already converted to a string
*/
public static byte[] handle( String payload )
{
String[] parts = payload.split( " *, *", 3 );
// Message format is "<message id>, <command>, <command payload/argument>"
if ( parts.length != 3 ) {
log.debug( "Could not split message" );
return null;
}
// Look at parts[1], if it's "status" it's a request for the task
// with the ID given in parts[2]
if ( parts[1].equals( "status" ) ) {
TaskStatus status = Taskmanager.getTaskStatus( parts[2] );
return serialize( parts[0], status );
}
// Now check if parts[1] is "release"
if ( parts[1].equals( "release" ) ) {
Taskmanager.releaseTask( parts[2] );
return null;
}
// Anything else in parts[0] will be treated as a fresh task invocation, so let's
// pass it on to the task manager.
TaskStatus status = Taskmanager.submitTask( parts[1], parts[2] );
return serialize( parts[0], status );
}
private static byte[] serialize( String messageId, TaskStatus status )
{
String data;
try {
synchronized ( sendGson ) {
data = sendGson.toJson( status );
}
} catch ( Throwable e ) {
log.warn( "Could not serialize reply with TaskStatus " + status.getStatusObjectClassName() );
log.warn( e.toString() );
return null;
}
return ( messageId + ',' + data ).getBytes( StandardCharsets.UTF_8 );
}
}
package org.openslx.taskmanager.util;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
public class ClassLoaderHack
{
@SuppressWarnings( "rawtypes" )
private static final Class[] parameters = new Class[] { URL.class };
public static void addFile( String s ) throws IOException
{
File f = new File( s );
addFile( f );
}
public static void addFile( File f ) throws IOException
{
addURL( f.toURI().toURL() );
}
public static void addURL( URL u ) throws IOException
{
URLClassLoader sysloader = (URLClassLoader)ClassLoader.getSystemClassLoader();
Class<URLClassLoader> sysclass = URLClassLoader.class;
try {
Method method = sysclass.getDeclaredMethod( "addURL", parameters );
method.setAccessible( true );
method.invoke( sysloader, new Object[] { u } );
System.out.println( "Loaded " + u.toString() );
} catch ( Throwable t ) {
t.printStackTrace();
throw new IOException( "Error, could not add URL to system classloader" );
}
}
/**
* Get Class meta-object for given class in package. Only return class if it's somehow
* extending from given baseClass.
*
* @param packageName package to search in
* @param className name of class to look for
* @param baseClass class the class in question has to be extended from
* @return class meta object, or null if not found
*/
@SuppressWarnings( "unchecked" )
public static <T> Class<? extends T> getClass( String packageName, String className, Class<T> baseClass )
{
final Class<?> clazz;
try {
clazz = Class.forName( packageName + '.' + className, true, ClassLoader.getSystemClassLoader() );
} catch ( ClassNotFoundException e ) {
return null;
}
if ( clazz == null || ( baseClass != null && !baseClass.isAssignableFrom( clazz ) ) ) {
return null;
}
return (Class<? extends T>)clazz;
}
}
package org.openslx.taskmanager.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class Util
{
private static GsonBuilder gsonBuilder = new GsonBuilder();
/**
* Small helper to create a gson instance that will only handle class members with the
* "@Exposed" annotation. Decided against the default of explicitly excluding fields by
* making them transient, as you might easily forget to exclude an important field, which
* can in turn be a security issue.
*
* @return Gson instance
*/
public static Gson explicitGsonInstance()
{
return gsonBuilder.excludeFieldsWithoutExposeAnnotation().create();
}
}
package org.openslx.taskmanager;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
/**
* Unit test for simple App.
*/
public class AppTest
extends TestCase
{
/**
* Create the test case
*
* @param testName name of the test case
*/
public AppTest( String testName )
{
super( testName );
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( AppTest.class );
}
/**
* Rigourous Test :-)
*/
public void testApp()
{
assertTrue( true );
}
}
pom.xml 0 → 100644
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.openslx.taskmanager</groupId>
<artifactId>taskmanager</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>taskmanager</name>
<url>http://google.de/</url>
<modules>
<module>daemon</module>
<module>api</module>
</modules>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment