Skip to content

Commit ff1088c

Browse files
committed
MINIFICPP-2688 Move site-to-site tests to modular docker tests
1 parent 0957574 commit ff1088c

File tree

14 files changed

+496
-182
lines changed

14 files changed

+496
-182
lines changed

behave_framework/src/minifi_test_framework/containers/nifi_container.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,27 @@
1515

1616
import io
1717
import gzip
18-
from typing import List, Optional
18+
import logging
19+
import os
20+
from pathlib import Path
21+
from OpenSSL import crypto
22+
1923
from minifi_test_framework.containers.file import File
2024
from minifi_test_framework.containers.container import Container
2125
from minifi_test_framework.core.helpers import wait_for_condition
2226
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
2327
from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition
28+
from minifi_test_framework.containers.host_file import HostFile
29+
from minifi_test_framework.core.ssl_utils import make_server_cert
2430

2531

2632
class NifiContainer(Container):
27-
NIFI_VERSION = '2.7.2'
28-
29-
def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False):
33+
def __init__(self, test_context: MinifiTestContext, command: list[str] | None = None, use_ssl: bool = False):
3034
self.flow_definition = NifiFlowDefinition()
3135
name = f"nifi-{test_context.scenario_id}"
3236
if use_ssl:
33-
entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
37+
entry_command = (r"/scripts/convert_cert_to_jks.sh /tmp/resources /tmp/resources/nifi_client.key /tmp/resources/nifi_client.crt /tmp/resources/root_ca.crt &&"
38+
r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' "
3439
r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' "
3540
r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' "
3641
r"-e 's/^\(nifi.web.https.port\)=.*/\1=8443/' "
@@ -68,10 +73,18 @@ def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]]
6873
if not command:
6974
command = ["/bin/sh", "-c", entry_command]
7075

71-
super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command)
76+
super().__init__("apache/nifi:" + NifiFlowDefinition.NIFI_VERSION, name, test_context.network, entrypoint=command)
77+
resource_dir = Path(__file__).resolve().parent / "resources" / "nifi"
78+
self.host_files.append(HostFile("/scripts/convert_cert_to_jks.sh", os.path.join(resource_dir, "convert_cert_to_jks.sh")))
79+
80+
nifi_client_cert, nifi_client_key = make_server_cert(common_name=f"nifi-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
81+
self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
82+
self.files.append(File("/tmp/resources/nifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=nifi_client_cert)))
83+
self.files.append(File("/tmp/resources/nifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=nifi_client_key)))
7284

7385
def deploy(self):
7486
flow_config = self.flow_definition.to_json()
87+
logging.info(f"Deploying NiFi container '{self.container_name}' with flow configuration:\n{flow_config}")
7588
buffer = io.BytesIO()
7689

7790
with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file:
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#!/bin/bash
2+
set -euo pipefail
3+
4+
# Usage: ./create_jks.sh <base directory> <ssl_key_path> <ssl_cert_path> <ca_cert_path>
5+
6+
DIR=$1
7+
SSL_KEY_PATH=$2
8+
SSL_CERT_PATH=$3
9+
CA_CERT_PATH=$4
10+
11+
KEYSTORE="$DIR/keystore.jks"
12+
TRUSTSTORE="$DIR/truststore.jks"
13+
PKCS12_FILE="$DIR/keystore.p12"
14+
PASSWORD="passw0rd1!"
15+
16+
cat "${CA_CERT_PATH}" >> "${SSL_CERT_PATH}"
17+
18+
if [ ! -d "$DIR" ]; then
19+
mkdir -p "$DIR"
20+
fi
21+
22+
openssl pkcs12 -export \
23+
-inkey "$SSL_KEY_PATH" \
24+
-in "$SSL_CERT_PATH" \
25+
-name "nifi-key" \
26+
-out "$PKCS12_FILE" \
27+
-password pass:$PASSWORD
28+
29+
keytool -importkeystore \
30+
-destkeystore "$KEYSTORE" \
31+
-deststoretype jks \
32+
-destalias nifi-key \
33+
-srckeystore "$PKCS12_FILE" \
34+
-srcstoretype pkcs12 \
35+
-srcalias "nifi-key" \
36+
-storepass "$PASSWORD" \
37+
-srcstorepass "$PASSWORD" \
38+
-noprompt
39+
40+
keytool -importcert \
41+
-alias "nifi-cert" \
42+
-file "$CA_CERT_PATH" \
43+
-keystore "$TRUSTSTORE" \
44+
-storepass "$PASSWORD" \
45+
-noprompt

behave_framework/src/minifi_test_framework/minifi/connection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def __init__(self, source_name: str, source_relationship: str, target_name: str)
2424
self.source_name: str = source_name
2525
self.source_relationship: str = source_relationship
2626
self.target_name: str = target_name
27+
self.drop_empty_flowfiles: bool = False
2728

2829
def __repr__(self):
2930
return f"({self.source_name}:{self.source_relationship} --> {self.target_name})"

behave_framework/src/minifi_test_framework/minifi/flow_definition.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
from .funnel import Funnel
2323
from .parameter_context import ParameterContext
2424
from .processor import Processor
25+
from .remote_process_group import RemoteProcessGroup
26+
from .input_port import InputPort
27+
from .output_port import OutputPort
2528

2629

2730
class FlowDefinition(ABC):
@@ -32,10 +35,59 @@ def __init__(self, flow_name: str = "MiNiFi Flow"):
3235
self.funnels: list[Funnel] = []
3336
self.connections: list[Connection] = []
3437
self.parameter_contexts: list[ParameterContext] = []
38+
self.remote_process_groups: list[RemoteProcessGroup] = []
39+
self.input_ports: list[InputPort] = []
40+
self.output_ports: list[OutputPort] = []
3541

3642
def add_processor(self, processor: Processor):
3743
self.processors.append(processor)
3844

45+
def add_remote_process_group(self, address: str, name: str, protocol: str = "RAW"):
46+
rpg = RemoteProcessGroup(name, address, protocol)
47+
self.remote_process_groups.append(rpg)
48+
49+
def add_input_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
50+
rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
51+
if rpg:
52+
rpg.add_input_port(port_name, use_compression)
53+
else:
54+
raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
55+
56+
def add_output_port_to_rpg(self, rpg_name: str, port_name: str, use_compression: bool = False):
57+
rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
58+
if rpg:
59+
rpg.add_output_port(port_name, use_compression)
60+
else:
61+
raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
62+
63+
def get_input_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
64+
rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
65+
if rpg:
66+
port = next((port for port in rpg.input_ports if port.name == rpg_port_name), None)
67+
if port:
68+
return port.id
69+
else:
70+
raise ValueError(f"InputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")
71+
else:
72+
raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
73+
74+
def get_output_port_id_of_rpg(self, rpg_name: str, rpg_port_name: str) -> str:
75+
rpg = next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
76+
if rpg:
77+
port = next((port for port in rpg.output_ports if port.name == rpg_port_name), None)
78+
if port:
79+
return port.id
80+
else:
81+
raise ValueError(f"OutputPort with name '{rpg_port_name}' not found in RPG '{rpg_name}'.")
82+
else:
83+
raise ValueError(f"RemoteProcessGroup with name '{rpg_name}' not found.")
84+
85+
def add_input_port(self, input_port_id: str, input_port_name: str):
86+
self.input_ports.append(InputPort(input_port_name, input_port_id))
87+
88+
def add_output_port(self, output_port_id: str, output_port_name: str):
89+
self.output_ports.append(OutputPort(output_port_name, output_port_id))
90+
3991
def get_processor(self, processor_name: str) -> Processor | None:
4092
return next((proc for proc in self.processors if proc.name == processor_name), None)
4193

@@ -46,12 +98,20 @@ def get_parameter_context(self, parameter_context_name: str) -> ParameterContext
4698
return next((parameter_context for parameter_context in self.parameter_contexts if
4799
parameter_context.name == parameter_context_name), None)
48100

101+
def get_remote_process_group(self, rpg_name: str) -> RemoteProcessGroup | None:
102+
return next((rpg for rpg in self.remote_process_groups if rpg.name == rpg_name), None)
103+
49104
def add_funnel(self, funnel: Funnel):
50105
self.funnels.append(funnel)
51106

52107
def add_connection(self, connection: Connection):
53108
self.connections.append(connection)
54109

110+
def set_drop_empty_for_destination(self, destination_name: str):
111+
for connection in self.connections:
112+
if connection.target_name == destination_name:
113+
connection.drop_empty_flowfiles = True
114+
55115
def to_yaml(self) -> str:
56116
raise NotImplementedError("to_yaml method must be implemented in subclasses")
57117

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
class InputPort:
19+
def __init__(self, name: str, id: str):
20+
self.id: str = id
21+
self.name: str = name
22+
23+
def to_json_dict(self):
24+
data = {
25+
"identifier": self.id,
26+
"instanceIdentifier": self.id,
27+
"name": self.name,
28+
"comments": "",
29+
"position": {
30+
"x": 0,
31+
"y": 0
32+
},
33+
"type": "INPUT_PORT",
34+
"concurrentlySchedulableTaskCount": 1,
35+
"scheduledState": "RUNNING",
36+
"allowRemoteAccess": True,
37+
"portFunction": "STANDARD",
38+
"componentType": "INPUT_PORT",
39+
"groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
40+
}
41+
return data

behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,10 @@ def to_yaml(self) -> str:
3131
# This is crucial for finding the source/destination IDs for connections
3232
processors_by_name = {p.name: p for p in self.processors}
3333
funnels_by_name = {f.name: f for f in self.funnels}
34+
remote_input_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.input_ports}
35+
remote_output_ports_by_name = {port.name: port for rpg in self.remote_process_groups for port in rpg.output_ports}
3436

35-
connectables_by_name = {**processors_by_name, **funnels_by_name}
37+
connectables_by_name = {**processors_by_name, **funnels_by_name, **remote_input_ports_by_name, **remote_output_ports_by_name}
3638

3739
if len(self.parameter_contexts) > 0:
3840
parameter_context_name = self.parameter_contexts[0].name
@@ -44,7 +46,8 @@ def to_yaml(self) -> str:
4446
'Processors': [p.to_yaml_dict() for p in self.processors],
4547
'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [],
4648
'Controller Services': [c.to_yaml_dict() for c in self.controller_services],
47-
'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name}
49+
'Remote Processing Groups': [rpg.to_yaml_dict() for rpg in self.remote_process_groups],
50+
'Parameter Context Name': parameter_context_name}
4851

4952
# Build the connections list by looking up processor IDs
5053
for conn in self.connections:
@@ -58,6 +61,6 @@ def to_yaml(self) -> str:
5861
config['Connections'].append(
5962
{'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id,
6063
'source id': source_proc.id, 'source relationship name': conn.source_relationship,
61-
'destination id': dest_proc.id})
64+
'destination id': dest_proc.id, "drop empty": conn.drop_empty_flowfiles})
6265

6366
return yaml.dump(config, sort_keys=False, indent=2, width=120)

behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222

2323
class NifiFlowDefinition(FlowDefinition):
24-
NIFI_VERSION: str = '2.2.0'
24+
NIFI_VERSION: str = '2.7.2'
2525

2626
def __init__(self, flow_name: str = "NiFi Flow"):
2727
super().__init__(flow_name)
@@ -113,9 +113,22 @@ def to_json(self) -> str:
113113

114114
connections_node = config["rootGroup"]["connections"]
115115

116+
processors_by_name = {p.name: p for p in self.processors}
117+
input_ports_by_name = {port.name: port for port in self.input_ports}
118+
output_ports_by_name = {port.name: port for port in self.output_ports}
119+
116120
for conn in self.connections:
121+
source_type = "PROCESSOR"
117122
source_proc = processors_by_name.get(conn.source_name)
123+
if not source_proc:
124+
source_proc = input_ports_by_name.get(conn.source_name)
125+
source_type = "INPUT_PORT"
126+
conn.source_relationship = ""
127+
dest_type = "PROCESSOR"
118128
dest_proc = processors_by_name.get(conn.target_name)
129+
if not dest_proc:
130+
dest_proc = output_ports_by_name.get(conn.target_name)
131+
dest_type = "OUTPUT_PORT"
119132
if not source_proc or not dest_proc:
120133
raise ValueError(
121134
f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'")
@@ -126,15 +139,15 @@ def to_json(self) -> str:
126139
"name": f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}",
127140
"source": {
128141
"id": source_proc.id,
129-
"type": "PROCESSOR",
142+
"type": source_type,
130143
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
131144
"name": conn.source_name,
132145
"comments": "",
133146
"instanceIdentifier": source_proc.id
134147
},
135148
"destination": {
136149
"id": dest_proc.id,
137-
"type": "PROCESSOR",
150+
"type": dest_type,
138151
"groupId": "9802c873-3322-3b60-a71d-732d02bd60f8",
139152
"name": dest_proc.name,
140153
"comments": "",
@@ -155,4 +168,7 @@ def to_json(self) -> str:
155168
"groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
156169
})
157170

171+
config["rootGroup"]["inputPorts"] = [input_port.to_json_dict() for input_port in self.input_ports]
172+
config["rootGroup"]["outputPorts"] = [output_port.to_json_dict() for output_port in self.output_ports]
173+
158174
return json.dumps(config)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
class OutputPort:
19+
def __init__(self, name: str, id: str):
20+
self.id: str = id
21+
self.name: str = name
22+
23+
def to_json_dict(self):
24+
data = {
25+
"identifier": self.id,
26+
"instanceIdentifier": self.id,
27+
"name": self.name,
28+
"comments": "",
29+
"position": {
30+
"x": 0,
31+
"y": 0
32+
},
33+
"type": "OUTPUT_PORT",
34+
"concurrentlySchedulableTaskCount": 1,
35+
"scheduledState": "RUNNING",
36+
"allowRemoteAccess": True,
37+
"portFunction": "STANDARD",
38+
"componentType": "OUTPUT_PORT",
39+
"groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8"
40+
}
41+
return data

0 commit comments

Comments
 (0)