
[GCP] Bulk Data Loading with Embulk (BigQuery)

by 타이호 2018. 4. 24.

Bulk Data Loading with Embulk


1. What is Embulk?

Embulk is a bulk data loader that supports a wide range of databases, storage systems, file formats, and cloud services.

http://www.embulk.org/docs/


2. Embulk Features

  • Automatically guesses input file formats

  • Parallel and distributed execution

  • Transaction control

  • Resuming

  • Plugin system based on RubyGems
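
One concrete knob for the parallel execution mentioned above: the number of local worker threads can be set in config.yml. This is an illustrative fragment only (the run logs later in this post show the default local executor reporting max_threads=4):

```yaml
# Illustrative: configure the local thread executor.
# max_threads bounds how many input/output tasks run in parallel.
exec:
  type: local
  max_threads: 4
```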


3. Installation

sudo apt install default-jre

curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc


4. Test

embulk example ./try1
embulk guess   ./try1/seed.yml -o config.yml
embulk preview config.yml
embulk run     config.yml
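
For reference, embulk example writes a small gzipped sample CSV plus a seed.yml, and embulk guess fills in the parser details. The guessed config.yml looks roughly like the following (column names come from the bundled sample data; exact details vary by Embulk version, so treat this as a sketch):

```yaml
# Roughly what `embulk guess` produces for the bundled example
# (illustrative; verify against your Embulk version).
in:
  type: file
  path_prefix: ./try1/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}
```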


5. Plugins

Embulk provides a wide variety of plugins. See the link below for details.

http://www.embulk.org/plugins/

  1. Input

https://github.com/medjed/embulk-input-bigquery

https://github.com/jo8937/embulk-input-bigquery_extract_files

https://github.com/embulk/embulk-input-gcs


  2. Output

https://github.com/embulk/embulk-output-bigquery

https://github.com/embulk/embulk-output-jdbc

https://github.com/embulk/embulk-output-gcs


  3. Plugins are installed with the embulk gem install command.

e.g. embulk gem install embulk-input-gcs

Plugins can be searched with the following command:

e.g. embulk gem search -rd embulk-output




6. Demo

  1. Scenario 1: Cloud SQL(MySQL) → Cloud Storage(GCS) → BigQuery

  2. Scenario 2: BigQuery → Cloud Storage(GCS) → Cloud SQL(MySQL)

embulk gem install embulk-input-bigquery

embulk gem install embulk-output-gcs

embulk gem install embulk-input-gcs

embulk gem install embulk-output-mysql

To run Embulk, a config.yml must be created. The job is split into two steps; in the first step, we read the data from BigQuery (input) and store it in Cloud Storage (output).

The corresponding config.yml is shown below.


in:
  type: bigquery
  project: '<Google_Project_ID>'
  keyfile: '<Service_Account_File_Location_JSON_TYPE>'
  sql: 'SELECT stn FROM `ods_svc1.customers` where temp < -100'
#  columns:
#    - {name: stn, type: string}
#    - {name: temp, type: float}
  #max: 10
out:
  type: gcs
  bucket: embulk_temp
  path_prefix: logs/out
  file_ext: .csv
  auth_method: json_key
  json_keyfile: '../SamsungMax.json'
  formatter:
    type: csv
    encoding: UTF-8


The BigQuery project currently contains a dataset named ods_svc1 and a table named customers. Since the table holds a lot of data, only a small subset is extracted for testing. The out section defines the destination bucket, the output file format, and the location of the Service Account JSON file. path_prefix is the naming pattern for the objects created inside the bucket; for example, declaring path_prefix: logs/out produces objects named like logs/out.000.01.csv in the bucket.
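
The object naming can be sketched as follows. This is not Embulk code, just an illustration that reconstructs the names seen in the run log: a zero-padded task index and a sequence number are appended between path_prefix and file_ext.

```shell
# Illustrative sketch only: rebuild the GCS object names from the run log,
# where path_prefix=logs/out and file_ext=.csv.
bucket=embulk_temp
path_prefix=logs/out
file_ext=.csv
names=""
for task in 000 001; do
  # <prefix>.<task index>.<sequence><ext>, e.g. logs/out.000.01.csv
  names="$names$bucket/$path_prefix.$task.01$file_ext "
done
echo "$names"
```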



Run it with embulk run config.yml.

The result of the run is shown below.


$ embulk run bigquery-to-gcs.yml
2018-04-23 16:45:25.711 +0900: Embulk v0.9.7
2018-04-23 16:45:26.548 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-04-23 16:45:30.122 +0900 [INFO] (main): Gem's home and path are set by default: "/home/thkang0/.embulk/lib/gems"
2018-04-23 16:45:31.760 +0900 [INFO] (main): Started Embulk v0.9.7
2018-04-23 16:45:37.706 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-bigquery (0.0.4)
2018-04-23 16:45:37.854 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-gcs (0.4.1)
2018-04-23 16:45:39.781 +0900 [INFO] (0001:transaction): determine columns using the getQueryResults API instead of the config.yml
2018-04-23 16:45:41.242 +0900 [INFO] (0001:transaction): waiting for the query job to complete to get schema from query results
2018-04-23 16:45:42.126 +0900 [INFO] (0001:transaction): completed: job_id=job_j5ZOzjRw4ZT2nzfR8tElUb8jps68
2018-04-23 16:45:43.785 +0900 [INFO] (0001:transaction): determined columns: [{"name"=>"stn", "type"=>:string}]
2018-04-23 16:45:43.835 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2018-04-23 16:45:43.988 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-04-23 16:45:50.010 +0900 [INFO] (pool-3-thread-1): Uploading 'embulk_temp/logs/out.000.01.csv'
2018-04-23 16:45:51.725 +0900 [INFO] (pool-3-thread-1): Local Hash(MD5): Hi+b4F8s4GwRdeZNPaB5ag== / Remote Hash(MD5): Hi+b4F8s4GwRdeZNPaB5ag==
2018-04-23 16:45:51.726 +0900 [INFO] (0016:task-0000): Uploaded 'embulk_temp/logs/out.000.01.csv' to 429bytes
2018-04-23 16:45:51.727 +0900 [INFO] (pool-4-thread-1): Uploading 'embulk_temp/logs/out.001.01.csv'
2018-04-23 16:45:53.581 +0900 [INFO] (pool-4-thread-1): Local Hash(MD5): 6scwcnrokvq/P1BtCip14Q== / Remote Hash(MD5): 6scwcnrokvq/P1BtCip14Q==
2018-04-23 16:45:53.582 +0900 [INFO] (0016:task-0000): Uploaded 'embulk_temp/logs/out.001.01.csv' to 5bytes
2018-04-23 16:45:53.611 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-04-23 16:45:53.660 +0900 [INFO] (main): Committed.
2018-04-23 16:45:53.660 +0900 [INFO] (main): Next config diff: {"in":{},"out":{}}


Next, the saved CSV files are loaded into Cloud SQL.

The load is also done with Embulk. First, create a Cloud SQL instance on GCP (the smallest size, since this is only a test) with a database named ods_svc1, matching the BigQuery dataset name. Then create an account for access and register the client IP address in the instance's authorized networks.



Then create another config.yml.


$ cat gcs-to-mysql.yml
in:
  type: gcs
  bucket: embulk_temp
  path_prefix: 'logs/out.'
  auth_method: json_key
  json_keyfile: '<Service_Account_Location_JSON>'
  application_name: Embulk Test
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    columns:
    - {name: stn, type: string}
out:
#  type: stdout
  type: mysql
  host: <MySQL_IP_Address>
  user: username
  password: 'password'
  database: ods_svc1
  table: customers
  mode: insert

The input is GCS and the output is Cloud SQL (MySQL). There is no need to create the table in advance: with mode: insert, Embulk creates it automatically and loads the data.
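
For reference, the JDBC output plugins support several load modes besides insert. The list below follows the embulk-output-jdbc documentation as I recall it, so verify it against your plugin version; the fragment is a sketch reusing the placeholders from the config above:

```yaml
# Load modes documented for embulk-output-jdbc / embulk-output-mysql
# (verify against your plugin version):
#   insert          - load into intermediate tables, then INSERT ... SELECT (transactional)
#   insert_direct   - insert directly into the target table (faster, not atomic)
#   truncate_insert - like insert, but truncates the target table first
#   replace         - build a new table and swap it in for the target
out:
  type: mysql
  host: <MySQL_IP_Address>
  user: username
  password: 'password'
  database: ods_svc1
  table: customers
  mode: truncate_insert   # re-runs replace the previous contents
```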


Running it produces the following output.


$ embulk run gcs-to-mysql.yml
2018-04-23 17:43:10.052 +0900: Embulk v0.9.7
2018-04-23 17:43:10.862 +0900 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2018-04-23 17:43:13.829 +0900 [INFO] (main): Gem's home and path are set by default: "/home/thkang0/.embulk/lib/gems"
2018-04-23 17:43:15.210 +0900 [INFO] (main): Started Embulk v0.9.7
2018-04-23 17:43:15.461 +0900 [INFO] (0001:transaction): Loaded plugin embulk-input-gcs (0.2.7)
2018-04-23 17:43:15.514 +0900 [INFO] (0001:transaction): Loaded plugin embulk-output-mysql (0.8.0)
2018-04-23 17:43:18.038 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=4 / output tasks 2 = input tasks 1 * 2
2018-04-23 17:43:18.084 +0900 [INFO] (0001:transaction): JDBC Driver = /home/thkang0/.embulk/lib/gems/gems/embulk-output-mysql-0.8.0/default_jdbc_driver/mysql-connector-java-5.1.44.jar
2018-04-23 17:43:18.095 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:19.634 +0900 [INFO] (0001:transaction): Using JDBC Driver mysql-connector-java-5.1.44 ( Revision: b3cda4f864902ffdde495b9df93937c3e20009be )
2018-04-23 17:43:19.821 +0900 [WARN] (0001:transaction): The client timezone(Asia/Seoul) is different from the server timezone(UTC). The plugin will store wrong datetime values.
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): You may need to set options `useLegacyDatetimeCode` and `serverTimezone`
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): Example. `options: { useLegacyDatetimeCode: false, serverTimezone: UTC }`
2018-04-23 17:43:19.822 +0900 [WARN] (0001:transaction): The plugin will set `useLegacyDatetimeCode=false` by default in future.
2018-04-23 17:43:19.822 +0900 [INFO] (0001:transaction): Using insert mode
2018-04-23 17:43:20.495 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE `customers_00000162f1abf22b_embulk000` (`stn` TEXT)
2018-04-23 17:43:20.712 +0900 [INFO] (0001:transaction): > 0.22 seconds
2018-04-23 17:43:20.900 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE `customers_00000162f1abf22b_embulk001` (`stn` TEXT)
2018-04-23 17:43:21.094 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:22.183 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2018-04-23 17:43:22.273 +0900 [INFO] (0018:task-0000): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:23.482 +0900 [INFO] (0018:task-0000): Prepared SQL: INSERT INTO `customers_00000162f1abf22b_embulk000` (`stn`) VALUES (?)
2018-04-23 17:43:23.520 +0900 [INFO] (0018:task-0000): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:24.766 +0900 [INFO] (0018:task-0000): Prepared SQL: INSERT INTO `customers_00000162f1abf22b_embulk001` (`stn`) VALUES (?)
2018-04-23 17:43:26.952 +0900 [INFO] (0018:task-0000): Loading 53 rows
2018-04-23 17:43:27.283 +0900 [INFO] (0018:task-0000): > 0.33 seconds (loaded 53 rows in total)
2018-04-23 17:43:27.285 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2018-04-23 17:43:27.286 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=2700000}
2018-04-23 17:43:28.499 +0900 [INFO] (0001:transaction): SQL: CREATE TABLE IF NOT EXISTS `customers` (`stn` TEXT)
2018-04-23 17:43:28.685 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:28.862 +0900 [INFO] (0001:transaction): SQL: INSERT INTO `customers` (`stn`) SELECT `stn` FROM `customers_00000162f1abf22b_embulk000` UNION ALL SELECT `stn` FROM `customers_00000162f1abf22b_embulk001`
2018-04-23 17:43:29.029 +0900 [INFO] (0001:transaction): > 0.17 seconds (53 rows)
2018-04-23 17:43:29.402 +0900 [INFO] (0001:transaction): Connecting to jdbc:mysql://35.188.199.103:3306/ods_svc1 options {user=bespin, password=***, tcpKeepAlive=true, useSSL=false, useCompression=true, rewriteBatchedStatements=true, connectTimeout=300000, socketTimeout=1800000}
2018-04-23 17:43:30.602 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `customers_00000162f1abf22b_embulk000`
2018-04-23 17:43:30.790 +0900 [INFO] (0001:transaction): > 0.19 seconds
2018-04-23 17:43:30.791 +0900 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `customers_00000162f1abf22b_embulk001`
2018-04-23 17:43:30.974 +0900 [INFO] (0001:transaction): > 0.18 seconds
2018-04-23 17:43:30.977 +0900 [INFO] (main): Committed.
2018-04-23 17:43:30.978 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"logs/out.000.01.csv"},"out":{}}



After the load, connecting to the database and querying confirms that the data was loaded correctly.


mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| ods_svc1           |
| performance_schema |
+--------------------+
4 rows in set (0.16 sec)

mysql> use ods_svc1
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables
   -> ;
+--------------------+
| Tables_in_ods_svc1 |
+--------------------+
| customers          |
+--------------------+
1 row in set (0.16 sec)

mysql> select * from customers;
+--------+
| stn    |
+--------+
| 892080 |
| 892080 |
| 892080 |
| 892080 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 896060 |
| 897990 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898280 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
| 898720 |
+--------+
53 rows in set (0.16 sec)


7. ETC

Running embulk run config.yml may produce the error below; in that case, see the following link:

http://www.unknownengineer.net/entry/2017/12/15/100000

(The cause: the latest ca-certificates update threw out some Equifax and VeriSign certificates.)


Caused by: org.jruby.exceptions.RaiseException: (SSLError) certificate verify failed
at RUBY.block in call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:41)
at RUBY.with_net_http_connection(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:87)
at RUBY.call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/adapter/net_http.rb:33)
at RUBY.call(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/request/url_encoded.rb:15)
at RUBY.build_response(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/rack_builder.rb:143)
at RUBY.run_request(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/connection.rb:387)
at RUBY.post(/home/thkang0/.embulk/lib/gems/gems/faraday-0.15.0/lib/faraday/connection.rb:175)
at RUBY.fetch_access_token(/home/thkang0/.embulk/lib/gems/gems/signet-0.8.1/lib/signet/oauth_2/client.rb:967)
at RUBY.fetch_access_token!(/home/thkang0/.embulk/lib/gems/gems/signet-0.8.1/lib/signet/oauth_2/client.rb:1005)
at RUBY.fetch_access_token!(/home/thkang0/.embulk/lib/gems/gems/googleauth-0.6.2/lib/googleauth/signet.rb:69)
at RUBY.initialize(/home/thkang0/.embulk/lib/gems/gems/googleauth-0.6.2/lib/googleauth/credentials.rb:71)
at RUBY.new(/home/thkang0/.embulk/lib/gems/gems/google-cloud-bigquery-0.30.0/lib/google/cloud/bigquery.rb:534)
at RUBY.transaction(/home/thkang0/.embulk/lib/gems/gems/embulk-input-bigquery-0.0.4/lib/embulk/input/bigquery.rb:53)
at RUBY.transaction(uri:classloader:/gems/embulk-0.9.7-java/lib/embulk/input_plugin.rb:58)

Error: org.jruby.exceptions.RaiseException: (SSLError) certificate verify failed



On Ubuntu, this can be fixed simply by running:

sudo apt install ca-certificates=20160104ubuntu1

Then create /etc/ssl/certs/Equifax_Secure_Certificate_Authority.pem with the following contents.


-----BEGIN CERTIFICATE-----
MIIDIDCCAomgAwIBAgIENd70zzANBgkqhkiG9w0BAQUFADBOMQswCQYDVQQGEwJV
UzEQMA4GA1UEChMHRXF1aWZheDEtMCsGA1UECxMkRXF1aWZheCBTZWN1cmUgQ2Vy
dGlmaWNhdGUgQXV0aG9yaXR5MB4XDTk4MDgyMjE2NDE1MVoXDTE4MDgyMjE2NDE1
MVowTjELMAkGA1UEBhMCVVMxEDAOBgNVBAoTB0VxdWlmYXgxLTArBgNVBAsTJEVx
dWlmYXggU2VjdXJlIENlcnRpZmljYXRlIEF1dGhvcml0eTCBnzANBgkqhkiG9w0B
AQEFAAOBjQAwgYkCgYEAwV2xWGcIYu6gmi0fCG2RFGiYCh7+2gRvE4RiIcPRfM6f
BeC4AfBONOziipUEZKzxa1NfBbPLZ4C/QgKO/t0BCezhABRP/PvwDN1Dulsr4R+A
cJkVV5MW8Q+XarfCaCMczE1ZMKxRHjuvK9buY0V7xdlfUNLjUA86iOe/FP3gx7kC
AwEAAaOCAQkwggEFMHAGA1UdHwRpMGcwZaBjoGGkXzBdMQswCQYDVQQGEwJVUzEQ
MA4GA1UEChMHRXF1aWZheDEtMCsGA1UECxMkRXF1aWZheCBTZWN1cmUgQ2VydGlm
aWNhdGUgQXV0aG9yaXR5MQ0wCwYDVQQDEwRDUkwxMBoGA1UdEAQTMBGBDzIwMTgw
ODIyMTY0MTUxWjALBgNVHQ8EBAMCAQYwHwYDVR0jBBgwFoAUSOZo+SvSspXXR9gj
IBBPM5iQn9QwHQYDVR0OBBYEFEjmaPkr0rKV10fYIyAQTzOYkJ/UMAwGA1UdEwQF
MAMBAf8wGgYJKoZIhvZ9B0EABA0wCxsFVjMuMGMDAgbAMA0GCSqGSIb3DQEBBQUA
A4GBAFjOKer89961zgK5F7WF0bnj4JXMJTENAKaSbn+2kmOeUJXRmm/kEd5jhW6Y
7qj/WsjTVbJmcVfewCHrPSqnI0kBBIZCe/zuf6IWUrVnZ9NA2zsmWLIodz2uFHdh
1voqZiegDfqnc1zqcPGUIWVEX/r87yloqaKHee9570+sB3c4
-----END CERTIFICATE-----

Then run sudo update-ca-certificates --fresh to update the certificate store.


