Executors Framework

From Unify Community Wiki
Revision as of 00:30, 16 August 2010 by Magwo (Talk | contribs)


Author: Magnus Wolffelt



This is a small framework designed to make it easier to use multiple threads in a C#/.NET program. Multi-threading is a complex subject, and it is hard to write bug-free multi-threaded code, hence the need for frameworks that make it a little easier.

The framework is built around three concepts: "Executors", "Callables" and "Futures". An executor is an object that consumes Callable objects (work tasks) and returns Future objects. A Future has a generic parameter, which is the result type of the computation. A Future can be polled for completion, or it can be requested to return the result, which blocks the calling thread until the result has been computed.

While similar to the AsyncOperation provided by Unity, this concept is primarily inspired by the Java standard library, which features an extensive collection of implementations for this purpose.


(Skip this section if you're already convinced! ;)) The primary advantage over AsyncOperation and .NET Begin/EndInvoke is that tasks in this framework are executed by an explicitly specified executor, so you can easily change how async tasks are executed. For example, replace the SingleThreadExecutor instantiation with an ImmediateExecutor, and you are no longer using multi-threading at all, while the rest of your code remains exactly the same.

If someone implemented a thread-pool executor with multiple threads processing submitted tasks, the interface would remain the same, and you could easily switch from no threading, to dual-threading, to pooled threading, with no modifications to existing code. This is convenient when you are not sure which approach is best, or when you want the choice to be configurable at runtime.
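As a rough illustration of that idea, here is a minimal sketch of what a pooled executor might look like. It is not part of the framework: `ThreadPoolExecutor` and `SquareTask` are hypothetical names, and the small `ICallable`, `Future` and `IExecutor` types are simplified stand-ins repeated only so the sketch compiles on its own. It assumes the shared .NET `ThreadPool` is an acceptable backend.

```csharp
using System;
using System.Threading;

namespace PooledExecutorSketch
{
    // Simplified stand-ins for the framework types, so the sketch is self-contained.
    public interface ICallable<T> { T Call(); }

    public interface IExecutor
    {
        Future<T> Submit<T>(ICallable<T> callable);
        bool IsShutdown();
        void Shutdown();
    }

    public class Future<T>
    {
        private T result;
        private Exception exception;
        private volatile bool isDone;

        public bool IsDone { get { return isDone; } }

        // Stores the outcome first, then publishes it through the volatile flag.
        internal void Complete(T value, Exception error)
        {
            result = value;
            exception = error;
            isDone = true;
        }

        public T GetResult()
        {
            while (!isDone) { Thread.Sleep(0); }
            if (exception != null) { throw exception; }
            return result;
        }
    }

    // Hypothetical executor: every submitted task is queued on the shared .NET ThreadPool.
    public class ThreadPoolExecutor : IExecutor
    {
        private volatile bool shutdown;

        public Future<T> Submit<T>(ICallable<T> callable)
        {
            if (shutdown)
            {
                throw new InvalidOperationException("May not submit tasks after shutting down executor.");
            }
            Future<T> future = new Future<T>();
            ThreadPool.QueueUserWorkItem(delegate(object state)
            {
                // The stand-in stores the raw exception; the real Task<T> wraps it
                // in an ExecutionException.
                try { future.Complete(callable.Call(), null); }
                catch (Exception e) { future.Complete(default(T), e); }
            });
            return future;
        }

        public bool IsShutdown() { return shutdown; }

        // Only rejects new submissions; tasks already queued still run.
        public void Shutdown() { shutdown = true; }
    }

    // Hypothetical example task.
    public class SquareTask : ICallable<int>
    {
        private readonly int n;
        public SquareTask(int n) { this.n = n; }
        public int Call() { return n * n; }
    }
}
```

Calling code would only see the `IExecutor` interface, for example `IExecutor executor = new ThreadPoolExecutor();` followed by `executor.Submit(new SquareTask(9)).GetResult()`, so swapping in another executor would not touch the rest of the program. Inside the real framework, such an executor would more likely reuse the internal `Task<T>` class than duplicate its logic.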

What's wrong with Begin/EndInvoke?

As stated above, Begin/EndInvoke does not provide a means of controlling the way tasks are executed. It accesses a global (VM scope even?) thread pool, to which tasks are submitted. So you can't easily switch between threaded and non-threaded approaches, not even at compile time. Executors Framework lets the user change execution style even at runtime, just by replacing the executor object.

Why ICallable interface, and not simply delegates?

This is a good question, and the framework might switch to delegates in the future. However, some questions surfaced when delegates were tried briefly:

  1. Is it thread-safe to access the outer scope of an anonymous delegate that is called from another thread? My brief tests showed synchronization problems in this scenario.
  2. How is the performance of anonymous delegates (that could be accessing outer scope) being called from another thread?

Until I find out some clear answers to these questions, the framework will stick to the ICallable interface, which makes it more apparent what is going on with the threads.
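For readers who want delegate convenience today, one possible approach is a small adapter that wraps a delegate in an ICallable. The sketch below is an assumption, not part of the framework: `FuncCallable` and `FuncCallableDemo` are hypothetical names, and the `ICallable<T>` interface is repeated only so the sketch compiles on its own.

```csharp
using System;

namespace DelegateCallableSketch
{
    // Same shape as the framework's ICallable<T>, repeated for self-containment.
    public interface ICallable<T> { T Call(); }

    // Hypothetical adapter: lets a Func<T> be used wherever an ICallable<T> is expected.
    public class FuncCallable<T> : ICallable<T>
    {
        private readonly Func<T> func;

        public FuncCallable(Func<T> func)
        {
            if (func == null) { throw new ArgumentNullException("func"); }
            this.func = func;
        }

        public T Call() { return func(); }
    }

    public static class FuncCallableDemo
    {
        public static int Multiply(int a, int b)
        {
            // Copy the inputs into locals that are never reassigned,
            // so the closure only ever reads them.
            int left = a;
            int right = b;
            ICallable<int> task = new FuncCallable<int>(() => left * right);
            return task.Call();
        }
    }
}
```

This does not answer the two questions above. A variable captured by a closure is shared between the creating thread and the executing thread like any other field, so if either side modifies it after submission, it still needs synchronization. Capturing only values that are never reassigned, as in `Multiply`, avoids that particular problem.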


To execute tasks, you need an executor:

<csharp>
IExecutor myExecutor = new ImmediateExecutor(); // or new SingleThreadExecutor(), for example
</csharp>


Then you submit tasks:

<csharp>
Future<int> myFuture1 = myExecutor.Submit(new MultiplyIntsTask(5, 7));
Future<int> myFuture2 = myExecutor.Submit(new MultiplyIntsTask(5, 12));
</csharp>


And then you can either poll:

<csharp>
if(myFuture1.IsDone) {
    int myResult = myFuture1.GetResult();
}
</csharp>


or... get the result directly (blocking call):

<csharp>
int myResult = myFuture1.GetResult(); // Blocks until result is ready
</csharp>


Note that any exception thrown during the computation is wrapped in an ExecutionException and thrown when GetResult() is called.
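The following sketch shows one way calling code might handle a failed task. It is illustrative only: `DivideTask`, `ExceptionDemo` and the `RunNow` helper are hypothetical, and the `Future` here is a simplified synchronous stand-in that mimics the wrapping done by the framework's internal Task class, so that the sketch compiles on its own.

```csharp
using System;

namespace ExceptionSketch
{
    public interface ICallable<T> { T Call(); }

    // Same shape as the framework's ExecutionException.
    public class ExecutionException : Exception
    {
        public readonly Exception delayedException;

        public ExecutionException(Exception delayedException)
        {
            this.delayedException = delayedException;
        }
    }

    // Simplified stand-in: runs the callable immediately and remembers the outcome.
    public class Future<T>
    {
        private T result;
        private Exception exception;

        public static Future<T> RunNow(ICallable<T> callable)
        {
            Future<T> future = new Future<T>();
            try { future.result = callable.Call(); }
            catch (Exception e) { future.exception = new ExecutionException(e); }
            return future;
        }

        // Throws the stored exception, if any, at the point where the result is requested.
        public T GetResult()
        {
            if (exception != null) { throw exception; }
            return result;
        }
    }

    // Hypothetical task that fails when b is zero.
    public class DivideTask : ICallable<int>
    {
        private readonly int a;
        private readonly int b;
        public DivideTask(int a, int b) { this.a = a; this.b = b; }
        public int Call() { return a / b; }
    }

    public static class ExceptionDemo
    {
        public static string Describe(int a, int b)
        {
            Future<int> future = Future<int>.RunNow(new DivideTask(a, b));
            try
            {
                return "result " + future.GetResult();
            }
            catch (ExecutionException e)
            {
                // The original exception is available through delayedException.
                return "failed: " + e.delayedException.GetType().Name;
            }
        }
    }
}
```

With these definitions, `ExceptionDemo.Describe(6, 3)` returns "result 2", while `ExceptionDemo.Describe(6, 0)` returns "failed: DivideByZeroException".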

The MultiplyIntsTask looks like this:

<csharp>
class MultiplyIntsTask : ICallable<int> {
    int a;
    int b;

    public MultiplyIntsTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    // Runs on whichever thread the executor chooses.
    public int Call() {
        return a * b;
    }
}
</csharp>


This particular task is very fast and really not a good candidate for threaded processing. Consider this an illustrative example only.


There are 8 files:


<csharp>
using System;

namespace Executors {

    /// <summary>
    /// Interface implemented by all executors.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public interface IExecutor {
        Future<T> Submit<T>(ICallable<T> callable);
        bool IsShutdown();
        void Shutdown();
    }
}
</csharp>


<csharp>
using System;

namespace Executors {

    /// <summary>
    /// Callable object that returns type T, and may throw an exception.
    /// </summary>
    /// <typeparam name="T">Type of the computation result object</typeparam>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public interface ICallable<T> {
        T Call();
    }
}
</csharp>


<csharp>
using System;
using System.Collections.Generic;
using System.Threading;

namespace Executors {

    /// <summary>
    /// A Future represents the result of a potentially asynchronous computation.
    /// Methods/properties are available to check if the operation is done or not.
    /// If an exception is thrown during the computation, this exception will be thrown
    /// when calling GetResult().
    /// </summary>
    /// <typeparam name="T">Type of the computation result object</typeparam>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public class Future<T> {

        private T result;
        private Exception exception = null;

        volatile bool isDone = false;

        /// <summary>
        /// Is the computation done?
        /// </summary>
        public bool IsDone {
            get { return isDone; }
        }

        internal void SetResult(T result) {
            this.result = result;
        }

        internal void SetException(Exception e) {
            exception = e;
        }

        internal void SetDone() {
            isDone = true;
        }

        /// <summary>
        /// Get the result of the computation.
        /// Blocks until the computation is done.
        /// </summary>
        public T GetResult() {
            // Could maybe do this with monitor instead.
            while(!IsDone) {
                Thread.Sleep(0);
            }

            if(exception != null) {
                throw exception;
            }

            return result;
        }
    }
}
</csharp>
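The comment in GetResult() suggests a monitor could replace the polling loop. A minimal sketch of that idea follows, under stated assumptions: `MonitorFuture` and `MonitorFutureDemo` are hypothetical names, the setters are public and merged into a single completion step for brevity, and this is not the framework's actual Future class.

```csharp
using System;
using System.Threading;

namespace MonitorFutureSketch
{
    // Hypothetical Future variant that blocks with Monitor.Wait
    // instead of looping on Thread.Sleep(0).
    public class MonitorFuture<T>
    {
        private readonly object locker = new object();
        private T result;
        private Exception exception;
        private bool isDone;

        public bool IsDone
        {
            get { lock (locker) { return isDone; } }
        }

        public void SetResult(T value) { Complete(value, null); }

        public void SetException(Exception e) { Complete(default(T), e); }

        // Stores the outcome and wakes every thread blocked in GetResult().
        private void Complete(T value, Exception error)
        {
            lock (locker)
            {
                result = value;
                exception = error;
                isDone = true;
                Monitor.PulseAll(locker);
            }
        }

        public T GetResult()
        {
            lock (locker)
            {
                // Wait releases the lock while sleeping and reacquires it on wake-up;
                // the loop re-checks the condition after every wake-up.
                while (!isDone) { Monitor.Wait(locker); }
                if (exception != null) { throw exception; }
                return result;
            }
        }
    }

    public static class MonitorFutureDemo
    {
        // Computes a * b on a separate thread and blocks until the result arrives.
        public static int ComputeOnWorker(int a, int b)
        {
            MonitorFuture<int> future = new MonitorFuture<int>();
            Thread worker = new Thread(() =>
            {
                Thread.Sleep(50); // simulate some work
                future.SetResult(a * b);
            });
            worker.Start();
            return future.GetResult();
        }
    }
}
```

The trade-off is a lock acquisition on every IsDone check in exchange for a waiting thread that sleeps until it is pulsed, rather than repeatedly yielding its time slice.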


<csharp>
using System;

namespace Executors {

    /// <summary>
    /// Non-threaded immediate executor.
    /// Mainly a convenience executor - makes it easy
    /// to switch between threaded and non-threaded approaches.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public class ImmediateExecutor : IExecutor {
        private bool shutdown = false;

        #region IExecutor Members

        public Future<T> Submit<T>(ICallable<T> callable) {
            if(shutdown) {
                throw new InvalidOperationException("May not submit tasks after shutting down executor.");
            }
            Future<T> future = new Future<T>();
            Task<T> task = new Task<T>(callable, future);
            ((ITask)task).Execute();
            return future;
        }

        public bool IsShutdown() {
            return shutdown;
        }

        public void Shutdown() {
            shutdown = true;
        }

        #endregion
    }
}
</csharp>


<csharp>
using System;
using System.Collections.Generic;
using System.Threading;

namespace Executors {

    /// <summary>
    /// Single threaded executor. Useful for asynchronous operations
    /// without making the program overly complex.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    class SingleThreadExecutor : IExecutor {
        private Thread workerThread = null;
        private readonly Queue<ITask> taskQueue = new Queue<ITask>();
        private readonly object locker = new object();

        volatile bool shutdown = false;
        volatile bool shutdownCompleted = false;

        public SingleThreadExecutor() {
            ThreadStart start = new ThreadStart(RunWorker);
            workerThread = new Thread(start);
            workerThread.Start();
        }

        void RunWorker() {
            while(true) {
                ITask task = null;
                lock(locker) {
                    // Sleep until a task arrives or shutdown is requested.
                    while(taskQueue.Count == 0 && !shutdown) {
                        Monitor.Wait(locker);
                    }
                    // The queue is only accessed while holding the lock.
                    if(taskQueue.Count > 0) {
                        task = taskQueue.Dequeue();
                    }
                }

                if(task == null) {
                    // Shutdown was requested and no queued tasks remain.
                    break;
                }

                // Run the task outside the lock so Submit() is never blocked by it.
                task.Execute();
            }

            shutdownCompleted = true;
        }

        #region IExecutor Members

        public Future<T> Submit<T>(ICallable<T> callable) {
            lock(locker) {
                if(shutdown) {
                    throw new InvalidOperationException("May not submit tasks after shutting down executor.");
                }
                Future<T> future = new Future<T>();
                Task<T> task = new Task<T>(callable, future);
                taskQueue.Enqueue(task);
                Monitor.Pulse(locker);
                return future;
            }
        }

        public bool IsShutdown() {
            return shutdownCompleted;
        }

        public void Shutdown() {
            lock(locker) {
                shutdown = true;
                Monitor.Pulse(locker);
            }
        }

        #endregion
    }
}
</csharp>


<csharp>
using System;

namespace Executors {

    /// <summary>
    /// Internal type used by executors to associate Future objects with
    /// callables, and to call the callable and set appropriate fields
    /// in the Future object.
    /// The non-generic interface is needed for the Executor code.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    internal interface ITask {
        void Execute();
    }

    internal class Task<T> : ITask {
        public readonly ICallable<T> callable;
        public readonly Future<T> future;

        public Task(ICallable<T> callable, Future<T> future) {
            this.callable = callable;
            this.future = future;
        }

        void ITask.Execute() {
            try {
                T result = callable.Call();
                future.SetResult(result);
            } catch(Exception e) {
                future.SetException(new ExecutionException(e));
            } finally {
                future.SetDone();
            }
        }
    }
}
</csharp>


<csharp>
using System;

namespace Executors {

    /// <summary>
    /// Wrapper exception type for exceptions thrown during
    /// execution of an ICallable.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public class ExecutionException : Exception {

        public readonly Exception delayedException;

        public ExecutionException(Exception delayedException) {
            this.delayedException = delayedException;
        }
    }
}
</csharp>


<csharp>
using System;
using System.Collections.Generic;
using UnityEngine;

namespace Executors {

    /// <summary>
    /// Simple class for (basic) unit testing of executors.
    /// </summary>
    /// <author>Magnus Wolffelt, magnus.wolffelt@gmail.com</author>
    public class ExecutorTester {

        class MultiplyTask : ICallable<double> {
            int a;
            int b;

            public MultiplyTask(int a, int b) {
                this.a = a;
                this.b = b;
            }

            public double Call() {
                return (double)a * b;
            }
        }

        class ExceptionThrowingTask : ICallable<int> {
            public int Call() {
                throw new ExecutionException(new Exception("Task thrown exception."));
            }
        }

        public void TestAllExecutors() {
            List<IExecutor> toBeTested = new List<IExecutor>();
            toBeTested.Add(new ImmediateExecutor());
            toBeTested.Add(new SingleThreadExecutor());

            foreach(IExecutor executor in toBeTested) {
                DoBasicTest(executor);
            }

            foreach(IExecutor executor in toBeTested) {
                DoExceptionTest(executor);
            }
        }

        void DoBasicTest(IExecutor executor) {

            List<Future<double>> futures = new List<Future<double>>();
            List<double> expectedAnswers = new List<double>();

            for(int i = 1; i < 10; i++) {
                futures.Add(executor.Submit(new MultiplyTask(i, i)));
                expectedAnswers.Add(i * i);
            }

            for(int i = 0; i < futures.Count; i++) {
                AssertAlmostEqual(futures[i].GetResult(), expectedAnswers[i]);
                AssertTrue(futures[i].IsDone);

                Debug.Log("ExecutorTester: Basic test " + i + " with executor type " + executor.GetType().Name + " passed.");
            }
        }

        void DoExceptionTest(IExecutor executor) {
            List<Future<int>> futures = new List<Future<int>>();

            for(int i = 0; i < 10; i++) {
                futures.Add(executor.Submit(new ExceptionThrowingTask()));
            }

            for(int i = 0; i < futures.Count; i++) {
                try {
                    futures[i].GetResult();
                    // Not good
                    throw new Exception("Shouldn't be here...");
                } catch(ExecutionException) {
                    // All good
                }

                Debug.Log("ExecutorTester: Exception test " + i + " with executor type " + executor.GetType().Name + " passed.");
            }
        }

        void AssertTrue(bool condition) {
            if(!condition) {
                throw new Exception("Condition not true.");
            }
        }

        void AssertAlmostEqual(double d1, double d2) {
            if(System.Math.Abs(d1 - d2) > 0.0000001f) {
                throw new Exception("Numbers are not equal.");
            }
        }

    }
}
</csharp>

