Metadata test harness #2

Status: Open. Wants to merge 102 commits into base: master.

102 commits
4add0cb
Testing new quoting fixes for DB creds.
valayDave Sep 2, 2021
f0caa22
Fixed Docker image based go image.
valayDave Sep 3, 2021
e656dd9
reverting old commit
valayDave Sep 3, 2021
1e13c4e
Creating shell of a mini test harness with docker
valayDave Sep 3, 2021
98ca3d2
added main command in env builder.
valayDave Sep 3, 2021
c511caf
Added comments.
valayDave Sep 3, 2021
aa10b7f
import tweek ?
valayDave Sep 3, 2021
6f71d2c
fix.
valayDave Sep 3, 2021
4826bab
Fixing bug >?
valayDave Sep 3, 2021
221848f
bug fix
valayDave Sep 3, 2021
858edfa
fix
valayDave Sep 3, 2021
3d895f8
Added Fixes to test harness shell.
valayDave Sep 3, 2021
fd54555
Added logs and comments.
valayDave Sep 3, 2021
2f8bf2e
Got test harness kinda working
valayDave Sep 4, 2021
8d20390
Testing buffer name changes
valayDave Sep 4, 2021
729f141
Fixing bugs
valayDave Sep 4, 2021
7581109
Fixing temp versioning arguement.
valayDave Sep 4, 2021
a83779b
Probable bug fix for logs ?
valayDave Sep 4, 2021
ea157ab
added log
valayDave Sep 4, 2021
3e369b0
prettyinh logs
valayDave Sep 4, 2021
cff02b4
Dependency baking.
valayDave Sep 4, 2021
0ec7da0
Fixing more requirements.
valayDave Sep 4, 2021
1716c2c
temporary fix.
valayDave Sep 4, 2021
1ea130b
added db port tweeks.
valayDave Sep 4, 2021
6aac834
adding meaningless log to debug
valayDave Sep 4, 2021
a95228b
added logs
valayDave Sep 4, 2021
c84aaff
logging fix and tweek
valayDave Sep 4, 2021
94e1e0e
logs and fixes.
valayDave Sep 4, 2021
64d0047
trying bug fix.
valayDave Sep 4, 2021
f4edac5
Increasing timeouts to ip resolution
valayDave Sep 4, 2021
59a685c
Fixing ip resolution
valayDave Sep 4, 2021
5ab06de
logging bug fix
valayDave Sep 4, 2021
2b45c3d
adding sleep to fix init?
valayDave Sep 4, 2021
e7cb452
bug fix?
valayDave Sep 4, 2021
11b6e9a
Hard baking deps
valayDave Sep 4, 2021
9ebdfaa
tweek in dockerfile
valayDave Sep 4, 2021
aaf5331
checking with changed image
valayDave Sep 4, 2021
2fd0dab
fix
valayDave Sep 4, 2021
4846490
fixing bug
valayDave Sep 4, 2021
3564103
Removing pointless logs.
valayDave Sep 4, 2021
e847b52
Tweeking fixes.
valayDave Sep 4, 2021
06ceb54
tweek
valayDave Sep 4, 2021
dca6e0d
Tweek.
valayDave Sep 4, 2021
64ea2f4
adding logs + hygenie refactor
valayDave Sep 4, 2021
bbaf0a4
bug fix
valayDave Sep 4, 2021
0ac2726
hygenie refactor;
valayDave Sep 4, 2021
4c9c613
bug fix
valayDave Sep 4, 2021
a134e84
minimizing timeouts
valayDave Sep 4, 2021
57b3e4f
Adding logs to return responses
valayDave Sep 4, 2021
2053f47
increasing timeout between flows.
valayDave Sep 4, 2021
d5ea731
changing to MP pool to test
valayDave Sep 4, 2021
511aec4
testing tweek
valayDave Sep 4, 2021
b2516f0
rolling back change
valayDave Sep 4, 2021
2eca2e4
testing mp pool
valayDave Sep 4, 2021
db03f19
adding job lib to scheduler
valayDave Sep 4, 2021
9ef43f0
reducing concurrency
valayDave Sep 4, 2021
e271664
changing parallel to serial
valayDave Sep 4, 2021
c466a88
added error handle
valayDave Sep 4, 2021
cee2c43
logging
valayDave Sep 4, 2021
3e2d213
tsting tweek password
valayDave Sep 4, 2021
31e52cc
Refactored code to decouple execution
valayDave Sep 4, 2021
bfc1517
bug fix.
valayDave Sep 4, 2021
b257e7f
adding dummy default password
valayDave Sep 4, 2021
e4d5d9a
uncoupling goose version for test
valayDave Sep 4, 2021
f1f48a8
Unpinning dependences to check effects on ttests.
valayDave Sep 4, 2021
e116f32
testing psycopg2 make dsn fn
valayDave Sep 6, 2021
f019ec1
fixiing bug
valayDave Sep 6, 2021
3a34f64
Reverting requirements to version freeze.
valayDave Sep 6, 2021
a1a6475
Decouping docker filee
valayDave Sep 6, 2021
916bfb0
Added version spec for quick tests
valayDave Sep 6, 2021
f169aa2
Tweeking password for test.
valayDave Sep 6, 2021
d252c3e
bug fix
valayDave Sep 6, 2021
c2d1d59
Adding log printing in tests.
valayDave Sep 6, 2021
34cb302
Testing new logging change
valayDave Sep 6, 2021
ee445a5
adding hygenie checks for replaying env
valayDave Sep 6, 2021
95378aa
added to do.
valayDave Sep 6, 2021
b27383f
Following changes:
valayDave Sep 7, 2021
31150ac
Dumb log
valayDave Sep 7, 2021
d7e1dca
dummy log
valayDave Sep 7, 2021
026a7a2
Fixing bug with port exposure.
valayDave Sep 7, 2021
add4e8a
dummy print statement.
valayDave Sep 7, 2021
fbaf173
Dummy log
valayDave Sep 7, 2021
b66e1f7
dummy log
valayDave Sep 7, 2021
2c19ae7
Possible fix to Shell problem ?
valayDave Sep 7, 2021
4e84ffe
Unbundling Goose Dep
valayDave Sep 7, 2021
b0dac42
Moving to unpinned deps in requirements
valayDave Sep 7, 2021
82b1e38
Reverting useless log
valayDave Sep 7, 2021
8e4836a
Added changes relating to connectionstring.
valayDave Sep 8, 2021
1a9a3fa
fixing bug in conn str ?
valayDave Sep 8, 2021
3198ff2
Adding more complexity to test
valayDave Sep 8, 2021
1626fc0
fixed imports;
valayDave Sep 8, 2021
b641cb1
removing pointless imports;
valayDave Sep 8, 2021
71877ab
added dummy logs
valayDave Sep 8, 2021
30d2a99
Tweeking connection string.
valayDave Sep 9, 2021
34f6ea5
Refactored goose cmd code for shlex.
valayDave Sep 9, 2021
4ecbdca
Fixing import error .
valayDave Sep 9, 2021
cf49f1f
Removed quotes.
valayDave Sep 9, 2021
291d16e
Refactored Exception to make it simple
valayDave Sep 14, 2021
9abaf24
Merge branch 'master' into metadata-test-harness
valayDave Oct 12, 2021
e9bc2d9
removing dead code + fixing warnings.
valayDave Oct 12, 2021
6e4a9a2
Added MF versions.
valayDave Oct 12, 2021
ba5f58d
Tweek to the test to ensure run 0 is captured.
valayDave Oct 12, 2021
272 changes: 272 additions & 0 deletions docker-tests/env_buider.py
@@ -0,0 +1,272 @@
import os
import shutil
import time
import traceback

import docker
from docker.errors import NotFound
from docker.models.containers import Container

from versioned_tests import EnvConfig, MFTestRunner, METAFLOW_VERSIONS

POSTGRES_IMAGE = 'postgres:latest'


class IpNotResolved(Exception):
    headline = 'IP Address of Container Unresolvable'

    def __init__(self, container_name='', container_id='', lineno=None):
        self.message = f"Cannot resolve IP address of container : {container_name} : {container_id}"
        self.line_no = lineno
        # Pass the message to Exception so str(e) is informative.
        super().__init__(self.message)

class DockerTestEnvironment:
"""
Lifecycle :
create_env
--> setup network
--> setup image of the repo
--> create database container
--> create mdservice container from image

    run_tests
        --> MFTestRunner builds a fresh virtualenv per Metaflow version
        --> runs every *_testflow.py flow against the metadata service
        --> reports success/failure per flow run

teardown_env
--> stop all containers
--> remove all containers
--> remove the network
--> remove the image
"""

def __init__(self,\
versions = METAFLOW_VERSIONS,\
flow_dir = './test_flows',\
temp_env_store='./tmp_verions',\
max_ip_wait_time = 20,\
database_name='metaflow',
database_password='password',
database_user='metaflow',
dont_remove_containers=False,
database_port = 5432,
logger=None,
with_md_logs = True,
image_name = 'metaflow_metadata_service',
network_name='postgres-network',\
image_build_path='../',\
docker_file_path=os.path.abspath('../Dockerfile')) -> None:

self._docker = docker.DockerClient(base_url='unix://var/run/docker.sock')

        # The default logger must accept the fg= keyword that call sites pass.
        self._logger = logger if logger is not None else (lambda *args, **kwargs: print(*args))
self._with_md_logs = with_md_logs
self._dont_remove_containers = dont_remove_containers
# Network Related Properties
self._network = None
self._network_name = network_name

# database related configurations.
self._database_container = None
self._database_container_name = 'testharness-postgres'
self._database_name = database_name
self._database_password = database_password
self._database_user = database_user
self._database_port = database_port
self._max_ip_wait_time = max_ip_wait_time

# Configuration related to the containers for the test harness.
self._docker_file_path = docker_file_path
self._image_name = image_name # Image of MD Service
self._metadataservice_container = None
self._metadata_image = None
self._image_build_path = image_build_path
self._metadataservice_name = 'testharness-metadataservice'

# Local Test Harness Related Configuration
self._flow_dir = flow_dir
self._mf_versions = versions
self._temp_env_store = temp_env_store

def lifecycle(self):
# Create the network and the image.
self._logger('Creating New Environment',fg='green')
self._create_enivornment()
try:
self._logger('Environment Created, Now Running Tests',fg='green')
self._run_tests()
            self._logger('Finished Running Tests! Woohoo!',fg='green')
except Exception as e:
error_string = traceback.format_exc()
self._logger(f'Something Failed ! {error_string}',fg='red')
finally:
self._logger("Tearing down environment",fg='green')
self._teardown_environment()


def _run_tests(self):
        url = "http://localhost:8080/"
test_runner = MFTestRunner(
self._flow_dir,
EnvConfig(
datastore='local',
metadata='service',
metadata_url=url
),
versions=self._mf_versions,
temp_env_store=self._temp_env_store,
)
test_results = test_runner.run_tests()
for res in test_results:
message = f"Successfully executed flow {res['flow']}/{res['run']} with Metaflow version {res['version']}"
fg='green'
if not res['success']:
message = f"Failed in executing flow {res['flow']}/{res['run']} with Metaflow version {res['version']}"
fg='red'
self._logger(message,fg=fg)

def _teardown_environment(self):
        container_set = [
            container for container in (
                self._metadataservice_container,
                self._database_container,
            )
            if container is not None  # a container may be missing if setup failed early
        ]
        self._logger('Stopping all containers',fg='blue')
        if self._with_md_logs and self._metadataservice_container is not None:
            md_logs = str(self._metadataservice_container.logs().decode('utf-8'))
            self._logger(f'Metadata Logs :: \n {md_logs}',fg='blue')

        # Skip stop/removal when dont_remove_containers is set.
if not self._dont_remove_containers :
# first stop all containers
for container in container_set:
container.stop(timeout=10)
container.reload()

self._logger('Removing all containers',fg='blue')
# Then remove all the containers
for container in container_set:
container.remove()

self._logger('Removing Network',fg='blue')
# Remove the network
self._network.remove()

            self._logger('Removing Docker Images',fg='blue')
            # remove the image (skip if the build never completed).
            if self._metadata_image is not None:
                self._docker.images.remove(self._metadata_image.id)

        # remove the temporary directory of MF versions and the local
        # .metaflow metadata directory, ignoring either if absent.
        shutil.rmtree(self._temp_env_store, ignore_errors=True)
        shutil.rmtree('.metaflow', ignore_errors=True)


def _db_env_vars(self):
return dict(
POSTGRES_USER=self._database_user,
POSTGRES_PASSWORD=self._database_password ,
POSTGRES_DB=self._database_name
)

def _resolve_ipaddr(self,container:Container,wait_time=None):
        # Poll (reload + sleep) until the container's IP address is
        # available, for up to wait_time seconds.
wait_time = wait_time if wait_time is not None else self._max_ip_wait_time
for i in range(wait_time):
try:
ipaddr = container.attrs['NetworkSettings']['Networks'][self._network_name]['IPAddress']
except KeyError:
ipaddr = ''

if ipaddr == '':
self._logger(f"Couldn't resolve IP Address for container {container.name} of image {container.image.tags}. Waiting for {wait_time-i} seconds",fg='red')
container.reload()
else:
return ipaddr
time.sleep(1)
raise IpNotResolved(container_name=container.name,container_id=container.id)

def _mdcontainer_env_vars(self):
        ip_addr = self._resolve_ipaddr(self._database_container,wait_time=120)
self._logger(f'Using DB Ip Address {ip_addr} ',fg='green')
return dict(
MF_METADATA_DB_HOST = ip_addr,
MF_METADATA_DB_PORT = self._database_port,
MF_METADATA_DB_USER = self._database_user,
MF_METADATA_DB_PSWD = self._database_password,
MF_METADATA_DB_NAME = self._database_name,
)

def _mdservice_ports(self):
return {
'8082/tcp':8082,
'8080/tcp':8080,
}
def _db_ports(self):
return {
f'{self._database_port}/tcp':self._database_port,
}

def _create_enivornment(self):
try:
# If .metaflow is found then remove it
# this is done for fresh tests not conflicting with old tests
os.stat('.metaflow')
shutil.rmtree('.metaflow')
except FileNotFoundError:
pass
self._logger('Creating a network',fg='blue')
self._network = self._find_network()

if self._network is None:
self._network = self._create_network()

# Build the image of the Metadata service.
self._logger('Building Metadata Image',fg='blue')
self._metadata_image = self._build_mdservice_image()

self._logger('Creating Postgres Docker Container',fg='blue')
# Create the Postgres container
self._database_container = self._docker.containers.run(POSTGRES_IMAGE,\
detach=True,\
ports=self._db_ports(),\
name=self._database_container_name,\
environment=self._db_env_vars(),\
network=self._network_name,)

time.sleep(5)
self._logger('Creating Metadata Service Container',fg='blue')
# Create the metadata service container
self._metadataservice_container = self._docker.containers.run(self._image_name,\
detach=True,\
stdin_open=True,\
tty=True,\
environment=self._mdcontainer_env_vars(),\
network=self._network_name,\
ports=self._mdservice_ports(),\
)

time.sleep(5)

def _build_mdservice_image(self):
image,log_generator = self._docker.images.build(path=self._image_build_path,\
dockerfile=self._docker_file_path,\
tag=self._image_name,)
return image

    def _find_network(self):
        try:
            return self._docker.networks.get(self._network_name)
        except NotFound:
            return None

def _create_network(self):
return self._docker.networks.create(self._network_name)
2 changes: 2 additions & 0 deletions docker-tests/requirements.txt
@@ -0,0 +1,2 @@
docker
metaflow
45 changes: 45 additions & 0 deletions docker-tests/run_test.py
@@ -0,0 +1,45 @@
from env_buider import DockerTestEnvironment,METAFLOW_VERSIONS
import click
import os

@click.command()
@click.option('--database-password',default="ByvI)Sr_uamaPx$w&Xp_LoB*DVBzTO+3oK{Z_Nw4SRcxut?-B>h]&WD}_mU!AgOm'\"")
@click.option('--flow-dir',default='./test_flows')
@click.option('--with-md-logs',is_flag=True)
@click.option('--dont-remove-containers',is_flag=True)
@click.option('--temp-env-store',default='./tmp_verions')
@click.option('--database-name',default='metaflow')
@click.option('--database-user',default='metaflow')
@click.option('--database-port',default=5432)
@click.option('--versions',default=','.join(METAFLOW_VERSIONS))
@click.option('--image-build-path',default='../')
@click.option('--docker-file-path',default=os.path.abspath('../Dockerfile'))
def run_tests(database_password=None, \
flow_dir='./test_flows', \
temp_env_store='./tmp_verions', \
database_name='metaflow', \
versions=None,\
dont_remove_containers=False,
with_md_logs = False,\
database_user='metaflow', \
database_port=5432,
image_build_path='../',
docker_file_path=os.path.abspath('../Dockerfile')):
test_runner = DockerTestEnvironment(
logger=click.secho,
database_password=database_password,\
flow_dir = flow_dir,\
with_md_logs= with_md_logs,\
versions=versions.split(','),
dont_remove_containers=dont_remove_containers,\
temp_env_store = temp_env_store,\
database_name = database_name,\
database_user = database_user,\
database_port = database_port,\
image_build_path = image_build_path,\
docker_file_path = docker_file_path,\
)
test_runner.lifecycle()

if __name__ == '__main__':
run_tests()
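Much of this PR's commit history ("Testing new quoting fixes for DB creds.", "Refactored goose cmd code for shlex.") revolves around the default test password above, which deliberately contains shell metacharacters. A sketch of why `shlex.quote` is the right tool when splicing such a credential into a shell command (`build_dsn_arg` is a hypothetical helper, not part of this PR):

```python
import shlex

# The harness's default test password, full of shell metacharacters.
password = "ByvI)Sr_uamaPx$w&Xp_LoB*DVBzTO+3oK{Z_Nw4SRcxut?-B>h]&WD}_mU!AgOm'\""

def build_dsn_arg(user, password, host, port, dbname):
    # Quote the whole DSN so a shell treats it as a single, literal argument.
    dsn = f"user={user} password={password} host={host} port={port} dbname={dbname}"
    return shlex.quote(dsn)

quoted = build_dsn_arg('metaflow', password, 'localhost', 5432, 'metaflow')
# shlex.split inverts shlex.quote, so the password survives the round trip.
parsed = shlex.split(f"goose postgres {quoted} up")
assert parsed[2] == (
    f"user=metaflow password={password} host=localhost port=5432 dbname=metaflow")
```

Hand-placed quotes around such a value break as soon as the password itself contains a quote character, which is exactly what this default does.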
58 changes: 58 additions & 0 deletions docker-tests/test_flows/helloaws_testflow.py
@@ -0,0 +1,58 @@
from metaflow import FlowSpec, step

class MultipleVersionFlow(FlowSpec):
    """
    A simple fan-out/fan-in flow the test harness runs against the metadata
    service with multiple Metaflow client versions.

    """
@step
def start(self):
"""
The 'start' step is a regular step, so runs locally on the machine from
which the flow is executed.

"""
from metaflow import get_metadata

        print("MultipleVersionFlow is starting.")
        print("")
        print("Using metadata provider: %s" % get_metadata())
        print("")
self.x = [i for i in range(10)]
self.next(self.hello,foreach='x')

    @step
    def hello(self):
        """
        Runs once per foreach branch and sets a message artifact. When the
        harness launches the flow with '--with batch', this step executes
        remotely on AWS Batch; otherwise it runs locally.

        """
        self.message = 'Hi from AWS!'
        self.next(self.join)

    @step
    def join(self, inputs):
        # A pass-through join; no artifacts are merged from the foreach branches.
        self.next(self.end)

@step
def end(self):
"""
The 'end' step is a regular step, so runs locally on the machine from
which the flow is executed.

"""
print("MultipleVersionFlow is finished.")


if __name__ == '__main__':
MultipleVersionFlow()
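The flow above fans out over `self.x` and joins without merging artifacts. As a conceptual model in plain Python (no Metaflow scheduler or artifact store; in a real flow the join step would typically call `self.merge_artifacts(inputs)`), the shape is:

```python
def start():
    # self.x = [i for i in range(10)]; self.next(self.hello, foreach='x')
    return list(range(10))

def hello(item):
    # each foreach branch sets self.message
    return 'Hi from AWS!'

def join(inputs):
    # the join step receives one input object per foreach branch
    return list(inputs)

messages = join(hello(i) for i in start())
print(len(messages), set(messages))  # → 10 {'Hi from AWS!'}
```

Ten branches, one message each: exactly the fan-out the harness uses to generate run metadata for the service under test.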
305 changes: 305 additions & 0 deletions docker-tests/versioned_tests.py
@@ -0,0 +1,305 @@
# Test runner for multiple Metaflow versions: it executes the test flows with
# each listed version of the Metaflow client.
# Very barebones; needs polishing to make it efficient and customizable.
import glob
import json
import logging
import os
import re
import shutil
import subprocess
import venv
WORKFLOW_EXTRACT_REGEX = re.compile(r'\(run-id (?P<runid>[a-zA-Z0-9_-]+)', re.IGNORECASE)
FLOW_EXTRACTOR_REGEX = re.compile(r'^(\S+) (\S+) (\S+) (?P<flow>[A-Za-z0-9_]+) (\S+) (\S+)', re.IGNORECASE)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

def load_json(pth):
with open(pth,'r') as f:
return json.load(f)

def save_json(data,pth):
with open(pth,'w') as f:
json.dump(data,f)

METAFLOW_VERSIONS = [
"2.0.1",
"2.0.2",
"2.0.3",
"2.0.4",
"2.0.5",
"2.1.0",
"2.1.1",
"2.2.0",
"2.2.1",
"2.2.2",
"2.2.3",
"2.2.4",
"2.2.5",
"2.2.6",
"2.2.7",
"2.2.8",
"2.2.9",
"2.2.10",
"2.2.11",
"2.2.12",
"2.2.13",
"2.3.0",
"2.3.1",
"2.3.2",
"2.3.3",
"2.3.4",
"2.3.5",
"2.3.6",
"2.4.0",
]
def create_logger(logger_name:str,level=logging.INFO):
custom_logger = logging.getLogger(logger_name)
ch1 = logging.StreamHandler()
ch1.setLevel(level)
ch1.setFormatter(formatter)
custom_logger.addHandler(ch1)
custom_logger.setLevel(level)
return custom_logger


def is_present(pth):
    try:
        os.stat(pth)
        return True
    except FileNotFoundError:
        return False

class EnvConfig:
def __init__(self,\
datastore='local',\
s3_datastore_root=None,\
s3_datatools_root=None,\
metadata='local',\
metadata_url=None) -> None:
self.datastore= datastore
self.s3_datastore_root= s3_datastore_root
self.metadata= metadata
self.metadata_url= metadata_url
self.s3_datatools_root= s3_datatools_root
self._validate()

def _validate(self):
if self.datastore == 's3':
assert self.s3_datastore_root is not None
assert self.s3_datatools_root is not None
if self.metadata == 'service':
            assert self.metadata_url is not None


def get_env(self):
env_dict = {}
if self.datastore == 'local':
env_dict['METAFLOW_DEFAULT_DATASTORE'] = 'local'
else:
env_dict['METAFLOW_DEFAULT_DATASTORE'] = 's3'
            env_dict['METAFLOW_DATASTORE_SYSROOT_S3'] = self.s3_datastore_root
            env_dict['METAFLOW_DATATOOLS_S3ROOT'] = self.s3_datatools_root

if self.metadata == 'local':
env_dict['METAFLOW_DEFAULT_METADATA'] = 'local'
else:
env_dict['METAFLOW_DEFAULT_METADATA'] = 'service'
env_dict['METAFLOW_SERVICE_URL'] = self.metadata_url
return env_dict

class TestEnvironment:
# this will create a session id
def __init__(self,version_number,temp_dir_name,env_config:EnvConfig):
self.session_id =self.session_id_hash(version_number)
self.version_number=version_number
self.parent_dir = temp_dir_name
self.env_path = os.path.join(temp_dir_name,self.session_id)
self.env_config = env_config
self.python_path = os.path.join(
self.env_path,
'bin',
'python'
)
self.pip_path = os.path.join(
self.env_path,
'bin',
'pip'
)

    @staticmethod
    def session_id_hash(version_number):
        # NOTE: built-in hash() is salted per interpreter run (PYTHONHASHSEED),
        # so these ids are only stable within a single process.
        return hex(hash(version_number))

def execute_flow(self,file_pth,batch=False,):
cmd = [
self.python_path,
file_pth,
'run',
]
if batch:
cmd+=['--with','batch']
        env = dict(os.environ)
        env.update(self.env_config.get_env())
        env["PYTHONPATH"] = self.python_path
run_response,fail = self._run_command(cmd,env)
return dict(success=not fail,**self._get_runid(run_response))


    def _get_runid(self, run_response):
        # TODO: find a more robust way to extract run ids.
        logs = run_response.decode('utf-8')
        lines = logs.split('\n')
        flow, runid = None, None
        try:
            runidstr = logs.split(' Workflow starting ')[1]
            runid = WORKFLOW_EXTRACT_REGEX.match(runidstr).groupdict()['runid']
            flow = FLOW_EXTRACTOR_REGEX.match(lines[0]).groupdict()['flow']
        except (IndexError, AttributeError):
            # IndexError: marker line missing; AttributeError: regex did not match.
            pass
        return dict(
            flow=flow,
            run=runid,
            logs=logs
        )


def _run_command(self,cmd,env):
fail = False
try:
rep = subprocess.check_output(
cmd,
env=env,stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as e:
rep = e.output
fail=True
return rep,fail

def to_dict(self):
return dict(
env_path=self.env_path,
version = self.version_number,
)

def __enter__(self):
env_builder = venv.EnvBuilder(with_pip=True)
env_builder.create(self.env_path)
        # Install the requested Metaflow version into the virtualenv.
pip_install_cmd = [
self.pip_path,
'install',
f'metaflow=={self.version_number}'
]
        env = dict(os.environ)
        env["PYTHONPATH"] = self.python_path
self._run_command(pip_install_cmd,env)
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
shutil.rmtree(self.env_path)


class FlowInstanceTest:
# this should use the TestEnvironment and run the actual test
def __init__(self,version_number,flow_files,temp_dir,envionment_config) -> None:
        # Process.__init__(self, daemon=False)  # leftover from an earlier multiprocessing-based design
self.version_number,self.flow_files = version_number,flow_files
self.temp_dir=temp_dir
self._version = version_number
self.envionment_config = envionment_config
self.logger = create_logger(self.logger_name)

@property
def logger_name(self):
return f'FlowInstanceTest-{TestEnvironment.session_id_hash(self._version)}-{self._version}'

@property
def saved_file_name(self):
return f'{self.logger_name}.json'

    def metadata(self):
return f"""
Metaflow Version : {self.version_number}
Configuration Variables:
{json.dumps(self.envionment_config.get_env(),indent=4)}
"""

    def run(self):
        """Build a virtualenv for this Metaflow version and run every flow file inside it."""
with TestEnvironment(self.version_number,self.temp_dir,self.envionment_config) as env:
test_res_data = []
for file in self.flow_files:
# env.execute_flow should be blocking.
flow_exec_resp = env.execute_flow(file)
env_info = env.to_dict()
env_info.update(flow_exec_resp)
env_info.update(dict(file_name=file))
test_res_data.append(env_info)
# Todo : Create script to manipulate the run flow
# todo : Manage Local datastore/metadata
self.logger.debug(f"Ran Flow File : {file} On Version : {self.version_number}")
self.logger.debug(f"Saving File : {file} On Version : {self.version_number}")
filename = self.saved_file_name
save_json(test_res_data,filename)
return filename


def run_test(version_number,flow_files,temp_dir,envionment_config):
return FlowInstanceTest(
version_number,flow_files,temp_dir,envionment_config
).run()

class MFTestRunner:

def __init__(self,\
flow_dir,\
envionment_config:EnvConfig,
max_concurrent_tests= 2,\
versions=METAFLOW_VERSIONS,\
temp_env_store='./tmp_verions') -> None:
self.flow_files = glob.glob(os.path.join(flow_dir,"*_testflow.py"))
self.versions = versions
self._max_concurrent_tests = max_concurrent_tests
self.envionment_config = envionment_config
# Todo : figure test concurrency
# todo assert versions are the same as `METAFLOW_VERSIONS`
self.temp_env_store = temp_env_store
        assert not is_present(temp_env_store), "temp env store directory must not already exist"
os.makedirs(self.temp_env_store)
assert len(self.flow_files) > 0, "Provide a directory with *_testflow.py as files; These files should contain metaflow flows"

def _make_tests(self):
return [(version,self.flow_files,self.temp_env_store,self.envionment_config) \
for version in self.versions]

def run_tests(self):
# create a session Id for each test
# Make a virtual environment in the same name in temp dir
tests = self._make_tests()
results = []
for test in tests:
try:
p = run_test(*test)
results.extend(load_json(p))
except Exception as e:
print(e)
shutil.rmtree(self.temp_env_store)
return results


# def run_tests():
# test_runner = MFTestRunner('./test_flows',EnvConfig(),versions=METAFLOW_VERSIONS,)
# test_runner.run_tests()

# if __name__ == '__main__':
# run_tests()
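The two module-level regexes extract the flow name and run id from Metaflow's console output. A quick sketch with hypothetical log lines shaped the way the regexes expect (real output also carries timestamps and color codes, so treat the samples as illustrative only):

```python
import re

# Same patterns as versioned_tests.py, written as raw strings.
WORKFLOW_EXTRACT_REGEX = re.compile(r'\(run-id (?P<runid>[a-zA-Z0-9_-]+)', re.IGNORECASE)
FLOW_EXTRACTOR_REGEX = re.compile(r'^(\S+) (\S+) (\S+) (?P<flow>[A-Za-z0-9_]+) (\S+) (\S+)', re.IGNORECASE)

# Hypothetical samples: the first console line names the flow (4th token), and
# the text after the ' Workflow starting ' marker opens with '(run-id ...)'.
first_line = "Metaflow 2.4.0 executing MultipleVersionFlow for user:tester"
after_marker = "(run-id 1631036677061580) let's go"

print(FLOW_EXTRACTOR_REGEX.match(first_line).groupdict()['flow'])      # → MultipleVersionFlow
print(WORKFLOW_EXTRACT_REGEX.match(after_marker).groupdict()['runid']) # → 1631036677061580
```

If either line shifts shape between Metaflow versions, `match()` returns `None`, which is why `_get_runid` needs to tolerate a failed match.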
5 changes: 4 additions & 1 deletion services/migration_service/run_script.py
@@ -5,23 +5,26 @@
my_env = os.environ
migration_server_process = Popen("PYTHONPATH=/ python3 -m services.migration_service.migration_server", shell=True,
close_fds=True, env=my_env)

print("Started the Migration Server")
get_env_version = Popen(
"python3 -m services.migration_service.get_virtual_env",
shell=True,
close_fds=True)

get_env_version.wait()

print("Extracted the version and Environment")
# read in version of metadata service to load
with open('/root/services/migration_service/config', 'r') as version_value_file:
    version_value = version_value_file.read().strip()

print("Found Version Value : ",version_value)
# start proper version of metadata service
virtual_env_path = '/opt/' + version_value
my_env['VIRTUAL_ENV'] = '/opt/' + version_value
path = my_env['PATH']
my_env['PATH'] = virtual_env_path + "/bin:" + path
print("Starting Metadata service ")
metadata_server_process = Popen(
"metadata_service", shell=True,
close_fds=True, env=my_env)
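run_script.py "activates" the chosen service virtualenv by exporting `VIRTUAL_ENV` and prepending its `bin` directory to `PATH` before spawning `metadata_service`. A minimal sketch of that environment mutation (the version string `2.0.6` is a made-up example):

```python
def activate_env(env, version_value):
    # Mirror run_script.py: point VIRTUAL_ENV at /opt/<version> and prepend
    # its bin directory so executables resolve from that env first.
    env = dict(env)
    virtual_env_path = '/opt/' + version_value
    env['VIRTUAL_ENV'] = virtual_env_path
    env['PATH'] = virtual_env_path + '/bin:' + env.get('PATH', '')
    return env

child_env = activate_env({'PATH': '/usr/local/bin:/usr/bin'}, '2.0.6')
print(child_env['PATH'])  # → /opt/2.0.6/bin:/usr/local/bin:/usr/bin
```

Prepending (rather than appending) is what guarantees the per-version `metadata_service` entry point shadows any globally installed one.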