Multithreaded deserialization using BinaryFormatter or MessagePack is not giving me the performance boost I expect, and I'm trying to understand why. For this setup, deserializing all chunks at the same time is around 5 times slower than deserializing a single chunk.
Here is my test code, which compares three multithreading approaches: Tasks, Threads and Parallel.ForEach. Threads is by far the fastest.
The chunks are read into memory before deserialization starts, so no I/O-bound work is being timed. The data is split into as many chunks as I have cores.
using System.Collections;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.Serialization.Formatters.Binary;
namespace SerializerDeserializeTests
{
/// <summary>
/// Benchmark harness comparing how long it takes to deserialize <see cref="BinaryFormatter"/>-serialized
/// <see cref="Hashtable"/> chunks using dedicated <see cref="Thread"/>s, <see cref="Task"/>s and
/// <see cref="Parallel"/>.ForEach. Each strategy is run once over all chunks and once over a single chunk.
/// </summary>
/// <remarks>
/// The first run (no <c>data</c> directory) only generates and exports the test data. Later runs read
/// the chunk files into memory and time each strategy.
/// NOTE(review): every entry deserializes into two newly allocated strings, so the timed work is
/// allocation-heavy. A likely reason the all-chunks runs scale poorly is contention in the garbage
/// collector rather than in the formatter itself. Compare results with Server GC enabled
/// (<c>&lt;ServerGarbageCollection&gt;true&lt;/ServerGarbageCollection&gt;</c> in the project file)
/// before drawing conclusions. The harness prints which GC mode is active.
/// </remarks>
public class Program
{
    private static void Main(string[] args)
    {
        const int threads = 8;
        const int entries = 500000;
        const string directory = "data";

        if (!Directory.Exists(directory))
        {
            // First run: generate and persist the test data only; timing happens on the next run.
            Directory.CreateDirectory(directory);
            Hashtable hashtable = GenerateHashtable(entries);
            List<Hashtable> generated = ChunkHashtable(hashtable, threads);
            Export(generated, directory);
            return;
        }

        Stopwatch stopwatch = Stopwatch.StartNew();
        List<byte[]> chunks = Directory.GetFiles(directory).Select(File.ReadAllBytes).ToList();
        stopwatch.Stop();
        Console.WriteLine("Read: " + stopwatch.Elapsed.TotalSeconds);

        if (chunks.Count == 0)
        {
            // The directory is created before the export runs, so an interrupted first run can leave it empty.
            Console.WriteLine("No chunk files found in '" + directory + "'; delete the directory to regenerate them.");
            return;
        }

        Console.WriteLine("Server GC: " + System.Runtime.GCSettings.IsServerGC);

        // Untimed warm-up. Without it, the first strategy measured would also pay the one-time JIT and
        // type-loading costs of the BinaryFormatter code paths, which later strategies skip.
        DeserializeChunk(chunks[0]);

        Measure(() => DeserializeParallelForEach(chunks));
        Measure(() => DeserializeParallelForEachOneChunk(chunks));
        Measure(() => DeserializeTasks(chunks));
        Measure(() => DeserializeTasksOneChunk(chunks));
        Measure(() => DeserializeThreads(chunks));
        Measure(() => DeserializeThreadsOneChunk(chunks));
    }

    /// <summary>
    /// Runs one timed strategy after a full, blocking garbage collection. Garbage left over by the
    /// previously measured strategy is then not collected (and billed) inside this one.
    /// </summary>
    private static void Measure(Func<List<Hashtable>> strategy)
    {
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        strategy().Clear();
    }

    /// <summary>Deserializes only the first chunk, one dedicated thread per chunk.</summary>
    /// <param name="chunks">Serialized chunks; only the first is used. An empty list yields an empty result.</param>
    /// <returns>The deserialized hashtable (zero or one element).</returns>
    public static List<Hashtable> DeserializeThreadsOneChunk(List<byte[]> chunks) =>
        RunOnThreads(FirstChunkOnly(chunks), "----- DeserializeThreadsOneChunk -----");

    /// <summary>Deserializes every chunk concurrently, one dedicated thread per chunk.</summary>
    /// <param name="chunks">Serialized chunks as written by <see cref="Export"/>.</param>
    /// <returns>The deserialized hashtables, in the same order as <paramref name="chunks"/>.</returns>
    public static List<Hashtable> DeserializeThreads(List<byte[]> chunks) =>
        RunOnThreads(chunks, "----- DeserializeThreads -----");

    /// <summary>Deserializes only the first chunk with <see cref="Task.Run(Func{Hashtable})"/>.</summary>
    /// <param name="chunks">Serialized chunks; only the first is used. An empty list yields an empty result.</param>
    /// <returns>The deserialized hashtable (zero or one element).</returns>
    public static List<Hashtable> DeserializeTasksOneChunk(List<byte[]> chunks) =>
        RunOnTasks(FirstChunkOnly(chunks), "----- DeserializeTasksOneChunk -----");

    /// <summary>Deserializes every chunk concurrently, one thread-pool task per chunk.</summary>
    /// <param name="chunks">Serialized chunks as written by <see cref="Export"/>.</param>
    /// <returns>The deserialized hashtables, in the same order as <paramref name="chunks"/>.</returns>
    public static List<Hashtable> DeserializeTasks(List<byte[]> chunks) =>
        RunOnTasks(chunks, "----- DeserializeTasks -----");

    /// <summary>Deserializes only the first chunk with <see cref="Parallel"/>.ForEach.</summary>
    /// <param name="chunks">Serialized chunks; only the first is used. An empty list yields an empty result.</param>
    /// <returns>The deserialized hashtable (zero or one element).</returns>
    public static List<Hashtable> DeserializeParallelForEachOneChunk(List<byte[]> chunks) =>
        RunOnParallelForEach(FirstChunkOnly(chunks), "----- DeserializeParallelForEachOneChunk -----");

    /// <summary>Deserializes every chunk with <see cref="Parallel"/>.ForEach.</summary>
    /// <param name="chunks">Serialized chunks as written by <see cref="Export"/>.</param>
    /// <returns>The deserialized hashtables, in the same order as <paramref name="chunks"/>.</returns>
    public static List<Hashtable> DeserializeParallelForEach(List<byte[]> chunks) =>
        RunOnParallelForEach(chunks, "----- DeserializeParallelForEach -----");

    /// <summary>Returns a new list holding at most the first chunk; empty input gives an empty list.</summary>
    private static List<byte[]> FirstChunkOnly(List<byte[]> chunks)
    {
        ArgumentNullException.ThrowIfNull(chunks);
        return chunks.Take(1).ToList();
    }

    /// <summary>Times deserialization of <paramref name="chunks"/> on one new <see cref="Thread"/> per chunk.</summary>
    private static List<Hashtable> RunOnThreads(List<byte[]> chunks, string title)
    {
        Console.WriteLine(title);
        Stopwatch stopwatch = Stopwatch.StartNew();

        // Each worker writes only its own slot, so no concurrent collection (or its locking) is needed.
        var results = new Hashtable[chunks.Count];
        var workers = new Thread[chunks.Count];
        for (int i = 0; i < chunks.Count; i++)
        {
            int index = i; // copy so each lambda captures its own index
            workers[i] = new Thread(() => results[index] = DeserializeChunk(chunks[index]));
            workers[i].Start();
        }

        foreach (Thread worker in workers)
        {
            worker.Join();
        }

        stopwatch.Stop();
        Console.WriteLine("Deserialize: " + stopwatch.Elapsed.TotalSeconds);
        return results.ToList();
    }

    /// <summary>Times deserialization of <paramref name="chunks"/> with one <see cref="Task.Run(Func{Hashtable})"/> per chunk.</summary>
    private static List<Hashtable> RunOnTasks(List<byte[]> chunks, string title)
    {
        Console.WriteLine(title);
        Stopwatch stopwatch = Stopwatch.StartNew();

        Task<Hashtable>[] tasks = chunks.Select(chunk => Task.Run(() => DeserializeChunk(chunk))).ToArray();
        Task.WaitAll(tasks);

        stopwatch.Stop();
        Console.WriteLine("Deserialize: " + stopwatch.Elapsed.TotalSeconds);
        // Every task has completed at this point, so reading Result does not block.
        return tasks.Select(task => task.Result).ToList();
    }

    /// <summary>Times deserialization of <paramref name="chunks"/> with <see cref="Parallel"/>.ForEach.</summary>
    private static List<Hashtable> RunOnParallelForEach(List<byte[]> chunks, string title)
    {
        Console.WriteLine(title);
        Stopwatch stopwatch = Stopwatch.StartNew();

        var results = new Hashtable[chunks.Count];
        Parallel.ForEach(chunks, (chunk, _, index) => results[index] = DeserializeChunk(chunk));

        stopwatch.Stop();
        Console.WriteLine("Deserialize: " + stopwatch.Elapsed.TotalSeconds);
        return results.ToList();
    }

    /// <summary>Deserializes one chunk produced by <see cref="Export"/> back into a <see cref="Hashtable"/>.</summary>
    /// <remarks>Creates a new formatter per call, so no formatter instance is shared between threads.</remarks>
    private static Hashtable DeserializeChunk(byte[] chunk)
    {
#pragma warning disable SYSLIB0011 // Type or member is obsolete
        var binaryFormatter = new BinaryFormatter();
        using var memoryStream = new MemoryStream(chunk);
        return (Hashtable)binaryFormatter.Deserialize(memoryStream);
#pragma warning restore SYSLIB0011 // Type or member is obsolete
    }

    /// <summary>
    /// Serializes each chunk with <see cref="BinaryFormatter"/> into its own file,
    /// named <c>chunk-{guid}.dat</c>, inside <paramref name="directory"/>.
    /// </summary>
    /// <param name="chunks">The hashtables to write, one file each.</param>
    /// <param name="directory">Existing target directory.</param>
    public static void Export(List<Hashtable> chunks, string directory)
    {
        Console.WriteLine("Exporting chunks");
#pragma warning disable SYSLIB0011 // Type or member is obsolete
        var binaryFormatter = new BinaryFormatter();
        foreach (Hashtable chunk in chunks)
        {
            // Path.Combine uses the platform separator. A hard-coded "\\" would create files named
            // "data\chunk-…" in the working directory on non-Windows systems.
            string path = Path.Combine(directory, "chunk-" + Guid.NewGuid() + ".dat");
            using var fileStream = new FileStream(path, FileMode.Create);
            binaryFormatter.Serialize(fileStream, chunk);
        }
#pragma warning restore SYSLIB0011 // Type or member is obsolete
    }

    /// <summary>
    /// Splits <paramref name="hashtable"/> into <paramref name="numberOfChunks"/> new hashtables whose sizes
    /// differ by at most one; the first <c>Count % numberOfChunks</c> chunks receive the extra entry.
    /// </summary>
    /// <param name="hashtable">Source entries; not modified.</param>
    /// <param name="numberOfChunks">Number of chunks to produce; must be positive.</param>
    /// <returns>Exactly <paramref name="numberOfChunks"/> hashtables, some possibly empty.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberOfChunks"/> is zero or negative.</exception>
    public static List<Hashtable> ChunkHashtable(Hashtable hashtable, int numberOfChunks)
    {
        ArgumentNullException.ThrowIfNull(hashtable);
        if (numberOfChunks <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(numberOfChunks), numberOfChunks, "Number of chunks must be positive.");
        }

        Console.WriteLine("Chunking hashtable");
        int chunkSize = hashtable.Count / numberOfChunks;
        int remainder = hashtable.Count % numberOfChunks;
        var chunks = new List<Hashtable>(numberOfChunks);
        IDictionaryEnumerator enumerator = hashtable.GetEnumerator();
        for (int i = 0; i < numberOfChunks; i++)
        {
            int size = chunkSize + (i < remainder ? 1 : 0);
            var chunk = new Hashtable(size);
            chunks.Add(chunk);
            // The sizes sum to hashtable.Count, so the enumerator never runs past the end.
            for (int j = 0; j < size && enumerator.MoveNext(); j++)
            {
                chunk.Add(enumerator.Key, enumerator.Value);
            }
        }
        return chunks;
    }

    /// <summary>
    /// Builds a hashtable of <paramref name="length"/> random string entries. Each key is a GUID string
    /// suffixed with its index (which keeps keys unique); each value is another GUID string.
    /// </summary>
    /// <param name="length">Number of entries; zero or negative yields an empty table.</param>
    public static Hashtable GenerateHashtable(int length)
    {
        Console.WriteLine("Generating hashtable");
        var hashtable = new Hashtable(Math.Max(length, 0));
        for (int i = 0; i < length; i++)
        {
            hashtable.Add(Guid.NewGuid().ToString() + i, Guid.NewGuid().ToString());
        }
        return hashtable;
    }
}
}
Results: Hashtable + BinaryFormatter
Read: 0,0298205
----- DeserializeParallelForEach -----
Deserialize: 0,5175991
----- DeserializeParallelForEachOneChunk -----
Deserialize: 0,0809796
----- DeserializeTasks -----
Deserialize: 0,4418913
----- DeserializeTasksOneChunk -----
Deserialize: 0,193335
----- DeserializeThreads -----
Deserialize: 0,3709102
----- DeserializeThreadsOneChunk -----
Deserialize: 0,0767091
What I expect is that deserializing all chunks in parallel should take about the same time as deserializing a single chunk.
Note: I know that BinaryFormatter is obsolete and shouldn't be used anymore, but it's fine for my use case.
Note 2: MessagePack is even slower than BinaryFormatter, so it is not included in my tests.
Update: Results for Dictionary<string,string> + BinaryFormatter, which is a lot slower:
----- DeserializeParallelForEach -----
Deserialize: 1,0663033
----- DeserializeParallelForEachOneChunk -----
Deserialize: 0,2345796
----- DeserializeTasks -----
Deserialize: 1,2000186
----- DeserializeTasksOneChunk -----
Deserialize: 0,2540234
----- DeserializeThreads -----
Deserialize: 1,2881502
----- DeserializeThreadsOneChunk -----
Deserialize: 0,196688
Update 2: Results for Dictionary<string,string> + Protobuf. This is a lot faster than both Dictionary<string,string> + BinaryFormatter and Hashtable + BinaryFormatter, but still not as fast as expected:
----- DeserializeThreadsProtobuf -----
Deserialize: 0,2851767
----- DeserializeThreadsOneChunkProtobuf -----
Deserialize: 0,0720033
----- DeserializeParallelForEachProtobuf -----
Deserialize: 0,2275362
----- DeserializeParallelForEachOneChunkProtobuf -----
Deserialize: 0,0479956
----- DeserializeTasksProtobuf -----
Deserialize: 0,2434543
----- DeserializeTasksOneChunkProtobuf -----
Deserialize: 0,0314671