-
Notifications
You must be signed in to change notification settings - Fork 402
/
Copy pathtest_filter.py
174 lines (138 loc) · 6.54 KB
/
test_filter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import base64
import json
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
import pytest
from assertions import assert_cassette_has_one_response, assert_is_json
import vcr
def _request_with_auth(url, username, password):
request = Request(url)
base64string = base64.b64encode(username.encode("ascii") + b":" + password.encode("ascii"))
request.add_header(b"Authorization", b"Basic " + base64string)
return urlopen(request)
def _find_header(cassette, header):
return any(header in request.headers for request in cassette.requests)
def test_filter_basic_auth(tmpdir, httpbin):
    """Check that ``filter_headers=["authorization"]`` keeps the header out of the cassette.

    The same two basic-auth requests (wrong password, then right password)
    are issued twice against one cassette file, using a VCR that matches on
    uri, method and headers. After the second pass the cassette must hold two
    interactions and no request carrying an ``authorization`` header.
    """
    url = httpbin.url + "/basic-auth/user/passwd"
    cass_file = str(tmpdir.join("basic_auth_filter.yaml"))
    my_vcr = vcr.VCR(match_on=["uri", "method", "headers"])

    # 2 requests, one with auth failure and one with auth success
    with my_vcr.use_cassette(cass_file, filter_headers=["authorization"]):
        # urlopen raises HTTPError on the failed login, so no response object
        # is ever returned; the status is read from the captured exception
        # after the block (an assert placed after the raising call would
        # never run).
        with pytest.raises(HTTPError) as exc_info:
            _request_with_auth(url, "user", "wrongpasswd")
        assert exc_info.value.code == 401
        resp = _request_with_auth(url, "user", "passwd")
        assert resp.getcode() == 200

    # make same 2 requests, this time both served from cassette.
    with my_vcr.use_cassette(cass_file, filter_headers=["authorization"]) as cass:
        with pytest.raises(HTTPError) as exc_info:
            _request_with_auth(url, "user", "wrongpasswd")
        assert exc_info.value.code == 401
        resp = _request_with_auth(url, "user", "passwd")
        assert resp.getcode() == 200
        # authorization header should not have been recorded
        assert not _find_header(cass, "authorization")
        assert len(cass) == 2
def test_filter_querystring(tmpdir, httpbin):
    """Check that a filtered query parameter is absent from the stored request URL.

    The URL ``/?foo=bar`` is fetched twice under a cassette configured with
    ``filter_query_parameters=["foo"]``; on the second pass the first
    request held by the cassette must not contain ``foo`` in its URL.
    """
    cassette_path = str(tmpdir.join("filter_qs.yaml"))
    target = httpbin.url + "/?foo=bar"

    # First pass: populate the cassette file.
    with vcr.use_cassette(cassette_path, filter_query_parameters=["foo"]):
        urlopen(target)

    # Second pass: same request against the same cassette file, then inspect it.
    with vcr.use_cassette(cassette_path, filter_query_parameters=["foo"]) as cass:
        urlopen(target)
        first_request = cass.requests[0]
        assert "foo" not in first_request.url
def test_filter_post_data(tmpdir, httpbin):
    """Check that a filtered form field is absent from the stored request body.

    A url-encoded body with ``id=secret`` is posted under a cassette
    configured with ``filter_post_data_parameters=["id"]``. Reopening the
    cassette, the first stored request body must not contain ``id=secret``.
    """
    cassette_path = str(tmpdir.join("filter_pd.yaml"))
    form_fields = {"id": "secret", "foo": "bar"}
    payload = urlencode(form_fields).encode("utf-8")
    post_url = httpbin.url + "/post"

    with vcr.use_cassette(cassette_path, filter_post_data_parameters=["id"]):
        urlopen(post_url, payload)

    # No request is made here; the cassette is only reopened for inspection.
    with vcr.use_cassette(cassette_path, filter_post_data_parameters=["id"]) as cass:
        stored_body = cass.requests[0].body
        assert b"id=secret" not in stored_body
def test_filter_json_post_data(tmpdir, httpbin):
    """Check that a filtered key is absent from a stored JSON request body.

    A JSON body containing ``"id": "secret"`` is posted with a
    ``Content-Type: application/json`` header under a cassette configured
    with ``filter_post_data_parameters=["id"]``. Reopening the cassette, the
    first stored request body must not contain that key/value pair.
    """
    payload = json.dumps({"id": "secret", "foo": "bar"}).encode("utf-8")
    json_request = Request(httpbin.url + "/post", data=payload)
    json_request.add_header("Content-Type", "application/json")
    cassette_path = str(tmpdir.join("filter_jpd.yaml"))

    with vcr.use_cassette(cassette_path, filter_post_data_parameters=["id"]):
        urlopen(json_request)

    # No request is made here; the cassette is only reopened for inspection.
    with vcr.use_cassette(cassette_path, filter_post_data_parameters=["id"]) as cass:
        stored_body = cass.requests[0].body
        assert b'"id": "secret"' not in stored_body
def test_filter_callback(tmpdir, httpbin):
    """Check that a request callback returning ``None`` leaves the cassette empty.

    The callback returns ``None`` for requests whose path is ``/get`` and
    passes every other request through. It is installed first through the
    legacy ``before_record`` keyword and then through
    ``before_record_request``; in both cases fetching ``/get`` must leave the
    cassette with zero interactions.
    """
    cassette_path = str(tmpdir.join("basic_auth_filter.yaml"))
    target = httpbin.url + "/get"

    def before_record_cb(request):
        if request.path == "/get":
            return None
        return request

    # The legacy keyword is exercised first, then its current spelling.
    for keyword in ("before_record", "before_record_request"):
        my_vcr = vcr.VCR(**{keyword: before_record_cb})
        with my_vcr.use_cassette(cassette_path, filter_headers=["authorization"]) as cass:
            urlopen(target)
            assert len(cass) == 0
def test_before_record_interaction(tmpdir, httpbin):
    """Check that an interaction callback set on the VCR can leave the cassette empty.

    The callback returns ``None`` for ``/get`` requests and the
    ``(request, response)`` pair otherwise. It is passed to the ``VCR``
    constructor as ``before_record_interaction``; after fetching ``/get``
    the cassette must hold zero interactions.
    """
    cassette_path = str(tmpdir.join("basic_auth_filter1.yaml"))
    target = httpbin.url + "/get"

    def before_record_interaction_cb(request, response):
        if request.path != "/get":
            return request, response
        return None

    recorder = vcr.VCR(before_record_interaction=before_record_interaction_cb)
    with recorder.use_cassette(cassette_path, filter_headers=["authorization"]) as cass:
        urlopen(target)
        assert len(cass) == 0
def test_before_record_interaction_override(tmpdir, httpbin):
    """Check that an interaction callback given to ``use_cassette`` can leave the cassette empty.

    The VCR is built with no arguments and the callback is supplied per
    cassette through ``before_record_interaction``. The callback returns
    ``None`` for ``/get`` requests; after fetching ``/get`` the cassette must
    hold zero interactions.
    """
    cassette_path = str(tmpdir.join("basic_auth_filter2.yaml"))
    target = httpbin.url + "/get"

    def before_record_interaction_cb(request, response):
        if request.path != "/get":
            return request, response
        return None

    recorder = vcr.VCR()
    with recorder.use_cassette(
        cassette_path, before_record_interaction=before_record_interaction_cb
    ) as cass:
        urlopen(target)
        assert len(cass) == 0
def test_decompress_gzip(tmpdir, httpbin):
    """Check the ``/gzip`` endpoint with ``decode_compressed_response=True``.

    The first pass fetches ``/gzip`` with an ``Accept-Encoding`` header
    under a cassette opened with ``decode_compressed_response=True``. The
    second pass reopens the cassette file without that option, fetches the
    plain URL, and expects one response in the cassette and a JSON body.
    """
    cassette_path = str(tmpdir.join("gzip_response.yaml"))
    target = httpbin.url + "/gzip"
    gzip_request = Request(target, headers={"Accept-Encoding": ["gzip, deflate"]})

    with vcr.use_cassette(cassette_path, decode_compressed_response=True):
        urlopen(gzip_request)

    with vcr.use_cassette(cassette_path) as cass:
        body = urlopen(target).read()
        assert_cassette_has_one_response(cass)
    assert_is_json(body)
def test_decomptess_empty_body(tmpdir, httpbin):
    """Check a ``HEAD`` request to ``/gzip`` with ``decode_compressed_response=True``.

    The body read on the first pass (cassette opened with
    ``decode_compressed_response=True``) must equal the body read on the
    second pass (same cassette file, option omitted), and the cassette must
    hold exactly one response.
    """
    cassette_path = str(tmpdir.join("gzip_empty_response.yaml"))
    head_request = Request(
        httpbin.url + "/gzip",
        headers={"Accept-Encoding": ["gzip, deflate"]},
        method="HEAD",
    )

    with vcr.use_cassette(cassette_path, decode_compressed_response=True):
        first_body = urlopen(head_request).read()

    with vcr.use_cassette(cassette_path) as cass:
        second_body = urlopen(head_request).read()
        assert_cassette_has_one_response(cass)
    assert second_body == first_body
def test_decompress_deflate(tmpdir, httpbin):
    """Check the ``/deflate`` endpoint with ``decode_compressed_response=True``.

    The first pass fetches ``/deflate`` with an ``Accept-Encoding`` header
    under a cassette opened with ``decode_compressed_response=True``. The
    second pass reopens the cassette file without that option, fetches the
    plain URL, and expects one response in the cassette and a JSON body.
    """
    cassette_path = str(tmpdir.join("deflate_response.yaml"))
    target = httpbin.url + "/deflate"
    deflate_request = Request(target, headers={"Accept-Encoding": ["gzip, deflate"]})

    with vcr.use_cassette(cassette_path, decode_compressed_response=True):
        urlopen(deflate_request)

    with vcr.use_cassette(cassette_path) as cass:
        body = urlopen(target).read()
        assert_cassette_has_one_response(cass)
    assert_is_json(body)
def test_decompress_regular(tmpdir, httpbin):
    """Test that it doesn't try to decompress content that isn't compressed.

    ``/get`` is fetched under a cassette opened with
    ``decode_compressed_response=True``, then fetched again with the cassette
    file reopened without that option; the cassette must hold one response
    and the body must be JSON.
    """
    cassette_path = str(tmpdir.join("noncompressed_response.yaml"))
    target = httpbin.url + "/get"

    with vcr.use_cassette(cassette_path, decode_compressed_response=True):
        urlopen(target)

    with vcr.use_cassette(cassette_path) as cass:
        body = urlopen(target).read()
        assert_cassette_has_one_response(cass)
    assert_is_json(body)