forked from s7clarke10/pipelinewise-tap-s3-csv
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_integration_tests.py
More file actions
72 lines (57 loc) · 2.21 KB
/
run_integration_tests.py
File metadata and controls
72 lines (57 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import os
import docker
import logging
from subprocess import run, CalledProcessError
# check_container.py
from typing import Optional
def is_container_running(container_name: str) -> Optional[bool]:
    """Report whether the named Docker container is currently running.

    :param container_name: the name of the container to look up
    :return: ``True`` if the container's reported status is "running",
        ``False`` if the container exists in any other state, or ``None``
        if no container with that name exists
    """
    RUNNING = "running"
    # Connect to Docker using the default socket or the configuration
    # in your environment
    docker_client = docker.from_env()
    try:
        container = docker_client.containers.get(container_name)
    except docker.errors.NotFound as exc:
        print(f"Check container name!\n{exc.explanation}")
        # Explicit so the "unknown container" result is visibly distinct
        # from the True/False answers below.
        return None
    else:
        return container.attrs["State"]["Status"] == RUNNING
    finally:
        # Release the client's underlying HTTP session; without this the
        # connection pool stays open until the interpreter exits.
        docker_client.close()
def run_command(command, env=None):
    """Run an external program without a shell and wait for it to finish.

    The child's output is not captured, so it is written straight to this
    process's stdout/stderr.

    :param command: the program and its arguments as a sequence,
        e.g. ``['docker', 'info']``
    :param env: environment mapping for the child process; ``None`` makes
        the child inherit the current environment
    :return: the ``subprocess.CompletedProcess`` when the program exits with
        status 0, or ``False`` when the executable cannot be found or the
        program exits with a non-zero status
    """
    logging.debug("Command: %s", command)
    try:
        # check=True turns a non-zero exit status into CalledProcessError,
        # so a value returned from here always has returncode == 0.
        return run(command, env=env, shell=False, capture_output=False, check=True)
    except FileNotFoundError:
        print("FileNotFound Error")
    except CalledProcessError as exc:
        # Callers only see False, so record the real exit status here
        # instead of dropping it silently.
        logging.warning("Command %s exited with status %s", command, exc.returncode)
    return False
if __name__ == '__main__':
    # Probe the Docker CLI/daemon first; only ask about the container when
    # Docker itself answered successfully.
    docker_available = run_command(['docker', 'info'], env=os.environ)
    if docker_available:
        # Check if the Minio Server is running
        minio_running = is_container_running("minio_server")
    else:
        minio_running = False

    # When GITHUB_ACTIONS is set the tests run regardless of the local check
    # (presumably the CI workflow provisions MinIO itself; verify against
    # the workflow definition).
    if minio_running or 'GITHUB_ACTIONS' in os.environ:
        outcome = run_command(['poetry', 'run', 'pytest', 'tests/integration'], env=os.environ)
        # run_command returns False rather than a CompletedProcess when
        # pytest exits non-zero or poetry cannot be found; reading
        # .returncode off that would raise AttributeError, so map it to a
        # generic failure status instead.
        raise SystemExit(outcome.returncode if outcome else 1)
    else:
        print('Skipping Integration Tests - minio is not running')
        print('-------------------------------------------------')
        print('Ensure minio server is running to run these tests.')
        print('You can start minio server by these steps:')
        print('docker compose up -d')
        print('poetry run python run_integration_tests.py')