メインコンテンツまでスキップ

チュートリアル: テーブル間のトランザクションを調整する

備考

プレビュー

Unity Catalog で管理される Delta テーブルに書き込むトランザクションは、パブリック プレビュー段階にあります。

Unity Catalog で管理される Iceberg テーブルに書き込むトランザクションは、プライベート プレビュー段階です。このプレビューに参加するには、マネージド Iceberg テーブル プレビュー登録フォームを送信してください。

この「チュートリアル」では、「Databricks」上の複数のステートメントおよびテーブル間で更新を調整するために、両方のトランザクションモードを使用します。非インタラクティブ (BEGIN ATOMIC) は自動的に「コミット」され、インタラクティブ (BEGIN TRANSACTION) は明示的に制御できます。この「チュートリアル」では、ストアドプロシージャおよび「SQL」スクリプトでトランザクションを使用する方法も示します。

要件

  • 環境 : Databricks ワークスペースへのアクセス。

  • コンピュート : サポートされるコンピュートのタイプはトランザクション モードによって異なります。

  • 権限 : Unity Catalogスキーマ内のCREATE TABLE

サンプルテーブルを設定する

複数ステートメント、複数テーブルのトランザクションで書き込まれるすべてのテーブルは次の条件を満たす必要があります。

SQL エディターまたはノートブックで 2 つのサンプル テーブルを作成します。

SQL
-- Account data
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);

-- Transaction records
CREATE TABLE IF NOT EXISTS sample_transactions (
id INT,
account_id INT,
transaction_type STRING,
amount DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES (
'delta.feature.catalogManaged' = 'supported'
);
注記

既存のテーブルでトランザクションを有効にするには、実行します:

SQL
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

両方のテーブルにサンプルデータを挿入します。

SQL
INSERT INTO sample_accounts VALUES
(1, 'Alice', 1000.00),
(2, 'Bob', 500.00);

INSERT INTO sample_transactions VALUES
(1, 1, 'deposit', 100.00);

セットアップを確認します。

SQL
SELECT * FROM sample_accounts;
SELECT * FROM sample_transactions;

出力:

sample_accounts:
id account_name balance
1 Alice 1000.00
2 Bob 500.00

sample_transactions:
id account_id transaction_type amount
1 1 deposit 100.00

非対話型トランザクション

非対話型トランザクションではBEGIN ATOMIC ... END;構文が使用されます。すべてのステートメントは単一のアトミック単位として実行されます。すべてのステートメントが成功すると、Databricks は自動的にコミットします。いずれかのステートメントが失敗した場合、Databricks はすべての変更を自動的にロールバックします。詳細な構文と使用パターンについては、 「非対話型トランザクション」を参照してください。

成功した取引を実行する

両方のテーブルをアトミックに更新します。

SQL
BEGIN ATOMIC
-- Update Alice's account balance
UPDATE sample_accounts
SET balance = balance + 100.00
WHERE id = 1;

-- Record the deposit transaction
INSERT INTO sample_transactions
VALUES (2, 1, 'deposit', 100.00);
END;

Aliceの残高が現在1100.00であることを確認してください:

SQL
SELECT * FROM sample_accounts WHERE id = 1;

現在、2つのトランザクションレコードが存在することを確認してください:

SQL
SELECT * FROM sample_transactions;

残高の更新と取引記録の両方が一緒に作成されました。どちらかのステートメントが失敗した場合、どちらの変更もコミットされず、Databricks は副作用なしでトランザクションを終了していました。

SIGNALを使用して条件に基づいてトランザクションを失敗させる

SIGNALBEGIN ATOMIC ... END;ユーザー定義の条件が満たされない場合、 ブロック内で 「 」 を使用してトランザクションを失敗させることができます。この例では、負の残高を持つアカウントを挿入し、残高チェックが失敗した場合にSIGNALを使用してトランザクションを失敗させます。

SQL
BEGIN ATOMIC
INSERT INTO sample_accounts VALUES (3, 'Charlie', -50.00);

IF (SELECT balance FROM sample_accounts WHERE id = 3) < 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Account balance cannot be negative';
END IF;
END;

SIGNALはエラーを発生させ、これによりトランザクション全体が自動的にロールバックされます。挿入がロールバックされたため、これは0行を返します。

SQL
SELECT * FROM sample_accounts WHERE id = 3;

失敗時の自動ロールバックを参照

最初のステートメントが有効で、2番目のステートメントが存在しないテーブルを参照するトランザクションを実行します:

SQL
BEGIN ATOMIC
-- Valid
INSERT INTO sample_accounts VALUES (4, 'David', 300.00);
-- Invalid
INSERT INTO non_existent_table VALUES (1, 2, 3);
END;

トランザクションはエラーで失敗します。トランザクション全体がロールバックされたため、これは0行を返します。

SQL
SELECT * FROM sample_accounts WHERE id = 4;

最初のINSERTステートメントは有効でしたが、2 番目のステートメントが失敗したためロールバックされました。これは、トランザクションの「すべてか無か」の保証を示しています。

インタラクティブな取引

対話型トランザクションを使用すると、コミットまたはロールバックするタイミングを明示的に制御できます。BEGIN TRANSACTIONを使用して開始し、コミットして変更を保存するか、 ROLLBACK を使用して変更を破棄します。

変更をコミットする

取引を開始します:

SQL
BEGIN TRANSACTION;

変更を加える(まだコミットされていません):

SQL
INSERT INTO sample_accounts VALUES (5, 'Eve', 850.00);
UPDATE sample_accounts SET balance = balance + 50.00 WHERE id = 2;

変更を永続的にすることを約束します:

SQL
COMMIT;

Eve のアカウントが表示されるようになりました:

SQL
SELECT * FROM sample_accounts WHERE id = 5;

Bobの残高が現在550.00であることを確認してください:

SQL
SELECT * FROM sample_accounts WHERE id = 2;

変更をロールバックする

新しい取引を開始します:

SQL
BEGIN TRANSACTION;

変更を加える:

SQL
INSERT INTO sample_accounts VALUES (6, 'Frank', 600.00);

変更がセッションに表示されることを確認します(コミットされるまで、その行は他のセッションには表示されません):

SQL
SELECT * FROM sample_accounts WHERE id = 6;

変更を破棄するにはロールバックします。

SQL
ROLLBACK;

挿入がロールバックされたため、これは0行を返します。

SQL
SELECT * FROM sample_accounts WHERE id = 6;

ストアドプロシージャとSQLスクリプトでの使用

トランザクションとストアド プロシージャを組み合わせて、再利用可能なトランザクション ロジックを作成できます。このパターンは、頻繁に実行する複雑な操作に役立ちます。

  1. カタログコミットを有効にしてテーブルを作成する

    SQL
    CREATE SCHEMA IF NOT EXISTS main.retail;

    CREATE TABLE IF NOT EXISTS main.retail.orders (
    order_id STRING,
    customer_id STRING,
    amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

    CREATE TABLE IF NOT EXISTS main.retail.orders_staging (
    order_id STRING,
    customer_id STRING,
    amount DECIMAL(18,2),
    batch_id STRING
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');

    CREATE TABLE IF NOT EXISTS main.retail.total_sales (
    customer_id STRING,
    total_amount DECIMAL(18,2)
    ) TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
  2. ストアドプロシージャを定義する

    SQL
    CREATE OR REPLACE PROCEDURE main.retail.apply_order(
    IN p_order_id STRING,
    IN p_customer_id STRING,
    IN p_order_amount DECIMAL(18,2)
    )
    LANGUAGE SQL
    SQL SECURITY INVOKER
    MODIFIES SQL DATA
    AS
    BEGIN
    -- Insert the order
    INSERT INTO main.retail.orders (order_id, customer_id, amount)
    VALUES (p_order_id, p_customer_id, p_order_amount);

    -- Update total sales per customer
    MERGE INTO main.retail.total_sales AS t
    USING (
    SELECT
    p_customer_id AS customer_id,
    p_order_amount AS order_amount
    ) s
    ON t.customer_id = s.customer_id
    WHEN MATCHED THEN
    UPDATE SET t.total_amount = t.total_amount + s.order_amount
    WHEN NOT MATCHED THEN
    INSERT (customer_id, total_amount)
    VALUES (s.customer_id, s.order_amount);
    END;

  3. 取引を定義する

    SQL
    BEGIN ATOMIC
    -- Staging batch id for this transaction
    DECLARE new_order_id STRING DEFAULT uuid();
    DECLARE v_batch_id STRING DEFAULT uuid();

    -- 1) Stage incoming customer and order rows
    INSERT INTO main.retail.orders_staging (order_id, customer_id, amount, batch_id)
    VALUES (new_order_id, 'CUST_123', 249.99, v_batch_id);

    -- 2) Drive final writes from staging to production via stored procedure
    FOR o AS
    SELECT
    order_id,
    customer_id,
    amount
    FROM main.retail.orders_staging
    WHERE batch_id = v_batch_id
    DO
    CALL main.retail.apply_order(
    o.order_id,
    o.customer_id,
    o.amount
    );
    END FOR;

    -- 3) Clean up processed staging rows
    DELETE FROM main.retail.orders_staging
    WHERE batch_id = v_batch_id;
    END; -- 4) Commit the transaction

トランザクションの一部が失敗した場合、Databricks はすべての変更を自動的にロールバックします。

掃除

サンプル テーブルを削除します。

SQL
DROP TABLE IF EXISTS sample_accounts;
DROP TABLE IF EXISTS sample_transactions;

DROP TABLE IF EXISTS main.retail.orders;
DROP TABLE IF EXISTS main.retail.orders_staging;
DROP TABLE IF EXISTS main.retail.total_sales;

その他のリソース