【问题标题】:Many independent threads dependent on single thread许多独立线程依赖于单线程
【发布时间】:2015-02-16 05:14:15
【问题描述】:

我有一个包含 n 列和许多行的文件

    Col 1 col2 col3 .......col n

我想读取一次并写入多个(例如 m 个)输出,按几个关键列对行进行分组。假设必须产生 3 个输出:

对于输出 1:

groupingKeys[0]={1,2) //group the records on col 1 and 2

对于输出 2:

groupingKeys[1]={1,4,5} //group the records on col 1 4 5

对于输出 3

groupingKeys[2]={2,3}  //group on col 2,3

在主线程中,我逐行读取输入文件。对于每个读取行,我想在 m 个不同的线程中处理读取行。所以基本上我想要电话

map[0].process(data,groupingKeys[0]);
map[1].process(data,groupingKeys[1]); 
map[2].process(data,groupingKeys[2]);

应该在 3 个不同的线程中运行,并且 3 个线程中的每一个都应该在主线程读取该行之后继续。

我可以使用第 i 个线程的 run 方法创建 m 个不同的线程

map[i].process(data,groupingKeys[i]);

但这 3 个线程只有在读取该行的主线程才能继续运行,以便它们看到正确的 data[] 值。我怎样才能做到这一点?

主线程 thread-0 thread-1 thread-2 运行等待等待等待 等待 运行 运行 运行 运行等待等待等待

每一步都会读取并处理一行 通过处理,我的意思是为每个分组键完成类似于 sql groupby 的操作 下面是上面提到的示例代码。

public void writeMultipleGroupedOutputs(String inputfile,int groupingKeys[][])
{
    Mymap<key,value>[] mapArr= new Mymap<key,value>[k]; //k maps to group records in k ways as per k grouping keys
    String line;
    while((line = br.readLine()) != null) {
        String[] data=line.split(regex);  **//one line is read in main thread**
        for(int i=0;i<m;i++)
            map[i].process(data,groupingKeys[i]); **//process in m different ways.How to make this happen in m independent threads?**
    }

    class Mymap extends HashMap<key,value> {
        void  process(String[] data,int[] keyIndexes)
        {
            //extract key from key indexes
            //extract value from value indexes
            put(key,value);
        }  

        @Override
        public Value put(Key k, Value v) {
            if (containsKey(k)) {
                oldval=get(k);
                put(k,oldval.aggregate(v)); //put sum of old and new
                return oldval;
            }else{
                put(k,v);
                return null;
            }
        }
    }
}

对不起,如果我没有明确我的观点。简单来说,我想要 map[i].process(data,groupingKeys[i]);在单独的(第 i 个线程)中发生

a b 5
a b 10
a c 15
so if i want to group by {1} and {1,2} 
read line        map1          map2
a b 5           [a--> b,5]      [a,b ->5]
a b 10          [a-> b 15]      [a,b->15]
a c 15          [a->b 30]       [a,b->15   a,c->15]

编辑: 这个问题与我的处理方式或分组逻辑无关,但它是:读取每一行后,我想在不同线程中对读取的行做一些事情。

【问题讨论】:

  • 这段代码不编译,不多解释。
  • 添加了一些说明。代码仅供参考,不会编译。

标签: java multithreading


【解决方案1】:

如果我理解正确,您希望等待处理,直到读取所有文件。如果是这样,根据详细信息,您可能需要查看CyclicBarrierCountDownLatch

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-07-26
    • 1970-01-01
    • 1970-01-01
    • 2021-05-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多