【问题标题】:write pyspark Rdd or DF in neo4j在 neo4j 中编写 pyspark Rdd 或 DF
【发布时间】:2018-04-03 07:58:03
【问题描述】:

我在使用 py2neo 和 spark-driver 时遇到了一些问题,因为我无法在 foreach 循环或 map 循环中插入节点。例如下面的代码。

from py2neo import authenticate, Graph, cypher, Node
from pyspark import broadcast
infos=df.rdd

authenticate("localhost:7474", "neo4j", "admin")
graph = Graph(password='admin')
tx = graph.begin()

def node(row):
    query = Node("item", event_id=row[0], text=row[19])
    tx.create(query)



infos.foreach(node)
tx.commit()

这里是堆栈跟踪的结尾:

/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
   2386     assert serializer, "serializer should not be empty"
   2387     command = (func, profiler, deserializer, serializer)
-> 2388     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
   2389     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
   2390                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2372     # the serialized command will be compressed by broadcast
   2373     ser = CloudPickleSerializer()
-> 2374     pickled_command = ser.dumps(command)
   2375     if len(pickled_command) > (1 << 20):  # 1M
   2376         # The broadcast will have same life cycle as created PythonRDD

/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/serializers.py in dumps(self, obj)
    462 
    463     def dumps(self, obj):
--> 464         return cloudpickle.dumps(obj, 2)
    465 
    466 

/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    702 
    703     cp = CloudPickler(file,protocol

我认为我无法在循环内传递参数 tx。 我们试图通过像下面的代码一样在循环内直接实例化一个连接来解决这个问题。它适用于小矩阵,但是当我尝试使用 2000 万行时,它会在某个点停止

from py2neo import authenticate, Graph, cypher, Node
infos=df.rdd
authenticate("localhost:7474", "neo4j", "password")


def node(row):
    graph = Graph(password='admin')
    tx = graph.begin()
    query = Node("item", event_id=row[0], text=row[19])
    tx.create(query)
    tx.commit()

infos.foreach(node)

我对 neo4j-spark 连接器进行了一些研究,您似乎可以添加该库,但没有提供示例,而且我完全不确定 python 中是否确实提供了此类功能。解决这个问题的最佳方法是什么?

【问题讨论】:

    标签: apache-spark neo4j pyspark rdd


    【解决方案1】:

    解决此类问题的标准模式是使用foreachPartition

    def nodes(rows):
        graph = Graph(password='admin')
        tx = graph.begin()
        for row in rows:
            query = Node("item", event_id=row[0], text=row[19])
            tx.create(query)
        tx.commit()
    
    infos.foreachPartition(nodes)
    

    【讨论】:

    • 它似乎适用于 10000 但对于更大的我得到 py2neo.database.status.GraphError: HTTP GET returned response 429
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-08-21
    • 1970-01-01
    • 2017-09-18
    相关资源
    最近更新 更多