【发布时间】:2021-11-05 00:39:10
【问题描述】:
我对 kafka 非常陌生,并尝试将数据写入主题并从同一主题中读取(我们现在充当源团队来摄取数据。因此,我们正在执行写入到 Kafk 主题的操作和从同一主题消费)。 我在 spark-shell 上编写了以下代码,将数据写入 Kafka 主题。
pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType
tn = "topic_name"
kafka_broker = "brokerurl:9500"
endpoint_uri = "endpoint_uri"
client_id = "clientid"
client_secret = "secret_key"
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
oauth_client = " oauth.client.id='{0}'".format(client_id)
oauth_secret = " oauth.client.secret='{0}'".format(client_secret)
oauth_token_endpoint_uri = " oauth.token.endpoint.uri='{0}'".format(endpoint_uri)
oauth_config = jaas_config + oauth_client + oauth_secret + oauth_token_endpoint_uri + " oauth.max.token.expiry.seconds='30000' ;"
df = spark.sql("select * from dbname.tablename where geography in ('ASIA', 'LATIN_AMERICA') and geo_year in (2020, 2021)").select(F.to_json(F.struct(F.col("*"))).alias("value"))
# WRITE TO TOPIC
df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", tn)\
.save()
后来我才知道,一个 Kafka 主题可以包含分区中的数据。所以我删除并重新创建了相同的主题,但这次有 3 个分区。
我所有的 spark 经验都是在批处理中,即使在我们使用读取表或文件时分区数据的地方也存在分区的概念
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name')
.load()
批处理中使用的这个分区列通常是一个具有高基数的列,我们还可以指定我们想要将数据拆分成使用的分区数
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name').option('numPartitions', INTEGER_VALUE_OF_PARTITIONS)
.load()
我在普通的 Kafka 代码中看到了自定义分区器类,但我使用的是 spark-streaming,甚至不确定如何集成它。 我对 Kafka 主题分区的困惑在于以下几点:
- 如何选择每个主题的分区数?我是 使用 Kafka 实现火花流。
- 有没有一种方法可以使用 Spark 流管理分区数据 ?
- 如果没有,有什么方法可以确保数据在 主题的分区。
我已经浏览了this 官方文档。
但在那里找不到有关分区策略的任何信息。 谁能告诉我如何将数据写入主题的特定分区,或者将其留给 Kafka 更好。
编辑 1: 我刚刚通过这个link 并且提到了一个公式来计算基于吞吐量需要的分区数。 这是我们可以遵循的方法来确定每个主题的分区数吗?
任何澄清都会对我很有价值。
【问题讨论】:
标签: apache-spark pyspark apache-kafka spark-structured-streaming