Parallel Programming in .NET 4

For years, software development relied on ever-increasing processor clock speeds to achieve better performance.  For better or worse, the trend has since shifted toward adding more processing cores.  Generally speaking, software development hasn’t adjusted to account for this transition.  As a result, many applications don’t take full advantage of the underlying platform and don’t perform as well as they could.  To take advantage of multi-core and multi-processor systems, we need to change the way we write code to include parallelization.

Historically, parallel programming has been viewed as the realm of highly specialized software where only experts dared to tread.  Parallel programming aims to improve performance by executing multiple operations simultaneously.  The .NET Framework has always supported some level of parallel programming; it has included threads and locks since its earliest versions.  The problem with threads and locks, though, is that using them correctly is difficult and error-prone.  Where do I need locks?  Can I lock on this?  Should I use lock, ReaderWriterLock, or ReaderWriterLockSlim?  How do I return a result from another thread?  What’s the signature for the ThreadStart delegate passed to the Thread constructor?  These questions don’t even begin to touch on pooling, deadlocks, exception handling, cancellation, or a multitude of other considerations.  .NET 4 doesn’t eliminate these classes but builds upon them.

Note: The examples in this post were written with LINQPad and make use of its Dump() extension method. If you don’t already have LINQPad you can download it from the LINQPad website.

Parallel Extensions

One of the major feature sets of .NET 4 is collectively referred to as the Parallel Extensions.  The extensions reduce the inherent complexity by adding, you guessed it, a layer of abstraction above the traditional parallel features.  The three main components of the Parallel Extensions are:

  • Data structures for parallel programming
  • Task Parallel Library (TPL)
  • Parallel LINQ (PLINQ)

In addition to the above list Visual Studio 2010 also introduces several new diagnostic tools including the Parallel Stacks Window, Parallel Tasks Window, and the Concurrency Visualizer.

Data Structures for Parallel Programming

One way the Parallel Extensions simplify parallel processing is by introducing several new data structures.  Chief among these data structures are the concurrent collection classes found in System.Collections.Concurrent.  Unlike their traditional counterparts these classes provide thread-safe add and remove capabilities.  Of note are:

  • ConcurrentBag<T> is an unordered collection of objects.
  • ConcurrentDictionary<TKey, TValue> is a thread-safe dictionary.
  • ConcurrentStack<T> is a thread-safe stack (FILO).
  • ConcurrentQueue<T> is a thread-safe queue (FIFO).
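
As a rough illustration of the collections listed above, the following sketch fills a ConcurrentDictionary and a ConcurrentQueue from a parallel loop without any explicit locking.  The class and method names (ConcurrentCollectionsSketch, CountRemainders) are made up for this example, and it is written as a plain console program rather than a LINQPad query.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class ConcurrentCollectionsSketch
{
	// Counts how many numbers in [0, count) share each remainder modulo 3.
	public static ConcurrentDictionary<int, int> CountRemainders(int count)
	{
		var counts = new ConcurrentDictionary<int, int>();

		Parallel.For(0, count, i =>
		{
			// AddOrUpdate inserts 1 for a new key or applies the update
			// delegate to the existing value; no explicit lock is needed.
			counts.AddOrUpdate(i % 3, 1, (key, current) => current + 1);
		});

		return counts;
	}

	public static void Main()
	{
		foreach(var pair in CountRemainders(999))
		{
			Console.WriteLine("{0}: {1}", pair.Key, pair.Value);
		}

		// Many threads can enqueue at once; the arrival order is not predictable.
		var queue = new ConcurrentQueue<int>();
		Parallel.For(0, 100, i => queue.Enqueue(i));

		int first;
		if(queue.TryDequeue(out first))
		{
			Console.WriteLine("Dequeued {0}; {1} items remain", first, queue.Count);
		}
	}
}
```

Note that the removal methods follow a Try pattern (TryDequeue, TryPop, TryTake) because another thread may have emptied the collection between a check and the removal.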

The Parallel Extensions also add several new primitives found in System.Threading to enable concurrency and enhance performance by avoiding locking.  Notable classes include:

  • ManualResetEventSlim is a light-weight version of ManualResetEvent that can only be used for intra-process communication.
  • SemaphoreSlim is a light-weight version of Semaphore that limits the number of threads that can access a resource concurrently.
  • CountdownEvent simplifies unblocking threads by providing a signal count.  When the count reaches zero the waiting threads are unblocked.
  • Barrier is essentially the opposite of CountdownEvent in that threads signal their arrival then block until the other participating threads arrive.
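
To make the signal count idea concrete, here is a minimal sketch of CountdownEvent: the calling thread blocks in Wait until every worker has called Signal.  The names CountdownEventSketch and RunWorkers are hypothetical, and the workers do nothing beyond incrementing a counter.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class CountdownEventSketch
{
	// Starts workerCount tasks and blocks until each one has signalled.
	public static int RunWorkers(int workerCount)
	{
		var finished = 0;

		using(var countdown = new CountdownEvent(workerCount))
		{
			for(var n = 0; n < workerCount; n++)
			{
				Task.Factory.StartNew(
					() =>
					{
						Interlocked.Increment(ref finished);
						countdown.Signal();	// decrement the remaining count
					}
				);
			}

			countdown.Wait();	// unblocks once the count reaches zero
		}

		return finished;
	}

	public static void Main()
	{
		Console.WriteLine("Workers finished: {0}", RunWorkers(4));
	}
}
```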

Task Parallel Library (TPL)

The TPL introduces several new classes to the System.Threading and System.Threading.Tasks namespaces and is now the preferred way to write multithreaded and parallel code in .NET.  As its name implies, the TPL is task centric rather than thread centric.  This is a subtle difference with significant implications.

Potential Parallelism

Potential parallelism is a key aspect of the TPL, and it is where the difference between thread and task centricity comes into play.

When we create a new thread we always incur the overhead of allocating and scheduling that thread.  For one or two additional threads created for UI responsiveness this may or may not be an issue.  When trying to achieve concurrent processing, though, it can actually be detrimental to performance if the resources to support the new threads aren’t available.  Traditionally we could call QueueUserWorkItem on ThreadPool to reduce the overhead of creating threads, but the TPL goes a step further.

The TPL aims to take full advantage of available system resources but that doesn’t mean that it will always perform work in parallel.  The truth is we typically don’t know much about the computers that are running our software in the wild.  Even if we know detailed information about the hardware we can’t predict how many resources will actually be available to our software.

What the TPL does is abstract away most of these considerations.  It uses task schedulers and work partitioners to achieve a level of parallelism that is appropriate for the underlying platform.  If the system only has a single core there is little, if any, chance for parallelism, whereas a system with four or more cores has a much better chance and will likely be able to work in parallel.

Data Parallelism

Data Parallelism lets us perform an action on each of the items in a collection or array in a concurrent manner.  The Parallel class in System.Threading.Tasks provides several overloads of both the For and ForEach methods to accomplish this.  Both of these methods (and their overloads) have syntax very similar to and are used much like their statement counterparts but there are some important differences.

First, to achieve the appropriate degree of parallelism for the host system the TPL will partition the data source and operate on the segments individually.  The task scheduler will also redistribute work should there be an imbalance.

Parallel.For(
	0,
	1000,
	i => Math.Sqrt(i).Dump(i.ToString())
);

Here we see the ForEach version of the above loop:

var items = Enumerable.Range(0, 1000).ToArray();

Parallel.ForEach(
	items,
	i => Math.Sqrt(i).Dump(i.ToString())
);

Second, For and ForEach are methods and the loop body is a delegate so we cannot end loop processing with break like we would with the traditional loops.  Instead we must supply a delegate that accepts a ParallelLoopState instance and invoke either the Break or Stop method.

Although both methods stop parallel loop execution they behave a bit differently.  The Stop method causes the loop to stop at its earliest convenience meaning that any currently executing iterations may continue running after Stop is invoked.

var results = new ConcurrentBag<double>();

Parallel.For(
	0,
	100000,
	(i, state) =>
	{
		if(i < 10000)
		{
			results.Add(Math.Sqrt(i));
		}
		else
		{
			state.Stop();
		}
	}
);

results.Count.Dump("Number of items");

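Break, on the other hand, stops the loop at its earliest convenience after the current iteration: no iterations beyond the current index are started, but any iterations with a lower index that have not yet run are still executed.  It is also worth noting that Break and Stop cannot be used together.  Attempting to do so will result in an InvalidOperationException.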

var results = new ConcurrentBag<double>();

Parallel.For(
	0,
	100000,
	(i, state) =>
	{
		if(i < 10000)
		{
			results.Add(Math.Sqrt(i));
		}
		else
		{
			state.Break();
		}
	}
);

results.Count.Dump("Number of items");

We can also define a local variable for each task in a parallel loop to operate upon.  Using a local variable allows the TPL to avoid locking thereby granting additional performance improvements.

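var itemsProcessed = 0;

Parallel.For(
	0,
	int.MaxValue,
	// Initialize each task's local count.
	() => 0,
	// Loop body: update only the task-local count, so no locking is needed.
	(i, state, count) =>
	{
		count++;
		return count;
	},
	// Runs once per task when it finishes: merge the local count into the shared total.
	localCount => Interlocked.Add(ref itemsProcessed, localCount)
);

String.Format("Processed {0} items", itemsProcessed).Dump();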

Finally, the TPL allows us to cancel parallel loops through the CancellationTokenSource class and CancellationToken structure.  To cancel a parallel loop we use a CancellationTokenSource to get a CancellationToken that is stored in the ParallelOptions instance that is passed to the loop method.

var cts = new CancellationTokenSource();
var options = new ParallelOptions()
{
	CancellationToken = cts.Token
};

Task.Factory.StartNew(
	() =>
	{
		Thread.Sleep(100);
		cts.Cancel();
	}
);

var itemsProcessed = 0;

try
{
	Parallel.For(
		0,
		int.MaxValue,
		options,
		() => 0,
		(i, state, count) =>
		{
			count++;
			return count;
		},
		i => Interlocked.Add(ref itemsProcessed, i)
	);
}
catch(OperationCanceledException ex)
{
	ex.Message.Dump();
}

String.Format("Processed {0} items", itemsProcessed).Dump();

Task Parallelism

Task parallelism involves running independent tasks concurrently.  In its most basic form task parallelism is achievable through the Parallel.Invoke method.  Parallel.Invoke accepts a params array of simple Actions that will be assigned to tasks.  An overload of the Invoke method also accepts a ParallelOptions argument for cancellations, etc…

Parallel.Invoke(
	() => "Task 1".Dump(),
	() => { Thread.Sleep(500); "Task 2".Dump(); },
	() => "Task 3".Dump()
);

Creating Tasks

Parallel.Invoke is nice for simple tasks but for more control we need to use tasks directly.  The most obvious way to create a task is through the task’s constructor.  Creating a task in this manner requires us to manually start it.

var task = new Task(() => Console.WriteLine("inside the task"));
task.Start();

Console.WriteLine("main thread");

Thread.Sleep(100);

Alternatively, we can also use the TaskFactory to create and start a new task.  Microsoft actually recommends using this approach when creation and scheduling don’t have to be separated.  For convenience the TaskFactory is accessible through the static Factory property on the Task classes.

var task = Task.Factory.StartNew(() => Console.WriteLine("inside the task"));

Console.WriteLine("main thread");

Thread.Sleep(100);

One nuance to note when using lambda expressions with tasks is that variables aren’t always captured as expected, particularly within loops.  A lambda captures the loop variable itself rather than its value at the time the task is created, so by the time the tasks run they typically all see the same (often final) value.

for(var i = 0; i < 10; i++)
{
	Task.Factory.StartNew(
		() =>
		{
			i.Dump();
		}
	);
}

Thread.Sleep(200);

As a workaround we can copy the loop variable into a local variable declared inside the loop body so that each task captures its own copy.

for(var i = 0; i < 10; i++)
{
	var index = i;
	Task.Factory.StartNew(
		() =>
		{
			index.Dump();
		}
	);
}

Thread.Sleep(200);

Returning Values from Tasks

In addition to the standard Task class the TPL also offers a generic Task<TResult> class.  Task<TResult> provides an easy way to return a value from a task.  Since we’re working with a Task<TResult> instance and supplying it with a delegate that returns a value we don’t have direct access to the delegate’s return value.  Instead, each Task<TResult> provides a Result property that will return the delegate’s result.  If the task hasn’t completed when Result is accessed the property will block the calling thread until the value is available.

var task = Task.Factory.StartNew(
	() => 10
);

task.Result.Dump();

Waiting for Task Completion

Sometimes we need to wait for a task to complete before proceeding.  We can easily wait by calling one of the Wait overloads on the task we’re waiting for.  As we just saw, accessing a task’s Result property will also implicitly wait.

var task = Task.Factory.StartNew(
	() =>
	{
		Thread.Sleep(1000);
		"Task complete".Dump();
	}
);

"Waiting for task".Dump();
task.Wait();
"Done waiting".Dump();

There are also times when we need to wait for multiple tasks to complete before proceeding.  To do so we can use the static WaitAll or WaitAny methods of the Task class.

var tasks = new Task[]
{
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(200);
			"Task 1 complete".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(250);
			"Task 2 complete".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(200);
			"Task 3 complete".Dump();
		}
	)
};

"Waiting for all tasks".Dump();
Task.WaitAll(tasks);
"Done waiting".Dump();

WaitAny is used the same way, but unlike WaitAll, which waits for every task in the source array to complete, WaitAny will unblock as soon as any of the tasks signals that it has completed.

var tasks = new Task[]
{
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(200);
			"Task 1 complete".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(250);
			"Task 2 complete".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			Thread.Sleep(200);
			"Task 3 complete".Dump();
		}
	)
};

"Waiting for any task".Dump();
Task.WaitAny(tasks);
"Done waiting".Dump();

Note: All of the wait methods have overloads that allow us to set a timeout or listen for a cancellation token.
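
The overloads mentioned in the note above can be sketched as follows.  This is only an illustration: the class and method names are hypothetical, and a ManualResetEventSlim is used as a gate so that the task is guaranteed to still be running when the timed or cancellable wait is attempted.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class WaitOverloadsSketch
{
	// Returns true if the timed wait gave up before the task finished.
	public static bool TimedWaitGivesUp()
	{
		using(var gate = new ManualResetEventSlim(false))
		{
			// The task cannot finish until the gate is opened below.
			var task = Task.Factory.StartNew(() => gate.Wait());

			// Wait(int) returns false when the timeout elapses first.
			var completed = task.Wait(50);

			gate.Set();
			task.Wait();	// now wait without a timeout

			return !completed;
		}
	}

	// Returns true if waiting with an already-cancelled token throws.
	public static bool CancelledWaitThrows()
	{
		using(var gate = new ManualResetEventSlim(false))
		using(var cts = new CancellationTokenSource())
		{
			var task = Task.Factory.StartNew(() => gate.Wait());
			cts.Cancel();

			try
			{
				task.Wait(cts.Token);
				return false;
			}
			catch(OperationCanceledException)
			{
				// Only the wait was cancelled; the task itself keeps running.
				return true;
			}
			finally
			{
				gate.Set();
				task.Wait();
			}
		}
	}

	public static void Main()
	{
		Console.WriteLine("Timed wait gave up: {0}", TimedWaitGivesUp());
		Console.WriteLine("Cancelled wait threw: {0}", CancelledWaitThrows());
	}
}
```

Cancelling a wait does not cancel the task being waited on; the token passed to Wait only controls how long the caller is willing to block.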

Spawning New Tasks

Tasks are in no way required to operate on their own.  Tasks can directly create child tasks and detached nested tasks.  Additionally, the TPL provides a few ways to create task continuations.

Child/Detached Tasks

When a task creates another task it can create it as attached (a child task) or detached (a nested task).  The way the task is created is dependent upon the value of the TaskCreationOptions argument passed to either the constructor or StartNew method of the task factory.  The most notable difference between the two is that a parent task will implicitly wait for its children to complete.  By default, tasks are created as detached nested tasks.

var parent = Task.Factory.StartNew(
	() =>
	{
		"Started parent task".Dump();
		Task.Factory.StartNew(
			() =>
			{
				"Started nested task".Dump();
				Thread.Sleep(200);
				"Finished nested task".Dump();
			}
		);

		"Finished parent task".Dump();
	}
);

"Waiting for all tasks".Dump();
parent.Wait();
"Done waiting".Dump();

The code for creating a child task is nearly identical to creating a detached nested task.

var parent = Task.Factory.StartNew(
	() =>
	{
		"Started parent task".Dump();
		Task.Factory.StartNew(
			() =>
			{
				"Started child task".Dump();
				Thread.Sleep(200);
				"Finished child task".Dump();
			},
			TaskCreationOptions.AttachedToParent
		);

		"Finished parent task".Dump();
	}
);

"Waiting for all tasks".Dump();
parent.Wait();
"Done waiting".Dump();

Continuations

Continuations allow us to start a new task when another task finishes.  The completed task, referred to as the antecedent task, is passed to the continuation task so the continuation can check its status and/or result.

The simplest way to create a continuation is through the task’s ContinueWith method.

var antecedent = new Task(
	() =>
	{
		"Started antecedent".Dump();
		Thread.Sleep(100);
		"Finished antecedent".Dump();
	}
);

var continuation = antecedent.ContinueWith(
	(a) =>
	{
		"Started continuation".Dump();
		String.Format("Antecedent status: {0}", a.Status).Dump();
		"Finished continuation".Dump();
	}
);

antecedent.Start();

"Waiting for all tasks".Dump();
continuation.Wait();
"Done waiting".Dump();

Note that continuations are neither child nor detached nested tasks.  They are independent of the antecedent task, so if we need to wait for a continuation we do so just like any other task.

Sometimes we want to wait for more than one task to complete before kicking off a continuation.  The TaskFactory provides two instance methods for this scenario, which we can reach through Task.Factory.  The ContinueWhenAll and ContinueWhenAny methods accept an array of tasks to wait for before starting the continuation.

var antecedents = new Task[]
{
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 1".Dump();
			Thread.Sleep(200);
			"Finished antecedent 1".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 2".Dump();
			Thread.Sleep(250);
			"Finished antecedent 2".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 3".Dump();
			Thread.Sleep(200);
			"Finished antecedent 3".Dump();
		}
	)
};

var continuation = Task.Factory.ContinueWhenAll(
	antecedents,
	(a) =>
	{
		"Started continuation".Dump();
		for(var i = 0; i < a.Length; i++)
		{
			String.Format("Antecedent status: {0}", a[i].Status).Dump();
		}
		"Finished continuation".Dump();
	}
);

"Waiting for all tasks".Dump();
continuation.Wait();
"Done waiting".Dump();

Despite their similar syntax there is a subtle but important difference between the delegates passed to the ContinueWhenAll and ContinueWhenAny methods. Since ContinueWhenAll will wait for all of the tasks in the source array to complete the delegate must accept an array of tasks. ContinueWhenAny on the other hand only waits for the first task to complete so its delegate only accepts that task as shown below.

var antecedents = new Task[]
{
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 1".Dump();
			Thread.Sleep(200);
			"Finished antecedent 1".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 2".Dump();
			Thread.Sleep(250);
			"Finished antecedent 2".Dump();
		}
	),
	Task.Factory.StartNew(
		() =>
		{
			"Started antecedent 3".Dump();
			Thread.Sleep(200);
			"Finished antecedent 3".Dump();
		}
	)
};

var continuation = Task.Factory.ContinueWhenAny(
	antecedents,
	(a) =>
	{
		"Started continuation".Dump();
		String.Format("Antecedent status: {0}", a.Status).Dump();
		"Finished continuation".Dump();
	}
);

"Waiting for all tasks".Dump();
continuation.Wait();
"Done waiting".Dump();

Cancellations

We’ve already looked at cancelling a parallel loop, and task cancellation works on the same principle.  As with parallel loops, we need to provide tasks with a cancellation token, either directly when creating the task or, for Parallel.Invoke, through ParallelOptions.  Unlike with parallel loops, however, the task body needs to check the token for cancellation itself, typically by calling ThrowIfCancellationRequested so that an OperationCanceledException is thrown.

var source = new CancellationTokenSource();
var token = source.Token;

var task = Task.Factory.StartNew(
	() =>
	{
		token.ThrowIfCancellationRequested();

		"Passed first cancellation attempt".Dump();

		Thread.Sleep(250);

		if(token.IsCancellationRequested)
		{
			token.ThrowIfCancellationRequested();
		}
	},
	token
);

Thread.Sleep(100);

source.Cancel();

"Waiting for all tasks".Dump();
try
{
	task.Wait();
}
catch(AggregateException ex)
{
	ex.ToString().Dump();
}
"Done waiting".Dump();

Cancelling a task with children is a bit more involved in that the cancellation token must also be passed to the child tasks.  In addition, in order to capture the exception thrown from a child task we must either wait on all tasks to complete or handle the exception in the parent task.

var source = new CancellationTokenSource();
var token = source.Token;

var tasks = new Task[2];

tasks[0] = Task.Factory.StartNew(
	() =>
	{
		token.ThrowIfCancellationRequested();

		"started parent task".Dump();

		tasks[1] = Task.Factory.StartNew(
			() =>
			{
				token.ThrowIfCancellationRequested();

				"started child task".Dump();

				Thread.Sleep(1000);

				if(token.IsCancellationRequested)
				{
					token.ThrowIfCancellationRequested();
				}

				"finished child task".Dump();
			},
			token,
			TaskCreationOptions.AttachedToParent,
			TaskScheduler.Current
		);

		"finished parent task".Dump();
	},
	token
);

Thread.Sleep(100);

source.Cancel();

"Waiting for all tasks".Dump();
try
{
	Task.WaitAll(tasks);
}
catch(AggregateException ex)
{
	ex.ToString().Dump();
}
"Done waiting".Dump();

Task Scheduling

Task schedulers, implementations of the abstract TaskScheduler class, ensure that all work is queued and executed.  The default scheduler is the ThreadPoolTaskScheduler which, as its name implies, is based upon the standard .NET 4 ThreadPool.

The default scheduler is generally suitable for most scenarios but custom schedulers can be created.  To use a custom scheduler with a task we pass it to an overload of Start, StartNew, or ContinueWith (or to the constructor of a TaskFactory); for a parallel loop or Parallel.Invoke we supply it through the TaskScheduler property of a ParallelOptions instance.  From inside a task we can get a reference to the current task scheduler by checking the static Current property on the TaskScheduler class.
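
As a minimal sketch of how a scheduler is plugged in, the example below defines a deliberately trivial custom scheduler and passes it to StartNew.  InlineTaskScheduler is a hypothetical class written for this illustration, not part of the framework; it simply runs each task synchronously on the thread that queues it.  The parallel loop in Main uses TaskScheduler.Default only to show where the ParallelOptions property fits.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler for illustration: runs every task synchronously
// on whichever thread queues it.
public sealed class InlineTaskScheduler : TaskScheduler
{
	public override int MaximumConcurrencyLevel
	{
		get { return 1; }
	}

	protected override void QueueTask(Task task)
	{
		TryExecuteTask(task);
	}

	protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
	{
		return TryExecuteTask(task);
	}

	protected override IEnumerable<Task> GetScheduledTasks()
	{
		// Nothing is ever left waiting in a queue.
		return Enumerable.Empty<Task>();
	}
}

public static class SchedulerSketch
{
	// Returns true if the task body observed the custom scheduler.
	public static bool RunsOnCustomScheduler()
	{
		var scheduler = new InlineTaskScheduler();

		var task = Task.Factory.StartNew(
			() => object.ReferenceEquals(TaskScheduler.Current, scheduler),
			CancellationToken.None,
			TaskCreationOptions.None,
			scheduler
		);

		return task.Result;
	}

	public static void Main()
	{
		Console.WriteLine("Ran on custom scheduler: {0}", RunsOnCustomScheduler());

		// Parallel loops receive their scheduler through ParallelOptions.
		var options = new ParallelOptions { TaskScheduler = TaskScheduler.Default };
		Parallel.For(0, 4, options, i => Console.WriteLine("Iteration {0}", i));
	}
}
```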

More on the Task Factory

Most of the time the default TaskFactory that’s accessible through the Factory property of the Task classes is sufficient but occasionally you may want a factory that will automatically set some new default options such as a custom scheduler or a common cancellation token.  In those cases we can create a new TaskFactory and provide those options to the constructor.
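
A custom factory might be set up along the following lines.  This is a sketch under simple assumptions: the class and method names are hypothetical, and the only shared default is a cancellation token, so every task started through the factory observes the same CancellationTokenSource.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class TaskFactorySketch
{
	// Starts a value-returning task through a factory with explicit defaults.
	public static int ResultFromCustomFactory()
	{
		using(var cts = new CancellationTokenSource())
		{
			var factory = new TaskFactory(
				cts.Token,
				TaskCreationOptions.None,
				TaskContinuationOptions.None,
				TaskScheduler.Default
			);

			return factory.StartNew(() => 6 * 7).Result;
		}
	}

	// Tasks started after the shared token is cancelled never run.
	public static TaskStatus StatusAfterCancel()
	{
		using(var cts = new CancellationTokenSource())
		{
			var factory = new TaskFactory(cts.Token);
			cts.Cancel();

			var task = factory.StartNew(() => Console.WriteLine("never runs"));

			try
			{
				task.Wait();
			}
			catch(AggregateException)
			{
				// Waiting on a cancelled task surfaces a TaskCanceledException.
			}

			return task.Status;
		}
	}

	public static void Main()
	{
		Console.WriteLine("Result: {0}", ResultFromCustomFactory());
		Console.WriteLine("Status after cancel: {0}", StatusAfterCancel());
	}
}
```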

Exception Handling

One of the complications of parallel programming is that an exception could be thrown from any number of tasks at any given time.  How do we know what to handle?  Do we stop processing the tasks when an exception is thrown?  .NET 4 answers these questions with the new AggregateException class.

AggregateException is a standard exception in that it derives directly from Exception.  As such it provides the InnerException and Message properties just like any other exception but it also expands upon it by providing an InnerExceptions property that contains all of the exceptions thrown by one or more tasks.

We can wrap a call to one of the wait methods (or Result) in a try block and catch the AggregateException.  Simply handling the AggregateException typically isn’t enough though; we’ll typically want to handle one or more specific exception types that might be contained within the InnerExceptions collection.  To achieve this AggregateException provides a Handle method that allows us to sequentially inspect each of the exceptions in the collection.  The delegate passed to Handle returns true for each exception it has handled; any exceptions for which it returns false are rethrown in a new AggregateException.

try
{
	throw new AggregateException(
			new NotSupportedException(),
			new ArgumentException(),
			new AggregateException(
				new ArgumentNullException(),
				new NotImplementedException()
			)
		);
}
catch(AggregateException ex)
{
	ex.Handle(
		ex1 =>
		{
			var aggregate = ex1 as AggregateException;
			if(aggregate != null)
			{
				aggregate.Handle(
					ex2 =>
					{
						if(ex2 is NotImplementedException)
						{
							ex2.ToString().Dump();
						}
						return true;
					}
				);
				return true;
			}
			return true;
		}
	);
}

Depending on the structure of the tasks (children, detached nested, etc…) the InnerExceptions collection may contain additional AggregateExceptions.  Writing the recursive code to handle these instances can be problematic so AggregateException also provides a Flatten method.  Flatten traverses the InnerExceptions tree and returns a new AggregateException in which all of the non-aggregate exceptions from the original tree are contained directly within the InnerExceptions collection.

try
{
	throw new AggregateException(
			new NotSupportedException(),
			new ArgumentException(),
			new AggregateException(
				new ArgumentNullException(),
				new NotImplementedException()
			)
		);
}
catch(AggregateException ex)
{
	ex.Flatten().Handle(
		ex1 =>
		{
			if(ex1 is NotImplementedException)
			{
				ex1.ToString().Dump();
			}
			return true;
		}
	);
}
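
The examples above throw an AggregateException by hand.  As a sketch of the same pattern applied to exceptions raised inside real tasks, the following waits on three tasks, two of which fail, and handles only the exception types it expects.  The class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical names, for illustration only.
public static class TaskExceptionSketch
{
	// Returns the number of task exceptions that were handled.
	public static int CountHandled()
	{
		var tasks = new Task[]
		{
			Task.Factory.StartNew(() => { throw new InvalidOperationException("first"); }),
			Task.Factory.StartNew(() => { throw new ArgumentException("second"); }),
			Task.Factory.StartNew(() => { })
		};

		var handled = 0;

		try
		{
			// WaitAll waits for every task, then throws a single
			// AggregateException containing all of the failures.
			Task.WaitAll(tasks);
		}
		catch(AggregateException ex)
		{
			ex.Flatten().Handle(
				inner =>
				{
					if(inner is InvalidOperationException || inner is ArgumentException)
					{
						handled++;
						return true;
					}

					// Anything unexpected is rethrown in a new AggregateException.
					return false;
				}
			);
		}

		return handled;
	}

	public static void Main()
	{
		Console.WriteLine("Handled {0} exceptions", CountHandled());
	}
}
```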

Parallel LINQ

Built on top of the TPL, Parallel LINQ (PLINQ) brings concurrency to LINQ to Objects.  PLINQ expressions look virtually identical to traditional LINQ expressions, and PLINQ exposes all of the standard query operators as extension methods through the ParallelEnumerable class in System.Linq.

We won’t be looking at PLINQ in detail but there are a few extension methods worth looking at.

AsParallel<TSource> is an extension method of IEnumerable and should be considered to be the entry point for a parallel query as it is what enables parallelization.  Simply calling the method on the source collection will convert the collection to a ParallelQuery object.

AsSequential<TSource> forces sequential evaluation of the query by converting a ParallelQuery object back to an IEnumerable object.

WithCancellation<TSource> allows cancelling a query by setting a standard CancellationToken.

WithExecutionMode<TSource> allows forcing parallel evaluation in scenarios where PLINQ’s default behavior might otherwise fall back to sequential evaluation.

WithDegreeOfParallelism<TSource> limits the maximum number of concurrently executing tasks (and therefore processors) used by the query.

ForAll<TSource> is an alternative to a traditional foreach loop that runs in parallel.  The traditional foreach loop requires the output from the parallel tasks be merged back into the thread where the loop is running.  ForAll doesn’t require the merge step.
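
A few of these methods can be combined as in the sketch below.  The class and method names are hypothetical, and AsOrdered (a ParallelEnumerable method not described above) is included only so that the results come back in source order.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical names, for illustration only.
public static class PlinqSketch
{
	// Squares the even numbers in [0, count), preserving source order.
	public static int[] EvenSquares(int count)
	{
		return Enumerable.Range(0, count)
			.AsParallel()
			.AsOrdered()
			.WithDegreeOfParallelism(2)
			.Where(n => n % 2 == 0)
			.Select(n => n * n)
			.ToArray();
	}

	// Sums 1..count with ForAll; the delegate runs on multiple threads,
	// so the shared total is updated with Interlocked.
	public static long SumWithForAll(int count)
	{
		long total = 0;

		Enumerable.Range(1, count)
			.AsParallel()
			.ForAll(n => Interlocked.Add(ref total, n));

		return total;
	}

	public static void Main()
	{
		Console.WriteLine(String.Join(", ", EvenSquares(10)));
		Console.WriteLine(SumWithForAll(100));
	}
}
```

Because ForAll skips the merge step, its delegate must be safe to call concurrently, which is why the sketch avoids a plain `total += n`.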

Concurrency Considerations

Parallel isn’t Always Faster – sometimes the overhead involved with determining whether something should be parallelized actually takes longer than just running the process sequentially.  As a corollary, it is possible to add too much parallelization.  When there aren’t enough resources available to gain benefits from parallelization then the added overhead will decrease performance.

Writing to Shared Memory from multiple threads amplifies the potential for race conditions.  Additionally, the overhead associated with locks and synchronization can hamper performance.

Thread-Affinity still matters so when using technologies that impose restrictions requiring some code to run on a specific thread you may not be able to perform certain actions in a task without changing scheduler settings.

Final Thoughts

The Parallel Extensions are a huge step forward in the realm of parallel programming. The added layer of abstraction really isolates us from most of the complexity of thread management and allows us to focus on the real problems at hand while still utilizing the underlying platform to its full potential.
