Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 51 additions & 47 deletions parsons/targetsmart/targetsmart_smartmatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tempfile
import time
import uuid
from pathlib import Path

import petl
import requests
Expand Down Expand Up @@ -100,7 +101,6 @@ def __init__(self):
self.connection = None

def _smartmatch_poll(self, poll_url, submit_filename):
download_url = None
while True:
poll_response = requests.get(
poll_url,
Expand Down Expand Up @@ -242,18 +242,21 @@ def smartmatch(
)

# Write Petl table to CSV and upload for SmartMatch to process
with tempfile.NamedTemporaryFile(
tmp = tempfile.NamedTemporaryFile(
mode="w+",
encoding="utf8",
newline="\n",
prefix="smartmatch_input",
suffix=".csv",
dir=tmp_location,
delete=not keep_smartmatch_input_file,
) as tmp:
dataprep_table.tocsv(tmp.name, encoding="utf8")
tmp.flush()
_smartmatch_upload(response_1_info["url"], tmp.name)
delete=False,
)
dataprep_table.tocsv(tmp.name, encoding="utf8")
_smartmatch_upload(response_1_info["url"], tmp.name)

tmp.close()
if not keep_smartmatch_input_file:
Path(tmp.name).unlink()

logger.info(
"The SmartMatch workflow execution has been submitted using file"
Expand All @@ -266,46 +269,47 @@ def smartmatch(

# Download SmartMatch .csv.gz results, decompress, and Petl table wrap.
# The final tmp file cannot be deleted due to Petl tables being lazy.
with tempfile.NamedTemporaryFile(
tmp_gz = tempfile.NamedTemporaryFile(
prefix="smartmatch_output",
suffix=".csv.gz",
dir=tmp_location,
delete=not keep_smartmatch_output_gz_file,
) as tmp_gz:
with tempfile.NamedTemporaryFile(
prefix="smartmatch_output",
suffix=".csv",
dir=tmp_location,
delete=False,
) as tmp_csv:
logger.info(
f"Downloading the '{submit_filename}' SmartMatch results to {tmp_gz.name}."
)
_smartmatch_download(download_url, tmp_gz)
tmp_gz.flush()

logger.info("Decompressing results")
with gzip.open(tmp_gz.name, "rb") as gz_reader:
shutil.copyfileobj(gz_reader, tmp_csv)
tmp_csv.flush()

raw_outtable = petl.fromcsv(tmp_csv.name, encoding="utf8").convert(
INTERNAL_JOIN_ID, int
)
logger.info(
"SmartMatch remote execution successful. Joining results to input table."
)
outtable = (
petl.leftjoin(
input_table,
raw_outtable,
key=INTERNAL_JOIN_ID,
tempdir=tmp_location,
)
.sort(key=INTERNAL_JOIN_ID)
.cutout(INTERNAL_JOIN_ID)
)
if INTERNAL_JOIN_ID_CONFLICT in input_table.fieldnames():
input_table = input_table.rename(INTERNAL_JOIN_ID_CONFLICT, INTERNAL_JOIN_ID)

return Table(outtable)
delete=False,
)

tmp_csv = tempfile.NamedTemporaryFile(
prefix="smartmatch_output",
suffix=".csv",
dir=tmp_location,
delete=False,
)

logger.info(f"Downloading the '{submit_filename}' SmartMatch results to {tmp_gz.name}.")
_smartmatch_download(download_url, tmp_gz)
tmp_gz.flush()

logger.info("Decompressing results")
with gzip.open(tmp_gz.name, "rb") as gz_reader:
shutil.copyfileobj(gz_reader, tmp_csv)
tmp_csv.flush()

tmp_gz.close()
if not keep_smartmatch_output_gz_file:
Path(tmp_gz.name).unlink()
tmp_csv.close()

raw_outtable = petl.fromcsv(tmp_csv.name, encoding="utf8").convert(INTERNAL_JOIN_ID, int)
logger.info("SmartMatch remote execution successful. Joining results to input table.")
outtable = (
petl.leftjoin(
input_table,
raw_outtable,
key=INTERNAL_JOIN_ID,
tempdir=tmp_location,
)
.sort(key=INTERNAL_JOIN_ID)
.cutout(INTERNAL_JOIN_ID)
)
if INTERNAL_JOIN_ID_CONFLICT in input_table.fieldnames():
input_table.rename(INTERNAL_JOIN_ID_CONFLICT, INTERNAL_JOIN_ID)

return Table(outtable)
112 changes: 88 additions & 24 deletions test/test_targetsmart/test_targetsmart_smartmatch.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
import csv
import gzip
import io
import shutil
import sys
import tempfile
from pathlib import Path
from unittest.mock import MagicMock

import petl
import pytest
from petl.util.base import TableWrapper

from parsons.targetsmart.targetsmart_api import TargetSmartAPI


@pytest.fixture
def intable():
def intable() -> TableWrapper:
return petl.wrap(
[
[
Expand All @@ -29,7 +32,7 @@ def intable():


@pytest.fixture
def raw_outtable(intable):
def raw_outtable(intable: TableWrapper) -> tuple:
return (
intable.addrownumbers(field="ts__input_row")
.addrownumbers(field="ts__row")
Expand All @@ -42,53 +45,114 @@ def raw_outtable(intable):


@pytest.fixture
def prep_intable(intable: TableWrapper) -> TableWrapper:
    """Return ``intable`` with a row-number column named ``matchback_id`` added."""
    return intable.addrownumbers(field="matchback_id")


@pytest.fixture
def raw_outcsv(raw_outtable: tuple) -> str:
    """Serialize the rows of ``raw_outtable`` to CSV text.

    Args:
        raw_outtable: An iterable of rows; each row is written as one CSV record.

    Returns:
        The CSV document as a single string.
    """
    # NOTE(review): the parameter is annotated ``tuple``, but the ``raw_outtable``
    # fixture appears to return a parenthesised petl method chain (a table), not
    # a tuple -- confirm and adjust the annotation if so.
    buf = io.StringIO()
    # writerows() accepts any iterable of rows, so no intermediate list is needed.
    csv.writer(buf).writerows(raw_outtable)
    return buf.getvalue()


@pytest.fixture
def raw_outgz(raw_outcsv: str) -> bytes:
    """Return ``raw_outcsv`` encoded as UTF-8 and gzip-compressed.

    Args:
        raw_outcsv: CSV text to compress.

    Returns:
        The gzip-compressed bytes of the UTF-8 encoded text.
    """
    return gzip.compress(raw_outcsv.encode("utf8"))


@pytest.fixture
def final_outtable(prep_intable: TableWrapper, raw_outtable: tuple) -> TableWrapper:
    """Return the expected joined table used for comparison in the tests.

    Left-joins ``prep_intable`` with ``raw_outtable`` on ``matchback_id`` and
    then removes the ``matchback_id`` column from the result.
    """
    # NOTE(review): ``raw_outtable`` is annotated ``tuple``, but the fixture of
    # that name appears to return a petl table -- confirm the annotation.
    joined = petl.leftjoin(prep_intable, raw_outtable, key="matchback_id")
    return joined.cutout("matchback_id")


@pytest.fixture
def submit_filename() -> str:
    """Return the fixed file name ``parsons_test.csv``."""
    return "parsons_test.csv"


@pytest.mark.skipif(sys.platform == "win32", reason="need to fix this test on windows")
def test_smartmatch(
intable,
submit_filename,
raw_outgz,
raw_outcsv,
raw_outtable,
final_outtable,
requests_mock,
):
ts = TargetSmartAPI("mockkey")
def smartmatch_requests_mock(requests_mock: MagicMock, raw_outgz: bytes) -> MagicMock:
    """Register the mocked SmartMatch HTTP endpoints on ``requests_mock``.

    Four handlers are registered, in this order:

    1. ``GET https://api.targetsmart.com/service/smartmatch`` answering with
       JSON that carries the mock upload URL.
    2. ``PUT`` on that upload URL.
    3. ``GET https://api.targetsmart.com/service/smartmatch/poll`` answering
       with JSON that carries the mock download URL.
    4. ``GET`` on that download URL answering with ``raw_outgz`` as the body.

    Args:
        requests_mock: Object exposing ``get``/``put`` registration methods.
        raw_outgz: Bytes served as the content of the download endpoint.

    Returns:
        The same ``requests_mock`` object that was passed in.
    """
    # NOTE(review): annotated as MagicMock, but the pytest fixture passed in is
    # presumably the requests-mock mocker -- confirm the intended type.
    upload_resp = {"url": "https://mock_smartmatch_upload_endpoint", "error": None}
    poll_resp = {"url": "https://mock_smartmatch_download_endpoint", "error": None}

    requests_mock.get("https://api.targetsmart.com/service/smartmatch", json=upload_resp)
    requests_mock.put(upload_resp["url"])
    requests_mock.get("https://api.targetsmart.com/service/smartmatch/poll", json=poll_resp)
    requests_mock.get(poll_resp["url"], content=raw_outgz)
    return requests_mock


def test_smartmatch_returned_petl(
    intable: TableWrapper,
    raw_outgz: bytes,
    final_outtable: TableWrapper,
    requests_mock: MagicMock,
):
    """Check that smartmatch() output, as a petl table, equals ``final_outtable``."""
    api = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    matched = api.smartmatch(intable)
    expected_rows = list(final_outtable)
    # Compare fully materialised row lists, header row included.
    assert expected_rows == list(matched.to_petl())


def test_smartmatch_output_csv_exists(
    intable: TableWrapper,
    raw_outgz: bytes,
    requests_mock: MagicMock,
):
    """Check that a ``smartmatch_output*.csv`` file exists in ``tmp_location`` afterwards."""
    ts = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    temp_dir = tempfile.mkdtemp()
    try:
        ts.smartmatch(intable, tmp_location=temp_dir)
        assert sorted(Path(temp_dir).glob("smartmatch_output*.csv")) != []
    finally:
        # mkdtemp() leaves removal to the caller. Cleanup is best-effort so a
        # failed deletion cannot mask the assertion result.
        shutil.rmtree(temp_dir, ignore_errors=True)


def test_smartmatch_keep_smartmatch_input_csv(
    intable: TableWrapper,
    raw_outgz: bytes,
    requests_mock: MagicMock,
):
    """Check that ``keep_smartmatch_input_file=True`` leaves a ``smartmatch_input*.csv`` file."""
    ts = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    temp_dir = tempfile.mkdtemp()
    try:
        ts.smartmatch(intable, tmp_location=temp_dir, keep_smartmatch_input_file=True)
        assert sorted(Path(temp_dir).glob("smartmatch_input*.csv")) != []
    finally:
        # mkdtemp() leaves removal to the caller. Cleanup is best-effort so a
        # failed deletion cannot mask the assertion result.
        shutil.rmtree(temp_dir, ignore_errors=True)


def test_smartmatch_keep_smartmatch_input_csv_false(
    intable: TableWrapper,
    raw_outgz: bytes,
    requests_mock: MagicMock,
):
    """Check that ``keep_smartmatch_input_file=False`` leaves no ``smartmatch_input*.csv`` file."""
    ts = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    temp_dir = tempfile.mkdtemp()
    try:
        ts.smartmatch(intable, tmp_location=temp_dir, keep_smartmatch_input_file=False)
        assert sorted(Path(temp_dir).glob("smartmatch_input*.csv")) == []
    finally:
        # mkdtemp() leaves removal to the caller. Cleanup is best-effort so a
        # failed deletion cannot mask the assertion result.
        shutil.rmtree(temp_dir, ignore_errors=True)


def test_smartmatch_keep_smartmatch_output_gz(
    intable: TableWrapper,
    raw_outgz: bytes,
    requests_mock: MagicMock,
):
    """Check that ``keep_smartmatch_output_gz_file=True`` leaves a ``smartmatch_output*.csv.gz``."""
    ts = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    temp_dir = tempfile.mkdtemp()
    try:
        ts.smartmatch(intable, tmp_location=temp_dir, keep_smartmatch_output_gz_file=True)
        assert sorted(Path(temp_dir).glob("smartmatch_output*.csv.gz")) != []
    finally:
        # mkdtemp() leaves removal to the caller. Cleanup is best-effort so a
        # failed deletion cannot mask the assertion result.
        shutil.rmtree(temp_dir, ignore_errors=True)


def test_smartmatch_keep_smartmatch_output_gz_false(
    intable: TableWrapper,
    raw_outgz: bytes,
    requests_mock: MagicMock,
):
    """Check that ``keep_smartmatch_output_gz_file=False`` leaves no ``smartmatch_output*.csv.gz``."""
    ts = TargetSmartAPI("mockkey")
    smartmatch_requests_mock(requests_mock, raw_outgz)

    temp_dir = tempfile.mkdtemp()
    try:
        ts.smartmatch(intable, tmp_location=temp_dir, keep_smartmatch_output_gz_file=False)
        assert sorted(Path(temp_dir).glob("smartmatch_output*.csv.gz")) == []
    finally:
        # mkdtemp() leaves removal to the caller. Cleanup is best-effort so a
        # failed deletion cannot mask the assertion result.
        shutil.rmtree(temp_dir, ignore_errors=True)
Loading