Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ jobs:
run: |
pip install --upgrade "pip>=21.3.1"
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Lint go
run: make lint-go
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
with:
go-version: 1.17.7
- name: Install dependencies
run: make install-go-ci-dependencies
run: make install-go-proto-dependencies
- name: Compile protos
run: make compile-protos-go
- name: Test
Expand Down
10 changes: 6 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ build: protos build-java build-docker build-html

# Python SDK

install-python-ci-dependencies: install-go-ci-dependencies
install-python-ci-dependencies: install-go-proto-dependencies
cd sdk/python && python -m piptools sync requirements/py$(PYTHON)-ci-requirements.txt
cd sdk/python && COMPILE_GO=true python setup.py develop

Expand Down Expand Up @@ -125,19 +125,21 @@ build-java-no-tests:

# Go SDK

install-go-ci-dependencies:
install-go-proto-dependencies:
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26.0
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1.0

compile-protos-go: install-go-ci-dependencies
install-protoc-dependencies:
pip install grpcio-tools==1.34.0

compile-protos-go: install-go-proto-dependencies install-protoc-dependencies
python sdk/python/setup.py build_go_protos

compile-go-feature-server: compile-protos-go
go mod tidy
go build -o ${ROOT_DIR}/sdk/python/feast/binaries/goserver github.com/feast-dev/feast/go/cmd/goserver

test-go: install-go-ci-dependencies
test-go: compile-protos-go
go test ./...

format-go:
Expand Down
1 change: 0 additions & 1 deletion go/cmd/goserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type FeastEnvConfig struct {
}

// TODO: Add a proper logging library such as https://github.com/Sirupsen/logrus

func main() {

var feastEnvConfig FeastEnvConfig
Expand Down
Binary file not shown.
19 changes: 3 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,7 @@ class FeatureStore:

@log_exceptions
def __init__(
self,
repo_path: Optional[str] = None,
config: Optional[RepoConfig] = None,
go_server_use_thread: bool = False,
self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None,
):
"""
Creates a FeatureStore object.
Expand All @@ -135,7 +132,6 @@ def __init__(
self._registry._initialize_registry()
self._provider = get_provider(self.config, self.repo_path)
self._go_server = None
self._go_server_use_thread = go_server_use_thread

@log_exceptions
def version(self) -> str:
Expand Down Expand Up @@ -1233,11 +1229,8 @@ def get_online_features(
if self.config.go_feature_server:
# Lazily start the go server on the first request
if self._go_server is None:
self._go_server = GoServer(
str(self.repo_path.absolute()),
self.config,
self._go_server_use_thread,
)
self._go_server = GoServer(str(self.repo_path.absolute()), self.config,)
self._go_server._shared_connection._check_grpc_connection()
return self._go_server.get_online_features(
features, columnar, full_feature_names
)
Expand Down Expand Up @@ -1861,12 +1854,6 @@ def kill_go_server(self):
if self._go_server:
self._go_server.kill_go_server_explicitly()

def set_go_server_use_thread(self, use: bool):
if self._go_server:
self._go_server.set_use_thread(use)
else:
self._go_server_use_thread = use


def _validate_entity_values(join_key_values: Dict[str, List[Value]]):
set_of_row_lengths = {len(v) for v in join_key_values.values()}
Expand Down
155 changes: 47 additions & 108 deletions sdk/python/feast/go_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import ctypes
import logging
import os
import pathlib
import platform
import random
import shutil
import signal
import string
import subprocess
import tempfile
Expand All @@ -44,13 +41,22 @@
from feast.repo_config import RepoConfig
from feast.type_map import python_values_to_proto_values

_logger = logging.getLogger(__name__)


class GoServerConnection:
def __init__(self, config: RepoConfig, repo_path: str):
self._process: Optional[Popen[bytes]] = None
self._config = config
self._repo_path = repo_path
self.temp_dir = tempfile.TemporaryDirectory()
self._client: Optional[ServingServiceStub] = None

@property
def client(self):
if self._client:
return self._client
raise RuntimeError("Client not established with go subprocess")

def _get_unix_domain_file_path(self) -> Path:
# This method should return a file that go server should listen on and that the python channel
Expand Down Expand Up @@ -81,7 +87,7 @@ def connect(self) -> bool:
self._process = Popen([executable], cwd=cwd, env=env,)

channel = grpc.insecure_channel(f"unix:{self.sock_file}")
self.client: ServingServiceStub = ServingServiceStub(channel)
self._client = ServingServiceStub(channel)

try:
self._check_grpc_connection()
Expand All @@ -91,11 +97,10 @@ def connect(self) -> bool:

def kill_process(self):
if self._process:
# self._process.terminate()
self._process.send_signal(signal.SIGINT)
self._process.terminate()

def is_process_alive(self):
return self._process and self._process.poll()
return self._process and self._process.poll() is None

def wait_for_process(self, timeout):
self._process.wait(timeout)
Expand All @@ -107,7 +112,7 @@ def wait_for_process(self, timeout):
wait=wait_exponential(multiplier=0.1, min=0.1, max=5),
)
def _check_grpc_connection(self):
self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest())
return self.client.GetFeastServingInfo(request=GetFeastServingInfoRequest())


class GoServer:
Expand All @@ -118,57 +123,20 @@ class GoServer:
Attributes:
_repo_path: The path to the Feast repo for which this go server is defined.
_config: The RepoConfig for the Feast repo for which this go server is defined.
_go_server_use_thread: set whether or not to use a background thread to monitor go server
"""

_repo_path: str
_config: RepoConfig
_go_server_use_thread: bool

def __init__(
self, repo_path: str, config: RepoConfig, go_server_use_thread: bool = False,
):
def __init__(self, repo_path: str, config: RepoConfig):
"""Creates a GoServer object."""
self._repo_path = repo_path
self._config = config
self._go_server_started = threading.Event()
self._use_thread = go_server_use_thread
self._shared_connection = GoServerConnection(config, repo_path)
self._dev_mode = "dev" in feast.__version__
if not is_test() and self._dev_mode:
self._build_binaries()

if self._check_use_thread():
self._start_go_server_use_thread()
else:
self._start_go_server()

def _check_use_thread(self):
return self._use_thread

def set_use_thread(self, use: bool):
self._use_thread = use

def _build_binaries(self):

goos = platform.system().lower()
goarch = "amd64" if platform.machine() == "x86_64" else "arm64"
binaries_path = (pathlib.Path(__file__).parent / "../feast/binaries").resolve()
binaries_path_abs = str(binaries_path.absolute())
if binaries_path.exists():
shutil.rmtree(binaries_path_abs)
os.mkdir(binaries_path_abs)

subprocess.check_output(
[
"go",
"build",
"-o",
f"{binaries_path_abs}/goserver_{goos}_{goarch}",
"github.com/feast-dev/feast/go/cmd/goserver",
],
env={"GOOS": goos, "GOARCH": goarch, **os.environ},
)
self._start_go_server_use_thread()

def get_online_features(
self,
Expand Down Expand Up @@ -198,7 +166,7 @@ def get_online_features(
ValueError: If some other error occurs.
"""
# Wait for go server subprocess to restart before asking for features
if self._check_use_thread() and not self._go_server_started.is_set():
if not self._go_server_started.is_set():
self._go_server_started.wait()

request = GetOnlineFeaturesRequest(full_feature_names=full_feature_names)
Expand All @@ -220,9 +188,7 @@ def get_online_features(
if rpc_error.code() == grpc.StatusCode.UNAVAILABLE:
# If the server became unavailable, it could mean that the subprocess died or fell
# into a bad state, so the resolution is to wait for go server to restart in the background
if not self._check_use_thread():
self._start_go_server()
elif not self._go_server_started.is_set():
if not self._go_server_started.is_set():
self._go_server_started.wait()
# Retry request with the new Go subprocess
response = self._shared_connection.client.GetOnlineFeatures(
Expand All @@ -249,96 +215,69 @@ def get_online_features(

def _start_go_server_use_thread(self):

self._go_server_background_thread = GoServerBackgroundThread(
"GoServerBackgroundThread",
self._shared_connection,
self._go_server_started,
self._go_server_background_thread = GoServerMonitorThread(
"GoServerMonitorThread", self._shared_connection, self._go_server_started
)
self._go_server_background_thread.start()
atexit.register(lambda: self._go_server_background_thread.stop_go_server())
signal.signal(
signal.SIGTERM,
lambda sig, frame: self._go_server_background_thread.stop_go_server(),
)
signal.signal(
signal.SIGINT,
lambda sig, frame: self._go_server_background_thread.stop_go_server(),
)
atexit.register(lambda: self._go_server_background_thread.stop())

# Wait for go server subprocess to start for the first time before returning
self._go_server_started.wait()

def _start_go_server(self):
if self._shared_connection.is_process_alive():
self._shared_connection.kill_process()

self._shared_connection.connect()
atexit.register(lambda: self._shared_connection.kill_process())
signal.signal(
signal.SIGTERM, lambda sig, frame: self._shared_connection.kill_process()
)
signal.signal(
signal.SIGINT, lambda sig, frame: self._shared_connection.kill_process()
)

def kill_go_server_explicitly(self):
if self._check_use_thread():
self._go_server_background_thread.stop_go_server()
else:
self._shared_connection.kill_process()
self._go_server_background_thread.stop()
self._go_server_background_thread.join()


# https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/
class GoServerBackgroundThread(threading.Thread):
class GoServerMonitorThread(threading.Thread):
def __init__(
self,
name: str,
shared_connection: GoServerConnection,
go_server_started: threading.Event,
go_server_first_started: threading.Event,
):
threading.Thread.__init__(self)
self.name = name
self._shared_connection = shared_connection
self._go_server_started = go_server_started
self._is_cancelled = False
self.daemon = True
self._go_server_started = go_server_first_started

def run(self):
# Target function of the thread class
_logger.debug("Started monitoring thread to keep go feature server alive")
try:
while True:
self._go_server_started.clear()
while not self._is_cancelled:

# If we fail to connect to grpc stub, terminate subprocess and repeat
_logger.info("Connecting to subprocess")
if not self._shared_connection.connect():
_logger.info("Failed to connect, killing and retrying")
self._shared_connection.kill_process()
continue
self._go_server_started.set()
while True:
else:
_logger.debug("Go feature server started")
self._go_server_started.set()
_logger.info("Status: %s", self._is_cancelled)
while not self._is_cancelled:
try:
# Making a blocking wait by setting timeout to a very long time so we don't waste cpu cycle
self._shared_connection.wait_for_process(3600)
except subprocess.TimeoutExpired:
pass
_logger.info(
"No longer waiting for process: %s, %s, %s",
self._shared_connection._process.pid,
self._shared_connection._process.returncode,
self._shared_connection.is_process_alive(),
)
if not self._shared_connection.is_process_alive():
break
finally:
# Main thread exits
self._shared_connection.kill_process()

def stop_go_server(self):
thread_id = self._get_id()
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
thread_id, ctypes.py_object(SystemExit)
)
# TODO: Review that kill process here but run also has to stop
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
else:
self._shared_connection.kill_process()

def _get_id(self):
# returns id of the respective thread
if hasattr(self, "_thread_id"):
return self._thread_id
for id, thread in threading._active.items():
if thread is self:
return id
def stop(self):
_logger.info("Stopping monitoring thread and terminating go feature server")
self._is_cancelled = True
self._shared_connection.kill_process()
3 changes: 2 additions & 1 deletion sdk/python/feast/infra/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ def _write_registry(self, registry_proto: RegistryProto):
registry_proto.last_updated.FromDatetime(datetime.utcnow())
file_dir = self._filepath.parent
file_dir.mkdir(exist_ok=True)
self._filepath.write_bytes(registry_proto.SerializeToString())
with open(self._filepath, mode="wb", buffering=0) as f:
f.write(registry_proto.SerializeToString())
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
2 changes: 1 addition & 1 deletion sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def finalize_options(self):

def _generate_python_protos(self, path: str):
proto_files = glob.glob(os.path.join(self.proto_folder, path))

Path(self.python_folder).mkdir(exist_ok=True)
subprocess.check_call(
self.python_protoc
+ [
Expand Down
Loading