-
Notifications
You must be signed in to change notification settings - Fork 678
Expand file tree
/
Copy pathalidns.py
More file actions
183 lines (160 loc) · 5.65 KB
/
alidns.py
File metadata and controls
183 lines (160 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# coding=utf-8
"""
AliDNS API
阿里DNS解析操作库
https://help.aliyun.com/document_detail/29739.html
@author: New Future
"""
from hashlib import sha1
from hmac import new as hmac
from uuid import uuid4
from base64 import b64encode
from json import loads as jsondecode
from logging import debug, info, warning
from datetime import datetime
try: # python 3
from http.client import HTTPSConnection
from urllib.parse import urlencode, quote_plus, quote
except ImportError: # python 2
from httplib import HTTPSConnection
from urllib import urlencode, quote_plus, quote
__author__ = 'New Future'
# __all__ = ["request", "ID", "TOKEN", "PROXY"]
class Config:
ID = "id"
TOKEN = "TOKEN"
PROXY = None # 代理设置
TTL = None
class API:
# API 配置
SITE = "alidns.aliyuncs.com" # API endpoint
METHOD = "POST" # 请求方法
def signature(params):
"""
计算签名,返回签名后的查询参数
"""
params.update({
'Format': 'json',
'Version': '2015-01-09',
'AccessKeyId': Config.ID,
'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
'SignatureMethod': 'HMAC-SHA1',
'SignatureNonce': uuid4(),
'SignatureVersion': "1.0",
})
query = urlencode(sorted(params.items()))
query = query.replace('+', '%20')
debug(query)
sign = API.METHOD + "&" + quote_plus("/") + "&" + quote(query, safe='')
debug("signString: %s", sign)
sign = hmac((Config.TOKEN + "&").encode('utf-8'),
sign.encode('utf-8'), sha1).digest()
sign = b64encode(sign).strip()
params["Signature"] = sign
return params
def request(param=None, **params):
"""
发送请求数据
"""
if param:
params.update(param)
params = dict((k, params[k]) for k in params if params[k] is not None)
params = signature(params)
info("%s: %s", API.SITE, params)
if Config.PROXY:
conn = HTTPSConnection(Config.PROXY)
conn.set_tunnel(API.SITE, 443)
else:
conn = HTTPSConnection(API.SITE)
conn.request(API.METHOD, '/', urlencode(params),
{"Content-type": "application/x-www-form-urlencoded"})
response = conn.getresponse()
data = response.read().decode('utf8')
conn.close()
if response.status < 200 or response.status >= 300:
warning('%s : error[%d]: %s', params['Action'], response.status, data)
raise Exception(data)
else:
data = jsondecode(data)
debug('%s : result:%s', params['Action'], data)
return data
def get_domain_info(domain):
"""
切割域名获取主域名和对应ID
https://help.aliyun.com/document_detail/29755.html
http://alidns.aliyuncs.com/?Action=GetMainDomainName&InputString=www.example.com
"""
res = request(Action="GetMainDomainName", InputString=domain)
sub, main = res.get('RR'), res.get('DomainName')
return sub, main
def get_records(domain, **conditions):
"""
获取记录ID
返回满足条件的所有记录[]
https://help.aliyun.com/document_detail/29776.html
TODO 大于500翻页
"""
if not hasattr(get_records, "records"):
get_records.records = {} # "静态变量"存储已查询过的id
get_records.keys = ("RecordId", "RR", "Type", "Line",
"Locked", "Status", "Priority", "Value")
if domain not in get_records.records:
get_records.records[domain] = {}
data = request(Action="DescribeDomainRecords",
DomainName=domain, PageSize=500)
if data:
for record in data.get('DomainRecords').get('Record'):
get_records.records[domain][record["RecordId"]] = {
k: v for (k, v) in record.items() if k in get_records.keys}
records = {}
for (rid, record) in get_records.records[domain].items():
for (k, value) in conditions.items():
if record.get(k) != value:
break
else: # for else push
records[rid] = record
return records
def update_record(domain, value, record_type='A'):
"""
更新记录
update
https://help.aliyun.com/document_detail/29774.html
add
https://help.aliyun.com/document_detail/29772.html?
"""
debug(">>>>>%s(%s)", domain, record_type)
sub, main = get_domain_info(domain)
if not sub:
raise Exception("invalid domain: [ %s ] " % domain)
records = get_records(main, RR=sub, Type=record_type)
result = {}
if records:
for (rid, record) in records.items():
if record["Value"] != value:
debug(sub, record)
res = request(Action="UpdateDomainRecord", RecordId=rid,
Value=value, RR=sub, Type=record_type, TTL=Config.TTL)
if res:
# update records
get_records.records[main][rid]["Value"] = value
result[rid] = res
else:
result[rid] = "update fail!\n" + str(res)
else:
result[rid] = domain
else: # https://help.aliyun.com/document_detail/29772.html
res = request(Action="AddDomainRecord", DomainName=main,
Value=value, RR=sub, Type=record_type, TTL=Config.TTL)
if res:
# update records INFO
rid = res.get('RecordId')
get_records.records[main][rid] = {
'Value': value,
"RecordId": rid,
"RR": sub,
"Type": record_type
}
result = res
else:
result = domain + " created fail!"
return result