【Question Title】: Spark Streaming not performing operations on read blocks
【Posted】: 2014-11-26 15:43:13
【Question】:

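I'm new to Spark Streaming and have spent the last two days trying to understand streaming from a socket with Spark. I can see that Spark reads the blocks written to the socket, but it does not perform any operations on the blocks it reads.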

Here is the Spark code:

package foo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class AppSocket {

    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("KAFKA").setMaster("local");

        // 1-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new org.apache.spark.streaming.Duration(1000));

        // Each newline-terminated line received on localhost:33333 becomes one record
        JavaReceiverInputDStream<String> inputStream = jssc.socketTextStream("localhost", 33333);

        // Map each line to the pair (line, 1)
        JavaPairDStream<String, Integer> mappedStream = inputStream.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String i) {
                    System.out.println(i);
                    return new Tuple2<String, Integer>(i, 1);
                }
            });

        // Sum the counts of identical lines within each batch
        JavaPairDStream<String, Integer> reducedStream = mappedStream.reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        reducedStream.print();
        // Note: count() returns a JavaDStream<Long>, so this prints the DStream object once,
        // before the context starts, not a per-batch count
        System.out.println("Testing........" + reducedStream.count());
        jssc.start();
        jssc.awaitTermination();
    }
}
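To make the expected result concrete: for each 1-second batch, the pipeline above turns every received line into a `(line, 1)` pair, `reduceByKey` sums the pairs that share a key, and `print()` then shows the first elements of the result. The plain-Java sketch below uses no Spark at all (`BatchWordCount` and `countLines` are hypothetical names); it computes the same thing for a single batch held in a `List`, which is roughly what one would expect to see printed per batch once processing actually runs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BatchWordCount {

    // Plain-Java analogue of mapToPair(line -> (line, 1)) followed by reduceByKey(+)
    // applied to the lines of ONE batch. Illustration only, not Spark code.
    static Map<String, Integer> countLines(List<String> batch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batch) {
            // mapToPair emits (line, 1); reduceByKey adds up the values for each key
            counts.merge(line, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello", "world", "hello");
        System.out.println(countLines(batch)); // {hello=2, world=1}
    }
}
```

A `TreeMap` is used only so the printed order is deterministic; Spark makes no such ordering guarantee for `reduceByKey` output.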

I'm running netcat to create an output stream on the specified port:

nc -lk 33333

I have also tried creating the output stream myself. Here is my Java code:

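    // Fragment: assumes java.io, java.net, java.util and javax.jms imports, and that the JMS
    // consumer, session and connection were created earlier (not shown)
    int portNumber = 33333;
    ServerSocket serverSocket = new ServerSocket(portNumber);

    System.out.println("Server Waiting.................");

    // Block until Spark's socket receiver connects
    Socket clientSocket = serverSocket.accept();

    System.out.println("Server Connected!!!!!!!!!!!!!!!");

    List<String> list = new ArrayList<String>(); // messages buffered before sending
    int countflag = 0; // number of text messages received
    int count = 0;     // receive() calls that returned no text message; stop after 100
    PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
    while (true) {
        // Wait up to 1 second for a message
        Message message = consumer.receive(1000);

        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            String text = textMessage.getText();
            System.out.println("Received: " + text);
            list.add(text);
            System.out.println(++countflag);
            if (list.size() > 50) {
                // Send each buffered message as one newline-terminated line
                for (int i = 0; i < list.size(); i++) {
                    System.out.print(i);
                    out.write(list.get(i));
                    out.write("\n");
                    out.flush();
                }
                list.clear();
            }
        } else {
            count++;
        }
        if (count > 100) break;
    }
    out.close();
    clientSocket.close();
    serverSocket.close();
    consumer.close();
    session.close();
    connection.close();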
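`socketTextStream` is documented as interpreting the received bytes as UTF-8 text delimited by `\n`, so whatever feeds port 33333 (netcat or the JMS bridge above) has to end every message with a newline and flush it. The stdlib-only sketch below checks that framing over loopback: a sender writes one line per message, and a reader that merely stands in for the receiver (it is not Spark's code) splits the stream back into records. `LineServerDemo`, `serveLines`, `readLines` and `roundTrip` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LineServerDemo {

    // Sender side (the role of nc -lk or the JMS bridge): one "\n"-terminated UTF-8 line per message
    static void serveLines(ServerSocket server, List<String> messages) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String m : messages) {
                out.write(m);
                out.write("\n");
                out.flush(); // push each line out now instead of leaving it buffered
            }
        } // closing the socket signals end-of-stream to the reader
    }

    // Reader side: decodes UTF-8 and splits the text into lines, one record per line
    static List<String> readLines(int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Runs both sides over loopback on a free ephemeral port and returns what the reader saw
    static List<String> roundTrip(List<String> messages) throws Exception {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            Thread sender = new Thread(() -> {
                try {
                    serveLines(server, messages);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            List<String> received = readLines(server.getLocalPort());
            sender.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(Arrays.asList("first", "second", "third")));
    }
}
```

If a sender forgot the trailing newline or never flushed, the reader would not see a complete line for that message until more data or end-of-stream arrived.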

Spark consumes the blocks sent over the stream, but it does not perform any of the requested operations on them.

Spark console output:

14/11/26 15:32:14 INFO MemoryStore: ensureFreeSpace(12) called with curMem=3521, maxMem=278302556
14/11/26 15:32:14 INFO MemoryStore: Block input-0-1417015934400 stored as bytes in memory (estimated size 12.0 B, free 265.4 MB)
14/11/26 15:32:14 INFO BlockManagerInfo: Added input-0-1417015934400 in memory on ip-10-0-1-56.ec2.internal:57275 (size: 12.0 B, free: 265.4 MB)
14/11/26 15:32:14 INFO BlockManagerMaster: Updated info of block input-0-1417015934400
14/11/26 15:32:14 WARN BlockManager: Block input-0-1417015934400 already exists on this machine; not re-adding it
14/11/26 15:32:14 INFO BlockGenerator: Pushed block input-0-1417015934400
14/11/26 15:32:15 INFO ReceiverTracker: Stream 0 received 1 blocks
14/11/26 15:32:15 INFO JobScheduler: Added jobs for time 1417015935000 ms

Thanks for your help. Thanks in advance.

【Question Discussion】:

    Tags: java apache-spark spark-streaming


    【Solution 1】:

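    Set the master to "local[n]" with n > 1. The receiver needs a task slot to run, and with "local" there is only one slot. The receiver runs in that slot, leaving no slot free to process the data.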
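The starvation described above can be reproduced with a plain thread pool, used here purely as an analogy (this is not Spark's scheduler): the pool size plays the role of n in `local[n]`, one task blocks indefinitely like a receiver, and a second task stands in for the batch processing. `SlotStarvationDemo` and `batchRunsWith` are hypothetical names. In the question's code, the corresponding fix would be `setMaster("local[2]")` (or any n > 1).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SlotStarvationDemo {

    // Analogy only: `slots` plays the role of n in "local[n]".
    // Returns true if a batch job gets to run while a receiver-like task holds one slot.
    static boolean batchRunsWith(int slots) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(slots);
        try {
            // Long-running "receiver": blocks until interrupted, so it never frees its thread
            pool.submit(() -> {
                new CountDownLatch(1).await();
                return null;
            });
            // "Batch job" (think reduceByKey + print), submitted after the receiver
            Future<?> batch = pool.submit(
                    () -> System.out.println("processing batch with " + slots + " slot(s)"));
            try {
                batch.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false; // still queued: the only slot is held by the receiver
            }
        } finally {
            pool.shutdownNow(); // interrupt the receiver and drop any still-queued batch
            pool.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 slot  (like \"local\"):    batch ran = " + batchRunsWith(1));
        System.out.println("2 slots (like \"local[2]\"): batch ran = " + batchRunsWith(2));
    }
}
```

With one slot the batch task never leaves the queue, which matches the log in the question: blocks are stored and jobs are added, yet nothing is printed.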

    I suggest reading the "Points to remember" in the following section of my programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams

    【Discussion】:

    • Super, thanks. And thanks for the guide too; it cleared up many of my doubts.