【发布时间】:2016-01-13 16:04:30
【问题描述】:
我们如何在 SQoop 中自动执行增量导入?
在增量导入中,我们需要给--last-value从最后一个值开始导入,但我的工作是经常从RDBMS导入,我不想手动给最后一个值,有什么办法我们可以自动化这个过程吗?
【问题讨论】:
我们如何在 SQoop 中自动执行增量导入?
在增量导入中,我们需要给--last-value从最后一个值开始导入,但我的工作是经常从RDBMS导入,我不想手动给最后一个值,有什么办法我们可以自动化这个过程吗?
【问题讨论】:
@Durga Viswanath Gadiraju 回答的另一种方法。
如果您将数据导入到 hive 表中,您可以从 hive 表中查询最后更新的值并将该值传递给 sqoop 导入查询。 您可以使用 shell 脚本或 oozie 操作来实现这一点。
Shell 脚本:
lastupdatedvalue=`hive -e 'select last_value from table` #tweak the selection query based on the logic.
sqoop import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${lastupdatedvalue}
Oozie 方法:
PFB 一个 sudo 工作流程:
<workflow-app name="sqoop-to-hive" xmlns="uri:oozie:workflow:0.4">
<start to="hiveact"/>
<action name="hiveact">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>script.sql</script>
<capture-output/>
</hive>
<ok to="sqoopact"/>
<error to="kill"/>
<action name="sqoopact">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>import --connect jdbc:mysql://localhost:3306/ydb --table yloc --username root -P --incremental append --last-value ${wf:actionData('hiveact')}</command>
</sqoop>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message>Action failed</message>
</kill>
<end name="end"/>
希望这会有所帮助。
【讨论】:
您可以利用内置的 Sqoop 元存储
您可以使用以下内容创建一个简单的增量导入作业 命令:
sqoop 作业 \ --创建\ --\ 进口 \ --连接 \ --用户名\ --密码 \ --表\ --增量追加\ --检查- \ --last-value 0
并以 --exec 参数启动它:
sqoop job --exec <<Job Name>>
Sqoop 会自动将最后导入的值序列化回 每个成功的增量作业后的元存储
【讨论】:
这可以通过 sqoop 作业轻松实现
1.创建一个sqoop作业(“import”前有一个空格)
sqoop job --create JobName6 \
-- import \
--connect jdbc:mysql://localhost:3306/retail_db \
--username=username \
--password-file /user/sqoop/password \
--table departments \
--target-dir /user/hive/warehouse/test.db/departments \
--table departments \
--split-by department_id \
--check-column department_id \
--incremental append \
--last-value 0;
2。运行 sqoop 作业 sqoop 作业 --exec JobName6; 检查 HDFS 中位置的值
3.在源表(mysql)中插入一些数据 INSERT INTO Departments VALUES (9,'New Data1'),(10,'New Data2');
2。再次运行 sqoop 作业。 sqoop 作业 --exec JobName6; 再次检查HDFS中位置的值。
Hive 导入也是如此
sqoop job --create JobName1 \
-- import \
--connect jdbc:mysql://localhost:3306/retail_db \
--username=username\
--password-file /user/sqoop/password \
--table departments \
--hive-import \
--hive-table department \
--split-by department_id \
--check-column department_id \
--incremental append \
--last-value 0;
【讨论】:
获得它的一种方法:
在数据库中创建日志表并开发增量导入如下
Query the log table using sqoop eval command with the last value from last run
Run the sqoop import
Update the log table with the latest valueusing sqoop eval command
您需要自动化处理sqoop eval、sqoop import 和sqoop eval。您可以使用sqoop eval 将任何有效查询提交到您有连接的任何数据库。因此,您可以在导入之前运行选择查询以获取上次运行的最后一个值,并运行更新查询以使用当前运行的最后一个值更新日志表。
【讨论】: