Commit

Merge pull request #1 from HenryL27/main
fix formatting
jonfritz authored Jan 16, 2025
2 parents e226d1f + da00b8d commit e8c0d30
Showing 1 changed file with 77 additions and 44 deletions.
121 changes: 77 additions & 44 deletions notebooks/integrations/aryn/aryn-elasticsearch-blog-dataprep.ipynb
@@ -24,7 +24,11 @@
"from sycamore.transforms.partition import ArynPartitioner\n",
"from sycamore.transforms.extract_schema import LLMPropertyExtractor\n",
"from sycamore.transforms.summarize_images import SummarizeImages, LLMImageSummarizer\n",
"from sycamore.transforms.standardizer import USStateStandardizer, DateTimeStandardizer, ignore_errors\n",
"from sycamore.transforms.standardizer import (\n",
" USStateStandardizer,\n",
" DateTimeStandardizer,\n",
" ignore_errors,\n",
")\n",
"from sycamore.transforms.merge_elements import GreedySectionMerger\n",
"from sycamore.functions.tokenizer import HuggingFaceTokenizer\n",
"from sycamore.transforms.embed import SentenceTransformerEmbedder\n",
@@ -33,18 +37,24 @@
"import pyarrow.fs\n",
"\n",
"llm = OpenAI(OpenAIModels.GPT_4O_MINI)\n",
"os.environ[\"ARYN_API_KEY\"] = \"<MY-ARYN-API-KEY>\"\n",
"os.environ[\"ARYN_API_KEY\"] = \"<MY-ARYN-API-KEY>\"\n",
"\n",
"paths = [\"s3://aryn-public/ntsb/\"]\n",
"\n",
"context = sycamore.init()\n",
"# Add exec_mode=ExecMode.LOCAL to .init to run without Ray\n",
"docset = context.read.binary(paths=paths, binary_format=\"pdf\")\n",
"docset = docset.materialize(path=\"./elasticsearch-tutorial/downloaded-docset\", source_mode=sycamore.MATERIALIZE_USE_STORED)\n",
"docset = docset.materialize(\n",
" path=\"./elasticsearch-tutorial/downloaded-docset\",\n",
" source_mode=sycamore.MATERIALIZE_USE_STORED,\n",
")\n",
"# Make sure your Aryn token is accessible in the environment variable ARYN_API_KEY\n",
"partitioned_docset = (docset.partition(partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True))\n",
" .materialize(path=\"./elasticsearch-tutorial/partitioned-docset\", source_mode=sycamore.MATERIALIZE_USE_STORED)\n",
" )\n",
"partitioned_docset = docset.partition(\n",
" partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True)\n",
").materialize(\n",
" path=\"./elasticsearch-tutorial/partitioned-docset\",\n",
" source_mode=sycamore.MATERIALIZE_USE_STORED,\n",
")\n",
"partitioned_docset.execute()"
]
},
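The cell above reads the NTSB PDFs from S3, partitions them with the Aryn Partitioner, and materializes intermediate results so re-runs can reuse completed work. Its comment notes that passing exec_mode=ExecMode.LOCAL to sycamore.init runs the pipeline without Ray; a minimal sketch of that variant follows, assuming ExecMode is importable from the sycamore package and that the Aryn API key is available in the environment:

import os
import sycamore
from sycamore import ExecMode  # assumption: ExecMode is exported at the package root
from sycamore.transforms.partition import ArynPartitioner

os.environ["ARYN_API_KEY"] = "<MY-ARYN-API-KEY>"  # placeholder, not a real key

# Run in-process instead of on Ray, per the comment in the cell above.
local_context = sycamore.init(exec_mode=ExecMode.LOCAL)
local_docset = local_context.read.binary(
    paths=["s3://aryn-public/ntsb/"], binary_format="pdf"
)
local_partitioned = local_docset.partition(
    partitioner=ArynPartitioner(extract_table_structure=True, extract_images=True)
)
local_partitioned.execute()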
@@ -56,51 +66,72 @@
"outputs": [],
"source": [
"schema = {\n",
" 'type': 'object',\n",
" 'properties': {'accidentNumber': {'type': 'string'},\n",
" 'dateAndTime': {'type': 'date'},\n",
 'location': {'type': 'string', 'description': 'US State where the incident occurred'},\n",
" 'aircraft': {'type': 'string'},\n",
" 'aircraftDamage': {'type': 'string'},\n",
" 'injuries': {'type': 'string'},\n",
" 'definingEvent': {'type': 'string'}},\n",
" 'required': ['accidentNumber',\n",
" 'dateAndTime',\n",
" 'location',\n",
" 'aircraft']\n",
" }\n",
" \"type\": \"object\",\n",
" \"properties\": {\n",
" \"accidentNumber\": {\"type\": \"string\"},\n",
" \"dateAndTime\": {\"type\": \"date\"},\n",
" \"location\": {\n",
" \"type\": \"string\",\n",
" \"description\": \"US State where the incident occured\",\n",
" },\n",
" \"aircraft\": {\"type\": \"string\"},\n",
" \"aircraftDamage\": {\"type\": \"string\"},\n",
" \"injuries\": {\"type\": \"string\"},\n",
" \"definingEvent\": {\"type\": \"string\"},\n",
" },\n",
" \"required\": [\"accidentNumber\", \"dateAndTime\", \"location\", \"aircraft\"],\n",
"}\n",
"\n",
"schema_name = 'FlightAccidentReport'\n",
"property_extractor=LLMPropertyExtractor(llm=llm, num_of_elements=20, schema_name=schema_name, schema=schema)\n",
"schema_name = \"FlightAccidentReport\"\n",
"property_extractor = LLMPropertyExtractor(\n",
" llm=llm, num_of_elements=20, schema_name=schema_name, schema=schema\n",
")\n",
"\n",
"enriched_docset = (\n",
" partitioned_docset\n",
" # Extracts the properties based on the schema defined \n",
" .extract_properties(property_extractor=property_extractor)\n",
"\n",
" # Summarizes images that were extracted using an LLM\n",
" .transform(SummarizeImages, summarizer=LLMImageSummarizer(llm=llm))\n",
" # Extracts the properties based on the schema defined\n",
" .extract_properties(property_extractor=property_extractor)\n",
" # Summarizes images that were extracted using an LLM\n",
" .transform(SummarizeImages, summarizer=LLMImageSummarizer(llm=llm))\n",
")\n",
"\n",
"formatted_docset = (\n",
" enriched_docset\n",
" \n",
" # Converts state abbreviations to their full names.\n",
" .map( lambda doc: ignore_errors(doc, USStateStandardizer, [\"properties\",\"entity\",\"location\"]))\n",
"\n",
" # Converts datetime into a common format\n",
" .map( lambda doc: ignore_errors(doc, DateTimeStandardizer, [\"properties\",\"entity\",\"dateAndTime\"]))\n",
" # Converts state abbreviations to their full names.\n",
" .map(\n",
" lambda doc: ignore_errors(\n",
" doc, USStateStandardizer, [\"properties\", \"entity\", \"location\"]\n",
" )\n",
" )\n",
" # Converts datetime into a common format\n",
" .map(\n",
" lambda doc: ignore_errors(\n",
" doc, DateTimeStandardizer, [\"properties\", \"entity\", \"dateAndTime\"]\n",
" )\n",
" )\n",
")\n",
"\n",
"\n",
"merger = GreedySectionMerger(tokenizer=HuggingFaceTokenizer(\"sentence-transformers/all-MiniLM-L6-v2\"), max_tokens=512)\n",
"merger = GreedySectionMerger(\n",
" tokenizer=HuggingFaceTokenizer(\"sentence-transformers/all-MiniLM-L6-v2\"),\n",
" max_tokens=512,\n",
")\n",
"chunked_docset = formatted_docset.merge(merger=merger)\n",
"\n",
"model_name = \"thenlper/gte-small\"\n",
"\n",
"embedded_docset = chunked_docset.spread_properties([\"entity\", \"path\"]).explode().embed(embedder=SentenceTransformerEmbedder(batch_size=10_000, model_name=model_name))\n",
"embedded_docset = (\n",
" chunked_docset.spread_properties([\"entity\", \"path\"])\n",
" .explode()\n",
" .embed(\n",
" embedder=SentenceTransformerEmbedder(batch_size=10_000, model_name=model_name)\n",
" )\n",
")\n",
"\n",
"embedded_docset = embedded_docset.materialize(path=\"./elasticsearch-tutorial/embedded-docset\", source_mode=sycamore.MATERIALIZE_USE_STORED)\n",
"embedded_docset = embedded_docset.materialize(\n",
" path=\"./elasticsearch-tutorial/embedded-docset\",\n",
" source_mode=sycamore.MATERIALIZE_USE_STORED,\n",
")\n",
"embedded_docset.execute()"
]
},
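The cell above extracts structured properties with an LLM, standardizes state names and timestamps, merges elements into roughly 512-token chunks, and embeds them with thenlper/gte-small. Before writing to Elasticsearch it can help to spot-check a couple of documents and confirm the vector size, since the mapping in the next cell relies on a dimensions value defined outside the hunks shown here. A sketch, assuming DocSet.take returns Document objects exposing .properties and .embedding, and that gte-small produces 384-dimensional vectors:

# Spot-check two embedded documents (sketch; assumes the pipeline above has run).
sample_docs = embedded_docset.take(2)
for doc in sample_docs:
    print(doc.properties.get("entity"))   # extracted + standardized fields
    if doc.embedding is not None:
        print(len(doc.embedding))         # expected to be 384 for gte-small

# If so, the Elasticsearch mapping in the next cell would use:
dimensions = 384  # assumption: thenlper/gte-small output size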
@@ -117,21 +148,21 @@
"url = \"http://localhost:9200\"\n",
"index_name = \"aryn-demo\"\n",
"embedded_ds.write.elasticsearch(\n",
" url=url, \n",
" url=url,\n",
" index_name=index_name,\n",
" es_client_args={\"basic_auth\": (<YOUR-USERNAME>, os.getenv(\"ELASTIC_PASSWORD\"))},\n",
" es_client_args={\"basic_auth\": (\"<YOUR-USERNAME>\", os.getenv(\"ELASTIC_PASSWORD\"))},\n",
" mappings={\n",
" \"properties\": {\n",
" \"embeddings\": {\n",
" \"type\": \"dense_vector\",\n",
" \"dims\": dimensions,\n",
" \"index\": True,\n",
" \"similarity\": \"cosine\",\n",
" },\n",
" },\n",
" \"properties\": {\"type\": \"object\"},\n",
" }\n",
" }\n",
" )"
" },\n",
")"
]
},
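After the write completes, the index can also be checked directly with the official Elasticsearch Python client rather than going back through Sycamore. A sketch, assuming the same local cluster, index name, and credentials used above (replace <YOUR-USERNAME> with a real user):

import os
from elasticsearch import Elasticsearch

# Count the chunks that landed in the index (sketch).
es = Elasticsearch(
    "http://localhost:9200",
    basic_auth=("<YOUR-USERNAME>", os.getenv("ELASTIC_PASSWORD")),
)
resp = es.count(index="aryn-demo")
print(resp["count"])  # number of indexed chunks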
{
@@ -143,10 +174,12 @@
"source": [
"# Verify data has been loaded using DocSet Query to retrieve chunks\n",
"query_params = {\"match_all\": {}}\n",
"query_docs = ctx.read.elasticsearch(url=url, \n",
" index_name=index_name, \n",
" query=query_params,\n",
" es_client_args={\"basic_auth\": (“<YOUR-USERNAME>”, os.getenv(\"ELASTIC_PASSWORD\"))}\n",
"query_docs = ctx.read.elasticsearch(\n",
" url=url,\n",
" index_name=index_name,\n",
" query=query_params,\n",
" es_client_args={\"basic_auth\": (\"<YOUR-USERNAME>\", os.getenv(\"ELASTIC_PASSWORD\"))},\n",
")\n",
"query_docs.show(show_embedding=False)"
]
}
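The final cell reads every chunk back with a match_all query. Because the query argument takes an Elasticsearch query clause, a more selective query can be substituted; a sketch, assuming the extracted fields end up indexed under properties.entity (the path used by the standardizers above) and reusing the same connection settings:

# Retrieve only chunks whose standardized location matches a given state (sketch).
# "properties.entity.location" is an assumption about how the extracted fields
# are flattened into the index; adjust to match the actual mapping.
state_query = {"match": {"properties.entity.location": "California"}}
state_docs = ctx.read.elasticsearch(
    url=url,
    index_name=index_name,
    query=state_query,
    es_client_args={"basic_auth": ("<YOUR-USERNAME>", os.getenv("ELASTIC_PASSWORD"))},
)
state_docs.show(show_embedding=False)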
