From cda02480f381906a785af4dfda872f2e7ea0975d Mon Sep 17 00:00:00 2001 From: quekie Date: Thu, 20 Nov 2025 23:18:16 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=BF=E9=87=8C=E4=BA=91ESA=E5=A4=9A?= =?UTF-8?q?=E5=9B=9E=E6=BA=90IP=E5=85=BC=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ddns/__main__.py | 44 +++++++++++++++++++++++++++ ddns/provider/aliesa.py | 67 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/ddns/__main__.py b/ddns/__main__.py index 913ba45f2..f7069f5fa 100755 --- a/ddns/__main__.py +++ b/ddns/__main__.py @@ -43,6 +43,46 @@ def get_ip(ip_type, rules): logger.error("Failed to get %s address: %s", ip_type, e) return None +def update_ip_all(dns, cache, config): + # type: (SimpleProvider, Cache | None, list[str]|bool, list[str], str, Config) -> bool | None + """ + 更新IP并变更A/AAAA记录 + """ + domains = config.ipv4 + config.ipv6 + if not domains: + return None + + record_type = "A/AAAA" + address_4 = get_ip('4', config.index4) + address_6 = get_ip('6', config.index6) + address = ",".join(filter(None,[address_4,address_6])) + if not address: + logger.error("Fail to get %s ipv4 or ipv6 address!") + return False + + update_success = False + + for domain in domains: + domain = domain.lower() + cache_key = "{}:{}".format(domain, record_type) + if cache and cache.get(cache_key) == address: + logger.info("%s[%s] address not changed, using cache: %s", domain, record_type, address) + update_success = True + else: + try: + result = dns.set_record( + domain, address, record_type=record_type, ttl=config.ttl, line=config.line, **config.extra + ) + if result: + logger.warning("set %s: %s successfully.", domain, address) + update_success = True + if isinstance(cache, dict): + cache[cache_key] = address + else: + logger.error("Failed to update %s record for %s", record_type, domain) + except Exception as e: + logger.exception("Failed to update %s record for %s: %s", record_type, domain, e) + return update_success def update_ip(dns, cache, index_rule, domains, record_type, config): # type: (SimpleProvider, Cache | None, list[str]|bool, list[str], str, Config) -> bool | None @@ -97,6 +137,10 @@ def run(config): config.id, config.token, endpoint=config.endpoint, logger=logger, proxy=config.proxy, ssl=config.ssl ) cache = Cache.new(config.cache, config.md5(), logger) + + if(config.dns == "aliesa"): + return update_ip_all(dns, cache, config) + return ( update_ip(dns, cache, config.index4, config.ipv4, "A", config) is not False and update_ip(dns, cache, config.index6, config.ipv6, "AAAA", config) is not False diff --git a/ddns/provider/aliesa.py b/ddns/provider/aliesa.py index 647228f2c..9bd450d4b 100644 --- a/ddns/provider/aliesa.py +++ b/ddns/provider/aliesa.py @@ -6,6 +6,7 @@ """ from time import strftime +import ipaddress from ._base import TYPE_JSON, join_domain from .alidns import AliBaseProvider @@ -49,7 +50,8 @@ def _query_record(self, zone_id, subdomain, main_domain, record_type, line, extr action="ListRecords", SiteId=int(zone_id), RecordName=full_domain, - Type=self._get_type(record_type), + # AliESA 只有 A/AAAA 记录类型表示 有不确定数量的IPv4 和 不确定数量的IPv6 + Type='A/AAAA', RecordMatchType="exact", # 精确匹配 PageSize=100, ) @@ -91,6 +93,60 @@ def _create_record(self, zone_id, subdomain, main_domain, value, record_type, tt self.logger.error("Failed to create record: %s", data) return False + + def extract_unique_ip_addresses(self,input_string): + """ + 从逗号分隔的字符串中提取第一个有效的 IPv4 地址和第一个有效的 IPv6 地址。 + + :param input_string: 包含潜在 IP 地址的字符串 + :return: 一个元组 (first_ipv4, first_ipv6),如果未找到则对应位置为空字符串 + """ + # 默认为空字符串方便拼接 + first_ipv4 = "" + first_ipv6 = "" + + # 1. 按逗号分割字符串,并去除每个部分前后的空白字符 + candidates = [part.strip() for part in input_string.split(',')] + + # 2. 遍历候选列表 + for candidate in candidates: + # 跳过空字符串 + if not candidate: + continue + + # 3. 尝试将候选者解析为 IP 地址 + try: + ip_obj = ipaddress.ip_address(candidate) + + # 4. 判断 IP 地址类型,并记录第一个出现的地址 + if isinstance(ip_obj, ipaddress.IPv4Address) and first_ipv4 == "": + first_ipv4 = str(ip_obj) + elif isinstance(ip_obj, ipaddress.IPv6Address) and first_ipv6 == "": + first_ipv6 = str(ip_obj) + + # 5. 如果两个地址都找到了,可以提前退出循环以提高效率 + if first_ipv4 and first_ipv6 : + break + + except ValueError: + # 如果解析失败,说明不是有效的 IP 地址,继续检查下一个 + continue + + # 6. 返回结果元组 + return first_ipv4, first_ipv6 + + def record_final(self, old_record , new_record): + # type: (str, str) -> str + """ + 获取最终结果 只保留一个ipv4和一个ipv6 + """ + ipv4old, ipv6old = self.extract_unique_ip_addresses(old_record) + ipv4new, ipv6new = self.extract_unique_ip_addresses(new_record) + ipv4final,ipv6final = self.extract_unique_ip_addresses(ipv4new+","+ipv6new+","+ipv4old+","+ipv6old) + + final = ipv4final+","+ipv6final + # // 去除可能的开头的逗号 + return final.lstrip(',') def _update_record(self, zone_id, old_record, value, record_type, ttl, line, extra): # type: (str, dict, str, str, int | str | None, str | None, dict) -> bool @@ -98,9 +154,14 @@ def _update_record(self, zone_id, old_record, value, record_type, ttl, line, ext 更新DNS记录 https://help.aliyun.com/zh/edge-security-acceleration/esa/api-esa-2024-09-10-updaterecord """ + # EAS只有A/AAAA记录并且可能包含多个ip,如果当前域名只配置ipV4或ipV6类型, 则只更新对应部分,然后另一种记录不变 + # ESA可以配置多个Ip作为回源ip, 所以要提取第一个ipv4地址和ipv6地址(DDNS场景下不可能有多个ip) + # 获取最终结果 只保留一个ipv4和一个ipv6 + final_value = self.record_final(old_record.get("Data", {}).get("Value"), value) + # 检查是否需要更新 if ( - old_record.get("Data", {}).get("Value") == value + old_record.get("Data", {}).get("Value") == final_value and old_record.get("RecordType") == self._get_type(record_type) and (not ttl or old_record.get("Ttl") == ttl) ): @@ -113,7 +174,7 @@ def _update_record(self, zone_id, old_record, value, record_type, ttl, line, ext method="POST", action="UpdateRecord", RecordId=old_record.get("RecordId"), - Data={"Value": value}, + Data={"Value": final_value}, Ttl=ttl, **extra ) # fmt: skip