From 009527f2ab352a51eeee130923025a6173a26970 Mon Sep 17 00:00:00 2001 From: Jack-Keene Date: Wed, 22 Nov 2023 10:28:41 +0000 Subject: [PATCH 1/5] Add compression option to emitter --- snowplow_tracker/emitters.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index af233566..aaa99637 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -22,6 +22,8 @@ import random from typing import Optional, Union, Tuple, Dict from queue import Queue +import gzip +import brotli from snowplow_tracker.self_describing_json import SelfDescribingJson from snowplow_tracker.typing import ( @@ -70,6 +72,7 @@ def __init__( custom_retry_codes: Dict[int, bool] = {}, event_store: Optional[EventStore] = None, session: Optional[requests.Session] = None, + compression = None, ) -> None: """ :param endpoint: The collector URL. If protocol is not set in endpoint it will automatically set to "https://" - this is done automatically. @@ -117,6 +120,7 @@ def __init__( self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method) self.method = method + self.compression = compression if event_store is None: if buffer_capacity is None: @@ -240,6 +244,14 @@ def flush(self) -> None: if self.bytes_queued is not None: self.bytes_queued = 0 + @staticmethod + def print_request(req): + print('{}\n{}\r\n{}\r\n\r\n{}'.format( + '-----------START-----------', + req.method + ' ' + req.url, + '\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()), "" + )) + def http_post(self, data: str) -> int: """ :param data: The array of JSONs to be sent @@ -248,12 +260,14 @@ def http_post(self, data: str) -> int: logger.info("Sending POST request to %s..." % self.endpoint) logger.debug("Payload: %s" % data) try: - r = self.request_method.post( - self.endpoint, + req = requests.Request('POST', url=self.endpoint, data=data, headers={"Content-Type": "application/json; charset=utf-8"}, - timeout=self.request_timeout, ) + prepared = req.prepare() + self.print_request(prepared) # + s = requests.Session() + s.send(prepared,timeout=self.request_timeout) except requests.RequestException as e: logger.warning(e) return -1 @@ -309,7 +323,12 @@ def send_events(self, evts: PayloadDictList) -> None: if self.method == "post": data = SelfDescribingJson(PAYLOAD_DATA_SCHEMA, evts).to_string() - status_code = self.http_post(data) + if self.compression is not None: + logger.info("Compressing payload data") + compressed = self.compression(data) + status_code = self.http_post(compressed) + else: + status_code = self.http_post(data) request_succeeded = Emitter.is_good_status_code(status_code) if request_succeeded: success_events += evts From ae9c280382dafcc12685baccfcbc87965a89fd38 Mon Sep 17 00:00:00 2001 From: Jack-Keene Date: Wed, 22 Nov 2023 10:29:46 +0000 Subject: [PATCH 2/5] remove dependencies --- snowplow_tracker/emitters.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index aaa99637..65f3d942 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -22,8 +22,6 @@ import random from typing import Optional, Union, Tuple, Dict from queue import Queue -import gzip -import brotli from snowplow_tracker.self_describing_json import SelfDescribingJson from snowplow_tracker.typing import ( From 138df10c535713bac5052a5fb60cabae409faf62 Mon Sep 17 00:00:00 2001 From: Jack-Keene Date: Wed, 22 Nov 2023 10:33:26 +0000 Subject: [PATCH 3/5] Add compression example app --- examples/compression_example/app.py | 138 ++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 examples/compression_example/app.py diff --git a/examples/compression_example/app.py b/examples/compression_example/app.py new file mode 100644 index 00000000..9731514b --- /dev/null +++ b/examples/compression_example/app.py @@ -0,0 +1,138 @@ +from snowplow_tracker import ( + Tracker, + Emitter, + Subject, + SelfDescribingJson, + PageView, + SelfDescribing, +) +import snowflake.connector +import brotli +import gzip + + +TRACKER_COLS = [ +"PLATFORM", +"EVENT", +"USER_ID", +"USER_IPADDRESS", +"DOMAIN_USERID", +"DOMAIN_SESSIONIDX", +"NETWORK_USERID", +"PAGE_URL", +"PAGE_TITLE", +"PAGE_REFERRER", +"BR_LANG", +"BR_COOKIES", +"BR_COLORDEPTH", +"BR_VIEWWIDTH", +"BR_VIEWHEIGHT", +"OS_TIMEZONE", +"DVCE_SCREENWIDTH", +"DVCE_SCREENHEIGHT", +"DOC_CHARSET", +"DOC_WIDTH", +"DOC_HEIGHT", +"DVCE_SENT_TSTAMP", +"DOMAIN_SESSIONID", +"TRUE_TSTAMP", +"CONTEXTS_COM_SNOWPLOWANALYTICS_SNOWPLOW_WEB_PAGE_1", +"CONTEXTS_SNOWPLOWANALYTICS_COM_USER_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_IMPRESSION_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_CLICK_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_ACTIVATION_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_VIEW_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_DISMISS_1", +"UNSTRUCT_EVENT_SNOWPLOWANALYTICS_COM_PRODUCT_CLICK_1", +"CONTEXTS_SNOWPLOWANALYTICS_COM_PRODUCT_1", +"CONTEXTS_SNOWPLOWANALYTICS_COM_CART_1", +"CONTEXTS_SNOWPLOWANALYTICS_COM_CHECKOUT_STEP_1", +"CONTEXTS_SNOWPLOWANALYTICS_COM_TRANSACTION_1", +"event_name" +] + +ctx = snowflake.connector.connect( + user="XXXXXXXXXXXX", + password="XXXXXXXXXXXX", + account="XXXXXXXXXXXX" +) + +def gzip_compression(data): + return gzip.compress(bytes(data, "UTF-8")) + +def brotli_compression(data): + return brotli.compress(bytes(data, "UTF-8")) + + +def get_snowflake_data(num_rows): + cols = ",".join(str(element) for element in TRACKER_COLS) + sql_comm = "select " + sql_comm += cols + sql_comm += " from ANALYTICS_DEV_DB.ATOMIC.EVENTS limit " + str(num_rows) + + cs = ctx.cursor() + try: + df = cs.execute(sql_comm) + ret = df.fetchall() + finally: + cs.close() + ctx.close() + return ret + +def create_event(row, tracker): + subject= Subject().set_user_id(row[2]).set_ip_address(row[3]).set_domain_user_id(row[4]).set_domain_session_index(row[5]).set_network_user_id(row[6]).set_lang(row[10]).set_platform(row[0]).set_screen_resolution(row[13], row[14]).set_domain_session_id(row[22]) + + contexts = [] + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_COM_SNOWPLOWANALYTICS_SNOWPLOW_WEB_PAGE_1/jsonschema/1-0-0", + row[24] + )) + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_USER_1/jsonschema/1-0-0", + row[25] + )) + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_PRODUCT_1/jsonschema/1-0-0", + row[32] + )) + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_CART_1/jsonschema/1-0-0", + row[33] + )) + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_CHECKOUT_STEP_1/jsonschema/1-0-0", + row[34] + )) + contexts.append( SelfDescribingJson( + "iglu:com.my_company/CONTEXTS_SNOWPLOWANALYTICS_COM_TRANSACTION_1/jsonschema/1-0-0", + row[35] + )) + + if(row[1] == 'page_view'): + event = PageView(row[7], row[8], row[9], subject, context=contexts) + else: + event_name = row[36] + event_types = {"impression": row[26], "product_click": row[31], 'transaction': row[35]} + if event_name in event_types.keys(): + event_desc = event_types[event_name] + else: + event_desc="unstruct" + + event = SelfDescribing(SelfDescribingJson("iglu:com.snowplowanalytics.snowplow/unstruct_event/1-0-0", event_desc), event_subject=subject, context=contexts) + + tracker.track(event) + +def run_test(data_length, tracker): + df = get_snowflake_data(data_length) + for row in df: + create_event(row, tracker) + tracker.flush() + +def main(): + data_length = 10 # Edit for different length tests + e = Emitter("localhost", batch_size=data_length, buffer_capacity=10000000, request_timeout=1) + tracker = Tracker(namespace="snowplow_tracker", emitters=e, encode_base64=False) + run_test(data_length, tracker=tracker) + +if __name__ == "__main__": + main() From 8d5b285833ce0e58edadedb1672daa20f84b7107 Mon Sep 17 00:00:00 2001 From: Jack-Keene Date: Wed, 22 Nov 2023 11:04:12 +0000 Subject: [PATCH 4/5] fix request naming --- snowplow_tracker/emitters.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 65f3d942..764555d9 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -264,8 +264,8 @@ def http_post(self, data: str) -> int: ) prepared = req.prepare() self.print_request(prepared) # - s = requests.Session() - s.send(prepared,timeout=self.request_timeout) + r = requests.Session() + r.send(prepared,timeout=self.request_timeout) except requests.RequestException as e: logger.warning(e) return -1 From 4220b0aaaf9fb6bd3b661cf6f85051b38d1e896d Mon Sep 17 00:00:00 2001 From: Jack-Keene Date: Wed, 22 Nov 2023 11:07:14 +0000 Subject: [PATCH 5/5] Add request method back --- snowplow_tracker/emitters.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/snowplow_tracker/emitters.py b/snowplow_tracker/emitters.py index 764555d9..92753058 100644 --- a/snowplow_tracker/emitters.py +++ b/snowplow_tracker/emitters.py @@ -263,9 +263,13 @@ def http_post(self, data: str) -> int: headers={"Content-Type": "application/json; charset=utf-8"}, ) prepared = req.prepare() - self.print_request(prepared) # - r = requests.Session() - r.send(prepared,timeout=self.request_timeout) + self.print_request(prepared) + r = self.request_method.post( + self.endpoint, + data=data, + headers={"Content-Type": "application/json; charset=utf-8"}, + timeout=self.request_timeout, + ) except requests.RequestException as e: logger.warning(e) return -1