【Question title】: Enormous amount of memory usage, no memory leak detected
【Posted】: 2020-08-05 18:45:11
【Question】:

I'm having trouble finding a memory leak in my program.

top reports that the program's memory usage keeps growing while it runs. When I profile the program with valgrind, no memory leaks are reported.

The program consists of one "reader" thread and several "consumer" threads.

The "reader" thread loads data into one of several char** pointers, one per "consumer" thread.

Each "consumer" thread processes the data behind the char* pointers in its own queue and frees the memory.

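I've included some pseudocode describing what my program does. I know the code provided may not be enough to describe the problem; if it helps, I'm happy to include the whole project.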

"Reader" thread, condensed for brevity

//'nconsumers': number of consumer threads
char ***queue = malloc(nconsumers*sizeof(char **));
for (int i = 0; i < nconsumers; i++) {
    //'length' number of datapoints a 'consumer' works on at a time
    queue[i] = malloc(length*sizeof(char *));
}

char *data = NULL;
int qtracker = 0; //tracks to which 'consumer' data should be assigned
int ltracker = 0; //tracks how many datapoints have been added to each 'consumer'
//loaddata reads data and stores it in 'data' struct
while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->length);
    memcpy(datapoint, data->content, data->length);
    queue[qtracker][ltracker] = datapoint;
    qtracker++;
    if (nconsumers == qtracker) { 
        qtracker = 0;
        ltracker++;
        if (length == ltracker) ltracker = 0;
    }
}
//NULL pointers are added to the end of each 'consumer' queues to indicate all data has been read

"Consumer" thread

//Consumers are initialized and a queue is assigned to them
int qnum = "some number between 0 and nconsumers";
int datatracker = 0;
char **dataqueue = queue[qnum];

datapoint = dataqueue[datatracker]
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

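The "consumer" threads read the data correctly and process it as they should. Again, valgrind reports no memory leaks. When I monitor the process with top or htop, its memory usage keeps climbing until my machine starts swapping.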

Edit

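I've added a complete program that reproduces the problem. It isn't exactly the program I'm having trouble with, because that one has extra dependencies. Again, this program spawns 1 "reader" thread and N consumer threads. When it runs on a large text file with hundreds of millions of lines (e.g. a DNA sequencing file), htop shows memory usage growing steadily, while valgrind reports no leaks apart from pthread-specific ones.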

Thanks again for all the help!!

Compile and run on any modern Linux machine:

gcc -Wall -o <name> <program.c> -lm -lpthread
./<name> large_text_file.txt <num_threads> <max_lines>

Only this warning should appear, because in this example the extracted pointer is set but never used:

<program>.c: In function ‘consumer’:
<program>.c:244:11: warning: variable ‘line’ set but not used [-Wunused-but-set-variable]
     char *line = NULL;

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <math.h>
#include <unistd.h>

// Data passed to threads
typedef struct {
    //Input file
    FILE *fp;
    //Number of threads
    int numt;
    //Synchronization data
    pthread_mutex_t mtx;
    pthread_cond_t workcond;
    pthread_cond_t readcond;
    int gowork;
    int goread;
    //Tracks how many threads are done analyzing data
    int doneq;
    /*
      Stores "data queues" (1 queue per thread)
      queue ->       [  [ char**    [ char**    [ char**    [ char**    [ char**
len(queue)=numt          [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n])=maxqueue   [char*]     [char*]     [char*]     [char*]     [char*]
len(queue[n][m])=data      ...         ...         ...         ...         ...
                         [char*]     [char*]     [char*]     [char*]     [char*]
                                 ]           ]           ]           ]          ]
                                                                                ]
    */
    char ***queue;
    //Internal thread ID
    int *threadidx;
    //Maximum number of lines to read
    int maxseqs;
    //Maximum number of lines per thread == maxseqs/numthreads
    int maxqueue;
} thread_t;

/*
Reads lines from text file, copies line content and length into a char * pointer
and adds it to a "data queue" to be processed by one of the "consumers"
*/
void *reader(void *threaddata);

/*
Extracts char * pointers from one of the "data queues". Does work with
the data and frees when done.
*/
void *consumer(void *threaddata);

/*
Initializes thread data
*/
int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs);

/*
Cleans thread data before exit
*/
void threadtkill(thread_t *threaddata);


int main(int argc, char *argv[])
{
    if (argc < 4) {
        fprintf(stderr, "ERROR: Not enough arguments.\n");
        exit(-1);
    }

    FILE *fp = fopen(argv[1], "r");
    if (!fp) {
        fprintf(stderr, "ERROR: Failed to open input file.\n");
        exit(-1);
    }

    int numt = atoi(argv[2]);
    if (!numt) {
        fprintf(stderr, "ERROR: Please specify number of threads.\n");
        fclose(fp);
        exit(-1);
    }

    int maxseqs = atoi(argv[3]);
    if (!maxseqs) {
        fprintf(stderr, "ERROR: Please specify max number of lines.\n");
        fclose(fp);
        exit(-1);
    }

    //Start data struct for threads
    thread_t threaddata;
    if (!threadtinit(fp, numt, &threaddata, maxseqs)) {
        fprintf(stderr, "ERROR: Could not initialize thread data.\n");
        fclose(fp);
        exit(-1);
    }
    fprintf(stderr, "Thread data initialized.\n");


    //return code
    int ret;

    //pthread creation
    pthread_t readerthread;
    pthread_t *consumerpool = NULL;
    consumerpool = malloc((numt)*sizeof(pthread_t));
    if (!consumerpool) {
        fprintf(stderr, "Failed to allocate threads.\n");
        ret = -1;
        goto exit;
    }

    // Initialize and set the thread attribute to joinable
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    //Consumer threads
    int thrc;
    for (int i = 0; i < numt; i++) {
        thrc = pthread_create(consumerpool + i,
                              &attr,
                              consumer,
                              (void *)&threaddata);
        if (thrc) {
            fprintf(stderr, "ERROR: Thread creation.\n");
            ret = -1;
            goto exit;
        }
    }

    //Couple of sleeps to keep track of stuff while running
    sleep(1);

    //Reader thread
    thrc = pthread_create(&readerthread,
                          &attr,
                          reader,
                          (void *)&threaddata);
    if (thrc) {
        fprintf(stderr, "ERROR: Thread creation.\n");
        ret = -1;
        goto exit;
    }


    // Free attribute and wait for the other threads
    pthread_attr_destroy(&attr);

    int jrc;
    jrc = pthread_join(readerthread, NULL);
    if (jrc) {
        fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
    }
    for (int i = 0; i < numt; i++) {
        jrc = pthread_join(*(consumerpool + i), NULL);
        if (jrc) {
            fprintf(stderr, "Thread error join. Return code: %d\n", jrc);
            ret = -1;
            goto exit;
        }
    }
    ret = 0;
    exit:
        threadtkill(&threaddata);
        free(consumerpool);
        fprintf(stderr, "Finished.\n");
        return(ret);
}


void *reader(void *readt)
{
    fprintf(stderr, "Reader thread started.\n");
    thread_t *threaddata = readt;
    int numt = threaddata->numt;
    int maxqueue = threaddata->maxqueue;
    int maxseqs = threaddata->maxseqs;
    FILE *fp = threaddata->fp;

    // Array of queues, one per consumer thread
    char ***queue = threaddata->queue;

    // Number of bytes used to store length of line
    size_t bytes = sizeof(ssize_t);
    // Tracks number of lines loaded so far
    size_t nlines = 0;

    // Tracks to which queue data should be added to
    int qtracker = 0;
    // Tracks to which position in any particular queue, data should be added
    int ltracker = 0;

    // Holds read line
    char *line = NULL;
    ssize_t linelength = 0;
    size_t n = 0;  // with line == NULL, getline() allocates the buffer itself

    // Tracks how much data will be read
    size_t totallength = 0;
    size_t totallines = 0;
    while ( (linelength =  getline(&line, &n, fp)) != -1 ) {
        // enough data is used to hold line contents + line length
        char *data = malloc(bytes + linelength + 1);

        if (!data) {
            fprintf(stderr, "memerr\n");
            continue;
        }
        // copy the line length into the first bytes of data
        memcpy(data, &linelength, bytes);
        //move line bytes to data
        memcpy(data + bytes, line, linelength + 1);

        totallength += linelength;

        // Add newly allocated data to one of numt queues
        queue[qtracker][ltracker] = data;
        qtracker++;
        if (numt == qtracker) {
            // Loop around queue
            qtracker = 0;
            ltracker++;
            // Loop around positions in queue
            if (maxqueue == ltracker) ltracker = 0;
        }
        nlines++;
        // Stop reading thread and start consumer threads
        if (nlines == maxseqs) {
            fprintf(stderr, "%zu lines loaded\n", nlines);
            sleep(3);
            totallines += nlines;
            nlines = 0;
            fprintf(stderr, "Waking up consumers\n");
            pthread_mutex_lock(&(threaddata->mtx));
            //Wake consumer threads
            threaddata->gowork = 1;
            pthread_cond_broadcast(&(threaddata->workcond));
            //Wait for consumer threads to finish
            while ( !threaddata->goread ) {
                pthread_cond_wait(&(threaddata->readcond),
                                  &(threaddata->mtx));
            }
            fprintf(stderr, "Reader has awoken!!!!\n\n");
            sleep(3);
            threaddata->goread = 0;
            pthread_mutex_unlock(&(threaddata->mtx));
        }
    }

    //Add NULL pointers to the end of each queue to indicate reading is done
    pthread_mutex_lock(&(threaddata->mtx));
    for (int i = 0; i < numt; i++) {
        queue[i][ltracker] = NULL;
    }
    // Wake consumers for the last time
    threaddata->gowork = 1;
    pthread_cond_broadcast(&(threaddata->workcond));
    pthread_mutex_unlock(&(threaddata->mtx));

    // Log info
    fprintf(stderr, "%zu characters read.\n", totallength);
    if (line) free(line);
    pthread_exit(NULL);
}


void *consumer(void *consumert)
{
    thread_t *threaddata = consumert;
    // Number of consumer threads
    int numt = threaddata->numt;
    // Max length of queue to extract data from
    int maxqueue = threaddata->maxqueue;

    // Holds data sent by reader thread
    char *data = NULL;
    // Holds the actual line read
    char *line = NULL;
    size_t linelength;
    size_t bytes = sizeof(ssize_t);

    // get queue number for corresponding thread
    int qnum = -1;
    pthread_mutex_lock(&(threaddata->mtx));
    int *tlist = threaddata->threadidx;
    while (qnum == -1) {
        qnum = *tlist;
        *tlist = -1;
        tlist++;
    }
    fprintf(stderr, "Thread got queueID: %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));
    // Each thread works on exactly one queue
    char **queue = threaddata->queue[qnum];

    //After initializing, wait for reader to start working
    pthread_mutex_lock(&(threaddata->mtx));
    while ( !threaddata->gowork) {
        pthread_cond_wait(&(threaddata->workcond), &(threaddata->mtx));
    }
    fprintf(stderr, "Consumer thread started queueID %d.\n", qnum);
    pthread_mutex_unlock(&(threaddata->mtx));

    // Tracks number of characters this thread consumes
    size_t totallength = 0;
    // Tracks from which position in queue data should be taken from
    size_t queuecounter = 1;
    // Get first data point
    data = queue[0];

    while (data != NULL) {
        //get line length
        memcpy(&linelength, data, bytes);

        //get line
        line = data + bytes;

        //Do work
        totallength += linelength;
        free(data);

        //Check for number of sequences analyzed
        if (queuecounter == maxqueue) {
            // Wait for other threads to catchup
            sleep(1);
            queuecounter = 0;
            pthread_mutex_lock(&(threaddata->mtx));
            threaddata->doneq++;
            threaddata->gowork = 0;
            // If this thread is the last one to be done with its queue, wake
            // reader
            if (threaddata->doneq == numt) {
                threaddata->goread = 1;
                pthread_cond_signal(&(threaddata->readcond));
                threaddata->doneq = 0;
            }
            // When done consuming data, wait for reader to load more
            while (!threaddata->gowork) {
                pthread_cond_wait(&(threaddata->workcond),
                                  &(threaddata->mtx));
            }
            pthread_mutex_unlock(&(threaddata->mtx));
        }
        //Get next line
        data = queue[queuecounter];
        queuecounter++;
    }

    // Log and exit
    fprintf(stderr, "\tThread %d analyzed %zu characters.\n", qnum, totallength);
    pthread_exit(NULL);
}


int  threadtinit(FILE *fp, int numt, thread_t *threaddata, int maxseqs)
{
    threaddata->fp = fp;
    //Determine maximum thread queue length (integer ceiling of maxseqs/numt;
    //a float loses precision for very large line counts)
    threaddata->maxqueue = (maxseqs + numt - 1) / numt;
    threaddata->maxseqs = threaddata->maxqueue*numt;
    fprintf(stderr, "max lines to load: %d\n", threaddata->maxseqs);
    fprintf(stderr, "max lines per thread: %d\n", threaddata->maxqueue);
    threaddata->numt = numt;
    //Allocate data for queues and initialize them
    threaddata->queue = malloc(numt*sizeof(char **));
    threaddata->threadidx = malloc(numt*sizeof(int));
    for (int i = 0; i < numt; i++) {
        threaddata->queue[i] = malloc(threaddata->maxqueue*sizeof(char *));
        threaddata->threadidx[i] = i;
    }
    //Initialize synchronization data
    pthread_mutex_init(&(threaddata->mtx), NULL);
    pthread_cond_init(&(threaddata->workcond), NULL);
    pthread_cond_init(&(threaddata->readcond), NULL);
    threaddata->gowork = 0;
    threaddata->goread = 0;
    threaddata->doneq = 0;
    return 1;
}


void threadtkill(thread_t *threaddata)
{
    fclose(threaddata->fp);
    for (int i = 0; i < threaddata->numt; i++) {
        free(threaddata->queue[i]);
    }
    free(threaddata->queue);
    free(threaddata->threadidx);
    pthread_mutex_destroy(&(threaddata->mtx));
}

【Question discussion】:

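  • That was my first thought, but it's the same pointer as the one in the reader thread. This really has me scratching my head.
  • While pointers to different things are [usually] the same size, because you have char ***queue, change queue[i] = malloc(length*sizeof(char *)); to queue[i] = malloc(length*sizeof(char **)); If you're doing the former, it may indicate you have a problem understanding the levels of indirection. [Almost] never use a three-level pointer char ***. It's very easy to get wrong, and [in 40+ years of C programming] I have never used one. See: wiki.c2.com/?ThreeStarProgrammer
  • @CraigEstey - malloc(length*sizeof(char *)); vs. malloc(length*sizeof(char **)); I believe the end result is always the same, since sizeof(char **) == sizeof(char *).
  • @ryyker Yes, they're the same size (as I mentioned), i.e. the computed result is the same. However, I'm talking about logical correctness. This shows up [more clearly] with: struct foo { int bar[100]; }; then struct foo *arr = malloc(25 * sizeof(struct foo *)); should be struct foo *arr = malloc(25 * sizeof(struct foo));, an OP problem we see here all the time. Here it's a similar problem, but we're "lucky" that char * and char ** are the same size.
  • If you'd like to try editing this post into a minimal reproducible example, that would help pin down the problem. Otherwise there are too many gaps in the code to guess any further. BTW, if you saw my answer before I deleted it, please disregard it. It made an incorrect assertion.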

Tags: c concurrency memory-leaks pthreads valgrind


【Solution 1】:

This line looks suspicious:

if (length == ltracker) ltracker++;

I would normally expect to see:

if (length == ltracker) ltracker = 0; /* wrap */

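But without the whole context, it's a bit hard to tell. Also, with all of this you are clearly creating races between the producer and the consumers, which may be harder to debug than your current problem.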

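Since you went to three levels, you do realize your buffer space is O(n^3); and free() will rarely shrink your process memory. free() usually just lets you reuse previously allocated heap, so your program will keep growing until it no longer needs to ask the system for more memory, and then stay at that size.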

【Discussion】:

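  • Fixed the "if (length == ltracker) ltracker++;" typo in my post. I know my three-level pointer is fairly complex, but that allocation only happens once. Most of the memory is allocated inside a loop. That memory is freed and valgrind detects no leak, but when running on large datasets the memory keeps being used. I'm working on a minimal reproducible example that I can post here.
  • Yes, my point is that you assert you have a leak, but I don't think you do; you're just using a lot of data. You do have a synchronization problem: the sleep at line 327 is there to mask a race condition; remove it and your program stops working. The problem is in how the consumer threads are started.
  • @jregalad: valgrind can report memory that was allocated but never freed. Use --show-reachable=yes or --show-leak-kinds=all. You can also search for leaks (or growing memory) on demand, e.g. from a shell, using vgdb. For more details see valgrind.org/docs/manual/…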

【Solution 2】:

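Note that the following only looks at the code snippets you call the reader and consumer threads, although, as pointed out in the comments and other answers, there are other potential sources of problems to review...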

In your reader thread:

while (loaddata(data) >= 0) {
    char *datapoint = malloc(data->length);
    ...
    // Note: there are no free(datapoint); calls in this loop
}

Clearly, datapoint is created in this block but not freed in it.

The following may be contributing to the memory leak:

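  • Because the datapoint instance in the reader thread is created inside the block, it only lives within that block. The memory allocated at that address continues to be owned by the process that created it, but outside the block the pointer variable referring to that memory no longer exists, so it cannot be freed outside the block. And because I don't see a call to free(datapoint) inside that block, it is never freed.

  • Combined with this, char *datapoint = malloc(data->length); is called in a loop (with no call to free in between), each time allocating new memory at a new address while overwriting the address that referred to its predecessor, which makes it impossible to free all the previous allocations.

  • The datapoint instance in the consumer thread, although it has the same name as the one in the reader thread, does not point to the same memory. So even if that variable is freed, it does not free the datapoint instance that exists in the reader thread.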

Excerpt from the consumer thread code:

datapoint = dataqueue[datatracker]  //Note no ";" making this code uncompilable
                                    //forcing the conclusion that code posted
                                    //is not code actually used, 
                                    //Also: where is this instance of datapoint
                                    //created ?
datatracker++;
while (datapoint != NULL) {
    //Do work on data
    free(datapoint);
    datapoint = dataqueue[datatracker];
    datatracker++;

    //More synchronization code
}

Based on the questions in the comments, and general Linux threading information:
Why doesn't Valgrind detect memory leaks, SO question
passing data between threads question
Creating threads in Linux tutorial
Linux Tutorial: POSIX Threads

【Discussion】:

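  • I see; I seem to have a big misunderstanding of how to pass data between threads. What I don't fully understand is why valgrind doesn't show any kind of memory leak.
  • @jregalad - Good question about valgrind. I don't have experience with it. In this case it is simply a scope issue. The {...} braces delimit the lifetime of any variable created inside them. When the flow of execution leaves them, everything created inside ceases to exist, unless it was created with the static modifier, in which case it lives until the process ends but is only accessible inside the {...}.
  • ...Also, a good rule of thumb for dynamically allocated memory is to match every call to [m][c][re]alloc() one-to-one with a call to free(). Passing pointer variables (pointers to memory) as function arguments is common and idiomatic in C. Regarding passing data between threads, maybe this will help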

【Solution 3】:

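It turns out there was nothing wrong with my code itself. Calling free() after malloc() releases the memory on the heap so the program can reuse it, but that does not mean it goes back to the system. I still don't fully understand the reason for this.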

Valgrind didn't report a memory leak because there wasn't one.

After doing some research and reading more about the nature of dynamic memory allocation, I ended up here:

Force free() to return malloc memory back to OS

Why does the free() function not return memory to the operating system?

Will malloc implementations return free-ed memory back to the system?

Memory not freed after calling free()

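Calling malloc_trim() (a glibc extension declared in <malloc.h>) after each free() was enough for the system to reclaim the allocated memory.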

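For example, without calling malloc_trim(), my program's CPU and memory usage looked like this: each time my "reader" thread runs (the first spike in CPU usage), some memory is allocated. When my "consumer" threads run they free the requested memory, but, as the blue line in the graph shows, that memory is not always returned to the system.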

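With malloc_trim() after every free(), memory usage looks the way I would expect: while the "reader" thread is running, the memory associated with the process grows, and while the "consumers" run, that memory is freed and returned to the OS.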

【Discussion】:
