-
Notifications
You must be signed in to change notification settings - Fork 55
Open Telemetry Transport #1229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Open Telemetry Transport #1229
Changes from 11 commits
d34edd4
15c0403
57d50cc
e3aca57
bc0b572
22f3bb9
5c3f188
00116f6
069ef52
19eca12
ae430bb
d9c9779
9462017
1ffd6a2
1884372
0217af5
8b163f0
a4bf7e1
8431a88
5fe0df4
1e2c4ce
e17c449
447682d
c5246b5
b366b54
238c883
7c56dde
393f6e2
5e228ea
ef62d8f
e586a03
c39a1b4
b709d12
1036729
adea362
116086b
4c26218
a9042ab
42e35ce
08c913a
a4e986e
e20f0dc
a615853
05dbfd0
6362d2b
81278d9
b851c34
b7d7a30
e4a076e
af71d6c
af50ba2
75334ef
300584f
62dc253
d627f14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| import datetime | ||
| import json | ||
| import urllib | ||
| from base64 import b64encode | ||
|
|
||
| from urllib.parse import urlparse | ||
|
|
||
| import requests | ||
|
|
||
|
|
||
| class ClientAuth(object): | ||
| def __init__(self, configuration, headers): | ||
| self.configuration = configuration | ||
| self.headers = headers | ||
| if self.configuration.auth == "oauth2": | ||
| self.auth = OAuth2(self.configuration, self.headers) | ||
| elif self.configuration.auth == "bearer": | ||
| self.auth = BearerToken(self.configuration, self.headers) | ||
| else: | ||
| self.auth = NoAuth(self.configuration, self.headers) | ||
|
|
||
| def authenticate(self): | ||
| return self.auth.authenticate() | ||
|
|
||
| class NoAuth(object): | ||
| def __init__(self, configuration, headers): | ||
| self.headers = headers | ||
|
|
||
| def authenticate(self): | ||
| # Add a header just so for traceability | ||
| self.headers["x-no-auth"]="true" | ||
| return True | ||
|
|
||
| # Simple Authorization Header as Bearer Token using `api_key` configuration | ||
| class BearerToken(object): | ||
| def __init__(self, configuration, headers): | ||
| headers.set("Authorization", "Bearer " + configuration.api_key) | ||
|
|
||
| def authenticate(self): | ||
| return True | ||
|
|
||
| # Implement https://datatracker.ietf.org/doc/html/rfc6749#section-4.4 flow | ||
| class OAuth2(object): | ||
| def __init__(self, configuration, headers): | ||
| self.headers = headers # Headers modified for external requests | ||
| self.client_id = configuration.client_id | ||
| self.client_secret = configuration.client_secret | ||
| self.token_url = configuration.token_url | ||
| scopes = " ".join(configuration.scopes) | ||
| # Payload Body for the token exchange | ||
| self.auth_request = "grant_type=client_credentials&scope=" + urllib.parse.quote_plus(scopes) | ||
| # Our token to use in requests | ||
| self.token = None | ||
| # When the token expires | ||
| self.expiry_time = datetime.datetime.now() | ||
| self.verify_ssl = configuration.verify_server_certificate | ||
| # Authentication Headers for the token exchange | ||
| authorization_header_value = b64encode( bytes(('' + self.client_id + ':' + self.client_secret).encode("utf-8"))).decode('utf-8') | ||
| self.auth_headers = { "Content-Type": "application/x-www-form-urlencoded", "Authorization" : "Basic " + authorization_header_value} | ||
|
|
||
| def authenticate(self): | ||
| if self.token == None or self.expiry_time < datetime.datetime.now(): | ||
| print("Request/Refresh OAuth2 Token") | ||
| if not self.refresh_token(): | ||
| raise Exception("OAuth2: Unable to refresh token") | ||
| self.headers["Authorization"]="Bearer " + self.token | ||
| return True | ||
|
|
||
| def refresh_token(self): | ||
| resp = requests.post(self.token_url, data=self.auth_request, headers=self.auth_headers, verify=self.verify_ssl) | ||
| if resp.status_code == 200: | ||
| auth_response = json.loads(resp.content) | ||
| self.token = auth_response["access_token"] | ||
| self.expiry_time = datetime.datetime.now() + datetime.timedelta(seconds=auth_response["expires_in"]) | ||
| return True | ||
| else: | ||
| raise Exception("Unable to obtain OAuth2 Token: " + str(resp.status_code) + "(" + str(resp.content) + ")") | ||
| return False | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -765,6 +765,8 @@ def print_useful_settings(self, other_config=None): | |
| "default_sessions_per_worker", | ||
| "default_worker_session_status_message_interval", | ||
| "enable_worker_session_process_metrics_gather", | ||
| "server_url" | ||
| "transport", | ||
| # NOTE: It's important we use sanitzed_ version of this method which masks the API key | ||
| "sanitized_worker_configs", | ||
| ] | ||
|
|
@@ -1497,6 +1499,41 @@ def api_key(self): | |
| """Returns the configuration value for 'api_key'.""" | ||
| return self.__get_config().get_string("api_key") | ||
|
|
||
| @property | ||
| def server_url(self): | ||
| """Returns the configuration value for 'server_url'.""" | ||
| return self.__get_config().get_string("server_url") | ||
|
|
||
| @property | ||
| def auth(self): | ||
| """Returns the configuration value for 'auth'.""" | ||
| return self.__get_config().get_string("auth") | ||
|
|
||
| @property | ||
| def client_id(self): | ||
|
||
| """Returns the configuration value for 'client_id'.""" | ||
| return self.__get_config().get_string("client_id") | ||
|
|
||
| @property | ||
| def client_secret(self): | ||
| """Returns the configuration value for 'client_secret'.""" | ||
| return self.__get_config().get_string("client_secret") | ||
|
|
||
| @property | ||
| def token_url(self): | ||
| """Returns the configuration value for 'token_url'.""" | ||
| return self.__get_config().get_string("token_url") | ||
|
|
||
| @property | ||
| def scopes(self): | ||
| """Returns the configuration value for 'scopes'.""" | ||
| return self.__get_config().get_json_array("scopes") | ||
|
|
||
| @property | ||
| def transport(self): | ||
| """Returns the configuration value for 'transport'.""" | ||
| return self.__get_config().get_string("transport") | ||
|
|
||
| @property | ||
| def scalyr_server(self): | ||
| """Returns the configuration value for 'scalyr_server'.""" | ||
|
|
@@ -2826,6 +2863,27 @@ def __verify_main_config_and_apply_defaults( | |
| apply_defaults, | ||
| env_aware=True, | ||
| ) | ||
| self.__verify_or_set_optional_string( | ||
| config, | ||
| "transport", | ||
| "", | ||
| description, | ||
| apply_defaults | ||
| ) | ||
| self.__verify_or_set_optional_string( | ||
| config, | ||
| "server_url", | ||
| "", | ||
| description, | ||
| apply_defaults | ||
| ) | ||
| self.__verify_or_set_optional_string( | ||
| config, | ||
| "auth", | ||
| "", | ||
| description, | ||
| apply_defaults | ||
| ) | ||
| self.__verify_or_set_optional_bool( | ||
| config, | ||
| "use_new_ingestion", | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -212,7 +212,8 @@ def __init__(self, add_events_request, completion_callback): | |||
| # If there is a AddEventsTask object already created for the next request due to pipelining, this is set to it. | ||||
| # This must be the next request if this request is successful, otherwise, we will lose bytes. | ||||
| self.next_pipelined_task = None | ||||
|
|
||||
| # last status | ||||
| self.__receive_response_status = () | ||||
|
|
||||
| class CopyingManagerWorkerSessionInterface(six.with_metaclass(ABCMeta)): | ||||
| """ | ||||
|
|
@@ -354,6 +355,7 @@ def __init__(self, configuration, worker_config_entry, session_id, is_daemon=Fal | |||
|
|
||||
| # The current pending AddEventsTask. We will retry the contained AddEventsRequest several times. | ||||
| self.__pending_add_events_task = None | ||||
| self.__receive_response_status = {} | ||||
|
||||
| in self.__pending_add_events_task.__receive_response_status |
Trying to resolve this error (it doesn't):
2024-01-26 17:17:51.891Z ERROR [core] [scalyr_agent.scalyr_logging:707] Failed while attempting to scan and transmit logs :stack_trace:
Traceback (most recent call last):
File "/Users/anthonyj/_Scalyr/scalyr-agent-2/scalyr_agent/copying_manager/worker.py", line 500, in run
"Repeatedly failed to parse response due to exception. Dropping events",
AttributeError: 'AddEventsTask' object has no attribute '_CopyingManagerWorkerSession__receive_response_status'
I defined the attribute in hopes that this was just a sequencing issue in the way the scalyr_client works, but this didn't fix it. Happens when we start to hit publishing delays. Easiest way for me to hit it is to Debug Agent;Pause Execution for X minutes;Resume Agent
Since this is experimental, I'm comfortable shipping this bug to the customer so they can provide feedback on the feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the second one on the line 358. I believe that's a mistake.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't there a python library for OAuth2 to avoid having an in-house implementation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Python2 requirement paints us into some really weird corners like having to be on older versions of code that opens up security risks. This seems like the path of least resistance. I originally used the OpenTelemetry Python SDK, but had to back out of that because of no Python2 support.