Figure out stupid math shit

Signed-off-by: Slendi <slendi@socopon.com>
This commit is contained in:
2026-01-10 16:15:36 +02:00
parent f896ddae74
commit e0ca1f1043
475 changed files with 499637 additions and 14 deletions

View File

@@ -0,0 +1,79 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <stdint.h>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#undef GetObject
#include <intrin.h>
extern "C" void _ReadWriteBarrier();
#pragma intrinsic(_ReadWriteBarrier)
#pragma intrinsic(_InterlockedCompareExchange)
#pragma intrinsic(_InterlockedExchangeAdd)
// Memory Barriers to prevent CPU and Compiler re-ordering
#define BASE_MEMORYBARRIER_ACQUIRE() _ReadWriteBarrier()
#define BASE_MEMORYBARRIER_RELEASE() _ReadWriteBarrier()
#define BASE_ALIGN(x) __declspec( align( x ) )
#else
#define BASE_MEMORYBARRIER_ACQUIRE() __asm__ __volatile__("": : :"memory")
#define BASE_MEMORYBARRIER_RELEASE() __asm__ __volatile__("": : :"memory")
#define BASE_ALIGN(x) __attribute__ ((aligned( x )))
#endif
namespace enki
{
// Atomically performs: if( *pDest == compareWith ) { *pDest = swapTo; }
// returns old *pDest (so if successfull, returns compareWith)
inline uint32_t AtomicCompareAndSwap( volatile uint32_t* pDest, uint32_t swapTo, uint32_t compareWith )
{
#ifdef _WIN32
// assumes two's complement - unsigned / signed conversion leads to same bit pattern
return _InterlockedCompareExchange( (volatile long*)pDest,swapTo, compareWith );
#else
return __sync_val_compare_and_swap( pDest, compareWith, swapTo );
#endif
}
inline uint64_t AtomicCompareAndSwap( volatile uint64_t* pDest, uint64_t swapTo, uint64_t compareWith )
{
#ifdef _WIN32
// assumes two's complement - unsigned / signed conversion leads to same bit pattern
return _InterlockedCompareExchange64( (__int64 volatile*)pDest, swapTo, compareWith );
#else
return __sync_val_compare_and_swap( pDest, compareWith, swapTo );
#endif
}
// Atomically performs: tmp = *pDest; *pDest += value; return tmp;
inline int32_t AtomicAdd( volatile int32_t* pDest, int32_t value )
{
#ifdef _WIN32
return _InterlockedExchangeAdd( (long*)pDest, value );
#else
return __sync_fetch_and_add( pDest, value );
#endif
}
}

View File

@@ -0,0 +1,240 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <stdint.h>
#include <assert.h>
#include "Atomics.h"
#include <string.h>
namespace enki
{
// LockLessMultiReadPipe - Single writer, multiple reader thread safe pipe using (semi) lockless programming
// Readers can only read from the back of the pipe
// The single writer can write to the front of the pipe, and read from both ends (a writer can be a reader)
// for many of the principles used here, see http://msdn.microsoft.com/en-us/library/windows/desktop/ee418650(v=vs.85).aspx
// Note: using log2 sizes so we do not need to clamp (multi-operation)
// T is the contained type
// Note this is not true lockless as the use of flags as a form of lock state.
template<uint8_t cSizeLog2, typename T> class LockLessMultiReadPipe
{
public:
LockLessMultiReadPipe();
~LockLessMultiReadPipe() {}
// ReaderTryReadBack returns false if we were unable to read
// This is thread safe for both multiple readers and the writer
bool ReaderTryReadBack( T* pOut );
// WriterTryReadFront returns false if we were unable to read
// This is thread safe for the single writer, but should not be called by readers
bool WriterTryReadFront( T* pOut );
// WriterTryWriteFront returns false if we were unable to write
// This is thread safe for the single writer, but should not be called by readers
bool WriterTryWriteFront( const T& in );
// IsPipeEmpty() is a utility function, not intended for general use
// Should only be used very prudently.
bool IsPipeEmpty() const
{
return 0 == m_WriteIndex - m_ReadCount;
}
void Clear()
{
m_WriteIndex = 0;
m_ReadIndex = 0;
m_ReadCount = 0;
memset( (void*)m_Flags, 0, sizeof( m_Flags ) );
}
private:
const static uint32_t ms_cSize = ( 1 << cSizeLog2 );
const static uint32_t ms_cIndexMask = ms_cSize - 1;
const static uint32_t FLAG_INVALID = 0xFFFFFFFF; // 32bit for CAS
const static uint32_t FLAG_CAN_WRITE = 0x00000000; // 32bit for CAS
const static uint32_t FLAG_CAN_READ = 0x11111111; // 32bit for CAS
T m_Buffer[ ms_cSize ];
// read and write indexes allow fast access to the pipe, but actual access
// controlled by the access flags.
volatile uint32_t BASE_ALIGN(4) m_WriteIndex;
volatile uint32_t BASE_ALIGN(4) m_ReadCount;
volatile uint32_t m_Flags[ ms_cSize ];
volatile uint32_t BASE_ALIGN(4) m_ReadIndex;
};
template<uint8_t cSizeLog2, typename T> inline
LockLessMultiReadPipe<cSizeLog2,T>::LockLessMultiReadPipe()
: m_WriteIndex(0)
, m_ReadIndex(0)
, m_ReadCount(0)
{
assert( cSizeLog2 < 32 );
memset( (void*)m_Flags, 0, sizeof( m_Flags ) );
}
template<uint8_t cSizeLog2, typename T> inline
bool LockLessMultiReadPipe<cSizeLog2,T>::ReaderTryReadBack( T* pOut )
{
uint32_t actualReadIndex;
uint32_t readCount = m_ReadCount;
// We get hold of read index for consistency,
// and do first pass starting at read count
uint32_t readIndexToUse = readCount;
while(true)
{
uint32_t writeIndex = m_WriteIndex;
// power of two sizes ensures we can use a simple calc without modulus
uint32_t numInPipe = writeIndex - readCount;
if( 0 == numInPipe )
{
return false;
}
if( readIndexToUse >= writeIndex )
{
// move back to start
readIndexToUse = m_ReadIndex;
}
// power of two sizes ensures we can perform AND for a modulus
actualReadIndex = readIndexToUse & ms_cIndexMask;
// Multiple potential readers mean we should check if the data is valid,
// using an atomic compare exchange
uint32_t previous = AtomicCompareAndSwap( &m_Flags[ actualReadIndex ], FLAG_INVALID, FLAG_CAN_READ );
if( FLAG_CAN_READ == previous )
{
break;
}
++readIndexToUse;
//update known readcount
readCount = m_ReadCount;
}
// we update the read index using an atomic add, as we've only read one piece of data.
// this ensure consistency of the read index, and the above loop ensures readers
// only read from unread data
AtomicAdd( (volatile int32_t*)&m_ReadCount, 1 );
BASE_MEMORYBARRIER_ACQUIRE();
// now read data, ensuring we do so after above reads & CAS
*pOut = m_Buffer[ actualReadIndex ];
m_Flags[ actualReadIndex ] = FLAG_CAN_WRITE;
return true;
}
template<uint8_t cSizeLog2, typename T> inline
bool LockLessMultiReadPipe<cSizeLog2,T>::WriterTryReadFront( T* pOut )
{
uint32_t writeIndex = m_WriteIndex;
uint32_t frontReadIndex = writeIndex;
// Multiple potential readers mean we should check if the data is valid,
// using an atomic compare exchange - which acts as a form of lock (so not quite lockless really).
uint32_t previous = FLAG_INVALID;
uint32_t actualReadIndex = 0;
while( true )
{
// power of two sizes ensures we can use a simple calc without modulus
uint32_t readCount = m_ReadCount;
uint32_t numInPipe = writeIndex - readCount;
if( 0 == numInPipe || 0 == frontReadIndex )
{
// frontReadIndex can get to 0 here if that item was just being read by another thread.
m_ReadIndex = readCount;
return false;
}
--frontReadIndex;
actualReadIndex = frontReadIndex & ms_cIndexMask;
previous = AtomicCompareAndSwap( &m_Flags[ actualReadIndex ], FLAG_INVALID, FLAG_CAN_READ );
if( FLAG_CAN_READ == previous )
{
break;
}
else if( m_ReadIndex >= frontReadIndex )
{
return false;
}
}
// now read data, ensuring we do so after above reads & CAS
*pOut = m_Buffer[ actualReadIndex ];
m_Flags[ actualReadIndex ] = FLAG_CAN_WRITE;
BASE_MEMORYBARRIER_RELEASE();
// 32-bit aligned stores are atomic, and writer owns the write index
// we only move one back as this is as many as we have read, not where we have read from.
--m_WriteIndex;
return true;
}
template<uint8_t cSizeLog2, typename T> inline
bool LockLessMultiReadPipe<cSizeLog2,T>::WriterTryWriteFront( const T& in )
{
// The writer 'owns' the write index, and readers can only reduce
// the amount of data in the pipe.
// We get hold of both values for consistency and to reduce false sharing
// impacting more than one access
uint32_t writeIndex = m_WriteIndex;
// power of two sizes ensures we can perform AND for a modulus
uint32_t actualWriteIndex = writeIndex & ms_cIndexMask;
// a reader may still be reading this item, as there are multiple readers
if( m_Flags[ actualWriteIndex ] != FLAG_CAN_WRITE )
{
return false; // still being read, so have caught up with tail.
}
// as we are the only writer we can update the data without atomics
// whilst the write index has not been updated
m_Buffer[ actualWriteIndex ] = in;
m_Flags[ actualWriteIndex ] = FLAG_CAN_READ;
// We need to ensure the above writes occur prior to updating the write index,
// otherwise another thread might read before it's finished
BASE_MEMORYBARRIER_RELEASE();
// 32-bit aligned stores are atomic, and the writer controls the write index
++writeIndex;
m_WriteIndex = writeIndex;
return true;
}
}

View File

@@ -0,0 +1,437 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#include <assert.h>
#include "TaskScheduler.h"
#include "LockLessMultiReadPipe.h"
using namespace enki;
static const uint32_t PIPESIZE_LOG2 = 8;
static const uint32_t SPIN_COUNT = 100;
static const uint32_t SPIN_BACKOFF_MULTIPLIER = 10;
static const uint32_t MAX_NUM_INITIAL_PARTITIONS = 8;
// each software thread gets it's own copy of gtl_threadNum, so this is safe to use as a static variable
static THREAD_LOCAL uint32_t gtl_threadNum = 0;
namespace enki
{
struct SubTaskSet
{
ITaskSet* pTask;
TaskSetPartition partition;
};
// we derive class TaskPipe rather than typedef to get forward declaration working easily
class TaskPipe : public LockLessMultiReadPipe<PIPESIZE_LOG2,enki::SubTaskSet> {};
struct ThreadArgs
{
uint32_t threadNum;
TaskScheduler* pTaskScheduler;
};
}
namespace
{
SubTaskSet SplitTask( SubTaskSet& subTask_, uint32_t rangeToSplit_ )
{
SubTaskSet splitTask = subTask_;
uint32_t rangeLeft = subTask_.partition.end - subTask_.partition.start;
if( rangeToSplit_ > rangeLeft )
{
rangeToSplit_ = rangeLeft;
}
splitTask.partition.end = subTask_.partition.start + rangeToSplit_;
subTask_.partition.start = splitTask.partition.end;
return splitTask;
}
#if defined _WIN32
#if defined _M_IX86 || defined _M_X64
#pragma intrinsic(_mm_pause)
inline void Pause() { _mm_pause(); }
#endif
#elif defined __i386__ || defined __x86_64__
inline void Pause() { __asm__ __volatile__("pause;"); }
#else
inline void Pause() { ;} // may have NOP or yield equiv
#endif
}
static void SafeCallback(ProfilerCallbackFunc func_, uint32_t threadnum_)
{
if( func_ )
{
func_(threadnum_);
}
}
ProfilerCallbacks* TaskScheduler::GetProfilerCallbacks()
{
return &m_ProfilerCallbacks;
}
THREADFUNC_DECL TaskScheduler::TaskingThreadFunction( void* pArgs )
{
ThreadArgs args = *(ThreadArgs*)pArgs;
uint32_t threadNum = args.threadNum;
TaskScheduler* pTS = args.pTaskScheduler;
gtl_threadNum = threadNum;
SafeCallback( pTS->m_ProfilerCallbacks.threadStart, threadNum );
uint32_t spinCount = 0;
uint32_t hintPipeToCheck_io = threadNum + 1; // does not need to be clamped.
while( pTS->m_bRunning )
{
if(!pTS->TryRunTask( threadNum, hintPipeToCheck_io ) )
{
// no tasks, will spin then wait
++spinCount;
if( spinCount > SPIN_COUNT )
{
pTS->WaitForTasks( threadNum );
spinCount = 0;
}
else
{
uint32_t spinBackoffCount = spinCount * SPIN_BACKOFF_MULTIPLIER;
while( spinBackoffCount )
{
Pause();
--spinBackoffCount;
}
}
}
else
{
spinCount = 0;
}
}
AtomicAdd( &pTS->m_NumThreadsRunning, -1 );
SafeCallback( pTS->m_ProfilerCallbacks.threadStop, threadNum );
return 0;
}
void TaskScheduler::StartThreads()
{
if( m_bHaveThreads )
{
return;
}
m_bRunning = true;
SemaphoreCreate( m_NewTaskSemaphore );
// we create one less thread than m_NumThreads as the main thread counts as one
m_pThreadNumStore = new ThreadArgs[m_NumThreads];
m_pThreadIDs = new threadid_t[m_NumThreads];
m_pThreadNumStore[0].threadNum = 0;
m_pThreadNumStore[0].pTaskScheduler = this;
m_pThreadIDs[0] = 0;
m_NumThreadsWaiting = 0;
m_NumThreadsRunning = 1;// acount for main thread
for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
{
m_pThreadNumStore[thread].threadNum = thread;
m_pThreadNumStore[thread].pTaskScheduler = this;
ThreadCreate( &m_pThreadIDs[thread], TaskingThreadFunction, &m_pThreadNumStore[thread] );
++m_NumThreadsRunning;
}
// ensure we have sufficient tasks to equally fill either all threads including main
// or just the threads we've launched, this is outside the firstinit as we want to be able
// to runtime change it
if( 1 == m_NumThreads )
{
m_NumPartitions = 1;
m_NumInitialPartitions = 1;
}
else
{
m_NumPartitions = m_NumThreads * (m_NumThreads - 1);
m_NumInitialPartitions = m_NumThreads - 1;
if( m_NumInitialPartitions > MAX_NUM_INITIAL_PARTITIONS )
{
m_NumInitialPartitions = MAX_NUM_INITIAL_PARTITIONS;
}
}
m_bHaveThreads = true;
}
void TaskScheduler::StopThreads( bool bWait_ )
{
if( m_bHaveThreads )
{
// wait for them threads quit before deleting data
m_bRunning = false;
while( bWait_ && m_NumThreadsRunning > 1 )
{
// keep firing event to ensure all threads pick up state of m_bRunning
SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsRunning );
}
for( uint32_t thread = 1; thread < m_NumThreads; ++thread )
{
ThreadTerminate( m_pThreadIDs[thread] );
}
m_NumThreads = 0;
delete[] m_pThreadNumStore;
delete[] m_pThreadIDs;
m_pThreadNumStore = 0;
m_pThreadIDs = 0;
SemaphoreClose( m_NewTaskSemaphore );
m_bHaveThreads = false;
m_NumThreadsWaiting = 0;
m_NumThreadsRunning = 0;
}
}
bool TaskScheduler::TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ )
{
// check for tasks
SubTaskSet subTask;
bool bHaveTask = m_pPipesPerThread[ threadNum ].WriterTryReadFront( &subTask );
uint32_t threadToCheck = hintPipeToCheck_io_;
uint32_t checkCount = 0;
while( !bHaveTask && checkCount < m_NumThreads )
{
threadToCheck = ( hintPipeToCheck_io_ + checkCount ) % m_NumThreads;
if( threadToCheck != threadNum )
{
bHaveTask = m_pPipesPerThread[ threadToCheck ].ReaderTryReadBack( &subTask );
}
++checkCount;
}
if( bHaveTask )
{
// update hint, will preserve value unless actually got task from another thread.
hintPipeToCheck_io_ = threadToCheck;
uint32_t partitionSize = subTask.partition.end - subTask.partition.start;
if( subTask.pTask->m_RangeToRun < partitionSize )
{
SubTaskSet taskToRun = SplitTask( subTask, subTask.pTask->m_RangeToRun );
SplitAndAddTask( gtl_threadNum, subTask, subTask.pTask->m_RangeToRun, 0 );
taskToRun.pTask->ExecuteRange( taskToRun.partition, threadNum );
AtomicAdd( &taskToRun.pTask->m_RunningCount, -1 );
}
else
{
// the task has already been divided up by AddTaskSetToPipe, so just run it
subTask.pTask->ExecuteRange( subTask.partition, threadNum );
AtomicAdd( &subTask.pTask->m_RunningCount, -1 );
}
}
return bHaveTask;
}
void TaskScheduler::WaitForTasks( uint32_t threadNum )
{
// We incrememt the number of threads waiting here in order
// to ensure that the check for tasks occurs after the increment
// to prevent a task being added after a check, then the thread waiting.
// This will occasionally result in threads being mistakenly awoken,
// but they will then go back to sleep.
AtomicAdd( &m_NumThreadsWaiting, 1 );
bool bHaveTasks = false;
for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
{
if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
{
bHaveTasks = true;
break;
}
}
if( !bHaveTasks )
{
SafeCallback( m_ProfilerCallbacks.waitStart, threadNum );
SemaphoreWait( m_NewTaskSemaphore );
SafeCallback( m_ProfilerCallbacks.waitStop, threadNum );
}
int32_t prev = AtomicAdd( &m_NumThreadsWaiting, -1 );
assert( prev != 0 );
}
void TaskScheduler::WakeThreads()
{
SemaphoreSignal( m_NewTaskSemaphore, m_NumThreadsWaiting );
}
void TaskScheduler::SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_,
uint32_t rangeToSplit_, int32_t runningCountOffset_ )
{
int32_t numAdded = 0;
while( subTask_.partition.start != subTask_.partition.end )
{
SubTaskSet taskToAdd = SplitTask( subTask_, rangeToSplit_ );
// add the partition to the pipe
++numAdded;
if( !m_pPipesPerThread[ gtl_threadNum ].WriterTryWriteFront( taskToAdd ) )
{
if( numAdded > 1 )
{
WakeThreads();
}
// alter range to run the appropriate fraction
if( taskToAdd.pTask->m_RangeToRun < rangeToSplit_ )
{
taskToAdd.partition.end = taskToAdd.partition.start + taskToAdd.pTask->m_RangeToRun;
subTask_.partition.start = taskToAdd.partition.end;
}
taskToAdd.pTask->ExecuteRange( taskToAdd.partition, threadNum_ );
--numAdded;
}
}
// increment running count by number added
AtomicAdd( &subTask_.pTask->m_RunningCount, numAdded + runningCountOffset_ );
WakeThreads();
}
void TaskScheduler::AddTaskSetToPipe( ITaskSet* pTaskSet )
{
// set running count to -1 to guarantee it won't be found complete until all subtasks added
pTaskSet->m_RunningCount = -1;
// divide task up and add to pipe
pTaskSet->m_RangeToRun = pTaskSet->m_SetSize / m_NumPartitions;
if( pTaskSet->m_RangeToRun < pTaskSet->m_MinRange ) { pTaskSet->m_RangeToRun = pTaskSet->m_MinRange; }
uint32_t rangeToSplit = pTaskSet->m_SetSize / m_NumInitialPartitions;
if( rangeToSplit < pTaskSet->m_MinRange ) { rangeToSplit = pTaskSet->m_MinRange; }
SubTaskSet subTask;
subTask.pTask = pTaskSet;
subTask.partition.start = 0;
subTask.partition.end = pTaskSet->m_SetSize;
SplitAndAddTask( gtl_threadNum, subTask, rangeToSplit, 1 );
}
void TaskScheduler::WaitforTaskSet( const ITaskSet* pTaskSet )
{
uint32_t hintPipeToCheck_io = gtl_threadNum + 1; // does not need to be clamped.
if( pTaskSet )
{
while( pTaskSet->m_RunningCount )
{
TryRunTask( gtl_threadNum, hintPipeToCheck_io );
// should add a spin then wait for task completion event.
}
}
else
{
TryRunTask( gtl_threadNum, hintPipeToCheck_io );
}
}
void TaskScheduler::WaitforAll()
{
bool bHaveTasks = true;
uint32_t hintPipeToCheck_io = gtl_threadNum + 1; // does not need to be clamped.
int32_t threadsRunning = m_NumThreadsRunning - 1;
while( bHaveTasks || m_NumThreadsWaiting < threadsRunning )
{
TryRunTask( gtl_threadNum, hintPipeToCheck_io );
bHaveTasks = false;
for( uint32_t thread = 0; thread < m_NumThreads; ++thread )
{
if( !m_pPipesPerThread[ thread ].IsPipeEmpty() )
{
bHaveTasks = true;
break;
}
}
}
}
void TaskScheduler::WaitforAllAndShutdown()
{
WaitforAll();
StopThreads(true);
delete[] m_pPipesPerThread;
m_pPipesPerThread = 0;
}
uint32_t TaskScheduler::GetNumTaskThreads() const
{
return m_NumThreads;
}
TaskScheduler::TaskScheduler()
: m_pPipesPerThread(NULL)
, m_NumThreads(0)
, m_pThreadNumStore(NULL)
, m_pThreadIDs(NULL)
, m_bRunning(false)
, m_NumThreadsRunning(0)
, m_NumThreadsWaiting(0)
, m_NumPartitions(0)
, m_bHaveThreads(false)
{
memset(&m_ProfilerCallbacks, 0, sizeof(m_ProfilerCallbacks));
}
TaskScheduler::~TaskScheduler()
{
StopThreads( true ); // Stops threads, waiting for them.
delete[] m_pPipesPerThread;
m_pPipesPerThread = 0;
}
void TaskScheduler::Initialize( uint32_t numThreads_ )
{
assert( numThreads_ );
StopThreads( true ); // Stops threads, waiting for them.
delete[] m_pPipesPerThread;
m_NumThreads = numThreads_;
m_pPipesPerThread = new TaskPipe[ m_NumThreads ];
StartThreads();
}
void TaskScheduler::Initialize()
{
Initialize( GetNumHardwareThreads() );
}

View File

@@ -0,0 +1,177 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <stdint.h>
#include "Threads.h"
namespace enki
{
struct TaskSetPartition
{
uint32_t start;
uint32_t end;
};
class TaskScheduler;
class TaskPipe;
struct ThreadArgs;
struct SubTaskSet;
// Subclass ITaskSet to create tasks.
// TaskSets can be re-used, but check
class ITaskSet
{
public:
ITaskSet()
: m_SetSize(1)
, m_MinRange(1)
, m_RunningCount(0)
, m_RangeToRun(1)
{}
ITaskSet( uint32_t setSize_ )
: m_SetSize( setSize_ )
, m_MinRange(1)
, m_RunningCount(0)
, m_RangeToRun(1)
{}
ITaskSet( uint32_t setSize_, uint32_t minRange_ )
: m_SetSize( setSize_ )
, m_MinRange( minRange_ )
, m_RunningCount(0)
, m_RangeToRun(minRange_)
{}
// Execute range should be overloaded to process tasks. It will be called with a
// range_ where range.start >= 0; range.start < range.end; and range.end < m_SetSize;
// The range values should be mapped so that linearly processing them in order is cache friendly
// i.e. neighbouring values should be close together.
// threadnum should not be used for changing processing of data, it's intended purpose
// is to allow per-thread data buckets for output.
virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum ) = 0;
// Size of set - usually the number of data items to be processed, see ExecuteRange. Defaults to 1
uint32_t m_SetSize;
// Minimum size of of TaskSetPartition range when splitting a task set into partitions.
// This should be set to a value which results in computation effort of at least 10k
// clock cycles to minimize tast scheduler overhead.
// NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple
// of m_MinRange.
// Also known as grain size in literature.
uint32_t m_MinRange;
bool GetIsComplete()
{
return 0 == m_RunningCount;
}
private:
friend class TaskScheduler;
volatile int32_t m_RunningCount;
uint32_t m_RangeToRun;
};
// TaskScheduler implements several callbacks intended for profilers
typedef void (*ProfilerCallbackFunc)( uint32_t threadnum_ );
struct ProfilerCallbacks
{
ProfilerCallbackFunc threadStart;
ProfilerCallbackFunc threadStop;
ProfilerCallbackFunc waitStart;
ProfilerCallbackFunc waitStop;
};
class TaskScheduler
{
public:
TaskScheduler();
~TaskScheduler();
// Call either Initialize() or Initialize( numThreads_ ) before adding tasks.
// Initialize() will create GetNumHardwareThreads()-1 threads, which is
// sufficient to fill the system when including the main thread.
// Initialize can be called multiple times - it will wait for completion
// before re-initializing.
void Initialize();
// Initialize( numThreads_ ) - numThreads_ (must be > 0)
// will create numThreads_-1 threads, as thread 0 is
// the thread on which the initialize was called.
void Initialize( uint32_t numThreads_ );
// Adds the TaskSet to pipe and returns if the pipe is not full.
// If the pipe is full, pTaskSet is run.
// should only be called from main thread, or within a task
void AddTaskSetToPipe( ITaskSet* pTaskSet );
// Runs the TaskSets in pipe until true == pTaskSet->GetIsComplete();
// should only be called from thread which created the taskscheduler , or within a task
// if called with 0 it will try to run tasks, and return if none available.
void WaitforTaskSet( const ITaskSet* pTaskSet );
// Waits for all task sets to complete - not guaranteed to work unless we know we
// are in a situation where tasks aren't being continuosly added.
void WaitforAll();
// Waits for all task sets to complete and shutdown threads - not guaranteed to work unless we know we
// are in a situation where tasks aren't being continuosly added.
void WaitforAllAndShutdown();
// Returns the number of threads created for running tasks + 1
// to account for the main thread.
uint32_t GetNumTaskThreads() const;
// Returns the ProfilerCallbacks structure so that it can be modified to
// set the callbacks.
ProfilerCallbacks* GetProfilerCallbacks();
private:
static THREADFUNC_DECL TaskingThreadFunction( void* pArgs );
void WaitForTasks( uint32_t threadNum );
bool TryRunTask( uint32_t threadNum, uint32_t& hintPipeToCheck_io_ );
void StartThreads();
void StopThreads( bool bWait_ );
void SplitAndAddTask( uint32_t threadNum_, SubTaskSet subTask_,
uint32_t rangeToSplit_, int32_t runningCountOffset_ );
void WakeThreads();
TaskPipe* m_pPipesPerThread;
uint32_t m_NumThreads;
ThreadArgs* m_pThreadNumStore;
threadid_t* m_pThreadIDs;
volatile bool m_bRunning;
volatile int32_t m_NumThreadsRunning;
volatile int32_t m_NumThreadsWaiting;
uint32_t m_NumPartitions;
uint32_t m_NumInitialPartitions;
semaphoreid_t m_NewTaskSemaphore;
bool m_bHaveThreads;
ProfilerCallbacks m_ProfilerCallbacks;
TaskScheduler( const TaskScheduler& nocopy );
TaskScheduler& operator=( const TaskScheduler& nocopy );
};
}

View File

@@ -0,0 +1,122 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#include "TaskScheduler_c.h"
#include "TaskScheduler.h"
#include <assert.h>
using namespace enki;
struct enkiTaskScheduler : TaskScheduler
{
};
struct enkiTaskSet : ITaskSet
{
enkiTaskSet( enkiTaskExecuteRange taskFun_ ) : taskFun(taskFun_), pArgs(NULL) {}
virtual void ExecuteRange( TaskSetPartition range, uint32_t threadnum )
{
taskFun( range.start, range.end, threadnum, pArgs );
}
enkiTaskExecuteRange taskFun;
void* pArgs;
};
enkiTaskScheduler* enkiNewTaskScheduler()
{
enkiTaskScheduler* pETS = new enkiTaskScheduler();
return pETS;
}
void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ )
{
pETS_->Initialize();
}
void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ )
{
pETS_->Initialize( numThreads_ );
}
void enkiDeleteTaskScheduler( enkiTaskScheduler* pETS_ )
{
delete pETS_;
}
enkiTaskSet* enkiCreateTaskSet( enkiTaskScheduler* pETS_, enkiTaskExecuteRange taskFunc_ )
{
return new enkiTaskSet( taskFunc_ );
}
void enkiDeleteTaskSet( enkiTaskSet* pTaskSet_ )
{
delete pTaskSet_;
}
void enkiAddTaskSetToPipe( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_, void* pArgs_, uint32_t setSize_ )
{
assert( pTaskSet_ );
assert( pTaskSet_->taskFun );
pTaskSet_->m_SetSize = setSize_;
pTaskSet_->pArgs = pArgs_;
pETS_->AddTaskSetToPipe( pTaskSet_ );
}
void enkiAddTaskSetToPipeMinRange(enkiTaskScheduler * pETS_, enkiTaskSet * pTaskSet_, void * pArgs_, uint32_t setSize_, uint32_t minRange_)
{
assert( pTaskSet_ );
assert( pTaskSet_->taskFun );
pTaskSet_->m_SetSize = setSize_;
pTaskSet_->m_MinRange = minRange_;
pTaskSet_->pArgs = pArgs_;
pETS_->AddTaskSetToPipe( pTaskSet_ );
}
int enkiIsTaskSetComplete( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_ )
{
assert( pTaskSet_ );
return ( pTaskSet_->GetIsComplete() ) ? 1 : 0;
}
void enkiWaitForTaskSet( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_ )
{
pETS_->WaitforTaskSet( pTaskSet_ );
}
void enkiWaitForAll( enkiTaskScheduler* pETS_ )
{
pETS_->WaitforAll();
}
uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ )
{
return pETS_->GetNumTaskThreads();
}
enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ )
{
assert( sizeof(enkiProfilerCallbacks) == sizeof(enki::ProfilerCallbacks) );
return (enkiProfilerCallbacks*)pETS_->GetProfilerCallbacks();
}

View File

@@ -0,0 +1,104 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
typedef struct enkiTaskScheduler enkiTaskScheduler;
typedef struct enkiTaskSet enkiTaskSet;
typedef void (* enkiTaskExecuteRange)( uint32_t start_, uint32_t end, uint32_t threadnum_, void* pArgs_ );
// Create a new task scheduler
enkiTaskScheduler* enkiNewTaskScheduler();
// Initialize task scheduler - will create GetNumHardwareThreads()-1 threads, which is
// sufficient to fill the system when including the main thread.
// Initialize can be called multiple times - it will wait for completion
// before re-initializing.
void enkiInitTaskScheduler( enkiTaskScheduler* pETS_ );
// Initialize a task scheduler with numThreads_ (must be > 0)
// will create numThreads_-1 threads, as thread 0 is
// the thread on which the initialize was called.
void enkiInitTaskSchedulerNumThreads( enkiTaskScheduler* pETS_, uint32_t numThreads_ );
// Delete a task scheduler
void enkiDeleteTaskScheduler( enkiTaskScheduler* pETS_ );
// Create a task set.
enkiTaskSet* enkiCreateTaskSet( enkiTaskScheduler* pETS_, enkiTaskExecuteRange taskFunc_ );
// Delete a task set.
void enkiDeleteTaskSet( enkiTaskSet* pTaskSet_ );
// Schedule the task
void enkiAddTaskSetToPipe( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_,
void* pArgs_, uint32_t setSize_ );
// Schedule the task with a minimum range.
// This should be set to a value which results in computation effort of at least 10k
// clock cycles to minimize tast scheduler overhead.
// NOTE: The last partition will be smaller than m_MinRange if m_SetSize is not a multiple
// of m_MinRange.
// Also known as grain size in literature.
void enkiAddTaskSetToPipeMinRange( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_,
void* pArgs_, uint32_t setSize_, uint32_t minRange_ );
// Check if TaskSet is complete. Doesn't wait. Returns 1 if complete, 0 if not.
int enkiIsTaskSetComplete( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_ );
// Wait for a given task.
// should only be called from thread which created the taskscheduler , or within a task
// if called with 0 it will try to run tasks, and return if none available.
void enkiWaitForTaskSet( enkiTaskScheduler* pETS_, enkiTaskSet* pTaskSet_ );
// Waits for all task sets to complete - not guaranteed to work unless we know we
// are in a situation where tasks aren't being continuosly added.
void enkiWaitForAll( enkiTaskScheduler* pETS_ );
// get number of threads
uint32_t enkiGetNumTaskThreads( enkiTaskScheduler* pETS_ );
// TaskScheduler implements several callbacks intended for profilers
typedef void (*enkiProfilerCallbackFunc)( uint32_t threadnum_ );
struct enkiProfilerCallbacks
{
enkiProfilerCallbackFunc threadStart;
enkiProfilerCallbackFunc threadStop;
enkiProfilerCallbackFunc waitStart;
enkiProfilerCallbackFunc waitStop;
};
// Get the callback structure so it can be set
struct enkiProfilerCallbacks* enkiGetProfilerCallbacks( enkiTaskScheduler* pETS_ );
#ifdef __cplusplus
}
#endif

View File

@@ -0,0 +1,210 @@
// Copyright (c) 2013 Doug Binks
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
#pragma once
#include <stdint.h>
#include <assert.h>
#ifdef _WIN32
#include "Atomics.h"
#define WIN32_LEAN_AND_MEAN
#include <Windows.h>
#define THREADFUNC_DECL DWORD WINAPI
#define THREAD_LOCAL __declspec( thread )
namespace enki
{
typedef HANDLE threadid_t;
// declare the thread start function as:
// THREADFUNC_DECL MyThreadStart( void* pArg );
inline bool ThreadCreate( threadid_t* returnid, DWORD ( WINAPI *StartFunc) (void* ), void* pArg )
{
// posix equiv pthread_create
DWORD threadid;
*returnid = CreateThread( 0, 0, StartFunc, pArg, 0, &threadid );
return *returnid != NULL;
}
inline bool ThreadTerminate( threadid_t threadid )
{
// posix equiv pthread_cancel
return CloseHandle( threadid ) == 0;
}
inline uint32_t GetNumHardwareThreads()
{
SYSTEM_INFO sysInfo;
GetSystemInfo(&sysInfo);
return sysInfo.dwNumberOfProcessors;
}
}
#else // posix
#include <pthread.h>
#include <unistd.h>
#define THREADFUNC_DECL void*
#define THREAD_LOCAL __thread
namespace enki
{
typedef pthread_t threadid_t;
// declare the thread start function as:
// THREADFUNC_DECL MyThreadStart( void* pArg );
inline bool ThreadCreate( threadid_t* returnid, void* ( *StartFunc) (void* ), void* pArg )
{
// posix equiv pthread_create
int32_t retval = pthread_create( returnid, NULL, StartFunc, pArg );
return retval == 0;
}
inline bool ThreadTerminate( threadid_t threadid )
{
// posix equiv pthread_cancel
return pthread_cancel( threadid ) == 0;
}
inline uint32_t GetNumHardwareThreads()
{
return (uint32_t)sysconf( _SC_NPROCESSORS_ONLN );
}
}
#endif // posix
// Semaphore implementation
#ifdef _WIN32
namespace enki
{
struct semaphoreid_t
{
HANDLE sem;
};
inline void SemaphoreCreate( semaphoreid_t& semaphoreid )
{
semaphoreid.sem = CreateSemaphore(NULL, 0, MAXLONG, NULL );
}
inline void SemaphoreClose( semaphoreid_t& semaphoreid )
{
CloseHandle( semaphoreid.sem );
}
inline void SemaphoreWait( semaphoreid_t& semaphoreid )
{
DWORD retval = WaitForSingleObject( semaphoreid.sem, INFINITE );
assert( retval != WAIT_FAILED );
}
inline void SemaphoreSignal( semaphoreid_t& semaphoreid, int32_t countWaiting )
{
if( countWaiting )
{
ReleaseSemaphore( semaphoreid.sem, countWaiting, NULL );
}
}
}
#elif defined(__MACH__)
// OS X does not have POSIX semaphores
// see https://developer.apple.com/library/content/documentation/Darwin/Conceptual/KernelProgramming/synchronization/synchronization.html
#include <mach/mach.h>
namespace enki
{
struct semaphoreid_t
{
semaphore_t sem;
};
inline void SemaphoreCreate( semaphoreid_t& semaphoreid )
{
semaphore_create( mach_task_self(), &semaphoreid.sem, SYNC_POLICY_FIFO, 0 );
}
inline void SemaphoreClose( semaphoreid_t& semaphoreid )
{
semaphore_destroy( mach_task_self(), semaphoreid.sem );
}
inline void SemaphoreWait( semaphoreid_t& semaphoreid )
{
semaphore_wait( semaphoreid.sem );
}
inline void SemaphoreSignal( semaphoreid_t& semaphoreid, int32_t countWaiting )
{
while( countWaiting-- > 0 )
{
semaphore_signal( semaphoreid.sem );
}
}
}
#else // POSIX
#include <semaphore.h>
namespace enki
{
struct semaphoreid_t
{
sem_t sem;
};
inline void SemaphoreCreate( semaphoreid_t& semaphoreid )
{
int err = sem_init( &semaphoreid.sem, 0, 0 );
assert( err == 0 );
}
inline void SemaphoreClose( semaphoreid_t& semaphoreid )
{
sem_destroy( &semaphoreid.sem );
}
inline void SemaphoreWait( semaphoreid_t& semaphoreid )
{
int err = sem_wait( &semaphoreid.sem );
assert( err == 0 );
}
inline void SemaphoreSignal( semaphoreid_t& semaphoreid, int32_t countWaiting )
{
while( countWaiting-- > 0 )
{
sem_post( &semaphoreid.sem );
}
}
}
#endif