【发布时间】:2019-04-29 00:24:46
【问题描述】:
我正在尝试从生产者那里读取 Kafka 消息并对其进行一些基本过滤,最后在消费者端打印输出。我将第一个文件名作为具有一组值的参数传递,将第二个文件作为具有过滤条件的第二个参数传递。 当我从 IntelliJ 运行相同的程序时,它工作正常。当我从命令行尝试“scalac”时,我得到“预期的类或对象定义”。
package kafka_db
object kafkap extends App {
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.util.Properties
import scala.io.Source
import org.apache.kafka.clients.producer._
import kafkaProducer.kafkaProducerScala.producer
val conf = new SparkConf().
setMaster(args(0)).
setAppName("kafkap")
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val topic = "kafkatopic"
for (line2 <- Source.fromFile(args(2)).getLines){
val c = line2.toInt
for (line <- Source.fromFile(args(1)).getLines) {
val a = line.toInt
val b = if (a > c) {
var d = a
val record = new ProducerRecord[String, String](topic, d.toString)
producer.send(record)
}
}
}
producer.close()
}
【问题讨论】:
-
我们需要看看你是如何运行 scalac 才能知道的。代码看起来没问题。
-
我们将其运行为“scalac
” -
您可能需要使用类路径上所有必需的依赖项来运行它,或者让诸如 sbt 之类的构建工具处理该部分。
-
是的,就是那个。谢谢
标签: scala intellij-idea apache-kafka