Aurora Serverlessを実際に使ってみたメリットとデメリット
TECH
2023.05.09
2021.04.09
2022.12.23
ビッグデータのETL処理を行う際に使用したPySparkについて書いてみたいと思います。 私が携わった案件ではAWS Glueを用いた開発でしたが、Sparkを動かしてみるだけなら無料でお手軽に使うことができるDatabricksがオススメです!(本記事ではDatabricksについては割愛します)
昨今、当たり前のように耳にするようになった「ビッグデータ」ですが、 そもそもビッグデータとは何なのでしょうか?「大きなデータ」ではあるのですが、単純に膨大なデータを指す言葉ではありません。
ビックデータとは様々な形、特性、種類を持ったデータであり、DataVolume(データの量)、DataVariety(データの種類)、DataVelocity(データの発生・更新頻度)の重要な3つのVから成っていると言われています。また、IT用語辞典では下記のように定義されています。
ビッグデータとは、従来のデータベース管理システムなどでは記録や保管、解析が難しいような巨大なデータ群。明確な定義があるわけではなく、企業向け情報システムメーカーのマーケティング用語として多用されている。 多くの場合、ビッグデータとは単に量が多いだけでなく、様々な種類・形式が含まれる非構造化データ・非定型的データであり、さらに、日々膨大に生成・記録される時系列性・リアルタイム性のあるようなものを指すことが多い。 今までは管理しきれないため見過ごされてきたそのようなデータ群を記録・保管して即座に解析することで、ビジネスや社会に有用な知見を得たり、これまでにないような新たな仕組みやシステムを産み出す可能性が高まるとされている。
このようにビッグデータとは単に大きなデータといった意味ではなく、ビジネスや社会における新たな可能性を秘めたものであることがわかります。
PySparkの前にApache Spark(以下、Spark)の説明をします。Sparkは前述したビッグデータに対して高速に分散して処理を行うオープンソースのフレームワークです。SparkにはJavaやScala、Pythonといった様々なプログラミング言語から利用できるAPIが用意されています。PythonでSparkを利用する際に用いるAPIがPySparkです。
DataframeとはSpark上でParquetやCSVといったファイルをデータベースのテーブルのように扱うことができるオブジェクトです。SQLと同様にSELECTやWHERE(FILTER)、Dataframe同士を縦に結合するUNIONや横に結合するJOIN、集計に必要となるAGG(COUNT、SUM、AVG)などのメソッドが用意されています。
PySparkで出来ること、それは当たり前ですがPythonでSparkを扱うこと。 つまりPythonを使ってDataframeを作り、SQLのように操作し、ビッグデータを集計したり分散処理することが出来ます。実際によく使う例を紹介します。
1 |
df = spark.read.format('csv').option('encoding', 'sjis').option('header', 'true').load('/FileStore/tables/master/clientlist.csv') |
処理を行いたいデータファイルを読み込みDataframe化します。 format()でファイル形式を選択し、option()で文字コードの指定や、見出し行を読み込む指定を行います。また、option(‘inferSchema’, ‘true’)の指定をするとカラムの型を自動判別して読み込むことが可能です。 load()で読み込むデータファイルのパスを指定します。
1 |
df = df.where() |
または
1 |
df = df.filter() |
意味も使い方も同じです。SQLに慣れている人はwhereを使うほうが馴染み深いかもしれません。Dataframeから抽出したい条件を絞り込むのに使用します。
1 |
df = df1.union(df2) |
SQLでも2つのテーブルを縦(行)に結合する際に使用されることのあるUNIONです。同じカラムを持った2つのDataframeを縦に結合します。 なお、Dataframeの縦結合には3種類存在します。
それぞれどういった違いがあるのでしょう。
unionはv2.0より、unionAllはv1.3より実装されたもので、実はこの2つのメソッドには機能的な違いはありません。
SQLではunionAllは重複の制御が行われず、unionでは重複の制御が行われますが、Sparkのunionはシンプルに縦結合するだけで重複の制御が行われません。繋いだものをdistinctして重複を除外する必要があります。この点は使用する際に注意が必要です。
unionとunionByNameの違いはカラム名を参照するか、しないかという点です。カラムの順番が異なる2つのDataframeを縦に繋ぐ際、unionはカラム名が異なっていても縦に繋ぎます。一方でunionByNameの方は一致するカラム名を探し縦に結合します。
unionByNameはv2.3より実装されたもので、2つのDataframeを縦に繋ぐ際にはunionByNameを使うのが無難と言えるでしょう。
1 |
df = df1.join(df2, df1.fuga == df2.fuga, 'how') |
SQLでテーブル同士をカラム名を指定して横に繋ぐ際に使用するJOINと同じです。 howの箇所には以下のいずれかを指定します。
その他にもouter, full, fullouter, leftsemi, left_antiなどがあり、場面に応じて使い分けるのがいいでしょう。
1 |
df = df.groupBy(col('column01')).agg( count(col('column01')), max(col('column02')) ) |
基本的にはgroupByからのaggという使い方になり、aggメソッドの引数でcountやmin,max、sum,avgなどを記述します。 SQLと書き方は違えど基本的な考え方は同じです。
列の追加
1 |
df = df.withColumn('追加する列名', 処理) |
列のリネーム
1 |
df = df.withColumnRenamed('変更前カラム名', '変更後カラム名') |
エイリアス
1 |
df = df.select(col('変更前カラム名').alias('変更後カラム名')) |
Dataframeを加工しながら目的のデータを作っていくのは楽しいです。今回紹介したものは本当にごくごく一部でしかないので、使えるメソッドは沢山あり、もっと奥は深いです。SQLに馴染みのある方は最初こそ違和感を感じることもあるかもしれませんが、書いてみるとすぐに慣れると思います。
Sparkを書く際に注意したいこととして分散処理されるのはDataframe内の処理というところ。作成したDataframeをリスト化してPythonのループなどで処理すると、その部分は分散処理されません。そういった場合に、よく用いるのがUDF(User Defined Function)です。
本記事では紹介していませんが、UDFというのはユーザ定義関数のことで、Dataframe内で呼び出すことでSparkクラスタ内での分散処理を行うことが可能となります。実装をする際は分散処理になるかならないかを意識しながら、Sparkを有効活用した処理を書いていくのが良いです。