Parallel Counters

Suppose you are writing a concurrent structure (like a thread-safe hash map) and you need to update and query the number of its elements. Or you must count events that occur concurrently in your system (like tracking the number of function calls). It sounds easy, but the concurrent nature of the problem complicates it a little. How do we write thread-safe counters that are as efficient as possible?
Let’s see what options we have:

  1. Use locks to achieve thread safety
  2. Use atomic operations to achieve thread safety
  3. Give each thread separate counter and sum all counters to compute final value

Locks are heavyweight and are unlikely to be the best solution for such a simple problem, so the first option goes down.
Atomic operations are easy to implement and should be faster than heavy locks. Most people would choose this option.
But what about option number 3? It requires more effort (because we have to keep data per thread) but requires no synchronization at all! Can we gain additional performance by choosing this option?

Three Counters
Let’s implement each option and compare their performance. The code for the three counters is pretty straightforward. I will use OpenMP to simplify the implementation even more.
Option 1:

class CLockedEventCounter
{
public:
	CLockedEventCounter() : m_Count(0) { }

	void Event()
	{
		#pragma omp critical (countLock) 
		m_Count++;
	}

	uint64_t GetCount() const
	{
		uint64_t res;
		#pragma omp critical (countLock) 
		res = m_Count;
		return res;
	}

private:
	uint64_t m_Count;
};
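
As a side note, outside of OpenMP the same locked counter would typically be written with std::mutex; a minimal equivalent sketch (not part of the benchmark):

#include <cstdint>
#include <mutex>

class CMutexEventCounter
{
public:
	void Event()
	{
		std::lock_guard<std::mutex> lock(m_Lock);
		m_Count++;
	}

	uint64_t GetCount() const
	{
		std::lock_guard<std::mutex> lock(m_Lock);
		return m_Count;
	}

private:
	mutable std::mutex m_Lock; //mutable so that the const GetCount() can lock it
	uint64_t m_Count = 0;
};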

Option 2:

class CInterlockedEventCounter
{
public:
	CInterlockedEventCounter() : m_Count(0) { }

	void Event()
	{
		//Use the 64-bit intrinsic: plain _InterlockedIncrement operates on a 32-bit long
		_InterlockedIncrement64(&m_Count);
	}

	uint64_t GetCount() const
	{
		return m_Count;
	}

private:
	volatile __int64 m_Count; //_InterlockedIncrement64 requires a volatile __int64
};
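
Similarly, a portable C++11 alternative to the Windows-specific intrinsic is std::atomic; a minimal sketch:

#include <atomic>
#include <cstdint>

class CAtomicEventCounter
{
public:
	void Event()
	{
		//Relaxed ordering is sufficient for a statistics counter
		m_Count.fetch_add(1, std::memory_order_relaxed);
	}

	uint64_t GetCount() const
	{
		return m_Count.load(std::memory_order_relaxed);
	}

private:
	std::atomic<uint64_t> m_Count{0};
};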

Option 3:

class CParallelEventCounter
{
public:
	CParallelEventCounter(uint32_t count)
	{
		m_Count.resize(count);
	}

	void Event(size_t id)
	{
		m_Count[id].count++;
	}

	uint64_t GetCount() const
	{
		uint64_t res = 0;
		for(size_t i = 0; i < m_Count.size(); ++i)
			res += m_Count[i].count;
		return res;
	}
private:
	struct Counter
	{
		Counter() : count(0) { }
		uint64_t count;
		//Pad to avoid false sharing
		char pad[512];
	};
	std::vector<Counter> m_Count;
};
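
The 512-byte pad is a conservative guess; cache lines on typical x86 processors are 64 bytes, so in C++11 the same effect can be sketched more precisely with alignas:

//Each counter gets its own cache line (64 bytes on typical x86 CPUs),
//so sizeof(Counter) is padded up to a multiple of 64 automatically
struct alignas(64) Counter
{
	uint64_t count = 0;
};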

In the Parallel Counter each thread accesses only its own data, so there is no need for synchronization or atomic operations. (GetCount() may observe a value that is a few increments stale while other threads are still counting, which is usually acceptable for statistics counters.) But will it improve performance?

Performance Test
In the test I will simulate parallel events by calling the Event() function from multiple threads. We will measure how long it takes to count 300 000 000 generated events, and we will also vary the number of threads to see how well each solution scales.
First, a comparison between the Interlocked Counter and the Parallel Counter:

[Figure: time to count 300 000 000 events vs. number of threads, Interlocked Counter vs. Parallel Counter]

Without a doubt the Parallel Counter wins. It is about 12 times faster than the Interlocked Counter, and it also scales much better as the number of threads grows.
To get the whole picture, let’s see how the lock-based counter performs (represented by the red line):

[Figure: the same comparison with the lock-based counter added as the red line]

As you can see, this solution is a real performance killer: it is about 80x slower than the previous methods!
The full code of the benchmark is listed below:

#include "stdafx.h"
#include <omp.h>
#include <vector>
#include <string>
#include <cstdint>
#include <chrono>
#include <Windows.h>

class CLockedEventCounter
{
public:
	CLockedEventCounter() : m_Count(0) { }

	void Event()
	{
		#pragma omp critical (countLock) 
		m_Count++;
	}

	uint64_t GetCount() const
	{
		uint64_t res;
		#pragma omp critical (countLock) 
		res = m_Count;
		return res;
	}

private:
	uint64_t m_Count;
};

class CInterlockedEventCounter
{
public:
	CInterlockedEventCounter() : m_Count(0) { }

	void Event()
	{
		//Use the 64-bit intrinsic: plain _InterlockedIncrement operates on a 32-bit long
		_InterlockedIncrement64(&m_Count);
	}

	uint64_t GetCount() const
	{
		return m_Count;
	}

private:
	volatile __int64 m_Count; //_InterlockedIncrement64 requires a volatile __int64
};

class CParallelEventCounter
{
public:
	CParallelEventCounter(uint32_t count)
	{
		m_Count.resize(count);
	}

	void Event(size_t id)
	{
		m_Count[id].count++;
	}

	uint64_t GetCount() const
	{
		uint64_t res = 0;
		for(size_t i = 0; i < m_Count.size(); ++i)
			res += m_Count[i].count;
		return res;
	}
private:
	struct Counter
	{
		Counter() : count(0) { }
		uint64_t count;
		//Pad to avoid false sharing
		char pad[512];
	};
	std::vector<Counter> m_Count;
};

int main(int argc, char* argv[])
{
	if(argc < 4)
	{
		fprintf(stderr, "To small number of args\n");
		return -1;
	}

	uint32_t threadNum = std::stoul(argv[1]);
	int64_t countTo    = std::stoll(argv[2]);
	uint32_t algo      = std::stoul(argv[3]);


	CLockedEventCounter lockedEventCounter;
	CInterlockedEventCounter interlockedEventCounter;
	CParallelEventCounter parallelEventCounter(threadNum);

	
	typedef std::chrono::high_resolution_clock clock;
	typedef std::chrono::milliseconds milliseconds;

	clock::time_point t0 = clock::now();

	#pragma omp parallel for num_threads(threadNum)
	for(int64_t i = 0; i < countTo; ++i)
	{
		if(algo == 0) lockedEventCounter.Event();
		else if(algo == 1) interlockedEventCounter.Event();
		else if(algo == 2) parallelEventCounter.Event(omp_get_thread_num());
	}
	
	uint64_t count = 0;
	if(algo == 0) count = lockedEventCounter.GetCount();
	else if(algo == 1) count = interlockedEventCounter.GetCount();
	else if(algo == 2) count = parallelEventCounter.GetCount();

	clock::time_point t1 = clock::now();
	milliseconds totalms = std::chrono::duration_cast<milliseconds>(t1 - t0);

	if(count != static_cast<uint64_t>(countTo))
	{
		fprintf(stderr, "counter mismatch (%llu) != (%lld)\n", count, countTo);
		return -1;
	}

	//milliseconds is a chrono duration; count() yields the raw tick value
	printf("%u,%lld\n", threadNum, totalms.count());
	return 0;
}
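
For example, assuming the benchmark is built as counters.exe with OpenMP enabled (the /openmp switch in MSVC; the binary name is just an illustration), counters.exe 8 300000000 2 would count 300 000 000 events on 8 threads using the parallel counter (algo = 2).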

Real world case
But this is only a benchmark. Let’s see how counting is done in a real-life system that has to deal with this issue. A good example is Wireshark: it has to count, in real time, the huge number of incoming and outgoing packets that flood your network card. In the Windows version, the counters are implemented in the WinPcap driver. The creators of WinPcap also chose parallel counters, but with a neat modification that is not available in user mode.
In the Windows kernel it is possible to keep the scheduler from preempting you and make sure that the execution of your code won’t be rescheduled to a different processor. This is done by raising the IRQL to DISPATCH_LEVEL (e.g. by calling the KeRaiseIrql function). With this ability we can modify the counting scenario as follows (a sketch follows the list):

  1. When packet comes, raise IRQL to DISPATCH_LEVEL
  2. Get current Id of logical processor
  3. Increment counter for current logical processor by regular non-atomic operation
  4. Lower IRQL

To compute the final value, simply sum over the array of counters.
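
A hypothetical kernel-mode sketch of these four steps (these are real WDK APIs, but the per-CPU array g_PerCpuCount and its sizing are assumptions for illustration, not WinPcap’s actual variable names):

KIRQL oldIrql;

//1. Raise IRQL so the scheduler cannot move us to another processor
KeRaiseIrql(DISPATCH_LEVEL, &oldIrql);

//2. Get the id of the logical processor we are pinned to
ULONG cpu = KeGetCurrentProcessorNumber();

//3. Plain, non-atomic increment of this CPU's private counter
g_PerCpuCount[cpu]++;

//4. Restore the previous IRQL
KeLowerIrql(oldIrql);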

A snippet from the WinPcap code:
The increment:

Cpu = KeGetCurrentProcessorNumber();
LocalData = &Open->CpuData[Cpu];
LocalData->Received++;

And the computation of the final result:

for(i = 0 ; i < g_NCpu ; i++)
{
	pStats[3] += Open->CpuData[i].Accepted;
	pStats[0] += Open->CpuData[i].Received;
	pStats[1] += Open->CpuData[i].Dropped;
	pStats[2] += 0;		// Not yet supported
}

The full WinPcap code is available here: http://www.winpcap.org/devel.htm

Why so slow?
As you can see, writing to shared locations from multiple cores can drain your performance drastically. The question is: why is this such a big deal? Couldn’t the writes be faster? The answer lies in the cache coherency protocols implemented in modern processors. Most of them (MESI and its variants, such as MOESI on AMD x86 processors) give you fast concurrent reads, but slow, non-scalable writes to shared locations. Exactly why comes down to the details of how cache coherency is achieved. For the moment, one thing should be clear: to implement fast, scalable applications we have to use data structures and algorithms that avoid writes to shared memory regions.
