|
8 | 8 |
|
9 | 9 | SEARCH_WORMS_URL = "/searchworms/{}" |
10 | 10 | TAXA_FROM_CENTRAL_URL = "/taxa/pull_from_central" |
| 11 | +TAXON_PUT = "/taxon/central" |
11 | 12 |
|
12 | 13 | ACARTIA_RSP = [ |
13 | 14 | { |
@@ -123,3 +124,56 @@ def test_pull_taxa_update_from_central(fastapi, mocker): |
123 | 124 | rsp = fastapi.get(TAXA_FROM_CENTRAL_URL, headers=ADMIN_AUTH) |
124 | 125 | # assert rsp.status_code == status.HTTP_200_OK |
125 | 126 | assert rsp.json() == {"inserts": 0, "updates": 0, "error": None} |
| 127 | + |
| 128 | + |
| 129 | +def test_add_taxon_in_central(fastapi, mocker): |
| 130 | + # Mock the 'call' method of EcoTaxoServerClient |
| 131 | + mock_call = mocker.patch("providers.EcoTaxoServer.EcoTaxoServerClient.call") |
| 132 | + mock_response = mocker.Mock() |
| 133 | + mock_response.json.return_value = {"msg": "ok", "id": 789999} |
| 134 | + mock_call.return_value = mock_response |
| 135 | + |
| 136 | + params = { |
| 137 | + "name": "NewTaxon", |
| 138 | + "parent_id": 1, |
| 139 | + "taxotype": "P", |
| 140 | + "creator_email": "creator@test.com", |
| 141 | + "source_desc": "Test source", |
| 142 | + "source_url": "http://test.com", |
| 143 | + } |
| 144 | + |
| 145 | + rsp = fastapi.put(TAXON_PUT, params=params, headers=ADMIN_AUTH) |
| 146 | + |
| 147 | + assert rsp.status_code == status.HTTP_200_OK |
| 148 | + assert rsp.json()["msg"] == "ok" |
| 149 | + |
| 150 | + # Verify the mock was called correctly |
| 151 | + # The service adds 'creation_datetime' and 'taxostatus' |
| 152 | + called_args = mock_call.call_args |
| 153 | + assert called_args[0][0] == "/settaxon/" |
| 154 | + sent_params = called_args[0][1] |
| 155 | + assert sent_params["name"] == "NewTaxon" |
| 156 | + assert ( |
| 157 | + sent_params["parent_id"] == "1" |
| 158 | + ) # FastAPI Query params are strings in request.query_params |
| 159 | + assert sent_params["taxotype"] == "P" |
| 160 | + assert sent_params["creator_email"] == "creator@test.com" |
| 161 | + assert sent_params["taxostatus"] == "N" |
| 162 | + assert "creation_datetime" in sent_params |
| 163 | + |
| 164 | + |
| 165 | +def test_add_taxon_in_central_unauthorized(fastapi, mocker): |
| 166 | + # Mock the 'call' method of EcoTaxoServerClient |
| 167 | + mock_call = mocker.patch("providers.EcoTaxoServer.EcoTaxoServerClient.call") |
| 168 | + |
| 169 | + params = { |
| 170 | + "name": "NewTaxonUnauthorized", |
| 171 | + "parent_id": 1, |
| 172 | + "taxotype": "P", |
| 173 | + "creator_email": "creator@test.com", |
| 174 | + } |
| 175 | + |
| 176 | + # Unauthenticated call |
| 177 | + rsp = fastapi.put(TAXON_PUT, params=params) |
| 178 | + assert rsp.status_code == status.HTTP_403_FORBIDDEN |
| 179 | + assert mock_call.call_count == 0 |
0 commit comments