How to Use Signals in C#

 

Thread synchronization in C# can be awkward, particularly when synchronization primitives have to be passed between the objects of an application and maintained over time.

The current model built on Task, IAsyncResult, and the TPL solves these issues when the design is right. Still, I would like to create a simple class that sends and receives signals while blocking the receiving thread.

The interface is as follows:

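public interface ISignal<T> : IDisposable
{
	// Delivers a value to the receiving threads.
	void Send(T signal);

	// Blocks the calling thread until a signal arrives.
	T Receive();

	// Blocks the calling thread for at most timeOut milliseconds.
	T Receive(int timeOut);
}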

Where T is an entity to be passed to a receiver.

Here is an example of a call:

[TestMethod]
public void ExampleTest()
{
	var signal = SignalFactory.GetInstanse<string>();
	var task1 = Task.Factory.StartNew(() => // thread start
	{
		Thread.Sleep(1000);
		signal.Send("Some message");
	});
	// blocking the current thread
	string message = signal.Receive();
	Debug.WriteLine(message);
}

To obtain a signal object, we create a factory:

public static class SignalFactory
{
	public static ISignal<T> GetInstanse<T>()
	{
		return new Signal<T>();
	}

	public static ISignal<T> GetInstanse<T>(string name)
	{
		return new CrossProcessSignal<T>(name);
	}
}

Signal is an internal class that synchronizes threads within a single process. The threads must share a reference to the same object.

CrossProcessSignal is an internal class that can synchronize threads in separate processes.

Signal Implementation

The first idea that comes to mind is to block the thread in Receive with a Semaphore and, in Send, to call Release() on that semaphore with the number of blocked threads. Once the threads are released, each one returns the result from the class's buffer field of type T. However, we do not know how many threads are waiting in Receive at any given moment, so we cannot guarantee that a few more threads will not start waiting while Release is being called.
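
To make the problem concrete, here is a minimal sketch of the semaphore idea. The class name SemaphoreSignal, the waiting counter, and the Waiting property are assumptions made for this illustration and are not part of the article's source code.

```csharp
using System;
using System.Threading;

// Hypothetical class, for illustration only: broadcasting through a single semaphore.
public sealed class SemaphoreSignal<T> : IDisposable
{
    private readonly SemaphoreSlim gate = new SemaphoreSlim(0);
    private readonly object sync = new object();
    private int waiting;   // receivers that have registered since the last Send
    private T buffer;

    // Number of receivers counted so far (exposed only to observe the sketch).
    public int Waiting
    {
        get { lock (sync) { return waiting; } }
    }

    public T Receive()
    {
        lock (sync) { waiting++; }
        // A receiver that registers after Send has read the counter
        // stays blocked here until the next Send.
        gate.Wait();
        lock (sync) { return buffer; }
    }

    public void Send(T signal)
    {
        int toRelease;
        lock (sync)
        {
            buffer = signal;
            toRelease = waiting;   // snapshot of the receivers known right now
            waiting = 0;
        }
        if (toRelease > 0)
        {
            gate.Release(toRelease);
        }
    }

    public void Dispose()
    {
        gate.Dispose();
    }
}
```

The snapshot taken in Send is only correct for the instant it is read. A thread that has just been released can also call Receive again and take a permit intended for a slower thread, which is why this design is hard to make reliable.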

We chose AutoResetEvent as the synchronization primitive instead. Each receiving thread gets its own AutoResetEvent, stored in a Dictionary<int,AutoResetEvent> keyed by the thread ID.
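
The per-thread lookup can be sketched in isolation as follows. This is not the article's implementation: the ThreadEventRegistry name is hypothetical, and the sketch swaps the plain Dictionary for a ConcurrentDictionary so that the lookup is safe without an explicit lock.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, for illustration only: one AutoResetEvent per managed thread.
public sealed class ThreadEventRegistry
{
    private readonly ConcurrentDictionary<int, AutoResetEvent> events =
        new ConcurrentDictionary<int, AutoResetEvent>();

    // Number of threads that have asked for an event so far.
    public int Count
    {
        get { return events.Count; }
    }

    // Returns the calling thread's event, creating it on first use.
    public AutoResetEvent ForCurrentThread()
    {
        int id = Thread.CurrentThread.ManagedThreadId;
        return events.GetOrAdd(id, _ => new AutoResetEvent(false));
    }
}
```

Calling ForCurrentThread() twice on one thread yields the same event, while a second thread gets its own, so a sender can later signal each waiting thread individually.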

The class fields look like this:

private T buffer;

Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();

private volatile object sync = new object();

private bool isDisposabled = false;

The sync object is locked in Send so that several threads cannot overwrite the buffer at the same time.

The isDisposabled flag records whether Dispose() has been called. If it has not, the destructor (finalizer) calls it.

public void Dispose()
{
	foreach(var resetEvent in events.Values)
	{
		resetEvent.Dispose();
	}
	isDisposabled = true;
}
~Signal()
{
	if (!isDisposabled)
	{
		Dispose();
	}
}
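
For comparison, here is a sketch of the conventional .NET dispose pattern applied to a class that owns several events. The EventHolder class and its members are hypothetical names introduced for this example, not the article's code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class, for illustration only: the Dispose(bool) pattern.
public class EventHolder : IDisposable
{
    private readonly List<AutoResetEvent> events = new List<AutoResetEvent>();
    private bool disposed;

    public bool IsDisposed
    {
        get { return disposed; }
    }

    public AutoResetEvent Create()
    {
        if (disposed) throw new ObjectDisposedException(nameof(EventHolder));
        var e = new AutoResetEvent(false);
        events.Add(e);
        return e;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);   // the finalizer has nothing left to do
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed) return;        // calling Dispose twice is harmless
        if (disposing)
        {
            // Managed objects are touched only on the explicit Dispose() path.
            foreach (var e in events) e.Dispose();
            events.Clear();
        }
        disposed = true;
    }

    ~EventHolder()
    {
        Dispose(false);
    }
}
```

The main difference from the snippet above is that the finalizer path does not touch other managed objects, and an explicit Dispose() call suppresses finalization altogether.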

Now, let’s talk about the Receive method.

public T Receive()
{
	var waiter = GetEvents();
	waiter.WaitOne();
	waiter.Reset();
	return buffer;
}

GetEvents() retrieves the current thread's AutoResetEvent from the dictionary if one exists. If not, it creates a new one and puts it into the dictionary.

waiter.WaitOne() blocks the thread until a signal arrives.

waiter.Reset() resets the state of the AutoResetEvent so that the next WaitOne call blocks the thread again. AutoResetEvent already does this automatically when it releases a waiting thread, so the explicit call is redundant.
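
The reset behaviour of AutoResetEvent can be checked with a short, single-threaded sketch. The AutoResetDemo class and its method are hypothetical names used only for this example.

```csharp
using System.Threading;

// Hypothetical demo, for illustration only.
public static class AutoResetDemo
{
    // Signals the event once, then tries two non-blocking waits in a row.
    public static bool[] WaitTwiceAfterSet()
    {
        using (var gate = new AutoResetEvent(false))
        {
            gate.Set();                     // signal once
            bool first = gate.WaitOne(0);   // consumes the signal; the event resets itself
            bool second = gate.WaitOne(0);  // nothing left to consume, returns immediately
            return new[] { first, second };
        }
    }
}
```

The first wait succeeds and the second one fails, which shows that a single Set() releases exactly one WaitOne() call per event.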

In Send, we store the value and call the Set method on each AutoResetEvent.

public void Send(T signal)
{
	lock (sync)
	{
		buffer = signal;
		foreach(var autoResetEvent in events.Values)
		{
			autoResetEvent.Set();
		}
	}
}

We can test this model using the following method:

private void SendTest(string name = "")
{
	ISignal<string> signal;
	if (string.IsNullOrEmpty(name))
	{
		 signal = SignalFactory.GetInstanse<string>(); // create a local signal
	}
	else
	{
		signal = SignalFactory.GetInstanse<string>(name);
	}

	var task1 = Task.Factory.StartNew(() => // thread start
	{
		for (int i = 0; i < 10; i++)
		{
			// block the thread until a signal arrives
			var message = signal.Receive();
			Debug.WriteLine($"Thread 1 {message}");
		}
	});
	var task2 = Task.Factory.StartNew(() => // thread start
	{
		for (int i = 0; i < 10; i++)
		{
			// block the thread until a signal arrives
			var message = signal.Receive();
			Debug.WriteLine($"Thread 2 {message}");
		}
	});

	for (int i = 0; i < 10; i++)
	{
		// sending a signal to waiting threads.
		signal.Send($"Ping {i}");
		Thread.Sleep(50);
	}

}

The complete Signal class is shown below:

using System.Collections.Generic;
using System.Threading;

namespace Signal
{
	internal class Signal<T> : ISignal<T>
	{
		private T buffer;

		Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();

		private volatile object sync = new object();

		private bool isDisposabled = false;

		~Signal()
		{
			if (!isDisposabled)
			{
				Dispose();
			}
		}

		public T Receive()
		{
			var waiter = GetEvents();
			waiter.WaitOne();
			waiter.Reset();
			return buffer;
		}

		public T Receive(int timeOut)
		{
			var waiter = GetEvents();
			// Note: the buffer is returned even if the wait times out.
			waiter.WaitOne(timeOut);
			waiter.Reset();
			return buffer;
		}

		public void Send(T signal)
		{
			lock (sync)
			{
				buffer = signal;
				foreach(var autoResetEvent in events.Values)
				{
					autoResetEvent.Set();
				}
			}
		}

		private AutoResetEvent GetEvents()
		{
			var threadId = Thread.CurrentThread.ManagedThreadId;
			// Take the same lock as Send so that the dictionary is never
			// modified while Send is enumerating it.
			lock (sync)
			{
				AutoResetEvent autoResetEvent;
				if (!events.TryGetValue(threadId, out autoResetEvent))
				{
					autoResetEvent = new AutoResetEvent(false);
					events.Add(threadId, autoResetEvent);
				}
				return autoResetEvent;
			}
		}

		public void Dispose()
		{
			foreach(var resetEvent in events.Values)
			{
				resetEvent.Dispose();
			}
			isDisposabled = true;
		}
	}
}

This implementation still needs considerable work before it can be called reliable. The source code also includes an inter-process implementation of the same idea, which passes the signal through shared memory.
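
One example of such rework concerns Receive(int timeOut), which returns the buffer whether or not a signal arrived in time. Below is a minimal sketch of a receive call that reports timeouts explicitly. The TimedSlot class and its TryReceive method are assumptions made for this illustration (a single receiver, a single event) and are not taken from the article's sources.

```csharp
using System;
using System.Threading;

// Hypothetical class, for illustration only: a one-receiver slot with a timed receive.
public sealed class TimedSlot<T> : IDisposable
{
    private readonly AutoResetEvent waiter = new AutoResetEvent(false);
    private readonly object sync = new object();
    private T buffer;

    public void Send(T signal)
    {
        lock (sync) { buffer = signal; }
        waiter.Set();
    }

    // Returns true and the value if a signal arrived within timeoutMs milliseconds,
    // otherwise returns false and leaves value at its default.
    public bool TryReceive(int timeoutMs, out T value)
    {
        if (!waiter.WaitOne(timeoutMs))
        {
            value = default(T);
            return false;
        }
        lock (sync) { value = buffer; }
        return true;
    }

    public void Dispose()
    {
        waiter.Dispose();
    }
}
```

With this shape, the caller can tell a timeout apart from a received value instead of silently reading a stale buffer.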

Sources on GitHub

Last modified: September 23, 2021