Skip to content

Commit 2369fc2

Browse files
committed
FLEX: Support Interactive both in gsctl and coordinator side
1 parent f64e089 commit 2369fc2

21 files changed

+1389
-65
lines changed

coordinator/gscoordinator/coordinator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from graphscope.config import Config
3131
from graphscope.proto import coordinator_service_pb2_grpc
3232

33+
from gscoordinator.servicer import init_interactive_service_servicer
3334
from gscoordinator.servicer import init_graphscope_one_service_servicer
3435
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH
3536

@@ -109,6 +110,7 @@ def get_servicer(config: Config):
109110
"""Get servicer of specified solution under FLEX architecture"""
110111
service_initializers = {
111112
"GraphScope One": init_graphscope_one_service_servicer,
113+
"Interactive": init_interactive_service_servicer,
112114
}
113115

114116
initializer = service_initializers.get(config.solution)
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
#! /usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2023 Alibaba Group Holding Limited.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import datetime
20+
import json
21+
import time
22+
from abc import ABCMeta
23+
from abc import abstractmethod
24+
25+
import schedule
26+
from schedule import CancelJob
27+
28+
from gscoordinator.stoppable_thread import StoppableThread
29+
from gscoordinator.utils import decode_datetimestr
30+
31+
32+
class Schedule(object):
33+
"""Schedule class that wrapper dbader schedule
34+
35+
Repo: https://github.com/dbader/schedule.
36+
"""
37+
38+
def __init__(self):
39+
self._schedule = schedule.Scheduler()
40+
self._run_pending_thread = StoppableThread(target=self.run_pending, args=())
41+
self._run_pending_thread.daemon = True
42+
self._run_pending_thread.start()
43+
44+
@property
45+
def schedule(self):
46+
return self._schedule
47+
48+
def run_pending(self):
49+
"""Run all jobs that are scheduled to run."""
50+
while True:
51+
try:
52+
self._schedule.run_pending()
53+
time.sleep(1)
54+
except: # noqa: E722
55+
pass
56+
57+
58+
schedule = Schedule().schedule # noqa: F811
59+
60+
61+
class Scheduler(metaclass=ABCMeta):
62+
"""
63+
Objects instantiated by the :class:`Scheduler <Scheduler>` are
64+
factories to create jobs, keep record of scheduled jobs and
65+
handle their execution in the :method:`run` method.
66+
"""
67+
68+
def __init__(self, at_time, repeat):
69+
# scheduler id
70+
self._scheduler_id = "Job-scheduler-{0}".format(
71+
datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
72+
)
73+
# periodic job as used
74+
self._job = None
75+
# true will be run immediately
76+
self._run_now = False
77+
# time at which this job to schedule
78+
self._at_time = self._decode_datetimestr(at_time)
79+
# repeat every day or week, or run job once(no repeat)
80+
# optional value "day", "week", "null"
81+
self._repeat = repeat
82+
# job running thread, note that:
83+
# the last job should be end of execution at the beginning of the next job
84+
self._running_thread = None
85+
# tags
86+
self._tags = []
87+
88+
# when the job actually scheduled, the following variables will be generated and overridden.
89+
self._jobid = None
90+
self._last_run = None
91+
92+
def _decode_datetimestr(self, datetime_str):
93+
if datetime_str == "now":
94+
self._run_now = True
95+
return datetime.datetime.now()
96+
return decode_datetimestr(datetime_str)
97+
98+
def __str__(self):
99+
return "Scheduler(at_time={}, repeat={})".format(self._at_time, self._repeat)
100+
101+
@property
102+
def monday(self):
103+
return self._at_time.weekday() == 0
104+
105+
@property
106+
def tuesday(self):
107+
return self._at_time.weekday() == 1
108+
109+
@property
110+
def wednesday(self):
111+
return self._at_time.weekday() == 2
112+
113+
@property
114+
def thursday(self):
115+
return self._at_time.weekday() == 3
116+
117+
@property
118+
def friday(self):
119+
return self._at_time.weekday() == 4
120+
121+
@property
122+
def saturday(self):
123+
return self._at_time.weekday() == 5
124+
125+
@property
126+
def sunday(self):
127+
return self._at_time.weekday() == 6
128+
129+
@property
130+
def timestr(self):
131+
"""return str of the time object.
132+
time([hour[, minute[, second[, microsecond[, tzinfo]]]]]) --> a time object
133+
"""
134+
return str(self._at_time.time())
135+
136+
@property
137+
def job(self):
138+
"""A periodic job managed by the dbader scheduler.
139+
https://github.com/dbader/schedule.
140+
"""
141+
return self._job
142+
143+
@property
144+
def jobid(self):
145+
"""id for the last scheduled job"""
146+
return self._jobid
147+
148+
@property
149+
def schedulerid(self):
150+
"""id for the scheduler"""
151+
return self._scheduler_id
152+
153+
@property
154+
def last_run(self):
155+
"""datetime of the last run"""
156+
return self._last_run
157+
158+
@property
159+
def tags(self):
160+
return self._tags
161+
162+
@property
163+
def running_thread(self):
164+
return self._running_thread
165+
166+
def run_once(self):
167+
"""Run the job immediately."""
168+
self.do_run()
169+
return CancelJob
170+
171+
def waiting_until_to_run(self):
172+
"""Run the job once at a specific time."""
173+
if datetime.datetime.now() >= self._at_time:
174+
return self.run_once()
175+
176+
def do_run(self):
177+
"""Start a thread for the job."""
178+
# overwrite for each scheduled job
179+
self._jobid = "job-{0}".format(
180+
datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
181+
)
182+
self._last_run = datetime.datetime.now()
183+
# schedule in a thread
184+
self._running_thread = StoppableThread(target=self.run, args=())
185+
self._running_thread.daemon = True
186+
self._running_thread.start()
187+
188+
def submit(self):
189+
if not self._run_now and self._repeat not in ["week", "day", "null", None]:
190+
raise RuntimeError(
191+
"Submit schedule job failed: at_time is '{0}', repeat is '{1}'".format(
192+
self._at_time, self._repeat
193+
)
194+
)
195+
196+
if self._run_now:
197+
self._job = schedule.every().seconds.do(self.run_once)
198+
199+
if not self._run_now and self._repeat == "week":
200+
if self.monday:
201+
self._job = schedule.every().monday.at(self.timestr).do(self.do_run)
202+
elif self.tuesday:
203+
self._job = schedule.every().tuesday.at(self.timestr).do(self.do_run)
204+
elif self.wednesday:
205+
self._job = schedule.every().wednesday.at(self.timestr).do(self.do_run)
206+
elif self.thursday:
207+
self._job = schedule.every().thursday.at(self.timestr).do(self.do_run)
208+
elif self.friday:
209+
self._job = schedule.every().friday.at(self.timestr).do(self.do_run)
210+
elif self.saturday:
211+
self._job = schedule.every().saturday.at(self.timestr).do(self.do_run)
212+
elif self.sunday:
213+
self._job = schedule.every().sunday.at(self.timestr).do(self.do_run)
214+
215+
if not self._run_now and self._repeat == "day":
216+
self._job = schedule.every().day.at(self.timestr).do(self.do_run)
217+
218+
if not self._run_now and self._repeat in ["null", None]:
219+
self._job = (
220+
schedule.every().day.at(self.timestr).do(self.waiting_until_to_run)
221+
)
222+
223+
# tag
224+
self._job.tag(self._scheduler_id, *self._tags)
225+
226+
def start(self):
227+
"""Submit and schedule the job."""
228+
self.submit()
229+
230+
def cancel(self):
231+
"""
232+
Set the running job thread stoppable and wait for the
233+
thread to exit properly by using join() method.
234+
"""
235+
if self._running_thread is not None and self._running_thread.is_alive():
236+
self._running_thread.stop()
237+
self._running_thread.join()
238+
239+
@abstractmethod
240+
def run(self):
241+
"""
242+
Methods that all subclasses need to implement, note that
243+
subclass needs to handle exception by itself.
244+
"""
245+
raise NotImplementedError
246+
247+
248+
def cancel_job(job, delete_scheduler=True):
249+
"""
250+
Cancel the job which going to scheduled or cancel the whole scheduler.
251+
252+
Args:
253+
job: Periodic job as used by :class:`Scheduler`.
254+
delete_scheduler: True will can the whole scheduler, otherwise,
255+
delay the next-run time by on period.
256+
"""
257+
if delete_scheduler:
258+
schedule.cancel_job(job)
259+
else:
260+
job.next_run += job.period

coordinator/gscoordinator/servicer/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@
1616
# limitations under the License.
1717
#
1818

19+
from gscoordinator.servicer.interactive.service import *
1920
from gscoordinator.servicer.graphscope_one.service import *
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#! /usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2023 Alibaba Group Holding Limited.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import atexit
20+
import logging
21+
22+
from graphscope.config import Config
23+
from graphscope.proto import coordinator_service_pb2_grpc
24+
from graphscope.proto import message_pb2
25+
26+
27+
class BaseServiceServicer(coordinator_service_pb2_grpc.CoordinatorServiceServicer):
28+
"""Base class of coordinator service"""
29+
30+
def __init__(self, config: Config):
31+
self._config = config
32+
atexit.register(self.cleanup)
33+
34+
def __del__(self):
35+
self.cleanup()
36+
37+
def Connect(self, request, context):
38+
return message_pb2.ConnectResponse(solution=self._config.solution)
39+
40+
@property
41+
def launcher_type(self):
42+
return self._config.launcher_type
43+
44+
def cleanup(self):
45+
pass
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#! /usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2023 Alibaba Group Holding Limited.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
import os
20+
import sys
21+
22+
try:
23+
sys.path.insert(0, os.path.dirname(__file__))
24+
import interactive_client
25+
except ImportError:
26+
raise

0 commit comments

Comments
 (0)