1引言
如果你是一个开发者或应用程序的程序员打算修改的Sqoop或通过使用Sqoop内部API进行扩展,你应该阅读此文档。以下各节描述的每一个API的作用,包括内部API和支持其他数据库必须的API。
2支持的发行版
本文档适用于Sqoop V1.4.2。
3Sqoop发行版
Apache的Sqoop是Apache软件基金会的一个开放源码的软件产品。Sqoop产品的发展在:http://svn.apache.org/repos/asf/sqoop/trunk。在该网站上,你可以得到:
※最新版本的源代码
※An issue tracker
※包含WIKI的Sqoop文档
4先决条件
Sqoop开发需要如下先决知识:
※JAVA开发:
&熟练掌握JDBC
&熟练掌握Hadoop的API(包括新的MapReduce 0.2版本以上的API)
※关系型数据库和SQL
本文档假定您使用的是Linux或类似Linux的环境。如果你使用的是Windows,您可能能够使用cygwin完成以下任务。如果你使用的是Mac OS X,你应该看到一些兼容性错误(如果有的话)。 Sqoop主要在Linux环境上进行测试。
5Sqoop源码
您可以从如下网站上获取Sqoop的源代码:http://svn.apache.org/repos/asf/sqoop/trunk
Sqoop源代码被保存在一个“Git”仓库中,从仓库中获取Sqoop源码的操作说明被提供在如上网站中提供的一个“TODO”页面上。
编译指令由源码根目录下的“COMPILING.txt”文件提供。
6开发者API参考
本章节的内容提供给那些需要整合或者扩展Sqoop的应用程序开发者。
下面三个部分都是为了接下来的用例而写的:
※使用Sqoop生成的类和公共类
※Sqoop扩展(即,于更多数据库交互的额外的ConnManager的实现)
※修改Sqoop内部实现
每一部分逐步深入的描述Sqoop系统。
6.1外部API
Sqoop在执行的时候自动生成将关系型数据库里表导入(import)到HDFS系统中的java类,这个类包含导入表的每一列的成员字段,这个类的一个实例保存导入表的每一行(每一行为一个实例),这个类通过Hadoop实现序列化API,命名为Writable 和 DBWritable接口。并且包含其他比较方便的方法:
※parse():可以解析带分隔符文本的方法。
※toString():可以保留用户选择的分隔符的方法
确保自动生成的类中的所有方法都存在于下面的抽象类中:
com.cloudera.sqoop.lib.SqoopRecord.
SqoopRecord实例可能依赖于Sqoop的公共API。所有的类都存在于com.cloudera.sqoop.lib包中。这些被简要的描述如下。Sqoop客户端并不需要直接与这些类发生关系,尽管由Sqoop生成的类依赖于他们。因此,这些AOP被认为是公开的,并且需要继续优化的。
※RecordParser类将解析文本文件中的一行到一个字段列表中,使用可控的分隔符和引号字符。
※静态类FieldFormatter提供了一个被用于SqoopRecord.toString()的方法,该方法处理引用和转义字符。
※封装ResultSet、PreparedStatement objects、SqoopRecords之间的数据是通过JdbcWritableBridge实现的。
※BigDecimalSerializer在Writable接口之上包含了一组对BigDecimal(小数)对象序列化的方法。
6.2扩展API
6.2.1HBase的序列化扩展
本章节包含了API和最基本的用于扩展Sqoop的类,这些类使得Sqoop可以与更多的数据库提供商做接口。
Sqoop使用JDBC和DataDrivenDBInputFormat的从数据库中读取数据,对不同的数据库提供商及JDBC元数据之间的不同,必须为大多数数据库提供特定的代码路径。Sqoop针对这个问题的解决方案是引入ConnManager API (com.cloudera.sqoop.manager.ConnMananger)。
ConnManager是一个静态类,这个类定义了数据库本身相关作用的方法。ConnManager类的大多数实现都继承静态类SqlManager (com.cloudera.sqoop.manager.SqlManager),SqlManager使用标准的SQL执行大部分的动作。所有的子类需要实现getConnection()方案,用于获取实际的JDBC数据库链接,子类可以重写其他所有方法。SqlManager类本身暴露了一个受保护的API,使开发人员可以有选择地重写行为。例如getColNamesQuery()方法允许使用getColNames()的进行修改SQL查询,而不需要重写大多数的getColNames()方法。
ConnManager的实现通过Sqoop特定的类(SqoopOptions)获得了大量的配置数据,SqoopOptions是可变的。SqoopOptions不直接存储特定的管理信息,相反,他包含了对配置信息的一个指向,通过GenericOptionsParser解析命令行参数之后由Tool.getConf()返回。这使得通过“-D any.specific.param= any.value”扩展参数,而不需要任何层次的的解析参数或修改SqoopOptions。配置信息中的基础配置通过调用工作流交给任意的MapReduce作业,所以,用户可以在命令行自定义设置必要的Hadoop状态。
目前所有的ConnManager实现都是无状态的,因此,ConnManagers示例的系统在Sqoop的生命周期中对同一个ConnManager类实现了多个实例。目前实例化一个ConnManagers类是一个轻量级的操作,并且很少这样做。因此ConnManagers不对operations之间进行缓存。
当前的ConnManagers是由静态类ManagerFactory的一个实例创建的(参考:http://issues.apache.org/jira/browse/MAPREDUCE-750)。当前的ManagerFactory的一个实现服务于整个Sqoop:com.cloudera.sqoop.manager.DefaultManagerFactory。Sqoop的相关扩展不应该修改DefaultManagerFactory。相反,ManagerFactory的一个特定的扩展需要由新的ConnManager提供。值得注意的是ManagerFactory有一个单一的名为accept()的方法,这个方法将确定是否可以实例化为用于用户输入信息(user’s SqoopOptions)的ConnManager。如果可以,将返回一个ConnManager实例,否则返回null。
ManagerFactory实例的使拥由设置在sqoop-site.xml中的sqoop.connection.factories 支配,扩展库的用户可以安装第三方库,其中包含一个新的ManagerFactory和ConnManager(S),配置sqoop-site.xml使用新的ManagerFactory。DefaultManagerFactory principly通过存储在SqoopOptions中的连接字符串区分数据库。
扩展者可以利用MapReduce,com.cloudera.sqoop.io和util包中的类以方便他们的扩展实现。下面一节中更详细地描述这些包和类。
Sqoop支持从关系型数据库到HBase的导入操作。当导入数据到HBase的时候,必须转化成HBase可以接受的特定的形式:
※数据必须放到HBase中的一个或多个表中。
※输入数据的列必须到放置到一个column family钟。
※值必须序列化成字节数组存放到单元格内
※
所有这些都通过HBase客户端API中的Put statements实现。Sqoop于HBase的交互执行都在com.cloudera.sqoop.hbase包里。记录从数据库中被deserialzed,并从mapper任务中发出。OutputFormat负责把结果插入到HBase中,这是通过一个叫做PutTransformer的接口实现的。PutTransformer有一个方法称为getPutCommand(),它使用一个Map 你可以通过自己的PutTransformer重新这个实现,并且把他添加到map任务的classpath中,来告诉Sqoop你将使用你自己的实现,通过”-D”命令设置自己的class:sqoop.hbase.insert.put.transformer.class。 在您的PutTransformer实现,特定的row key 、column 、 column family可通过getRowKeyColumn()和getColumnFamily()方法。你可以扩展更多的超出的”PUT”操作,例如,注入额外的辅助索引的行。Sqoop将对--hbase-table命令特定的表执行所有的”PUT”操作。 6.3Sqoop内部结构 本节介绍Sqoop的内部结构。 Sqoop程序是由的主类com.cloudera.sqoop.Sqoop驱动。有限数量的额外的类在同一个包:SqoopOptions (如前所述),ConnFactory(即熟练操作ManagerFactory实例)。 6.3.1一般程序流程 一般的程序流程如下: com.cloudera.sqoop.Sqoop是主类和实现了Tool,一个新的实例ToolRunner被推出,Sqoop的第一个参数是一个字符串,并在SqoopTool中定义并执行,SqoopTool来执行用户的各种请求操作(如: import, export, codegen等)。 SqoopTool将解析其余的参数,设置相应的字段在SqoopOptions类,然后执行。 在SqoopTool 的run()方法中,import 、 export或者其他正确的指令被执行。一般情况下ConnManager一般在SqoopOptions数据的基础上进行实例化。ConnFactory被用于从ManagerFactory中获取一个ConnManager。这在前面的部分已经进行了描述。Imports 、 exports或者其他大型数据的迁移操作通常是一个并行的、可靠的MapReduce任务。Import操作并不是一定要以MapReduce作业的方式运行,ConnManager.importTable()将确定如何以最佳的方式进行import操作。每一个主要操作实际上都有ConnMananger控制,除了生成代码的操作,这些操作是由CompilationManager和ClassWriter做的(都在 com.cloudera.sqoop.orm 包中)。导入数据到Hive的操作是由com.cloudera.sqoop.hive.HiveImport的importTable()完成的,这样做是为了不为使用ConnManager的实现都担心。 ConnManager 的 importTable()方法接收一个类型为ImportJobContext的参数,其中包含这个方法所需的各种参数值。将来,该类可扩展附加参数,以实现功能更加强大的import操作。同样, exportTable()方法接收的参数类型ExportJobContext。这些类包含一系列导入/导出,指向SqoopOptions对象,以及其他相关的数据。 6.3.2子包 com.cloudera.sqoop包中的子包,包括: ※Hive:便于数据导入到Hive ※IO: 实现的java.io. *接口 ※Lib: 外部公共API(如前所述) ※Manager: ConnManager和ManagerFactory的接口和它们的实现 ※Mapreduce: 新的(0.20 +)MapReduce的API接口的类 ※Orm: 代码自动生成 ※Tool: 实现SqoopTool ※Util: 各种实用工具类 IO包中的OutputStream和BufferedWriter的实现被用于直接向HDFS进行写入。SplittableBufferedWriter允许为客户端打开一个单一的BufferedWriter,在引擎之下,当达到目标值后连续的写入文件系统。这允许在Sqoop import的同时使用压缩机制(如gzip),同时允许在MapR任务之后对数据集进行分割。大对象文件存储系统的代码也存在于IO包中。 Mapreduce包中的代码用于直接与Hadoop MapReduce做接口,将在下一章节中详述。 ORM包中的代码用于生产代码。它依赖于提供了com.sun.tools.javac包的JDK的tools.jar包。 UTIL包中包含用于整个Sqoop的各种工具 ※ClassLoaderStack:管理由当前线程的堆栈使用的ClassLoader的实例,这正是当以本地模式运行MapReduce任务时自动生成代码写入当心线程的原理。 ※DirectImportUtils:包含一些直接进行Hadoop import操作的方便方法。 ※Executor:启动外部进程,并连接这些来生成由一个AsyncSink(见下面更详细地)的流处理程序。 ※ExportException:当exports失败时由ConnManagers抛出异常。 ※ImportException:当import失败时由ConnManagers抛出异常。 ※JDBCURL:处理连接字符串的解析,这是类URL,是不规范的、不合格的。 ※PerfCounters:被用来估计用于显示给用户的传输速率。 ※ResultSetPrinter:漂亮地打印结果集。 在不同的时候,Sqoop从外部程序中读取stdout,最简单的模式就是由LocalMySQLManager和DirectPostgresqlManager执行的直接模式 (direct-mode)import。之后由Runtime.exec()产生一个进程,该进程的标准输出(Process.getInputStream())和潜在错误(Process.getErrorStream())都需要被处理。在这些流之间无法读取更多的数据从而导致在写入更多数据之前外部进程阻塞。因此,这些都必须处理,最好以异步的方式。 按照Sqoop的说法,“异步接收器”是一个线程需要一个InputStream和读取完成。这些实现AsyncSink实现。com.cloudera.sqoop.util.AsyncSink抽象类定义了这家工厂必须执行的操作。processStream()将产生另一个线程立即开始处理从InputStream中读取的数据参数; 它必须读这流来完成的。 join()方法允许外部线程等待,直到处理完成。 一些"stock"被同步实现:LoggingAsyncSink将重复InputStream上的一切在log4j INFO语句中。NullAsyncSink消耗所有的输入和什么都不做。 各种ConnManagers使得外部进程以内部类的方式拥有它们自己的AsyncSink实现,它们通过数据库tools读取,并且促使数据流向HDFS,有可能在此期间进行格式转换。 6.3.3与MapReduce的接口 Sqoop调度MapReduce作业产生imports和exports。配置和执行的MapReduce工作如下几个共同的步骤(配置InputFormat配置OutputFormat设置映射的实施;等等)。这些步骤是在com.cloudera.sqoop.mapreduce.JobBase类中的。 为了使用,JobBase允许一个用户来指定InputFormat,OutputFormat,和映射器。 JobBase本身就是ImportJobBase和ExportJobBase的子集,为特定的配置步骤提供更好的支持,分别由ImportJobBase和ExportJobBase的子类。 ,ImportJobBase.runImport()将调用的配置步骤,并运行一个工作表导入HDFS。 7Eclispe集成 1、从官方网站上下载:sqoop-1.4.2.bin__hadoop-0.23.tar.gz 2、解压,用ant编译,得到相关jar包 3、新建一个web工程,将上面得到的jar包copy到lib目录下,并添加数据库驱动包(ojdbc6.jar) 4、将sqoop的java源码添加到src目录下 8Eclispe远程调试sqoop 利用eclipse远程调试功能,实现sqoop 本地环境集成: 利用cygwin部署 0、将sqoop-1.4.2放在/home/Administrator/hadoop 1、sqoop中设置HOME_HOME: 修改:$SQOOP_HOME/bin/configure-sqoop :HADOOP_HOME=/cygdrive/e/work/hadoop-1.0.3(cygwin的目录配置方式) 2、hadoop配置java_home $HADOOP_HOME/cof/hadoop-env.sh :export JAVA_HOME=/cygdrive/c/jdk1.6.0_38 3、hadoop配置hadoop_home $HADOOP_HOME/cof/hadoop-env.sh :export HADOOP_HOME=/cygdrive/e/work/hadoop-1.0.3 二、利用eclipse远程调试功能,实现单步调试sqoop功能 0、利用cygwin执行sqoop命令:开启调试模式,端口为默认7777 1、eclipse设置远程调试 Project:为eclipse中部署的sqoop工程,包含sqoop源码。 Host:为执行sqoop命令的应用地址(IP),本地应用为loacalhost,远程应用为远程主机IP Port:监听端口。 目前已实现单步调试功能。./sqoop help下载本文