Administrator
发布于 2025-08-29 / 2 阅读
0
0

内存池设计与实现

#### 一、程序性能优化之内存池

高效管理内存分配，减少内存碎片，避免频繁调用 malloc/free 带来的性能损耗。

#### 二、内存池核心数据结构

#### 三、分配策略

#### 四、核心实现（注：下列示例代码为线程池 threadpool.h / threadpool.cpp）：

```

//threadpool.h

#ifndef THREADPOOLH_

#define THREADPOOLH_

#ifdef __cplusplus

extern "C" {

#endif

/**
 * Pool sizing limits.
 *
 * Increase these constants at your own risk:
 * large values might slow down your system.
 */
#define MIN_THREADS 1
#define MAX_THREADS 64
#define MAX_QUEUE   655360

/* Opaque pool handle; the structure itself is defined in threadpool.cpp. */
typedef struct threadpool_t threadpool_t;

/* Shape of a task routine: receives one untyped pointer, returns nothing. */
typedef void (*threadpool_func_f)(void *param);

/**
 * Result codes returned by the pool API.
 * Zero means success; every failure is a distinct negative value.
 */
typedef enum {
    threadpool_succeed        =  0,  /* operation completed */
    threadpool_invalid        = -1,  /* NULL pool or NULL task routine */
    threadpool_lock_failure   = -2,  /* a mutex / condition-variable call failed */
    threadpool_queue_full     = -3,  /* the selected task queue has no free slot */
    threadpool_shutdown       = -4,  /* the pool is (already) shutting down */
    threadpool_thread_failure = -5,  /* a worker-thread operation failed */
    threadpool_comm_error     = -6,  /* generic failure reported by the wrappers */
    threadpool_thread_limit   = -7,  /* thread count outside MIN_THREADS..MAX_THREADS */
    threadpool_thread_equal   = -8   /* requested thread count equals the current one */
} threadpool_error_t;

/**
 * Verdicts produced by the per-worker status checks
 * (checkthreadexit / checkthreadwait in threadpool.cpp).
 */
typedef enum {
    threadpool_thread_invalid = 0,  /* calling thread is not registered in the pool */
    threadpool_thread_run     = 1,  /* worker may keep running */
    threadpool_thread_kill    = 2   /* worker has been asked to terminate */
} threadpool_thread_status;

/**
 * Bit flags accepted by the pool API; they can be OR-ed together.
 */
typedef enum {
    threadpool_graceful = 1,  /* tested by threadpool_destroy to select graceful shutdown */
    threadpool_priority = 2   /* tested by threadpool_add to select the priority queue */
} threadpool_flags_t;

/**
 * C++ facade over a pthread-based worker pool (threadpool_t).
 *
 * Every public method takes m_threadLock and forwards to one of the
 * private threadpool_* helpers, which operate on the pool itself.
 * All methods return threadpool_error_t codes unless stated otherwise.
 */
class CThreadPool
{
public:
    CThreadPool();
    ~CThreadPool();

    /**
     * @brief Creates the pool and starts its worker threads.
     * @param thread_count Number of worker threads (1..MAX_THREADS).
     * @param queueSize    Capacity of the normal task queue (1..MAX_QUEUE).
     * @return threadpool_succeed, or threadpool_comm_error when the pool
     *         could not be created.
     */
    int threadPoolStart(int thread_count, int queueSize);

    /**
     * @brief Grows or shrinks the pool to thread_count workers.
     * @return 0 on success, a negative threadpool_error_t otherwise.
     */
    int threadPoolSetSize(int thread_count);

    /**
     * @brief Shuts the pool down (immediate mode, flags = 0).
     * @return threadpool_succeed, or threadpool_comm_error on failure.
     */
    int threadPoolStop();

    /**
     * @brief Queues routine(arg) for execution by a worker thread.
     * @param routine Function that performs the task.
     * @param arg     Argument handed to routine.
     * @param flags   0 for the normal queue, threadpool_priority for the
     *                priority queue.
     * @return 0 on success, a negative threadpool_error_t otherwise.
     */
    int threadPoolAddTask(void (*routine)(void *), void *arg, int flags = 0);

    /**
     * @brief Number of tasks pending in the normal (non-priority) queue.
     */
    int threadPoolTaskCount();

private:
    /**
     * @function threadpool_create
     * @brief Creates a threadpool_t object.
     * @param thread_count Number of worker threads.
     * @param queueSize    Size of the normal queue; the priority queue is
     *                     sized from it.
     * @param flags        Unused parameter.
     * @return a newly created thread pool or NULL
     */
    threadpool_t *threadpool_create(int thread_count, int queueSize, int flags = 0);

    /**
     * @function threadpool_add
     * @brief add a new task in the queue of a thread pool
     * @param pool    Thread pool to which add the task.
     * @param routine Pointer to the function that will perform the task.
     * @param arg     Argument to be passed to the function.
     * @param flags   threadpool_priority selects the priority queue;
     *                any other value selects the normal queue.
     * @return 0 if all goes well, negative values in case of error (@see
     *         threadpool_error_t for codes).
     */
    int threadpool_add(threadpool_t *pool, void (*routine)(void *),
                       void *arg, int flags);

    /**
     * @function threadpool_destroy
     * @brief Stops and destroys a thread pool.
     * @param pool  Thread pool to destroy.
     * @param flags Flags for shutdown
     *
     * Known values for flags are 0 (default) and threadpool_graceful, which
     * requests that the pool stop accepting new tasks but process all
     * pending tasks before shutdown.
     */
    int threadpool_destroy(threadpool_t *pool, int flags);

    /**
     * @function threadpool_setsize
     * @brief Increase or decrease the number of threads.
     * @param pool         Thread pool to operate.
     * @param thread_count Target number of threads.
     *
     * Accepted values lie in MIN_THREADS..MAX_THREADS; anything else yields
     * threadpool_thread_limit.
     */
    int threadpool_setsize(threadpool_t *pool, int thread_count);

private:
    pthread_mutex_t m_threadLock;  /* serializes the public methods */
    threadpool_t *m_threadPool;    /* current pool, NULL before threadPoolStart */
};

#ifdef __cplusplus

}

#endif

#endif /* THREADPOOLH_ */

```

```

//threadpool.cpp

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <map>
#include <vector>

#include "threadpool.h"

/**
 * Values stored in threadpool_t::shutdown once a shutdown was requested
 * (the field stays 0 while the pool is running).
 */
typedef enum {
    immediate_shutdown = 1,  /* workers stop even if tasks are still queued */
    graceful_shutdown  = 2   /* workers stop once both queues are empty */
} threadpool_shutdown_t;

/**
 * @struct threadpool_thread_t
 * @brief Bookkeeping record kept for each worker thread.
 *
 * @var threads Heap-allocated pthread_t holding the worker's thread id.
 * @var status  0 while the worker should keep running, 1 once it has been
 *              asked to terminate.
 */
typedef struct {
    pthread_t *threads;
    int status;
} threadpool_thread_t;

/**
 * @struct threadpool_task_t
 * @brief One unit of work stored in a task queue.
 *
 * @var function Pointer to the function that will perform the task.
 * @var argument Argument to be passed to the function.
 */
typedef struct {
    void (*function)(void *);
    void *argument;
} threadpool_task_t;

/**
 * @struct threadpool_t
 * @brief The thread pool state shared by the API and the worker threads.
 *
 * @var lock                 Mutex protecting the fields below.
 * @var notify               Condition variable to notify worker threads.
 * @var threads              Per-worker records, keyed by thread id.
 * @var queue                Ring buffer holding the normal task queue.
 * @var queue_priority       Ring buffer holding the priority task queue.
 * @var thread_count         Number of registered worker threads.
 * @var queue_size           Capacity of the normal queue.
 * @var queue_head           Index of the first pending normal task.
 * @var queue_tail           Index where the next normal task is stored.
 * @var queue_count          Number of pending normal tasks.
 * @var queue_size_priority  Capacity of the priority queue.
 * @var queue_head_priority  Index of the first pending priority task.
 * @var queue_tail_priority  Index where the next priority task is stored.
 * @var queue_count_priority Number of pending priority tasks.
 * @var shutdown             0 while running, otherwise a threadpool_shutdown_t value.
 * @var started              Number of worker threads that have not yet left
 *                           their work loop.
 */
struct threadpool_t {
    pthread_mutex_t lock;
    pthread_cond_t notify;
    std::map<pthread_t, threadpool_thread_t> threads;

    threadpool_task_t *queue;
    threadpool_task_t *queue_priority;

    int thread_count;

    int queue_size;
    int queue_head;
    int queue_tail;
    int queue_count;

    int queue_size_priority;
    int queue_head_priority;
    int queue_tail_priority;
    int queue_count_priority;

    int shutdown;
    int started;
};

/**
 * @function void *threadpool_thread(void *threadpool)
 * @brief the worker thread
 * @param threadpool the pool which own the thread
 *
 * The signature matches the start routine expected by pthread_create.
 */
static void *threadpool_thread(void *threadpool);

/* Releases a pool whose workers have all stopped; defined further down. */
int threadpool_free(threadpool_t *pool);

/**
 * Builds an idle wrapper: no pool exists until threadPoolStart() is called.
 *
 * Members are listed in declaration order (lock first, pool second). The
 * mutex is set up with pthread_mutex_init() because the
 * PTHREAD_MUTEX_INITIALIZER macro is a brace initializer and is not usable
 * portably inside a constructor initializer list.
 */
CThreadPool::CThreadPool()
    : m_threadLock(),
      m_threadPool(NULL)
{
    pthread_mutex_init(&m_threadLock, NULL);
}

/**
 * Drops the pool pointer and releases the wrapper mutex.
 *
 * The destructor does not stop the pool itself: callers are expected to
 * call threadPoolStop() first, otherwise the pool and its worker threads
 * outlive this object.
 */
CThreadPool::~CThreadPool()
{
    m_threadPool = NULL;

    /* Pair the mutex initialization done at construction time. */
    pthread_mutex_destroy(&m_threadLock);
}

/**
 * Creates the underlying pool under the wrapper lock.
 *
 * @param thread_count Number of worker threads to start.
 * @param queueSize    Capacity of the normal task queue.
 * @return threadpool_succeed when the pool was created,
 *         threadpool_comm_error otherwise.
 *
 * NOTE(review): m_threadPool is overwritten unconditionally, so a pool that
 * is still referenced from an earlier call is not destroyed here.
 */
int CThreadPool::threadPoolStart(int thread_count, int queueSize)
{
    pthread_mutex_lock(&m_threadLock);

    m_threadPool = threadpool_create(thread_count, queueSize, 0);
    const int result = (m_threadPool != NULL) ? threadpool_succeed
                                              : threadpool_comm_error;

    pthread_mutex_unlock(&m_threadLock);
    return result;
}

int CThreadPool::threadPoolStop()

{

pthread_mutex_lock(&m_threadLock);

if ((threadpool_destroy(m_threadPool, 0) != 0))

{

pthread_mutex_unlock(&m_threadLock);

return threadpool_comm_error;

}

pthread_mutex_unlock(&m_threadLock);

return threadpool_succeed;

}

/**
 * Queues routine(arg) on the pool while holding the wrapper lock.
 *
 * @param routine Function that performs the task.
 * @param arg     Argument handed to routine.
 * @param flags   Forwarded unchanged to threadpool_add
 *                (threadpool_priority selects the priority queue).
 * @return The result of threadpool_add: 0 on success, a negative
 *         threadpool_error_t otherwise.
 */
int CThreadPool::threadPoolAddTask(void (*routine)(void *), void *arg, int flags)
{
    pthread_mutex_lock(&m_threadLock);
    int iRet = threadpool_add(m_threadPool, routine, arg, flags);
    pthread_mutex_unlock(&m_threadLock);

    return iRet;
}

int CThreadPool::threadPoolTaskCount()

{

int iTaskCount = 0;

pthread_mutex_lock(&m_threadLock);

iTaskCount = m_threadPool->queue_count;

pthread_mutex_unlock(&m_threadLock);

return iTaskCount;

}

/**
 * Allocates a pool, its two task queues and its worker threads.
 *
 * @param thread_count Number of workers to start (1..MAX_THREADS).
 * @param queueSize    Capacity of the normal queue (1..MAX_QUEUE). The
 *                     priority queue gets half of it, but at least one slot.
 * @param flags        Unused.
 * @return The new pool, or NULL when an argument is out of range or any
 *         resource (mutex, condition variable, queue memory, thread) could
 *         not be obtained.
 */
threadpool_t* CThreadPool::threadpool_create(int thread_count, int queueSize, int flags)
{
    (void) flags;

    if (thread_count <= 0 || thread_count > MAX_THREADS ||
        queueSize <= 0 || queueSize > MAX_QUEUE) {
        return NULL;
    }

    /* Plain new reports failure by throwing, so there is no NULL to check. */
    threadpool_t *pool = new threadpool_t();

    /* Initialize */
    pool->thread_count = 0;
    pool->threads.clear();
    pool->queue = NULL;
    pool->queue_priority = NULL;
    pool->queue_size = queueSize;
    /* Never zero: a zero capacity would mean malloc(0) here and a modulo by
       zero when the ring-buffer index is advanced. */
    pool->queue_size_priority = (queueSize >= 2) ? queueSize / 2 : 1;
    pool->queue_head = pool->queue_tail = pool->queue_count = 0;
    pool->queue_head_priority = pool->queue_tail_priority = pool->queue_count_priority = 0;
    pool->shutdown = pool->started = 0;

    /* Synchronization objects first, so every later failure path knows
       exactly what has to be destroyed. */
    if (pthread_mutex_init(&(pool->lock), NULL) != 0) {
        delete pool;
        return NULL;
    }
    if (pthread_cond_init(&(pool->notify), NULL) != 0) {
        pthread_mutex_destroy(&(pool->lock));
        delete pool;
        return NULL;
    }

    /* Allocate both task queues */
    pool->queue = static_cast<threadpool_task_t *>(
        malloc(sizeof(threadpool_task_t) * queueSize));
    pool->queue_priority = static_cast<threadpool_task_t *>(
        malloc(sizeof(threadpool_task_t) * pool->queue_size_priority));

    bool ready = (pool->queue != NULL) && (pool->queue_priority != NULL);

    /* The pool lock is held while workers are started: a new worker looks
       itself up in pool->threads as soon as it runs, which must not overlap
       with the insertions below. */
    if (ready && pthread_mutex_lock(&(pool->lock)) != 0) {
        ready = false;
    }

    if (!ready) {
        free(pool->queue);            /* free(NULL) is a no-op */
        free(pool->queue_priority);
        pthread_cond_destroy(&(pool->notify));
        pthread_mutex_destroy(&(pool->lock));
        delete pool;
        return NULL;
    }

    /* Start worker threads */
    bool all_started = true;
    for (int i = 0; i < thread_count; i++) {
        pthread_t *pHandle = static_cast<pthread_t *>(malloc(sizeof(pthread_t)));
        if (pHandle == NULL) {
            all_started = false;
            break;
        }
        if (pthread_create(pHandle, NULL, threadpool_thread, pool) != 0) {
            free(pHandle);
            all_started = false;
            break;
        }

        threadpool_thread_t stThread;
        stThread.status = 0;
        stThread.threads = pHandle;
        pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));
        pool->thread_count++;
        pool->started++;
    }

    pthread_mutex_unlock(&(pool->lock));

    if (!all_started) {
        /* Stops the workers that did start and releases the pool. */
        threadpool_destroy(pool, 0);
        return NULL;
    }

    return pool;
}

/**
 * Grows or shrinks the pool to thread_count workers.
 *
 * Growing starts the missing workers immediately. Shrinking only flags the
 * surplus workers (status = 1) and wakes everybody up; a flagged worker
 * leaves the pool by itself the next time it checks its status, and
 * pool->thread_count is lowered at that moment, not here.
 *
 * @param pool         Pool to resize.
 * @param thread_count Target number of workers (MIN_THREADS..MAX_THREADS).
 * @return 0 on success; threadpool_thread_limit for an out-of-range count,
 *         threadpool_invalid for a NULL pool, threadpool_thread_equal when
 *         nothing has to change, threadpool_comm_error when a worker could
 *         not be started, threadpool_lock_failure when a mutex or
 *         condition-variable call failed.
 */
int CThreadPool::threadpool_setsize(threadpool_t *pool, int thread_count)
{
    if (thread_count < MIN_THREADS || thread_count > MAX_THREADS) {
        return threadpool_thread_limit;
    }

    if (pool == NULL) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    int err = 0;

    if (pool->thread_count < thread_count) {
        /* Start additional worker threads */
        for (int i = pool->thread_count; i < thread_count; i++) {
            pthread_t *pHandle = static_cast<pthread_t *>(malloc(sizeof(pthread_t)));
            if (pHandle == NULL) {
                err = threadpool_comm_error;
                break;
            }
            if (pthread_create(pHandle, NULL, threadpool_thread, pool) != 0) {
                free(pHandle);
                err = threadpool_comm_error;
                break;
            }

            threadpool_thread_t stThread;
            stThread.status = 0;
            stThread.threads = pHandle;
            pool->threads.insert(std::pair<pthread_t, threadpool_thread_t>(*pHandle, stThread));
            pool->thread_count++;
            pool->started++;
        }
    } else if (pool->thread_count > thread_count) {
        /* Ask [pool->thread_count - thread_count] workers to terminate */
        int surplus = pool->thread_count - thread_count;
        std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
        for (; surplus > 0 && it != pool->threads.end(); ++it, --surplus) {
            it->second.status = 1;
        }

        /* Wake up all worker threads so the flagged ones notice */
        if (pthread_cond_broadcast(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
        }
    } else {
        err = threadpool_thread_equal;
    }

    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        err = threadpool_lock_failure;
    }

    return err;
}

/**
 * Public entry point for resizing: forwards to threadpool_setsize while
 * holding the wrapper lock.
 *
 * @param thread_count Target number of worker threads.
 * @return Whatever threadpool_setsize returns.
 */
int CThreadPool::threadPoolSetSize(int thread_count)
{
    pthread_mutex_lock(&m_threadLock);
    const int result = threadpool_setsize(m_threadPool, thread_count);
    pthread_mutex_unlock(&m_threadLock);
    return result;
}

int CThreadPool::threadpool_add(threadpool_t pool, void (function)(void *),

void *argument, int flags)

{

int err = 0;

if(pool NULL || function NULL) {

return threadpool_invalid;

}

if(pthread_mutex_lock(&(pool->lock)) != 0) {

return threadpool_lock_failure;

}

if(flags&threadpool_priority)

{

int next;

next = (pool->queue_tail_priority + 1) % pool->queue_size_priority;

do {

/* Are we full ? */

if(pool->queue_count_priority == pool->queue_size_priority) {

err = threadpool_queue_full;

break;

}

/* Are we shutting down ? */

if(pool->shutdown) {

err = threadpool_shutdown;

break;

}

/* Add task to queue */

pool->queue_priority[pool->queue_tail_priority].function = function;

pool->queue_priority[pool->queue_tail_priority].argument = argument;

pool->queue_tail_priority = next;

pool->queue_count_priority += 1;

/* pthread_cond_broadcast */

if(pthread_cond_signal(&(pool->notify)) != 0) {

err = threadpool_lock_failure;

break;

}

} while(0);

}

else

{

int next;

next = (pool->queue_tail + 1) % pool->queue_size;

do {

/* Are we full ? */

if(pool->queue_count == pool->queue_size) {

err = threadpool_queue_full;

break;

}

/* Are we shutting down ? */

if(pool->shutdown) {

err = threadpool_shutdown;

break;

}

/* Add task to queue */

pool->queue[pool->queue_tail].function = function;

pool->queue[pool->queue_tail].argument = argument;

pool->queue_tail = next;

pool->queue_count += 1;

/* pthread_cond_broadcast */

if(pthread_cond_signal(&(pool->notify)) != 0) {

err = threadpool_lock_failure;

break;

}

} while(0);

}

if(pthread_mutex_unlock(&pool->lock) != 0) {

err = threadpool_lock_failure;

}

return err;

}

/**
 * Stops every worker, waits for them to finish and releases the pool.
 *
 * The shutdown flag alone makes the workers leave their loop: at once for
 * the default mode, after both queues are drained for threadpool_graceful.
 * Workers are deliberately not flagged individually here, because a flagged
 * worker exits without looking at the queues, which would defeat a graceful
 * shutdown.
 *
 * The call BLOCKS until the workers have exited (pthread_join), so the pool
 * memory is never released underneath a running worker. It must therefore
 * not be called from a worker thread, nor while holding a lock that a
 * running task may need.
 *
 * @param pool  Pool to destroy.
 * @param flags 0 for immediate shutdown, threadpool_graceful to finish the
 *              pending tasks first.
 * @return 0 on success; threadpool_invalid for a NULL pool,
 *         threadpool_shutdown when a shutdown is already in progress,
 *         threadpool_lock_failure when a mutex or condition-variable call
 *         failed, threadpool_thread_failure when a worker could not be
 *         joined. The pool is released only when 0 is returned.
 */
int CThreadPool::threadpool_destroy(threadpool_t *pool, int flags)
{
    if (pool == NULL) {
        return threadpool_invalid;
    }

    if (pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    int err = 0;
    std::vector<pthread_t> workers;

    if (pool->shutdown) {
        /* Already shutting down */
        err = threadpool_shutdown;
    } else {
        /* Snapshot the worker ids while the lock is held: a worker that was
           flagged earlier removes itself from the map when it exits, but it
           still has to be joined. */
        workers.reserve(pool->threads.size());
        for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
             it != pool->threads.end(); ++it) {
            workers.push_back(it->first);
        }

        pool->shutdown = (flags & threadpool_graceful) ?
            graceful_shutdown : immediate_shutdown;

        /* Wake up all worker threads */
        if (pthread_cond_broadcast(&(pool->notify)) != 0) {
            err = threadpool_lock_failure;
        }
    }

    if (pthread_mutex_unlock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    if (err) {
        return err;
    }

    /* Wait for every worker to leave its loop */
    for (std::vector<pthread_t>::size_type i = 0; i < workers.size(); ++i) {
        if (pthread_join(workers[i], NULL) != 0) {
            err = threadpool_thread_failure;
        }
    }

    if (err) {
        /* A worker may still be alive: keep the pool memory valid. */
        return err;
    }

    /* No worker is left, so the map can be touched without the lock.
       Workers that exited through the shutdown path did not release their
       own record; do it for them. */
    for (std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.begin();
         it != pool->threads.end(); ++it) {
        free(it->second.threads);
    }
    pool->threads.clear();

    threadpool_free(pool);

    return err;
}

/**
 * Releases the memory owned by a pool that has no running worker.
 *
 * @param pool Pool to release.
 * @return threadpool_succeed once the pool has been deleted;
 *         threadpool_comm_error when pool is NULL or workers are still
 *         counted in pool->started (nothing is released in that case).
 *
 * The mutex and the condition variable are destroyed only when the main
 * queue had been allocated; the presence of that buffer is used as the sign
 * that initialization got far enough for them to exist.
 */
int threadpool_free(threadpool_t *pool)
{
    if (pool == NULL) {
        return threadpool_comm_error;
    }
    if (pool->started > 0) {
        return threadpool_comm_error;
    }

    /* Did we manage to allocate ? */
    if (pool->queue_priority != NULL) {
        free(pool->queue_priority);
    }

    if (pool->queue != NULL) {
        free(pool->queue);
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }

    delete pool;
    return threadpool_succeed;
}

/**
 * Decides whether the calling worker has to leave the pool and, if so,
 * removes its bookkeeping.
 *
 * @param pool Pool the calling thread belongs to.
 * @return threadpool_thread_invalid when the caller has no record in
 *         pool->threads; threadpool_thread_run when its record is not
 *         flagged; threadpool_thread_kill when the record was flagged
 *         (status == 1), in which case the record's handle is freed, the
 *         record is erased and pool->thread_count is decremented.
 */
static int checkthreadexit(threadpool_t *pool)
{
    const pthread_t self = pthread_self();
    std::map<pthread_t, threadpool_thread_t>::iterator entry = pool->threads.find(self);

    if (entry == pool->threads.end()) {
        return threadpool_thread_invalid;
    }

    if (entry->second.status != 1) {
        return threadpool_thread_run;
    }

    /* Flagged for termination: drop this worker's record. */
    free(entry->second.threads);
    pool->threads.erase(entry);
    pool->thread_count--;
    return threadpool_thread_kill;
}

/**
 * Read-only status check used before a worker goes to sleep.
 *
 * Unlike checkthreadexit this function changes nothing in the pool; it only
 * reports the state of the calling worker's record.
 *
 * @param pool Pool the calling thread belongs to.
 * @return threadpool_thread_invalid when the caller has no record in
 *         pool->threads, threadpool_thread_kill when its record is flagged
 *         (status == 1), threadpool_thread_run otherwise.
 */
static int checkthreadwait(threadpool_t *pool)
{
    const pthread_t tid = pthread_self();
    std::map<pthread_t, threadpool_thread_t>::iterator it = pool->threads.find(tid);

    if (it == pool->threads.end()) {
        return threadpool_thread_invalid;
    }

    return (it->second.status == 1) ? threadpool_thread_kill
                                    : threadpool_thread_run;
}

static void threadpool_thread(void threadpool)

{

threadpool_t pool = (threadpool_t )threadpool;

threadpool_task_t task;

for(;;) {

/* Lock must be taken to wait on conditional variable */

pthread_mutex_lock(&(pool->lock));

/* Wait on condition variable, check for spurious wakeups.

When returning from pthread_cond_wait(), we own the lock. */

while((pool->queue_count 0) && (pool->queue_count_priority 0) && (!pool->shutdown)) {

/* check status */

if (checkthreadwait(pool) == threadpool_thread_kill)

{

break;

}

pthread_cond_wait(&(pool->notify), &(pool->lock));

}

/* check status */

if (checkthreadexit(pool) == threadpool_thread_kill)

{

break;

}

if((pool->shutdown == immediate_shutdown) ||

((pool->shutdown == graceful_shutdown) &&

(pool->queue_count == 0) &&

(pool->queue_count_priority == 0))) {

break;

}

/* Grab our task */

if(pool->queue_count_priority != 0)

{

task.function = pool->queue_priority[pool->queue_head_priority].function;

task.argument = pool->queue_priority[pool->queue_head_priority].argument;

pool->queue_head_priority = (pool->queue_head_priority + 1) % pool->queue_size_priority;

pool->queue_count_priority -= 1;

}

else

{

task.function = pool->queue[pool->queue_head].function;

task.argument = pool->queue[pool->queue_head].argument;

pool->queue_head = (pool->queue_head + 1) % pool->queue_size;

pool->queue_count -= 1;

}

/* Unlock */

pthread_mutex_unlock(&(pool->lock));

/* Get to work */

(*(task.function))(task.argument);

}

pool->started--;

pthread_mutex_unlock(&(pool->lock));

pthread_exit(NULL);

return(NULL);

}

```


评论