Hadoop in Practice: Using Sqoop to Import Data from MySQL

Log data has long been prevalent across all our applications, but with Hadoop came the ability to process that data. Various systems produce log data, from network devices and operating systems to web servers and our own applications. They all offer the potential for valuable insights into how our systems and applications operate as well as how they’re used. What unifies log files is that they tend to be in text form and line-oriented, making them easy to process. In this article based on chapter 2 of Hadoop in Practice, author Alex Holmes shows how Sqoop could be used for relational database ingress.

Sqoop is a relational database import and export system. It was created by Cloudera, and is currently an Apache project in incubation status.

When you perform an import, Sqoop can write to HDFS, Hive, and HBase, and, for exports, it can do the reverse. Importing is broken down into two activities: connecting to the data source to gather some statistics, and then firing off a MapReduce job, which performs the actual import. Figure 1 shows these steps.

Figure 1. Sqoop import overview

Sqoop has the notion of connectors, which contain the specialized logic to read and write to external systems. Sqoop comes with two classes of connectors: a common connector for regular reads and writes, and a “fast” connector that uses database-proprietary batch mechanisms for efficient imports. Figure 2 shows these two classes of connectors and the databases that they support.

Figure 2. Sqoop connectors

In this technique, we'll look at how to use Sqoop as a simple mechanism to bring relational data into Hadoop clusters. We'll walk through the process of importing data from MySQL into HDFS, and cover how to use the regular connector as well as how to do bulk imports with the fast connector.

Problem

You want to load relational data into your cluster and ensure that the writes are both efficient and idempotent.

Solution

Before you can get started, you'll need to install Sqoop. The first Sqoop command will be a basic import, where you'll specify connection information about your MySQL database and the table you want to import.

$ sqoop import --username hip_sqoop_user --password password \
  --connect jdbc:mysql://localhost/sqoop_test --table stocks

MySQL table names

MySQL table names on Linux are case sensitive. Make sure that the table name you supply in your Sqoop commands uses the correct case.

It's generally not a good idea to pass database passwords as arguments to a script, because other users can see your password with commands such as ps while the import is occurring. The password will also be written to your shell history file. A best practice is to write the password into Sqoop's options file and ensure that only you have read permissions on it.

$ cat > ~/.sqoop_import_options.txt << EOF
import
--username
hip_sqoop_user
--password
password
EOF
$ chmod 700 ~/.sqoop_import_options.txt

Sqoop also supports a -P option, which when present will result in you being prompted for the password.

Run the command again, this time specifying the options file you've created.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --connect jdbc:mysql://localhost/sqoop_test --table stocks

You may wonder why you had to delete the stocks directory in HDFS before re-running the import command. Sqoop by default uses the table name as the destination in HDFS for the MapReduce job that it launches to perform the import. If you run the same command again, the MapReduce job will fail because the directory already exists. Let's take a look at the stocks directory in HDFS.

$ hadoop fs -ls stocks
624 2011-11-24 11:07 /user/aholmes/stocks/part-m-00000
644 2011-11-24 11:07 /user/aholmes/stocks/part-m-00001
642 2011-11-24 11:07 /user/aholmes/stocks/part-m-00002
686 2011-11-24 11:07 /user/aholmes/stocks/part-m-00003
$ hadoop fs -cat stocks/part-m-00000
1,AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75
2,AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84
3,AAPL,2007-01-03,86.29,86.58,81.9,83.8,44225700,83.8
...
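To get a feel for working with this output downstream, here is a minimal sketch of parsing the comma-separated records Sqoop writes; the column names are assumptions based on the stocks table used throughout this article:

```python
import csv
import io

# Assumed column order of the stocks table used in this article.
COLUMNS = ["id", "symbol", "quote_date", "open_price", "high_price",
           "low_price", "close_price", "volume", "adj_close_price"]

def parse_stock_lines(text):
    """Parse Sqoop's comma-separated text output into dicts keyed by column."""
    return [dict(zip(COLUMNS, row)) for row in csv.reader(io.StringIO(text))]

sample = "1,AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75"
records = parse_stock_lines(sample)
print(records[0]["symbol"])   # AAPL
```

In a real MapReduce or Hive job you wouldn't parse files by hand like this, but it illustrates the simple line-oriented structure of the import output.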

Import data formats

Sqoop has imported your data as comma-separated text files. It supports a number of other file formats, which can be activated with the arguments listed in table 1.

Table 1. Sqoop arguments that control the file formats of import commands

Argument             Description
--as-avrodatafile    Data is imported as Avro files.
--as-sequencefile    Data is imported as Sequence Files.
--as-textfile        The default file format; data is imported as CSV text files.

If you are importing large amounts of data, you may want to use a file format such as Avro, which is a compact data format, and use it in conjunction with compression. The following example uses the Snappy compression codec in conjunction with Avro files.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --as-avrodatafile \
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
  --connect jdbc:mysql://localhost/sqoop_test \
  --table stocks

Note that the compression codec supplied on the command line must be defined in core-site.xml under the io.compression.codecs property. The Snappy codec requires that the Hadoop native libraries be installed.

We can introspect the structure of the Avro file to see how Sqoop has laid out the records by using an Avro dumper tool that we created. Sqoop uses Avro's GenericRecord for record-level storage. If we run our generic Avro dumper utility against the Sqoop-generated files in HDFS, we'll see the following:

$ bin/run.sh com.manning.hip.ch3.avro.AvroGenericFileDumper \
  stocks/part-m-00000.avro
{"id": 1, "symbol": "AAPL", "quote_date": "2009-01-02",
"open_price": 85.88, "high_price": 91.04, "low_price": 85.16,
"close_price": 90.75, "volume": 26643400, "adj_close_price": 90.75}
...

Using Sqoop in conjunction with Sequence Files

One of the things that make Sequence Files hard to work with is that there isn't a generic way to access data in the Sequence File. You must have access to the Writable class that was used to write the data. In Sqoop's case, it code-generates this file. This introduces a major problem: if you move to a newer version of Sqoop and that version modifies the code generator, there's a good chance your older, code-generated class won't work with Sequence Files generated with the newer version of Sqoop. You'll either need to migrate all of your old Sequence Files to the new version or somehow maintain them. Due to this restriction, we don't recommend using Sequence Files with Sqoop. If you are looking for more information on how Sequence Files work, run the Sqoop import tool and look at the stocks.java file that is generated within your working directory.

In reality, we'll more likely want to periodically import a subsection of our tables based on a query. But, what if you want to import all of the Apple and Google stocks in 2007 and stick them into a custom HDFS directory? The following listing shows how you would do this with Sqoop.

$ hadoop fs -rmr 2007-stocks
$ GLOBIGNORE=*                                                #1
$ read -d '' query << "EOF"
select * from stocks
where symbol in ("AAPL", "GOOG")
  and quote_date between "2007-01-01" and "2007-12-31"
  and $CONDITIONS                                             #2
EOF
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --query "$query" \
  --split-by id \                                             #3
  --target-dir /user/aholmes/2007-stocks \
  --connect jdbc:mysql://localhost/sqoop_test

#1 -- Bash by default performs globbing, meaning that it'll expand wildcards like "*". We use this command to turn this off so that the next line generates the SQL correctly.

#2 -- Store our query in variable "query." The $CONDITIONS is a Sqoop macro that must be present in the WHERE clause of the query. It is used by Sqoop to substitute LIMIT and OFFSET options when issuing MySQL queries.

#3 -- This argument must be supplied so that Sqoop can determine which table column to use for splitting.

The --query SQL shown in the previous listing can also be used to import only a subset of the columns in a table.

Data splitting

How is Sqoop able to parallelize imports across multiple mappers[1]? In figure 1, we showed that Sqoop's first step is to pull metadata from the database. It inspects the table being imported to determine the primary key, and runs a query to determine the lower and upper bounds of the data in the table (shown in figure 3). Sqoop assumes a somewhat even distribution of data between the minimum and maximum keys, and divides that range by the number of mappers. Each mapper is then fed a unique query covering a range of the primary key.

Figure 3. Sqoop preprocessing steps to determine query splits

You can configure Sqoop to use a non-primary key with the --split-by argument. This can be useful in situations where the primary key doesn't have an even distribution of values between the min and max values. For large tables, however, you need to be careful that the column specified in --split-by is indexed to ensure optimal import times.

You can use the --boundary-query argument to construct an alternative query to determine the minimum and maximum values.
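The range-splitting logic described above can be sketched as follows. This is a simplified illustration, not Sqoop's actual splitter (which handles many column types and edge cases); it shows how a min/max range over the split column becomes one WHERE-clause predicate per mapper:

```python
def compute_splits(min_id, max_id, num_mappers):
    """Divide the inclusive range [min_id, max_id] into num_mappers
    contiguous ranges, emitting one predicate per mapper, roughly as
    Sqoop does for an integer split column named 'id'."""
    total = max_id - min_id + 1
    step = total // num_mappers
    splits = []
    lo = min_id
    for i in range(num_mappers):
        # The last mapper absorbs any remainder from integer division.
        hi = max_id if i == num_mappers - 1 else lo + step - 1
        splits.append(f"id >= {lo} AND id <= {hi}")
        lo = hi + 1
    return splits

# With ids 1..1000 and Sqoop's default of four mappers:
for clause in compute_splits(1, 1000, 4):
    print(clause)
# id >= 1 AND id <= 250
# id >= 251 AND id <= 500
# id >= 501 AND id <= 750
# id >= 751 AND id <= 1000
```

This also makes clear why a skewed split column hurts parallelism: equal-width key ranges only translate into equal-sized workloads when the values are evenly distributed.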

Incremental imports

You can also perform incremental imports. Sqoop supports two types: "append," which works for numerical data that is incrementing over time, such as auto-increment keys, and "lastmodified," which works on time-stamped data. In both cases, you need to specify the column using --check-column, the mode via the --incremental argument (the value must be either "append" or "lastmodified"), and, finally, the actual value to use to determine the incremental changes, --last-value. Using our example, if we want to import stock data that is newer than January 1, 2005, we would do the following:

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --check-column "quote_date" \
  --incremental "lastmodified" \
  --last-value "2005-01-01" \
  --connect jdbc:mysql://localhost/sqoop_test \
  --table stocks
...
tool.ImportTool: --incremental lastmodified
tool.ImportTool: --check-column quote_date
tool.ImportTool: --last-value 2011-11-24 14:49:56.0
tool.ImportTool: (Consider saving this with 'sqoop job --create')
...
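Conceptually, a "lastmodified" incremental import is a filter over the check column plus a new watermark for the next run. A minimal sketch, assuming date strings in ISO format so plain string comparison works:

```python
from datetime import datetime

def lastmodified_import(rows, last_value):
    """Keep rows whose quote_date is newer than last_value, and
    return them with the next watermark. Note that Sqoop records the
    job's execution time, not the max column value, as the next
    --last-value."""
    picked = [r for r in rows if r["quote_date"] > last_value]
    new_last_value = datetime.now().strftime("%Y-%m-%d %H:%M:%S.0")
    return picked, new_last_value

rows = [{"id": 1, "quote_date": "2004-06-30"},
        {"id": 2, "quote_date": "2009-01-02"}]
picked, watermark = lastmodified_import(rows, "2005-01-01")
print([r["id"] for r in picked])   # [2]
```

Sqoop issues this filter as SQL against the database rather than in client code, but the watermark behavior is the part worth internalizing, since it's what makes clock synchronization matter.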

Sqoop jobs and the metastore

You can see in the command output the last value that was encountered for the increment column. How can we best automate a process that can reuse that value? Sqoop has the notion of a job, which can save this information and reuse it in subsequent executions:

$ sqoop job --create stock_increment -- import --append \
  --check-column "quote_date" \
  --incremental "lastmodified" \
  --last-value "2005-01-01" \
  --connect jdbc:mysql://localhost/sqoop_test \
  --username hip_sqoop_user \
  --table stocks

This merely saves the command as a job in something called the Sqoop metastore. A Sqoop metastore keeps track of all jobs. By default, the metastore is contained in your home directory under .sqoop and is only used for your own jobs. If you want to share jobs, you would need to install a JDBC-compliant database and use the --meta-connect argument to specify its location when issuing job commands.

The job create command executed in the previous example didn't do anything other than add the job to the metastore. To run the job, you need to explicitly execute it as shown in this listing:

$ sqoop job --list                                  #1
Available jobs:
  stock_increment
$ sqoop job --exec stock_increment                  #2
$ sqoop job --show stock_increment                  #3
...
incremental.last.value = 2011-11-24 15:09:38.0      #4
...

#1 -- Lists all jobs in the metastore

#2 -- Executes your job

#3 -- Shows metadata information about your job

#4 -- The metadata includes the last value of your incremental column. This is actually the time that the command was executed, and not the last value seen in the table. If you are using this feature, make sure that the database server and any clients interacting with the server (including the Sqoop client) have their clocks synced with the Network Time Protocol (NTP).

Unfortunately, the --options-file argument, which referred to your local file with your username and password, doesn't work with jobs in Sqoop. The password also can't be specified when creating the job. Sqoop will instead prompt for the password when running the job. To make this work in an automated script, you need to use Expect, a Linux automation tool, to supply the password from a local file when it detects Sqoop prompting for a password. The source of an Expect script that works with Sqoop is on our GitHub at http://goo.gl/yL4KQ.

Fast MySQL imports

What if you want to bypass JDBC altogether and use the "fast" MySQL Sqoop connector for a high-throughput load into HDFS? This approach uses the mysqldump utility shipped with MySQL to perform the load. You must make sure that mysqldump is in the PATH of the user running the MapReduce job. To enable use of the "fast" connector, you must specify the --direct argument.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --direct \
  --connect jdbc:mysql://localhost/sqoop_test \
  --table stocks

What are the disadvantages of the fast connectors? First, only MySQL and PostgreSQL are currently supported. Fast connectors also only work with text output files: specifying Avro or Sequence File as the output format of the import won't work.

Importing to Hive

The final step in this technique is to use Sqoop to import your data into a Hive table. The only difference between an HDFS import and a Hive import is that the Hive import has a post-processing step where the Hive table is created and loaded, as shown in figure 4.

Figure 4. Sequence of events of a Sqoop Hive import

When data is loaded into Hive from an HDFS file or directory, such as in the case of Sqoop Hive imports (step 4 in the previous diagram), for the sake of efficiency, Hive moves the directory into its warehouse rather than copying the data (step 5). The HDFS directory that the Sqoop MapReduce job writes to won't exist after the import.

Hive imports are triggered via the --hive-import argument. Just as in the fast connector's case, this option isn't compatible with the --as-avrodatafile and --as-sequencefile options.

$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --hive-import \
  --connect jdbc:mysql://localhost/sqoop_test \
  --table stocks
$ hive
hive> select * from stocks;
OK
1  AAPL  2009-01-02  85.88   91.04   85.16   90.75   26643400  90.75
2  AAPL  2008-01-02  199.27  200.26  192.55  194.84  38542100  194.84
3  AAPL  2007-01-03  86.29   86.58   81.9    83.8    44225700  83.8
4  AAPL  2006-01-03  72.38   74.75   72.25   74.75   28829800  74.75
...

Importing strings containing Hive delimiters

You will likely have downstream processing issues if you are importing columns that can contain any of Hive's delimiters (the \n, \r, and \01 characters). You have two options in such cases: either specify --hive-drop-import-delims, which will remove conflicting characters as part of the import, or specify --hive-delims-replacement, which will replace them with a different character.
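The effect of these two options can be sketched in a few lines; this is an illustration of the behavior, not Sqoop's implementation:

```python
# Hive's row and field delimiters: newline, carriage return, and Ctrl-A.
HIVE_DELIMS = "\n\r\x01"

def drop_import_delims(value):
    """Mimic --hive-drop-import-delims: strip Hive's delimiter characters."""
    return "".join(c for c in value if c not in HIVE_DELIMS)

def replace_import_delims(value, replacement=" "):
    """Mimic --hive-delims-replacement: swap delimiters for a stand-in."""
    return "".join(replacement if c in HIVE_DELIMS else c for c in value)

print(drop_import_delims("acme\ncorp"))      # acmecorp
print(replace_import_delims("acme\ncorp"))   # acme corp
```

Either way, the point is that a stray newline inside a string column would otherwise be interpreted as a row boundary by Hive.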

If the Hive table already exists, the data will be appended to the existing table. If this is not the desired behavior, you can use the --hive-overwrite argument to indicate that the existing table should be replaced with the imported data.

Data in Hive can also be compressed. Since the LZOP compression codec is the only splittable codec[2] in Hadoop, it is the codec that should be used for Hive compression. The following example shows how to use the --hive-overwrite argument in conjunction with enabling LZOP compression. For this to work, you'll need to have built and installed LZOP on your cluster, because it isn't bundled with Hadoop (or CDH) by default.

$ hive
hive> drop table stocks;
$ hadoop fs -rmr stocks
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --hive-import \
  --hive-overwrite \
  --compress \
  --compression-codec com.hadoop.compression.lzo.LzopCodec \
  --connect jdbc:mysql://localhost/sqoop_test \
  --table stocks

Finally, you can use the --hive-partition-key and --hive-partition-value arguments to create Hive partitions based on the value of a column being imported. For example, if you want to partition your input by stock symbol, you would do the following:

$ hive
hive> drop table stocks;
$ hadoop fs -rmr stocks
$ read -d '' query << "EOF"
SELECT id, quote_date, open_price
FROM stocks
WHERE symbol = "AAPL" AND $CONDITIONS
EOF
$ sqoop --options-file ~/.sqoop_import_options.txt \
  --query "$query" \
  --split-by id \
  --hive-import \
  --hive-table stocks \
  --hive-overwrite \
  --hive-partition-key symbol \
  --hive-partition-value "AAPL" \
  --connect jdbc:mysql://localhost/sqoop_test \
  --target-dir stocks
$ hadoop fs -lsr /user/hive/warehouse
/user/hive/warehouse/stocks/symbol=AAPL/part-m-00000
/user/hive/warehouse/stocks/symbol=AAPL/part-m-00001
...

Now, the previous example is not optimal by any means. Ideally, a single import would be able to create multiple Hive partitions. Because we're limited to specifying a single key and value, we'd need to run the import once per unique partition value, which is laborious. We would be better off importing into a non-partitioned Hive table and then retroactively creating partitions on the table after it had been loaded.

Also, the SQL query that we supply to Sqoop must take care of filtering the results itself, so that only rows matching the partition are included. In other words, it would be useful if Sqoop updated the WHERE clause with symbol = "AAPL" rather than requiring us to do it ourselves.
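The one-import-per-partition-value workflow described above could be scripted. Here is a hypothetical sketch that generates one sqoop invocation per symbol, pre-filtering each query to match its partition as discussed; the per-symbol --target-dir naming is an assumption for illustration:

```python
def partition_import_commands(symbols):
    """Emit one sqoop command per unique partition value, since
    --hive-partition-value accepts only a single value per run."""
    commands = []
    for sym in symbols:
        # Each query is pre-filtered to match its own partition.
        query = ('SELECT id, quote_date, open_price FROM stocks '
                 f'WHERE symbol = "{sym}" AND $CONDITIONS')
        commands.append(
            "sqoop --options-file ~/.sqoop_import_options.txt "
            f"--query '{query}' --split-by id --hive-import "
            "--hive-table stocks --hive-partition-key symbol "
            f'--hive-partition-value "{sym}" '
            "--connect jdbc:mysql://localhost/sqoop_test "
            f"--target-dir stocks-{sym}")   # hypothetical naming scheme
    return commands

for cmd in partition_import_commands(["AAPL", "GOOG"]):
    print(cmd)
```

Even scripted, this remains one full Sqoop job per partition value, which is why importing into a non-partitioned table and partitioning afterward is usually the better option.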

Discussion

Obviously, for Sqoop to work, your Hadoop cluster nodes need to have access to the MySQL database. A common source of error is either misconfiguration or lack of connectivity from the Hadoop nodes. It's probably wise to log onto one of the Hadoop nodes and attempt to connect to the MySQL server using the MySQL client, and/or attempt access with the mysqldump utility (if using a fast connector).

Another important note when using a fast connector is that it is assumed that mysqldump is installed on each Hadoop node and is in the PATH of the user running the map tasks.


Last updated: June 27, 2012


[1] By default, Sqoop runs with four mappers. The number of mappers can be controlled with the --num-mappers argument.

[2] bzip2 is also a splittable compression codec which can be used in Hadoop, but its write performance is so poor that in practice it is rarely used.
