From 41f5f25e3d8f5be072a2e6941d63abf6035119d6 Mon Sep 17 00:00:00 2001 From: yuli_han Date: Mon, 26 Feb 2024 16:16:43 -0800 Subject: [PATCH] add env as a passing parameter --- api/py/ai/chronon/repo/run.py | 44 +++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/api/py/ai/chronon/repo/run.py b/api/py/ai/chronon/repo/run.py index 9d68d09f7..d70885a7f 100755 --- a/api/py/ai/chronon/repo/run.py +++ b/api/py/ai/chronon/repo/run.py @@ -184,7 +184,7 @@ def download_only_once(url, path): @retry_decorator(retries=3, backoff=50) def download_jar(version, jar_type="uber", release_tag=None, spark_version="2.4.0"): assert ( - spark_version in SUPPORTED_SPARK + spark_version in SUPPORTED_SPARK ), f"Received unsupported spark version {spark_version}. Supported spark versions are {SUPPORTED_SPARK}" scala_version = SCALA_VERSION_FOR_SPARK[spark_version] maven_url_prefix = os.environ.get("CHRONON_MAVEN_MIRROR_PREFIX", None) @@ -237,7 +237,8 @@ def set_runtime_env(args): - Environment variables derived from args (like app_name) - conf.metaData.modeToEnvMap for the mode (set on config) - team environment per context and mode set on teams.json - - default team environment per context and mode set on teams.json + - production team environment per mode set on teams.json + - default production team environment per context and mode set on teams.json - Common Environment set in teams.json """ environment = { @@ -245,6 +246,7 @@ def set_runtime_env(args): "conf_env": {}, "default_env": {}, "team_env": {}, + "production_team_env": {}, "cli_args": {}, } conf_type = None @@ -262,7 +264,7 @@ def set_runtime_env(args): ) if args.conf and effective_mode: try: - context, conf_type, team, _ = args.conf.split("/")[-4:] + _, conf_type, team, _ = args.conf.split("/")[-4:] except Exception as e: logging.error( "Invalid conf path: {}, please ensure to supply the relative path to zipline/ folder".format( @@ -272,6 +274,7 @@ def set_runtime_env(args): raise 
e if not team: team = "default" + context = args.env logging.info( f"Context: {context} -- conf_type: {conf_type} -- team: {team}" ) @@ -298,8 +301,8 @@ def set_runtime_env(args): environment["team_env"] = ( teams_json[team].get(context, {}).get(effective_mode, {}) ) - environment["dev_team_env"] = ( - teams_json[team].get("dev", {}).get(effective_mode, {}) + environment["production_team_env"] = ( + teams_json[team].get("production", {}).get(effective_mode, {}) ) environment["default_env"] = ( teams_json.get("default", {}) @@ -317,10 +320,10 @@ def set_runtime_env(args): [ k for k in [ - "chronon", - conf_type, - args.mode.replace("-", "_") if args.mode else None, - ] + "chronon", + conf_type, + args.mode.replace("-", "_") if args.mode else None, + ] if k is not None ] ) @@ -329,12 +332,7 @@ def set_runtime_env(args): environment["cli_args"]["CHRONON_DRIVER_JAR"] = args.chronon_jar environment["cli_args"]["CHRONON_ONLINE_JAR"] = args.online_jar environment["cli_args"]["CHRONON_ONLINE_CLASS"] = args.online_class - # If the job is running on airflow, ignore the dev team environment. - if 'AIRFLOW_CTX_EXECUTION_DATE' in os.environ: - order = ["conf_env", "team_env", "default_env", "common_env", "cli_args"] - else: - # If the job is running locally for testing, dev team environment should be prioritized. 
- order = ["conf_env", "dev_team_env", "team_env", "default_env", "common_env", "cli_args"] + order = ["conf_env", "team_env", "production_team_env", "default_env", "common_env", "cli_args"] print("Setting env variables:") for key in os.environ: if any([key in environment[set_key] for set_key in order]): @@ -375,7 +373,7 @@ def __init__(self, args, jar_path): raise e possible_modes = list(ROUTES[self.conf_type].keys()) + UNIVERSAL_ROUTES assert ( - args.mode in possible_modes + args.mode in possible_modes ), "Invalid mode:{} for conf:{} of type:{}, please choose from {}".format( args.mode, self.conf, self.conf_type, possible_modes ) @@ -451,7 +449,7 @@ def run(self): ) if self.mode == "streaming": assert ( - len(filtered_apps) == 1 + len(filtered_apps) == 1 ), "More than one found, please kill them all" print("All good. No need to start a new app.") return @@ -576,6 +574,12 @@ def set_defaults(parser): required=False, help="Conf param - required for every mode except fetch", ) + parser.add_argument( + "--env", + required=False, + default='dev', + help="Running environment - default to be dev" + ) parser.add_argument("--mode", choices=MODE_ARGS.keys()) parser.add_argument("--ds", help="the end partition to backfill the data") parser.add_argument( @@ -597,7 +601,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar", help="Jar containing Online KvStore & Deserializer Impl. " - + "Used for streaming and metadata-upload mode.", + + "Used for streaming and metadata-upload mode.", ) parser.add_argument( "--online-class", @@ -614,7 +618,7 @@ def set_defaults(parser): parser.add_argument( "--online-jar-fetch", help="Path to script that can pull online jar. 
" - + "This will run only when a file doesn't exist at location specified by online_jar", + + "This will run only when a file doesn't exist at location specified by online_jar", ) parser.add_argument( "--sub-help", @@ -639,7 +643,7 @@ def set_defaults(parser): parser.add_argument( "--render-info", help="Path to script rendering additional information of the given config. " - + "Only applicable when mode is set to info", + + "Only applicable when mode is set to info", ) set_defaults(parser) pre_parse_args, _ = parser.parse_known_args()