【Question title】:Move data from Google Cloud Storage to S3 using a Dataproc Hadoop cluster and Airflow
【Posted】:2018-06-21 00:52:09
【Question】:

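I am trying to transfer a large amount of data from GCS to an S3 bucket. I have spun up a Hadoop cluster using Google Dataproc.

I can run the job through the Hadoop CLI with the following command: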

hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder 

I am new to MapReduce and Hadoop. I am trying to add this to my Airflow workflow using DataProcHadoopOperator:

export_to_s3 = DataProcHadoopOperator(
    task_id='export_to_s3',
    main_jar=None,
    main_class=None,
    arguments=None,
    archives=None,
    files=None,
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name='optimize-m',
    dataproc_hadoop_properties=None,
    dataproc_hadoop_jars=None,
    gcp_conn_id='google_cloud_default',
    delegate_to=None,
    region='global',
    dag=dag
)

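My Airflow runs on a Compute Engine instance.

I can't figure out how to configure the operator so that it submits the following as a job: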

hadoop distcp -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder 

I followed the suggestion and built the following Airflow task:

export_to_s3 = DataProcHadoopOperator(
    task_id='export_to_s3',
    main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar',
    main_class=None,
    arguments='-update gs://umg-comm-tech-dev/data/apollo/QA/ s3a://[mys3accessid]:[mys3secret]@s3://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/'.split(' '),
    archives=None,
    files=None,
    job_name='{{task.task_id}}_{{ds_nodash}}',
    cluster_name='optimize',
    dataproc_hadoop_properties=None,
    dataproc_hadoop_jars=None,
    gcp_conn_id='google_cloud_default',
    delegate_to=None,
    region='global',
    dag=dag
)

However, I am now getting the following error:

18/01/18 10:13:42 INFO gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.2-hadoop2
18/01/18 10:13:42 WARN s3native.S3xLoginHelper: The Filesystem URI contains login details. This is insecure and may be unsupported in future.
18/01/18 10:13:43 WARN s3a.S3AFileSystem: Client: Amazon S3 error 400: 400 Bad Request; Bad Request (retryable)

com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
18/01/18 10:13:43 ERROR tools.DistCp: Invalid arguments: 
org.apache.hadoop.fs.s3a.AWSS3IOException: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:282)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:236)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2812)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2849)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2831)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
    at org.apache.hadoop.tools.DistCp.setTargetPathExists(DistCp.java:226)
    at org.apache.hadoop.tools.DistCp.run(DistCp.java:118)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.tools.DistCp.main(DistCp.java:462)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:234)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:148)
    at com.google.cloud.hadoop.services.agent.job.shim.HadoopRunJarShim.main(HadoopRunJarShim.java:12)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=
    at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1107)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1070)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:276)
    ... 18 more
Invalid arguments: doesBucketExist on s3: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696), S3 Extended Request ID: U6j5J9djR5UPPjhbjjLOtn7dG4IXDyMZfTD6CuFk5V6MXdUP65ArF56zP4Okx2NScxqYVh/UCTI=: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 8F6A80AA7432A696)
usage: distcp OPTIONS [source_path...] <target_path>
              OPTIONS
 -append                       Reuse existing data in target files and
                               append new data to them if possible
 -async                        Should distcp execution be blocking
 -atomic                       Commit all changes or none
 -bandwidth <arg>              Specify bandwidth per map in MB
 -delete                       Delete from target, files missing in source
 -diff <arg>                   Use snapshot diff report to identify the
                               difference between source and target
 -f <arg>                      List of files that need to be copied
 -filelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n
 -filters <arg>                The path to a file containing a list of
                               strings for paths to be excluded from the
                               copy.
 -i                            Ignore failures during copy
 -log <arg>                    Folder on DFS where distcp execution logs
                               are saved
 -m <arg>                      Max number of concurrent maps to use for
                               copy
 -mapredSslConf <arg>          Configuration for ssl config file, to use
                               with hftps://. Must be in the classpath.
 -numListstatusThreads <arg>   Number of threads to use for building file
                               listing (max 40).
 -overwrite                    Choose to overwrite target files
                               unconditionally, even if they exist.
 -p <arg>                      preserve status (rbugpcaxt)(replication,
                               block-size, user, group, permission,
                               checksum-type, ACL, XATTR, timestamps). If
                               -p is specified with no <arg>, then
                               preserves replication, block size, user,
                               group, permission, checksum type and
                               timestamps. raw.* xattrs are preserved when
                               both the source and destination paths are
                               in the /.reserved/raw hierarchy (HDFS
                               only). raw.* xattrpreservation is
                               independent of the -p flag. Refer to the
                               DistCp documentation for more details.
 -sizelimit <arg>              (Deprecated!) Limit number of files copied
                               to <= n bytes
 -skipcrccheck                 Whether to skip CRC checks between source
                               and target paths.
 -strategy <arg>               Copy strategy to use. Default is dividing
                               work based on file sizes
 -tmp <arg>                    Intermediate work path to be used for
                               atomic commit
 -update                       Update target, copying only missingfiles or
                               directories

Any help would be much appreciated.

Thanks!

【Comments】:

  • Found the problem: the target URI had a stray s3:// after the @. s3a://[mys3accessid]:[mys3secret]@s3://umg-ers-analytics/qubole/user-data/pitched/optimize/QA/ should have been s3a://[mys3accessid]:[mys3secret]@umg-ers-analytics/qubole/user-data/pitched/optimize/QA/
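
To illustrate why the stray s3:// after the @ breaks the job, here is a minimal sketch that parses both forms of the target URI. It uses Python's urllib.parse purely as a stand-in; Hadoop parses URIs with its own Java code, so this only approximates what S3A sees. The helper name bucket_of and the placeholder credentials and bucket are hypothetical. The result is consistent with the "doesBucketExist on s3" message in the error above, where the bucket name was read as "s3".

```python
from urllib.parse import urlsplit


def bucket_of(uri):
    """Return the host part of the URI authority, which S3A treats as the bucket."""
    return urlsplit(uri).hostname


# Placeholder credentials and bucket name, for illustration only.
broken = "s3a://ACCESS_ID:SECRET@s3://my-bucket/folder/"
fixed = "s3a://ACCESS_ID:SECRET@my-bucket/folder/"

print(bucket_of(broken))  # s3
print(bucket_of(fixed))   # my-bucket
```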

Tags: amazon-s3 google-cloud-platform google-cloud-storage airflow google-cloud-dataproc


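【Answer 1】:

I'm not familiar with Airflow, but it looks like DataProcHadoopOperator is just a wrapper around gcloud dataproc jobs submit hadoop.

I believe hadoop distcp is just a wrapper around hadoop jar /usr/lib/hadoop-mapreduce/hadoop-distcp.jar, so submitting distcp through the Dataproc API looks like this: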

gcloud dataproc jobs submit hadoop --cluster=optimize --jar file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar -- -update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder

So for Airflow, you probably want something like this:

main_jar='file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar', arguments='-update gs://GCS-bucket/folder s3a://[my_aws_access_id]:[my_aws_secret]@aws-bucket/folder'.split(' '),

FYI, if optimize-m is the name of the master VM, the cluster name you pass to Airflow is probably just optimize.
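
As a small variation on the snippet above, the argument list can be built explicitly instead of with .split(' '), which silently produces empty strings if the command ever contains a double space. This is only a sketch: distcp_arguments is a hypothetical helper, not part of Airflow or Dataproc, and the bucket names are the placeholders from the question.

```python
DISTCP_JAR = "file:///usr/lib/hadoop-mapreduce/hadoop-distcp.jar"


def distcp_arguments(source, target, update=True):
    """Build the DistCp argument list to pass as the operator's `arguments`."""
    for uri in (source, target):
        # A path containing whitespace would be split into separate arguments
        # by a naive .split(' '), so reject it up front.
        if not uri or any(ch.isspace() for ch in uri):
            raise ValueError("URI must be non-empty and contain no whitespace: %r" % uri)
    args = ["-update"] if update else []
    args += [source, target]
    return args


arguments = distcp_arguments("gs://GCS-bucket/folder", "s3a://aws-bucket/folder")
print(arguments)  # ['-update', 'gs://GCS-bucket/folder', 's3a://aws-bucket/folder']
```

The resulting list would then be passed as main_jar=DISTCP_JAR, arguments=arguments in the DataProcHadoopOperator call.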

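【Comments】:

  • Thank you very much for the reply, this is exactly what I was looking for. However, I'm now running into a problem with Dataproc access scopes. Any idea how to resolve it?
  • I get the following error: googleapiclient.errors.HttpError: <HttpError 403 when requesting https://dataproc.googleapis.com/v1/projects/umg-comm-tech-dev/regions/global/jobs:submit?alt=json returned "Request had insufficient authentication scopes.">
  • Ah, I've never used S3, so I can't help with that. A 400 means the request was malformed. If it happens in the driver, consider turning on debug logging: cloud.google.com/dataproc/docs/concepts/accessing/…
  • FYI: consider using an initialization action to set your AWS secrets in /etc/hadoop/conf/core-site.xml. That way they live in one place rather than being passed through multiple services.
  • To elaborate on what Karthik said, you should set the keys fs.s3a.access.key and fs.s3a.secret.key using an initialization action that calls something like bdconfig set_property --configuration_file=/etc/hadoop/conf/core-site.xml --name=fs.s3a.access.key --value=<your access key>, instead of using the inline credential syntax in the s3a:// path. Limiting where you put credentials is considered best practice, and this way you don't need to inline them in your Airflow arguments.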
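
For reference, the two properties mentioned in the last comment end up as ordinary Hadoop <property> entries in core-site.xml. The sketch below only renders what those entries look like; it is not the bdconfig tool, and s3a_properties is a hypothetical helper. On a real cluster you would merge these entries into the existing file rather than overwrite it.

```python
import xml.etree.ElementTree as ET


def s3a_properties(access_key, secret_key):
    """Render the two S3A credential properties as a Hadoop configuration fragment."""
    configuration = ET.Element("configuration")
    for name, value in (
        ("fs.s3a.access.key", access_key),
        ("fs.s3a.secret.key", secret_key),
    ):
        prop = ET.SubElement(configuration, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(configuration, encoding="unicode")


# Placeholder values; never commit real credentials.
print(s3a_properties("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY"))
```

With the keys set on the cluster, the DistCp target can be written as plain s3a://aws-bucket/folder with no credentials in the URI.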
【Answer 2】:

Why are you using Dataproc? Wouldn't a gsutil command be simpler?

For example:

gsutil -m rsync -r gs://GCS s3://S3

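Something like this will copy your data from GCS to S3. Note the -m flag, which makes gsutil run the sync in parallel. Other flags are available too, such as -d, which deletes files at the destination that no longer exist in the source.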
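
If you wanted to drive that command from a Python-based workflow such as Airflow, one option is to assemble it as a list and hand it to subprocess.run, or join it into a string for a BashOperator's bash_command. This is a sketch only: gsutil_rsync_command is a hypothetical helper, the bucket names are the placeholders from the command above, and it assumes gsutil on the worker already has AWS credentials configured (for example in its boto configuration file).

```python
import shlex


def gsutil_rsync_command(source, target, delete_extra=False):
    """Build a parallel, recursive gsutil rsync command as an argument list."""
    cmd = ["gsutil", "-m", "rsync", "-r"]
    if delete_extra:
        # -d removes files at the destination that are missing from the source.
        cmd.append("-d")
    cmd += [source, target]
    return cmd


cmd = gsutil_rsync_command("gs://GCS", "s3://S3")
print(shlex.join(cmd))  # gsutil -m rsync -r gs://GCS s3://S3
```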

【Comments】:
