Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def _compile_mib_to_json(mib, source_mib_directories, destination_directory, sou
from pysmi.codegen import JsonCodeGen
from pysmi.compiler import MibCompiler
from pysmi.parser import SmiV1CompatParser
from pysmi.reader import getReadersFromUrls
from pysmi.reader import get_readers_from_urls
from pysmi.searcher import AnyFileSearcher, StubSearcher
from pysmi.writer import FileWriter

Expand All @@ -282,22 +282,22 @@ def _compile_mib_to_json(mib, source_mib_directories, destination_directory, sou

code_generator = JsonCodeGen()

file_writer = FileWriter(destination_directory).setOptions(suffix='.json')
file_writer = FileWriter(destination_directory).set_options(suffix='.json')

mib_compiler = MibCompiler(SmiV1CompatParser(tempdir=''), code_generator, file_writer)

# use source_mib_directories as mibs source
sources = [source]
sources.extend(source_mib_directories)
mib_compiler.addSources(*getReadersFromUrls(*sources, **{'fuzzyMatching': True}))
mib_compiler.add_sources(*get_readers_from_urls(*sources, fuzzy_matching=True))

searchers = [AnyFileSearcher(destination_directory).setOptions(exts=['.json']), StubSearcher(*mib_stubs)]
mib_compiler.addSearchers(*searchers)
searchers = [AnyFileSearcher(destination_directory).set_options(exts=['.json']), StubSearcher(*mib_stubs)]
mib_compiler.add_searchers(*searchers)

# borrowers, aka compiled mibs source
borrowers = [
AnyFileBorrower(borrower_reader, genTexts=True).setOptions(exts=['.json'])
for borrower_reader in getReadersFromUrls(*[compiled_mibs_path], **{'lowcaseMatching': False})
AnyFileBorrower(borrower_reader, genTexts=True).set_options(exts=['.json'])
for borrower_reader in get_readers_from_urls(*[compiled_mibs_path], lowcase_matching=False)
]
mib_compiler.addBorrowers(*borrowers)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@
ALLOWED_EXTENSIONS_BY_FORMAT = {"json": [".json"], "yaml": [".yml", ".yaml"]}


def _name_for_output(name: str) -> str:
"""Normalize trap/variable name for output: hyphens to underscores (pysmi 0.3 compatibility)."""
return name.replace("-", "_")


class MappingType(Enum):
INTEGER = 0
BITS = 1
Expand Down Expand Up @@ -105,11 +110,93 @@ def generate_traps_db(mib_sources, output_dir, output_file, output_format, no_de
"""
from pysmi.codegen import JsonCodeGen
from pysmi.compiler import MibCompiler
from pysmi.mibinfo import MibInfo
from pysmi.parser import SmiV1CompatParser
from pysmi.reader import getReadersFromUrls
from pysmi.reader import get_readers_from_urls
from pysmi.reader.httpclient import HttpReader
from pysmi.searcher import AnyFileSearcher
from pysmi.writer import FileWriter

# pysmi's HttpReader decodes HTTP response as UTF-8 only; MIBs with non-UTF-8 bytes

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a new issue with the version bump or was it already present?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a new issue with the bump

# (e.g. https://github.com/DataDog/mibs.snmplabs.com/blob/master/asn1/A3COM-HUAWEI-DEVICE-MIB#L593)
# raise UnicodeDecodeError and are reported as "missing".
# Use a tolerant reader for HTTP(S) URLs that decodes with errors='replace'.

def _make_readers(sources, fuzzy_matching=True):
readers = []
for src in sources:
if src.startswith('http://') or src.startswith('https://'):
readers.append(_TolerantHttpReader(src, fuzzy_matching=fuzzy_matching))
else:
readers.extend(get_readers_from_urls(src, fuzzy_matching=fuzzy_matching))
return readers

# This code is copied from https://github.com/lextudio/pysmi/blob/main/pysmi/reader/httpclient.py.
# Except that it decodes the response content with errors='replace' for non-UTF-8 bytes.
class _TolerantHttpReader(HttpReader):
"""HttpReader that decodes response content with errors='replace' for non-UTF-8 bytes."""

def __init__(self, url, fuzzy_matching=True):
super().__init__(url)
self.set_options(fuzzy_matching=fuzzy_matching)

def get_data(self, mibname, **options):
import sys
import time

from pysmi import debug, error
Comment thread
NouemanKHAL marked this conversation as resolved.
Outdated
from pysmi.compat import decode

headers = {"Accept": "text/plain", "User-Agent": self._user_agent}

mibname = decode(mibname)

debug.logger & debug.FLAG_READER and debug.logger(f"looking for MIB {mibname}")
Comment thread
NouemanKHAL marked this conversation as resolved.
Outdated

for mibalias, mibfile in self.get_mib_variants(mibname, **options):
if self.MIB_MAGIC in self._url:
url = self._url.replace(self.MIB_MAGIC, mibfile)
else:
url = self._url + mibfile

debug.logger & debug.FLAG_READER and debug.logger(f"trying to fetch MIB from {url}")

try:
response = self.session.get(url, headers=headers)

except Exception:
debug.logger & debug.FLAG_READER and debug.logger(
f"failed to fetch MIB from {url}: {sys.exc_info()[1]}"
)
continue

debug.logger & debug.FLAG_READER and debug.logger(f"HTTP response {response.status_code}")

if response.status_code == 200:
try:
mtime = time.mktime(
time.strptime(
response.headers["Last-Modified"],
"%a, %d %b %Y %H:%M:%S %Z",
)
)

except Exception:
debug.logger & debug.FLAG_READER and debug.logger(
f"malformed HTTP headers: {sys.exc_info()[1]}"
)
mtime = time.time()

debug.logger & debug.FLAG_READER and debug.logger(
f"fetching source MIB {url}, mtime {response.headers['Last-Modified']}"
)

return MibInfo(path=url, file=mibfile, name=mibalias, mtime=mtime), response.content.decode(
"utf-8", errors='replace'
)

raise error.PySmiReaderFileNotFoundError(f"source MIB {mibname} not found", reader=self)

if debug:
set_debug()
from pysmi import debug
Comment thread
NouemanKHAL marked this conversation as resolved.
Outdated
Expand Down Expand Up @@ -144,12 +231,12 @@ def generate_traps_db(mib_sources, output_dir, output_file, output_format, no_de
mib_sources = sorted({pathlib.Path(x).parent.as_uri() for x in mib_files if os.path.sep in x}) + mib_sources

mib_files = [os.path.basename(x) for x in mib_files]
searchers = [AnyFileSearcher(compiled_mibs_sources).setOptions(exts=['.json'])]
searchers = [AnyFileSearcher(compiled_mibs_sources).set_options(exts=['.json'])]
code_generator = JsonCodeGen()
file_writer = FileWriter(compiled_mibs_sources).setOptions(suffix='.json')
file_writer = FileWriter(compiled_mibs_sources).set_options(suffix='.json')
mib_compiler = MibCompiler(SmiV1CompatParser(tempdir=''), code_generator, file_writer)
mib_compiler.addSources(*getReadersFromUrls(*mib_sources, **{'fuzzyMatching': True}))
mib_compiler.addSearchers(*searchers)
mib_compiler.add_sources(*_make_readers(mib_sources, fuzzy_matching=True))
mib_compiler.add_searchers(*searchers)

compiled_mibs, compiled_dependencies_mibs = compile_and_report_status(mib_files, mib_compiler)

Expand Down Expand Up @@ -307,12 +394,13 @@ def generate_trap_db(compiled_mibs, compiled_mibs_sources, no_descr):
trap_name = trap['name']
trap_oid = trap['oid']
trap_descr = trap.get('description', '')
trap_db["traps"][trap_oid] = {"name": trap_name, "mib": file_mib_name}
trap_db["traps"][trap_oid] = {"name": _name_for_output(trap_name), "mib": file_mib_name}
if not no_descr:
trap_db["traps"][trap_oid]["descr"] = trap_descr
for trap_var in trap.get('objects', []):
try:
var_name, mib_name = trap_var['object'], trap_var['module']
var_name = trap_var['object']
mib_name = trap_var['module']
var_metadata = get_var_metadata(
var_name,
mib_name,
Expand All @@ -331,8 +419,7 @@ def generate_trap_db(compiled_mibs, compiled_mibs_sources, no_descr):
"Ignoring this variable.".format(trap_name, var_name, mib_name)
)
continue
var_name = trap_var['object']
trap_db["vars"][var_metadata.oid] = {"name": var_name}
trap_db["vars"][var_metadata.oid] = {"name": _name_for_output(var_name)}
if not no_descr:
trap_db["vars"][var_metadata.oid]["descr"] = var_metadata.description
if var_metadata.enum:
Expand Down Expand Up @@ -430,8 +517,14 @@ def get_mapping(var_name, mib_name, mapping_type: MappingType, search_locations=
:param search_locations: Tuple of path to directories containing json-compiled MIB files
:return: The oid and the description of the variable.
"""
visited = set() # Guard against circular type references (would cause infinite loop)
mapping = {}
while mib_name and var_name and not mapping:
key = (var_name, mib_name)
if key in visited:
break
visited.add(key)

for location in search_locations:
file_name = os.path.join(location, mib_name + '.json')
if os.path.isfile(file_name):
Expand Down Expand Up @@ -481,7 +574,7 @@ def get_import_mib(var_name, mib_name, search_locations=None):
if os.path.isfile(file_name):
break
else:
return MissingMIBException
raise MissingMIBException()

with open(file_name, 'r') as f:
file_content = json.load(f)
Expand Down
2 changes: 1 addition & 1 deletion datadog_checks_dev/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ cli = [
"pathspec>=0.10.0",
"platformdirs>=2.0.0a3",
"pydantic>=2.0.2",
"pysmi==0.3.4",
"pysmi==1.6.2",
"securesystemslib[crypto]==0.28.0",
"semver>=2.13.0",
"tabulate>=0.8.9",
Expand Down
Loading