2828 InsufficientPermissionsError ,
2929 UnexpectedValidationError ,
3030)
31- from common .data_source .interfaces import CheckpointedConnectorWithPermSyncGH , CheckpointOutput
31+ from common .data_source .interfaces import (
32+ CheckpointedConnectorWithPermSyncGH ,
33+ CheckpointOutput ,
34+ CheckpointOutputWrapper ,
35+ )
3236from common .data_source .models import (
3337 ConnectorCheckpoint ,
3438 ConnectorFailure ,
3539 Document ,
3640 DocumentFailure ,
3741 ExternalAccess ,
42+ GenerateSlimDocumentOutput ,
3843 SecondsSinceUnixEpoch ,
44+ SlimDocument ,
3945)
4046from common .data_source .connector_runner import ConnectorRunner
4147from .models import SerializedRepository
@@ -594,14 +600,8 @@ def _fetch_from_github(
594600 done_with_prs = False
595601 num_prs = 0
596602 pr = None
597- print ("start: " , start )
598603 for pr in pr_batch :
599604 num_prs += 1
600- print ("-" * 40 )
601- print ("PR name" , pr .title )
602- print ("updated at" , pr .updated_at )
603- print ("-" * 40 )
604- print ("\n " )
605605 # we iterate backwards in time, so at this point we stop processing prs
606606 if (
607607 start is not None
@@ -732,10 +732,10 @@ def _fetch_from_github(
732732
733733 if checkpoint .cached_repo_ids :
734734 logging .info (
735- f"{ len (checkpoint .cached_repo_ids )} repos remaining (IDs: { checkpoint .cached_repo_ids } )"
735+ f"{ len (checkpoint .cached_repo_ids )} checkpoint repos remaining (IDs: { checkpoint .cached_repo_ids } )"
736736 )
737737 else :
738- logging .info ("No more repos remaining " )
738+ logging .info ("There are no more checkpoint repos left. " )
739739
740740 return checkpoint
741741
@@ -923,6 +923,53 @@ def validate_checkpoint_json(
923923 ) -> GithubConnectorCheckpoint :
924924 return GithubConnectorCheckpoint .model_validate_json (checkpoint_json )
925925
# Generator that yields lists of SlimDocument objects (each carrying only a
# document id) by driving load_from_checkpoint from a fresh dummy checkpoint
# until the checkpoint reports has_more as falsy.
#
# Parameters:
#   start    -- lower time bound; None is replaced with 0.0.
#   end      -- upper time bound; None is replaced with the current UTC timestamp.
#   callback -- optional object; when truthy its progress() method is invoked
#               with ("github_slim_document", 1).
# Yields:
#   lists of SlimDocument, emitted once the pending list reaches
#   SLIM_BATCH_SIZE, plus one final list holding any remainder.
926+ def retrieve_slim_document (
927+ self ,
928+ start : SecondsSinceUnixEpoch | None = None ,
929+ end : SecondsSinceUnixEpoch | None = None ,
930+ callback : Any = None ,
931+ ) -> GenerateSlimDocumentOutput :
# Resolve the open-ended bounds into concrete values before loading.
932+ start_value = 0.0 if start is None else start
933+ end_value = (
934+ datetime .now (timezone .utc ).timestamp () if end is None else end
935+ )
936+ checkpoint = self .build_dummy_checkpoint ()
937+ slim_batch : list [SlimDocument ] = []
938+
# Each pass creates a new wrapper and iterates its output as
# (document, failure, next_checkpoint) triples for the current checkpoint.
939+ while checkpoint .has_more :
940+ wrapper = CheckpointOutputWrapper [GithubConnectorCheckpoint ]()
941+ for document , failure , next_checkpoint in wrapper (
942+ self .load_from_checkpoint (start_value , end_value , checkpoint )
943+ ):
# Failures are logged at warning level and skipped; they do not abort the run.
# getattr falls back to the failure object itself when it has no failure_message.
944+ if failure is not None :
945+ logging .warning (
946+ "GitHub connector failure during slim retrieval: %s" ,
947+ getattr (failure , "failure_message" , failure ),
948+ )
949+ continue
950+
# Keep only the id of each document; flush the pending list once it is full.
951+ if document is not None :
952+ slim_batch .append (SlimDocument (id = document .id ))
953+ if len (slim_batch ) >= SLIM_BATCH_SIZE :
954+ yield slim_batch
955+ slim_batch = []
# NOTE(review): indentation is not visible in this view, so it is unclear whether
# the progress call below runs once per document or once per flushed batch —
# confirm against the real file.
956+ if callback :
957+ callback .progress ("github_slim_document" , 1 )
958+
# Adopt the newer checkpoint so the while condition re-evaluates has_more on it.
959+ if next_checkpoint is not None :
960+ checkpoint = next_checkpoint
961+
# Emit whatever is left over that never reached SLIM_BATCH_SIZE.
962+ if slim_batch :
963+ yield slim_batch
964+
def retrieve_all_slim_docs_perm_sync(
    self,
    start: SecondsSinceUnixEpoch | None = None,
    end: SecondsSinceUnixEpoch | None = None,
    callback: Any = None,
) -> GenerateSlimDocumentOutput:
    """Yield exactly what ``retrieve_slim_document`` yields.

    This method adds no logic of its own: ``start``, ``end`` and
    ``callback`` are forwarded unchanged, by keyword, and every item
    produced by the delegate generator is passed straight through.
    """
    slim_batches = self.retrieve_slim_document(
        start=start,
        end=end,
        callback=callback,
    )
    yield from slim_batches
972+
926973 def build_dummy_checkpoint (self ) -> GithubConnectorCheckpoint :
927974 return GithubConnectorCheckpoint (
928975 stage = GithubConnectorStage .PRS , curr_page = 0 , has_more = True , num_retrieved = 0
@@ -970,4 +1017,4 @@ def build_dummy_checkpoint(self) -> GithubConnectorCheckpoint:
9701017 if failure :
9711018 print (f"Failure: { failure .failure_message } " )
9721019 if next_checkpoint :
973- checkpoint = next_checkpoint
1020+ checkpoint = next_checkpoint
0 commit comments