Skip to content

Commit 7d27047

Browse files
committed
[tests] Add test for the logger
1 parent bbe4b3f commit 7d27047

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed

tests/test_compute.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# coding:utf-8
2+
3+
"""
4+
In this test we test the code that is usually launched directly from the meshroom_compute script
5+
6+
TODO : Directly test by launching the executable
7+
- We can get the path with `desc.node._MESHROOM_COMPUTE_EXE`
8+
- However this is not implemented yet because it requires to create a plugin that meshroom could discover
9+
and I didn't want to create a plugin just for what I'm doing here right now (which is testing the logger)
10+
"""
11+
12+
import os
13+
import re
14+
from pathlib import Path
15+
import logging
16+
17+
from meshroom.core.graph import Graph
18+
from meshroom.core import desc
19+
from .utils import registerNodeDesc, unregisterNodeDesc
20+
21+
LOGGER = logging.getLogger("TestCompute")
22+
23+
24+
def executeChunks(node, tmpPath, size):
25+
nodeCache = os.path.join(tmpPath, node.internalFolder)
26+
os.makedirs(nodeCache)
27+
logFiles = {}
28+
for chunkIndex in range(size):
29+
iteration = chunkIndex if size > 1 else -1
30+
logFileName = "log"
31+
if size > 1:
32+
logFileName = f"{chunkIndex}.log"
33+
logFile = Path(nodeCache) / logFileName
34+
logFiles[chunkIndex] = logFile
35+
logFile.touch()
36+
node.prepareLogger(iteration)
37+
node.preprocess()
38+
if size > 1:
39+
chunk = node.chunks[chunkIndex]
40+
chunk.process(True, True)
41+
else:
42+
node.process(True, True)
43+
node.postprocess()
44+
node.restoreLogger()
45+
return logFiles
46+
47+
48+
class TestNodeA(desc.BaseNode):
49+
__test__ = False
50+
51+
size = desc.StaticNodeSize(2)
52+
parallelization = desc.Parallelization(blockSize=1)
53+
54+
inputs = [
55+
desc.IntParam(
56+
name="input",
57+
label="Input",
58+
description="input",
59+
value=0,
60+
),
61+
]
62+
outputs = [
63+
desc.IntParam(
64+
name="output",
65+
label="Output",
66+
description="Output",
67+
value=None,
68+
),
69+
]
70+
71+
def processChunk(self, chunk):
72+
chunk.logManager.start("info")
73+
chunk.logger.info("> Message A")
74+
LOGGER.info(f"> Message B")
75+
chunk.logManager.end()
76+
77+
78+
class TestNodeB(TestNodeA):
79+
size = desc.StaticNodeSize(1)
80+
parallelization = None
81+
82+
83+
class TestNodeLogger:
84+
85+
reA = re.compile(r"\[\d{2}:\d{2}:\d{2}\.\d{3}\]\[info\] > Message A")
86+
reB = re.compile(r"\[\d{2}:\d{2}:\d{2}\.\d{3}\]\[info\] > Message B")
87+
88+
@classmethod
89+
def setup_class(cls):
90+
registerNodeDesc(TestNodeA)
91+
registerNodeDesc(TestNodeB)
92+
93+
@classmethod
94+
def teardown_class(cls):
95+
unregisterNodeDesc(TestNodeA)
96+
unregisterNodeDesc(TestNodeB)
97+
98+
def test_nodeWithChunks(self, tmp_path):
99+
graph = Graph("")
100+
graph._cacheDir = tmp_path
101+
node = graph.addNewNode(TestNodeA.__name__)
102+
# Compute
103+
logFiles = executeChunks(node, tmp_path, 2)
104+
for chunkId, logFile in logFiles.items():
105+
with open(logFile, "r") as f:
106+
content = f.read()
107+
assert len(self.reA.findall(content)) == 1
108+
assert len(self.reB.findall(content)) == 1
109+
110+
def test_nodeWithoutChunks(self, tmp_path):
111+
graph = Graph("")
112+
graph._cacheDir = tmp_path
113+
node = graph.addNewNode(TestNodeB.__name__)
114+
# Compute
115+
logFiles = executeChunks(node, tmp_path, 1)
116+
for _, logFile in logFiles.items():
117+
with open(logFile, "r") as f:
118+
content = f.read()
119+
assert len(self.reA.findall(content)) == 1
120+
assert len(self.reB.findall(content)) == 1

0 commit comments

Comments
 (0)