File ThreadSafeQueue.c
File List > core > ThreadSafeQueue.c
Go to the documentation of this file
#include "ThreadSafeQueue.h"
#include <stdlib.h>
#include <string.h>
#include "Log.h"
#include "AssertLib.h"
void TSQ_Init(struct ThreadSafeQueue* pQueue, u32 itemSize, u32 queueSizeItems, OnTSQueueWrapAroundFn wrapAroundCallback)
{
pQueue->itemSize = itemSize;
pQueue->queueSize = queueSizeItems;
pQueue->queueCurrentLength = 0;
InitMutex(&pQueue->mutex);
pQueue->data = malloc(itemSize * queueSizeItems);
memset(pQueue->data, 0, pQueue->itemSize * pQueue->queueSize);
pQueue->queueHead = pQueue->data;
pQueue->queueTail = pQueue->data;
pQueue->onWrap = wrapAroundCallback;
}
void TSQ_DeInit(struct ThreadSafeQueue* pQueue, u32 itemSize, u32 queueSizeItems)
{
free(pQueue->data);
}
static void AdvanceQueueTail(struct ThreadSafeQueue* pQueue)
{
pQueue->queueTail += pQueue->itemSize;
if(pQueue->queueTail > pQueue->data + (pQueue->queueSize - 1) * pQueue->itemSize)
{
pQueue->queueTail = pQueue->data;
}
}
static void AdvanceQueueHead(struct ThreadSafeQueue* pQueue)
{
pQueue->queueHead += pQueue->itemSize;
if(pQueue->queueHead > pQueue->data + (pQueue->queueSize - 1) * pQueue->itemSize)
{
pQueue->queueHead = pQueue->data;
}
}
void TSQ_Enqueue(struct ThreadSafeQueue* pQueue, const void* pIn)
{
LockMutex(&pQueue->mutex);
memcpy(pQueue->queueTail, pIn, pQueue->itemSize);
AdvanceQueueTail(pQueue);
if(++pQueue->queueCurrentLength > pQueue->queueSize)
{
if(pQueue->onWrap)
{
pQueue->onWrap(pQueue->queueHead);
}
AdvanceQueueHead(pQueue);
pQueue->queueCurrentLength--;
Log_Warning("Threadsafe queue is full, discarding first element, consider increasing the queue size, size is: %i", pQueue->queueSize);
}
UnlockMutex(&pQueue->mutex);
}
/*
true if something dequeued, false if queue is empty
*/
bool TSQ_Dequeue(struct ThreadSafeQueue* pQueue, void* pOut)
{
bool bSomethingDequeued = true;
LockMutex(&pQueue->mutex);
if(pQueue->queueCurrentLength == 0)
{
bSomethingDequeued = false;
goto unlock_mutex;
}
memcpy(pOut, pQueue->queueHead, pQueue->itemSize);
AdvanceQueueHead(pQueue);
pQueue->queueCurrentLength--;
unlock_mutex:
UnlockMutex(&pQueue->mutex);
return bSomethingDequeued;
}