【Question Title】: Why is ParDo not working on DataflowRunner?
【Posted】: 2020-11-11 23:56:16
【Question Description】:

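I have a problem when I switch from DirectRunner to DataflowRunner: the ParDo apparently does not work. When I set the runner to DataflowRunner, def process(self, query) never runs. I can see the job running on GCP, but my InsertPostgresql method does not work with DataflowRunner.

According to the error log, the ParDo apparently does not recognize "psycopg2":

process NameError: name 'psycopg2' is not defined During handling of the above exception

Any idea why?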

import os

import apache_beam as beam
import psycopg2
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions


def run_pipeline():
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'C:\Users\julianocm\Desktop\POC\<mycredentialfile>.json'

    optionsGCP = {
        'streaming': True,
        'project': "<myproject>",
        'region': "us-central1",
        'temp_location': "gs://poc360-bucket/temp",
        'staging_location': "gs://poc360-bucket/staging",
        'drivername': "postgresql",
        'save_main_session': True,
        'setup_file': r'C:\Users\julianocm\Desktop\POC\setup.py'
    }

    paramsDB = {
        'database': '<mydatabase>',
        'user': '<myuser>',
        'password': '<mypassword>',
        'host': 'localhost',  # on a Dataflow worker, localhost is the worker VM itself
        'port': '5000'
    }

    class InsertPostgresql(beam.DoFn):

        def __init__(self, **server_config):
            # Connection parameters forwarded to psycopg2.connect()
            self.config = server_config

        def process(self, query):
            # Open a connection per element, run the statement, and emit the result rows
            con = psycopg2.connect(**self.config)

            cur = con.cursor()
            cur.execute(query)
            con.commit()
            resultado = cur.fetchall()
            cur.close()
            con.close()

            yield resultado

    runner = 'DataflowRunner'
    options = PipelineOptions(**optionsGCP)
    options.view_as(SetupOptions).save_main_session = True
    options.view_as(StandardOptions).streaming = True

    sql = "select public.Insert_tbCadastro('01','010',431,'A',501741,000000,'2020-10-26','A')"

    p = beam.Pipeline(runner=runner, options=options)
    data = (p
            | beam.Create([sql])
            | beam.ParDo(InsertPostgresql(**paramsDB))
            )

    data | 'teste' >> beam.Map(print)
    print("Lines: ", data)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    run_pipeline()

After leaving the job running for a while, I got:

2020-11-12T11:54:12.718405344ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 
195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -134644: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, in process_bundle bundle_processor.process_bundle(instruction_id)) File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle input_op_by_transform_id[element.transform_id].process_encoded( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in 
apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process File 
"apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 45, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-134636'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) 
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

On top of all of the above, the job complains:

After applying a setup.py file, I get:

setup.py:

import setuptools 

setuptools.setup(
    name='psycopg2',
    version='2.8.6',
    install_requires=[],
    packages=setuptools.find_packages(),
)

Dataflow log error:

2020-11-12T21:11:50.946327861ZError message from worker: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
received from SDK harness for instruction -1396: Traceback (most recent call last): File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, in
 apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1344, in 
apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", line 48, in process
 NameError: name 'psycopg2' is not defined During handling of the above exception, another exception occurred: Traceback
 (most recent call last): File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 256, 
in _execute response = task() File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, 
in <lambda> lambda: self.create_worker().do_instruction(request), request) 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 482, in do_instruction return 
getattr(self, request_type)( File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 518, 
in process_bundle bundle_processor.process_bundle(instruction_id))
 File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 982, in process_bundle
 input_op_by_transform_id[element.transform_id].process_encoded( 
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 219, in process_encoded self.output(decoded_value) File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", line 670,
 in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, 
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 1215, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, 
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, 
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 
line 1371, in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", 
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", 
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", line 671, 
in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", line 1215, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 1279, 
in apache_beam.runners.common.DoFnRunner._reraise_augmented File "apache_beam/runners/common.py", line 1213, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", line 569, 
in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", line 1371, 
in apache_beam.runners.common._OutputProcessor.process_outputs File "apache_beam/runners/worker/operations.py", 
line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive File "apache_beam/runners/worker/operations.py", 
line 670, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/worker/operations.py", 
line 671, in apache_beam.runners.worker.operations.DoOperation.process File "apache_beam/runners/common.py", 
line 1215, 
in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented File "/usr/local/lib/python3.8/site-packages/future/utils/__init__.py", 
line 446, in raise_with_traceback raise exc.with_traceback(traceback) File "apache_beam/runners/common.py", 
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 
line 1344, in apache_beam.runners.common._OutputProcessor.process_outputs File "c:\Users\julianocm\Desktop\POC\PipelinePoc360.py", 
line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-1388'] 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1365) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1085) 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by: 
java.lang.RuntimeException: Error received from SDK harness for instruction -1396: 
Traceback (most recent call last): File "apache_beam/runners/common.py", 
line 1213, in apache_beam.runners.common.DoFnRunner.process File "apache_beam/runners/common.py", 
line 569, in apache_beam.runners.common.SimpleInvoker.invoke_process File "apache_beam/runners/common.py", 

Using requirements.txt:

I'm still waiting for the job logs... so far, my PostgreSQL table is still empty.

A few minutes later I got the logs, and psycopg2 is still not defined:

line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208'] org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

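My requirements.txt:

After following the troubleshooting guide on NameErrors:

Apparently we fixed the NameError. Now I'm trying to figure out why my SQL is not executed. The job is still running, but there are no records in PostgreSQL, so something is still wrong.

When I debug, the lines in "process" are skipped. Am I using the parameters correctly?

After all of that:

Thank you, Juliano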

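【Question Discussion】:

  • In the Dataflow UI in the developer console, do you see the steps of the pipeline processing any data? Are there any visible errors there?
  • I assume InserePostgresql vs InsertPostgresql is a typo made when creating this SO question?
  • Morning. @PeterKim I just translated the method name. The ParDo calls "InsertPostgresql", but it does not run the SQL script. When I run on DirectRunner, it inserts the records correctly, and the stored procedure returns a number reporting that the insert happened.
  • @danielm I just checked, and the job now reports an error. I have updated my post.
  • It looks like the ParDo does not recognize 'psycopg2': "process NameError: name 'psycopg2' is not defined During handling of the above exception"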

Tags: google-cloud-platform google-cloud-dataflow psycopg2 apache-beam dataflow


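【Solution 1】:

The problem is probably related to global imports. As this troubleshooting guide shows, it is a common problem when switching from the direct runner to the Dataflow runner, which matches your situation.

Since you have already tried adding the flag --save_main_session=True, I would try importing the module inside the function itself. Instead of: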

import psycopg2
(...)
def process(self, query):
  con = psycopg2.connect(**self.config)

try:

def process(self, query):
  import psycopg2
  con = psycopg2.connect(**self.config)
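A simplified, stdlib-only simulation of why the local import helps: the worker does not run the top-level import statements of your launcher script, so a module-level name referenced inside process can be missing when process executes there. In this sketch, exec into an empty namespace stands in for the worker, json stands in for psycopg2, and run_on_fresh_worker is a hypothetical helper. It is an analogy only; Beam actually ships a DoFn to workers by pickling it.

```python
# Simulates a worker that defines and runs process() in a fresh global
# namespace, i.e. without the module-level imports of the launcher script.
# `json` stands in for psycopg2 so the sketch runs with the stdlib only.

GLOBAL_IMPORT_SRC = '''
def process(query):
    # relies on a module-level "import json" that the worker never executed
    return json.dumps({"query": query})
'''

LOCAL_IMPORT_SRC = '''
def process(query):
    import json  # imported where it is used, so it resolves on the worker
    return json.dumps({"query": query})
'''


def run_on_fresh_worker(source, query):
    """Define `process` from source in an empty namespace, then call it."""
    namespace = {}
    exec(source, namespace)
    try:
        return namespace["process"](query)
    except NameError as exc:
        return f"NameError: {exc}"


print(run_on_fresh_worker(GLOBAL_IMPORT_SRC, "select 1"))
print(run_on_fresh_worker(LOCAL_IMPORT_SRC, "select 1"))
```

In a real DoFn, the same idea can also be applied in DoFn.setup(), which Beam calls once per DoFn instance on the worker, so the import (and possibly the database connection) is not repeated for every element.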

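--- EDIT

Given that we got past that error and the pipeline works correctly on DirectRunner, I doubt the problem is in the code itself. I would review the firewall rules to make sure the Dataflow workers can connect to the PostgreSQL database, and I would check that the controller service account has the necessary permissions.

I see you specify some credentials in your code; keep in mind that Dataflow workers use the default controller service account <project-number>-compute@developer.gserviceaccount.com, or a custom one, and that account needs the permissions required to connect to the database.

These two things (firewall rules and the service account used by the workers) are potential differences between the direct runner and the Dataflow runner, so either could be the cause of the problem; please review them.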
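One way to test the firewall point from inside the pipeline is to attempt a plain TCP connection from a worker and log the result. A minimal stdlib sketch, assuming you would call the hypothetical helper can_reach from your DoFn (for example in process or setup) with the host and port from paramsDB. With 'host': 'localhost' as in the question, a worker would only be testing its own VM, not the machine where the database runs.

```python
import socket


def can_reach(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout.

    The port is passed through int() because the question stores it as a
    string ('5000'). Any OSError (refused, unreachable, timed out) means the
    database port is not reachable from wherever this code runs.
    """
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False
```

Logging the result (for example with logging.info) from a worker makes it show up in the Dataflow job logs, which separates a network problem from a SQL or credentials problem.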

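【Discussion】:

  • Morning @aemon4, thank you. I no longer get any errors, but my data is not getting into the PostgreSQL table. So far there are no logs available, so I don't know what is happening... I have just updated my post with an image of my Dataflow UI. What should I do? Thanks
  • I don't know why, but when running on Dataflow, the method def process(self, query) never executes; when I debug, the debugger jumps over process and all its lines are ignored. =( On DirectRunner it works perfectly.
  • I have edited my answer with some clues about where I think the error might be. Review them and let us know whether they help or you are still stuck.
  • Thanks @aemon4, I'll take a look now. The link you provided for the controller service account isn't working... could you send it to me? Thanks
  • My bad, I inserted the same link as before. It should be fixed now.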
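About the observation that the lines in process are skipped while debugging, two plain-Python facts are likely involved. First, with DataflowRunner the body of process runs on the remote workers, not in the local process the debugger is attached to, so local breakpoints there are not expected to be hit. Second, process contains yield, so it is a generator function: calling it only creates a generator, and the body runs when something iterates it. The stdlib-only sketch shows the second point; calls is a hypothetical list standing in for the SQL side effect.

```python
calls = []  # hypothetical stand-in for rows written by the SQL side effect


def process(query):
    # Generator function: nothing below runs until the generator is iterated.
    calls.append(query)
    yield f"ran {query}"


gen = process("select 1")
print(calls)      # [] (the body has not started yet)
print(list(gen))  # ['ran select 1']
print(calls)      # ['select 1']
```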
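【Solution 2】:

It seems the psycopg2 package is not being made available to the workers. See https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ for how to manage dependencies in Python. In this case, since psycopg2 is available on PyPI, you just need to include it in a requirements.txt file that you pass via the --requirements_file pipeline option.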
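A minimal sketch of that setup in the style of the question's options dict. The pin psycopg2==2.8.6 is an assumption taken from the version in the question's setup.py, and write_requirements and dependency_options are hypothetical names; requirements_file itself is a real Beam pipeline option.

```python
from pathlib import Path

# Assumed pin: 2.8.6 is the version that appears in the question's setup.py.
REQUIREMENTS_TXT = "psycopg2==2.8.6\n"


def write_requirements(path="requirements.txt"):
    """Write the dependency list that Dataflow workers should pip-install."""
    Path(path).write_text(REQUIREMENTS_TXT)
    return path


# Keys to merge into the question's optionsGCP dict before calling
# PipelineOptions(**optionsGCP). The relative path assumes the pipeline is
# launched from the folder that contains requirements.txt.
dependency_options = {
    "requirements_file": "requirements.txt",
    "save_main_session": True,
}
```

The command-line equivalent is passing --requirements_file requirements.txt when launching the pipeline. With this approach, the 'setup_file' entry pointing at the setup.py shown in the question is probably no longer needed.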

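【Discussion】:

  • Thanks @danielm, I'm trying it now. I'm waiting for Dataflow to log something.
  • I just got the Dataflow logs. I still have the problem; I have updated the SO question. Still not working :(
  • What you did was name your own code as the package psycopg2; it did not add psycopg2 as a dependency. Since your code is all in one file, the requirements.txt approach will be the simplest.
  • Hi @danielm, I'm using requirements.txt now and waiting for something to happen... it is still running... the job is not available in the Dataflow UI... I updated the SO question with an image of my gcloud shell. Thanks
  • Unfortunately, I still have the same problem: line 48, in process NameError: name 'psycopg2' is not defined [while running 'generatedPtransform-208']. Am I doing something wrong?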
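To make the difference described in danielm's comment concrete: in a setup.py passed via --setup_file, name identifies your own code, while install_requires lists the packages the workers must install. A sketch under stated assumptions: poc360-pipeline is a hypothetical package name and the version pin is taken from the question. The sys.argv guard exists only so this file can be imported for inspection; a normal setup.py would call setuptools.setup() unconditionally.

```python
import sys

SETUP_KWARGS = {
    "name": "poc360-pipeline",  # hypothetical name for YOUR code, not "psycopg2"
    "version": "0.0.1",
    "install_requires": ["psycopg2==2.8.6"],  # dependency the workers install
}

# Beam invokes setup.py with a command (e.g. "sdist"), so argv has arguments.
if __name__ == "__main__" and len(sys.argv) > 1:
    import setuptools

    setuptools.setup(packages=setuptools.find_packages(), **SETUP_KWARGS)
```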