メインコンテンツまでスキップ

JSON ファイルの読み書き

JSON(JavaScript Object Notation)は、データ交換および保存に広く使用されている半構造化形式です。Databricks は、Apache Spark を使用した JSON の読み書きをサポートしており、単一行モードと複数行モード、自動スキーマ推論、およびレスキューされたデータが含まれます。クラウドストレージから JSON ファイルを Spark DataFrame API または SQL を使用して読み込み、DataFrames を JSON 形式で書き戻すことができます。

前提条件

Databricks は JSON ファイルを使用するのに追加の設定は必要ありません。

オプション

DataFrameReaderDataFrameWriter.option()メソッドと.options()メソッドを使用して、 JSONデータ ソースを構成します。 サポートされているオプションの完全なリストについては、 DataFrameReader JSON オプションDataFrameWriter JSON オプションを参照してください。

使い方

以下の例では、Wanderbricksサンプルデータセットを使用して、Spark DataFrame APIとSQLで単一行モードと複数行モードでのJSONファイルの読み取りと書き込みについて説明します。

JSONファイルの書き込みと読み取り

単一行モード(デフォルト)では、出力の各行に完全なJSONオブジェクトが1つ含まれています。WanderbricksのレビューをJSON形式で書き込み、読み戻します。

Python
# Write wanderbricks reviews to JSON format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("json").save("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

# Read the JSON files into a DataFrame
df = spark.read.format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")
df.printSchema()
display(df)

複数行のJSONファイルを読み込む

複数行モードでは、単一のJSONオブジェクトが複数行にまたがることがあります。レコードが複数行にまたがってフォーマットされているJSONファイルを読み取るには、マルチラインモードを有効にします。

Python
mdf = spark.read.option("multiline", "true").format("json").load("/Volumes/<catalog>/<schema>/<volume>/multi-line.json")
mdf.show(truncate=False)

SQLを使用してjsonファイルをread.json

SQLのテーブル値関数read_filesを使用して、.json ファイルをread.jsonことができます。

SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_json',
format => 'json',
multiLine => true)

USING JSONを使用して json ファイルをread.jsonこともできます。 ただし、DatabricksはUSING JSONではなくread_filesを使用することを推奨しています。なぜなら、 read_filesスキーマと追加のファイル処理オプションを指定できるからです。

SQL
DROP TABLE IF EXISTS reviews_json_table;

CREATE TABLE reviews_json_table
USING JSON
OPTIONS (path "/Volumes/<catalog>/<schema>/<volume>/reviews_json", multiline true);

SELECT * FROM reviews_json_table;

文字エンコーディングの指定

デフォルトでは、入力ファイルの文字セットは自動的に検出されます。 文字セットは、 charset オプションを使用して明示的に指定できます。

Python
spark.read.option("charset", "UTF-16BE").format("json").load("/Volumes/<catalog>/<schema>/<volume>/fileInUTF16.json")

サポートされている文字セットには、 UTF-8UTF-16BEUTF-16LEUTF-16UTF-32BEUTF-32LEUTF-32などがあります。 Oracle Java SEでサポートされている文字セットの完全なリストは、 サポートされているエンコーディングを参照してください。

レスキューされたデータ列を有効にする

レスキューされたデータ列により、ETL中にデータが失われることはありません。レコード内の1つ以上のフィールドに次のいずれかの問題があるため、解析されなかったデータをキャプチャします。

  • 指定されたスキーマに存在しない
  • 指定されたスキーマのデータ型と一致しない
  • 指定されたスキーマのフィールド名と大文字小文字の組み合わせが一致しない

レスキューされたデータ列は、レスキューされた列とレコードのソースファイルパスを含むJSONドキュメントとして返されます。

レスキューされたデータ列を有効にするには、読み取るときに rescuedDataColumn オプションを列名に設定します。

Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("json").load("/Volumes/<catalog>/<schema>/<volume>/reviews_json")

レスキューされたデータ列からソースファイルパスを削除するには、設定します:

Python
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

JSONパーサーは、レコードの解析時にPERMISSIVEDROPMALFORMED、およびFAILFASTの3つのモードに対応します。rescuedDataColumnと併用する場合、以下のルールが適用されます。

  • データ型の不一致によって、DROPMALFORMEDモードでレコードが削除されたり、FAILFASTモードでエラーがスローされたりすることはありません。
  • 破損したレコード(不完全または不正な形式のJSON)のみが削除されるか、エラーがスローされます。
  • badRecordsPathオプションを使用すると、データ型の不一致は不良レコードとは見なされません。不完全で不正な形式のJSONレコードのみがbadRecordsPathに保存されます。

その他のリソース

  • Parquet ファイルの読み取りと書き込み: ワークロードが主に分析的で読み込み負荷が高い場合は、Parquet の列指向レイアウトは、JSON の行ベースのテキスト形式よりも効率的なクエリパフォーマンスを提供します。
  • Avroファイルの読み書き: Apache Kafka のようなイベントストリーミングシステムからJSONを生成または消費している場合、Avroはスキーマ進化のサポートを備えた、よりコンパクトなバイナリエンコーディングを提供します。