この記事は、最も有名なフレームワークライブラリであるPysparkについての全体的かつ単独の記事です。
ビッグデータとデータ分析のために、Apache Sparkはユーザーから選ばれています。
これは、これから説明するいくつかのクールな機能によるものです。
しかし、その前に、PySparkの異なる部分を理解することから始めましょう、ビッグデータ、そしてApache Sparkから始めます。
こちらもお読みください。
PyGameチュートリアル。
この記事もチェック:Pythonのデータ分析ライブラリのおススメを4つ紹介する
ビッグデータとは何か?
このネット時代、あらゆる情報がソーシャルメディアやニュースなどの大手テレメディアで手に入る。
ほんの数年前までは、このようなことは不可能でした。
すべての情報は、ファイルや物理的な紙の記録に保存されていました。
しかし、今日では、これは非常に簡単なことです。
そのために、私たちは知らず知らずのうちに、リアルタイムで多くの処理と分析を必要とする多くのデータを生成しています。
このように、技術の巣から、新しい分野であるビッグデータが出てくる。
ビッグデータとは、その名の通り「大量に存在するデータ」のことです。
しかし、なぜビッグデータが必要なのでしょうか。
それは、利用可能なリソースを高度に研究することで、将来の予測が可能になるからです。
多くの大企業やハイテク企業は、ビッグデータエンジニアに投資しています。
彼らは、重要かつ膨大な情報を日々管理し、維持することができる。
しかし、そのためには、いくつかのツールやテクニックが必要であることは周知の通りだ。
Apache Sparkとは?
Apache Sparkは、Data Bricks社が提供するWebフレームワークです。
データ管理を唯一の目的として開発されています。
ビッグデータに最も関連性の高いエンジンの1つです。
その特徴は以下の通りです。
- 1.フリーでオープンソースであること。300社以上の開発者が開発に参加しており、無料で利用できる。
- 2.高速性と安定性 Hadoopのような他のエンジンに比べ、100倍以上の速度があります。
- 3.シンプルかつパワフル:様々なデータ解析技術を簡単に利用・実装できる。
-
- クロスプラットフォーム。すべてのOSに対応している。
-
- 複数言語サポート Java、Python、R、Scala、SQLを独自に使用することが可能です。
- 複雑なデータ操作のハンドリング。最も複雑なデータの分析が可能です。
- 巨大なコミュニティーのサポート。世界的な開発者の支持を得ている。
また、4つの主要なライブラリセットとその他のサードパーティライブラリを備えています。
-
- SQLとDataFrames。
-
- Spark Streaming
- MLib(機械学習)
- GraphX
主なサードパーティライブラリには、以下のものが追加でサポートされています:C#/.NET、Groovy、Kotlin、Julia、Clojure。
クラウドのサポートは、IBM、Amazon AWS、その他です。
詳しくは、こちらのドキュメントをご覧ください。
Pysparkとは?
PysparkはApache SparkのPython向け拡張として有名です。
オープンソースのライブラリで、主に次のようなことに重点を置いています。
- 機械学習
- 独自のデータ解析
- データサイエンス
- Pythonとサポートライブラリによるビッグデータ。
広範な貢献と開発者のサポートにより、例外的なモデル構築のための最も安定した適切なパッケージとなりました。
このフレームワークの下で開発されたモデルは、より正確で高速な結果をもたらす傾向があります。
これは、常にAPIのサポートを受けているため、より便利です。
このフレームワークについてもっと知りたい方は、このリンクをご覧ください。
システムに追加する前に、依存関係を考慮する必要があります。
システム要件
- Pythonのバージョン。Python 3.6以上
-
- オペレーティングシステム Windows 7 以上
- その他ライブラリ対応。Numpy、Pandasなど。
- 安定したインターネット接続環境は必須です。
WindowsでPySparkを設定するには?
以下の手順に従って、システムにPysparkをインストールします。
-
- https://pypi.org/project/pyspark/ にアクセスすると、このプロジェクトのドキュメントが見つかります。
-
- pipコマンドでインストールするリンクもあります。
-
- コマンドプロンプトに移動して、次のコマンドを入力します。
pip install pyspark |
import pyspark # importing the module
from pyspark.sql import SparkSession # importing the SparkSession module
session = SparkSession.builder.appName( 'First App' ).getOrCreate() # creating a session
session # calling the session variable
|
注意:デフォルトの環境はAnacondaなので、pipによるライブラリもすべてAnacondaにのみ追加されます。
最も重要なことは、システムに正常に追加されたことを確認するために、任意のPython IDEを開き、それをインポートしてみることです。
もしエラーが出なければ、次のステップに進むことができます。
data = session.read.csv( 'Datasets/titanic.csv' ) # reading the dataset through the given path
data # calling the variable for column created
|
PySparkを使い始める
Pyspark を使って作業し、遊ぶためには、サンプルデータセットが必要です。
これはクイックスタートガイドであり、基本的なことを説明します。
環境 Anaconda
IDEです。
使用したデータセット:titanic.csv
Pysparkで最初に作るべき最も重要なものはSessionです。
セッションは、私たちのsparkアプリケーションが存在する参照フレームです。
作成したセッションは、開始から最終的なチェックポイントまでの進捗をカプセル化します。
このセッションはsparkのSQLモジュールで作成します。
この記事もチェック:Pythonでデータセットから別のデータセットへピボットテーブルを作成する
1. セッションの作成
PySparkを使う上で最も基本的な部分である、セッションの作成から始めましょう。
以下のコードを使用して、最初のセッションをセットアップすることができます。
コード
data.show() |
data = session.read.option.( 'header' , 'true' ).csv( 'Datasets/titanic.csv' )
data |
2. Sparkでデータセットを読み込む
データセットというと、膨大な量のデータ、レコードを行・列形式で集めたものです。
その数は数千からそれ以上になることもあります。
具体的には、データの前処理と呼ばれるプロセスの重要な側面です。
何らかの出力や結果を予測するためには、そのデータを前処理し、余分な演算や感傷的な分析を行う必要があります。
そのためにPythonは特定のモジュールやライブラリを提供しています。
一般的には、Pandasライブラリを使用します。
しかし、PySparkにはサブモジュールがあり、他のライブラリをインポートする手間を省くことができます。
そのため、read()メソッドを持っています。
このメソッドには、ファイルの拡張子や形式に応じて多くのサブメソッドが用意されています。
以下はその例です。
- csv
- フォーマット
- Jdbc
- ロード
- オプション
- オプション
- オーク
- パラケート
- スキーマ
- テーブル
- テキスト
今回は拡張子が.csvのタイタニックデータセットを使用するので、最初の方法を使用します。
コード
data.printSchema() |
data = session.readoption( 'header' , 'true' ).csv( 'Datasets/titanic.csv' , inferSchema = True )
data.printSchema() |
作成するセッションが、このコードの主なアプローチです。
そして、readメソッドとextensionメソッドがドット演算子で入ってきます。
データセットのすべての列と行をチェックするには、show() メソッドを使用します。
これは、データセットが大きすぎる場合に最初の 20 行を取得するものです。
dataframe.select(column_name) # selecting one column
dataframe.select(column_1, column_2, .., column_N) # selecting many columns
|
# adding columns in dataframe data = data.withColumn( 'Age_after_3_y' , data[ 'Age' ] + 3 )
|
さて、このデータセットの表示方法は適切でないことがわかるでしょう。
列の名前がきちんと表示されないのです。
そこで、いくつか変更を加える必要があります。
read.option() メソッドを使うと、作業が楽になります。
これはヘッダー付きのデータセットを取得するのに役立ちます。
コード
# dropping the columns dataframe = dataframe.drop( 'column_name in strings' )
dataframe.show() |
結果は以下の通りです。
# dropping the columns data = data.drop( 'Age_after_3_y' )
data.show() |
データセットを確認すると、このように表示されます。
# renaming the columns data = data.withColumnRenamed( 'Fare' , 'Price' )
data.show() |
さて、データセットの準備ができたので、そのカラムについての情報を取得してみましょう。
pandasでは、単にinfo()メソッドを使うだけです。
Pysparkでは、printSchema()が各カラムに関する情報を読み込んで値を与えるメソッドです。
コード
data = data.na.drop(how = 'any' , thresh = 2 )
data.show() |
data = data. filter (data[ 'Survived' ] = = 1 )
data.show() |
デフォルトでは、Pysparkはすべてのデータセットを文字列の形で読み込みます。
そのため、inferSchemaというパラメータを1つ有効にする必要があります。
これは、すべての主要なカラムをそれぞれのデータ型とともに読み込み、それに応じて情報を返します。
この記事もチェック:Linux Ubuntuで複数のファイルの名前を一括変更する
3. データプリプロセッシング
Pysparkのデータ前処理は非常にシンプルです。
ライブラリはそのような作業を行うための特定の関数を提供しています。
ここでは、以下の作業を行う。
- 特定の1列を選択する
- 複数の列を選択する
- 列を追加する
- 列の削除
- カラムの名前変更
- null 値の削除
1 つまたは複数のカラムを選択するには、select() 関数が完全に動作します。
構文は次のとおりです。
結果は以下の通りです。
データフレームに列を追加するには、withColumn()関数が有効です。
これは2つのパラメータを取ります。
- 追加する新しい列の名前
- 使用する既存のカラム名 (新しいカラムが既存のカラムと関係ない場合は必要ありません)
コード
データフレームに3年後の年齢という列を追加します。
取り出されたAge列は既存のものです。
これを通して、3年後の乗客の年齢を計算します。
出力してください。
見ての通り、最後に新しいカラムが追加されています。
4. 列の削除とデータセットからの NULL 値の削除
Pysparkでの削除は、私たちが実行できる最もシンプルなタスクです。
なぜなら、それだけ柔軟性と適応性を与えてくれるからです。
drop()関数は、カラムを削除するタスクを実行します。
このための構文は単純です。
以前データセットに追加したAge_after_3_gucci_yカラムを削除します。
コード
カラムの名前を変更するのはもう少し簡単な作業です。
withColunRenamed()関数がその手助けをしてくれます。
この関数は2つのパラメータを受け取ります。
- カラムの新しい名前。
- 既存のカラムの名前
例として、データセットの Fare カラムの名前を Price に変更することにします。
コード
null 値を削除するには、drop() 属性を持つ na 関数を使用します。
一般的に、これはNULL値を持つすべての行を削除します。
しかし、この関数には2つのパラメータがあり、それを使って新しい方法でドロップすることができます。
- how: 割り当てられた値に従って値をドロップするようにします。値にはanyとallがある。how = “any “の場合、NULL値を持つセルだけが削除されますが、how = “all “の場合、行全体が削除されます。
- しきい値:整数値で値をとる。例えば、thresh = 2と設定すると、2つのNULL値のみが削除される。
- サブセット: これは、ヌル値を削除する必要のある特定のカラムを選択するのに役立つ。
コード
このように、我々は2つのヌル値を削除しました。
5. Pysparkによるフィルタリング操作
何かをフィルタリングするということは、データフレームから必要なものを取り出すということです。
例えば、タイタニックのデータセットから生存者が必要な場合。
この場合、フィルタリングが最適な選択肢となる。
コード
結果は、以下の通りになります。
つまり、事件を生き延びた人たちのデータが目の前にあるわけです。
まとめ
このような形で、Pysparkの話題は終了です。
システムへsparkを追加する、データの読み込み、前処理、フィルタリングのテクニックといったトピックを取り上げました。
以上、Pyspark全般に関する基本的な紹介をしました。