【问题标题】:How to map kafka consumer data to Mysql using python如何使用python将kafka消费者数据映射到Mysql
【发布时间】:2021-08-07 06:07:31
【问题描述】:

我的主机配置如下:

卡夫卡, 火花, mysql , 在码头上

我的代码如下:

# To find out where the pyspark
import sys
from kafka import KafkaProducer,KafkaConsumer
import findspark
from boto3 import *
import boto3
import json

findspark.init()
# Creating Spark Context
from pyspark import SparkContext
from pyspark.sql import SparkSession
def get_connection(self):
     spark = SparkSession.builder.master("local[*]").appName("SparkByExamples.com").getOrCreate()  
     return spark   

def json_serializer(data):
     return json.dumps(data).encode("utf-8")
    

def read_s3():
    p1 = KafkaProducer(bootstrap_servers=['broker:29092'], value_serializer=json_serializer)
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('kakfa')
    for obj in bucket.objects.all():
        key = obj.key
        body = obj.get()['Body'].read().decode('utf-8')
    p1.send("Uber_Eats",body)
    p1.flush()
def read_from_topic(self,spark):
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "broker:29092") \
        .option("subscribe", "Uber_Eats") \
         .option("startingOffsets", "earliest") \
        .load()
    df2 = df \
        .writeStream \
        .format("console") \
        .start()
    print(df2.awaitTermination()  )  
def get_consumer(self):
    consumer = KafkaConsumer("Uber_Eats", group_id='group1', bootstrap_servers=
    "broker:29092",value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    return  consumer   
def print_details(self,c1):
    for msg in c1:
          print(msg.value)
    print("Dom  dfe")            
           
          
class Foo:
    def __init__(self):
                 
        spark = get_connection(self)
        read_s3()
        # System.setProperty("hadoop.home.dir", "$HADOOP_HOME\winutils-master\hadoop-2.x.x")
        c1 = get_consumer(spark)
        print_details(self,c1)


f = Foo()  

我上面代码的输出如下:

{
    
        {
            "Customer Number": "1",
            "Customer Name": "Shyam",
            "Restaurant Number": "2201",
            "Restaurant NameOrdered": "Bawarchi",
            "Number of Items": "3",
            "price": "10",
            "Operating Start hours": "9:00",
            "Operating End hours": "23:00"
        },
        {
            "Customer Number": "2",
            "Customer Name": "Rohini",
            "Restaurant Number": "2202",
            "Restaurant NameOrdered": "Sarvana Bhavan",
            "Number of Items": "4",
            "price": "20",
            "Operating Start hours": "8:00",
            "Operating End hours": "20:00"
        },
        {
            "Customer Number": "3",
            "Customer Name": "Bhairav",
            "Restaurant Number": "2203",
            "Restaurant NameOrdered": "Taco Bell",
            "Number of Items": "5",
            "price": "30",
            "Operating Start hours": "11:00",
            "Operating End hours": "21:00"
        }
    
}

我如何将它读入 mysql 的列? i) 是不是像普通的 json 文件一样,读取和插入?

ii)或者我们有什么特定的 kakfa consumer 'json' 格式吗?

iii) 我已经指定了 value_deserializer=lambda x: json.loads(x.decode('utf-8')))

在获取json格式的代码中这是将数据加载到mysql中所必需的

谢谢,

阿迪

【问题讨论】:

标签: python-3.x apache-spark apache-kafka kafka-consumer-api spark-structured-streaming


【解决方案1】:

是不是像普通的json文件一样,读取插入?

不确定你的意思。 Mysql 不接受 json 文件

Spark 有自己的 JSON 文件阅读器,但您是从 Kafka 读取的,所以这无关紧要

我们有什么特定的 kakfa 消费者“json”格式吗?

是的。 CAST(value as STRING) 之后是各种 get_json_object 电话。我已经把你链接到这个Databricks blog series

我已经指定了 value_deserializer=lambda x: json.loads(x.decode('utf-8')))

这不是 Spark。我不知道你为什么还有这个。其次,def get_consumer(self) 不接受或使用您传递给它的 spark 变量,并且您在那里没有类定义,因此不鼓励使用 self 作为命名参数(换句话说,您的所有函数都应该在class Foo 之内,但你也根本不需要课程)

重要细节 - 您显示的文件不是有效的 JSON,因此无论如何这些方法都不会立即起作用

tl;dr - 假设您确实想使用 Spark

  1. 使用您编写的使用 Spark 消费者的函数

  2. 替换

df \
  .writeStream \
  .format("console") \

使用 JDBC writer.writeStream.format("jdbc").save("jdbc:mysql//…") 但仅在修改数据框以匹配数据库架构之后


否则,如果您不再需要 Spark,那么 JSON 或 Kafka 是一个实现细节——下载并配置 Mysql python 客户端,然后像往常一样插入数据——注意事务、回滚、错误处理、准备好的查询,等等


或者,正如多次回答和评论,以及更容错解决方案,忘记 Python 并使用 Kafka Connect(脚本在您的 Kafka bin 目录中可用,并且不需要编码)

【讨论】:

  • 不,我不是在尝试使用 spark 流,我通过 Kafka 消费者获取输出,并在那里形成我试图进入 mysql 的表单
  • 您还指定了这不是有效的 JSON 格式,能否请您展示一下 fomrat 看起来如何有效,我会相应地更改
  • 那你为什么打电话给spark = get_connection(self)?还是完全使用 Spark 代码?把你的数据放在这里,它会告诉你出了什么问题(你最外面的括号应该是 [] 或者你需要该对象中每个值的键)jsonformatter.curiousconcept.com 除此之外(假设你的打印详细信息功能确实有效,但我不认为这是由于其他语法问题),就像我说的,卡夫卡在这里真的不重要。制作一个以 JSON 字符串列表开头的更简单的脚本,并将它们解析/写入 mysql;一旦你让它运作良好,用你的消费者替换那个列表
  • 另外,由于 Kafka 是一个实现细节,因此还有很多其他问题可以准确显示您想要什么 - stackoverflow.com/questions/4251124/… 或 pandas stackoverflow.com/questions/40450591/…
猜你喜欢
  • 2017-01-04
  • 1970-01-01
  • 1970-01-01
  • 2020-09-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-04
  • 2016-05-15
相关资源
最近更新 更多