Pular para o conteúdo principal

APIs AUTO CDC : Simplifique a captura de dados de alterações (CDC) com pipeline

Lakeflow Spark Declarative Pipelines (SDP) simplifica a captura de dados de alterações (CDC) com as APIs AUTO CDC e AUTO CDC FROM SNAPSHOT. Essas APIs automatizam a complexidade do cálculo de dimensões que mudam lentamente (SCD) Tipo 1 e Tipo 2 a partir de um feed de CDC ou snapshots de banco de dados. A API AUTO CDC também oferece suporte ao acompanhamento bitemporal, que registra as alterações em duas dimensões de tempo (Beta). Para saber mais sobre SCD Tipo 1 e Tipo 2, consulte Captura de dados de alterações (CDC) e Snapshots. Para saber mais sobre o acompanhamento bitemporal, consulte Como o CDC AUTOMÁTICO Bitemporal funciona.

nota

As APIs AUTO CDC substituem as APIs APPLY CHANGES e têm a mesma sintaxe. As APIs APPLY CHANGES ainda estão disponíveis, mas a Databricks recomenda o uso das APIs AUTO CDC em seu lugar.

A API que você utiliza depende da origem dos seus dados de alteração:

  • AUTO CDC : Use isso quando o banco de dados de origem tiver um feed CDC ativado. AUTO CDC processa alterações de um feed de dados de alteração (CDF). É compatível com as interfaces SQL e Python do pipeline.
  • AUTO CDC FROM SNAPSHOT : Use esta opção quando CDC não estiver habilitado no banco de dados de origem e apenas os Snapshots estiverem disponíveis. Esta API compara o Snapshot para determinar as alterações e, em seguida, as processa. Só é suportado na interface Python.

Ambas as APIs suportam a atualização de tabelas usando SCD Tipo 1 e Tipo 2:

  • Utilize o SCD Tipo 1 para atualizar registros diretamente. não é mantido para fins de atualização de registros.
  • Use o SCD tipo 2 para manter um histórico de registros, em todas as atualizações ou em atualizações de um conjunto específico de colunas.

Somente para AUTO CDC, você também pode usar o armazenamento bitemporal, que estende o histórico do SCD Tipo 2 para rastrear alterações em duas dimensões de tempo: tempo de negócios e tempo de sistema. Bitemporal está em Beta. Consulte Como o CDC AUTOMÁTICO Bitemporal funciona.

As APIs AUTO CDC não são suportadas pelo pipeline declarativo Apache Spark .

Para sintaxe e outras referências, consulte AUTO CDC INTO (pipeline), create_auto_cdc_flow e create_auto_cdc_from_snapshot_flow.

nota

Esta página descreve como atualizar tabelas em seus pipelines com base nas alterações nos dados de origem. Para saber como registrar e consultar informações de alteração em nível de linha para tabelas Delta, consulte Usar o feed de dados de alteração no Databricks.

Requisitos

Para usar as APIs CDC , seu pipeline deve ser configurado para usar o SDPserverless ou as edições SDP Pro ou Advanced .

Como funciona o AUTO CDC

Para realizar o processamento CDC com AUTO CDC, crie uma tabela de transmissão e use a instrução AUTO CDC ... INTO em SQL ou a função create_auto_cdc_flow() em Python para especificar a origem, a chave e a sequência para o feed de alterações. Para obter uma explicação de como funcionam o sequenciamento e a lógica SCD , consulte captura de dados de alterações (CDC) e Snapshot. Veja os exemplos do AUTO CDC.

Para hidratação inicial a partir de uma fonte com alimentação de mudança, use AUTO CDC com um fluxo once e continue processando a alimentação de mudança. Consulte Replicar uma tabela RDBMS externa usando o AUTO CDC.

Para detalhes de sintaxe, consulte AUTO CDC INTO (pipeline) ou create_auto_cdc_flow.

Como funciona o AUTO CDC do Snapshot

AUTO CDC FROM SNAPSHOT Determina as alterações nos dados de origem comparando os Snapshots em ordem. Só é suportado na interface de pipeline do Python. Você pode ler o Snapshot diretamente de uma tabela Delta , arquivos de armazenamento cloud ou JDBC .

Para realizar o processamento CDC com AUTO CDC FROM SNAPSHOT, crie uma tabela de transmissão e use a função create_auto_cdc_from_snapshot_flow() para especificar o Snapshot, a chave e outros argumentos. Para obter detalhes sobre os dois padrões de ingestão e quando usar cada um, consulte Padrões de processamentoSnapshot. Veja os exemplos de Auto CDC FROM Snapshot.

Para detalhes de sintaxe, consulte create_auto_cdc_from_snapshot_flow.

Como funciona o CDC AUTOMÁTICO Bitemporal

info

Beta

O CDC AUTOMÁTICO Bitemporal está em Beta.

SCD Tipo 1 e Tipo 2 são unitemporais: eles rastreiam alterações em uma única dimensão de tempo. Bitemporal estende o histórico do SCD Tipo 2 para rastrear alterações em duas dimensões de tempo e distinguir entre duas perspectivas:

  • Tempo de negócios : quando o evento realmente aconteceu.
  • Tempo do sistema : quando o sistema registrou ou ingeriu o evento.

Assim como o SCD Tipo 2, o bitemporal preserva um histórico completo de registros. Ele adiciona uma segunda linha do tempo para que você possa reconstruir o que os dados mostraram e o que o sistema acreditava em qualquer ponto do passado.

Por exemplo, um fundo de cobertura ingere dados de ações de um sistema de origem. O preço das ações da Acme Corp muda em 1º de janeiro, mas o fundo não ingere essa atualização até 5 de janeiro. O CDC AUTOMÁTICO bitemporal permite que o fundo responda a duas perguntas distintas: qual era o preço real das ações da Acme Corp em 1º de janeiro (tempo de negócios) e qual o preço que o sistema acreditava quando o fundo tomou decisões de negociação em 3 de janeiro (tempo do sistema). A capacidade de distinguir entre essas linhas do tempo é útil para auditoria, relatórios regulatórios e tomada de decisões financeiras.

Para habilitar o processamento bitemporal, defina STORED AS BITEMPORAL (SQL) ou stored_as_scd_type="bitemporal" (Python), use SEQUENCE BY para a coluna de tempo de negócios e use SYSTEM SEQUENCE BY para a coluna de tempo do sistema. A tabela de destino adiciona __SYSTEM_START_AT e __SYSTEM_END_AT colunas junto com as colunas SCD Tipo 2 __START_AT e __END_AT. Para obter detalhes de sintaxe, consulte AUTO CDC INTO (pipelines) ou create_auto_cdc_flow.

Para um passo a passo de como inserções, atualizações, atualizações fora de ordem e exclusões afetam uma tabela bitemporal, consulte os exemplos de CDC automático bitemporal.

Use várias colunas para sequenciamento

Para sequenciar por várias colunas (por exemplo, um carimbo de data/hora e um ID para desempatar), use STRUCT para combiná-las. A API ordena os resultados pelo primeiro campo e, em caso de empate, considera o segundo campo, e assim por diante.

SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)

Exemplos de AUTO CDC

Os exemplos a seguir demonstram o processamento de SCD Tipo 1 e Tipo 2 usando uma fonte de dados de alteração. Os dados de exemplo criam novos registros de usuário, excluem registros de usuário e atualizam registros de usuário. No exemplo SCD Tipo 1, as últimas operações UPDATE chegam atrasadas e são descartadas da tabela de destino, demonstrando o tratamento de eventos fora de ordem.

A seguir, estão os registros de entrada usados nestes exemplos. Esses dados são criados executando a consulta na seção Criar dados de exemplo .

userId

name

city

operation

sequenceNum

124

Raul

Oaxaca

INSERT

1

123

Isabel

Monterrey

INSERT

1

125

Mercedes

Tijuana

INSERT

2

126

Lily

Cancun

INSERT

2

123

null

null

DELETE

6

125

Mercedes

Guadalajara

UPDATE

6

125

Mercedes

Mexicali

UPDATE

5

123

Isabel

Chihuahua

UPDATE

5

Se você remover o comentário da última linha na consulta de geração de dados de exemplo, será inserido o seguinte registro que especifica o truncamento da tabela (limpeza da tabela) em sequenceNum=3:

userId

name

city

operation

sequenceNum

null

null

null

TRUNCATE

3

nota

Todos os exemplos a seguir incluem opções para especificar as operações DELETE e TRUNCATE , mas cada uma é opcional.

Criar dados de exemplo

Execute as seguintes instruções para criar um dataset de exemplo. Este código não se destina a ser executado como parte de uma definição pipeline . execute-o a partir da pasta de exploração do seu pipeline, em vez da pasta de transformações.

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);

Atualizações do Processo SCD Tipo 1

O SCD Tipo 1 mantém apenas a versão mais recente de cada registro. O exemplo a seguir lê o feed de dados de alteração criado acima e aplica as alterações a um destino de tabela de transmissão. O que são pipelines? para executar este código.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)

Após executar o exemplo do SCD tipo 1, a tabela de destino contém os seguintes registros:

userId

name

city

124

Raul

Oaxaca

125

Mercedes

Guadalajara

126

Lily

Cancun

O usuário 123 (Isabel) foi excluído e não aparece mais. O usuário 125 (Mercedes) mostra apenas a cidade mais recente (Guadalajara) porque o SCD Tipo 1 sobrescreve os valores anteriores. O UPDATE anterior em sequenceNum=5 foi removido porque uma atualização posterior em sequenceNum=6 chegou.

Após executar o exemplo com o registro TRUNCATE descomentado, a tabela é limpa em sequenceNum=3. Isso significa que os registros 124 e 126 não estão na tabela, e a tabela de destino final contém apenas o seguinte registro:

userId

name

city

125

Mercedes

Guadalajara

Atualizações do Processo SCD Tipo 2

SCD Tipo 2 preserva um histórico completo de alterações, criando novas linhas para cada versão de um registro, com colunas __START_AT e __END_AT indicando quando cada versão estava ativa.

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)

Após executar o exemplo do SCD tipo 2, a tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Monterrey

1

5

123

Isabel

Chihuahua

5

6

124

Raul

Oaxaca

1

null

125

Mercedes

Tijuana

2

5

125

Mercedes

Mexicali

5

6

125

Mercedes

Guadalajara

6

null

126

Lily

Cancun

2

null

A mesa preserva toda a história. O usuário 123 possui duas versões (terminada na sequência 6 quando excluída). O usuário 125 possui três versões que mostram alterações na cidade. Registros com __END_AT = null estão atualmente ativos.

Rastreie um subconjunto de colunas com SCD Tipo 2.

Por default, SCD Tipo 2 cria uma nova versão sempre que o valor de qualquer coluna é alterado. Você pode especificar um subconjunto de colunas para rastrear, de forma que as alterações em outras colunas atualizem a versão atual no mesmo local, em vez de gerar um novo registro no histórico.

O exemplo a seguir exclui a coluna city da história acompanhamento:

Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)

Como as alterações city não são rastreadas, as atualizações de cidade sobrescrevem a linha atual em vez de criar uma nova versão. A tabela de destino contém os seguintes registros:

userId

name

city

__START_AT

__END_AT

123

Isabel

Chihuahua

1

6

124

Raul

Oaxaca

1

null

125

Mercedes

Guadalajara

2

null

126

Lily

Cancun

2

null

CDC AUTOMÁTICO DE exemplos de instantâneos

As seções a seguir fornecem exemplos de uso de AUTO CDC FROM SNAPSHOT para processar Snapshot em tabelas de destino SCD Tipo 1 ou Tipo 2. Para obter informações sobre quando usar esta API, consulte captura de dados de alterações (CDC) e Snapshot.

Exemplo: Captura instantânea do processo usando o tempo de ingestão pipeline

Utilize essa abordagem quando os snapshots chegarem regularmente e em ordem, e você puder confiar no timestamp de execução pipeline para o versionamento. Um novo Snapshot é incorporado a cada atualização pipeline .

Você pode ler Snapshots de vários tipos de origem, incluindo tabelas Delta , arquivos de armazenamento cloud e conexões JDBC .

o passo 1: Criar dados de exemplo

Crie uma tabela contendo dados de instantâneo. Execute o seguinte código a partir de um Notebook ou Databricks SQL na pasta explorations do seu pipeline:

SQL
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');

o passo 2: execução AUTO CDC FROM Snapshot

O que são pipelines? Para executar o código neste passo.

Escolha um tipo de origem para a view de instantâneo (o código de criação de exemplo gera uma tabela Delta ):

Opção A: Ler de uma tabela Delta

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")

Opção B: Ler do armazenamento cloud

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")

Opção C: Ler do JDBC (somente compute clássica)

Python
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)

Todas as opções, escreva para o destino

Em seguida, adicione a tabela e o fluxo de destino:

Python
dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)

Após a primeira execução do pipeline, todos os registros são inseridos como linhas ativas:

userId

city

__START_AT

__END_AT

1

Oaxaca

0

null

2

Monterrey

0

null

3

Tijuana

0

null

nota

Para usar o SCD Tipo 1 e manter apenas o estado atual, defina stored_as_scd_type=1. Neste caso, a tabela de destino não inclui as colunas __START_AT e __END_AT .

o passo 3: Simule um novo Snapshot e execute novamente

Atualize a tabela de origem para simular a chegada de um novo Snapshot (execute este código a partir de um Notebook ou arquivo SQL na pasta explorations do seu pipeline):

SQL
TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');

executar o pipeline novamente. AUTO CDC FROM SNAPSHOT compara o novo Snapshot com o anterior e detecta que o usuário 1 foi excluído, os usuários 2 e 3 foram atualizados e os usuários 4 e 6 foram inseridos. Isso gera um feed de alterações e usa AUTO CDC para criar a tabela de saída.

Após a segunda execução com SCD Tipo 2, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

0

1

2

Monterrey

0

1

2

Carmelo

1

null

3

Tijuana

0

1

3

Los Angeles

1

null

4

Vale da Morte

1

null

6

Kings Canyon

1

null

O usuário 1 foi encerrado (excluído). Os usuários 2 e 3 têm duas versões cada, mostrando as alterações em suas cidades. Os usuários 4 e 6 foram inseridos recentemente.

Após a segunda execução com SCD Tipo 1, a tabela de destino mostra apenas o estado atual:

userId

city

2

Carmelo

3

Los Angeles

4

Vale da Morte

6

Kings Canyon

Exemplo: Captura instantânea do processo usando funções de versão

Utilize essa abordagem quando precisar de controle explícito sobre a ordem dos snapshots. Por exemplo, utilize esta abordagem quando vários Snapshots chegarem ao mesmo tempo ou quando os Snapshots chegarem fora de ordem. Você escreve uma função que especifica qual Snapshot processar em seguida e seu número de versão. A API processa os Snapshots em ordem crescente de versão:

  • Se houver vários Snapshots armazenados, todos serão processados em ordem.
  • Se um Snapshot chegar fora de ordem (por exemplo, snapshot_3 chegar depois snapshot_4), ele será ignorado.
  • Se não houver novos Snapshots, a função retorna None e nenhum processamento ocorre.

o passo 1: Preparar arquivos de instantâneo

Crie arquivos CSV contendo dados de Snapshot e adicione-os a um volume ou local de armazenamento cloud . Nomeie os arquivos cronologicamente (por exemplo, snapshot_1.csv, snapshot_2.csv).

Cada arquivo deve conter colunas para userId e city. Por exemplo:

snapshot_1.csv :

userId

city

1

Oaxaca

2

Monterrey

3

Tijuana

snapshot_2.csv :

userId

city

2

Carmelo

3

Los Angeles

4

Vale da Morte

o passo 2: execução AUTO CDC FROM Snapshot com função de versão

Crie um novo Notebook e cole o código de pipeline a seguir. Então O que são pipelines?.

Python
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue

snapshot_versions.sort()

if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None

snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
nota

Para usar o SCD Tipo 1 em vez disso, defina stored_as_scd_type=1.

Após o processamento snapshot_1.csv, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

1

null

2

Monterrey

1

null

3

Tijuana

1

null

Após o processamento snapshot_2.csv, a tabela de destino contém os seguintes registros:

userId

city

__START_AT

__END_AT

1

Oaxaca

1

2

2

Monterrey

1

2

2

Carmelo

2

null

3

Tijuana

1

2

3

Los Angeles

2

null

4

Vale da Morte

2

null

nota

Lembre-se de que, para SCD Tipo 1, a tabela é exatamente igual ao Snapshot mais recente. A diferença é que as consultas subsequentes podem usar o feed de alterações para processar apenas os registros modificados.

o passo 3: Adicionar novo instantâneo

Adicione um novo arquivo CSV ao local de armazenamento com os dados modificados (por exemplo, valores de cidade alterados, novas linhas ou linhas removidas). Em seguida, execute o pipeline novamente para processar o novo Snapshot.

Exemplos de AUTO CDC Bitemporal

info

Beta

O CDC AUTOMÁTICO Bitemporal está em Beta.

O exemplo a seguir cria uma tabela de destino bitemporal a partir de um pequeno conjunto de eventos CDC sintéticos. A coluna bt tem o tempo de negócio e a coluna st tem o tempo do sistema.

Python
from pyspark import pipelines as dp

# Source: synthetic CDC events
dp.create_streaming_table(name="cdc_source")

@dp.append_flow(target="cdc_source", once=True)
def load_cdc_source():
return spark.createDataFrame(
[
(1, "x10", "y10", 10, 100),
(1, "x20", "y20", 20, 200)
],
schema="id INT, x STRING, y STRING, bt INT, st INT",
)

# Target: bitemporal table
dp.create_streaming_table(name="target_bitemporal")

dp.create_auto_cdc_flow(
target = "target_bitemporal",
source = "cdc_source",
keys = ["id"],
sequence_by = "bt",
system_sequence_by = "st",
stored_as_scd_type = "bitemporal"
)

Os passos seguintes demonstram como uma tabela bitemporal registra inserções, atualizações, atualizações fora de ordem e exclusões para uma única empresa. A coluna de sequenciamento gera as colunas __START_AT e __END_AT (tempo de negócio), e a coluna de sequenciamento do sistema gera as colunas __SYSTEM_START_AT e __SYSTEM_END_AT (tempo de sistema):

Coluna

Descrição

__START_AT

O horário comercial em que esta linha se tornou válida.

__END_AT

O período comercial em que a validade desta linha termina. null se válido por tempo indefinido.

__SYSTEM_START_AT

O horário do sistema no qual os dados desta linha e o intervalo de tempo de negócios são conhecidos como verdadeiros.

__SYSTEM_END_AT

O horário do sistema no qual os dados e o intervalo de tempo comercial desta linha são conhecidos por serem invalidados. null se conhecido por ser verdadeiro indefinidamente.

O sistema lida com eventos que chegam em qualquer ordem em ambas as linhas do tempo. Quando um evento chega com um tempo de negócios ou tempo de sistema anterior aos eventos já processados, o sistema corrige a história afetada em vez de apenas anexar ao final.

Etapa 1: Inserir

A Empresa A é adicionada em 18/07/2025 10:01:00 (tempo de negócios), mas não é ingerida até 10:05:00 (tempo do sistema).

Entrada:

CompanyId

pontos de dados

Sequenciamento

Sequenciamento do Sistema

Operação

A

XFv1

18/07/2025 10:01:00

18/07/2025 10:05:00

INSERT

Saída:

CompanyId

pontos de dados

__START_AT

__END_AT

__SYSTEM_START_AT

__SYSTEM_END_AT

A

XFv1

18/07/2025 10:01:00

null

18/07/2025 10:05:00

null

XFv1 é válido a partir das 10:01:00 sem término conhecido. O sistema tomou conhecimento deste fato no horário do sistema 10:05:00, sem fim conhecido.

Etapa 2: Atualizar

A Empresa A é atualizada em 18/07/2025 12:15:43 (hora comercial), e o sistema consome o evento às 12:20:00 (hora do sistema). O sistema preserva tanto o que acreditava ser conhecido antes da atualização quanto a história comercial corrigida após a ingestão da atualização.

Entrada:

CompanyId

pontos de dados

Sequenciamento

Sequenciamento do Sistema

Operação

A

XFv2

18/07/2025 12:15:43

18/07/2025 12:20:00

UPDATE

Saída:

CompanyId

pontos de dados

__START_AT

__END_AT

__SYSTEM_START_AT

__SYSTEM_END_AT

A

XFv1

18/07/2025 10:01:00

null

18/07/2025 10:05:00

18/07/2025 12:20:00

A

XFv1

18/07/2025 10:01:00

18/07/2025 12:15:43

18/07/2025 12:20:00

null

A

XFv2

18/07/2025 12:15:43

null

18/07/2025 12:20:00

null

Acreditava-se que o XFv1 era válido das 10:01:00 sem fim conhecido, e o sistema manteve essa crença das 10:05:00 até as 12:20:00. O XFv1 agora é conhecido por ser válido apenas até as 12:15:43, uma história corrigida e eficaz a partir do horário do sistema 12:20:00, sem fim conhecido. O XFv2 é válido a partir das 12:15:43, sem fim conhecido, e foi aprendido no horário do sistema 12:20:00.

Passo 3: Atualização fora de ordem

Uma atualização fora de ordem chega indicando que a Empresa A foi realmente atualizada em 18/07/2025 12:05:00 (tempo de negócio), mas não é processada até 12:25:00 (tempo do sistema). Quando uma atualização chega mais tarde no tempo do sistema, mas com um tempo de negócio anterior, o sistema corrige o tempo de negócio histórico e preserva tanto o que ele acreditava antes da atualização fora de ordem quanto o histórico corrigido.

Entrada:

CompanyId

pontos de dados

Sequenciamento

Sequenciamento do Sistema

Operação

A

XFv3

18/07/2025 12:05:00

18/07/2025 12:25:00

UPDATE

Saída:

CompanyId

pontos de dados

__START_AT

__END_AT

__SYSTEM_START_AT

__SYSTEM_END_AT

A

XFv1

18/07/2025 10:01:00

null

18/07/2025 10:05:00

18/07/2025 12:20:00

A

XFv1

18/07/2025 10:01:00

18/07/2025 12:15:43

18/07/2025 12:20:00

18/07/2025 12:25:00

A

XFv1

18/07/2025 10:01:00

18/07/2025 12:05:00

18/07/2025 12:25:00

null

A

XFv3

18/07/2025 12:05:00

18/07/2025 12:15:43

18/07/2025 12:25:00

null

A

XFv2

18/07/2025 12:15:43

null

18/07/2025 12:20:00

null

Acreditava-se que o XFv1 era válido das 10:01:00 às 12:15:43, e essa crença agora é válida no horário do sistema até 12:25:00. A nova atualização corrige a validade comercial do XFv1 para terminar às 12:05:00, uma história corrigida com efeito a partir do horário do sistema 12:25:00. O XFv3 agora é conhecido por ser válido das 12:05:00 até 12:15:43, uma crença válida no horário do sistema a partir das 12:25:00 sem fim conhecido.

Passo 4: Excluir

A Empresa A é excluída em 18/07/2025 às 12:30:00, e o sistema consome o evento às 12:30:00. Como uma operação de exclusão representa o fim da existência comercial da entidade, o sistema não cria nenhuma linha de substituição. XFv2 aparece em duas linhas, preservando um registro de auditoria completo de quando a empresa deixou de existir e de quando o sistema soube da exclusão.

Entrada:

CompanyId

pontos de dados

Sequenciamento

Sequenciamento do Sistema

Operação

A

XFv2

18/07/2025 12:30:00

18/07/2025 12:30:00

DELETE

Saída:

CompanyId

pontos de dados

__START_AT

__END_AT

__SYSTEM_START_AT

__SYSTEM_END_AT

A

XFv1

18/07/2025 10:01:00

null

18/07/2025 10:05:00

18/07/2025 12:20:00

A

XFv1

18/07/2025 10:01:00

18/07/2025 12:15:43

18/07/2025 12:20:00

18/07/2025 12:25:00

A

XFv1

18/07/2025 10:01:00

18/07/2025 12:05:00

18/07/2025 12:25:00

null

A

XFv3

18/07/2025 12:05:00

18/07/2025 12:15:43

18/07/2025 12:25:00

null

A

XFv2

18/07/2025 12:15:43

null

18/07/2025 12:20:00

18/07/2025 12:30:00

A

XFv2

18/07/2025 12:15:43

18/07/2025 12:30:00

18/07/2025 12:30:00

null

XFv2 era válido a partir das 12:15:43, sem fim conhecido, e o sistema manteve essa crença das 12:20:00 às 12:30:00. Após a exclusão ser ingerida, o XFv2 é conhecido por ser válido apenas até 12:30:00, uma história corrigida efetiva a partir do horário do sistema 12:30:00.

Limitações

  • A coluna de sequenciamento deve ser de um tipo de dados classificável. Os valores de sequenciamento NULL não são suportados.
  • AUTO CDC FROM SNAPSHOT É suportado apenas na interface de pipeline Python; a interface SQL não é suportada.
  • Para transmitir dados do destino de um processo AUTO CDC , leia o feed de alterações. Para obter detalhes, consulte Ler um feed de dados de alterações de uma tabela de destino AUTO CDC.

Recurso adicional