
Hadoop in Practice: Using Sqoop for Data Splitting

Hadoop in Practice author Alex Holmes explains how you can use the Sqoop relational database import / export system with Hadoop.





Data splitting

How is Sqoop able to parallelize imports across multiple mappers[1]? In figure 1, we showed how Sqoop's first step is to pull metadata from the database. It inspects the table being imported to determine the primary key and runs a query to determine the lower and upper bounds of the data in the table (shown in figure 3). Sqoop assumes a roughly even distribution of data between the minimum and maximum keys and divides the delta between them by the number of mappers. Each mapper is then fed a unique query containing a range of the primary key.


Figure 3. Sqoop preprocessing steps to determine query splits
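To make the range arithmetic concrete, here is a minimal sketch of dividing a key range between mappers. The split_ranges function is a hypothetical helper written for illustration, not Sqoop's actual splitter, and it assumes an integer split column.

```shell
# Hypothetical helper, not part of Sqoop: print one range predicate per mapper
# for an integer split column. Arguments: column, min, max, number of mappers.
split_ranges() (
  col=$1 min=$2 max=$3 mappers=$4
  delta=$(( max - min ))
  i=0
  while [ "$i" -lt "$mappers" ]; do
    # Integer division spreads the delta across the mappers.
    lo=$(( min + delta * i / mappers ))
    hi=$(( min + delta * (i + 1) / mappers ))
    if [ "$i" -eq $(( mappers - 1 )) ]; then
      # The last range is closed so that the maximum key is included.
      echo "$col >= $lo AND $col <= $hi"
    else
      echo "$col >= $lo AND $col < $hi"
    fi
    i=$(( i + 1 ))
  done
)

# Keys 1 to 1000 across the default of four mappers.
split_ranges id 1 1000 4
```

With keys from 1 to 1000 and four mappers, the first range printed is id >= 1 AND id < 250 and the last is id >= 750 AND id <= 1000. If most rows sit in one of those ranges, that mapper does most of the work, which is why an even key distribution matters.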

You can configure Sqoop to use a non-primary key with the --split-by argument. This can be useful in situations where the primary key doesn't have an even distribution of values between the min and max values. For large tables, however, you need to be careful that the column specified in --split-by is indexed to ensure optimal import times.

You can use the --boundary-query argument to construct an alternative query to determine the minimum and maximum values.
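As a rough sketch, the two arguments can be combined as follows. The split column defaults to id here, and the WHERE clause in the boundary query is a made-up example of ignoring hypothetical sentinel rows; both are assumptions for illustration. The SQOOP variable is also our own convention: by default it only prints the command, so you can inspect it before running it for real.

```shell
# Dry run by default: prints the command. Set SQOOP=sqoop to execute it.
SQOOP=${SQOOP:-echo sqoop}

# Assumption: an indexed integer column; override with SPLIT_COL=<column>.
split_col=${SPLIT_COL:-id}

import_with_custom_split() {
  # The boundary query must return the minimum and maximum of the split column.
  $SQOOP --options-file "$HOME/.sqoop_import_options.txt" \
    --split-by "$split_col" \
    --boundary-query "SELECT MIN($split_col), MAX($split_col) FROM stocks WHERE $split_col > 0" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks
}

import_with_custom_split
```

The printed line loses the shell quoting around the boundary query, so treat it as something to read rather than to paste back into a terminal.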

Incremental imports

You can also perform incremental imports. Sqoop supports two types: "append," which works for numerical data that is incrementing over time, such as auto-increment keys, and "lastmodified," which works on time-stamped data. In both cases, you need to specify the column using --check-column, the mode via the --incremental argument (the value must be either "append" or "lastmodified"), and, finally, the actual value to use to determine the incremental changes, --last-value. Using our example, if we want to import stock data that is newer than January 1, 2005, we would do the following:

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
    --check-column "quote_date" \
    --incremental "lastmodified" \
    --last-value "2005-01-01" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks
...
tool.ImportTool: --incremental lastmodified
tool.ImportTool: --check-column quote_date
tool.ImportTool: --last-value 2011-11-24 14:49:56.0
tool.ImportTool: (Consider saving this with 'sqoop job --create')
...
```
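For comparison, an "append" import keyed on the auto-increment column might look like the sketch below. The id column and the starting value of 1000 are assumptions for illustration. As a convention of this sketch, the SQOOP variable prints the command by default rather than running it.

```shell
# Dry run by default: prints the command. Set SQOOP=sqoop to execute it.
SQOOP=${SQOOP:-echo sqoop}

import_new_rows() {
  # $1: highest id already imported (hypothetical value supplied by the caller)
  $SQOOP --options-file "$HOME/.sqoop_import_options.txt" \
    --check-column id \
    --incremental append \
    --last-value "$1" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks
}

import_new_rows 1000
```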

Sqoop jobs and the metastore

You can see in the command output the last value that was encountered for the increment column. How can we best automate a process that can reuse that value? Sqoop has the notion of a job, which can save this information and reuse it in subsequent executions:

$ sqoop job --create stock_increment -- import \
    --append \
    --check-column "quote_date" \
    --incremental "lastmodified" \
    --last-value "2005-01-01" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --username hip_sqoop_user \
    --table stocks

This merely saves the command as a job in something called the Sqoop metastore. A Sqoop metastore keeps track of all jobs. By default, the metastore is contained in your home directory under .sqoop and is only used for your own jobs. If you want to share jobs, you would need to install a JDBC-compliant database and use the --meta-connect argument to specify its location when issuing job commands.
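A sketch of job commands issued against a shared metastore follows. The JDBC URL, including the host name metastore.example.com, is a placeholder that you would replace with the location of your own metastore. The SQOOP variable is a convention of this sketch that prints each command by default instead of running it.

```shell
# Dry run by default: prints the command. Set SQOOP=sqoop to execute it.
SQOOP=${SQOOP:-echo sqoop}

# Placeholder location of the shared metastore (an assumption, not a real host).
META=${META:-jdbc:hsqldb:hsql://metastore.example.com:16000/sqoop}

list_shared_jobs() {
  $SQOOP job --meta-connect "$META" --list
}

exec_shared_job() {
  # $1: name of a job previously saved in the shared metastore
  $SQOOP job --meta-connect "$META" --exec "$1"
}

list_shared_jobs
exec_shared_job stock_increment
```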

The job create command executed in the previous example didn't do anything other than add the job to the metastore. To run the job, you need to explicitly execute it as shown in this listing:

$ sqoop job --list                                  #1
Available jobs:
  stock_increment
$ sqoop job --exec stock_increment                  #2
$ sqoop job --show stock_increment                  #3
...
incremental.last.value = 2011-11-24 15:09:38.0      #4
...

#1 — List all jobs in the metastore

#2 — Executes your job

#3 — Shows metadata information about your job

#4 — The metadata includes the last value of your incremental column. This is actually the time that the command was executed, and not the last value seen in the table. If you are using this feature, make sure that the database server and any clients interacting with the server (including the Sqoop client) have their clocks synced with the Network Time Protocol (NTP).

Unfortunately, the --options-file argument, which referred to your local file with your username and password, doesn't work with jobs in Sqoop. The password also can't be specified when creating the job. Sqoop will instead prompt for the password when running the job. To make this work in an automated script, you need to use Expect, a Linux automation tool, to supply the password from a local file when it detects Sqoop prompting for a password. The source of an Expect script that works with Sqoop is on our GitHub at http://goo.gl/yL4KQ.

Fast MySQL imports

What if you want to bypass JDBC altogether and use the "fast" MySQL Sqoop connector for a high-throughput load into HDFS? This approach uses the mysqldump utility shipped with MySQL to perform the load. You must make sure that mysqldump is in the PATH of the user running the MapReduce job. To enable use of the "fast" connector you must specify the --direct argument.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
    --direct \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks

What are the disadvantages of the fast connectors? First, only MySQL and PostgreSQL are currently supported. Fast connectors also only work with text output files—specifying Avro or Sequence File as the output format of the import won't work.

Importing to Hive

The final step in this technique is to use Sqoop to import your data into a Hive table. The only difference between an HDFS import and a Hive import is that the Hive import has a post-processing step where the Hive table is created and loaded, as shown in figure 4.


Figure 4. Sequence of events of a Sqoop Hive import

When data is loaded into Hive from an HDFS file or directory, such as in the case of Sqoop Hive imports (step 4 in the previous diagram), for the sake of efficiency, Hive moves the directory into its warehouse rather than copying the data (step 5). The HDFS directory that the Sqoop MapReduce job writes to won't exist after the import.

Hive imports are triggered via the --hive-import argument. Just as in the fast connector's case, this option isn't compatible with the --as-avrodatafile and --as-sequencefile options.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
    --hive-import \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks
$ hive
hive> select * from stocks;
OK
1 AAPL 2009-01-02 85.88 91.04 85.16 90.75 26643400 90.75
2 AAPL 2008-01-02 199.27 200.26 192.55 194.84 38542100 194.84
3 AAPL 2007-01-03 86.29 86.58 81.9 83.8 44225700 83.8
4 AAPL 2006-01-03 72.38 74.75 72.25 74.75 28829800 74.75
...

Importing strings containing Hive delimiters

You will likely have downstream processing issues if you are importing columns that can contain any of Hive's delimiters (the \n, \r, and \01 characters). You have two options in such cases: either specify --hive-drop-import-delims, which will remove conflicting characters as part of the import, or specify --hive-delims-replacement, which will replace them with a different character.
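To show what the two options do to a single string value, here is a small imitation using tr. The two functions are hypothetical stand-ins that only mimic the effect; they are not how Sqoop implements the options, and the replacement in this sketch is limited to a single character.

```shell
# Sketch only: imitate the effect of --hive-drop-import-delims on stdin.
drop_hive_delims() {
  tr -d '\n\r\001'
}

# Sketch only: imitate --hive-delims-replacement.
# $1: a single replacement character
replace_hive_delims() {
  tr '\n\r\001' "$1$1$1"
}

# A value containing a newline and a \01 character.
printf 'line one\nline two\001end' | drop_hive_delims; echo
printf 'line one\nline two\001end' | replace_hive_delims ' '; echo
```

The first command prints the value with the delimiters removed, and the second prints it with each delimiter turned into a space. Either way, the value no longer spills across Hive rows or columns.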

If the Hive table already exists, the data will be appended to the existing table. If this is not the desired behavior, you can use the --hive-overwrite argument to indicate that the existing table should be replaced with the imported data.

Data in Hive can also be compressed. Since the LZOP compression codec is the only splittable codec[2] in Hadoop, it is the codec that should be used for Hive compression. The following example shows how to use --hive-overwrite in conjunction with enabling LZOP compression. For this to work, you'll need to have built and installed LZOP on your cluster because it isn't bundled with Hadoop (or CDH) by default.

$ hive
hive> drop table stocks;
$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
    --hive-import \
    --hive-overwrite \
    --compress \
    --compression-codec com.hadoop.compression.lzo.LzopCodec \
    --connect jdbc:mysql://localhost/sqoop_test \
    --table stocks

Finally, you can use the --hive-partition-key and --hive-partition-value arguments to create different Hive partitions based on the value of a column being imported. For example, if you want to partition your input by stock symbol, you would do the following:

$ hive
hive> drop table stocks;
$ hadoop fs -rmr stocks
$ read -d '' query << "EOF"
SELECT id, quote_date, open_price
FROM stocks
WHERE symbol = "AAPL" AND $CONDITIONS
EOF
$ sqoop --options-file ~/.sqoop_import_options.txt \
    --query "$query" \
    --split-by id \
    --hive-import \
    --hive-table stocks \
    --hive-overwrite \
    --hive-partition-key symbol \
    --hive-partition-value "AAPL" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --target-dir stocks
$ hadoop fs -lsr /user/hive/warehouse
/user/hive/warehouse/stocks/symbol=AAPL/part-m-00000
/user/hive/warehouse/stocks/symbol=AAPL/part-m-00001
...

Now, the previous example is not optimal by any means. Ideally, a single import would be able to create multiple Hive partitions. Because we're limited to specifying a single key and value, we'd need to run the import once per unique partition value, which is laborious. We would be better off importing into a non-partitioned Hive table and then retroactively creating partitions on the table after it had been loaded.

In addition, the SQL query that we supply to Sqoop must take care of filtering the results so that only rows matching the partition are included. In other words, it would have been useful if Sqoop had added symbol = "AAPL" to the WHERE clause itself, rather than leaving us to do it.
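To show what "once per unique partition value" means in practice, here is a sketch that loops over a list of symbols and issues one import per partition. The symbol list is hypothetical. The --hive-overwrite argument from the listing is left out so that a later run cannot replace data loaded by an earlier one. The SQOOP variable is a convention of this sketch that prints each command by default instead of running it.

```shell
# Dry run by default: prints the command. Set SQOOP=sqoop to execute it.
SQOOP=${SQOOP:-echo sqoop}

import_partition() {
  sym=$1
  # \$CONDITIONS must reach Sqoop literally, so the dollar sign is escaped.
  query="SELECT id, quote_date, open_price FROM stocks WHERE symbol = \"$sym\" AND \$CONDITIONS"
  $SQOOP --options-file "$HOME/.sqoop_import_options.txt" \
    --query "$query" \
    --split-by id \
    --hive-import \
    --hive-table stocks \
    --hive-partition-key symbol \
    --hive-partition-value "$sym" \
    --connect jdbc:mysql://localhost/sqoop_test \
    --target-dir stocks
}

# Hypothetical list of partition values; one import runs per symbol.
for symbol in AAPL GOOG; do
  import_partition "$symbol"
done
```

Each iteration launches a separate MapReduce job, which is why this approach becomes laborious as the number of partition values grows.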


Obviously, for Sqoop to work, your Hadoop cluster nodes need to have access to the MySQL database. A common source of error is either misconfiguration or lack of connectivity from the Hadoop nodes. It's probably wise to log onto one of the Hadoop nodes and attempt to connect to the MySQL server using the MySQL client, and/or attempt access with the mysqldump utility (if using a fast connector).

Another important note when using a fast connector is that it is assumed that mysqldump is installed on each Hadoop node and is in the PATH of the user running the map tasks.
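A quick check along these lines can be run on each Hadoop node, as the user that runs the map tasks. The have_tool helper is a hypothetical function written for this sketch, and the connection details in the commented-out command are placeholders based on the earlier listings.

```shell
# Hypothetical helper: succeed if the named command is on the current PATH.
have_tool() {
  command -v "$1" >/dev/null 2>&1
}

for tool in mysql mysqldump; do
  if have_tool "$tool"; then
    echo "$tool: found at $(command -v "$tool")"
  else
    echo "$tool: NOT on PATH"
  fi
done

# Connectivity check, commented out because it needs a reachable server.
# Host, user, and database are placeholders; mysql prompts for the password.
# mysql -h localhost -u hip_sqoop_user -p -e 'SELECT 1' sqoop_test
```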


Last updated: June 27, 2012

[1] By default, Sqoop runs with four mappers. The number of mappers can be controlled with the --num-mappers argument.

[2] bzip2 is also a splittable compression codec which can be used in Hadoop, but its write performance is so poor that in practice it is rarely used.
