【问题标题】:importing avro schema in Scala在 Scala 中导入 avro 模式
【发布时间】:2015-10-24 03:50:20
【问题描述】:

我正在编写一个简单的 twitter 程序,我正在使用 Kafka 阅读 Tweets,并希望使用 Avro 进行序列化。到目前为止,我刚刚在 Scala 中设置了 twitter 配置,现在想使用此配置来阅读推文。

如何在我的程序中导入文件 tweets.avsc 中定义的以下 avro 架构?

{
    "namespace": "tweetavro",
    "type": "record",
    "name": "Tweet",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "text", "type": "string"}
    ]
}

我在网上遵循了一些示例,其中显示了类似 import tweetavro.Tweet 的内容,以便在 Scala 中导入架构,以便我们可以像使用它一样使用它

def main (args: Array[String]) {
    val twitterStream = TwitterStream.getStream
    twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
    twitterStream.filter(filterUsOnly)
  }

  private def toTweet(s: Status): Tweet = {
    new Tweet(s.getUser.getName, s.getText)
  }

  private def sendToKafka(t:Tweet) {
    println(toJson(t.getSchema).apply(t))
    val tweetEnc = toBinary[Tweet].apply(t)
    val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
    kafkaProducer.send(msg)
  }

我在pom.xml 中遵循相同并使用以下插件

<!-- AVRO MAVEN PLUGIN -->
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.7.7</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>


<!-- MAVEN COMPILER PLUGIN -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <configuration>
    <source>1.7</source>
    <target>1.7</target>
  </configuration>
</plugin>

做完这一切,我还是做不到import tweetavro.Tweet

有人可以帮忙吗?

谢谢!

【问题讨论】:

    标签: scala maven twitter avro apache-kafka


    【解决方案1】:

    您也可以使用avro4s。根据模式定义您的案例类(或生成它)。我们称该类为Tweet。然后创建一个AvroOutputStream,它也会从案例类中推断模式,并用于序列化实例。然后我们可以写入一个字节数组,并将其发送给 kafka。例如:

    val tweet: Tweet= ... // the instance you want to serialize
    
    val out = new ByteArrayOutputStream // we collect the serialized output in this
    val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
    avro.write(tweet)
    avro.close()
    
    val bytes = out.toByteArray
    val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
    kafkaProducer.send(msg)
    

    【讨论】:

      【解决方案2】:

      我推荐使用 Avrohugger。它是 Avro 的 Scala 案例类方面的新手,但支持我需要的一切,我真的很喜欢它不是基于宏的,所以我实际上可以看到生成的内容。

      与维护者合作非常棒,并且非常乐于接受贡献和反馈。它不会也可能永远不会像官方 Java 代码生成那样功能丰富,但它会满足大多数人的需求。

      目前,它缺少对联合(可选类型除外)和递归类型的支持。

      SBT 插件运行良好,如果您想快速查看它对您的 Avro 架构的作用,还有一个新的 Web 界面:

      https://avro2caseclass.herokuapp.com/

      更多细节在这里:

      https://github.com/julianpeeters/avrohugger

      【讨论】:

      • 感谢您的回复。 Avrohugger 看起来很有趣,我可能会在下一个项目中使用它。我设法使用 gradle 解决了这个问题,因为 avro-maven-plugin 是我的问题。
      【解决方案3】:

      您应该首先将该架构编译成一个类。我不确定 Scala 中是否有一个用于生产的 Avro 库,但您可以为 Java 生成一个类并在 Scala 中使用它:

      java -jar /path/to/avro-tools-1.7.7.jar compile schema tweet.avsc .

      根据您的需要更改此行,您应该会得到一个由该工具生成的 tweetavro.Tweet 类。然后你可以把它放到你的项目中,按照你刚才描述的方式使用。

      更多信息here

      upd:仅供参考,似乎有一个library in Scala,但我以前从未使用过它

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2018-11-24
        • 2018-02-05
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-06-26
        • 1970-01-01
        相关资源
        最近更新 更多