[feature] Suggests next available subnet #29#196
[feature] Suggests next available subnet #29#196pranshustuff wants to merge 5 commits intoopenwisp:masterfrom
Conversation
Functioin clean() now calculates next availible subnet and adds it as a suggestion in the error message. Added an api endpoint to get next availible subnet for a particular subnet. And modified tests. Fixes openwisp#29
WalkthroughThis pull request implements a "get next available subnet" feature across the API and model layers. It introduces a new API endpoint ( Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@openwisp_ipam/base/models.py`:
- Around line 201-266: The loop over
parent_network.subnets(new_prefix=target_prefix) in get_next_available_subnet
can produce huge iteration counts for large parent networks; add a safeguard by
capping evaluated candidates (e.g., introduce MAX_CANDIDATE_SUBNETS or an
optional parameter max_candidates on get_next_available_subnet) and iterate
using a bounded iterator (islice) so you stop after the limit, returning None
(or raising a specific exception) if the cap is reached; ensure the check still
calls _validate_candidate_subnet and preserves existing organization filtering,
and reference get_next_available_subnet, parent_network.subnets, target_prefix,
and _validate_candidate_subnet when making the change.
🧹 Nitpick comments (6)
openwisp_ipam/tests/test_api.py (2)
155-161: Consider using a more explicit assertion for subnet validation.Using
assertTrue(ip_network(response.data["subnet"]))only checks that the result is truthy. Consider asserting on a specific property to ensure the response is valid.♻️ Suggested improvement
def test_get_next_available_subnet_success(self): subnet = self._create_subnet(subnet="10.0.0.0/24") url = reverse("ipam:get_next_available_subnet", kwargs={"subnet_id": subnet.pk}) response = self.client.get(url) self.assertEqual(response.status_code, 200) self.assertIn("subnet", response.data) - self.assertTrue(ip_network(response.data["subnet"])) + # Verify it's a valid network with the expected prefix length + result_network = ip_network(response.data["subnet"]) + self.assertEqual(result_network.prefixlen, 24)
170-186: Subnets created viaSubnet.objects.createbypass validation and lack organization.The subnets created on lines 176-177 use
Subnet.objects.create()which bypassesfull_clean()and doesn't assign anorganization. This may not accurately simulate real-world conditions where subnets would have an organization. Additionally, thenamefield is set to "Full" which seems like a placeholder.♻️ Consider using `_create_subnet` for consistency
def test_get_next_available_subnet_no_available(self): parent_subnet = self._create_subnet( subnet="10.0.0.0/23" ) # 512 IPs, can fit two /24s # fill all available /24 subnets - Subnet.objects.create(subnet="10.0.0.0/24", name="Full") - Subnet.objects.create(subnet="10.0.1.0/24", name="Full") + self._create_subnet( + subnet="10.0.0.0/24", + organization=parent_subnet.organization + ) + self._create_subnet( + subnet="10.0.1.0/24", + organization=parent_subnet.organization + ) url = reverse( "ipam:get_next_available_subnet", kwargs={"subnet_id": parent_subnet.pk} )openwisp_ipam/api/views.py (1)
177-209: Consider validatingparent_subnetformat andprefixrange.The
prefixis validated as an integer, but there's no validation that:
parent_subnetis a valid CIDR notation string (invalid values will causeip_network()to raiseValueErrorin the model)prefixis within a valid range for the subnet's IP version (0-32 for IPv4, 0-128 for IPv6)While the model method may handle some of these gracefully, returning a user-friendly error at the API layer would improve the developer experience.
♻️ Suggested validation improvements
+from ipaddress import ip_network + class AvailableSubnetView( ProtectedAPIMixin, FilterByOrganizationManaged, RetrieveAPIView ): subnet_model = Subnet queryset = Subnet.objects.none() serializer_class = serializers.Serializer def get(self, request, *args, **kwargs): subnet = get_object_or_404(self.subnet_model, pk=self.kwargs["subnet_id"]) target_prefix = request.query_params.get("prefix") parent_subnet = request.query_params.get("parent_subnet") # Convert prefix to int if provided if target_prefix: try: target_prefix = int(target_prefix) except ValueError: return Response( {"error": _("Invalid prefix length. Must be an integer.")}, status=status.HTTP_400_BAD_REQUEST, ) + # Validate prefix range + max_prefix = 32 if subnet.subnet.version == 4 else 128 + if not (0 <= target_prefix <= max_prefix): + return Response( + {"error": _("Prefix length out of valid range.")}, + status=status.HTTP_400_BAD_REQUEST, + ) + + # Validate parent_subnet format if provided + if parent_subnet: + try: + ip_network(parent_subnet, strict=False) + except ValueError: + return Response( + {"error": _("Invalid parent_subnet format.")}, + status=status.HTTP_400_BAD_REQUEST, + ) next_subnet = subnet.get_next_available_subnet( target_prefix=target_prefix, parent_subnet=parent_subnet )openwisp_ipam/base/models.py (3)
59-75: Improve exception handling with proper chaining.The static analysis correctly identifies that the raised
ValidationErrorloses the original exception context. Usingraise ... from epreserves the traceback chain for debugging. Also, the bareraise eshould just beraise.♻️ Proposed fix for exception handling
except ValidationError as e: next_subnet = self.get_next_available_subnet() if next_subnet: self._suggested_subnet = next_subnet error_dict = e.message_dict.copy() if "subnet" in error_dict: original_message = error_dict["subnet"][0] error_dict["subnet"] = [ f"{original_message} Suggested alternative: {next_subnet}" ] else: error_dict["subnet"] = [ # fallback f"Subnet conflict found. Suggested alternative: {next_subnet}" ] - raise ValidationError(error_dict) + raise ValidationError(error_dict) from e else: - raise e + raise
221-245: Duplicated organization query logic.The
organization_queryconstruction is duplicated in lines 221-226 and 240-245. Consider extracting this into a helper method or computing it once and reusing.♻️ Extract organization query logic
+ def _get_organization_query(self): + """Returns Q object for filtering subnets by organization context.""" + if self.organization: + return Q(organization_id=self.organization_id) | Q(organization_id__isnull=True) + return Q() + def get_next_available_subnet(self, target_prefix=None, parent_subnet=None): # ... earlier code ... else: current_network = ip_network(self.subnet) parent_network = self._find_optimal_parent_subnet(current_network) - if self.organization: - organization_query = Q(organization_id=self.organization_id) | Q( - organization_id__isnull=True - ) - else: - organization_query = Q() + organization_query = self._get_organization_query() # ... middle code ... - if self.organization: - organization_query = Q(organization_id=self.organization_id) | Q( - organization_id__isnull=True - ) - else: - organization_query = Q() + organization_query = self._get_organization_query()
268-286: Consider movingreturn Trueto anelseblock.As noted by static analysis, returning inside a
tryblock can make the code flow less clear. Moving the successful return to anelseblock improves readability.♻️ Improved structure
def _validate_candidate_subnet(self, candidate_subnet): try: temp_instance = self._meta.model( name=self.name, subnet=str(candidate_subnet), description=self.description, master_subnet=self.master_subnet, organization=self.organization, ) temp_instance._validate_multitenant_uniqueness() temp_instance._validate_multitenant_master_subnet() temp_instance._validate_multitenant_unique_child_subnet() temp_instance._validate_overlapping_subnets() temp_instance._validate_master_subnet_consistency() - - return True except ValidationError: return False + else: + return True
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
openwisp_ipam/api/urls.pyopenwisp_ipam/api/views.pyopenwisp_ipam/base/models.pyopenwisp_ipam/tests/test_api.pyopenwisp_ipam/tests/test_models.pytests/openwisp2/sample_ipam/views.py
🧰 Additional context used
🧬 Code graph analysis (5)
tests/openwisp2/sample_ipam/views.py (2)
openwisp_ipam/api/views.py (1)
AvailableSubnetView(177-209)openwisp_ipam/base/models.py (1)
get_next_available_subnet(201-266)
openwisp_ipam/api/views.py (2)
openwisp_ipam/models.py (1)
Subnet(6-9)openwisp_ipam/base/models.py (1)
get_next_available_subnet(201-266)
openwisp_ipam/tests/test_api.py (1)
openwisp_ipam/api/views.py (2)
get(172-174)get(184-209)
openwisp_ipam/api/urls.py (1)
openwisp_ipam/base/models.py (1)
get_next_available_subnet(201-266)
openwisp_ipam/base/models.py (1)
openwisp_ipam/admin.py (1)
organization(235-236)
🪛 Ruff (0.14.14)
openwisp_ipam/api/views.py
[warning] 184-184: Unused method argument: args
(ARG002)
[warning] 184-184: Unused method argument: kwargs
(ARG002)
openwisp_ipam/base/models.py
[warning] 73-73: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
[warning] 75-75: Use raise without specifying exception name
Remove exception name
(TRY201)
[warning] 284-284: Consider moving this statement to an else block
(TRY300)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Python==3.10 | django~=5.2.0
- GitHub Check: Python==3.12 | django~=5.1.0
- GitHub Check: Python==3.12 | django~=5.2.0
- GitHub Check: Python==3.13 | django~=5.1.0
- GitHub Check: Python==3.11 | django~=5.1.0
- GitHub Check: Python==3.12 | django~=4.2.0
- GitHub Check: Python==3.11 | django~=4.2.0
- GitHub Check: Python==3.13 | django~=5.2.0
- GitHub Check: Python==3.11 | django~=5.2.0
- GitHub Check: Python==3.10 | django~=4.2.0
- GitHub Check: Python==3.10 | django~=5.1.0
🔇 Additional comments (8)
openwisp_ipam/api/urls.py (1)
18-22: LGTM!The new URL route follows the established pattern of the existing
get-next-available-ipendpoint. The naming convention is consistent with the codebase.tests/openwisp2/sample_ipam/views.py (2)
57-63: LGTM!The new
AvailableSubnetViewclass follows the established pattern used by other views in this sample module, properly extending the base view and including a descriptive docstring.
10-10: LGTM!The import and module-level binding are consistent with how other views are set up in this file.
Also applies to: 113-113
openwisp_ipam/tests/test_models.py (2)
141-144: LGTM!The test assertions are correctly updated to expect the new error message format that includes the suggested alternative subnet.
Also applies to: 155-158
393-398: LGTM!The assertion for the duplicate subnet validation correctly expects the new message format with the suggested alternative.
openwisp_ipam/api/views.py (1)
320-320: LGTM!The module-level binding correctly exposes the new view.
openwisp_ipam/base/models.py (2)
116-125: LGTM!The new duplicate check for organization-specific subnets correctly prevents creating duplicate subnets within the same organization, complementing the existing shared subnet validation.
77-92: LGTM!The
_find_optimal_parent_subnethelper uses reasonable heuristics to determine parent prefix lengths for both IPv4 and IPv6, providing a good balance between search space and availability.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| def get_next_available_subnet(self, target_prefix=None, parent_subnet=None): | ||
| if target_prefix is None: | ||
| try: | ||
| network = ( | ||
| ip_network(self.subnet) | ||
| if isinstance(self.subnet, str) | ||
| else self.subnet | ||
| ) | ||
| except ValueError: | ||
| return None | ||
| target_prefix = network.prefixlen | ||
|
|
||
| if parent_subnet: | ||
| parent_network = ip_network(parent_subnet) | ||
| elif self.master_subnet: | ||
| parent_network = ip_network(self.master_subnet.subnet) | ||
| else: | ||
| current_network = ip_network(self.subnet) | ||
| parent_network = self._find_optimal_parent_subnet(current_network) | ||
|
|
||
| if self.organization: | ||
| organization_query = Q(organization_id=self.organization_id) | Q( | ||
| organization_id__isnull=True | ||
| ) | ||
| else: | ||
| organization_query = Q() | ||
|
|
||
| existing_subnets = self._meta.model.objects.filter( | ||
| organization_query | ||
| ).exclude(pk=self.pk) | ||
| existing_networks = [ip_network(sub.subnet) for sub in existing_subnets] | ||
|
|
||
| for existing in existing_networks: | ||
| if parent_network.overlaps(existing) and parent_network.subnet_of( | ||
| existing | ||
| ): | ||
| parent_network = self._find_optimal_parent_subnet(existing) | ||
| break | ||
|
|
||
| if self.organization: | ||
| organization_query = Q(organization_id=self.organization_id) | Q( | ||
| organization_id__isnull=True | ||
| ) | ||
| else: | ||
| organization_query = Q() | ||
|
|
||
| existing_subnets = self._meta.model.objects.filter(organization_query).exclude( | ||
| pk=self.pk | ||
| ) | ||
| existing_networks = { | ||
| ip_network(sub.subnet) | ||
| for sub in existing_subnets | ||
| if ip_network(sub.subnet).subnet_of(parent_network) | ||
| or ip_network(sub.subnet).overlaps(parent_network) | ||
| } | ||
|
|
||
| for candidate_subnet in parent_network.subnets(new_prefix=target_prefix): | ||
| if any( | ||
| candidate_subnet.overlaps(existing) for existing in existing_networks | ||
| ): | ||
| continue | ||
|
|
||
| if self._validate_candidate_subnet(candidate_subnet): | ||
| return str(candidate_subnet) | ||
|
|
||
| return None |
There was a problem hiding this comment.
Potential performance concern with large parent networks.
When parent_network is large (e.g., a /8 network with target_prefix /24), parent_network.subnets(new_prefix=target_prefix) generates 2^16 = 65,536 candidate subnets to iterate through. For IPv6 with larger prefix differences, this could be exponentially worse.
Consider adding a safeguard to limit the number of candidates evaluated or document the expected behavior for large ranges.
💡 Suggested safeguard
+ MAX_CANDIDATES = 10000 # Prevent runaway iteration
+ candidate_count = 0
for candidate_subnet in parent_network.subnets(new_prefix=target_prefix):
+ candidate_count += 1
+ if candidate_count > MAX_CANDIDATES:
+ return None # Too many candidates to evaluate
+
if any(
candidate_subnet.overlaps(existing) for existing in existing_networks
):
continue📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def get_next_available_subnet(self, target_prefix=None, parent_subnet=None): | |
| if target_prefix is None: | |
| try: | |
| network = ( | |
| ip_network(self.subnet) | |
| if isinstance(self.subnet, str) | |
| else self.subnet | |
| ) | |
| except ValueError: | |
| return None | |
| target_prefix = network.prefixlen | |
| if parent_subnet: | |
| parent_network = ip_network(parent_subnet) | |
| elif self.master_subnet: | |
| parent_network = ip_network(self.master_subnet.subnet) | |
| else: | |
| current_network = ip_network(self.subnet) | |
| parent_network = self._find_optimal_parent_subnet(current_network) | |
| if self.organization: | |
| organization_query = Q(organization_id=self.organization_id) | Q( | |
| organization_id__isnull=True | |
| ) | |
| else: | |
| organization_query = Q() | |
| existing_subnets = self._meta.model.objects.filter( | |
| organization_query | |
| ).exclude(pk=self.pk) | |
| existing_networks = [ip_network(sub.subnet) for sub in existing_subnets] | |
| for existing in existing_networks: | |
| if parent_network.overlaps(existing) and parent_network.subnet_of( | |
| existing | |
| ): | |
| parent_network = self._find_optimal_parent_subnet(existing) | |
| break | |
| if self.organization: | |
| organization_query = Q(organization_id=self.organization_id) | Q( | |
| organization_id__isnull=True | |
| ) | |
| else: | |
| organization_query = Q() | |
| existing_subnets = self._meta.model.objects.filter(organization_query).exclude( | |
| pk=self.pk | |
| ) | |
| existing_networks = { | |
| ip_network(sub.subnet) | |
| for sub in existing_subnets | |
| if ip_network(sub.subnet).subnet_of(parent_network) | |
| or ip_network(sub.subnet).overlaps(parent_network) | |
| } | |
| for candidate_subnet in parent_network.subnets(new_prefix=target_prefix): | |
| if any( | |
| candidate_subnet.overlaps(existing) for existing in existing_networks | |
| ): | |
| continue | |
| if self._validate_candidate_subnet(candidate_subnet): | |
| return str(candidate_subnet) | |
| return None | |
| def get_next_available_subnet(self, target_prefix=None, parent_subnet=None): | |
| if target_prefix is None: | |
| try: | |
| network = ( | |
| ip_network(self.subnet) | |
| if isinstance(self.subnet, str) | |
| else self.subnet | |
| ) | |
| except ValueError: | |
| return None | |
| target_prefix = network.prefixlen | |
| if parent_subnet: | |
| parent_network = ip_network(parent_subnet) | |
| elif self.master_subnet: | |
| parent_network = ip_network(self.master_subnet.subnet) | |
| else: | |
| current_network = ip_network(self.subnet) | |
| parent_network = self._find_optimal_parent_subnet(current_network) | |
| if self.organization: | |
| organization_query = Q(organization_id=self.organization_id) | Q( | |
| organization_id__isnull=True | |
| ) | |
| else: | |
| organization_query = Q() | |
| existing_subnets = self._meta.model.objects.filter( | |
| organization_query | |
| ).exclude(pk=self.pk) | |
| existing_networks = [ip_network(sub.subnet) for sub in existing_subnets] | |
| for existing in existing_networks: | |
| if parent_network.overlaps(existing) and parent_network.subnet_of( | |
| existing | |
| ): | |
| parent_network = self._find_optimal_parent_subnet(existing) | |
| break | |
| if self.organization: | |
| organization_query = Q(organization_id=self.organization_id) | Q( | |
| organization_id__isnull=True | |
| ) | |
| else: | |
| organization_query = Q() | |
| existing_subnets = self._meta.model.objects.filter(organization_query).exclude( | |
| pk=self.pk | |
| ) | |
| existing_networks = { | |
| ip_network(sub.subnet) | |
| for sub in existing_subnets | |
| if ip_network(sub.subnet).subnet_of(parent_network) | |
| or ip_network(sub.subnet).overlaps(parent_network) | |
| } | |
| MAX_CANDIDATES = 10000 # Prevent runaway iteration | |
| candidate_count = 0 | |
| for candidate_subnet in parent_network.subnets(new_prefix=target_prefix): | |
| candidate_count += 1 | |
| if candidate_count > MAX_CANDIDATES: | |
| return None # Too many candidates to evaluate | |
| if any( | |
| candidate_subnet.overlaps(existing) for existing in existing_networks | |
| ): | |
| continue | |
| if self._validate_candidate_subnet(candidate_subnet): | |
| return str(candidate_subnet) | |
| return None |
🤖 Prompt for AI Agents
In `@openwisp_ipam/base/models.py` around lines 201 - 266, The loop over
parent_network.subnets(new_prefix=target_prefix) in get_next_available_subnet
can produce huge iteration counts for large parent networks; add a safeguard by
capping evaluated candidates (e.g., introduce MAX_CANDIDATE_SUBNETS or an
optional parameter max_candidates on get_next_available_subnet) and iterate
using a bounded iterator (islice) so you stop after the limit, returning None
(or raising a specific exception) if the cap is reached; ensure the check still
calls _validate_candidate_subnet and preserves existing organization filtering,
and reference get_next_available_subnet, parent_network.subnets, target_prefix,
and _validate_candidate_subnet when making the change.
|
As coderabbit pointed out the brute-force algorithm might not work for large parent subnets like /8 or /16. However, limiting the search to 10,000 subnets isn't a good solution. I will try to find a different algorithm to find the next available subnet. |
Checklist
Reference to Existing Issue
Closes #29.
Description of Changes
Function clean() now calculates next available subnet and adds it as a suggestion in the error message.
Added an API endpoint to get next available subnet for a particular subnet. And modified tests.
Screenshot
subnet-issue-2025-10-12_05.56.39.mp4