【发布时间】:2019-05-28 08:00:55
【问题描述】:
我是 pyspark 的新手,我有一个来自 api 的 json 列表,每个 json 对象都有相同的模式(键值对)。像这样
[ {'count': 308,
'next': 'some_url',
'previous': None,
'results': [{'assigned_to': 43,
'category': 'Unused',
'comments': None,
'completed_ts': None,
'created': '2019-05-27T05:14:22.306843Z',
'description': 'Pollution',
'display_name': {'admin': False,
'business_name': 'Test Business',
'contact_number': 'some_number',
'dob': None,
'email': 'some_mail',
'emp_id': None,
'first_name': 'Alisha'}}]},
{'count': 309,
'next': 'some_url',
'previous': None,
'results': [{'assigned_to': 44,
'category': 'Unused',
'comments': None,
'completed_ts': None,
'created': '2019-05-27T05:14:22.306843Z',
'description': 'Pollution',
'display_name': {'admin': False,
'business_name': 'Test Business',
'contact_number': 'some_number',
'dob': None,
'email': 'some_mail',
'emp_id': None,
'first_name': 'Ali'}}]},......}]
如果它是单独的 json 文件。我会使用
创建数据框df =spark.read.json('myfile.json')
然后将所有数据帧合并为一个。我在直接从列表本身转换 datframe 时遇到问题。我用过这个
from pyspark.sql import SparkSession
spark= SparkSession.builder.appName("Basics").getOrCreate()
sc= spark.sparkContext
df = pyspark.sql.SQLContext(sc.parallelize(data_list))`
它给了我
AttributeError: 'RDD' object has no attribute '_jsc'
【问题讨论】:
-
你是怎么调用那个 API 的?是否有一个循环或一些基于间隔的守护进程正在运行?所有消息也共享相同的架构?
-
一个函数里面有循环,如果next key中有url(检查json),那么它会一直获取数据,直到next不为null。
-
@Rohan Kumar 我有一个类似的问题,我必须批量读取传入的 json 数据并将其转储到某个文件中。因此,输出文件具有 json 对象列表。你能分享一下你是如何循环它们的吗
-
@Neha0908 不确定我当时是如何做到的,但您可以使用 Apache Kafka 捕获流数据,然后从 Pyspark 中的数据中加载特定变量。 spark.apache.org/docs/2.1.0/…
标签: python json machine-learning pyspark