Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 133 additions & 104 deletions applications/finetune-quora-embeddings/embed.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import os
from IPython import embed

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The embed import from IPython is not used in the code. Consider removing it to keep the code clean.

from regex import D

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The D import from regex is not used in the code. Consider removing it to keep the code clean.


from torch import Generator
from modal import Volume, Image, Stub, gpu, Secret
from helpers.models import EmbeddingModel, Provider
import tenacity
Expand Down Expand Up @@ -101,43 +105,33 @@ def return_sentence_batchs(
yield batch


def update_dataset_with_embeddings(
def yield_dataset_with_embeddings(
dataset: Dataset,
sentence_to_id_map: dict[str, int],
sentence_embeddings,
):
import pyarrow as pa

# We generate a new
dataset_questions_with_embeddings = []
dataset_labels = []
s2id: dict[str, int],
sentence_embeddings: dict[str, list[float]],
) -> Generator:
for row in dataset:
# s is the sentence
# h is the hash
# id is the id from the original dataset
s1, s2 = row["questions"]["text"]
sentence_1_embedding_id = sentence_to_id_map[s1]
sentence_2_embedding_id = sentence_to_id_map[s2]
h1, h2 = hash(s1), hash(s2)
id1, id2 = s2id[h1], s2id[h2]

yield {
"id1": id1,
"id2": id2,
"embedding1": sentence_embeddings[id1],
"embedding2": sentence_embeddings[id2],
"is_duplicate": bool(row["is_duplicate"] == 1),
}

sentence_1_embedding = sentence_embeddings[sentence_1_embedding_id]
sentence_2_embedding = sentence_embeddings[sentence_2_embedding_id]

new_dataset_row_with_embeddings = {
"id": row["questions"]["id"],
"text": row["questions"]["text"],
"embeddings": [sentence_1_embedding, sentence_2_embedding],
}
dataset_questions_with_embeddings.append(new_dataset_row_with_embeddings)
dataset_labels.append(row["is_duplicate"])

# Convert the sentences and their embeddings to a table
return pa.Table.from_arrays(
[
pa.array(dataset_questions_with_embeddings),
pa.array(dataset_labels),
],
names=[
"questions",
"is_duplicate",
],
)
# pd.DataFrame(
# yield_dataset_with_embeddings(
# train_dataset, sentence_to_id_map, sentence_embeddings
# ).to_pyarrow()
# columns: id1, id2, embedding1, embedding2, is_duplicate


@stub.function(image=image, volumes={DATASET_DIR: DATASET_VOLUME})
Expand All @@ -155,6 +149,55 @@ def download_dataset():
DATASET_VOLUME.commit()


@classmethod
def from_name(cls, model_name: str) -> EmbeddingModel:
if MODEL_TO_PROVIDER[model_name] == Provider.HUGGINGFACE:
return EmbeddingModel.from_hf(model_name)

if MODEL_TO_PROVIDER[model_name] == Provider.COHERE:
return EmbeddingModel.from_cohere(model_name)

if MODEL_TO_PROVIDER[model_name] == Provider.OPENAI:
return EmbeddingModel.from_openai(model_name, max_limit=20)

raise ValueError(
f"Invalid Model of {model_name} was supplied to embed_dataset function"
)


async def process_embeddings(
embed_model,
combined_dataset,
combined_num_rows,
sentence_to_id_map=None,
sentence_embeddings=None,
):
if sentence_to_id_map is None:
sentence_to_id_map = {}

if sentence_embeddings is None:
sentence_embeddings = [] # should have some idea of IDS

retrying = tenacity.Retrying(
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
stop=tenacity.stop_after_attempt(5),
)
for attempt in retrying:
with attempt:
try:
batch_size = BATCH_SIZE_CONFIG[embed_model.provider]
sentences = return_sentence_batchs(
combined_dataset, sentence_to_id_map, batch_size
)
sentence_embeddings = await embed_model.embed(sentences)
if len(sentence_embeddings) == combined_num_rows:
break
except Exception as e:
print(f"Error occurred while creating embeddings: {e}")
raise e
return sentence_to_id_map, sentence_embeddings_map


@stub.function(
image=image,
gpu=GPU_CONFIG,
Expand All @@ -175,89 +218,72 @@ async def split_embed_train_test(model_name: str):

# Load the dataset for embedding
dataset = load_from_disk(f"{DATASET_DIR}/{DATASET_NAME}")
test_dataset = dataset["test"]
train_dataset = dataset["train"]
combined_dataset = concatenate_datasets([test_dataset, train_dataset])
embed_model = EmbeddingModel(model_name)

combined_num_rows = 408651 # Extract
# First Load the model
if MODEL_TO_PROVIDER[model_name] == Provider.HUGGINGFACE:
embed_model = EmbeddingModel.from_hf(model_name)
elif MODEL_TO_PROVIDER[model_name] == Provider.COHERE:
embed_model = EmbeddingModel.from_cohere(model_name)
elif MODEL_TO_PROVIDER[model_name] == Provider.OPENAI:
embed_model = EmbeddingModel.from_openai(model_name, max_limit=20)
else:
raise ValueError(
f"Invalid Model of {model_name} was supplied to embed_dataset function"
)
combined_dataset = concatenate_datasets([dataset["test"], dataset["train"]])

sentence_to_id_map = dict()
# we've precomputed the combined number of rows
combined_rows = 408651

retrying = tenacity.Retrying(
wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
stop=tenacity.stop_after_attempt(5),
sentence_to_id_map, sentence_embeddings = await process_embeddings(
embed_model, combined_dataset, combined_num_rows=combined_rows
)
for attempt in retrying:
with attempt:
try:
batch_size = BATCH_SIZE_CONFIG[embed_model.provider]
sentences = return_sentence_batchs(
combined_dataset, sentence_to_id_map, batch_size
)
sentence_embeddings = await embed_model.embed(sentences)
if len(sentence_embeddings) == combined_num_rows:
break
except Exception as e:
print(f"Error occurred while creating embeddings: {e}")
raise e

return update_dataset_with_embeddings(
train_dataset,
sentence_to_id_map,
sentence_embeddings,
), update_dataset_with_embeddings(
test_dataset,
sentence_to_id_map,
sentence_embeddings,
train_generator = yield_dataset_with_embeddings(
dataset["train"], sentence_to_id_map, sentence_embeddings
)

test_generator = yield_dataset_with_embeddings(
dataset["test"], sentence_to_id_map, sentence_embeddings
)

@stub.function(image=image, volumes={DATASET_DIR: DATASET_VOLUME}, timeout=2400)
def generate_embeddings():
import pyarrow as pa
import os
return train_generator, test_generator

model_names = list(MODEL_TO_PROVIDER.keys())

if not os.path.exists(CACHE_DIRECTORY):
os.makedirs(CACHE_DIRECTORY)
@stub.function(image=image, volumes={DATASET_DIR: DATASET_VOLUME}, timeout=2400)
def generate_embeddings(model_name):
train_dir = f"{CACHE_DIRECTORY}/{model_name}-train.arrow"
test_dir = f"{CACHE_DIRECTORY}/{model_name}-test.arrow"

for model_name, resp in zip(
model_names, split_embed_train_test.map(model_names, order_outputs=True)
):
if has_embedding_cache(model_name):
print(f"Embedding has already been generated for {model_name}")
continue

train_dataset, test_dataset = resp
model_slug = model_name
if MODEL_TO_PROVIDER[model_name] == Provider.HUGGINGFACE:
model_slug = model_name.split("/").pop()

with pa.OSFile(f"{CACHE_DIRECTORY}/{model_slug}-train.arrow", "wb") as sink:
writer = pa.RecordBatchFileWriter(sink, train_dataset.schema)
writer.write_table(train_dataset)
writer.close()

with pa.OSFile(f"{CACHE_DIRECTORY}/{model_slug}-test.arrow", "wb") as sink:
writer = pa.RecordBatchFileWriter(sink, test_dataset.schema)
writer.write_table(test_dataset)
writer.close()

print(f"Cache files generated for {model_name}")
if has_embedding_cache(model_name):
print(f"Embedding has already been generated for {model_name}")
continue

train_dataset, test_dataset = split_embed_train_test(model_name)

start = time.time()
for split, dataset_generator in [
("train", train_dataset),
("test", test_dataset),
]:
print(f"saving {split=} for {model_name=}")
for dataset in dataset_generator:
print(dataset)
pass
total_time = time.time() - start

try:
DATASET_VOLUME.commit()
print("Succesfully saved changes")
print("Succesfully saved changes")
saved = True
except Exception as e:
print(f"Error occurred while saving changes: {e}")
saved = False
raise e

return {
"train": train_dir,
"test": test_dir,
"time (s)": round(total_time, 4),
"model": model_name,
"is_successful": saved,
}


def model_slug(model_name):
if MODEL_TO_PROVIDER[model_name] == Provider.HUGGINGFACE:
return model_name.split("/").pop()
return model_name


@stub.function(image=image, volumes={DATASET_DIR: DATASET_VOLUME}, timeout=2400)
Expand All @@ -282,4 +308,7 @@ def validate_dataset():
@stub.local_entrypoint()
def main():
# download_dataset.remote()
generate_embeddings.remote()
for resp in generate_embeddings.map(
[model for model in MODEL_TO_PROVIDER.keys() if not has_embedding_cache(model)]
):
print(resp)
Loading