Commit 2b91890

feat/support iteratively deleting cache (#123)
* Support iteratively deleting cache
* Update download step to return download dir for cache dir
* Have each step define a method to delete its cache
* Conform all logs to use lowercase
1 parent 5879abf commit 2b91890

Large commits have some content hidden by default; only part of this diff is shown below.
64 files changed: +202 -133 lines

CHANGELOG.md

Lines changed: 2 additions & 1 deletion

@@ -1,4 +1,4 @@
-## 0.0.15-dev2
+## 0.0.15-dev3

 ### Fixes

@@ -8,6 +8,7 @@
 ### Enhancements

 * **Migrate airtable connector to v2**
+* **Support iteratively deleting cached content** Add a flag to delete cached content once it's no longer needed for systems that are limited in memory.

 ## 0.0.14
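
The changelog entry above is the heart of this commit: each pipeline step gains a way to drop its cached artifacts once downstream steps are done with them. A minimal sketch of that pattern, assuming hypothetical names (PipelineStep, the delete_cache flag, delete_cache_dir); the library's actual classes and method names may differ:

    # Sketch only: names below are illustrative, not this library's actual API.
    import shutil
    from pathlib import Path

    class PipelineStep:
        def __init__(self, cache_dir: Path, delete_cache: bool = False):
            self.cache_dir = cache_dir
            self.delete_cache = delete_cache

        def run(self) -> Path:
            # Do the step's work, writing intermediate results under cache_dir,
            # and return the directory so the next step knows where to read from
            # (mirroring "update download step to return download dir").
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            return self.cache_dir

        def delete_cache_dir(self) -> None:
            # Each step defines how to delete its own cache; a runner calls
            # this as soon as the step's output is no longer needed.
            if self.delete_cache and self.cache_dir.is_dir():
                shutil.rmtree(self.cache_dir)

Deleting eagerly like this bounds peak disk usage by the steps still in flight rather than by the whole run, which is the constraint the changelog entry calls out.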

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-__version__ = "0.0.15-dev2"  # pragma: no cover
+__version__ = "0.0.15-dev3"  # pragma: no cover

unstructured_ingest/connector/astradb.py

Lines changed: 1 addition & 1 deletion

@@ -222,7 +222,7 @@ def check_connection(self):
             raise DestinationConnectionError(f"failed to validate connection: {e}")

     def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
-        logger.info(f"Inserting / updating {len(elements_dict)} documents to Astra DB.")
+        logger.info(f"inserting / updating {len(elements_dict)} documents to Astra DB.")

         astra_db_batch_size = self.write_config.batch_size

unstructured_ingest/connector/biomed.py

Lines changed: 4 additions & 4 deletions

@@ -123,7 +123,7 @@ def cleanup_file(self):
             and self.filename.is_file()
             and not self.read_config.download_only
         ):
-            logger.debug(f"Cleaning up {self}")
+            logger.debug(f"cleaning up {self}")
             Path.unlink(self.filename)

     @SourceConnectionError.wrap
@@ -132,12 +132,12 @@ def get_file(self):
         download_path = self.file_meta.download_filepath  # type: ignore
         dir_ = Path(os.path.dirname(download_path))  # type: ignore
         if not dir_.is_dir():
-            logger.debug(f"Creating directory: {dir_}")
+            logger.debug(f"creating directory: {dir_}")

             if dir_:
                 dir_.mkdir(parents=True, exist_ok=True)
         self._retrieve()
-        logger.debug(f"File downloaded: {self.file_meta.download_filepath}")
+        logger.debug(f"file downloaded: {self.file_meta.download_filepath}")

     @SourceConnectionNetworkError.wrap
     def _retrieve(self):
@@ -229,7 +229,7 @@ def _list_objects(self) -> t.List[BiomedFileMeta]:

         def traverse(path, download_dir, output_dir):
             full_path = Path(PMC_DIR) / path
-            logger.debug(f"Traversing directory: {full_path}")
+            logger.debug(f"traversing directory: {full_path}")

             ftp = FTP(DOMAIN)
             ftp.login()

unstructured_ingest/connector/chroma.py

Lines changed: 1 addition & 1 deletion

@@ -139,7 +139,7 @@ def prepare_chroma_list(chunk: t.Tuple[t.Dict[str, t.Any]]) -> t.Dict[str, t.List[t.Any]]:
         return chroma_dict

     def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
-        logger.info(f"Inserting / updating {len(elements_dict)} documents to destination ")
+        logger.info(f"inserting / updating {len(elements_dict)} documents to destination ")

         chroma_batch_size = self.write_config.batch_size
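
astradb, chroma, and kafka all write in slices of write_config.batch_size; kafka's write_dict (below) does it through a batch_generator helper. The helper's implementation is not part of this diff, but a generator along these lines would behave the way the call sites assume:

    from typing import Any, Generator, List

    def batch_generator(items: List[Any], batch_size: int) -> Generator[List[Any], None, None]:
        # Yield successive batch_size-sized slices; the final slice may be shorter.
        for i in range(0, len(items), batch_size):
            yield items[i : i + batch_size]

    # e.g. list(batch_generator([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]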

unstructured_ingest/connector/fsspec/fsspec.py

Lines changed: 2 additions & 2 deletions

@@ -221,7 +221,7 @@ def does_path_match_glob(self, path: str) -> bool:
         for pattern in patterns:
             if fnmatch.filter([path], pattern):
                 return True
-        logger.debug(f"The file {path!r} is discarded as it does not match any given glob.")
+        logger.debug(f"the file {path!r} is discarded as it does not match any given glob.")
         return False

     def get_ingest_docs(self):
@@ -328,7 +328,7 @@ def write_dict(
             **self.connector_config.get_access_config(),
         )

-        logger.info(f"Writing content using filesystem: {type(fs).__name__}")
+        logger.info(f"writing content using filesystem: {type(fs).__name__}")

         output_folder = self.connector_config.path_without_protocol
         output_folder = os.path.join(output_folder)  # Make sure folder ends with file separator
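
The glob check touched here (and its twin in git.py below) leans on fnmatch.filter, which returns a non-empty list when the path matches a shell-style pattern. A self-contained version of the same helper, for experimentation outside the connector:

    import fnmatch
    from typing import List

    def does_path_match_glob(path: str, patterns: List[str]) -> bool:
        # Keep the path if it matches at least one glob; note that fnmatch
        # does not treat "/" specially, so "*.md" also matches nested paths.
        for pattern in patterns:
            if fnmatch.filter([path], pattern):
                return True
        return False

    print(does_path_match_glob("docs/readme.md", ["*.md"]))   # True
    print(does_path_match_glob("docs/readme.md", ["*.txt"]))  # False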

unstructured_ingest/connector/git.py

Lines changed: 1 addition & 1 deletion

@@ -120,5 +120,5 @@ def does_path_match_glob(self, path: str) -> bool:
         for pattern in patterns:
             if fnmatch.filter([path], pattern):
                 return True
-        logger.debug(f"The file {path!r} is discarded as it does not match any given glob.")
+        logger.debug(f"the file {path!r} is discarded as it does not match any given glob.")
         return False

unstructured_ingest/connector/google_drive.py

Lines changed: 3 additions & 3 deletions

@@ -222,15 +222,15 @@ def get_file(self):
         dir_ = Path(self.meta["download_dir"])
         if dir_:
             if not dir_.is_dir():
-                logger.debug(f"Creating directory: {self.meta.get('download_dir')}")
+                logger.debug(f"creating directory: {self.meta.get('download_dir')}")

                 if dir_:
                     dir_.mkdir(parents=True, exist_ok=True)

         with open(self.filename, "wb") as handler:
             handler.write(file.getbuffer())
             saved = True
-            logger.debug(f"File downloaded: {self.filename}.")
+            logger.debug(f"file downloaded: {self.filename}.")
         if not saved:
             logger.error(f"Error while downloading and saving file: {self.filename}.")

@@ -241,7 +241,7 @@ def write_result(self):
         self._output_filename.parent.mkdir(parents=True, exist_ok=True)
         with open(self._output_filename, "w") as output_f:
             output_f.write(json.dumps(self.isd_elems_no_filename, ensure_ascii=False, indent=2))
-        logger.info(f"Wrote {self._output_filename}")
+        logger.info(f"wrote {self._output_filename}")


 @dataclass

unstructured_ingest/connector/hubspot.py

Lines changed: 1 addition & 1 deletion

@@ -271,7 +271,7 @@ def get_ingest_docs(self):

         ingest_docs: t.List[HubSpotIngestDoc] = []
         for obj_name, obj_method in obj_method_resolver.items():
-            logger.info(f"Retrieving - {obj_name}")
+            logger.info(f"retrieving - {obj_name}")
             results: t.List[HubSpotIngestDoc] = obj_method()  # type: ignore
             ingest_docs += results  # type: ignore
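
get_ingest_docs drives everything from obj_method_resolver, a plain mapping of object names to fetch callables. A sketch of that dispatch shape with stand-in fetchers (the real connector calls the HubSpot API, not these stubs):

    from typing import Callable, Dict, List

    def fetch_contacts() -> List[str]:
        return ["contact-1", "contact-2"]  # stand-in for a HubSpot API call

    def fetch_tickets() -> List[str]:
        return ["ticket-1"]  # stand-in for a HubSpot API call

    obj_method_resolver: Dict[str, Callable[[], List[str]]] = {
        "contacts": fetch_contacts,
        "tickets": fetch_tickets,
    }

    ingest_docs: List[str] = []
    for obj_name, obj_method in obj_method_resolver.items():
        print(f"retrieving - {obj_name}")  # lowercase, per this commit's convention
        ingest_docs += obj_method()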

unstructured_ingest/connector/kafka.py

Lines changed: 8 additions & 8 deletions

@@ -114,7 +114,7 @@ def check_connection(self):

     def initialize(self):
         topic = self.connector_config.topic
-        logger.info(f"Subscribing to topic: {topic}")
+        logger.info(f"subscribing to topic: {topic}")
         self.kafka_consumer.subscribe([topic])

     @property
@@ -149,7 +149,7 @@ def create_consumer(self) -> "Consumer":
             conf["sasl.password"] = secret

         consumer = Consumer(conf)
-        logger.debug(f"Kafka Consumer connected to bootstrap: {bootstrap}")
+        logger.debug(f"kafka consumer connected to bootstrap: {bootstrap}")
         return consumer

     @SourceConnectionError.wrap
@@ -161,7 +161,7 @@ def get_ingest_docs(self):

         collected = []
         num_messages_to_consume = self.connector_config.num_messages_to_consume
-        logger.info(f"Config set for blocking on {num_messages_to_consume} messages")
+        logger.info(f"config set for blocking on {num_messages_to_consume} messages")
         # Consume specified number of messages
         while running:
             msg = consumer.poll(timeout=self.connector_config.timeout)
@@ -178,7 +178,7 @@ def get_ingest_docs(self):
             else:
                 collected.append(json.loads(msg.value().decode("utf8")))
                 if len(collected) >= num_messages_to_consume:
-                    logger.debug(f"Found {len(collected)} messages, stopping")
+                    logger.debug(f"found {len(collected)} messages, stopping")
                     consumer.commit(asynchronous=False)
                     break

@@ -243,7 +243,7 @@ def create_producer(self) -> "Producer":
             conf["sasl.password"] = secret

         producer = Producer(conf)
-        logger.debug(f"Connected to bootstrap: {bootstrap}")
+        logger.debug(f"connected to bootstrap: {bootstrap}")
         return producer

     def check_connection(self):
@@ -255,7 +255,7 @@ def check_connection(self):

     @DestinationConnectionError.wrap
     def upload_msg(self, batch) -> int:
-        logger.debug(f"Uploading batch: {batch}")
+        logger.debug(f"uploading batch: {batch}")
         topic = self.connector_config.topic
         producer = self.kafka_producer
         uploaded = 0
@@ -267,15 +267,15 @@

     @DestinationConnectionError.wrap
     def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
-        logger.info(f"Writing {len(dict_list)} documents to Kafka")
+        logger.info(f"writing {len(dict_list)} documents to Kafka")
         num_uploaded = 0

         for chunk in batch_generator(dict_list, self.write_config.batch_size):
             num_uploaded += self.upload_msg(chunk)  # noqa: E203

         producer = self.kafka_producer
         producer.flush()
-        logger.info(f"Uploaded {num_uploaded} documents to Kafka")
+        logger.info(f"uploaded {num_uploaded} documents to Kafka")

     def write(self, docs: t.List[BaseIngestDoc]) -> None:
         content_list: t.List[t.Dict[str, t.Any]] = []
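
The consumer-side hunks above implement bounded consumption: poll until num_messages_to_consume messages are collected, commit synchronously, and stop. A runnable approximation using confluent-kafka; the broker address, topic, group id, and message count below are placeholders, not this connector's configuration:

    from confluent_kafka import Consumer

    conf = {
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "ingest-example",           # placeholder group
        "auto.offset.reset": "earliest",
        "enable.auto.commit": False,            # commit manually once done
    }
    consumer = Consumer(conf)
    consumer.subscribe(["ingest-topic"])  # placeholder topic

    collected = []
    num_messages_to_consume = 100
    try:
        while True:
            msg = consumer.poll(timeout=3.0)
            if msg is None:
                continue  # no message within the timeout; keep polling
            if msg.error():
                print(f"consumer error: {msg.error()}")
                continue
            collected.append(msg.value().decode("utf8"))
            if len(collected) >= num_messages_to_consume:
                print(f"found {len(collected)} messages, stopping")
                consumer.commit(asynchronous=False)  # synchronous commit, as in the diff
                break
    finally:
        consumer.close()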
