|
28 | 28 | except ImportError: |
29 | 29 | Py4JNetworkError = Exception # no py4j |
30 | 30 |
|
| 31 | +# Spark/JVM 실패 시 폴백용: JAVA_GATEWAY_EXITED 등 Py4J 이외 예외도 잡기 위함 |
| 32 | +def _is_spark_or_jvm_error(e: Exception) -> bool: |
| 33 | + msg = str(e).upper() |
| 34 | + return ( |
| 35 | + "JAVA_GATEWAY" in msg |
| 36 | + or "PY4J" in msg |
| 37 | + or "SPARK" in msg |
| 38 | + or isinstance(e, (Py4JNetworkError, BrokenPipeError, ConnectionError, OSError, EOFError)) |
| 39 | + ) |
| 40 | + |
| 41 | + |
| 42 | +def _spark_disabled() -> bool: |
| 43 | + """Docker 등 JVM 없는 환경에서 Spark 비활성화 시 True.""" |
| 44 | + try: |
| 45 | + from .config import Config |
| 46 | + return getattr(Config, "DISABLE_SPARK", False) |
| 47 | + except Exception: |
| 48 | + return False |
| 49 | + |
| 50 | + |
31 | 51 | _spark_session = None |
32 | 52 |
|
33 | 53 |
|
@@ -517,15 +537,18 @@ def calculate_single_restaurant_ratios( |
517 | 537 | texts = [s for s in reviews if s and isinstance(s, str)] |
518 | 538 | if not texts: |
519 | 539 | return {"service": 0.0, "price": 0.0} |
520 | | - if SPARK_AVAILABLE: |
| 540 | + if SPARK_AVAILABLE and not _spark_disabled(): |
521 | 541 | try: |
522 | 542 | spark = _get_spark() |
523 | 543 | rdd = spark.sparkContext.parallelize(texts, numSlices=max(1, min(len(texts) // 50, 32))) |
524 | 544 | out = _spark_calculate_ratios(rdd, stopwords) |
525 | 545 | return {"service": round(out["service"], 2), "price": round(out["price"], 2)} |
526 | | - except (Py4JNetworkError, BrokenPipeError, ConnectionError, OSError, EOFError) as e: |
527 | | - logger.warning("Spark/Py4J 오류, Python 폴백 사용: %s", e) |
528 | | - _reset_spark() |
| 546 | + except Exception as e: |
| 547 | + if _is_spark_or_jvm_error(e): |
| 548 | + logger.warning("Spark/JVM 오류, Python 폴백 사용: %s", e) |
| 549 | + _reset_spark() |
| 550 | + else: |
| 551 | + raise |
529 | 552 | out = _python_calculate_ratios(texts, stopwords) |
530 | 553 | return {"service": round(out["service"], 2), "price": round(out["price"], 2)} |
531 | 554 |
|
@@ -596,6 +619,18 @@ def calculate_all_average_ratios_from_file( |
596 | 619 | logger.warning("pyspark 미설치. calculate_all_average_ratios_from_file 불가.") |
597 | 620 | return None |
598 | 621 |
|
| 622 | + # Docker 등 JVM 없는 환경: Spark 건너뛰고 Python 경로만 사용 |
| 623 | + if _spark_disabled(): |
| 624 | + try: |
| 625 | + rows = load_reviews_from_aspect_data_file(path, project_root) |
| 626 | + texts = [(r.get("content") or r.get("text") or "").strip() for r in rows if isinstance(r, dict)] |
| 627 | + texts = [t for t in texts if t] |
| 628 | + if texts: |
| 629 | + return _python_calculate_ratios(texts, stopwords) |
| 630 | + except Exception as e: |
| 631 | + logger.warning("DISABLE_SPARK 시 Python 경로 실패: %s", e) |
| 632 | + return None |
| 633 | + |
599 | 634 | try: |
600 | 635 | from pyspark.sql.functions import col, length, explode |
601 | 636 |
|
@@ -632,20 +667,20 @@ def calculate_all_average_ratios_from_file( |
632 | 667 |
|
633 | 668 | texts_rdd = base_df.select("text").rdd.map(lambda r: r["text"]) |
634 | 669 | return _spark_calculate_ratios(texts_rdd, stopwords) |
635 | | - except (Py4JNetworkError, BrokenPipeError, ConnectionError, OSError, EOFError) as e: |
636 | | - logger.warning("Spark/Py4J 오류, Python 폴백 시도: %s", e) |
637 | | - _reset_spark() |
638 | | - try: |
639 | | - rows = load_reviews_from_aspect_data_file(path, project_root) |
640 | | - texts = [(r.get("content") or r.get("text") or "").strip() for r in rows if isinstance(r, dict)] |
641 | | - texts = [t for t in texts if t] |
642 | | - if texts: |
643 | | - return _python_calculate_ratios(texts, stopwords) |
644 | | - except Exception as fb: |
645 | | - logger.warning("Python 폴백 실패: %s", fb) |
646 | | - return None |
647 | 670 | except Exception as e: |
648 | | - logger.warning("calculate_all_average_ratios_from_file 실패: %s — %s", path, e) |
| 671 | + if _is_spark_or_jvm_error(e): |
| 672 | + logger.warning("Spark/JVM 오류, Python 폴백 시도: %s", e) |
| 673 | + _reset_spark() |
| 674 | + try: |
| 675 | + rows = load_reviews_from_aspect_data_file(path, project_root) |
| 676 | + texts = [(r.get("content") or r.get("text") or "").strip() for r in rows if isinstance(r, dict)] |
| 677 | + texts = [t for t in texts if t] |
| 678 | + if texts: |
| 679 | + return _python_calculate_ratios(texts, stopwords) |
| 680 | + except Exception as fb: |
| 681 | + logger.warning("Python 폴백 실패: %s", fb) |
| 682 | + else: |
| 683 | + logger.warning("calculate_all_average_ratios_from_file 실패: %s — %s", path, e) |
649 | 684 | return None |
650 | 685 |
|
651 | 686 |
|
@@ -727,16 +762,19 @@ def calculate_all_average_ratios_from_reviews( |
727 | 762 | texts = [t for t in texts if t and isinstance(t, str)] |
728 | 763 | if not texts: |
729 | 764 | return {"service": 0.0, "price": 0.0} |
730 | | - if SPARK_AVAILABLE: |
| 765 | + if SPARK_AVAILABLE and not _spark_disabled(): |
731 | 766 | try: |
732 | 767 | spark = _get_spark() |
733 | 768 | rdd = spark.sparkContext.parallelize( |
734 | 769 | texts, numSlices=max(1, min(len(texts) // 100, 256)) |
735 | 770 | ) |
736 | 771 | return _spark_calculate_ratios(rdd, stopwords) |
737 | | - except (Py4JNetworkError, BrokenPipeError, ConnectionError, OSError, EOFError) as e: |
738 | | - logger.warning("Spark/Py4J 오류, Python 폴백 사용: %s", e) |
739 | | - _reset_spark() |
| 772 | + except Exception as e: |
| 773 | + if _is_spark_or_jvm_error(e): |
| 774 | + logger.warning("Spark/JVM 오류, Python 폴백 사용: %s", e) |
| 775 | + _reset_spark() |
| 776 | + else: |
| 777 | + raise |
740 | 778 | return _python_calculate_ratios(texts, stopwords) |
741 | 779 |
|
742 | 780 |
|
|
0 commit comments