RedshiftのデータをBigQueryにロード

BigQueryを利用するにあたりRedshiftのデータをBigQueryに移行する必要がありました。
その際の手法や手順などについて紹介します。

前提

  • RailsからRedshiftのクエリを実行できるようにしておくこと
  • bqコマンドを実行できるようにしておくこと
  • AWS SDKがインストールされていること

処理内容

大きくは以下のような流れで作成しました。

  1. RedshiftのデータをS3にアンロード
  2. 対象テーブルのスキーマファイル作成
  3. bq loadコマンドでデータをインポート

アンロード

railsからRedshiftのアンロードを行うコードです。

query = sprintf("
  UNLOAD ('SELECT * FROM %<schema>s.%<table>s')
  TO 's3://redshift-unload-bucket/unload/%<schema>s/%<table>s/'
  IAM_ROLE 'arn:aws:iam::************:role/redshift-unload-role'
  MANIFEST
  DELIMITER AS ','
  GZIP
  ADDQUOTES
  ALLOWOVERWRITE
  PARALLEL OFF
  ESCAPE
  MAXFILESIZE AS 50MB
", sql:sql, schema: schema, table: table)
ActiveRecord::Base.connection.execute query

アンロードするためのIAMロールを事前に作成しておきます

ロール名:redshift-unload-role
ポリシー:AmazonS3FullAccess、AmazonRedshiftFullAccess

特記事項
  • BigQueryのロードに対応しているCSV形式(DELIMITER AS ',')での出力にする
  • BigQueryにロードする際にCSV構造が崩れないようにADDQUOTESESCAPEオプションをつけておく
  • PARALLEL ONにするとなぜかレコードが欠損する事象が発生したのでOFF
  • データ量が多すぎるとBigQueryのロードがうまくいかない問題が発生したのでMAXFILESIZE50MBずつ分割

スキーマファイル作成

Redshiftのpg_catalog.pg_table_defからスキーマ情報が取得できます。

columns = []
sql = sprintf("
  SELECT * FROM pg_catalog.pg_table_def
  WHERE schemaname = '%{schema}' AND tablename = '%{table}'
", schema: schema, table: table)
result = ActiveRecord::Base.connection.execute sql
result.each{|record|
  column = {mode:get_mode(record["notnull"]), name:record["column"], type:get_type(record["type"])}
  columns.push(column)
}

スキーマファイルを出力

file = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
File.open(file, 'w') do |f|
  f.puts columns.to_json
end

補足

RedshiftとBigQueryの型の違いを補完するためのメソッドを以下のようにつくってます

def self.get_mode(notnull)
  if notnull == 'y' then
    return 'REQUIRED'
  end
  return 'NULLABLE'
end

def self.get_type(type)
  if type.index('character') then
    return 'STRING'
  elsif type.index('bigint') then
    return 'INTEGER'
  elsif type.index('integer') then
    return 'INTEGER'
  elsif type.index('numeric') then
    return 'NUMERIC'
  elsif type.index('boolean') then
    return 'BOOL'
  elsif type.index('timestamp') then
    return 'TIMESTAMP'
  elsif type.index('date') then
    return 'DATE'
  end
  return 'STRING'
end

bq loadコマンドでBigQueryにインポート

S3上のアンロードしたオブジェクトを取得し、ループ内で
S3からGSへのアップロードしbq loadコマンドを実行しています。

prefix = "unload/%{schema}/%{table}/" % {schema:schema, table:table}
object_list = get_objects(prefix)
for object in object_list do
  if object.index('.gz') then
    s3_to_gs(object)
    gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
    schema_path = "./schema/%{schema}/%{table}.schema" % {schema:schema, table:table}
    command = "bq load %{dataset}.%{table} %{gs_path} %{schema_path}" % {
      dataset:schema, table:table, gs_path:gs_path, schema_path:schema_path}
    system(command)
  end
end

上記の処理中のメソッドは以下のようになっています。

S3のオブジェクトをGoogleのストレージに転送

def self.s3_to_gs(object)
  s3_path = "s3://%{bucket}/%{object}" % {bucket:$s3_bucket, object:object}
  gs_path = "gs://%{bucket}/%{object}" % {bucket:$gs_bucket, object:object}
  command = "gsutil cp -r %{s3_path} %{gs_path}" % {s3_path:s3_path, gs_path:gs_path}
  system(command + $command_output)
end

アンロードしたS3上のファイルを取得

def self.get_objects(prefix)
  object_list = []
  next_token = ''
  while true do
    if next_token == '' then
      response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix)
    else
      response = $s3.list_objects_v2(bucket: $s3_bucket, prefix: prefix, continuation_token: next_token)
    end
    if response.contents.length == 0 then
      return []
    end
    for content in response.contents do
      object_list.push(content.key)
    end
    if response.next_continuation_token != nil then
      next_token = response.next_continuation_token
    else
      return object_list
    end
  end
end

最後に

  • ここではスキーマファイルを作成しBigQueryにロードさせましたが、bq load--autodetectオプションを利用すればスキーマ情報を自動検出することも可能です。
  • 自動検出ではデータの値に応じて文字列、数値、時間は高い精度で検出してくれました。簡単にロードしたい場合はかなりつけると思います。
  • データのサイズが大きくなると今回の`bq load’のやり方ではかなりの時間がかかることが想定されます。dataflowを使うことでBigQueryへのデータロードを大幅に削減することもできたのでまたの機会に記事にできればと思います。