Executors Framework

From Unify Community Wiki
(Difference between revisions)
Jump to: navigation, search
(Usage)
m (Code)
 
(41 intermediate revisions by 3 users not shown)
Line 13: Line 13:
  
 
While similar to the AsyncOperation provided by Unity, this concept is primarily inspired by the Java standard library, which features an extensive collection of implementations for this purpose.
 
While similar to the AsyncOperation provided by Unity, this concept is primarily inspired by the Java standard library, which features an extensive collection of implementations for this purpose.
 
== Why? ==
 
The primary advantage over AsyncOperation is that these tasks are executed by an explicitly instantiated executor, which means one can easily change the way in which async tasks are executed. For example, simply exchange the SingleThreadExecutor instantiation for an ImmediateExecutor, and you are no longer using multi-threading at all, but your code remains the same as before.
 
  
 
== Usage ==
 
== Usage ==
  
 
To execute tasks, you need an executor:
 
To execute tasks, you need an executor:
<csharp>
+
<syntaxhighlight lang="csharp">
     IExecutor myExecutor = new ImmediateExecutor();
+
     IExecutor myExecutor = new ImmediateExecutor(); // or new SingleThreadExecutor() for example
</csharp>
+
</syntaxhighlight>
  
 
Then you submit tasks:
 
Then you submit tasks:
<csharp>
+
<syntaxhighlight lang="csharp">
 
     Future<int> myFuture1 = myExecutor.Submit(new MultiplyIntsTask(5, 7));
 
     Future<int> myFuture1 = myExecutor.Submit(new MultiplyIntsTask(5, 7));
 
     Future<int> myFuture2 = myExecutor.Submit(new MultiplyIntsTask(5, 12));
 
     Future<int> myFuture2 = myExecutor.Submit(new MultiplyIntsTask(5, 12));
</csharp>
+
</syntaxhighlight>
  
 
And then you can either poll:
 
And then you can either poll:
<csharp>
+
<syntaxhighlight lang="csharp">
 
     if(myFuture1.IsDone) { int myResult = myFuture1.GetResult(); }
 
     if(myFuture1.IsDone) { int myResult = myFuture1.GetResult(); }
</csharp>
+
</syntaxhighlight>
  
 
or... get the result directly (blocking call):
 
or... get the result directly (blocking call):
<csharp>
+
<syntaxhighlight lang="csharp">
 
     int myResult = myFuture1.GetResult(); // Blocks until result is ready
 
     int myResult = myFuture1.GetResult(); // Blocks until result is ready
</csharp>
+
</syntaxhighlight>
 +
 
 +
Note that any exception cast during the computation task, will be thrown when calling GetResult().
 +
 
 +
Also, the [[#ExecutionManager.cs|ExecutionManager]] helper component can take care of the polling, and do a delegate callback from the Unity thread, which is safe.
  
  
While not a typical usage example, the illustrative MultiplyIntsTask looks like this:
+
The MultiplyIntsTask looks like this:
<csharp>
+
<syntaxhighlight lang="csharp">
 
     class MultiplyIntsTask : ICallable<int> {
 
     class MultiplyIntsTask : ICallable<int> {
 
       int a;
 
       int a;
Line 54: Line 55:
 
       }
 
       }
 
     }
 
     }
</csharp>
+
</syntaxhighlight>
  
 +
This particular task is very fast and really not a good candidate for threaded processing. Consider this an illustrative example only.
  
=== Writing an event listener ===
+
== Why? ==
 +
(Skip this section if you're already convinced! ;))
  
<csharp>    void OnSpeedChanged(float speed)
+
The primary advantage over AsyncOperation and .Net Begin/EndInvoke is that tasks in this framework are executed by an explicitly specified executor, which means one can easily change the manner in which async tasks are executed. For example, simply exchange the SingleThreadExecutor instantiation for an ImmediateExecutor, and you are no longer using multi-threading at all, but your other code remains exactly the same as before.
    {
+
        this.speed = speed;
+
    }</csharp>
+
  
=== Registering an event listener ===
+
If someone implemented a thread-pool executor with multiple threads processing submitted tasks, the interface would remain the same, and you could easily switch from no threading, to dual-threading, to pooled threading, with no modifications to existing code. This is really convenient in some cases where you are not sure of the best way, or you want it to be configurable in runtime.
  
<csharp>    void OnEnable()
+
=== What's wrong with Begin/EndInvoke? ===
    {
+
Like stated above, Begin/EndInvoke does not provide a means for controlling the way tasks are executed. It accesses a global (VM scope even?) thread pool, to which task are submitted. So you can't easily switch between threaded and non-threaded approaches - not even in compile time. Executors Framework lets the user change execution style even in runtime, by just replacing the executor object. This may sound like a small detail, but for me it is important and has been very useful on several occasions.
        Messenger<float>.AddListener("speed changed", OnSpeedChanged);
+
    }</csharp>
+
  
=== Unregistering an event listener ===
+
=== Why ICallable interface, and not simply delegates? ===
 +
This is a good question, and the framework might switch to delegates in the future. However, some concerns surfaced when delegates were tried briefly:
 +
# Anonymous delegates can behave "oddly"[http://lorgonblog.spaces.live.com/Blog/cns!701679AD17B6D310!689.entry] when using outer variables, and invoked later. For example, it's not safe to use an iterator int value in an anonymous delegate object, unless the delegate is invoke immediately before the iterator variable is incremented.
 +
# Sometimes you will want to, later, access and inspect the data passed to the execution task that finished, which is trivial if using classes implementing the ICallable interface, but more complex when using delegate objects.
  
<csharp>    void OnDisable()
+
Until I feel that these issues have been resolved in a satisfying way, the framework will stick to the ICallable interface, which makes it more apparent what is going on with variables and data.
    {
+
        Messenger<float>.RemoveListener("speed changed", OnSpeedChanged);
+
    }</csharp>
+
  
=== Broadcasting an event ===
+
== Code ==
  
<csharp>    if (speed != lastSpeed)
+
There are 10 files:
    {
+
* [[#IExecutor.cs|IExecutor.cs]]
        Messenger<float>.Broadcast("speed changed", speed);
+
* [[#ICallable.cs|ICallable.cs]]
    }</csharp>
+
* [[#Future.cs|Future.cs]]
 +
* [[#ImmediateExecutor.cs|ImmediateExecutor.cs]]
 +
* [[#SingleThreadExecutor.cs|SingleThreadExecutor.cs]]
 +
* [[#WorkItem.cs|WorkItem.cs]]
 +
* [[#ExecutionException.cs|ExecutionException.cs]]
 +
* [[#ExecutorTester.cs|ExecutorTester.cs]] (Unit tests, not required for usage)
 +
* [[#ExecutionManager.cs|ExecutionManager.cs]] (Unity helper component, not required for usage)
 +
* [[#Example.cs|Example.cs]] (Helper component usage example, not required for usage)
  
== Code ==
+
Download all as one zipped unity package [[File:Executors_Framework.zip‎]]
There are three files, ''Callback.cs'', ''Messenger.cs'' and ''MessengerUnitTest.cs''. The last one is not required for usage.
+
==== Callback.cs ====
+
<csharp>// MessengerUnitTest.cs v1.0 by Magnus Wolffelt, magnus.wolffelt@gmail.com
+
//
+
// Delegates used in Messenger.cs.
+
  
public delegate void Callback();
+
=== IExecutor.cs ===
public delegate void Callback<T>(T arg1);
+
<syntaxhighlight lang="csharp">
public delegate void Callback<T, U>(T arg1, U arg2);
+
using System;
public delegate void Callback<T, U, V>(T arg1, U arg2, V arg3);</csharp>
+
  
==== Messenger.cs ====
+
namespace Executors {
<csharp>// Messenger.cs v1.0 by Magnus Wolffelt, magnus.wolffelt@gmail.com
+
//
+
// Inspired by and based on Rod Hyde's Messenger:
+
// http://www.unifycommunity.com/wiki/index.php?title=CSharpMessenger
+
//
+
// This is a C# messenger (notification center). It uses delegates
+
// and generics to provide type-checked messaging between event producers and
+
// event consumers, without the need for producers or consumers to be aware of
+
// each other. The major improvement from Hyde's implementation is that
+
// there is more extensive error detection, preventing silent bugs.
+
//
+
// Usage example:
+
// Messenger<float>.AddListener("myEvent", MyEventHandler);
+
// ...
+
// Messenger<float>.Broadcast("myEvent", 1.0f);
+
  
 +
/// <summary>
 +
/// Common interface for all executors that can execute tasks.
 +
/// Tasks are also known as ICallable objects.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public interface IExecutor {
 +
Future<T> Submit<T>(ICallable<T> callable);
 +
bool IsShutdown();
 +
void Shutdown();
 +
int GetQueueSize();
 +
}
 +
 +
 +
/// <summary>
 +
/// Optional shutdown mode specified when creating certain
 +
/// types of executors. Note that this is not applicable
 +
/// to immediate executors.
 +
/// Default is FinishAll.
 +
/// </summary>
 +
public enum ShutdownMode {
 +
FinishAll,
 +
CancelQueuedTasks
 +
}
 +
}
 +
</syntaxhighlight>
  
 +
=== ICallable.cs ===
 +
<syntaxhighlight lang="csharp">
 
using System;
 
using System;
using System.Collections.Generic;
 
  
public enum MessengerMode {
+
namespace Executors {
DONT_REQUIRE_LISTENER,
+
 
REQUIRE_LISTENER,
+
/// <summary>
 +
/// Callable object that returns type T, and may throw an exception.
 +
/// WARNING: Do not make Unity calls from a potentially threaded work task.
 +
/// Unity is generally not thread-safe.
 +
/// </summary>
 +
/// <typeparam name="T">Type of the computation result object</typeparam>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public interface ICallable<T> {
 +
T Call();
 +
}
 
}
 
}
 +
</syntaxhighlight>
  
 +
=== Future.cs ===
 +
<syntaxhighlight lang="csharp">
 +
using System;
 +
using System.Collections.Generic;
 +
using System.Threading;
  
static internal class MessengerInternal {
+
namespace Executors {
static public Dictionary<string, Delegate> eventTable = new Dictionary<string, Delegate>();
+
static public readonly MessengerMode DEFAULT_MODE = MessengerMode.REQUIRE_LISTENER;
+
  
static public void OnListenerAdding(string eventType, Delegate listenerBeingAdded) {
+
 
if (!eventTable.ContainsKey(eventType)) {
+
public interface IFuture {
eventTable.Add(eventType, null);
+
bool IsDone { get; }
 +
}
 +
 
 +
 
 +
/// <summary>
 +
/// A Future represents the result of a potentially asynchronous computation.
 +
/// Methods/properties are available to check if the operation is done or not.
 +
/// If an execption is thrown during the computation, this exception will be thrown
 +
/// when calling GetResult().
 +
/// </summary>
 +
/// <typeparam name="T">Type of the computation result object</typeparam>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public class Future<T> : IFuture {
 +
 
 +
private T result;
 +
private Exception exception = null;
 +
 
 +
volatile bool isDone = false;
 +
/// <summary>
 +
/// Is the computation done?
 +
/// </summary>
 +
public bool IsDone {
 +
get { return isDone; }
 
}
 
}
  
Delegate d = eventTable[eventType];
+
 
if (d != null && d.GetType() != listenerBeingAdded.GetType()) {
+
internal void SetResult(T result) {
throw new ListenerException(string.Format("Attempting to add listener with inconsistent signature for event type {0}. Current listeners have type {1} and listener being added has type {2}", eventType, d.GetType().Name, listenerBeingAdded.GetType().Name));
+
this.result = result;
 
}
 
}
}
 
  
static public void OnListenerRemoving(string eventType, Delegate listenerBeingRemoved) {
+
internal void SetException(Exception e) {
if (eventTable.ContainsKey(eventType)) {
+
exception = e;
Delegate d = eventTable[eventType];
+
}
  
if (d == null) {
+
internal void SetDone() {
throw new ListenerException(string.Format("Attempting to remove listener with for event type {0} but current listener is null.", eventType));
+
isDone = true;
} else if (d.GetType() != listenerBeingRemoved.GetType()) {
+
}
throw new ListenerException(string.Format("Attempting to remove listener with inconsistent signature for event type {0}. Current listeners have type {1} and listener being removed has type {2}", eventType, d.GetType().Name, listenerBeingRemoved.GetType().Name));
+
 
 +
 
 +
/// <summary>
 +
/// Get the result of the computation.
 +
/// Blocks until the computation is done.
 +
/// </summary>
 +
public T GetResult() {
 +
// Could maybe do this with monitor instead.
 +
while(!IsDone) {
 +
Thread.Sleep(1);
 
}
 
}
} else {
+
 
throw new ListenerException(string.Format("Attempting to remove listener for type {0} but Messenger doesn't know about this event type.", eventType));
+
if(exception != null) {
 +
throw exception;
 +
}
 +
 
 +
return result;
 
}
 
}
 
}
 
}
 +
}
 +
</syntaxhighlight>
  
static public void OnListenerRemoved(string eventType) {
+
=== ImmediateExecutor.cs ===
if (eventTable[eventType] == null) {
+
<syntaxhighlight lang="csharp">
eventTable.Remove(eventType);
+
using System;
 +
 
 +
namespace Executors {
 +
 
 +
/// <summary>
 +
/// Non-threaded immediate executor.
 +
/// Mainly a convenience executor - makes it easy
 +
/// to switch between threaded and non-threaded approaches.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public class ImmediateExecutor : IExecutor {
 +
private bool shutdown = false;
 +
 
 +
#region IExecutor Members
 +
 
 +
public Future<T> Submit<T>(ICallable<T> callable) {
 +
if(shutdown) {
 +
throw new InvalidOperationException("May not submit tasks after shutting down executor.");
 +
}
 +
Future<T> future = new Future<T>();
 +
WorkItem<T> task = new WorkItem<T>(callable, future);
 +
((IWorkItem)task).Execute();
 +
return future;
 
}
 
}
}
 
  
static public void OnBroadcasting(string eventType, MessengerMode mode) {
+
public bool IsShutdown() {
if (mode == MessengerMode.REQUIRE_LISTENER && !eventTable.ContainsKey(eventType)) {
+
return shutdown;
throw new MessengerInternal.BroadcastException(string.Format("Broadcasting message {0} but no listener found.", eventType));
+
 
}
 
}
}
 
  
static public BroadcastException CreateBroadcastSignatureException(string eventType) {
+
public void Shutdown() {
return new BroadcastException(string.Format("Broadcasting message {0} but listeners have a different signature than the broadcaster.", eventType));
+
shutdown = true;
}
+
 
+
public class BroadcastException : Exception {
+
public BroadcastException(string msg)
+
: base(msg) {
+
 
}
 
}
}
 
  
public class ListenerException : Exception {
+
public int GetQueueSize() {
public ListenerException(string msg)
+
return 0;
: base(msg) {
+
 
}
 
}
 +
 +
#endregion
 +
 +
 
}
 
}
 
}
 
}
 +
</syntaxhighlight>
  
 +
=== SingleThreadExecutor.cs ===
 +
<syntaxhighlight lang="csharp">
 +
using System;
 +
using System.Collections.Generic;
 +
using System.Threading;
 +
using UnityEngine;
  
// No parameters
+
namespace Executors {
static public class Messenger {
+
private static Dictionary<string, Delegate> eventTable = MessengerInternal.eventTable;
+
  
static public void AddListener(string eventType, Callback handler) {
+
/// <summary>
MessengerInternal.OnListenerAdding(eventType, handler);
+
/// Single threaded executor. Useful for asynchronous operations
eventTable[eventType] = (Callback)eventTable[eventType] + handler;
+
/// without making the program overly complex.
}
+
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
class SingleThreadExecutor : IExecutor {
 +
private Thread workerThread = null;
 +
private readonly Queue<IWorkItem> taskQueue = new Queue<IWorkItem>();
 +
private readonly object locker = new object();
  
static public void RemoveListener(string eventType, Callback handler) {
+
private ShutdownMode shutdownMode;
MessengerInternal.OnListenerRemoving(eventType, handler);
+
volatile bool shutdown = false;
eventTable[eventType] = (Callback)eventTable[eventType] - handler;
+
volatile bool shutdownCompleted = false;
MessengerInternal.OnListenerRemoved(eventType);
+
}
+
  
static public void Broadcast(string eventType) {
 
Broadcast(eventType, MessengerInternal.DEFAULT_MODE);
 
}
 
  
static public void Broadcast(string eventType, MessengerMode mode) {
+
public SingleThreadExecutor() : this(ShutdownMode.FinishAll) { }
MessengerInternal.OnBroadcasting(eventType, mode);
+
 
Delegate d;
+
public SingleThreadExecutor(ShutdownMode shutdownMode) {
if (eventTable.TryGetValue(eventType, out d)) {
+
this.shutdownMode = shutdownMode;
Callback callback = d as Callback;
+
ThreadStart start = new ThreadStart(RunWorker);
if (callback != null) {
+
workerThread = new Thread(start);
callback();
+
workerThread.Start();
} else {
+
}
throw MessengerInternal.CreateBroadcastSignatureException(eventType);
+
 
 +
 
 +
void RunWorker() {
 +
while(!shutdown) {
 +
lock(locker) {
 +
while(taskQueue.Count == 0 && !shutdown) {
 +
Monitor.Wait(locker);
 +
}
 +
}
 +
 
 +
while(taskQueue.Count > 0) {
 +
bool shouldCancel = (shutdown && shutdownMode.Equals(ShutdownMode.CancelQueuedTasks));
 +
if(shouldCancel) {
 +
break;
 +
}
 +
 
 +
IWorkItem task = null;
 +
lock(locker) {
 +
if(taskQueue.Count > 0) {
 +
task = taskQueue.Dequeue();
 +
}
 +
}
 +
if(task != null) {
 +
task.Execute();
 +
}
 +
}
 
}
 
}
 +
 +
foreach(IWorkItem task in taskQueue) {
 +
task.Cancel("Shutdown");
 +
}
 +
 +
shutdownCompleted = true;
 
}
 
}
}
 
}
 
  
// One parameter
 
static public class Messenger<T> {
 
private static Dictionary<string, Delegate> eventTable = MessengerInternal.eventTable;
 
  
static public void AddListener(string eventType, Callback<T> handler) {
+
#region IExecutor Members
MessengerInternal.OnListenerAdding(eventType, handler);
+
eventTable[eventType] = (Callback<T>)eventTable[eventType] + handler;
+
}
+
  
static public void RemoveListener(string eventType, Callback<T> handler) {
+
public Future<T> Submit<T>(ICallable<T> callable) {
MessengerInternal.OnListenerRemoving(eventType, handler);
+
lock(locker) {
eventTable[eventType] = (Callback<T>)eventTable[eventType] - handler;
+
if(shutdown) {
MessengerInternal.OnListenerRemoved(eventType);
+
throw new InvalidOperationException("May not submit tasks after shutting down executor.");
}
+
}
 +
Future<T> future = new Future<T>();
 +
WorkItem<T> task = new WorkItem<T>(callable, future);
 +
taskQueue.Enqueue(task);
 +
Monitor.Pulse(locker);
 +
return future;
 +
}
 +
}
  
static public void Broadcast(string eventType, T arg1) {
+
public bool IsShutdown() {
Broadcast(eventType, arg1, MessengerInternal.DEFAULT_MODE);
+
return shutdownCompleted;
}
+
}
  
static public void Broadcast(string eventType, T arg1, MessengerMode mode) {
+
public void Shutdown() {
MessengerInternal.OnBroadcasting(eventType, mode);
+
lock(locker) {
Delegate d;
+
shutdown = true;
if (eventTable.TryGetValue(eventType, out d)) {
+
Monitor.Pulse(locker);
Callback<T> callback = d as Callback<T>;
+
if (callback != null) {
+
callback(arg1);
+
} else {
+
throw MessengerInternal.CreateBroadcastSignatureException(eventType);
+
 
}
 
}
 
}
 
}
 +
 +
public int GetQueueSize() {
 +
// FIXME: Find out if lock is really necessary here.
 +
lock(locker) {
 +
return taskQueue.Count;
 +
}
 +
}
 +
 +
#endregion
 
}
 
}
 
}
 
}
 +
</syntaxhighlight>
  
 +
=== WorkItem.cs ===
 +
<syntaxhighlight lang="csharp">
 +
using System;
  
// Two parameters
+
namespace Executors {
static public class Messenger<T, U> {
+
private static Dictionary<string, Delegate> eventTable = MessengerInternal.eventTable;
+
  
static public void AddListener(string eventType, Callback<T, U> handler) {
+
/// <summary>
MessengerInternal.OnListenerAdding(eventType, handler);
+
/// Internal type used by executors to associate Future objects with
eventTable[eventType] = (Callback<T, U>)eventTable[eventType] + handler;
+
/// callables, and to call the callable and set appropriate fields
 +
/// in the Future object.
 +
/// The non-generic interface is needed for the Executor code.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
internal interface IWorkItem {
 +
void Execute();
 +
void Cancel(string reason);
 
}
 
}
  
static public void RemoveListener(string eventType, Callback<T, U> handler) {
+
internal class WorkItem<T> : IWorkItem {
MessengerInternal.OnListenerRemoving(eventType, handler);
+
public readonly ICallable<T> callable;
eventTable[eventType] = (Callback<T, U>)eventTable[eventType] - handler;
+
public readonly Future<T> future;
MessengerInternal.OnListenerRemoved(eventType);
+
}
+
  
static public void Broadcast(string eventType, T arg1, U arg2) {
+
public WorkItem(ICallable<T> callable, Future<T> future) {
Broadcast(eventType, arg1, arg2, MessengerInternal.DEFAULT_MODE);
+
this.callable = callable;
}
+
this.future = future;
 +
}
  
static public void Broadcast(string eventType, T arg1, U arg2, MessengerMode mode) {
+
public void Execute() {
MessengerInternal.OnBroadcasting(eventType, mode);
+
try {
Delegate d;
+
T result = callable.Call();
if (eventTable.TryGetValue(eventType, out d)) {
+
future.SetResult(result);
Callback<T, U> callback = d as Callback<T, U>;
+
} catch(Exception e) {
if (callback != null) {
+
future.SetException(new ExecutionException(e));
callback(arg1, arg2);
+
} finally {
} else {
+
future.SetDone();
throw MessengerInternal.CreateBroadcastSignatureException(eventType);
+
 
}
 
}
 +
}
 +
 +
public void Cancel(string reason) {
 +
if(future.IsDone) {
 +
throw new InvalidOperationException("Can not cancel a future that is done.");
 +
}
 +
future.SetException(new ExecutionException(new Exception("Task was cancelled due to: " + reason)));
 +
future.SetDone();
 
}
 
}
 
}
 
}
 
}
 
}
 +
</syntaxhighlight>
  
 +
=== ExecutionException.cs ===
 +
<syntaxhighlight lang="csharp">
 +
using System;
  
// Three parameters
+
namespace Executors {
static public class Messenger<T, U, V> {
+
private static Dictionary<string, Delegate> eventTable = MessengerInternal.eventTable;
+
  
static public void AddListener(string eventType, Callback<T, U, V> handler) {
+
/// <summary>
MessengerInternal.OnListenerAdding(eventType, handler);
+
/// Wrapper exception type for exceptions thrown during
eventTable[eventType] = (Callback<T, U, V>)eventTable[eventType] + handler;
+
/// execution of an ICallable.
}
+
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public class ExecutionException : Exception {
  
static public void RemoveListener(string eventType, Callback<T, U, V> handler) {
+
public readonly Exception delayedException;
MessengerInternal.OnListenerRemoving(eventType, handler);
+
eventTable[eventType] = (Callback<T, U, V>)eventTable[eventType] - handler;
+
MessengerInternal.OnListenerRemoved(eventType);
+
}
+
  
static public void Broadcast(string eventType, T arg1, U arg2, V arg3) {
+
public ExecutionException(Exception delayedException) {
Broadcast(eventType, arg1, arg2, arg3, MessengerInternal.DEFAULT_MODE);
+
this.delayedException = delayedException;
 +
}
 
}
 
}
 +
}
 +
</syntaxhighlight>
  
static public void Broadcast(string eventType, T arg1, U arg2, V arg3, MessengerMode mode) {
+
=== ExecutorTester.cs ===
MessengerInternal.OnBroadcasting(eventType, mode);
+
<syntaxhighlight lang="csharp">
Delegate d;
+
using System;
if (eventTable.TryGetValue(eventType, out d)) {
+
using System.Collections.Generic;
Callback<T, U, V> callback = d as Callback<T, U, V>;
+
using UnityEngine;
if (callback != null) {
+
using System.Threading;
callback(arg1, arg2, arg3);
+
 
} else {
+
namespace Executors {
throw MessengerInternal.CreateBroadcastSignatureException(eventType);
+
 
 +
/// <summary>
 +
/// Simple class for (basic) unit testing of executors.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
public class ExecutorTester {
 +
 
 +
class MultiplyTask : ICallable<double> {
 +
    int a;
 +
    int b;
 +
    public MultiplyTask(int a, int b) {
 +
        this.a = a;
 +
        this.b = b;
 +
    }
 +
 
 +
    public double Call() {
 +
Thread.Sleep(10);
 +
        return (double)a * b;
 +
    }
 +
}
 +
 
 +
 
 +
class ExceptionThrowingTask : ICallable<int> {
 +
    public int Call() {
 +
        throw new ExecutionException(new Exception("Task thrown exception."));
 +
    }
 +
}
 +
 
 +
bool doLogging;
 +
 
 +
 
 +
public ExecutorTester(bool doLogging)
 +
{
 +
this.doLogging = doLogging;
 +
}
 +
 
 +
private void Log(string msg)
 +
{
 +
if (doLogging)
 +
{
 +
Debug.Log("ExecutorTester: " + msg);
 
}
 
}
 
}
 
}
}
 
}</csharp>
 
  
==== MessengerUnitTest.cs ====
 
<csharp>// MessengerUnitTest.cs v1.0 by Magnus Wolffelt, magnus.wolffelt@gmail.com
 
//
 
// Some functionality testing of the classes in Messenger.cs.
 
// A lot of attention is paid to proper exception throwing from the Messenger.
 
  
using System;
+
public void TestAllExecutors() {
 +
List<IExecutor> toBeTested = new List<IExecutor>();
 +
toBeTested.Add(new ImmediateExecutor());
 +
toBeTested.Add(new SingleThreadExecutor());
  
class MessengerUnitTest {
+
foreach(IExecutor executor in toBeTested) {
 +
DoBasicTest(executor);
 +
}
  
private readonly string eventType1 = "__testEvent1";
+
foreach(IExecutor executor in toBeTested) {
private readonly string eventType2 = "__testEvent2";
+
DoExceptionTest(executor);
 +
}
  
bool wasCalled = false;
+
foreach(IExecutor executor in toBeTested) {
 +
DoShutdownTest(executor);
 +
}
  
public void RunTest() {
+
DoShutdownWithPendingTasksTest(new SingleThreadExecutor());
RunAddTests();
+
DoShutdownWithPendingTasksTest(new SingleThreadExecutor(ShutdownMode.CancelQueuedTasks));
RunBroadcastTests();
+
}
RunRemoveTests();
+
Console.Out.WriteLine("All Messenger tests passed.");
+
}
+
  
 +
  
private void RunAddTests() {
 
Messenger.AddListener(eventType1, TestCallback);
 
  
try {
+
 
// This should fail because we're adding a new event listener for same event type but a different delegate signature
+
void DoBasicTest(IExecutor executor) {
Messenger<float>.AddListener(eventType1, TestCallbackFloat);
+
 
throw new Exception("Unit test failure - expected a ListenerException");
+
List<Future<double>> futures = new List<Future<double>>();
} catch (MessengerInternal.ListenerException e) {
+
List<double> expectedAnswers = new List<double>();
// All good
+
 
 +
for(int i = 1; i < 10; i++) {
 +
futures.Add(executor.Submit(new MultiplyTask(i, i)));
 +
//futures.Add(executor.Submit<double>(delegate () { return i*i; } ));
 +
expectedAnswers.Add(i * i);
 +
}
 +
 
 +
for(int i = 0; i < futures.Count; i++) {
 +
AssertAlmostEqual(futures[i].GetResult(), expectedAnswers[i]);
 +
AssertTrue(futures[i].IsDone);
 +
 
 +
Log("Basic test " + i + " with executor type " + executor.GetType().Name + " passed.");
 +
}
 
}
 
}
  
Messenger<float>.AddListener(eventType2, TestCallbackFloat);
 
}
 
  
  
private void RunBroadcastTests() {
+
void DoExceptionTest(IExecutor executor) {
wasCalled = false;
+
List<Future<int>> futures = new List<Future<int>>();
Messenger.Broadcast(eventType1);
+
if (!wasCalled) { throw new Exception("Unit test failure - event handler appears to have not been called."); }
+
wasCalled = false;
+
Messenger<float>.Broadcast(eventType2, 1.0f);
+
if (!wasCalled) { throw new Exception("Unit test failure - event handler appears to have not been called."); }
+
  
// No listener should exist for this event, but we don't require a listener so it should pass
+
for(int i = 0; i < 10; i++) {
Messenger<float>.Broadcast(eventType2 + "_", 1.0f, MessengerMode.DONT_REQUIRE_LISTENER);
+
futures.Add(executor.Submit(new ExceptionThrowingTask()));
 +
//futures.Add(executor.Submit<int>(
 +
// delegate () { throw new ExecutionException(new Exception("Task thrown exception.")); }));
 +
}
  
try {
+
for(int i = 0; i < futures.Count; i++) {
// Broadcasting for an event there exists listeners for, but using wrong signature
+
try {
Messenger<float>.Broadcast(eventType1, 1.0f, MessengerMode.DONT_REQUIRE_LISTENER);
+
futures[i].GetResult();
throw new Exception("Unit test failure - expected a BroadcastException");
+
// Not good
 +
throw new Exception("Shouldn't be here...");
 +
} catch(ExecutionException) {
 +
// All good
 +
}
 +
 
 +
AssertTrue(futures[i].IsDone);
 +
 
 +
Log("Exception test " + i + " with executor type " + executor.GetType().Name + " passed.");
 +
}
 
}
 
}
catch (MessengerInternal.BroadcastException e) {
+
 
// All good
+
void DoShutdownTest(IExecutor executor) {
 +
executor.Shutdown();
 +
 
 +
for(int i = 0; i < 20; i++) {
 +
if(executor.IsShutdown()) {
 +
Log("Shutdown test with executor type " + executor.GetType().Name + " passed.");
 +
return;
 +
} else {
 +
Thread.Sleep(100);
 +
}
 +
}
 +
throw new Exception("Executor " + executor.GetType().Name + " failed to shutdown in a timely manner.");
 
}
 
}
  
try {
+
 
// Same thing, but now we (implicitly) require at least one listener
+
void DoShutdownWithPendingTasksTest(IExecutor executor) {
Messenger<float>.Broadcast(eventType2 + "_", 1.0f);
+
 
throw new Exception("Unit test failure - expected a BroadcastException");
+
List<Future<double>> futures = new List<Future<double>>();
} catch (MessengerInternal.BroadcastException e) {
+
for(int i = 0; i < 20; i++) {
// All good
+
futures.Add(executor.Submit(new MultiplyTask(i, i)));
 +
}
 +
 
 +
Thread.Sleep(100);
 +
DoShutdownTest(executor);
 +
int queueSize = executor.GetQueueSize();
 +
Log("Items in queue after shutdown: " + queueSize);
 +
int successCount = 0;
 +
int cancelledCount = 0;
 +
for(int i = 0; i < futures.Count; i++) {
 +
try {
 +
if(!futures[i].IsDone) {
 +
throw new Exception("All queued tasks should have been set to done during shutdown.");
 +
}
 +
double result = futures[i].GetResult();
 +
successCount++;
 +
} catch(ExecutionException) {
 +
cancelledCount++;
 +
}
 +
}
 +
AssertEqual(queueSize, cancelledCount);
 +
Log("Shutdown with pending tasks: " + successCount + " completed, " + cancelledCount + " cancelled.");
 
}
 
}
  
try {
+
 
// Wrong generic type for this broadcast, and we implicitly require a listener
+
void AssertTrue(bool condition) {
Messenger<double>.Broadcast(eventType2, 1.0);
+
if(!condition) {
throw new Exception("Unit test failure - expected a BroadcastException");
+
throw new Exception("Condition not true.");
} catch (MessengerInternal.BroadcastException e) {
+
}
// All good
+
}
 +
 
 +
void AssertEqual(int i1, int i2) {
 +
if(i1 != i2) {
 +
throw new Exception("Numbers are not equal: " + i1 + " , " + i2);
 +
}
 +
}
 +
 
 +
void AssertAlmostEqual(double d1, double d2) {
 +
if(System.Math.Abs(d1 - d2) > 0.0000001f) {
 +
throw new Exception("Numbers are not equal: " + d1 + " , " + d2);
 +
}
 
}
 
}
  
 
}
 
}
 +
}
  
  
private void RunRemoveTests() {
 
  
try {
+
</syntaxhighlight>
// Removal with wrong signature should fail
+
 
Messenger<float>.RemoveListener(eventType1, TestCallbackFloat);
+
=== ExecutionManager.cs ===
throw new Exception("Unit test failure - expected a ListenerException");
+
<syntaxhighlight lang="csharp">
 +
using System;
 +
using System.Collections.Generic;
 +
using UnityEngine;
 +
using Executors;
 +
using System.Threading;
 +
 
 +
 
 +
/// <summary>
 +
/// Unity helper component for convenient usage of the Executors Framework.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
[AddComponentMenu("Executors Framework/Execution Manager")]
 +
public class ExecutionManager : MonoBehaviour {
 +
public delegate void TaskFinishedHandler<T>(ICallable<T> finishedTask, Future<T> finishedFuture);
 +
 
 +
private interface IManagedTask {
 +
void CallCallback();
 +
bool IsDone { get; }
 +
}
 +
 
 +
private class ManagedTask<T> : IManagedTask {
 +
ICallable<T> callable;
 +
Future<T> future;
 +
TaskFinishedHandler<T> finishedHandler;
 +
 
 +
public ManagedTask(ICallable<T> callable, Future<T> future, TaskFinishedHandler<T> finishedHandler) {
 +
this.callable = callable;
 +
this.future = future;
 +
this.finishedHandler = finishedHandler;
 
}
 
}
catch (MessengerInternal.ListenerException e) {
+
 
// All good
+
public bool IsDone { get { return future.IsDone; } }
 +
 
 +
public void CallCallback() {
 +
finishedHandler(callable, future);
 
}
 
}
 +
}
  
Messenger.RemoveListener(eventType1, TestCallback);
+
 +
/// <summary>
 +
/// The number of worker threads to use for execution.
 +
/// Can currently be 0 (immediate) or 1 (single worker thread).
 +
/// </summary>
 +
public int threadCount = 0;
  
try {
+
/// <summary>
// Repeated removal should fail
+
/// The number of queued tasks, at which the execution manager
Messenger.RemoveListener(eventType1, TestCallback);
+
/// will log warning messages.
throw new Exception("Unit test failure - expected a ListenerException");
+
/// </summary>
 +
public int taskCountWarningThreshold = 100;
 +
private IExecutor executor;
 +
 
 +
private List<IManagedTask> managedTasks = new List<IManagedTask>();
 +
 
 +
 
 +
void Awake() {
 +
 
 +
new ExecutorTester(true).TestAllExecutors();
 +
 
 +
if(threadCount == 0) {
 +
executor = new ImmediateExecutor();
 +
} else if(threadCount == 1) {
 +
executor = new SingleThreadExecutor();
 +
} else {
 +
throw new NotImplementedException("Currently only 0-1 thread executors are supported.");
 
}
 
}
catch (MessengerInternal.ListenerException e) {
+
}
// All good
+
 
 +
 
 +
void FixedUpdate() {
 +
 
 +
foreach(IManagedTask managedTask in managedTasks) {
 +
if(managedTask.IsDone) {
 +
managedTask.CallCallback();
 +
}
 
}
 
}
  
+
managedTasks.RemoveAll(delegate(IManagedTask managedTask) { return managedTask.IsDone; });
 +
}
  
Messenger<float>.RemoveListener(eventType2, TestCallbackFloat);
 
  
try {
+
void OnApplicationQuit() {
// Repeated removal should fail
+
executor.Shutdown();
Messenger<float>.RemoveListener(eventType2, TestCallbackFloat);
+
while(!executor.IsShutdown()) {
throw new Exception("Unit test failure - expected a ListenerException");
+
Thread.Sleep(10);
 
}
 
}
catch (MessengerInternal.ListenerException e) {
+
}
// All good
+
 
 +
 
 +
/// <summary>
 +
/// Submits a task for execution, and calls provided delegate
 +
/// when the task has been completed.
 +
/// </summary>
 +
/// <typeparam name="T">Type of task computation result</typeparam>
 +
/// <param name="task">Task to execute</param>
 +
/// <param name="finishedHandler">Handler to call when task has been completed.
 +
/// Can be null.</param>
 +
public void SubmitAndManage<T>(ICallable<T> task, TaskFinishedHandler<T> finishedHandler) {
 +
if(managedTasks.Count >= taskCountWarningThreshold) {
 +
Debug.LogWarning("Execution Manager on " + gameObject.name + " currently has " + managedTasks.Count + " work tasks in queue.");
 
}
 
}
 +
 +
Future<T> future = executor.Submit<T>(task);
 +
managedTasks.Add(new ManagedTask<T>(task, future, finishedHandler));
 
}
 
}
  
 +
 +
/// <summary>
 +
/// Submits a task for execution, but does not store the future object.
 +
/// Will not call back in any way when the task is done - user is expected
 +
/// to poll the Future object.
 +
/// </summary>
 +
/// <typeparam name="T">Type of task computation result</typeparam>
 +
/// <param name="task">Task to execute</param>
 +
/// <returns>A future object that can be polled for completion</returns>
 +
public Future<T> SubmitAndForget<T>(ICallable<T> task) {
 +
return executor.Submit<T>(task);
 +
}
  
void TestCallback() {
+
 
wasCalled = true;
+
/// <summary>
Console.Out.WriteLine("TestCallback() was called.");
+
/// Gets the number of items queued up for execution.
 +
/// </summary>
 +
public int GetQueueSize() {
 +
return executor.GetQueueSize();
 
}
 
}
 +
}
  
void TestCallbackFloat(float f) {
 
wasCalled = true;
 
Console.Out.WriteLine("TestCallbackFloat(float) was called.");
 
  
if (f != 1.0f) {
+
</syntaxhighlight>
throw new Exception("Unit test failure - wrong value on float argument");
+
 
 +
=== Example.cs ===
 +
<syntaxhighlight lang="csharp">
 +
using System;
 +
using UnityEngine;
 +
using Executors;
 +
using System.Threading;
 +
 
 +
/// <summary>
 +
/// Simple demonstration of how to use an ExecutionManager.
 +
/// </summary>
 +
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
 +
[RequireComponent(typeof(ExecutionManager))]
 +
[AddComponentMenu("Executors Framework/Example")]
 +
public class Example : MonoBehaviour {
 +
 
 +
private class LenghtyTask : ICallable<int> {
 +
public int a;
 +
public int creationFrameNumber;
 +
 
 +
public LenghtyTask(int a, int creationFrameNumber) {
 +
this.a = a;
 +
this.creationFrameNumber = creationFrameNumber;
 +
}
 +
 
 +
public int Call() {
 +
Thread.Sleep(20);
 +
return a * a;
 
}
 
}
 
}
 
}
  
  
+
 
}</csharp>
+
void LenghtyTaskFinishedHandler(ICallable<int> task, Future<int> result) {
 +
LenghtyTask ourTask = task as LenghtyTask;
 +
Debug.Log("Frame #"+ Time.frameCount + ": Task with a=" + ourTask.a +
 +
" created on frame #" + ourTask.creationFrameNumber +
 +
" resulted in: " + result.GetResult());
 +
}
 +
 
 +
 
 +
void FixedUpdate() {
 +
ExecutionManager manager = GetComponent<ExecutionManager>();
 +
if(manager.GetQueueSize() < 5) {
 +
ICallable<int> task = new LenghtyTask(UnityEngine.Random.Range(5, 20), Time.frameCount);
 +
manager.SubmitAndManage(task, LenghtyTaskFinishedHandler);
 +
}
 +
 +
}
 +
}
 +
 
 +
</syntaxhighlight>

Latest revision as of 16:08, 26 March 2012

Author: Magnus Wolffelt

Contents

[edit] Description

This is a small framework designed to assist in the usage of multiple threads in a C#/.Net program. Multi-threading is a very complex subject, and it's really hard to write bug-free multi-threaded code - hence the need for frameworks that make it a little easier.

The way this works is with the concepts of "Executors", "Callables" and "Futures". An executor is an object that consumes Callable objects (work tasks), and returns Future objects. These Future objects have a generic parameter which is the result type of the computation. The Future objects can also be polled for completion, or they can be requsted to return the result - which blocks the calling thread until the result has been computed.

While similar to the AsyncOperation provided by Unity, this concept is primarily inspired by the Java standard library, which features an extensive collection of implementations for this purpose.

[edit] Usage

To execute tasks, you need an executor:

    IExecutor myExecutor = new ImmediateExecutor(); // or new SingleThreadExecutor() for example

Then you submit tasks:

    Future<int> myFuture1 = myExecutor.Submit(new MultiplyIntsTask(5, 7));
    Future<int> myFuture2 = myExecutor.Submit(new MultiplyIntsTask(5, 12));

And then you can either poll:

    if(myFuture1.IsDone) { int myResult = myFuture1.GetResult(); }

or... get the result directly (blocking call):

    int myResult = myFuture1.GetResult(); // Blocks until result is ready

Note that any exception cast during the computation task, will be thrown when calling GetResult().

Also, the ExecutionManager helper component can take care of the polling, and do a delegate callback from the Unity thread, which is safe.


The MultiplyIntsTask looks like this:

    class MultiplyIntsTask : ICallable<int> {
      int a;
      int b;
      public MultiplyIntsTask(int a, int b) {
        this.a = a;
        this.b = b;
      }
      public int Call() {
        return a * b;
      }
    }

This particular task is very fast and really not a good candidate for threaded processing. Consider this an illustrative example only.

[edit] Why?

(Skip this section if you're already convinced! ;))

The primary advantage over AsyncOperation and .Net Begin/EndInvoke is that tasks in this framework are executed by an explicitly specified executor, which means one can easily change the manner in which async tasks are executed. For example, simply exchange the SingleThreadExecutor instantiation for an ImmediateExecutor, and you are no longer using multi-threading at all, but your other code remains exactly the same as before.

If someone implemented a thread-pool executor with multiple threads processing submitted tasks, the interface would remain the same, and you could easily switch from no threading, to dual-threading, to pooled threading, with no modifications to existing code. This is really convenient in some cases where you are not sure of the best way, or you want it to be configurable in runtime.

[edit] What's wrong with Begin/EndInvoke?

Like stated above, Begin/EndInvoke does not provide a means for controlling the way tasks are executed. It accesses a global (VM scope even?) thread pool, to which task are submitted. So you can't easily switch between threaded and non-threaded approaches - not even in compile time. Executors Framework lets the user change execution style even in runtime, by just replacing the executor object. This may sound like a small detail, but for me it is important and has been very useful on several occasions.

[edit] Why ICallable interface, and not simply delegates?

This is a good question, and the framework might switch to delegates in the future. However, some concerns surfaced when delegates were tried briefly:

  1. Anonymous delegates can behave "oddly"[1] when using outer variables, and invoked later. For example, it's not safe to use an iterator int value in an anonymous delegate object, unless the delegate is invoke immediately before the iterator variable is incremented.
  2. Sometimes you will want to, later, access and inspect the data passed to the execution task that finished, which is trivial if using classes implementing the ICallable interface, but more complex when using delegate objects.

Until I feel that these issues have been resolved in a satisfying way, the framework will stick to the ICallable interface, which makes it more apparent what is going on with variables and data.

[edit] Code

There are 10 files:

Download all as one zipped unity package File:Executors Framework.zip

[edit] IExecutor.cs

using System;
 
namespace Executors {
 
	/// <summary>
	/// Common interface for all executors that can execute tasks.
	/// Tasks are also known as ICallable objects.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public interface IExecutor {
		Future<T> Submit<T>(ICallable<T> callable);
		bool IsShutdown();
		void Shutdown();
		int GetQueueSize();
	}
 
 
	/// <summary>
	/// Optional shutdown mode specified when creating certain
	/// types of executors. Note that this is not applicable
	/// to immediate executors.
	/// Default is FinishAll.
	/// </summary>
	public enum ShutdownMode {
		FinishAll,
		CancelQueuedTasks
	}
}

[edit] ICallable.cs

using System;
 
namespace Executors {
 
	/// <summary>
	/// Callable object that returns type T, and may throw an exception.
	/// WARNING: Do not make Unity calls from a potentially threaded work task.
	/// Unity is generally not thread-safe.
	/// </summary>
	/// <typeparam name="T">Type of the computation result object</typeparam>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public interface ICallable<T> {
		T Call();
	}
}

[edit] Future.cs

using System;
using System.Collections.Generic;
using System.Threading;
 
namespace Executors {
 
 
	public interface IFuture {
		bool IsDone { get; }
	}
 
 
	/// <summary>
	/// A Future represents the result of a potentially asynchronous computation.
	/// Methods/properties are available to check if the operation is done or not.
	/// If an execption is thrown during the computation, this exception will be thrown
	/// when calling GetResult().
	/// </summary>
	/// <typeparam name="T">Type of the computation result object</typeparam>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public class Future<T> : IFuture {
 
		private T result;
		private Exception exception = null;
 
		volatile bool isDone = false;
		/// <summary>
		/// Is the computation done?
		/// </summary>
		public bool IsDone {
			get { return isDone; }
		}
 
 
		internal void SetResult(T result) {
			this.result = result;
		}
 
		internal void SetException(Exception e) {
			exception = e;
		}
 
		internal void SetDone() {
			isDone = true;
		}
 
 
		/// <summary>
		/// Get the result of the computation.
		/// Blocks until the computation is done.
		/// </summary>
		public T GetResult() {
			// Could maybe do this with monitor instead.
			while(!IsDone) {
				Thread.Sleep(1);
			}
 
			if(exception != null) {
				throw exception;
			}
 
			return result;
		}
	}
}

[edit] ImmediateExecutor.cs

using System;
 
namespace Executors {
 
	/// <summary>
	/// Non-threaded immediate executor.
	/// Mainly a convenience executor - makes it easy
	/// to switch between threaded and non-threaded approaches.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public class ImmediateExecutor : IExecutor {
		private bool shutdown = false;
 
		#region IExecutor Members
 
		public Future<T> Submit<T>(ICallable<T> callable) {
			if(shutdown) {
				throw new InvalidOperationException("May not submit tasks after shutting down executor.");
			}
			Future<T> future = new Future<T>();
			WorkItem<T> task = new WorkItem<T>(callable, future);
			((IWorkItem)task).Execute();
			return future;
		}
 
		public bool IsShutdown() {
			return shutdown;
		}
 
		public void Shutdown() {
			shutdown = true;
		}
 
		public int GetQueueSize() {
			return 0;
		}
 
		#endregion
 
 
	}
}

[edit] SingleThreadExecutor.cs

using System;
using System.Collections.Generic;
using System.Threading;
using UnityEngine;
 
namespace Executors {
 
	/// <summary>
	/// Single threaded executor. Useful for asynchronous operations
	/// without making the program overly complex.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	class SingleThreadExecutor : IExecutor {
		private Thread workerThread = null;
		private readonly Queue<IWorkItem> taskQueue = new Queue<IWorkItem>();
		private readonly object locker = new object();
 
		private ShutdownMode shutdownMode;
		volatile bool shutdown = false;
		volatile bool shutdownCompleted = false;
 
 
		public SingleThreadExecutor() : this(ShutdownMode.FinishAll) { }
 
		public SingleThreadExecutor(ShutdownMode shutdownMode) {
			this.shutdownMode = shutdownMode;
			ThreadStart start = new ThreadStart(RunWorker);
			workerThread = new Thread(start);
			workerThread.Start();
		}
 
 
		void RunWorker() {
			while(!shutdown) {
				lock(locker) {
					while(taskQueue.Count == 0 && !shutdown) {
						Monitor.Wait(locker);
					}
				}
 
				while(taskQueue.Count > 0) {
					bool shouldCancel = (shutdown && shutdownMode.Equals(ShutdownMode.CancelQueuedTasks));
					if(shouldCancel) {
						break;
					}
 
					IWorkItem task = null;
					lock(locker) {
						if(taskQueue.Count > 0) {
							task = taskQueue.Dequeue();
						}
					}
					if(task != null) {
						task.Execute();
					}
				}
			}
 
			foreach(IWorkItem task in taskQueue) {
				task.Cancel("Shutdown");
			}
 
			shutdownCompleted = true;
		}
 
 
		#region IExecutor Members
 
		public Future<T> Submit<T>(ICallable<T> callable) {
			lock(locker) {
				if(shutdown) {
					throw new InvalidOperationException("May not submit tasks after shutting down executor.");
				}
				Future<T> future = new Future<T>();
				WorkItem<T> task = new WorkItem<T>(callable, future);
				taskQueue.Enqueue(task);
				Monitor.Pulse(locker);
				return future;
			}
		}
 
		public bool IsShutdown() {
			return shutdownCompleted;
		}
 
		public void Shutdown() {
			lock(locker) {
				shutdown = true;
				Monitor.Pulse(locker);
			}
		}
 
		public int GetQueueSize() {
			// FIXME: Find out if lock is really necessary here.
			lock(locker) {
				return taskQueue.Count;
			}
		}
 
		#endregion
	}
}

[edit] WorkItem.cs

using System;
 
namespace Executors {
 
	/// <summary>
	/// Internal type used by executors to associate Future objects with
	/// callables, and to call the callable and set appropriate fields
	/// in the Future object.
	/// The non-generic interface is needed for the Executor code.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	internal interface IWorkItem {
		void Execute();
		void Cancel(string reason);
	}
 
	internal class WorkItem<T> : IWorkItem {
		public readonly ICallable<T> callable;
		public readonly Future<T> future;
 
		public WorkItem(ICallable<T> callable, Future<T> future) {
			this.callable = callable;
			this.future = future;
		}
 
		public void Execute() {
			try {
				T result = callable.Call();
				future.SetResult(result);
			} catch(Exception e) {
				future.SetException(new ExecutionException(e));
			} finally {
				future.SetDone();
			}
		}
 
		public void Cancel(string reason) {
			if(future.IsDone) {
				throw new InvalidOperationException("Can not cancel a future that is done.");
			}
			future.SetException(new ExecutionException(new Exception("Task was cancelled due to: " + reason)));
			future.SetDone();
		}
	}
}

[edit] ExecutionException.cs

using System;
 
namespace Executors {
 
	/// <summary>
	/// Wrapper exception type for exceptions thrown during
	/// execution of an ICallable.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public class ExecutionException : Exception {
 
		public readonly Exception delayedException;
 
		public ExecutionException(Exception delayedException) {
			this.delayedException = delayedException;
		}
	}
}

[edit] ExecutorTester.cs

using System;
using System.Collections.Generic;
using UnityEngine;
using System.Threading;
 
namespace Executors {
 
	/// <summary>
	/// Simple class for (basic) unit testing of executors.
	/// </summary>
	/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
	public class ExecutorTester {
 
		class MultiplyTask : ICallable<double> {
		    int a;
		    int b;
		    public MultiplyTask(int a, int b) {
		        this.a = a;
		        this.b = b;
		    }
 
		    public double Call() {
				Thread.Sleep(10);
		        return (double)a * b;
		    }
		}
 
 
		class ExceptionThrowingTask : ICallable<int> {
		    public int Call() {
		        throw new ExecutionException(new Exception("Task thrown exception."));
		    }
		}
 
		bool doLogging;
 
 
		public ExecutorTester(bool doLogging)
		{
			this.doLogging = doLogging;
		}
 
		private void Log(string msg)
		{
			if (doLogging)
			{
				Debug.Log("ExecutorTester: " + msg);
			}
		}
 
 
		public void TestAllExecutors() {
			List<IExecutor> toBeTested = new List<IExecutor>();
			toBeTested.Add(new ImmediateExecutor());
			toBeTested.Add(new SingleThreadExecutor());
 
			foreach(IExecutor executor in toBeTested) {
				DoBasicTest(executor);
			}
 
			foreach(IExecutor executor in toBeTested) {
				DoExceptionTest(executor);
			}
 
			foreach(IExecutor executor in toBeTested) {
				DoShutdownTest(executor);
			}
 
			DoShutdownWithPendingTasksTest(new SingleThreadExecutor());
			DoShutdownWithPendingTasksTest(new SingleThreadExecutor(ShutdownMode.CancelQueuedTasks));
		}
 
 
 
 
 
		void DoBasicTest(IExecutor executor) {
 
			List<Future<double>> futures = new List<Future<double>>();
			List<double> expectedAnswers = new List<double>();
 
			for(int i = 1; i < 10; i++) {
				futures.Add(executor.Submit(new MultiplyTask(i, i)));
				//futures.Add(executor.Submit<double>(delegate () { return i*i; } ));
				expectedAnswers.Add(i * i);
			}
 
			for(int i = 0; i < futures.Count; i++) {
				AssertAlmostEqual(futures[i].GetResult(), expectedAnswers[i]);
				AssertTrue(futures[i].IsDone);
 
				Log("Basic test " + i + " with executor type " + executor.GetType().Name + " passed.");
			}
		}
 
 
 
		void DoExceptionTest(IExecutor executor) {
			List<Future<int>> futures = new List<Future<int>>();
 
			for(int i = 0; i < 10; i++) {
				futures.Add(executor.Submit(new ExceptionThrowingTask()));
				//futures.Add(executor.Submit<int>(
				//	delegate () { throw new ExecutionException(new Exception("Task thrown exception.")); }));
			}
 
			for(int i = 0; i < futures.Count; i++) {
				try {
					futures[i].GetResult();
					// Not good
					throw new Exception("Shouldn't be here...");
				} catch(ExecutionException) {
					// All good
				}
 
				AssertTrue(futures[i].IsDone);
 
				Log("Exception test " + i + " with executor type " + executor.GetType().Name + " passed.");
			}
		}
 
		void DoShutdownTest(IExecutor executor) {
			executor.Shutdown();
 
			for(int i = 0; i < 20; i++) {
				if(executor.IsShutdown()) {
					Log("Shutdown test with executor type " + executor.GetType().Name + " passed.");
					return;
				} else {
					Thread.Sleep(100);
				}
			}
			throw new Exception("Executor " + executor.GetType().Name + " failed to shutdown in a timely manner.");
		}
 
 
		void DoShutdownWithPendingTasksTest(IExecutor executor) {
 
			List<Future<double>> futures = new List<Future<double>>();
			for(int i = 0; i < 20; i++) {
				futures.Add(executor.Submit(new MultiplyTask(i, i)));
			}
 
			Thread.Sleep(100);
			DoShutdownTest(executor);
			int queueSize = executor.GetQueueSize();
			Log("Items in queue after shutdown: " + queueSize);
			int successCount = 0;
			int cancelledCount = 0;
			for(int i = 0; i < futures.Count; i++) {
				try {
					if(!futures[i].IsDone) {
						throw new Exception("All queued tasks should have been set to done during shutdown.");
					}
					double result = futures[i].GetResult();
					successCount++;
				} catch(ExecutionException) {
					cancelledCount++;
				}
			}
			AssertEqual(queueSize, cancelledCount);
			Log("Shutdown with pending tasks: " + successCount + " completed, " + cancelledCount + " cancelled.");
		}
 
 
		void AssertTrue(bool condition) {
			if(!condition) {
				throw new Exception("Condition not true.");
			}
		}
 
		void AssertEqual(int i1, int i2) {
			if(i1 != i2) {
				throw new Exception("Numbers are not equal: " + i1 + " , " + i2);
			}
		}
 
		void AssertAlmostEqual(double d1, double d2) {
			if(System.Math.Abs(d1 - d2) > 0.0000001f) {
				throw new Exception("Numbers are not equal: " + d1 + " , " + d2);
			}
		}
 
	}
}

[edit] ExecutionManager.cs

using System;
using System.Collections.Generic;
using UnityEngine;
using Executors;
using System.Threading;
 
 
/// <summary>
/// Unity helper component for convenient usage of the Executors Framework.
/// </summary>
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
[AddComponentMenu("Executors Framework/Execution Manager")]
public class ExecutionManager : MonoBehaviour {
	public delegate void TaskFinishedHandler<T>(ICallable<T> finishedTask, Future<T> finishedFuture);
 
	private interface IManagedTask {
		void CallCallback();
		bool IsDone { get; }
	}
 
	private class ManagedTask<T> : IManagedTask {
		ICallable<T> callable;
		Future<T> future;
		TaskFinishedHandler<T> finishedHandler;
 
		public ManagedTask(ICallable<T> callable, Future<T> future, TaskFinishedHandler<T> finishedHandler) {
			this.callable = callable;
			this.future = future;
			this.finishedHandler = finishedHandler;
		}
 
		public bool IsDone { get { return future.IsDone; } }
 
		public void CallCallback() {
			finishedHandler(callable, future);
		}
	}
 
 
	/// <summary>
	/// The number of worker threads to use for execution.
	/// Can currently be 0 (immediate) or 1 (single worker thread).
	/// </summary>
	public int threadCount = 0;
 
	/// <summary>
	/// The number of queued tasks, at which the execution manager
	/// will log warning messages.
	/// </summary>
	public int taskCountWarningThreshold = 100;
	private IExecutor executor;
 
	private List<IManagedTask> managedTasks = new List<IManagedTask>();
 
 
	void Awake() {
 
		new ExecutorTester(true).TestAllExecutors();
 
		if(threadCount == 0) {
			executor = new ImmediateExecutor();
		} else if(threadCount == 1) {
			executor = new SingleThreadExecutor();
		} else {
			throw new NotImplementedException("Currently only 0-1 thread executors are supported.");
		}
	}
 
 
	void FixedUpdate() {
 
		foreach(IManagedTask managedTask in managedTasks) {
			if(managedTask.IsDone) {
				managedTask.CallCallback();
			}
		}
 
		managedTasks.RemoveAll(delegate(IManagedTask managedTask) { return managedTask.IsDone; });
	}
 
 
	void OnApplicationQuit() {
		executor.Shutdown();
		while(!executor.IsShutdown()) {
			Thread.Sleep(10);
		}
	}
 
 
	/// <summary>
	/// Submits a task for execution, and calls provided delegate
	/// when the task has been completed.
	/// </summary>
	/// <typeparam name="T">Type of task computation result</typeparam>
	/// <param name="task">Task to execute</param>
	/// <param name="finishedHandler">Handler to call when task has been completed.
	/// Can be null.</param>
	public void SubmitAndManage<T>(ICallable<T> task, TaskFinishedHandler<T> finishedHandler) {
		if(managedTasks.Count >= taskCountWarningThreshold) {
			Debug.LogWarning("Execution Manager on " + gameObject.name + " currently has " + managedTasks.Count + " work tasks in queue.");
		}
 
		Future<T> future = executor.Submit<T>(task);
		managedTasks.Add(new ManagedTask<T>(task, future, finishedHandler));
	}
 
 
	/// <summary>
	/// Submits a task for execution, but does not store the future object.
	/// Will not call back in any way when the task is done - user is expected
	/// to poll the Future object.
	/// </summary>
	/// <typeparam name="T">Type of task computation result</typeparam>
	/// <param name="task">Task to execute</param>
	/// <returns>A future object that can be polled for completion</returns>
	public Future<T> SubmitAndForget<T>(ICallable<T> task) {
		return executor.Submit<T>(task);
	}
 
 
	/// <summary>
	/// Gets the number of items queued up for execution.
	/// </summary>
	public int GetQueueSize() {
		return executor.GetQueueSize();
	}
}

[edit] Example.cs

using System;
using UnityEngine;
using Executors;
using System.Threading;
 
/// <summary>
/// Simple demonstration of how to use an ExecutionManager.
/// </summary>
/// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
[RequireComponent(typeof(ExecutionManager))]
[AddComponentMenu("Executors Framework/Example")]
public class Example : MonoBehaviour {
 
	private class LenghtyTask : ICallable<int> {
		public int a;
		public int creationFrameNumber;
 
		public LenghtyTask(int a, int creationFrameNumber) {
			this.a = a;
			this.creationFrameNumber = creationFrameNumber;
		}
 
		public int Call() {
			Thread.Sleep(20);
			return a * a;
		}
	}
 
 
 
	void LenghtyTaskFinishedHandler(ICallable<int> task, Future<int> result) {
		LenghtyTask ourTask = task as LenghtyTask;
		Debug.Log("Frame #"+ Time.frameCount + ": Task with a=" + ourTask.a + 
			" created on frame #" + ourTask.creationFrameNumber + 
			" resulted in: " + result.GetResult());
	}
 
 
	void FixedUpdate() {
		ExecutionManager manager = GetComponent<ExecutionManager>();
		if(manager.GetQueueSize() < 5) {
			ICallable<int> task = new LenghtyTask(UnityEngine.Random.Range(5, 20), Time.frameCount);
			manager.SubmitAndManage(task, LenghtyTaskFinishedHandler);
		}
 
	}
}
Personal tools
Namespaces

Variants
Actions
Navigation
Extras
Toolbox