メインコンテンツまでスキップ

スキーマ進化によるテーブルスキーマの更新

テーブルはスキーマ進化をサポートしており、データ要件の変化に合わせてテーブル構造の変更ができるようになります。次の変更の種類がサポートされています。

これらの変更は、DDLを使用して明示的に行うことも、DMLを使用して暗黙的に行うこともできます。

重要

スキーマの更新は、すべての並列書き込み操作と競合します。Databricks は、書き込みの競合を回避するためにスキーマの変更を調整することをお勧めします。

テーブルスキーマを更新すると、そのテーブルから読み取るストリームが終了します。処理を続行するには、「構造化ストリーミングの本番運用の考慮事項」に記載されている方法を使用してストリームを再開してください。

手動でのスキーマの変更

ALTER TABLEステートメントを使用して、新しいデータを書き込むことなくテーブルのスキーマを明示的に変更します。

列を追加

ALTER TABLE ... ADD COLUMNSを使用して、既存のテーブルに1つ以上の列を追加し、任意で位置とコメントを指定します:

SQL
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

デフォルトでは、NULL値の許容はtrueです。

例:入れ子フィールドを追加する

ネストされた列の追加は、構造体に対してのみサポートされています。配列とマップはサポートされていません。

ネストされたフィールドに列を追加するには、次のコマンドを使用します:

SQL
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)

たとえば、ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1)を実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2

列コメントと順序を変更する

列のコメントを更新したり、他の列との相対的な順序を変更したりするには、ALTER TABLE ... ALTER COLUMNを使用します。

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)

例: 入れ子になったフィールドを変更

ネストされたフィールドの列を変更するには、次のコマンドを使用します。

SQL
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)

たとえば、ALTER TABLE boxes ALTER COLUMN colB.field2 FIRSTを実行する前のスキーマが次のようになっているとします:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colA
| - colB
| +-field2
| +-field1

列の置換

単一の操作で、列の追加、削除、並べ替え、名前の変更など、テーブルの完全な列リストを再定義するには、ALTER TABLE ... REPLACE COLUMNS を使用します。

SQL
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)

例:ネストされたフィールドを置き換える

例えば、以下のようなDDLを実行する場合:

SQL
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)

前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のとおりです:

- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA

列の名前を変更

列の既存のデータを書き換えずに列の名前を変更するには、テーブルの列マッピングを有効にする必要があります。Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。

列の名前を変更するには:

SQL
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name

例:ネストされたフィールドの名前変更

ネストされたフィールドの名前を変更するには:

SQL
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field

たとえば、次のコマンドを実行するとします:

SQL
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001

以前のスキーマが次の場合:

- root
| - colA
| - colB
| +-field1
| +-field2

その後のスキーマは次のようになります:

- root
| - colA
| - colB
| +-field001
| +-field2

Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。

列のドロップ

データファイルを書き換えることなく、メタデータのみの操作として列をドロップするには、テーブルの列マッピングを有効にする必要があります。 Delta Lake 列マッピングを使用した列の名前変更と削除を参照してください。

注記

メタデータから列を削除しても、ファイル内の列の基礎となるデータは削除されません。削除された列のデータを消去するには:

  • ファイルを書き換えるには、REORG TABLEを使用します。
  • その後、VACUUMを使用して、ドロップされた列データを含むファイルを物理的に削除します。

列を削除するには:

SQL
ALTER TABLE table_name DROP COLUMN col_name

複数の列を削除するには:

SQL
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)

列の型または名前の変更

テーブルを書き換えることで、列のタイプや名前を変更したり、列を削除したりできます。これを行うには、overwriteSchemaオプションを使用します。

次の例は、列の型を変更する方法を示しています:

Python
(spark.read.table(...)
.withColumn("birthDate", col("birthDate").cast("date"))
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)

次の例は、列名の変更を示しています:

Python
(spark.read.table(...)
.withColumnRenamed("dateOfBirth", "birthDate")
.write
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(...)
)

スキーマ進化を有効化

WITH SCHEMA EVOLUTIONを使用するか、mergeSchematrueに設定して、既存のテーブルにINSERTまたはMERGEするデータのスキーマに基づいてスキーマ変更を行います。

次のいずれかの方法を使用して、スキーマ進化を有効にします:

Databricksでは、Spark設定を直接設定するのではなく、WITH SCHEMA EVOLUTION 構文または mergeSchema オプションを使用して、各書き込み操作でスキーマ進化を有効にすることを推奨しています。

書き込み操作でスキーマ進化を有効にするためにオプションまたは構文を使用する場合、これがSpark構成よりも優先されます。

新しい列を追加するには、書き込みのスキーマ進化の有効化

スキーマ進化が有効な場合、ソースクエリに存在するがターゲットテーブルには存在しない列は、書き込みトランザクションの一部として自動的に追加されます。スキーマ進化を有効にするを参照してください。

以下を考慮してください:

  • 大文字と小文字は、新しい列を追加するときに保持されます。
  • 新しい列がテーブルスキーマの最後尾に追加されます。
  • 追加列が構造体に含まれている場合、それらはターゲットテーブルの構造体の末尾に追加されます。

INSERT の SQL を使用したスキーマ進化

スキーマ進化を有効にするには、INSERTステートメントでWITH SCHEMA EVOLUTION句を使用します:

SQL
INSERT WITH SCHEMA EVOLUTION INTO target_table
SELECT * FROM source_table

source_table に対するクエリが、ターゲットテーブルに存在しないカラムを返す場合、それらのカラムは自動的に target_table スキーマに追加されます。既存の行には、新しい列に対してNULLの値が付与されます。

DataFrame API を使用したスキーマ進化を伴うINSERT

次の例は、バッチ書き込み操作でのmergeSchemaオプションの使用方法を示しています。

Python
(spark.read
.table("source_table")
.write
.option("mergeSchema", "true")
.mode("append")
.saveAsTable("target_table")
)

構造化ストリーミングによるスキーマ進化を伴うINSERT

次の例では、Auto LoaderでmergeSchemaオプションを構造化ストリーミングに使用する方法を示します。「Auto Loader とは何ですか?」を参照してください。

Python
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)

マージのための自動スキーマ進化

MERGEの場合、スキーマ進化により、ターゲットテーブルとソーステーブル間のスキーマの不一致を解決できます。以下の2つのケースに対応する:

  1. ソーステーブルに列が存在しますが、ターゲットテーブルには存在せず、挿入または更新アクションの割り当てにおいて名前で指定されています。あるいは、UPDATE SET *またはINSERT *のアクションが存在します。

    その列はターゲットスキーマに追加され、その値はソースの対応する列から入力されます。

    • これは、マージソース内の列名と構造がターゲットの割り当てと完全に一致する場合にのみ適用されます。

    • 新しい列はソーススキーマに存在する必要があります。アクション句で新しい列を割り当てても、その列は定義されません。

    これらの例ではスキーマ進化が可能です:

    SQL
    -- The column newcol is present in the source but not in the target. It will be added to the target.
    UPDATE SET target.newcol = source.newcol

    -- The field newfield doesn't exist in struct column somestruct of the target. It will be added to that struct column.
    UPDATE SET target.somestruct.newfield = source.somestruct.newfield

    -- The column newcol is present in the source but not in the target.
    -- It will be added to the target.
    UPDATE SET target.newcol = source.newcol + 1

    -- Any columns and nested fields in the source that don't exist in target will be added to the target.
    UPDATE SET *
    INSERT *

    newcolsource スキーマに存在しない場合、これらの例ではスキーマ進化はトリガーされません。

    SQL
    UPDATE SET target.newcol = source.someothercol
    UPDATE SET target.newcol = source.x + source.y
    UPDATE SET target.newcol = source.output.newcol
  2. ターゲットテーブルには列が存在しますが、ソーステーブルには存在しません。

    ターゲットスキーマは変更されません。次の列:

    • UPDATE SET *の場合、変更されないままになります。

    • INSERT *向けにNULLに設定されています。

    • アクション句で割り当てられている場合、引き続き明示的に変更される可能性があります。

    例えば:

    SQL
    UPDATE SET *  -- The target columns that are not in the source are left unchanged.
    INSERT * -- The target columns that are not in the source are set to NULL.
    UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated.
    UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column.

自動スキーマ進化を手動で有効にする必要があります。 スキーマ進化の有効化を参照してください。

注記

Databricks Runtime 11.3 LTS以下では、マージによるスキーマ進化にはINSERT *またはUPDATE SET *アクションのみを使用できます。

Databricks Runtime 12.2 LTS以降では、ソーステーブルに存在する列および構造体フィールドは、挿入または更新アクションで名前によって指定できます。

Databricks Runtime 13.3 LTS以降では、map<int, struct<a: int, b: int>>のようなマップ内にネストされた構造体でのスキーマ進化を使用できます。

SQL、Python、および Scala を使用したスキーマ進化を伴う MERGE

Databricks Runtime 15.4 LTS 以降では、SQL またはテーブル APIs を使用して、マージ ステートメントでスキーマ進化を指定できます。

SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE

スキーマ進化を伴うMERGEの操作例

ここでは、スキーマ進化を伴う場合と伴わない場合のMERGE操作の影響の例をいくつか示します。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: key, value

ソース列: key, value, new_value

SQL
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

テーブルのスキーマは変更されません。列keyvalueのみが更新/挿入されます。

テーブルスキーマが(key, value, new_value)に変更されます。一致する既存のレコードは、ソース内のvaluenew_valueで更新されます。新しい行がスキーマ(key, value, new_value)とともに挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

SQL
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

UPDATE ターゲット列old_valueがソースにないため、INSERTアクションはエラーをスローします。

テーブルスキーマが(key, old_value, new_value)に変更されます。一致する既存のレコードは、ソース内のnew_valueで更新され、old_valueは変更されません。old_valueに指定されたkeynew_value、およびNULLを使用して新しいレコードが挿入されます。

ターゲット列: key, old_value

ソース列: key, new_value

SQL
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
THEN UPDATE SET new_value = s.new_value

UPDATEnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブルスキーマが (key, old_value, new_value) に変更されます。一致する既存のレコードは、ソース内の new_value で更新されます。old_value は変更されません。一致しないレコードでは new_valueNULL が入力されます。注(1)を参照してください。

ターゲット列: key, old_value

ソース列: key, new_value

SQL
MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
THEN INSERT (key, new_value) VALUES (s.key, s.new_value)

INSERTnew_valueがターゲットテーブルに存在しないため、エラーがスローされます。

テーブルスキーマが (key, old_value, new_value) に変更されます。新しいレコードは、old_value に指定された keynew_value、および NULL を使用して挿入されます。既存のレコードでは、new_valueNULL が入力されます。old_value は変更されません。注(1)を参照してください。

(1) この動作はDatabricks Runtime 12.2 LTS以降で利用できます。Databricks Runtime 11.3 LTS以前では、この条件でエラーが発生します。

マージでの列の除外

Databricks Runtime 12.2 LTS 以降では、マージ条件で EXCEPT 句を使用することで、列を明示的に除外できます。EXCEPT キーワードの動作は、スキーマ進化が有効かどうかによって異なります。

スキーマ進化が無効になっている場合、EXCEPT キーワードはターゲットテーブルの列リストに適用され、UPDATE または INSERT アクションから列を除外できます。除外された列は null に設定されます。

スキーマ進化が有効な場合、EXCEPTキーワードはソーステーブルの列のリストに適用され、スキーマ進化から列を除外できます。ターゲットテーブルに存在しないソースの新しい列は、EXCEPT句にリストされている場合、ターゲットスキーマに追加されません。ターゲットに既に存在する除外された列はnullに設定されます。

EXCLUDE と併用した例 MERGE

次の例は、構文を示しています。

クエリー(SQL の場合)

スキーマ進化なしの動作(既定)

スキーマの進化に伴う動作

ターゲット列: id, title, last_updated

ソース列: id, title, review, last_updated

SQL
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。新しい行は、idtitle の値を使って挿入されます。除外されたフィールド last_updatednull に設定されます。フィールド review はターゲットにないため無視されます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。スキーマは、フィールド review を追加するように進化します。新しい行は、null に設定される last_updated を除き、すべてのソースフィールドを使用して挿入されます。

ターゲット列: id, title, last_updated

ソース列: id, title, review, internal_count

SQL
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated, internal_count)

INSERTinternal_countがターゲットテーブルに存在しないため、エラーがスローされます。

一致した行は、last_updated フィールドが現在の日付に設定されて更新されます。review フィールドはターゲットテーブルに追加されますが、internal_count フィールドは無視されます。新たに挿入された行では、last_updatednull に設定されます。

Spark 設定によるスキーマ進化の有効化 (レガシ)

現在のSparkSessionにおけるすべての書き込み操作に対してスキーマ進化を有効にするには、Spark構成 spark.databricks.delta.schema.autoMerge.enabledtrue に設定できます。

Python
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", True)
注記

Databricksでは、このアプローチを本番運用には推奨しません。セッション全体の構成を設定すると、複数の操作で意図しないスキーマ変更が発生する可能性があり、どの操作でスキーマが進化するかを判断することが難しくなります。

代わりに、各書き込み操作に対してスキーマ進化を有効にします:

書き込み操作でスキーマ進化を有効にするためにオプションまたは構文を使用する場合、これがSpark構成よりも優先されます。

テーブルスキーマを置き換える

デフォルトでは、テーブル内のデータを上書きしてもスキーマは上書きされません。replaceWhereを使用せずにmode("overwrite")を使用してテーブルを上書きする場合、書き込まれるデータのスキーマを上書きしたい場合があります。

テーブルのスキーマとパーティションを置き換えるには、overwriteSchemaオプションをtrueに設定します:

Python
df.write.option("overwriteSchema", "true")
注記

動的パーティションの上書きを使用する場合、overwriteSchematrueとして指定することはできません。「partitionOverwriteModeを使用した動的パーティションの上書き(レガシー)」を参照してください。