GlueのジョブでParquetファイルを結合

今回はGlueのETLジョブでS3上のparquetファイルをまとめる処理を作ってみました。

Glueジョブの作成

Glueのバージョンは以下の設定で作成しました。
特に意図はなく最新にしています。

  • Spark 2.4, Python 3 (Glue version 1.0)

Glueが生成するテンプレ的なスクリプトでは今回の用途に合わなそうだったので 「ユーザーが作成する新しいスクリプト」にチェックしています。

それと、GlueはDPU時間あたりで料金が発生するので
コストを抑えるために最大容量を5に減らしました。

スクリプト

最終的に出来上がったコード

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

target_path = 's3://[読み込み対象のS3パス]'
output_path = 's3://[出力対象のS3パス]'

df = spark.read.parquet(target_path)
df.repartition(1).write.mode('overwrite').parquet(output_path)
job.commit()

Glue独自のDynamicFrameは使わずにspark標準のDataFrameを使いました。
DynamicFrameを使うとなぜかエラーでうまくいかなかったので・・・

処理自体は

  1. spark.read.parquetでparquetファイルを読み込み
  2. repartitionで結合
  3. S3へ出力

と単純な内容です

うまくいかなかったDynamicFrameのスクリプト

df = glueContext.create_dynamic_frame_from_options(
      "s3", 
      {'paths': [target_path], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, 
      format="parquet")

datasink = glueContext.write_dynamic_frame.from_options(
      frame=df, 
      connection_type="s3", 
      connection_options=output_path, 
      format="parquet")

公式ドキュメントなどいろいろ調べるとこんな感じのコードでいけそうなのですが、
「TypeError: getSink() argument after ** must be a mapping, not str」
といったエラーでうまくいきませんでした。

解決方法などありましたら教えていただけるとうれしいです!

最後に

EMRのクラスタを立てる必要もなくGlueのジョブでsparkの処理が作れるのはホントに便利ですね。
重い処理をすると料金はそこそこかかりそうな気はしますがEMRのクラスタ立てるよりは安く済みそうにも思います。