Bulk Data Loading with Embulk
1. What is Embulk?
Embulk is a bulk data loader that supports a wide variety of databases, storage systems, file formats, and cloud services.
2. Features
Guesses the input file format automatically
Parallel and distributed execution
Transaction control
Resuming of failed runs
RubyGems-based plugin system
3. Installation
sudo apt install default-jre
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
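If the PATH update took effect, the following should print the installed Embulk version:
embulk --version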
4. Test
embulk example ./try1                         # generate a sample CSV and a seed.yml under ./try1
embulk guess ./try1/seed.yml -o config.yml    # guess the full config from the seed
embulk preview config.yml                     # preview the parsed records without loading
embulk run config.yml                         # execute the load
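For reference, the config.yml that embulk guess produces for this generated example typically looks roughly like the following (the path_prefix and column details depend on where the example was created):
in:
  type: file
  path_prefix: ./try1/csv/sample_
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}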
5. Plugins
Embulk provides a wide range of plugins. See the page below for details.
http://www.embulk.org/plugins/
Input
https://github.com/medjed/embulk-input-bigquery
https://github.com/jo8937/embulk-input-bigquery_extract_files
https://github.com/embulk/embulk-input-gcs
Output
https://github.com/embulk/embulk-output-bigquery
https://github.com/embulk/embulk-output-jdbc
https://github.com/embulk/embulk-output-gcs
Plugins are installed with the embulk gem install command.
e.g. embulk gem install embulk-input-gcs
Plugins can be searched for with the following command:
e.g. embulk gem search -rd embulk-output
6. Demo
Scenario 1: Cloud SQL(MySQL) → Cloud Storage(GCS) → BigQuery
Scenario 2: BigQuery → Cloud Storage(GCS) → Cloud SQL(MySQL)
The demo below walks through Scenario 2. First, install the required plugins:
embulk gem install embulk-input-bigquery
embulk gem install embulk-output-gcs
embulk gem install embulk-input-gcs
embulk gem install embulk-output-mysql
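As a quick sanity check, the installed plugins (and their versions) can be listed with the bundled gem command:
embulk gem list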
To run Embulk, a config.yml must be created. The transfer is split into two steps: the first step uses BigQuery as input and Cloud Storage as output, pulling the data out of BigQuery and saving it to Cloud Storage.
The corresponding config.yml is shown below.
in:
  type: bigquery
  project: '<Google_Project_ID>'
  keyfile: '<Service_Account_File_Location_JSON_TYPE>'
  sql: 'SELECT stn FROM `ods_svc1.customers` where temp < -100'
  # columns:
  #   - {name: stn, type: string}
  #   - {name: temp, type: float}
  # max: 10
out:
  type: gcs
  bucket: embulk_temp
  path_prefix: logs/out
  file_ext: .csv
  auth_method: json_key
  json_keyfile: '../SamsungMax.json'
  formatter:
    type: csv
    encoding: UTF-8
BigQuery currently has a dataset named ods_svc1 containing a table named customers. Since the table is large, only a small slice is extracted for testing. The out section defines the target bucket, the output file format, and the location of the service account JSON file. path_prefix is the naming pattern for the objects created inside the bucket; for example, declaring path_prefix: logs/out produces objects in the bucket named as follows.
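logs/out.000.01.csv
logs/out.001.01.csv
(The exact names above are taken from the run log further down; the suffix encodes the task index and output file sequence.)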
The job is executed with embulk run (here the config was saved as bigquery-to-gcs.yml).
The output of the run is shown below.
$ embulk run bigquery-to-gcs.yml
2018-04-23 16:45:25.711 +0900: Embulk v0.9.7
2018-04-23 16:45:26.548 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-04-23 16:45:30.122 +0900 [INFO] (main): Gem's home and path are set by default: "/home/thkang0/.embulk/lib/gems"
2018-04-23 16:45:31.760 +0900 [INFO] (main): Started Embulk v0.9.7
2018-04-23 16:45:37.706 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-bigquery (0.0.4)
2018-04-23 16:45:37.854 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-gcs (0.4.1)
2018-04-23 16:45:39.781 +0900 [INFO] (0001:transaction): determine columns using the getQueryResults API instead of the config.yml
2018-04-23 16:45:41.242 +0900 [INFO] (0001:transaction): waiting for the query job to complete to get schema from query results
2018-04-23 16:45:42.126 +0900 [INFO] (0001:transaction): completed: job_id=job_j5ZOzjRw4ZT2nzfR8tElUb8jps68
2018-04-23 16:45:43.785 +0900 [INFO] (0001:transaction): determined columns: [{"name"=>"stn", "type"=>:string}]
2018-04-23 16:45:43.835 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2018-04-23 16:45:43.988 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2018-04-23 16:45:50.010 +0900 [INFO] (pool-3-thread-1): Uploading 'embulk_temp/logs/out.000.01.csv'
2018-04-23 16:45:51.725 +0900 [INFO] (pool-3-thread-1): Local Hash(MD5): Hi+b4F8s4GwRdeZNPaB5ag== / Remote Hash(MD5): Hi+b4F8s4GwRdeZNPaB5ag==
2018-04-23 16:45:51.726 +0900 [INFO] (0016:task-0000): Uploaded 'embulk_temp/logs/out.000.01.csv' to 429bytes
2018-04-23 16:45:51.727 +0900 [INFO] (pool-4-thread-1): Uploading 'embulk_temp/logs/out.001.01.csv'
2018-04-23 16:45:53.581 +0900 [INFO] (pool-4-thread-1): Local Hash(MD5): 6scwcnrokvq/P1BtCip14Q== / Remote Hash(MD5): 6scwcnrokvq/P1BtCip14Q==
2018-04-23 16:45:53.582 +0900 [INFO] (0016:task-0000): Uploaded 'embulk_temp/logs/out.001.01.csv' to 5bytes
2018-04-23 16:45:53.611 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2018-04-23 16:45:53.660 +0900 [INFO] (main): Committed.
2018-04-23 16:45:53.660 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}
Next, the saved CSV files are loaded into Cloud SQL.
The load is again done with Embulk. First, create a Cloud SQL instance on GCP (the smallest size, since this is only a test) and create a database named ods_svc1, matching the BigQuery dataset. Then create an account for the connection and register the client's IP in the instance's authorized networks.
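As a sketch, the same setup can be scripted with the gcloud CLI (the instance name, region, and tier below are illustrative placeholders, not values from the original demo):
# create a small MySQL test instance
gcloud sql instances create embulk-demo --database-version=MYSQL_5_7 --tier=db-f1-micro --region=us-central1
# create the database matching the BigQuery dataset name
gcloud sql databases create ods_svc1 --instance=embulk-demo
# create an account for Embulk to connect with
gcloud sql users create embulk-user --instance=embulk-demo --password=<password>
# authorize the client IP
gcloud sql instances patch embulk-demo --authorized-networks=<client_ip>/32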
After that, create another config file:
$ cat gcs-to-mysql.yml
in:
  type: gcs
  bucket: embulk_temp
  path_prefix: 'logs/out.'
  auth_method: json_key
  json_keyfile: '<Service_Account_Location_JSON>'
  application_name: Embulk Test
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: stn, type: string}
out:
  # type: stdout
  type: mysql
  host: <MySQL_IP_Address>
  user: username
  password: 'password'
  database: ods_svc1
  table: customers
  mode: insert
The input is GCS and the output is Cloud SQL; there is no need to create the table beforehand, as Embulk creates it automatically and loads the data.
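Note that mode: insert appends via intermediate tables that are merged atomically at commit time, which can be seen in the CREATE TABLE / INSERT ... SELECT / DROP TABLE statements in the log below. embulk-output-mysql supports other modes as well; for example (an illustrative variation, not part of the original demo), replace mode recreates the target table on each run:
out:
  type: mysql
  host: <MySQL_IP_Address>
  user: username
  password: 'password'
  database: ods_svc1
  table: customers
  mode: replace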
Running it completes successfully, as shown below.
$ embulk run gcs-to-mysql.yml
2018-04-23 17:43:10.052 +0900: Embulk v0.9.7
2018-04-23 17:43:10.862 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-04-23 17:43:13.829 +0900 [INFO] (main): Gem's home and path are set by default: "/home/thkang0/.embulk/lib/gems"
2018-04-23 17:43:15.210 +0900 [INFO] (main): Started Embulk v0.9.7
2018-04-23 17:43:15.461 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-gcs (0.2.7)
2018-04-23 17:43:15.514 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-mysql (0.8.0)
2018-04-23 17:43:18.038 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2018-04-23 17:43:18.084 +0900 [INFO] (0001:transaction): JDBC Driver = /home/thkang0/.embulk/lib/gems/gems/embulk-output-mysql-0.8.0/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-04-23 17:43:18.095 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:19.634 +0900 [INFO] (0001:transaction): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-04-23 17:43:19.821 +0900 [WARN] (0001:transaction): The client timezone(Asia/Seoul) is different from the server timezone(UTC). The plugin will store wrong datetime values.
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): You may need to set options `useLegacyDatetimeCode` and `serverTimezone`
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): Example. `options: { useLegacyDatetimeCode: false, serverTimezone: UTC }`
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): The plugin will set `useLegacyDatetimeCode=false` by default in future.
2018-04-23 17:43:19.822 +0900 [INFO] (0001:transaction): Using insert mode
2018-04-23 17:43:20.495 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE `customers_00000162f1abf22b_embulk000` (`stn` TEXT)
2018-04-23 17:43:20.712 +0900 [INFO] (0001:transaction): > 0.22 seconds
2018-04-23 17:43:20.900 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE `customers_00000162f1abf22b_embulk001` (`stn` TEXT)
2018-04-23 17:43:21.094 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:22.183 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2018-04-23 17:43:22.273 +0900 [INFO] (0018:task-0000): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:23.482 +0900 [INFO] (0018:task-0000): Prepared SQL: INSERT INTO `customers_00000162f1abf22b_embulk000` (`stn`) VALUES (?)
2018-04-23 17:43:23.520 +0900 [INFO] (0018:task-0000): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:24.766 +0900 [INFO] (0018:task-0000): Prepared SQL: INSERT INTO `customers_00000162f1abf22b_embulk001` (`stn`) VALUES (?)
2018-04-23 17:43:26.952 +0900 [INFO] (0018:task-0000): Loading 53 rows
2018-04-23 17:43:27.283 +0900 [INFO] (0018:task-0000): > 0.33 seconds (loaded 53 rows in total)
2018-04-23 17:43:27.285 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2018-04-23 17:43:27.286 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=2700000}
2018-04-23 17:43:28.499 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS `customers` (`stn` TEXT)
2018-04-23 17:43:28.685 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:28.862 +0900 [INFO] (0001:transaction): SQL: INSERT INTO `customers` (`stn`) SELECT `stn` FROM `customers_00000162f1abf22b_embulk000` UNION ALL SELECT `stn` FROM `customers_00000162f1abf22b_embulk001`
2018-04-23 17:43:29.029 +0900 [INFO] (0001:transaction): > 0.17 seconds (53 rows)
2018-04-23 17:43:29.402 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:30.602 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `customers_00000162f1abf22b_embulk000`
2018-04-23 17:43:30.790 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:30.791 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `customers_00000162f1abf22b_embulk001`
2018-04-23 17:43:30.974 +0900 [INFO] (0001:transaction): > 0.18 seconds
2018-04-23 17:43:30.977 +0900 [INFO] (main): Committed.
2018-04-23 17:43:30.978 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"logs/out.000.01.csv"},"out":{}}
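The log above warns that the client timezone (Asia/Seoul) differs from the server timezone (UTC). Following the hint the plugin itself prints, the JDBC options can be added to the out section; a minimal sketch:
out:
  type: mysql
  # other connection settings as above
  options: {useLegacyDatetimeCode: false, serverTimezone: UTC}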
After the load, connecting to the database and querying shows that the data was loaded correctly.
mysql> show databases;
+--------------------+
| Database |
+--------------------+
| information_schema |
| mysql |
| ods_svc1 |
| performance_schema |
+--------------------+
4 rows in set (0.16 sec)
mysql> use ods_svc1
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> show tables
-> ;
+--------------------+
| Tables_in_ods_svc1 |
+--------------------+
| customers |
+--------------------+
1 row in set (0.16 sec)
mysql> select * from customers;
+--------+
| stn |
+--------+
| 892080 |
| 892080 |
| 892080 |
| 892080 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 897990 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
+--------+
53 rows in set (0.16 sec)
7. ETC
Running embulk run config.yml may produce the error below; see the following link for details.
http://www.unknownengineer.net/entry/2017/12/15/100000
(The cause: the latest ca-certificates update threw out some Equifax and VeriSign certificates.)
Caused by: org.jruby.exceptions.RaiseException: (SSLError) certificate verify failed
at RUBY.block in call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:41)
at RUBY.with_net_http_connection(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:87)
at RUBY.call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:33)
at RUBY.call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/request/url_encoded.rb:15)
at RUBY.build_response(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/rack_builder.rb:143)
at RUBY.run_request(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/connection.rb:387)
at RUBY.post(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/connection.rb:175)
at RUBY.fetch_access_token(/home/thkang0/.embulk/lib/gems/gems/signet-0.8.1/lib/signet/oauth_2/client.rb:967)
at RUBY.fetch_access_token!(/home/thkang0/.embulk/lib/gems/gems/signet-0.8.1/lib/signet/oauth_2/client.rb:1005)
at RUBY.fetch_access_token!(/home/thkang0/.embulk/lib/gems/gems/googleauth-0.6.2/lib/googleauth/signet.rb:69)
at RUBY.initialize(/home/thkang0/.embulk/lib/gems/gems/googleauth-0.6.2/lib/googleauth/credentials.rb:71)
at RUBY.new(/home/thkang0/.embulk/lib/gems/gems/google-cloud-bigquery-0.30.0/lib/google/cloud/bigquery.rb:534)
at RUBY.transaction(/home/thkang0/.embulk/lib/gems/gems/embulk-input-bigquery-0.0.4/lib/embulk/input/bigquery.rb:53)
at RUBY.transaction(uri:classloader:/gems/embulk-0.9.7-java/lib/embulk/input_plugin.rb:58)
Error: org.jruby.exceptions.RaiseException: (SSLError) certificate verify failed
On Ubuntu, the simple fix is to run the following command:
sudo apt install ca-certificates=20160104ubuntu1
Then create /etc/ssl/certs/Equifax_Secure_Certificate_Authority.pem with the following contents:
-----BEGIN CERTIFICATE-----
MIIDIDCCAomgAwIBAgIENd70zzANBgkqhkiG9w0BAQUFADBOMQswCQYDVQQGEwJV
UzEQMA4GA1UEChMHRXF1aWZheDEtMCsGA1UECxMkRXF1aWZheCBTZWN1cmUgQ2Vy
dGlmaWNhdGUgQXV0aG9yaXR5MB4XDTk4MDgyMjE2NDE1MVoXDTE4MDgyMjE2NDE1
MVowTjELMAkGA1UEBhMCVVMxEDAOBgNVBAoTB0VxdWlmYXgxLTArBgNVBAsTJEVx
dWlmYXggU2VjdXJlIENlcnRpZmljYXRlIEF1dGhvcml0eTCBnzANBgkqhkiG9w0B
AQEFAAOBjQAwgYkCgYEAwV2xWGcIYu6gmi0fCG2RFGiYCh7+2gRvE4RiIcPRfM6f
BeC4AfBONOziipUEZKzxa1NfBbPLZ4C/QgKO/t0BCezhABRP/PvwDN1Dulsr4R+A
cJkVV5MW8Q+XarfCaCMczE1ZMKxRHjuvK9buY0V7xdlfUNLjUA86iOe/FP3gx7kC
AwEAAaOCAQkwggEFMHAGA1UdHwRpMGcwZaBjoGGkXzBdMQswCQYDVQQGEwJVUzEQ
MA4GA1UEChMHRXF1aWZheDEtMCsGA1UECxMkRXF1aWZheCBTZWN1cmUgQ2VydGlm
aWNhdGUgQXV0aG9yaXR5MQ0wCwYDVQQDEwRDUkwxMBoGA1UdEAQTMBGBDzIwMTgw
ODIyMTY0MTUxWjALBgNVHQ8EBAMCAQYwHwYDVR0jBBgwFoAUSOZo+SvSspXXR9gj
IBBPM5iQn9QwHQYDVR0OBBYEFEjmaPkr0rKV10fYIyAQTzOYkJ/UMAwGA1UdEwQF
MAMBAf8wGgYJKoZIhvZ9B0EABA0wCxsFVjMuMGMDAgbAMA0GCSqGSIb3DQEBBQUA
A4GBAFjOKer89961zgK5F7WF0bnj4JXMJTENAKaSbn+2kmOeUJXRmm/kEd5jhW6Y
7qj/WsjTVbJmcVfewCHrPSqnI0kBBIZCe/zuf6IWUrVnZ9NA2zsmWLIodz2uFHdh
1voqZiegDfqnc1zqcPGUIWVEX/r87yloqaKHee9570+sB3c4
-----END CERTIFICATE-----
Then run sudo update-ca-certificates --fresh to apply the update.