|
| 1 | +#!/usr/bin/env python |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +# Copyright (c) 2025 LG Electronics Inc. |
| 4 | +# SPDX-License-Identifier: Apache-2.0 |
| 5 | + |
| 6 | +import os |
| 7 | +import logging |
| 8 | +import subprocess |
| 9 | +import json |
| 10 | +import shutil |
| 11 | +import fosslight_util.constant as constant |
| 12 | +import fosslight_dependency.constant as const |
| 13 | +from fosslight_dependency._package_manager import PackageManager, get_url_to_purl |
| 14 | +from fosslight_dependency.dependency_item import DependencyItem, change_dependson_to_purl |
| 15 | +from fosslight_dependency.package_manager.Npm import check_multi_license |
| 16 | +from fosslight_util.oss_item import OssItem |
| 17 | + |
| 18 | +logger = logging.getLogger(constant.LOGGER_NAME) |
| 19 | +node_modules = 'node_modules' |
| 20 | + |
| 21 | + |
| 22 | +class Pnpm(PackageManager): |
| 23 | + package_manager_name = const.PNPM |
| 24 | + |
| 25 | + dn_url = 'https://www.npmjs.com/package/' |
| 26 | + input_file_name = 'tmp_pnpm_license_output.json' |
| 27 | + flag_tmp_node_modules = False |
| 28 | + project_name_list = [] |
| 29 | + pkg_list = {} |
| 30 | + |
| 31 | + def __init__(self, input_dir, output_dir): |
| 32 | + super().__init__(self.package_manager_name, self.dn_url, input_dir, output_dir) |
| 33 | + |
| 34 | + def __del__(self): |
| 35 | + if os.path.isfile(self.input_file_name): |
| 36 | + os.remove(self.input_file_name) |
| 37 | + if self.flag_tmp_node_modules: |
| 38 | + shutil.rmtree(node_modules, ignore_errors=True) |
| 39 | + |
| 40 | + def run_plugin(self): |
| 41 | + ret = True |
| 42 | + |
| 43 | + pnpm_install_cmd = 'pnpm install --prod --ignore-scripts --ignore-pnpmfile' |
| 44 | + if os.path.isdir(node_modules) != 1: |
| 45 | + logger.info(f"node_modules directory is not existed. So it executes '{pnpm_install_cmd}'.") |
| 46 | + self.flag_tmp_node_modules = True |
| 47 | + cmd_ret = subprocess.call(pnpm_install_cmd, shell=True) |
| 48 | + if cmd_ret != 0: |
| 49 | + logger.error(f"{pnpm_install_cmd} returns an error") |
| 50 | + ret = False |
| 51 | + if ret: |
| 52 | + project_cmd = 'pnpm ls -r --depth -1 -P --json' |
| 53 | + ret_txt = subprocess.check_output(project_cmd, text=True, shell=True) |
| 54 | + if ret_txt is not None: |
| 55 | + deps_l = json.loads(ret_txt) |
| 56 | + for items in deps_l: |
| 57 | + self.project_name_list.append(items["name"]) |
| 58 | + return ret |
| 59 | + |
| 60 | + def parse_direct_dependencies(self): |
| 61 | + if not self.direct_dep: |
| 62 | + return |
| 63 | + try: |
| 64 | + direct_cmd = 'pnpm ls -r --depth 0 -P --json' |
| 65 | + ret_txt = subprocess.check_output(direct_cmd, text=True, shell=True) |
| 66 | + if ret_txt is not None: |
| 67 | + deps_l = json.loads(ret_txt) |
| 68 | + for item in deps_l: |
| 69 | + if 'dependencies' in item and isinstance(item['dependencies'], dict): |
| 70 | + self.direct_dep_list.extend(item['dependencies'].keys()) |
| 71 | + else: |
| 72 | + self.direct_dep = False |
| 73 | + logger.warning('Cannot print direct/transitive dependency') |
| 74 | + except Exception as e: |
| 75 | + logger.warning(f'Fail to print direct/transitive dependency: {e}') |
| 76 | + self.direct_dep = False |
| 77 | + if self.direct_dep: |
| 78 | + self.direct_dep_list = list(filter(lambda dep: dep not in self.project_name_list, self.direct_dep_list)) |
| 79 | + |
| 80 | + def extract_dependencies(self, dependencies, purl_dict): |
| 81 | + dep_item_list = [] |
| 82 | + for dep_name, dep_info in dependencies.items(): |
| 83 | + if dep_name not in self.project_name_list: |
| 84 | + if dep_name in self.pkg_list.keys(): |
| 85 | + if dep_info.get('version') in self.pkg_list[dep_name]: |
| 86 | + continue |
| 87 | + self.pkg_list.setdefault(dep_name, []).append(dep_info.get('version')) |
| 88 | + dep_item = DependencyItem() |
| 89 | + oss_item = OssItem() |
| 90 | + oss_item.name = f'npm:{dep_name}' |
| 91 | + oss_item.version = dep_info.get('version') |
| 92 | + |
| 93 | + license_name = dep_info.get('license') |
| 94 | + if license_name: |
| 95 | + multi_license, license_comment, multi_flag = check_multi_license(license_name, '') |
| 96 | + if multi_flag: |
| 97 | + oss_item.comment = license_comment |
| 98 | + license_name = multi_license |
| 99 | + else: |
| 100 | + license_name = license_name.replace(",", "") |
| 101 | + oss_item.license = license_name |
| 102 | + |
| 103 | + oss_item.homepage = f'{self.dn_url}{dep_name}' |
| 104 | + oss_item.download_location = dep_info.get('repository') |
| 105 | + if oss_item.download_location: |
| 106 | + if oss_item.download_location.endswith('.git'): |
| 107 | + oss_item.download_location = oss_item.download_location[:-4] |
| 108 | + if oss_item.download_location.startswith('git://'): |
| 109 | + oss_item.download_location = 'https://' + oss_item.download_location[6:] |
| 110 | + elif oss_item.download_location.startswith('git+https://'): |
| 111 | + oss_item.download_location = 'https://' + oss_item.download_location[12:] |
| 112 | + elif oss_item.download_location.startswith('git+ssh://git@'): |
| 113 | + oss_item.download_location = 'https://' + oss_item.download_location[14:] |
| 114 | + else: |
| 115 | + oss_item.download_location = f'{self.dn_url}{dep_name}/v/{oss_item.version}' |
| 116 | + |
| 117 | + dn_loc = f'{oss_item.homepage}/v/{oss_item.version}' |
| 118 | + dep_item.purl = get_url_to_purl(dn_loc, 'npm') |
| 119 | + purl_dict[f'{dep_name}({oss_item.version})'] = dep_item.purl |
| 120 | + |
| 121 | + if dep_name in self.direct_dep_list: |
| 122 | + oss_item.comment = 'direct' |
| 123 | + else: |
| 124 | + oss_item.comment = 'transitive' |
| 125 | + |
| 126 | + if 'dependencies' in dep_info: |
| 127 | + for dn, di in dep_info.get('dependencies').items(): |
| 128 | + if dn not in self.project_name_list: |
| 129 | + dep_item.depends_on_raw.append(f"{dn}({di['version']})") |
| 130 | + |
| 131 | + dep_item.oss_items.append(oss_item) |
| 132 | + dep_item_list.append(dep_item) |
| 133 | + |
| 134 | + if 'dependencies' in dep_info: |
| 135 | + dep_item_list_inner, purl_dict_inner = self.extract_dependencies(dep_info['dependencies'], purl_dict) |
| 136 | + dep_item_list.extend(dep_item_list_inner) |
| 137 | + purl_dict.update(purl_dict_inner) |
| 138 | + |
| 139 | + return dep_item_list, purl_dict |
| 140 | + |
| 141 | + def parse_oss_information_for_pnpm(self): |
| 142 | + project_cmd = 'pnpm ls --json -r --depth Infinity -P --long' |
| 143 | + ret_txt = subprocess.check_output(project_cmd, text=True, shell=True) |
| 144 | + if ret_txt is not None: |
| 145 | + deps_l = json.loads(ret_txt) |
| 146 | + purl_dict = {} |
| 147 | + for items in deps_l: |
| 148 | + if 'dependencies' in items: |
| 149 | + dep_item_list_inner, purl_dict_inner = self.extract_dependencies(items['dependencies'], purl_dict) |
| 150 | + self.dep_items.extend(dep_item_list_inner) |
| 151 | + purl_dict.update(purl_dict_inner) |
| 152 | + if self.direct_dep: |
| 153 | + self.dep_items = change_dependson_to_purl(purl_dict, self.dep_items) |
| 154 | + else: |
| 155 | + logger.warning(f'No output for {project_cmd}') |
0 commit comments