【Posted】: 2018-04-03 07:58:03
【Question】:
I'm having trouble using py2neo together with the Spark driver: I can't insert nodes from inside a foreach or map loop. The code below is an example.
from py2neo import authenticate, Graph, cypher, Node
from pyspark import broadcast
infos=df.rdd
authenticate("localhost:7474", "neo4j", "admin")
graph = Graph(password='admin')
tx = graph.begin()
def node(row):
    query = Node("item", event_id=row[0], text=row[19])
    tx.create(query)
infos.foreach(node)
tx.commit()
Here is the end of the stack trace:
/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/rdd.py in _wrap_function(sc, func, deserializer, serializer, profiler)
2386 assert serializer, "serializer should not be empty"
2387 command = (func, profiler, deserializer, serializer)
-> 2388 pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
2389 return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
2390 sc.pythonVer, broadcast_vars, sc._javaAccumulator)
/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
2372 # the serialized command will be compressed by broadcast
2373 ser = CloudPickleSerializer()
-> 2374 pickled_command = ser.dumps(command)
2375 if len(pickled_command) > (1 << 20): # 1M
2376 # The broadcast will have same life cycle as created PythonRDD
/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/serializers.py in dumps(self, obj)
462
463 def dumps(self, obj):
--> 464 return cloudpickle.dumps(obj, 2)
465
466
/usr/local/apache/spark-2.2.1-bin-hadoop2.6/python/pyspark/cloudpickle.py in dumps(obj, protocol)
702
703 cp = CloudPickler(file,protocol
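The trace ends inside cloudpickle.dumps, which is where Spark serializes the function passed to foreach together with the globals it uses. The following is a minimal sketch of why a captured connection-like object can break that step. It uses the standard library pickle rather than cloudpickle, and `FakeTransaction` is a hypothetical stand-in for `tx` that simply owns a socket; it is an assumption for illustration, not py2neo's actual class.

```python
import pickle
import socket


class FakeTransaction:
    """Hypothetical stand-in for py2neo's `tx`: it owns a live socket."""

    def __init__(self):
        self.sock = socket.socket()  # OS-level handle; sockets refuse to be pickled

    def create(self, node):
        pass  # a real transaction would send `node` over self.sock


def can_pickle(obj):
    """Return True if `obj` survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj, 2)
        return True
    except Exception:
        return False


tx = FakeTransaction()
row = (42, "some text")

print(can_pickle(row))  # plain row data
print(can_pickle(tx))   # object holding a socket
tx.sock.close()
```

If the real `tx` holds a similar resource, any function that refers to it would presumably fail in the same way when Spark tries to ship it to the workers.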
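I think I can't pass the parameter tx into the loop. We tried to work around this by instantiating a connection directly inside the loop, as in the code below. It works for small matrices, but when I try it with 20 million rows, it stops at some point.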
from py2neo import authenticate, Graph, cypher, Node
infos=df.rdd
authenticate("localhost:7474", "neo4j", "password")
def node(row):
    graph = Graph(password='admin')
    tx = graph.begin()
    query = Node("item", event_id=row[0], text=row[19])
    tx.create(query)
    tx.commit()
infos.foreach(node)
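The version above opens a connection and commits a transaction for every single row. One possible variation, sketched here under assumptions, is to connect once per partition and commit in batches. `FakeGraph`, `write_batch`, `batches`, and `make_partition_writer` are hypothetical names used only for illustration. `FakeGraph` is an in-memory stand-in so the sketch runs without Neo4j, and the `connect` callable is where a real py2neo `Graph` would be created.

```python
from itertools import islice


def batches(rows, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


class FakeGraph:
    """Hypothetical in-memory stand-in for a Neo4j connection."""

    def __init__(self):
        self.commits = []  # one entry per committed batch

    def write_batch(self, items):
        self.commits.append(list(items))


def make_partition_writer(connect, batch_size=1000):
    """Build a function that consumes one partition's iterator of rows.

    `connect` is called inside the returned function, once per partition,
    so no live connection object has to be serialized beforehand.
    """
    def write_partition(rows):
        graph = connect()  # assumption: a real py2neo Graph would be built here
        for chunk in batches(rows, batch_size):
            items = [{"event_id": r[0], "text": r[19]} for r in chunk]
            graph.write_batch(items)  # one commit per batch, not per row
    return write_partition


# Local demonstration with five 20-column rows and a batch size of 2.
graph = FakeGraph()
writer = make_partition_writer(lambda: graph, batch_size=2)
rows = [(i,) + ("",) * 18 + ("text %d" % i,) for i in range(5)]
writer(iter(rows))
print([len(b) for b in graph.commits])  # prints [2, 2, 1]
```

On the real RDD such a writer would be passed to infos.foreachPartition instead of infos.foreach. Whether this avoids the stall at 20 million rows is untested.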
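I did some research on the neo4j-spark connector. It seems the library can be added, but no examples are provided, and I'm not at all sure whether this functionality is actually available in Python. What is the best way to solve this problem?
【Discussion】:
Tags: apache-spark neo4j pyspark rdd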