Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Use the official Python image from the Docker Hub
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /app

# Copy the requirements file into the container
COPY requirements.txt requirements.txt

# Install the required packages
RUN pip install --no-cache-dir -r requirements.txt

# Copy the entire project directory into the container
COPY . .

# Change directory to /app/examples/v201 where server.py is located
WORKDIR /app/examples/v201

# Expose port 9000
EXPOSE 9000
# EXPOSE 8080

# Run server.py when the container launches
CMD ["python", "central_system.py"]
55 changes: 40 additions & 15 deletions examples/v201/central_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,60 @@
from ocpp.routing import on
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call_result
from ocpp.v201.enums import Action

logging.basicConfig(level=logging.INFO)


class ChargePoint(cp):
@on(Action.boot_notification)
@on("BootNotification")
def on_boot_notification(self, charging_station, reason, **kwargs):
return call_result.BootNotification(
current_time=datetime.now(timezone.utc).isoformat(),
interval=10,
status="Accepted",
current_time=datetime.now(timezone.utc).isoformat(), interval=10, status="Accepted"
)

@on(Action.heartbeat)
@on("Heartbeat")
def on_heartbeat(self):
print("Got a Heartbeat!")
return call_result.Heartbeat(
current_time=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
current_time=datetime.now(timezone.utc).strftime()
)

@on("CancelReservation")
def on_cancel_reservation(self, reservation_id):
return call_result.CancelReservation(
status="Accepted",
)

async def on_connect(websocket):
@on("StatusNotification")
def on_Status_Notification(self,**kwargs):
return call_result.StatusNotification()

@on("SecurityEventNotification")
def on_SecurityEventNotification(self, **kwargs):
return call_result.SecurityEventNotification()

@on("Authorize")
def on_authorize(self, **kwargs):
# logging.info(f"Received Authorize request with id_token: {id_token}")
return call_result.Authorize(id_token_info={"status": "Accepted"})

@on('MeterValues')
def on_meter_values(self, **kwargs):
logging.info("Received MeterValues.")
return call_result.MeterValues()

@on("ReportChargingProfiles")
def on_report_charging_profiles(self, custom_data=None, **kwargs):
logging.info(f"Received ReportChargingProfiles with custom_data: {custom_data}, kwargs: {kwargs}")
# Process the ReportChargingProfiles request here, if needed.

# Return the result with a status, e.g., "Accepted".
return call_result.ReportChargingProfiles()

async def on_connect(websocket, path):
"""For every new charge point that connects, create a ChargePoint
instance and start listening for messages.
"""
try:
requested_protocols = websocket.request.headers["Sec-WebSocket-Protocol"]
requested_protocols = websocket.request_headers["Sec-WebSocket-Protocol"]
except KeyError:
logging.error("Client hasn't requested any Subprotocol. Closing Connection")
return await websocket.close()
Expand All @@ -61,16 +87,15 @@ async def on_connect(websocket):
)
return await websocket.close()

charge_point_id = websocket.request.path.strip("/")
charge_point_id = path.strip("/")
charge_point = ChargePoint(charge_point_id, websocket)

await charge_point.start()


async def main():
# deepcode ignore BindToAllNetworkInterfaces: <Example Purposes>
server = await websockets.serve(
on_connect, "0.0.0.0", 9000, subprotocols=["ocpp2.0.1"]
on_connect, "0.0.0.0", 9000, subprotocols=["ocpp2.0.1"]
)

logging.info("Server Started listening to new connections...")
Expand All @@ -79,4 +104,4 @@ async def main():

if __name__ == "__main__":
# asyncio.run() is used when running this example with Python >= 3.7v
asyncio.run(main())
asyncio.run(main())
176 changes: 156 additions & 20 deletions examples/v201/charge_point.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import logging
import datetime
from dataclasses import dataclass
from typing import List, Dict, Optional, Any

# Attempt to import the 'websockets' package, exiting if not found
try:
import websockets
except ModuleNotFoundError:
Expand All @@ -12,43 +16,175 @@

sys.exit(1)


# Import necessary classes from the OCPP 2.0.1 package
from ocpp.v201 import ChargePoint as cp
from ocpp.v201 import call

# Configure logging to display information
logging.basicConfig(level=logging.INFO)

# Data class to represent a sampled value in a meter reading
@dataclass
class SampledValue:
value: str
context: str = "Sample.Periodic"
format: str = "Raw"
measurand: str = "Energy.Active.Import.Register"
unit: str = "Wh"

# Data class to represent a meter value containing sampled values and a timestamp
@dataclass
class MeterValue:
timestamp: str
sampled_value: List[SampledValue]


# Define the ChargePoint class, inheriting from the OCPP ChargePoint class
class ChargePoint(cp):

# Method to periodically send heartbeat messages to the central system
async def send_heartbeat(self, interval):
request = call.Heartbeat()
while True:
await self.call(request)
try:
await self.call(request) # Send the heartbeat request
except Exception as e:
logging.error(f"Failed to send Heartbeat: {e}")
await asyncio.sleep(interval)

# Method to send a boot notification to the central system
async def send_boot_notification(self):
request = call.BootNotification(
charging_station={"model": "Wallbox XYZ", "vendor_name": "anewone"},
reason="PowerUp",
)
response = await self.call(request)
try:
request = call.BootNotification(
charging_station={"model": "Wallbox XYZ", "vendor_name": "anewone"},
reason="PowerUp",
)
response = await self.call(request)

if response.status == "Accepted":
print("Connected to central system.")
await self.send_heartbeat(response.interval)
if response.status == "Accepted":
await self.send_heartbeat(response.interval)

except Exception as e:
logging.error(f"Failed to send Boot Notification: {e}")

# Method to send a cancel reservation request to the central system
async def send_cancel_reservation(self, reservation_id):
try:
request = call.CancelReservation(
reservation_id=reservation_id
)
response = await self.call(request) # Send the cancel reservation request

if response is not None and response.status == "Accepted":
print("**********Reservation successfully canceled.********* \n")
else:
print("Failed to cancel reservation.\n")
except Exception as e:
logging.error(f"Failed to cancel reservation: {e}")

# Method to send a status notification to the central system
async def send_status_notification(self, evse_id: int, connector_id: int, is_connected: bool):
try:
connector_status = "Available" if is_connected else "Unavailable" # Determine connector status based on connection state
request = call.StatusNotification(
timestamp=self.get_current_time(),
connector_status=connector_status,
connector_id=connector_id,
evse_id=evse_id,
)
response = await self.call(request)

Check warning on line 94 in examples/v201/charge_point.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused local variable "response".

See more on https://sonarcloud.io/project/issues?id=mobilityhouse_ocpp&issues=AZqtJOnp2M1tEHgWbwob&open=AZqtJOnp2M1tEHgWbwob&pullRequest=746

except Exception as e:
logging.error(f"Failed to send Status Notification: {e}")

# Method to send a security event notification to the central system
async def send_security_event_notification(self):
try:
request = call.SecurityEventNotification(
type="Secure",
timestamp=self.get_current_time(),
)
response = await self.call(request)

Check warning on line 106 in examples/v201/charge_point.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused local variable "response".

See more on https://sonarcloud.io/project/issues?id=mobilityhouse_ocpp&issues=AZqtJOnp2M1tEHgWbwoc&open=AZqtJOnp2M1tEHgWbwoc&pullRequest=746

except Exception as e:
logging.error(f"Failed to send Security Event Notification: {e}")

async def main():
async with websockets.connect(
"ws://localhost:9000/CP_1", subprotocols=["ocpp2.0.1"]
) as ws:

charge_point = ChargePoint("CP_1", ws)
await asyncio.gather(
charge_point.start(), charge_point.send_boot_notification()
)
async def send_meter_values(self, evse_id: int, sampled_values: List[SampledValue]):
try:
# Create a meter value dictionary with the current timestamp and sampled values
meter_value = {
'timestamp': self.get_current_time(),
'sampledValue': [
{
'value': sv.value,
} for sv in sampled_values
]
}

request = call.MeterValues(
evse_id=evse_id,
meter_value=[meter_value]
)
response = await self.call(request)

Check warning on line 128 in examples/v201/charge_point.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused local variable "response".

See more on https://sonarcloud.io/project/issues?id=mobilityhouse_ocpp&issues=AZqtJOnp2M1tEHgWbwod&open=AZqtJOnp2M1tEHgWbwod&pullRequest=746

except Exception as e:
logging.error(f"Failed to send MeterValues: {e}")

# Helper method to get the current time in ISO 8601 format
def get_current_time(self):
return datetime.datetime.now().isoformat()

# Method to send an authorization request to the central system
async def send_authorization(self):
try:
request = call.Authorize(
id_token={"idToken": "13", "type": "ISO14443" }
)
response = await self.call(request)

Check warning on line 143 in examples/v201/charge_point.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the unused local variable "response".

See more on https://sonarcloud.io/project/issues?id=mobilityhouse_ocpp&issues=AZqtJOnp2M1tEHgWbwoe&open=AZqtJOnp2M1tEHgWbwoe&pullRequest=746

except Exception as e:
logging.error(f"Failed to send Authorization: {e}")





# Main function to start the ChargePoint and send various messages
async def main():
try:
async with websockets.connect(
"ws://localhost:9000/CP_1", subprotocols=["ocpp2.0.1"]

) as ws:
charge_point = ChargePoint("CP_1", ws)
reservation_id = 12345

# Create a list of sampled values for the meter values message
sampled_values = [
SampledValue(value=2000),
SampledValue(value=1000)
]

is_vehicle_connected = True

# Gather and run all asynchronous tasks concurrently
await asyncio.gather(
charge_point.start(),
charge_point.send_boot_notification(),
charge_point.send_cancel_reservation(reservation_id),
charge_point.send_status_notification(evse_id=2, connector_id=1, is_connected=is_vehicle_connected),
charge_point.send_security_event_notification(),
charge_point.send_meter_values(evse_id=1, sampled_values = sampled_values),
charge_point.send_authorization(),

)

except Exception as e:
logging.error(f"Connection failed: {e}")

# Entry point of the script
if __name__ == "__main__":
# asyncio.run() is used when running this example with Python >= 3.7v
asyncio.run(main())