Skip to content

Commit ebf9a42

Browse files
Merge pull request #12 from nirupama-dev/main
Bug fix on pagination and execution history
2 parents 6d27562 + f57b1b7 commit ebf9a42

File tree

20 files changed

+565
-404
lines changed

20 files changed

+565
-404
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ dependencies = [
3434
"google-cloud-compute",
3535
"google-cloud-iam",
3636
"cron-descriptor>=1.4.5",
37+
"gcs-jupyter-plugin",
3738
]
3839
dynamic = ["version", "description", "authors", "urls", "keywords"]
3940

scheduler_jupyter_plugin/controllers/composer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,21 @@ async def get(self):
3939
except Exception as e:
4040
self.log.exception(f"Error fetching composer environments: {str(e)}")
4141
self.finish({"error": str(e)})
42+
43+
class EnvironmentGetController(APIHandler):
44+
@tornado.web.authenticated
45+
async def get(self):
46+
"""Returns details of composer environment"""
47+
try:
48+
env_name = self.get_argument("env_name")
49+
async with aiohttp.ClientSession() as client_session:
50+
client = composer.Client(
51+
await credentials.get_cached(), self.log, client_session
52+
)
53+
environment = await client.get_environment(env_name)
54+
self.set_header("Content-Type", "application/json")
55+
self.finish(json.dumps(environment, default=lambda x: x.dict()))
56+
except Exception as e:
57+
self.log.exception(f"Error fetching composer environment: {str(e)}")
58+
self.finish({"error": str(e)})
59+

scheduler_jupyter_plugin/dagTemplates/localPythonTemplate-v1.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def run_notebook_task(**kwargs):
6464
parameters = '''
6565
{{parameters}}
6666
'''
67-
if len(parameters)>0:
67+
if parameters.strip():
6868
parameters_dict = convert_parameters(parameters)
6969
else:
7070
parameters_dict = {}

scheduler_jupyter_plugin/handlers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def full_path(name):
156156
"getGcpServiceUrls": UrlHandler,
157157
"log": LogHandler,
158158
"composerList": composer.EnvironmentListController,
159+
"getComposerEnvironment": composer.EnvironmentGetController,
159160
"dagRun": airflow.DagRunController,
160161
"dagRunTask": airflow.DagRunTaskController,
161162
"dagRunTaskLogs": airflow.DagRunTaskLogsController,

scheduler_jupyter_plugin/models/models.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ class ComposerEnvironment(BaseModel):
2727
name: str
2828
label: str
2929
description: str
30+
state: str
3031
file_extensions: List[str] # Supported input file types
3132
metadata: Optional[Dict[str, str]] # Optional metadata
33+
pypi_packages: Optional[Dict[str, str]] = None
3234

3335
def __str__(self):
3436
return self.json()

scheduler_jupyter_plugin/services/composer.py

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,20 @@ async def list_environments(
6767
return environments
6868
else:
6969
environment = resp.get("environments", [])
70-
# Extract the 'name' values from the 'environments' list
71-
names = [env["name"] for env in environment]
72-
# Extract the last value after the last slash for each 'name'
73-
last_values = [name.split("/")[-1] for name in names]
74-
for env in last_values:
75-
name = env
70+
for env in environment:
71+
path = env["name"]
72+
name = env["name"].split("/")[-1]
73+
state = env["state"]
74+
pypi_packages = env.get("config", {}).get("softwareConfig", {}).get("pypiPackages", None)
7675
environments.append(
7776
ComposerEnvironment(
7877
name=name,
7978
label=name,
8079
description=f"Environment: {name}",
80+
state=state,
8181
file_extensions=["ipynb"],
82-
metadata={"path": env},
82+
metadata={"path": path},
83+
pypi_packages=pypi_packages
8384
)
8485
)
8586
return environments
@@ -94,3 +95,47 @@ async def list_environments(
9495
except Exception as e:
9596
self.log.exception(f"Error fetching environments list: {str(e)}")
9697
return {"Error fetching environments list": str(e)}
98+
99+
async def get_environment(
100+
self, env_name
101+
) -> ComposerEnvironment:
102+
try:
103+
environment = {}
104+
composer_url = await urls.gcp_service_url(COMPOSER_SERVICE_NAME)
105+
api_endpoint = f"{composer_url}/v1/{env_name}"
106+
107+
headers = self.create_headers()
108+
async with self.client_session.get(
109+
api_endpoint, headers=headers
110+
) as response:
111+
if response.status == HTTP_STATUS_OK:
112+
resp = await response.json()
113+
if not resp:
114+
return environment
115+
else:
116+
path = resp.get("name")
117+
name = resp.get("name").split("/")[-1]
118+
state = resp.get("state")
119+
pypi_packages = resp.get("config", {}).get("softwareConfig", {}).get("pypiPackages", None)
120+
environment = ComposerEnvironment(
121+
name=name,
122+
label=name,
123+
description=f"Environment: {name}",
124+
state=state,
125+
file_extensions=["ipynb"],
126+
metadata={"path": path},
127+
pypi_packages=pypi_packages
128+
)
129+
130+
return environment
131+
elif response.status == HTTP_STATUS_FORBIDDEN:
132+
resp = await response.json()
133+
return resp
134+
else:
135+
self.log.exception("Error fetching environment")
136+
raise Exception(
137+
f"Error getting composer: {response.reason} {await response.text()}"
138+
)
139+
except Exception as e:
140+
self.log.exception(f"Error fetching environment: {str(e)}")
141+
return {"Error fetching environment": str(e)}

scheduler_jupyter_plugin/tests/test_composer.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,12 @@ def get(self, api_endpoint, headers=None):
3131
{
3232
"environments": [
3333
{
34-
"name": "projects/mock-project/locations/mock-location/environments/env1"
34+
"name": "projects/mock-project/locations/mock-location/environments/env1",
35+
"state": "RUNNING",
3536
},
3637
{
37-
"name": "projects/mock-project/locations/mock-location/environments/env2"
38+
"name": "projects/mock-project/locations/mock-location/environments/env2",
39+
"state": "UPDATING",
3840
},
3941
]
4042
}

src/scheduler/common/SchedulerInteface.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,13 @@ export interface IClusterAPIResponse {
6969
}
7070

7171
export interface IComposerAPIResponse {
72-
name: string;
72+
name: string | '';
7373
label: string;
7474
description: string;
75+
state: string;
7576
file_extensions: [];
7677
metadata: Record<string, never>;
78+
pypi_packages: Record<string, string> | undefined;
7779
}
7880

7981
export interface IDagRunList {

0 commit comments

Comments
 (0)