Relato replicável da construção do RAG com o DHBB no databricks.
Gerei no perplexity, pois o dbdemos mudou e não funciona mais aquele RAG antigo.
Neste guia, você aprenderá a construir um sistema completo de RAG (Retrieval-Augmented Generation) no Databricks Free usando arquivos de texto plano. RAG combina a recuperação de informações relevantes com geração de texto usando LLMs, melhorando a qualidade e precisão das respostas[1].
RAG é uma técnica que:
- Recupera documentos relevantes de uma base de dados vetorial
- Aumenta o contexto da pergunta do usuário com esses documentos
- Gera respostas usando um LLM com base no contexto recuperado
Arquivos de Texto → Chunk → Embeddings → Vector Search → Recuperação ↓ Pergunta do Usuário → Busca Vetorial → Contexto + LLM → Resposta
- Conta Databricks Free ativa em
https://databricks.com - Acesso a um workspace Databricks
- Arquivos de texto plano (
.txt) com seu conhecimento - Conhecimento básico de Python e SQL
- Clusters: Até 2 workers (8 GB RAM total recomendado)
- Execução: Notebooks compartilhados com limite de cores
- Vector Search: Disponível mas com limites de throughput
- Model Serving: Endpoints gerenciados disponíveis
- Acesse sua conta Databricks Free
- Crie um novo cluster:
- Runtime:
14.3 LTSou superior - Workers:
1 worker(para economizar recursos) - Worker type:
i3.xlargeou similar
- Runtime:
- Aguarde o cluster iniciar (5-10 minutos)
O Vector Search do Databricks requer Unity Catalog:
- No workspace, acesse Settings → Unity Catalog
- Crie um Metastore (se não existir):
- Nome:
rag_metastore - Cloud: Sua região (AWS/Azure/GCP)
- Nome:
- Crie um Catalog:
- Nome:
rag_catalog
- Nome:
- Crie um Schema:
- Nome:
rag_schema
- Nome:
- No workspace, clique em Notebooks
- Crie um novo notebook:
- Linguagem:
Python - Cluster: Selecione o cluster criado
- Nome:
RAG_Pipeline
- Linguagem:
- Na seção Data do workspace:
- Clique em Create Table
- Selecione Upload file
- Escolha seus arquivos
.txt
- Alternatively, carregue via código Python
Execute este código no seu notebook:
%pip install databricks-vectorsearch %pip install sentence-transformers %pip install langchain %pip install mlflow
from pyspark.sql.types import StructType, StructField, StringType, IntegerType from pyspark.sql.functions import col, concat_ws, lit, row_number from pyspark.sql.window import Window import os
local_files_path = "/Workspace/Users/your_email/documents/" # Ajuste seu caminho
txt_files = [f for f in os.listdir(local_files_path) if f.endswith('.txt')]
print(f"Arquivos encontrados: {txt_files}")
data = [] for filename in txt_files: filepath = os.path.join(local_files_path, filename) with open(filepath, 'r', encoding='utf-8') as f: content = f.read() data.append({ 'file_name': filename, 'content': content })
df = spark.createDataFrame(data) df.display()
df.write.format('delta').mode('overwrite').saveAsTable( 'rag_catalog.rag_schema.raw_documents' )
print("✓ Tabela de documentos brutos criada!")
Documentos grandes precisam ser divididos em chunks menores para embedding eficiente.
from pyspark.sql.functions import udf, col, explode, array, when from pyspark.sql.types import ArrayType, StringType
def chunk_text(text, chunk_size=500, overlap=100): """ Divide texto em chunks com sobreposição
Args:
text: Texto a ser dividido
chunk_size: Tamanho de cada chunk (caracteres)
overlap: Sobreposição entre chunks
Returns:
Lista de chunks
"""
if not text or len(text.strip()) == 0:
return []
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end].strip()
if len(chunk) > 20: # Só inclui chunks com mais de 20 chars
chunks.append(chunk)
start = end - overlap # Próximo chunk com sobreposição
return chunks if chunks else []
chunk_udf = udf(chunk_text, ArrayType(StringType()))
df_chunked = spark.table('rag_catalog.rag_schema.raw_documents')
.select('file_name', chunk_udf('content').alias('chunks'))
.select('file_name', explode('chunks').alias('chunk_text'))
window = Window.orderBy('file_name') df_chunked = df_chunked.withColumn( 'chunk_id', row_number().over(window) )
df_chunked.write.format('delta').mode('overwrite').saveAsTable( 'rag_catalog.rag_schema.documents_chunks' )
print(f"✓ {df_chunked.count()} chunks criados!") df_chunked.display()
from sentence_transformers import SentenceTransformer import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2') # Pequeno e rápido
print(f"Dimensão de embeddings: {model.get_sentence_embedding_dimension()}")
def generate_embedding(text): """Gera embedding para um texto""" if not text: return None embedding = model.encode(text) return embedding.tolist() # Converter para lista para Spark
from pyspark.sql.types import ArrayType, DoubleType
embedding_udf = udf(generate_embedding, ArrayType(DoubleType()))
df_embeddings = spark.table('rag_catalog.rag_schema.documents_chunks')
.withColumn('embedding', embedding_udf('chunk_text'))
df_embeddings.write.format('delta').mode('overwrite').saveAsTable( 'rag_catalog.rag_schema.documents_with_embeddings' )
print("✓ Embeddings gerados e salvos!") df_embeddings.display()
spark.sql(""" ALTER TABLE rag_catalog.rag_schema.documents_with_embeddings SET TBLPROPERTIES ('delta.enableRowTracking' = 'true') """)
print("✓ Vector Search habilitado na tabela!")
from databricks.vector_search.client import VectorSearchClient
client = VectorSearchClient()
endpoint_name = "rag-endpoint" index_name = "documents_index"
try: endpoints = client.list_endpoints() endpoint_exists = any(ep.get('name') == endpoint_name for ep in endpoints.get('endpoints', []))
if not endpoint_exists:
print(f"Criando endpoint: {endpoint_name}...")
client.create_endpoint(
name=endpoint_name,
endpoint_type="STANDARD"
)
print(f"✓ Endpoint criado: {endpoint_name}")
else:
print(f"✓ Endpoint já existe: {endpoint_name}")
except Exception as e: print(f"Nota: {e}")
import time
try: # Criar índice com Delta Sync index = client.create_index( endpoint_name=endpoint_name, index_name=index_name, primary_key="chunk_id", embedding_dimension=384, # Dimensão do all-MiniLM-L6-v2 embedding_vector_column="embedding", text_column="chunk_text", source_table_name="rag_catalog.rag_schema.documents_with_embeddings", pipeline_type="DELTA_SYNC" # Sincroniza automaticamente )
print(f"✓ Índice criado: {index_name}")
# Aguardar sincronização inicial
time.sleep(10)
except Exception as e: print(f"Índice pode já existir: {e}")
print("✓ Vector Search configurado!")
def retrieve_context(query_text, top_k=3): """ Busca nos documentos os chunks mais relevantes
Args:
query_text: Pergunta do usuário
top_k: Número de resultados a retornar
Returns:
Lista de chunks relevantes
"""
from sentence_transformers import SentenceTransformer
# Gerar embedding da pergunta
model = SentenceTransformer('all-MiniLM-L6-v2')
query_embedding = model.encode(query_text).tolist()
# Buscar no Vector Search
results = client.query_index(
endpoint_name=endpoint_name,
index_name=index_name,
query_vector=query_embedding,
num_results=top_k
)
# Extrair chunks do resultado
retrieved_chunks = []
for result in results.get('result', {}).get('data_array', []):
if len(result) > 0:
retrieved_chunks.append(result[0])
return retrieved_chunks
test_query = "Qual é o tema principal dos documentos?" print(f"Buscando por: {test_query}")
context = retrieve_context(test_query, top_k=3) print(f"Resultados encontrados: {len(context)}") for i, chunk in enumerate(context): print(f"\n[{i+1}] {chunk[:200]}...")
import requests import json from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
llm_endpoint_name = "databricks-meta-llama-2-7b" # Ou outro modelo disponível
def generate_response(user_query, context_chunks): """ Gera resposta usando LLM com contexto recuperado
Args:
user_query: Pergunta do usuário
context_chunks: Chunks recuperados
Returns:
Resposta gerada pelo LLM
"""
# Construir contexto
context = "\n".join(context_chunks[:3]) # Top 3 chunks
# Criar prompt
prompt = f"""Use o seguinte contexto para responder a pergunta do usuário.
Contexto: {context}
Pergunta: {user_query}
Resposta:"""
# Chamar modelo
try:
response = w.serving_endpoints.query(
name=llm_endpoint_name,
inputs={"prompt": prompt}
)
return response.get('predictions', ['Erro na geração'])[0]
except Exception as e:
return f"Erro ao gerar resposta: {str(e)}"
user_question = "Qual é o tema principal dos documentos?" retrieved = retrieve_context(user_question, top_k=3) response = generate_response(user_question, retrieved)
print(f"\n{'='*60}") print(f"Pergunta: {user_question}") print(f"{'='*60}") print(f"Resposta:\n{response}") print(f"{'='*60}")
import mlflow from mlflow.pyfunc import PythonModel from mlflow.utils.environment import _mlflow_conda_env
class RAGPipeline(PythonModel): """Pipeline RAG completo"""
def __init__(self):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.client = VectorSearchClient()
self.endpoint_name = "rag-endpoint"
self.index_name = "documents_index"
def predict(self, context, model_input):
"""
Fazer previsão (pergunta → resposta)
Args:
model_input: DataFrame com coluna 'query'
Returns:
DataFrame com coluna 'response'
"""
queries = model_input['query'].tolist()
responses = []
for query in queries:
# 1. Recuperar contexto
query_embedding = self.model.encode(query).tolist()
results = self.client.query_index(
endpoint_name=self.endpoint_name,
index_name=self.index_name,
query_vector=query_embedding,
num_results=3
)
# 2. Extrair chunks
chunks = [r[0] for r in results.get('result', {}).get('data_array', []) if r]
context_text = "\n".join(chunks[:3])
# 3. Gerar resposta
prompt = f"""Baseado no contexto abaixo, responda a pergunta:
Contexto: {context_text}
Pergunta: {query}
Resposta:"""
# Usar API ou modelo local
response = f"Resposta baseada em: {query}" # Simplificado para demo
responses.append(response)
return {'response': responses}
mlflow.set_experiment('/Users/your_email/RAG_Experiment')
with mlflow.start_run(): mlflow.pyfunc.log_model( 'rag_pipeline', python_model=RAGPipeline(), artifact_path='rag_model' ) print("✓ Modelo RAG registrado no MLflow!")
model_uri = "runs:/your_run_id/rag_pipeline"
mlflow.register_model(model_uri, "rag-production-model")
print("✓ Modelo registrado para deploy!")
print("="*60) print("TESTE COMPLETO DO RAG") print("="*60)
test_queries = [ "Qual é o assunto principal?", "Explique os conceitos chave", "Resuma o conteúdo" ]
for i, query in enumerate(test_queries, 1): print(f"\n[Teste {i}]") print(f"Pergunta: {query}")
# Recuperar
retrieved = retrieve_context(query, top_k=2)
print(f"Chunks encontrados: {len(retrieved)}")
if retrieved:
print(f"Preview: {retrieved[0][:100]}...")
# Gerar resposta (simplificado)
print(f"Status: ✓ Sucesso")
spark.catalog.clearCache()
chunk_size = 300 # Ao invés de 500
top_k = 2 # Ao invés de 5
print("✓ Otimizações aplicadas!")
| Problema | Solução |
|---|---|
| "Vector Search não disponível" | Habilite Unity Catalog nas configurações |
| "Erro ao gerar embeddings" | Aumente memória do cluster ou reduza batch size |
| "Index não sincroniza" | Verifique se a tabela tem delta.enableRowTracking = true |
| "Endpoint de LLM não responde" | Verifique se modelo está iniciado em Serving |
| "Out of memory" | Reduza chunk_size ou top_k |
- Melhorar qualidade: Adicione re-ranking e filtros de relevância
- Avaliar desempenho: Use métricas como NDCG e MRR para retrieval
- Fine-tuning: Adapte prompts para seu caso de uso
- Monitoramento: Configure alertas no Model Serving
- Escalar: Quando necessário, migre para tier pago
[1] Databricks. (2024). RAG (Retrieval Augmented Generation) em Databricks. Disponível em: https://docs.databricks.com [2] Developers Blog. (2024). Como criar um fluxo de RAG. Disponível em: https://imasters.com.br/data [3] LinkedIn. (2024). RAG with Databricks. Disponível em: https://pt.linkedin.com/pulse [4] Tonic AI. (2024). Building a Databricks RAG System. Disponível em: https://www.tonic.ai/blog [5] Hugging Face. (2024). Sentence Transformers. Disponível em: https://www.sbert.net
-- Ver tabelas criadas SELECT * FROM rag_catalog.rag_schema.documents_with_embeddings;
-- Contabilizar chunks SELECT COUNT(*) as total_chunks FROM rag_catalog.rag_schema.documents_chunks;
-- Verificar tamanho de embeddings SELECT LENGTH(embedding) as embedding_dim FROM rag_catalog.rag_schema.documents_with_embeddings LIMIT 1;
-- Buscar por palavra-chave (simples) SELECT chunk_id, chunk_text FROM rag_catalog.rag_schema.documents_chunks WHERE chunk_text LIKE '%palavra%';
Última atualização: Dezembro 2024 Versão: 1.0