
こんにちは。やましー@データ活用クラウドエンジニア(@yamashi18041) です。
今日は前回書いた「Microsoft Azure Databricksを使ってデータレイクのデータを表示する方法」で使用した教材、Microsoft Learnの続きです。
全3回のうちの3回目記事になります。

今回はData FactoryのパイプラインからDatabricksを呼び出し、Databricksの中でデータを整形してデータを統合します。
そのあとDatabricksから統合したデータをグラフ化します。
ネタバレですがこんなグラフを出力します。

3回の記事を通じて、データをソースからデータ基盤に持ってきて、分散処理基盤(Databricks)によりデータを整形し、可視化することができるようになります。
是非今回の記事を読んでクラウドを使いこなせるようになって憧れのつよつよエンジニアになってください。
初回、2回目の記事の中で構築した環境を引き続き使うのでまだ読んでないよって方はリンクを貼っておきますのでどうぞ。
初回の記事はこちらからどうぞ
2回目の記事はこちらからどうぞ
目次を見るだけでも何をしているかが分かると思います。
この記事の読者は次のような方を想定しています
- クラウドエンジニアを目指している
- データ活用をしたい
- Azure DatabricksとData Factoryを連携させたい
それでは始めていきます。
Data FactoryからDatabricksを操作するためのアクセストークン取得
Data Factoryから DatabricksにアクセスしてNotebookの処理を実行させるためにはDatabricksへのアクセストークンを取得する必要があります。
アクセストークンとはDatabricksへアクセスを許してもらうための鍵のようなもの (実態は文字列) だと思ってください。

①Azure portalのDatabricks管理画面から「Launch Workspace」を選択しAzure Databriksのワークスペースへ移動します。
移動したらNotebookを開いていきます。

①人のアイコンを選択します。

①「UserSettings」を選択します。
②「Generate New Token」を選択します。

①Comment:「ADF access」わかりやすいコメントを付けます。
②Lifetime (days):「90」アクセストークンの有効期間を指定します。
③「Generate」を選択します。

①アクセストークンの文字列をコピーして控えておきます。必ずここでコピーをしてください。後から参照することはできません。
②「Done」を選択します。
Data FactoryのパイプラインにDatabricksを追加する

portalのData Factory管理画面から「作成と監視」を選択します。

①鉛筆のアイコンを選択します。

①Pipelineから前回作成した「LabPipeline」を選択します。
②Activeitiesの中からDatabricksを開き「Notebook」をドラッグ&ドロップします。
③Copy dataから出ている緑のタブを伸ばしてNoteBookに接続します。

①Name:「LabNotebook」としました。
Linked serviceにDatabriksのNotebookを追加する
パイプラインで使用するためにLinked serviceにDatabricksのクラスタを追加していきます。

①「Azure Databricks」を選択します。
②「+New」を選択し新規追加します。

①Name:「AzureDatabricks」Linked service名を設定します。
②Connect via integration runtime:「AutoResolveintegrationRuntime」
ランタイムの種類はAzureマネージドなものを選択しました。
③Account selection method:「From Azure subscription」
使用するDatabricksのワークスペースをサブスクリプションから選択します。
④Azure subscription:「Select All」
⑤Databricks workspace:「ymstech-adb01」前回作成したワークスペースを選択します。
⑥Select cluster:「Existing interactive cluster」
既に作成済みのクラスターを選択します。
⑦Access token:先ほど生成してコピーしておいたアクセストークンの文字列を設定します。
⑧Choose from existing clusters:「ymstech-cluster01」を設定します。
⑨「Test connection」を選択し接続を確認します。
⑩「Connection successful」となることを確認します。
⑪「Create」を選択します。
パイプラインで実行するNoteBookを選択する
パイプラインで実行するNotebookを選択していきます。このNotebookはストレージへコピーした犯罪データの各データでバラバラな列名を統一しデータを統合する処理が記載されています。

①「Settings」を選択します。
②Notebook Path:「/Users/[ユーザー名]/03-Data-Ingestion-Via-ADF/includes/Databricks-Data-Transformations」
Browseを選択して選ぶこともできます。
今回はすでにインポート済のNotebookを使います。
③「+New」を選択します。今回のNotebookは引数を期待するので入力します。
④初回で作成したストレージアカウントの情報を設定します。
パラメータを取るNotebookの書き方

①Notebookはwidgetsメソッドでパラメータを取ることができるようになります。
変換処理の中身を覗いてみる
Notebook Path で指定した変換Notebookを見てみます。
# Create input widgets, which will accept parameters passed in via the ADF Databricks Notebook activity
dbutils.widgets.text("accountName", "", "Account Name")
dbutils.widgets.text("accountKey", "", "Account Key")
dbutils.widgets.text("containerName", "", "Container Name")
# Assign variables to the passed in values of the widgets
accountName = dbutils.widgets.get("accountName")
accountKey = dbutils.widgets.get("accountKey")
containerName = dbutils.widgets.get("containerName")
# Create connection string to use for accessing files in the storage account
connectionString = "wasbs://%(containerName)s@%(accountName)s.blob.core.windows.net/03.02" % locals()
# Create connection to the Azure Storage account
spark.conf.set("fs.azure.account.key." + accountName + ".blob.core.windows.net", accountKey)
bostonDf = spark.read.parquet("%(connectionString)s/Crime-Data-Boston-2016.parquet" % locals())
chicagoDf = spark.read.parquet("%(connectionString)s/Crime-Data-Chicago-2016.parquet" % locals())
dallasDf = spark.read.parquet("%(connectionString)s/Crime-Data-Dallas-2016.parquet" % locals())
losAngelesDf = spark.read.parquet("%(connectionString)s/Crime-Data-Los-Angeles-2016.parquet" % locals())
newOrleansDf = spark.read.parquet("%(connectionString)s/Crime-Data-New-Orleans-2016.parquet" % locals())
newYorkDf = spark.read.parquet("%(connectionString)s/Crime-Data-New-York-2016.parquet" % locals())
phillyDf = spark.read.parquet("%(connectionString)s/Crime-Data-Philadelphia-2016.parquet" % locals())
# Import required libraries
import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, lower, month, unix_timestamp, upper
- col()は、指定された列名に基づいて、DataFrameから列を返します。
- contains(mySubstr)は、サブストリングmySubstrを含むストリングを示します。
- lit()を使用して、リテラル値から列を作成します。データが派生する都市の名前を含む各DataFrameに新しい列を作成します。
- lower()は、テキストを小文字に変換します。
- month()は、reportDateタイムスタンプデータ型から月を抽出します。
- unix_timestamp()を使用して、ダラスの日付フィールドをタイムスタンプ形式に変換します。したがって、month()関数を使用して月を抽出できます。
homicidesBostonDf = (bostonDf.withColumn("city", lit("Boston"))
.select("month", col("OFFENSE_CODE_GROUP").alias("offense"), col("city"))
.filter(lower(col("OFFENSE_CODE_GROUP")).contains("homicide"))
)
homicidesChicagoDf = (chicagoDf.withColumn("city", lit("Chicago"))
.select(month(col("date")).alias("month"), col("primaryType").alias("offense"), col("city"))
.filter(lower(col("primaryType")).contains("homicide"))
)
homicidesDallasDf = (dallasDf.withColumn("city", lit("Dallas"))
.select(month(unix_timestamp(col("callDateTime"),"M/d/yyyy h:mm:ss a").cast("timestamp")).alias("month"), col("typeOfIncident").alias("offense"), col("city"))
.filter(lower(col("typeOfIncident")).contains("murder") | lower(col("typeOfIncident")).contains("manslaughter"))
)
homicidesLosAngelesDf = (losAngelesDf.withColumn("city", lit("Los Angeles"))
.select(month(col("dateOccurred")).alias("month"), col("crimeCodeDescription").alias("offense"), col("city"))
.filter(lower(col("crimeCodeDescription")).contains("homicide") | lower(col("crimeCodeDescription")).contains("manslaughter"))
)
homicidesNewOrleansDf = (newOrleansDf.withColumn("city", lit("New Orleans"))
.select(month(col("Occurred_Date_Time")).alias("month"), col("Incident_Description").alias("offense"), col("city"))
.filter(lower(col("Incident_Description")).contains("homicide") | lower(col("Incident_Description")).contains("murder"))
)
homicidesNewYorkDf = (newYorkDf.withColumn("city", lit("New York"))
.select(month(col("reportDate")).alias("month"), col("offenseDescription").alias("offense"), col("city"))
.filter(lower(col("offenseDescription")).contains("murder") | lower(col("offenseDescription")).contains("homicide"))
)
homicidesPhillyDf = (phillyDf.withColumn("city", lit("Philadelphia"))
.select(month(col("dispatch_date")).alias("month"), col("text_general_code").alias("offense"), col("city"))
.filter(lower(col("text_general_code")).contains("homicide"))
)
homicidesDf = homicidesNewYorkDf.union(homicidesBostonDf).union(homicidesChicagoDf).union(homicidesDallasDf).union(homicidesLosAngelesDf).union(homicidesNewOrleansDf).union(homicidesPhillyDf)
homicidesDf.write.mode("overwrite").saveAsTable("homicides_2016")
ちなみにテーブルセーブすると以下の場所から参照することができます。

①「Data」を選択します。
②Databasesから「Default」を選択します。
③Tablesから「homicides_2016」 のテーブルを選択します。
④概要が表示されます。
最後にNotebookの戻り値を返す処理を記述しています。
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"message": "Cleaned data and created persistent table",
"tables": ["homicides_2016"]
}))
今回DataFactoryのパイプラインから使用する変換処理のNotebookは以上です。
パイプラインを確定させる

①「Validate all」を実行し検証します

①「Publish all」を実行し、確定させます。
②「Publish completed」となることを確認します。
これでパイプラインが確定し使える状態になりました。
このパイプラインを実行することでデータソース(Databricks社のBLOBストレージ)から自分のBLOBストレージにデータをインポートし、Azure Databricksを使ってデータを統合します。
パイプラインを実行する
パイプラインを実行していきます。

①「Add trigger」を選択します。
②「Trigger now」を選択します。今回は手動で実行します。このほかにも時刻などのスケジュールによる起動、BLOBを監視してファイル作成/削除による起動、タンブリングウィンドウによる謎の起動方法などが選択できます。

①今回のパイプラインにはパラメータはありませんので何も設定せず「Finish」を選択します。
これでパイプラインが実行されました。
パイプラインの実行結果を確認する
パイプラインの実行結果を確認してみます。

①「モニターのアイコン」を選択します。
②「Pipeline runs」を選択し、パイプラインの実行結果を見てみます。
③2つのアクティビティが「Succeeded」となっていることが分かります。
次により詳細なNotebookの実行結果を見ていきたいと思います。
DatabricksのNotebookの実行結果を確認する

①DatabricksNotebookアクティビティのアウトプットのアイコンを選択します。
②runPageUrlを選択します。

実行結果のページに遷移します。

各コマンドの結果を確認することができます。

今回テーブルを保存した際にアテンションが表示されていました。
テーブルの保存形式はparquetなのですが、2019年の秋ごろに「Delta Lake」と呼ばれるフォーマットがリリースされ、こちらにすることでパフォーマンスを向上できることを促しているようです。
ちなみにこのDelta Lakeフォーマットはテーブルを差分管理することによりロールバック等タイムトラベルが行えます。
また、ZORDERによる最適化を行うことによりパフォーマンス向上が見込めます。
統合したテーブルをグラフ化する
いよいよ大詰めですね。統合したテーブルをグラフ化していきたいと思います。

Notebook「 03-Data-Transformation 」を開きます。
cmd 8から再開します。
# Display that list of tables, filtering for users, products, and weblogs tables, to ensure they aren't buried among other tables in the results
sqlContext.tables().filter("tableName = 'homicides_2016'").show()

こちらはパイプラインによって統合されたテーブルが存在するのかを確認するコマンドですね。
%sql
SELECT * FROM homicides_2016 LIMIT 20

テーブル内のデータを確認しています。
マジックコマンド(%sql)を冒頭に記述することでその言語でNotebookを実行することができます。今回はSQLで実行しています。
homicidesDf = spark.sql("SELECT * FROM homicides_2016")

テーブルを読み出し、データフレーム形式で保持します。
display(homicidesDf.select("city", "month") \
.orderBy("city", "month") \
.groupBy("city", "month") \
.count())

事項結果として、月別、都市別に殺人事件を集計しています。
棒グラフから円グラフに変更してみます。

①▽を選択します。
②Pieを選択します。

このように簡単に表示するグラフを切り替えることができます。
まとめ

3回にわたってMicrosoft Learnを教材にAzure Data FactoryとDatabricksを使ってデータのインポートから、統合、可視化を実施してみました。
今回の手順をざっくりまとめると
1 Data FactoryからDatabricksを操作するためのアクセストークン取得
2 Data FactoryのパイプラインにDatabricksを追加する
3 Data FactoryのLinked serviceにDatabriksのNotebookを追加する
4 パイプラインで実行するNoteBookを選択する
5 パイプラインを確定させる
6 パイプラインを実行する
7 パイプラインの実行結果を確認する
8 DatabricksのNotebookの実行結果を確認する
9 統合したテーブルをグラフ化する
是非あなたもご自身の手で構築してみてみましょう。
最後まで読んでいただきありがとうございました。
以上、やましー@データ活用クラウドエンジニア(@yamashi18041)でした。
コメント