Skip to content

Test compression strategies #353

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions examples/compression_example/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
from snowplow_tracker import (
Tracker,
Emitter,
Subject,
SelfDescribingJson,
PageView,
SelfDescribing,
)
import snowflake.connector
import brotli
import gzip


TRACKER_COLS = [
"PLATFORM",
"EVENT",
"USER_ID",
"USER_IPADDRESS",
"DOMAIN_USERID",
"DOMAIN_SESSIONIDX",
"NETWORK_USERID",
"PAGE_URL",
"PAGE_TITLE",
"PAGE_REFERRER",
"BR_LANG",
"BR_COOKIES",
"BR_COLORDEPTH",
"BR_VIEWWIDTH",
"BR_VIEWHEIGHT",
"OS_TIMEZONE",
"DVCE_SCREENWIDTH",
"DVCE_SCREENHEIGHT",
"DOC_CHARSET",
"DOC_WIDTH",
"DOC_HEIGHT",
"DVCE_SENT_TSTAMP",
"DOMAIN_SESSIONID",
"TRUE_TSTAMP",
"CONTEXTS_COM_SNOWPLOWANALYTICS_SNOWPLOW_WEB_PAGE_1",
"CONTEXTS_SNOWPLOWANALYTICS_COM_USER_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_IMPRESSION_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_CLICK_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_ACTIVATION_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_VIEW_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_DISMISS_1",
"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_PRODUCT_CLICK_1",
"CONTEXTS_SNOWPLOWANALYTICS_COM_PRODUCT_1",
"CONTEXTS_SNOWPLOWANALYTICS_COM_CART_1",
"CONTEXTS_SNOWPLOWANALYTICS_COM_CHECKOUT_STEP_1",
"CONTEXTS_SNOWPLOWANALYTICS_COM_TRANSACTION_1",
"event_name"
]

ctx = snowflake.connector.connect(
user="XXXXXXXXXXXX",
password="XXXXXXXXXXXX",
account="XXXXXXXXXXXX"
)

def gzip_compression(data):
return gzip.compress(bytes(data, "UTF-8"))

def brotli_compression(data):
return brotli.compress(bytes(data, "UTF-8"))


def get_snowflake_data(num_rows):
cols = ",".join(str(element) for element in TRACKER_COLS)
sql_comm = "select "
sql_comm += cols
sql_comm += " from ANALYTICS_DEV_DB.ATOMIC.EVENTS limit " + str(num_rows)

cs = ctx.cursor()
try:
df = cs.execute(sql_comm)
ret = df.fetchall()
finally:
cs.close()
ctx.close()
return ret

def create_event(row, tracker):
subject= Subject().set_user_id(row[2]).set_ip_address(row[3]).set_domain_user_id(row[4]).set_domain_session_index(row[5]).set_network_user_id(row[6]).set_lang(row[10]).set_platform(row[0]).set_screen_resolution(row[13], row[14]).set_domain_session_id(row[22])

contexts = []
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_COM_SNOWPLOWANALYTICS_SNOWPLOW_WEB_PAGE_1/jsonschema/1-0-0",
row[24]
))
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_USER_1/jsonschema/1-0-0",
row[25]
))
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_PRODUCT_1/jsonschema/1-0-0",
row[32]
))
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_CART_1/jsonschema/1-0-0",
row[33]
))
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_CHECKOUT_STEP_1/jsonschema/1-0-0",
row[34]
))
contexts.append( SelfDescribingJson(
"iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_TRANSACTION_1/jsonschema/1-0-0",
row[35]
))

if(row[1] == 'page_view'):
event = PageView(row[7], row[8], row[9], subject, context=contexts)
else:
event_name = row[36]
event_types = {"impression": row[26], "product_click": row[31], 'transaction': row[35]}
if event_name in event_types.keys():
event_desc = event_types[event_name]
else:
event_desc="unstruct"

event = SelfDescribing(SelfDescribingJson("iglu:com.snowplowanalytics.snowplow/unstruct_event/1-0-0", event_desc), event_subject=subject, context=contexts)

tracker.track(event)

def run_test(data_length, tracker):
df = get_snowflake_data(data_length)
for row in df:
create_event(row, tracker)
tracker.flush()

def main():
data_length = 10 # Edit for different length tests
e = Emitter("localhost", batch_size=data_length, buffer_capacity=10000000, request_timeout=1)
tracker = Tracker(namespace="snowplow_tracker", emitters=e, encode_base64=False)
run_test(data_length, tracker=tracker)

if __name__ == "__main__":
main()
23 changes: 22 additions & 1 deletion snowplow_tracker/emitters.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(
custom_retry_codes: Dict[int, bool] = {},
event_store: Optional[EventStore] = None,
session: Optional[requests.Session] = None,
compression = None,
) -> None:
"""
:param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically.
Expand Down Expand Up @@ -117,6 +118,7 @@ def __init__(
self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method)

self.method = method
self.compression = compression

if event_store is None:
if buffer_capacity is None:
Expand Down Expand Up @@ -240,6 +242,14 @@ def flush(self) -> None:
if self.bytes_queued is not None:
self.bytes_queued = 0

@staticmethod
def print_request(req):
print('{}\n{}\r\n{}\r\n\r\n{}'.format(
'-----------START-----------',
req.method + ' ' + req.url,
'\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()), ""
))

Comment on lines +245 to +252
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to demonstrate the size of the request

def http_post(self, data: str) -> int:
"""
:param data: The array of JSONs to be sent
Expand All @@ -248,6 +258,12 @@ def http_post(self, data: str) -> int:
logger.info("Sending POST request to %s..." % self.endpoint)
logger.debug("Payload: %s" % data)
try:
req = requests.Request('POST', url=self.endpoint,
data=data,
headers={"Content-Type": "application/json; charset=utf-8"},
)
prepared = req.prepare()
self.print_request(prepared)
Comment on lines +261 to +266
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is also to demonstrate the size of the request and wont be needed

r = self.request_method.post(
self.endpoint,
data=data,
Expand Down Expand Up @@ -309,7 +325,12 @@ def send_events(self, evts: PayloadDictList) -> None:

if self.method == "post":
data = SelfDescribingJson(PAYLOAD_DATA_SCHEMA, evts).to_string()
status_code = self.http_post(data)
if self.compression is not None:
logger.info("Compressing payload data")
compressed = self.compression(data)
status_code = self.http_post(compressed)
else:
status_code = self.http_post(data)
request_succeeded = Emitter.is_good_status_code(status_code)
if request_succeeded:
success_events += evts
Expand Down