Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update utcnow to now(timezone.utc) #189

Merged
merged 2 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions google/cloud/alloydb/connector/refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime
from datetime import datetime, timezone
import logging
import ssl
from tempfile import TemporaryDirectory
Expand All @@ -36,7 +36,7 @@


def _seconds_until_refresh(
expiration: datetime, now: datetime = datetime.utcnow()
expiration: datetime, now: datetime = datetime.now(timezone.utc)
) -> int:
"""
Calculates the duration to wait before starting the next refresh.
Expand All @@ -45,7 +45,7 @@ def _seconds_until_refresh(

Args:
expiration (datetime.datetime): Time of certificate expiration.
now (datetime.datetime): Current time. Defaults to datetime.utcnow()
now (datetime.datetime): Current time (UTC)
Returns:
int: Time in seconds to wait before performing next refresh.
"""
Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(
ca_cert, cert_chain = certs
# get expiration from client certificate
cert_obj = x509.load_pem_x509_certificate(cert_chain[0].encode("UTF-8"))
self.expiration = cert_obj.not_valid_after
self.expiration = cert_obj.not_valid_after.replace(tzinfo=timezone.utc)
Copy link
Collaborator Author

@jackwotherspoon jackwotherspoon Dec 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cert is in UTC "Zulu" format so making it timezone aware for utc is fine here (basically a no-op).
https://cloud.google.com/sql/docs/mysql/admin-api/rest/v1beta4/sslCerts#SslCert


# tmpdir and its contents are automatically deleted after the CA cert
# and cert chain are loaded into the SSLcontext. The values
Expand All @@ -109,7 +109,7 @@ async def _is_valid(task: asyncio.Task) -> bool:
try:
result = await task
# valid if current time is before cert expiration
if datetime.utcnow() < result.expiration:
if datetime.now(timezone.utc) < result.expiration:
return True
except Exception:
# suppress any errors from task
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/mocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, List, Optional, Tuple

from cryptography import x509
Expand All @@ -29,7 +29,7 @@ def __init__(self) -> None:
def refresh(self, request: Callable) -> None:
"""Refreshes the access token."""
self.token = "12345"
self.expiry = datetime.utcnow() + timedelta(minutes=60)
self.expiry = datetime.now(timezone.utc) + timedelta(minutes=60)

@property
def expired(self) -> bool:
Expand Down Expand Up @@ -67,7 +67,7 @@ def generate_cert(
# generate private key
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
# calculate expiry time
now = datetime.utcnow()
now = datetime.now(timezone.utc)
expiration = now + timedelta(minutes=expires_in)
# configure cert subject
subject = issuer = x509.Name(
Expand Down Expand Up @@ -103,8 +103,8 @@ def __init__(
name: str = "test-instance",
ip_address: str = "127.0.0.1",
server_name: str = "00000000-0000-0000-0000-000000000000.server.alloydb",
cert_before: datetime = datetime.utcnow(),
cert_expiry: datetime = datetime.utcnow() + timedelta(hours=1),
cert_before: datetime = datetime.now(timezone.utc),
cert_expiry: datetime = datetime.now(timezone.utc) + timedelta(hours=1),
) -> None:
self.project = project
self.region = region
Expand Down
12 changes: 7 additions & 5 deletions tests/unit/test_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import ssl

from cryptography import x509
Expand Down Expand Up @@ -41,7 +41,7 @@ def test_seconds_until_refresh_under_1_hour_over_4_mins() -> None:
If expiration is under 1 hour and over 4 minutes,
should return duration-refresh_buffer (refresh_buffer = 4 minutes).
"""
now = datetime.now()
now = datetime.now(timezone.utc)
assert _seconds_until_refresh(now + timedelta(minutes=5), now) == 60


Expand All @@ -50,7 +50,9 @@ def test_seconds_until_refresh_under_4_mins() -> None:
Test _seconds_until_refresh returns proper time in seconds.
If expiration is under 4 minutes, should return 0.
"""
assert _seconds_until_refresh(datetime.now() + timedelta(minutes=3)) == 0
assert (
_seconds_until_refresh(datetime.now(timezone.utc) + timedelta(minutes=3)) == 0
)


def test_RefreshResult_init_(fake_instance: FakeInstance) -> None:
Expand All @@ -68,8 +70,8 @@ def test_RefreshResult_init_(fake_instance: FakeInstance) -> None:
.issuer_name(fake_instance.intermediate_cert.issuer)
.public_key(key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.now())
.not_valid_after(datetime.now() + timedelta(minutes=10))
.not_valid_before(datetime.now(timezone.utc))
.not_valid_after(datetime.now(timezone.utc) + timedelta(minutes=10))
)
# sign client cert with intermediate cert
client_cert = client_cert.sign(fake_instance.intermediate_key, hashes.SHA256())
Expand Down