Twitter Streaming APIでMongoDBに溜めたツイートをJupyter NotebookからApache Sparkで読み込んでみた

CentOS
スポンサーリンク

こんにちは!やましー@データ活用クラウドエンジニア(@yamashi18041)です!

私はビッグデータを扱うためのデータ基盤を作る仕事をしています。

しかし、まだ経験が浅くそれほど多くの基盤を構築したことがありません。

今後の仕事で活かせるよう、今日はOSSを使って簡単なデータ基盤の構築してみたいと思います。

使う技術は

  • データソース:Twitter Streaming API
  • 分散ストレージ:MongoDB
  • 分散データ処理:Apache Spark
  • データ整形:pandas
  • 対話的コンソール:Jupyter Notebook

構築するにあたり参考にしたのが前回の記事で紹介した書籍「ビッグデータを支える技術」の6章です。

ビッグデータに興味がある方はぜひ読んでみることをお勧めします。

それではまず構築するシステム構成から説明していきたいと思います。

スポンサーリンク

システム構成

  • サーバー:CentOS7.5(Azure VM)
  • データソース:Twitter Streaming API
  • 分散ストレージ:MongoDB
  • 分散データ処理:Apache Spark
  • データ整形:pandas
  • 対話的コンソール:Jupyter Notebook

サーバー:CentOS7.5(Azure VM)

今回構築していくために採用した環境はAzureのVMにCentOS7.5を入れたサーバーを使っています。

データソース:Twitter Streaming API

テストデータとしてTwitterからツイートを収集し、MongoDBにデータを入れていきます。

分散ストレージ:MongoDB

収集するTwitterのデータはJSON形式なので、通常のリレーショナルデータベースは使うことができません。なのでNoSQLデータベースであるMongoDBを使ってJSONを蓄えていきます。

分散データ処理:Apache Spark

データ基盤を作る目的の一つとして溜めたデータを処理して分析する必要があります。

しかし超大量のデータ(ビッグデータ)を処理すると通常、非常に時間がかかってしまいます。そこで複数のコンピュータに処理を分散して一気に並列でビッグデータを素早く処理することができます。

今回は簡単に環境を構築したかったので1台のマシンだけで稼働させます。

興味があれば併せてご覧いただければと思います。

データ整形:pandas

データ処理を行う際は構造化データの方が扱いやすいです。今回溜めたツイートはJSON形式で半構造化データと呼ばれデータ処理にはあまり適さないので表形式にします。

そして表形式のデータを抽象化したオブジェクトをデータフレームと言い、加工や集計、分析がプログラムで行いやすくなります。

今回はPythonでデータフレームを扱うためのライブラリであるpandasを使います。

対話的コンソール:Jupyter Notebook

データサイエンティストや機械学習エンジニアにとっては欠かせないツールとJupyter Notebook(ジュピターノートブック、ジュパイターノートブック)。プログラムコードを「実行」し、「その結果を残して」、「仲間と共有」することができるツールです。

Jupyter Notebookをインストールせずに使えるAzureのサービスもあります。興味がある方は以下の記事を読んでみてください。

スポンサーリンク

環境構築

いよいよ環境を構築していきます。

Javaをインストールする

まずベースとなるJavaをインストールします。

今回はyumでOpenJDKを入れました。

sudo yum install java-1.8.0-openjdk
[user01@vmcentos7 ~]$ java -version
openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)

Java 1.8.0_252がインストールできました。

MongoDBをインストールする

次にデータを溜めておくストレージでるMongoDBをインストールします。

yumでインストールするためにダウンロード先のリポジトリを登録します。

sudo vi /etc/yum.repos.d/mongodb-org-4.0.repo

以下の設定を上記ファイル名で作成します。

[mongodb-org-4.0]
name=MongoDB Repository
baseurl=https://repo.mongodb.org/yum/redhat/$releasever/mongodb-org/4.0/x86_64/
gpgcheck=1
enabled=1
gpgkey=https://www.mongodb.org/static/pgp/server-4.0.asc

リポジトリの設定ファイルを作成したら、以下のコマンドでインストールを実行します。

sudo yum install -y mongodb-org
[user01@vmcentos7 ~]$ mongod -version
db version v4.0.19
git version: 7e28f4296a04d858a2e3dd84a1e79c9ba59a9568
OpenSSL version: OpenSSL 1.0.1e-fips 11 Feb 2013
allocator: tcmalloc
modules: none
build environment:
    distmod: rhel70
    distarch: x86_64
    target_arch: x86_64

MongoDB version 4.0.19がインストールできました。

インストールができたのでサービスを起動します。

[user01@vmcentos7 ~]$ sudo service mongod start
Redirecting to /bin/systemctl start mongod.service

Pythonのパッケージpymongo requests_oauthlib tqdmをインストールする

データ分析の要となるプログラム言語のPythonはすでにサーバーにインストールしていたので割愛。Version は2.7.5を使います。

[user01@vmcentos7 ~]$ python --version
Python 2.7.5

まずPythonのパッケージ管理ツールであるpipをインストールします。

sudo yum -y install python-pip

インストールが完了したら、pipのアップグレードをして最新にしておきます。

sudo pip install --upgrade pip

次に必要なパッケージを入れていきます。

sudo pip install pymongo requests_oauthlib tqdm
  • pymongo
    PythonでMongoDBを扱うためのパッケージです。
  • requests_oauthlib
    Twitter Streaming APIを使う際にTwitterへ認証を行うためのパッケージです。
  • tqdm
    MongoDBへツイートを流し込む際に進捗を表示するためのパッケージです。

TwitterにDeveloper登録する

今回分析対象データとしてTwitterのツイートを採用します。

ツイートを収集するためにTwitter Streaming APIを使っていきます。

こちらはTwitterにDeveloper登録をすることで簡単に使うことができます。

まずは以下のサイトにアクセスします

https://developer.twitter.com/en/apps

Create an appをクリックします。

Applyをクリックします。

HobbyistのExploring the APIを選択して次へ進みます。

必要事項を記入してNext。

次にAPIを使うための理由を記入します。英語で200字以上ということで必死に翻訳しながら書いてみました。

最終確認を行って、こちらの画面が表示されたら登録したEmailを確認します。

EmailのConfirm your emailをクリック

これで登録されました。

さらに作るアプリを登録し、キーを取得します。

アプリの名前を適当に決めて先に進めます。

以下の情報を控えておきます。特にAccess token secretは1度表示した後は2度と表示できなくなるので要注意です。

  • API key
  • API secret key
  • Access token key
  • Access token secret

これでTwitter Streaming APIを使うための準備が整いました。

Twitter Streaming APIを使ってツイート収集する

APIを使ってMongoDBにツイートを収集するため、以下のスクリプト「twitter-streaming.py」を作成します。

import datetime
import json
import pymongo
import requests_oauthlib
import tqdm

#Twitter DevelopersサイトでAPIキーを発行
api_key = '****'
api_secret = '****'
access_token_key = '****'
access_token_secret = '****'

#Twitter Streaming APIを実行
twitter = requests_oauthlib.OAuthlSession(
  api_key, api_secret, access_token_key, access_token_secret)
uri = 'https://stream,twitter.com/1.1/statuses/sample.json'
r = twitter.get(uri, stream=True)
r.raise_for_status()

#サンプリングされたツイートをMongoDBに格納
mongo = pymongo.MongoClient()
for line in tqdm.tqdm(r.iter_lines(), unit='tweets', mininterval=1);
    if line:
        tweet = json.loads(line)
        #データ受信時のタイムスタンプを追加
        tweet['_timestamp'] = datetime.datetime.utcnow().isoformat()
        mongo.twitter.sample.insert_one(tweet)

作ったスクリプトを実行するとみるみる蓄積されていきます。

[user01@vmcentos7 ~]$ python ./twitter-streaming.py
2210tweets [00:44, 47.25tweets/s]

PythonのパッケージpandasとJupyterをインストールする

データフレームのpandasと対話ツールJupyterをインストールします。

sudo pip install pandas jupyter

Jupyter Notebookを起動する

Jupyter Notebookを起動してpandasを使ってみます。

外部IPアドレスからNotebookへアクセスできるようにコンフィグファイル作成します。

[user01@vmcentos7 ~]$ jupyter notebook --generate-config
Writing default config to: /home/user01/.jupyter/jupyter_notebook_config.py

作成したファイルを修正します。

vim /home/user01/.jupyter/jupyter_notebook_config.py

以下の部分をlocalhostから0.0.0.0とすることで受け入れIPアドレスの範囲を広げることができます。

## The IP address the notebook server will listen on.
#c.NotebookApp.ip = 'localhost'
c.NotebookApp.ip = '0.0.0.0'

jupyter-notebookを起動します。

[user01@vmcentos7 ~]$ jupyter-notebook
[I 17:14:05.342 NotebookApp] Serving notebooks from local directory: /home/user01
[I 17:14:05.342 NotebookApp] The Jupyter Notebook is running at:
[I 17:14:05.342 NotebookApp] http://(vmcentos7 or 127.0.0.1):8888/?token=38fcef605730ab5e6822cc633899f2d0dc83b9980e51238a
[I 17:14:05.342 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 17:14:05.347 NotebookApp] No web browser found: could not locate runnable browser.
[C 17:14:05.347 NotebookApp]

    To access the notebook, open this file in a browser:
        file:///home/user01/.local/share/jupyter/runtime/nbserver-42086-open.html
    Or copy and paste one of these URLs:
        http://(vmcentos7 or 127.0.0.1):8888/?token=38fcef605730ab5e6822cc633899f2d0dc83b9980e51238a

12行目に表示されたURLにWEBブラウザでアクセスすることができます。

Jupyter NotebookとMongoDBの接続をしてツイートを見てみる

WEBブラウザでアクセスしてプロジェクトを作成してMongoDBの中身を見てみます。

① 「New」を選択します。

② Notebookの「Python2」を選択します。

するとNotebookが作成されます。

① 次に以下のコードを入力してMongoDBのPythonライブラリをインポートして、溜まっている最初のツイートを表示してみます。

import pymongo
mongo = pymongo.MongoClient()

#最初のツイートを確認
mongo.twitter.sample.find_one()

フォーカスが当たっていセル上でCtrl + Enter又はShift + Enter実行します。

② MongoDBに蓄えられている最初のツイートがJSON形式で表示されました。

Pandasでデータフレームを表示する

このままでは見にくいので次にpandasデータフレームに変換して表示してみます。

import pandas as pd

#最初の100件をデータフレームに変換する
pd.DataFrame(list(mongo.twitter.sample.find(limit=100)))

① 以下のコードを入力して、MongoDBにあるツイート100件をデータフレームに変換して表示します。

② pandasデータフレームはJupyter上できれいに整形して表示してくれて非常に見やすいです。

これでJupyter NotebookとMongoDBとPandasとの接続確認を取ることができました。

一度ターミナルからCtrl-Cでjupyterを停止します。

最後にSparkを入手してJupyterの処理基盤としてSparkを使えるようにします。

Apache Sparkを入手する

まずMongoDBとSparkをつなぐためのコネクタがサポートするSparkのバージョンを以下のサイトから確認します。
https://docs.mongodb.com/spark-connector/current/

① 最新のMongoDB Connector for Sparkのコネクタバージョン2.4.2でサポートしているSpark Version は2.4.xでした。

以下のサイトからSpark 2.4.2のダウンロード用のURLを入手します。

Downloads | Apache Spark

①  Choose a Spark release は2.4.6を選択します。

② ダウンロードサイトにアクセスしてダウンロードURLを入手します。

Apache Download Mirrors
Home page of The Apache Software Foundation

CentOSのターミナルに戻り以下のコマンドを入力しダウンロードします。

wget https://ftp.jaist.ac.jp/pub/apache/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz

解凍します。

tar xfz spark-3.0.0-bin-hadoop3.2.tgz

以下のコマンドにより環境変数へJupyter NotebookからSparkを使う設定を行います。

export PYSPARK_DRIVER_PYTHON=jupyter-notebook

解凍したディレクトリに移動します。

cd spark-2.4.6-bin-hadoop2.7/

以下のコマンドでpysparkを起動しsparkを実行します。

「–conf」でmongoDBをinputとoutputに指定します。

「–packages」でorg.mongodb.spark:mongo-spark-connector_2.11:2.4.6を指定します。sparkのバージョンとして「2.4.6」を指定しています。

./bin/pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection"  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.6
[user01@vmcentos7 spark-2.4.6-bin-hadoop2.7]$ ./bin/pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred"  --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/test.myCollection"  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.6
[I 21:02:44.119 NotebookApp] Serving notebooks from local directory: /home/user01/adhoc/spark-2.4.6-bin-hadoop2.7
[I 21:02:44.119 NotebookApp] The Jupyter Notebook is running at:
[I 21:02:44.119 NotebookApp] http://(vmcentos7 or 127.0.0.1):8888/?token=e858a9aad851e1177429cc4339ed0728f48c2e0db1f093ee
[I 21:02:44.120 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[W 21:02:44.124 NotebookApp] No web browser found: could not locate runnable browser.
[C 21:02:44.124 NotebookApp]

    To access the notebook, open this file in a browser:
        file:///home/user01/.local/share/jupyter/runtime/nbserver-43090-open.html
    Or copy and paste one of these URLs:
        http://(vmcentos7 or 127.0.0.1):8888/?token=e858a9aad851e1177429cc4339ed0728f48c2e0db1f093ee

表示されたURLにWEBブラウザでアクセスし、以下のsparkコマンドが実行できることを確認します。

spark.range(1000 * 1000 * 1000).count()

これでjupyterからsparkを使えることの確認が取れました。

JupyterからMongoDBにあるツイートを読み込んでSparkデータフレームに変換する

最後にJupyterからMongoDBにあるツイートを読み込んでSparkのデータフレーム形式(pandasではない)に変換してみることにします。

# MongoDBからロードしてデータフレームに変換
df = (spark.read
      .format("com.mongodb.spark.sql.DefaultSource")
      .option("uri","mongodb://localhost/twitter.sample")
      .load())
# 一時的なビューとしてtweetsを作成する。
df.createOrReplaceTempView('tweets')

#ビューに対してSELECTを実行する
query = '''
SELECT created_at, text
FROM tweets WHERE lang = 'ja'
'''
print(spark.sql(query).show())

スポンサーリンク

まとめ

今回は以下の技術を使用して、簡単なデータ基盤を構築してみました。

  • データソース:Twitter Streaming API
  • 分散ストレージ:MongoDB
  • 分散データ処理:Apache Spark
  • データ整形:pandas
  • 対話的コンソール:Jupyter Notebook

各々の記述のインストールや接続は非常に簡単でした。

クラウド技術の発達により、今後このように一つ一つをインストールしてシステムを構築することも少なくなっていきます。

ただ、クラウドによって隠蔽された裏の部分を少しの時間をかけて理解しておくことで抽象化されたクラウドを扱うスキルはあがり、なにか問題が発生した時の対処スキルも上がっていくと思います。

かんたんに環境は構築できるものの、やはり1度やってみることで理解度は格段に変わってくるので是非皆さんも手を動かして構築してみることをお勧めします。

最後までご覧いただきありがとうございました。

以上、やましー@データ活用クラウドエンジニア(@yamashi18041)でした。

タイトルとURLをコピーしました