Skip to content

Commit 8eef62e

Browse files
authored
Merge pull request #661 from YoungHypo/issue-anchor_peer
Implemented anchor peer configuration
2 parents a177906 + 3d88da4 commit 8eef62e

File tree

4 files changed

+266
-33
lines changed

4 files changed

+266
-33
lines changed

src/api-engine/api/lib/configtxlator/configtxlator.py

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from subprocess import call, run
55
from api.config import FABRIC_TOOL, FABRIC_VERSION
66

7+
import logging
8+
LOG = logging.getLogger(__name__)
79

810
class ConfigTxLator:
911
"""
@@ -24,17 +26,21 @@ def proto_encode(self, input, type, output):
2426
output: A file to write the output to.
2527
"""
2628
try:
27-
call([self.configtxlator,
28-
"proto_encode",
29-
"--input={}".format(input),
30-
"--type={}".format(type),
31-
"--output={}".format(output),
32-
])
29+
command = [self.configtxlator,
30+
"proto_encode",
31+
"--input={}".format(input),
32+
"--type={}".format(type),
33+
"--output={}".format(output),
34+
]
35+
36+
LOG.info(" ".join(command))
37+
38+
call(command)
3339
except Exception as e:
3440
err_msg = "configtxlator proto decode fail! "
3541
raise Exception(err_msg + str(e))
3642

37-
def proto_decode(self, input, type):
43+
def proto_decode(self, input, type, output):
3844
"""
3945
Converts a proto message to JSON.
4046
@@ -45,16 +51,17 @@ def proto_decode(self, input, type):
4551
config
4652
"""
4753
try:
48-
res = run([self.configtxlator,
54+
command = [self.configtxlator,
4955
"proto_decode",
5056
"--type={}".format(type),
5157
"--input={}".format(input),
52-
],
53-
capture_output=True)
54-
if res.returncode == 0 :
55-
return res.stdout
56-
else:
57-
return res.stderr
58+
"--output={}".format(output),
59+
]
60+
61+
LOG.info(" ".join(command))
62+
63+
call(command)
64+
5865
except Exception as e:
5966
err_msg = "configtxlator proto decode fail! "
6067
raise Exception(err_msg + str(e))
@@ -71,13 +78,17 @@ def compute_update(self, original, updated, channel_id, output):
7178
output: A file to write the JSON document to.
7279
"""
7380
try:
74-
call([self.configtxlator,
75-
"compute_update",
76-
"--original={}".format(original),
77-
"--updated={}".format(updated),
78-
"--channel_id={}".format(channel_id),
79-
"--output={}".format(output),
80-
])
81+
command = [self.configtxlator,
82+
"compute_update",
83+
"--original={}".format(original),
84+
"--updated={}".format(updated),
85+
"--channel_id={}".format(channel_id),
86+
"--output={}".format(output),
87+
]
88+
89+
LOG.info(" ".join(command))
90+
91+
call(command)
8192
except Exception as e:
8293
err_msg = "configtxlator compute update fail! "
8394
raise Exception(err_msg + str(e))

src/api-engine/api/lib/peer/channel.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,25 @@ def update(self, channel, channel_tx, orderer_url):
8787
orderer_url: Ordering service endpoint.
8888
"""
8989
try:
90-
res = os.system("{} channel update -c {} -f {} -o {}"
91-
.format(self.peer, channel, channel_tx, orderer_url))
90+
ORDERER_CA = os.getenv("ORDERER_CA")
91+
92+
command = [
93+
self.peer,
94+
"channel", "update",
95+
"-f", channel_tx,
96+
"-c", channel,
97+
"-o", orderer_url,
98+
"--ordererTLSHostnameOverride", orderer_url.split(":")[0],
99+
"--tls",
100+
"--cafile", ORDERER_CA
101+
]
102+
LOG.info(" ".join(command))
103+
104+
res = subprocess.run(command, check=True)
105+
92106
except Exception as e:
93107
err_msg = "update channel failed for {}!".format(e)
94108
raise Exception(err_msg)
95-
res = res >> 8
96109
return res
97110

98111
def fetch(self, block_path, channel, orderer_general_url, max_retries=5, retry_interval=1):

src/api-engine/api/routes/channel/views.py

Lines changed: 109 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from api.config import CELLO_HOME
2020
from api.common.serializers import PageQuerySerializer
21-
from api.utils.common import with_common_response, parse_block_file, to_dict
21+
from api.utils.common import with_common_response, parse_block_file, to_dict, json_filter, json_add_anchor_peer, json_create_envelope
2222
from api.lib.configtxgen import ConfigTX, ConfigTxGen
2323
from api.lib.peer.channel import Channel as PeerChannel
2424
from api.lib.configtxlator.configtxlator import ConfigTxLator
@@ -147,7 +147,8 @@ def create(self, request):
147147
peer_channel_join(name, peers, org)
148148

149149
# set anchor peer
150-
set_anchor_peer(name, org, peers, ordering_node)
150+
anchor_peer = Node.objects.get(id=peers[0])
151+
set_anchor_peer(name, org, anchor_peer, ordering_node)
151152

152153
# save channel to db
153154
channel = Channel(
@@ -425,31 +426,130 @@ def peer_channel_join(name, peers, org):
425426
CELLO_HOME, org.network.name, name)
426427
)
427428

428-
def set_anchor_peer(name, org, peers, ordering_node):
429+
def set_anchor_peer(name, org, anchor_peer, ordering_node):
429430
"""
430431
Set anchor peer for the channel.
431432
:param org: Organization object.
432-
:param peers: list of Node objects
433+
:param anchor_peer: Anchor peer node
434+
:param ordering_node: Orderer node
433435
:return: none
434436
"""
435-
peer_channel_fetch(name, org, peers, ordering_node)
437+
org_msp = '{}'.format(org.name.split(".", 1)[0].capitalize())
438+
channel_artifacts_path = "{}/{}".format(CELLO_HOME, org.network.name)
439+
440+
# Fetch the channel block from the orderer
441+
peer_channel_fetch(name, org, anchor_peer, ordering_node)
442+
443+
# Decode block to JSON
444+
ConfigTxLator().proto_decode(
445+
input="{}/config_block.pb".format(channel_artifacts_path),
446+
type="common.Block",
447+
output="{}/config_block.json".format(channel_artifacts_path),
448+
)
449+
450+
# Get the config data from the block
451+
json_filter(
452+
input="{}/config_block.json".format(channel_artifacts_path),
453+
output="{}/config.json".format(channel_artifacts_path),
454+
expression=".data.data[0].payload.data.config"
455+
)
436456

457+
# add anchor peer config
458+
anchor_peer_config = {
459+
"AnchorPeers": {
460+
"mod_policy": "Admins",
461+
"value": {
462+
"anchor_peers": [
463+
{
464+
"host": anchor_peer.name + "." + org.name,
465+
"port": 7051
466+
}
467+
]
468+
},
469+
"version": 0
470+
}
471+
}
472+
473+
json_add_anchor_peer(
474+
input="{}/config.json".format(channel_artifacts_path),
475+
output="{}/modified_config.json".format(channel_artifacts_path),
476+
anchor_peer_config=anchor_peer_config,
477+
org_msp=org_msp
478+
)
479+
480+
ConfigTxLator().proto_encode(
481+
input="{}/config.json".format(channel_artifacts_path),
482+
type="common.Config",
483+
output="{}/config.pb".format(channel_artifacts_path),
484+
)
485+
486+
ConfigTxLator().proto_encode(
487+
input="{}/modified_config.json".format(channel_artifacts_path),
488+
type="common.Config",
489+
output="{}/modified_config.pb".format(channel_artifacts_path),
490+
)
491+
492+
ConfigTxLator().compute_update(
493+
original="{}/config.pb".format(channel_artifacts_path),
494+
updated="{}/modified_config.pb".format(channel_artifacts_path),
495+
channel_id=name,
496+
output="{}/config_update.pb".format(channel_artifacts_path),
497+
)
498+
499+
ConfigTxLator().proto_decode(
500+
input="{}/config_update.pb".format(channel_artifacts_path),
501+
type="common.ConfigUpdate",
502+
output="{}/config_update.json".format(channel_artifacts_path),
503+
)
437504

438-
def peer_channel_fetch(name, org, peers, ordering_node):
505+
# Create config update envelope
506+
json_create_envelope(
507+
input="{}/config_update.json".format(channel_artifacts_path),
508+
output="{}/config_update_in_envelope.json".format(channel_artifacts_path),
509+
channel=name
510+
)
511+
512+
ConfigTxLator().proto_encode(
513+
input="{}/config_update_in_envelope.json".format(channel_artifacts_path),
514+
type="common.Envelope",
515+
output="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
516+
)
517+
518+
# Update the channel of anchor peer
519+
peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path)
520+
521+
522+
def peer_channel_fetch(name, org, anchor_peer, ordering_node):
439523
"""
440524
Fetch the channel block from the orderer.
441-
:param peers: list of Node objects
525+
:param anchor_peer: Anchor peer node
442526
:param org: Organization object.
443527
:param channel_name: Name of the channel.
444528
:return: none
445529
"""
446-
peer_node = Node.objects.get(id=peers[0])
447-
envs = init_env_vars(peer_node, org)
530+
envs = init_env_vars(anchor_peer, org)
448531
peer_channel_cli = PeerChannel(**envs)
449532
peer_channel_cli.fetch(block_path="{}/{}/config_block.pb".format(CELLO_HOME, org.network.name),
450533
channel=name, orderer_general_url="{}.{}:{}".format(
451534
ordering_node.name, org.name.split(".", 1)[1], str(7050)))
452535

536+
def peer_channel_update(name, org, anchor_peer, ordering_node, channel_artifacts_path):
537+
"""
538+
Update the channel.
539+
:param anchor_peer: Anchor peer node
540+
:param org: Organization object.
541+
:param channel_name: Name of the channel.
542+
:return: none
543+
"""
544+
envs = init_env_vars(anchor_peer, org)
545+
peer_channel_cli = PeerChannel(**envs)
546+
peer_channel_cli.update(
547+
channel=name,
548+
channel_tx="{}/config_update_in_envelope.pb".format(channel_artifacts_path),
549+
orderer_url="{}.{}:{}".format(
550+
ordering_node.name, org.name.split(".", 1)[1], str(7050)),
551+
)
552+
453553

454554
def init_env_vars(node, org):
455555
"""

src/api-engine/api/utils/common.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
import uuid
1414
from zipfile import ZipFile
1515
from json import loads
16+
import json
17+
import logging
1618

19+
LOG = logging.getLogger(__name__)
1720

1821
def make_uuid():
1922
return str(uuid.uuid4())
@@ -153,3 +156,109 @@ def parse_block_file(data):
153156

154157
def to_dict(data):
155158
return loads(data)
159+
160+
161+
def json_filter(input, output, expression):
162+
"""
163+
Process JSON data using path expression similar to jq
164+
165+
Args:
166+
input (str): JSON data or file path to JSON
167+
output (str): Path expression like ".data.data[0].payload.data.config"
168+
169+
Returns:
170+
dict: Processed JSON data
171+
"""
172+
# if json_data is a file path, read the file
173+
if isinstance(input, str):
174+
with open(input, 'r', encoding='utf-8') as f:
175+
data = json.load(f)
176+
else:
177+
data = input
178+
179+
# parse the path expression
180+
path_parts = expression.strip('.').split('.')
181+
result = data
182+
183+
for part in path_parts:
184+
# handle array index, like data[0]
185+
if '[' in part and ']' in part:
186+
array_name = part.split('[')[0]
187+
index = int(part.split('[')[1].split(']')[0])
188+
result = result[array_name][index]
189+
else:
190+
result = result[part]
191+
192+
with open(output, 'w', encoding='utf-8') as f:
193+
json.dump(result, f, sort_keys=False, indent=4)
194+
195+
LOG.info("jq {} {} -> {}".format(expression, input, output))
196+
197+
def json_add_anchor_peer(input, output, anchor_peer_config, org_msp):
198+
"""
199+
Add anchor peer to the organization
200+
201+
Args:
202+
input (str): JSON data or file path to JSON
203+
output (str): Path expression like ".data.data[0].payload.data.config"
204+
expression (str): Anchor peer data
205+
"""
206+
# if json_data is a file path, read the file
207+
if isinstance(input, str):
208+
with open(input, 'r', encoding='utf-8') as f:
209+
data = json.load(f)
210+
else:
211+
data = input
212+
213+
if "groups" not in data["channel_group"]:
214+
data["channel_group"]["groups"] = {}
215+
if "Application" not in data["channel_group"]["groups"]:
216+
data["channel_group"]["groups"]["Application"] = {"groups": {}}
217+
if org_msp not in data["channel_group"]["groups"]["Application"]["groups"]:
218+
data["channel_group"]["groups"]["Application"]["groups"][org_msp] = {"values": {}}
219+
220+
data["channel_group"]["groups"]["Application"]["groups"][org_msp]["values"].update(anchor_peer_config)
221+
222+
with open(output, 'w', encoding='utf-8') as f:
223+
json.dump(data, f, sort_keys=False, indent=4)
224+
225+
LOG.info("jq '.channel_group.groups.Application.groups.Org1MSP.values += ... ' {} -> {}".format(input, output))
226+
227+
def json_create_envelope(input, output, channel):
228+
"""
229+
Create a config update envelope structure
230+
231+
Args:
232+
input (str): Path to the config update JSON file
233+
output (str): Path to save the envelope JSON
234+
channel (str): Name of the channel
235+
"""
236+
try:
237+
# Read the config update file
238+
with open(input, 'r', encoding='utf-8') as f:
239+
config_update = json.load(f)
240+
241+
# Create the envelope structure
242+
envelope = {
243+
"payload": {
244+
"header": {
245+
"channel_header": {
246+
"channel_id": channel,
247+
"type": 2
248+
}
249+
},
250+
"data": {
251+
"config_update": config_update
252+
}
253+
}
254+
}
255+
256+
# Write the envelope to output file
257+
with open(output, 'w', encoding='utf-8') as f:
258+
json.dump(envelope, f, sort_keys=False, indent=4)
259+
260+
LOG.info("echo 'payload ... ' | jq . > {}".format(output))
261+
262+
except Exception as e:
263+
LOG.error("Failed to create config update envelope: {}".format(str(e)))
264+
raise

0 commit comments

Comments
 (0)