【问题标题】:Mask the data coming from Kafka stream屏蔽来自 Kafka 流的数据
【发布时间】:2021-01-30 22:12:08
【问题描述】:

我正在使用 spark 结构化流从 kafka 流式传输数据,这为我提供了具有以下架构的数据帧

Column     Type
key        binary
value      binary
topic      string
partition  int
offset     long
timestamp  long
timestampType   int

Value Colum 以二进制格式出现,但它实际上是一个具有结构类型的 json 字符串,要求是读取 json 结构并屏蔽其中的几个字段并写入数据。

【问题讨论】:

    标签: apache-spark apache-kafka spark-streaming spark-structured-streaming spark-streaming-kafka


    【解决方案1】:

    您可以按照Structured Streaming + Kafka Integration Guide 中给出的指南了解如何将二进制值转换为字符串值。

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
    
    df.selectExpr("CAST(value AS STRING)")
      .as[String]
    

    然后你可以根据你的实际json结构来定义你的schema,例如:

    val schema: StructType = new StructType()
        .add("field1", StringType)
        .add("field2", ArrayType(new StructType()
          .add("f2", StringType)
          .add("f2", DoubleType)
        ))
    

    然后使用from_json 函数将允许您处理JSON 字符串中的数据,请参阅documentation,例如:

    df.selectExpr("CAST(value AS STRING)")
      .select(from_json('json, schema).as("data"))
    

    有了这些,您就可以通过使用结构化 API(例如 withColumndrop)替换列来开始屏蔽。

    如果您不想定义整个架构,可以考虑使用get_json_object

    【讨论】:

    • 问题不是更多关于屏蔽的吗?
    • 让我添加一些细节问题,我希望实时屏蔽流中的数据,然后将其转储到存储层。
    猜你喜欢
    • 2021-07-04
    • 1970-01-01
    • 1970-01-01
    • 2022-01-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多