Pular para o conteúdo principal

Carregar dados no AI Runtime

info

Pré-visualização pública

Runtime AI para tarefas de nó único está em versão prévia pública. A API de treinamento distribuído para cargas de trabalho com múltiplas GPUs permanece em versão Beta.

Esta seção aborda informações sobre o carregamento de dados no AI Runtime , especificamente para aplicações de aprendizado ML e aprendizado profundo (DL). Confira o tutorial para saber mais sobre como carregar e transformar dados usando a API Spark Python .

nota

É necessário o Unity Catalog. Todo o acesso a dados no AI Runtime é feito através do Unity Catalog. Suas tabelas e volumes devem estar registrados no Unity Catalog e acessíveis ao seu usuário ou entidade de serviço.

Carregar dados tabulares

Utilize Spark Connect para carregar dados tabulares machine learning a partir de tabelasDelta.

Para treinamento de nó único, você pode converter DataFrames Apache Spark em DataFrames Pandas usando o métodoPySpark toPandas() e, opcionalmente, converter para o formato NumPy usando o métodoPySpark to_numpy().

nota

O Spark Connect adia a análise e a resolução de nomes para o momento da execução, o que pode alterar o comportamento do seu código. Veja Comparar Spark Connect com Spark Classic.

Spark Connect é compatível com a maioria APIs PySpark , incluindo Spark SQL, Pandas API on Spark, transmissão estruturada e MLlib (baseada em DataFrame). Consulte a documentação de referência da API PySpark para obter informações sobre as APIs suportadas mais recentes.

Para outras limitações, consulte Limitações compute sem servidor.

Carregar tabelas Delta grandes usando volumes

Para tabelas Delta grandes demais para serem convertidas com toPandas(), exporte os dados para um volume Unity Catalog e carregue-os diretamente usando PyTorch ou Hugging Face:

Python
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
Python
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

Essa abordagem evita a sobrecarga Spark durante o treinamento e funciona bem tanto para treinamento com uma única GPU quanto para treinamento distribuído com fluxo de trabalho.

Carregar dados não estruturados de volumes com UCVolumeDataset

Para dados não estruturados, como imagens, áudio e arquivos de texto armazenados em volumes do Unity Catalog, use UCVolumeDataset do pacote serverless_gpu.data. UCVolumeDataset é um PyTorch IterableDataset que copia cada arquivo do volume para um cache local rápido no primeiro acesso e retorna o caminho do arquivo local armazenado em cache. Ele aborda as questões de desempenho e distribuição que, de outra forma, seriam implementadas manualmente:

  • Armazenamento em cache local. Os arquivos são copiados do ponto de montagem FUSE para um diretório de cache local no primeiro acesso e servidos do cache depois, para que o treinamento multi-época não releia o volume.
  • Particionamento automático. Quando torch.distributed é inicializado, os arquivos são particionados por classificações e, então, divididos entre DataLoader workers, para que cada par (rank, worker) receba uma fatia não sobreposta sem configuração adicional.
nota

UCVolumeDataset e serverless_gpu.data.DataLoader requerem ambiente GPU 5 ou acima.

UCVolumeDataset retorna caminhos de arquivo locais RAW. Para decodificar esses arquivos em tensores, envolva-o em um segundo IterableDataset que consome a transmissão de caminho e aplica sua lógica de análise. Isso mantém as preocupações de I/O e de análise separadas.

Python
from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF

class ImageDataset(IterableDataset):
"""Decodes each cached file path from UCVolumeDataset into a tensor."""

def __init__(self, path_dataset: UCVolumeDataset):
self._path_dataset = path_dataset

def __iter__(self):
for local_path in self._path_dataset:
image = Image.open(local_path).convert("RGB")
yield TF.to_tensor(image)

path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)

O wrapper recebe caminhos locais já armazenados em cache, portanto, o passo de análise nunca toca na montagem FUSE. É possível encadear wrappers adicionais para aumento, tokenização ou filtragem.

Para desempenho ideal, combine UCVolumeDataset com serverless_gpu.data.DataLoader em vez do PyTorch padrão DataLoader. É ajustado para I/O de GPU serverless e busca e armazena arquivos em cache simultaneamente enquanto a GPU computa. Consulte desempenho no carregamento de dados.

Carregar dados dentro do decorador @distributed

Ao usar a APIde GPU sem servidor para treinamento distribuído, mova o código de carregamento de dados para dentro do decorador @distributed . O tamanho dataset pode exceder o tamanho máximo permitido pelo pickle, portanto, recomenda-se gerar o dataset dentro do decorador, conforme mostrado abaixo:

Python
from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...

Ao construir um UCVolumeDataset dentro do decorador, ele lê as informações de classificação torch.distributed no momento da iteração e particiona arquivos automaticamente entre as classificações, para que não seja necessário um DistributedSampler para dados de volume baseados em arquivo.

Desempenho de carregamento de dados

/Workspace e os diretórios /Volumes estão hospedados no armazenamento remoto do Unity Catalog. Se o seu dataset estiver armazenado no Unity Catalog, a velocidade de carregamento de dados é limitada pela largura de banda de rede disponível. Se você estiver treinando várias épocas, a abordagem recomendada é usar UCVolumeDataset, que realiza o cache para você: ele copia cada arquivo para o armazenamento local no primeiro acesso e atende às leituras subsequentes a partir da cópia local. Para datasets em volumes, prefira-o em vez de um shutil.copytree manual, que copia a árvore inteira antecipadamente mesmo que o treinamento toque apenas uma parte dela.

Se o dataset for grande, as seguintes técnicas podem melhorar a taxa de transferência:

  • Use serverless_gpu.data.DataLoader para paralelizar a obtenção. Esta é uma subclasse drop-in de torch DataLoader ajustada para E/S de GPU serverless: num_workers defaults to 6 e prefetch_factor to 4 (em comparação com os 0 e 2 do PyTorch), então os arquivos são buscados e armazenados em cache simultaneamente enquanto a GPU computa. Isso também registra o log do tempo de busca por lotes na execução ativa do MLflow, o que ajuda a identificar gargalos no carregamento de dados.

    Python
    from serverless_gpu.data import DataLoader

    loader = DataLoader(
    dataset,
    batch_size=32,
    pin_memory=True,
    # num_workers=6, by default
    # prefetch_factor=4, by default
    # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
    )

    Todas as classificações devem usar o mesmo valor de num_workers, porque UCVolumeDataset particiona arquivos usando um passo global em world_size × num_workers slots. Incompatibilidades de valores causam que os arquivos sejam duplicados ou ignorados.

  • Aumentar o tamanho do lote. Lotes maiores amortizam a sobrecarga de carregamento de dados por lote em mais amostras e reduzem o número de operações de busca de arquivo por passo. Se a memória da GPU for o fator limitante, combine um tamanho de lote maior com acumulação de gradiente para preservar o tamanho efetivo do lote.

Conjunto de dados Me

Para conjuntos de dados muito grandes que não cabem na memória, utilize abordagens de transmissão: