
Sqoop - Importing MySQL data into HDFS

#Import Database Table to HDFS

1. warehouse-dir
A warehouse-dir import creates a subdirectory named after the table under the specified HDFS path and writes the imported data into it.

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --warehouse-dir <HDFS dir path>
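For example, a minimal sketch with hypothetical values (database retail_db, table orders, user sqoopuser), where the per-table directory is created under /user/hive/imports:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --warehouse-dir /user/hive/imports

The data lands in /user/hive/imports/orders/part-m-0000*, one part file per mapper.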

2. target-dir
A target-dir import writes the data directly to the specified HDFS path (no per-table subdirectory is created).

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path>


Make sure the target directory does not already exist. If it does, it can be removed with:
> hadoop fs -rm -R <dir-name>

OR

append the following option to the Sqoop import command:
--delete-target-dir
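
For example, a re-runnable sketch (hypothetical names and paths) that removes the target directory if it already exists and then imports fresh data:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders \
--delete-target-dir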

NOTE: If your DB table doesn't have a primary key, append either of the following options to the import:
-m 1 or --split-by <column-name>

Here -m <n> specifies the number of mapper tasks that run in parallel. It matters because Sqoop imports with parallel processing: internally it splits the import into smaller tasks (4 by default), queries the minimum and maximum value of the primary key, and divides that range into chunks, one per mapper. For example, for records with keys 1-99 it creates 4 mapper tasks covering roughly 1-25, 26-50, 51-75 and 76-99. If the table has no primary key, you must either run the import with a single mapper (-m 1) or supply --split-by.
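
As a rough sketch of what happens internally (table orders and numeric key column id are hypothetical), Sqoop first runs a bounding query:

> SELECT MIN(id), MAX(id) FROM orders

and then each of the 4 mappers imports one slice of that range, e.g. the first mapper runs a query roughly equivalent to:

> SELECT * FROM orders WHERE id >= 1 AND id < 26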


#Append Database Table to HDFS file

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> --append

#Import with split-by on a numeric / non-numeric column

The --split-by option is required when a table does not have a primary key, or when we want to split the data across mapper tasks by a different column.
> sqoop-import \
[-Dorg.apache.sqoop.splitter.allow_text_splitter=true] \
--connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> \
--split-by <column-name>

Note: -Dorg.apache.sqoop.splitter.allow_text_splitter=true is optional. It is required only when splitting by a text column instead of a numeric one.
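
A minimal sketch (hypothetical database retail_db, table customers, text column country) that splits on a text column; note that the -D generic argument must come right after sqoop-import, before the tool-specific options:

> sqoop-import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table customers --target-dir /user/hive/imports/customers \
--split-by country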

#Auto-reset import to one mapper if the table has no primary key

Append --autoreset-to-one-mapper to let Sqoop check automatically whether the table has a primary key; if it does not, the number of mapper tasks is reset to 1.
Use it instead of -m 1 or --split-by; do not combine it with --split-by.
> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> \
--autoreset-to-one-mapper

#Import file formats
Import can be done in 4 file formats.
  1. Avro
    > --as-avrodatafile
  2. Parquet
    > --as-parquetfile
  3. Sequence
    > --as-sequencefile
  4. Text (Default)
    > --as-textfile
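
For example (hypothetical names), importing the same table as Parquet only needs the format flag added:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders_parquet \
--as-parquetfile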

#Import with compression

Compression can be done in many formats. You can use any compression codec that is registered in core-site.xml.

--compress or -z can be used to enable compression

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> \
--compress \
[--compression-codec org.apache.hadoop.io.compress.SnappyCodec]

--compression-codec is optional. By default the gzip codec is used, but you can specify another codec by adding the following option:
--compression-codec <codec class>
Note: The codec must be registered in core-site.xml.
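
For example, with only --compress and the default text format, the part files are written gzip-compressed with a .gz extension, which you can verify with (path is hypothetical):

> hadoop fs -ls <HDFS dir path>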

#Import with boundary query
This option is useful when we want to import partial data by supplying the minimum and maximum values of the split (primary key) column.

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> \
--boundary-query "select 100,125"

So it will import only the records whose key values lie between 100 and 125.
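
A more typical boundary query (hypothetical table orders, key column id) lets the database compute the min and max itself, restricted by a condition:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders_recent \
--boundary-query "SELECT MIN(id), MAX(id) FROM orders WHERE id >= 100"

The query must return exactly two values, which Sqoop uses as the lower and upper bounds for the splits.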

#Import with Transformation
To import only specific columns of the table, use the following command.

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--table <table_name> --target-dir <HDFS dir path> \
--columns <col1>,<col2>,<col3>

where col1, col2, col3 are column names from the table. Only data from these columns will be imported.

#Import with SQL query
To import with a SQL query, use the following command.

> sqoop-import --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--target-dir <HDFS dir path> \
--query "<YOUR SQL QUERY>" \
--split-by <column>

Note: you can't use the --table and --columns options together with --query. --split-by is necessary because the import runs in parallel; you can also use -m 1 instead of --split-by. The query must contain the token $CONDITIONS in its WHERE clause, which Sqoop replaces with each mapper's split condition.
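
A minimal sketch (hypothetical tables orders and customers) of a free-form query import; $CONDITIONS is escaped as \$CONDITIONS because the query is wrapped in double quotes:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--target-dir /user/hive/imports/order_customers \
--query "SELECT o.id, o.order_date, c.name FROM orders o JOIN customers c ON o.customer_id = c.id WHERE \$CONDITIONS" \
--split-by o.id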

#Import with custom null string or custom null non-string
Use --null-string <string-value> for string columns containing null values; nulls are replaced with the specified string value on import.

Use --null-non-string <string-value> for non-string columns containing null values; nulls are replaced with the specified value on import.
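
A minimal sketch (hypothetical names) that replaces nulls with \N, the representation Hive expects:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table customers --target-dir /user/hive/imports/customers \
--null-string '\\N' --null-non-string '\\N'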




#Import by changing delimiters

--enclosed-by <char> Sets a required field enclosing character
--escaped-by <char> Sets the escape character
--fields-terminated-by <char> Sets the field separator character
--lines-terminated-by <char> Sets the end-of-line character
--mysql-delimiters Uses MySQL's default delimiter set: fields: , ; lines: \n ; escaped-by: \ ; optionally-enclosed-by: '
--optionally-enclosed-by <char> Sets a field enclosing character

Note: If multiple characters are specified in the argument, only the first is considered. You can also specify non-printable characters with an escape sequence such as '\t' or '\0'.
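
For example (hypothetical names), importing tab-separated, newline-terminated records with optional double-quote enclosing:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders_tsv \
--fields-terminated-by '\t' --lines-terminated-by '\n' \
--optionally-enclosed-by '\"'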


#Import with --table and --where instead of --query
--table <table-name>
--where <condition>
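
A minimal sketch (hypothetical table and condition) that imports only the matching rows without a free-form query:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders_2023 \
--where "order_date >= '2023-01-01'"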

#Incremental import
--check-column <column-name> The column Sqoop examines to decide which rows to import
--incremental <mode> Mode is either append or lastmodified
--last-value <value> The maximum check-column value from the previous import; only rows with a greater (or newer) value are imported
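
A minimal sketch (hypothetical names) of an append-mode incremental import that picks up only rows with id greater than 1000:

> sqoop-import --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--table orders --target-dir /user/hive/imports/orders \
--incremental append --check-column id --last-value 1000

At the end of the run Sqoop prints the new --last-value to use for the next incremental import.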

#import-all-tables
This command imports all the tables of a MySQL database.
A couple of things make it different from a single-table import:
  1. You must use --warehouse-dir; you can't use --target-dir.
  2. You will face issues if some tables have no primary key, so it is recommended to use --autoreset-to-one-mapper.
  3. --table, --query, --columns and --where can't be used with this command.
  4. You can use --exclude-tables <table1,table2> to exclude some tables from the import (see the example after the command below).
> sqoop-import-all-tables --connect jdbc:mysql://<mysql-URL>:<port>/<DBName> \
--username <USERNAME> --password <PASSWORD> \
--warehouse-dir <HDFS dir path> \
[--autoreset-to-one-mapper]
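
For example (hypothetical table names), to import everything except two staging tables:

> sqoop-import-all-tables --connect jdbc:mysql://mysql-host:3306/retail_db \
--username sqoopuser --password secret \
--warehouse-dir /user/hive/imports \
--exclude-tables staging_orders,staging_customers \
--autoreset-to-one-mapper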
