1616# Configuração de diretórios
1717# ------------------------------------------------------------
1818BASE_DIR = Path (__file__ ).resolve ().parent .parent
19- RAW_DB_PATH = BASE_DIR / "db" / "YouTubeStatsPipeline .sqlite3"
20- OUTPUT_PATH = BASE_DIR / "data" / "processed" / "transcripts_limpos2Metric .csv"
19+ RAW_DB_PATH = BASE_DIR / "db" / "YouTubeStatsPipe2 .sqlite3"
20+ OUTPUT_PATH = BASE_DIR / "data" / "processed" / "transcripts_limpos4ComMetric .csv"
2121
2222# Configuração de memória - otimizada para 32GB RAM
2323INITIAL_CHUNK_SIZE = 2000 # Aumentado de 500 para 2000
@@ -238,7 +238,7 @@ def preprocess_text_batch(texts, batch_size: int = 64, n_process: int = N_PROCES
238238# ------------------------------------------------------------
239239# Pipeline
240240# ------------------------------------------------------------
241- def process_chunk (chunk , start_date , end_date ):
241+ def process_chunk (chunk ):
242242 """Processa um chunk de dados."""
243243 # Converte 'publishedAt' para datetime, tratando erros
244244 chunk ['publishedAt' ] = pd .to_datetime (chunk ['publishedAt' ], errors = 'coerce' )
@@ -249,41 +249,37 @@ def process_chunk(chunk, start_date, end_date):
249249 if metric in chunk .columns :
250250 chunk [metric ] = pd .to_numeric (chunk [metric ], errors = 'coerce' )
251251
252- # Filtra o DataFrame para o período desejado
253- mask = (chunk ['publishedAt' ] >= start_date ) & (chunk ['publishedAt' ] < end_date )
254- df_filtrado = chunk [mask ].copy ()
255-
256- if len (df_filtrado ) == 0 :
252+ if len (chunk ) == 0 :
257253 return None
258254
259- print (f"Processando chunk com { len (df_filtrado )} registros..." )
255+ print (f"Processando chunk com { len (chunk )} registros..." )
260256
261257 # Lista as colunas presentes para verificação
262- print (f"Colunas disponíveis: { df_filtrado .columns .tolist ()} " )
258+ print (f"Colunas disponíveis: { chunk .columns .tolist ()} " )
263259
264260 # Verifica se as métricas de engajamento estão presentes
265- metrics_present = [metric for metric in engagement_metrics if metric in df_filtrado .columns ]
261+ metrics_present = [metric for metric in engagement_metrics if metric in chunk .columns ]
266262 if metrics_present :
267263 print (f"Métricas de engajamento incluídas: { metrics_present } " )
268264 else :
269265 print ("AVISO: Nenhuma métrica de engajamento encontrada nos dados." )
270266
271267 # Mais otimizações para usar mais memória disponível
272- df_filtrado ["cleanTranscript" ] = preprocess_text_batch (
273- df_filtrado ["videoTranscript" ].tolist (),
268+ chunk ["cleanTranscript" ] = preprocess_text_batch (
269+ chunk ["videoTranscript" ].tolist (),
274270 batch_size = 64 ,
275271 n_process = N_PROCESS ,
276272 show_progress = True
277273 )
278274
279275 # Remove a coluna videoTranscript para economizar espaço
280- df_filtrado = df_filtrado .drop (columns = ['videoTranscript' ])
276+ chunk = chunk .drop (columns = ['videoTranscript' ])
281277
282278 # Garantir que valores nulos nas métricas sejam substituídos por zeros
283279 for metric in metrics_present :
284- df_filtrado [metric ] = df_filtrado [metric ].fillna (0 ).astype (int )
280+ chunk [metric ] = chunk [metric ].fillna (0 ).astype (int )
285281
286- return df_filtrado
282+ return chunk
287283
288284def main ():
289285 # Add global declaration for CHUNK_SIZE
@@ -353,22 +349,33 @@ def main():
353349 print (f"Reduzindo para { CHUNK_SIZE } registros por chunk." )
354350 continue # Tente novamente com chunk menor
355351
356- processed_chunk = process_chunk (chunk , start_date , end_date )
352+ processed_chunk = process_chunk (chunk )
357353
358354 # Liberar memória do chunk original imediatamente
359355 del chunk
360356 gc .collect ()
361357
362358 if processed_chunk is not None and len (processed_chunk ) > 0 :
363- # Verificar e informar métricas disponíveis
359+ # Filtra o DataFrame para o período desejado APÓS o processamento
360+ mask = (processed_chunk ['publishedAt' ] >= start_date ) & (processed_chunk ['publishedAt' ] < end_date )
361+ df_filtrado = processed_chunk [mask ].copy ()
362+
363+ if len (df_filtrado ) == 0 :
364+ print ("Nenhum registro no período para este chunk." )
365+ del processed_chunk
366+ del df_filtrado
367+ gc .collect ()
368+ offset += CHUNK_SIZE
369+ continue
370+
364371 engagement_metrics = ['viewCount' , 'likeCount' , 'commentCount' ]
365- metrics_present = [metric for metric in engagement_metrics if metric in processed_chunk .columns ]
372+ metrics_present = [metric for metric in engagement_metrics if metric in df_filtrado .columns ]
366373
367374 if metrics_present :
368375 print (f"Exportando com métricas de engajamento: { metrics_present } " )
369376 # Mostrar estatísticas básicas
370377 for metric in metrics_present :
371- print (f" - { metric } : média = { processed_chunk [metric ].mean ():.1f} , máx = { processed_chunk [metric ].max ()} " )
378+ print (f" - { metric } : média = { df_filtrado [metric ].mean ():.1f} , máx = { df_filtrado [metric ].max ()} " )
372379 else :
373380 print ("AVISO: Nenhuma métrica de engajamento será exportada." )
374381
@@ -377,9 +384,9 @@ def main():
377384 header = first_chunk
378385
379386 # Escrever em pedaços maiores para economizar operações de I/O
380- write_chunk_size = min (500 , len (processed_chunk )) # Aumentado de 100 para 500
381- for i in range (0 , len (processed_chunk ), write_chunk_size ):
382- sub_df = processed_chunk .iloc [i :i + write_chunk_size ]
387+ write_chunk_size = min (500 , len (df_filtrado )) # Aumentado de 100 para 500
388+ for i in range (0 , len (df_filtrado ), write_chunk_size ):
389+ sub_df = df_filtrado .iloc [i :i + write_chunk_size ]
383390 sub_df .to_csv (
384391 OUTPUT_PATH ,
385392 index = False ,
@@ -395,14 +402,16 @@ def main():
395402 if first_chunk :
396403 first_chunk = False
397404
398- total_processed += len (processed_chunk )
399- print (f"Salvos { len (processed_chunk )} registros no arquivo. Total: { total_processed } " )
405+ total_processed += len (df_filtrado )
406+ print (f"Salvos { len (df_filtrado )} registros no arquivo. Total: { total_processed } " )
400407
401408 chunks_processed += 1
402409 offset += CHUNK_SIZE
403410
404411 # Liberar memória do chunk processado explicitamente
405412 del processed_chunk
413+ if 'df_filtrado' in locals ():
414+ del df_filtrado
406415 gc .collect ()
407416
408417 # Verificar uso de memória e ajustar o tamanho do chunk se necessário
0 commit comments