Skip to content
Open
2 changes: 1 addition & 1 deletion .tekton/on-pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ spec:
make lint-pr TARGET_BRANCH=$TARGET_BRANCH_NAME

print_banner "RUNNING UNIT TESTS"
make test-unit PYTEST_OPTS="--log-cli-level=DEBUG"
make test-unit PYTEST_OPTS="--log-cli-level=DEBUG -s"

print_banner "LINT AND TEST COMPLETE"
- name: integration-test
Expand Down
25 changes: 20 additions & 5 deletions src/exploit_iq_commons/utils/c_segmenter_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,24 @@
from langchain_community.document_loaders.parsers.language.c import CSegmenter
from typing import List


def _comment_replacer(match):
    """Substitution callback that drops comments but keeps literals intact.

    Intended for use with a pattern whose capture group 1 matches string or
    character literals and whose remaining alternatives match comments.

    Args:
        match: The regex match object for one literal or one comment.

    Returns:
        The matched text unchanged when group 1 participated (a literal);
        otherwise a single space, so the tokens on either side of the
        removed comment do not fuse together.
    """
    matched_a_comment = match.group(1) is None
    return ' ' if matched_a_comment else match.group(0)


# Single-pass pattern that recognizes, in order of alternation, quoted
# literals, block comments and line comments. Literals are listed first so
# that a "//" or "/*" occurring inside quotes is consumed as part of the
# literal instead of being mistaken for the start of a comment.
_COMMENT_OR_STRING = re.compile(
r'("(?:[^"\\]|\\.)*"|\'(?:[^\'\\]|\\.)*\')' # group 1: double- or single-quoted literal, backslash escapes honored
r'|'
r'(/\*[\s\S]*?\*/)' # group 2: block comment, non-greedy so it stops at the first */
r'|'
r'(//[^\n]*)', # group 3: line comment, up to but not including the newline
re.DOTALL  # lets the "\\." escape alternative also match a backslash followed by a newline
)


# Class extending CSegmenter
class CSegmenterExtended(CSegmenter):

Expand All @@ -32,11 +50,8 @@ def __init__(self, code: str):

@staticmethod
def remove_comments(code: str) -> str:
    """Strip C/C++ comments from ``code`` while leaving quoted literals untouched.

    The previous implementation ran two independent regex passes
    (``/* ... */`` then ``// ...``) over the raw text. Those passes cannot
    tell a real comment from comment-like text inside a string literal, so
    input such as ``"http://host"`` was truncated, and an early ``return``
    made the literal-aware substitution unreachable.

    Args:
        code: Source text to clean.

    Returns:
        ``code`` with every block and line comment replaced by a single
        space (so neighbouring tokens stay separated) and every string or
        character literal preserved verbatim.
    """
    # One scan: literals are matched first and echoed back by the callback,
    # comments are matched next and collapsed to a space.
    return _COMMENT_OR_STRING.sub(_comment_replacer, code)

@staticmethod
def remove_macro_blocks(text: str) -> str:
Expand Down
124 changes: 96 additions & 28 deletions src/exploit_iq_commons/utils/chain_of_calls_retriever.py

Large diffs are not rendered by default.

21 changes: 3 additions & 18 deletions src/exploit_iq_commons/utils/dep_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def detect_ecosystem(git_repo_path: Path) -> Ecosystem | None:
]
if any(p.is_file() for p in c_candidates):
for root, dirs, files in os.walk(git_repo_path):
dirs[:] = [d for d in dirs if not d.startswith('.')]
dirs[:] = [d for d in dirs if d not in _WALK_EXCLUDE_DIRS and not d.startswith('.')]
if any(Path(f).suffix in C_CPLUSPLUS_EXTENSIONS for f in files):
return MANIFESTS_TO_ECOSYSTEMS[C_CPLUSPLUS_MANIFEST_1]
return None
Expand Down Expand Up @@ -1640,21 +1640,6 @@ def _try_file(path: Path, extractor) -> str | None:

return None

def _ensure_venv(self, manifest_path: Path) -> str:
    """Return the path of the venv interpreter, creating the venv if it is missing.

    Args:
        manifest_path: Directory that holds (or will hold) the
            ``TRANSITIVE_ENV_NAME`` virtual environment.

    Returns:
        The string path ``<manifest_path>/<TRANSITIVE_ENV_NAME>/bin/python``.
        The path is returned whether or not the interpreter existed
        beforehand; when it did not, ``uv venv`` is invoked first.
    """
    import sys

    venv_python = f'{manifest_path}/{TRANSITIVE_ENV_NAME}/bin/python'
    if not Path(venv_python).exists():
        logger.warning("Venv python not found at %s — creating venv", venv_python)
        # Prefer the version the project declares; fall back to the
        # interpreter currently running this code.
        version = self.determine_python_version(str(manifest_path))
        if not version:
            version = "{}.{}".format(sys.version_info.major, sys.version_info.minor)
            logger.info("Python version undetermined; using current interpreter %s", version)
        logger.info("Creating transitive_env with Python %s using uv", version)
        uv_cmd = ["uv", "venv", TRANSITIVE_ENV_NAME, "--python", version]
        run_command(uv_cmd, cwd=manifest_path)
    return venv_python

def install_dependencies(self, manifest_path: Path):
"""Install Python dependencies for the given repository into a virtual environment.

Expand Down Expand Up @@ -1710,7 +1695,7 @@ def _install_from_best_manifest(self, manifest_path: Path, venv_python: str,
# Project manifests: uv pip install . resolves and installs all declared deps
for manifest_name in (PYPROJECT_TOML, SETUP_PY, SETUP_CFG):
if (manifest_path / manifest_name).exists():
run_command([ "uv", "pip", "install", "." , "--python" "venv_python"] , cwd=manifest_path)
run_command(["uv", "pip", "install", ".", "--python", venv_python], cwd=manifest_path)
return manifest_name

# Pipfile: requires pipenv; skip silently if not available
Expand Down Expand Up @@ -1778,7 +1763,7 @@ def _find_module_dirs(self, package_name: str, site_packages: Path) -> list[str]

if package_name.startswith('types-'):
base = package_name[6:]
candidates = [f'{base}-stubs', f'{base.lower()}-stubs', base, base.lower()]
candidates = list(dict.fromkeys([f'{base}-stubs', f'{base.lower()}-stubs', base, base.lower()]))
elif package_name.startswith('mypy-boto3-'):
base = package_name[11:]
candidates = [f'mypy_boto3_{base}']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -750,9 +750,6 @@ def is_call_allowed(self, pkg_docs: list[Document], caller_function: Document, c

callee_name = self.get_function_name(callee_function)

if callee_name == "do_shell":
print(f"callee_name: {callee_name}")

if callee_name in caller_functions:
doc = caller_functions[callee_name]
# if static and in same file → call is allowed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def get_package_name_file(function: Document):
class GoLanguageFunctionsParser(LanguageFunctionsParser):

def is_same_package(self, package_name_from_input, package_name_from_tree):
    """Tell whether the input package name appears inside the tree's package name.

    The comparison is a case-insensitive substring test. If either name is
    empty or ``None`` the result is ``False``.

    Args:
        package_name_from_input: Package name supplied by the caller.
        package_name_from_tree: Package name taken from the dependency tree.

    Returns:
        ``True`` when the lower-cased input name is contained in the
        lower-cased tree name, ``False`` otherwise.
    """
    if package_name_from_input and package_name_from_tree:
        needle = package_name_from_input.lower()
        haystack = package_name_from_tree.lower()
        return needle in haystack
    return False

def is_tree_key_match(self, package_from_doc: str, tree_key: str) -> bool:
Expand Down Expand Up @@ -289,7 +291,7 @@ def parse_all_type_struct_class_to_fields(self, types: list[Document], type_inhe
[name, _, type_name] = declaration_parts
elif len(declaration_parts) == 2:
[name, type_name] = declaration_parts
if len(declaration_parts) == (2 or 3):
if len(declaration_parts) in (2, 3):
self.parse_one_type(Document(page_content=f"type {name} {type_name}",
metadata={"source": the_type.metadata['source']}),
types_mapping)
Expand Down Expand Up @@ -431,7 +433,6 @@ def search_for_called_function(self, caller_function: Document, callee_function_
index_of_function_closing = caller_function.page_content.rfind("}")
caller_function_body = str(
caller_function.page_content[index_of_function_opening + 1: index_of_function_closing])
re.search("", caller_function_body)
escaped_name = re.escape(callee_function_name)
regex = fr'(?<![a-zA-Z0-9_])(?:[a-zA-Z0-9_\[\]\(\).]*\.)?{escaped_name}\('
matching = re.search(regex, caller_function_body, re.MULTILINE)
Expand Down Expand Up @@ -627,17 +628,21 @@ def is_package_imported(self, code_content: str, identifier: str, callee_package
start_of_package_name = code_content[identifier_import_position + len("import ")
+ len(identifier):]
index_of_end_of_line = start_of_package_name.find(os.linesep)
package_name_to_check = start_of_package_name[:index_of_end_of_line]
package_name_to_check = start_of_package_name[:index_of_end_of_line].strip().strip("'\"")
if package_name_to_check.strip().lower() == callee_package.strip().lower():
return True
# import without alias, in this case maybe package name contain alias
else:
# re.search(regex, caller_function_body, re.MULTILINE)
matching = re.search(rf"import [\'\"].*{identifier}[\'\"]", code_content)
esc_id = re.escape(identifier)
matching = re.search(rf"import [\'\"].*{esc_id}.*[\'\"]", code_content)
if matching and matching.group(0):
import_line = code_content[matching.start():]
import_package_line = import_line[:import_line.find(os.linesep)].strip()
package_name = import_package_line.split(r"\s")[1]
parts = import_package_line.split()
if len(parts) < 2:
return False
package_name = parts[1].strip("'\"")
if package_name.strip().lower() == callee_package.strip().lower():
return True
return False
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,68 @@ def is_constructor_header(hstart: int, name_start: int) -> bool:
# No declaration found, and snippet didn't begin with a lambda
return ""

@staticmethod
def _count_call_args(s: str, open_idx: int, close_idx: int) -> int:
    """Count the top-level arguments in ``s[open_idx + 1:close_idx]``.

    A comma separates two arguments only when it sits outside every nested
    ``(...)``, ``[...]`` and ``{...}`` group and outside string and character
    literals. Braces are tracked so that array initializers and anonymous
    class bodies (``new int[]{1, 2}``) are not split.

    Angle brackets are ambiguous (generic type arguments vs. comparison
    operators), so two tallies are kept:

    * If every ``<`` ends up closed by a ``>``, the brackets are taken to be
      generics and commas inside them are ignored.
    * Otherwise they are taken to be operators and every comma counts.

    To keep that heuristic from being fooled, ``<=``, ``>=``, ``<<`` and the
    lambda arrow ``->`` are never treated as generic brackets.

    Args:
        s: Text containing the parenthesised argument list.
        open_idx: Index in ``s`` of the opening parenthesis.
        close_idx: Index in ``s`` of the matching closing parenthesis.

    Returns:
        0 when the parentheses enclose only whitespace, otherwise the
        number of separating commas plus one.
    """
    inner = s[open_idx + 1:close_idx]
    if not inner.strip():
        return 0

    openers = {'(': 0, '[': 0, '{': 0}          # current nesting depth per bracket kind
    closer_to_opener = {')': '(', ']': '[', '}': '{'}
    angle_depth = 0
    commas_outside_angles = 0                   # valid if the angles turn out to be generics
    commas_ignoring_angles = 0                  # valid if the angles turn out to be operators
    quote = None                                # active literal delimiter, or None
    escaped = False
    last = len(inner) - 1

    for i, ch in enumerate(inner):
        if quote is not None:
            # Inside a literal: only an unescaped matching quote ends it.
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == quote:
                quote = None
            continue
        if ch == '"' or ch == "'":
            quote = ch
            continue

        if ch in openers:
            openers[ch] += 1
        elif ch in closer_to_opener:
            openers[closer_to_opener[ch]] -= 1
        elif ch == '<':
            nxt = inner[i + 1] if i < last else ''
            prev = inner[i - 1] if i > 0 else ''
            # "<=" and "<<" are operators, never the start of type arguments.
            if nxt not in ('=', '<') and prev != '<':
                angle_depth += 1
        elif ch == '>':
            nxt = inner[i + 1] if i < last else ''
            prev = inner[i - 1] if i > 0 else ''
            # "->" is the lambda arrow and ">=" a comparison; neither closes generics.
            if angle_depth > 0 and prev != '-' and nxt != '=':
                angle_depth -= 1
        elif ch == ',' and not any(openers.values()):
            commas_ignoring_angles += 1
            if angle_depth == 0:
                commas_outside_angles += 1

    if angle_depth == 0:
        return commas_outside_angles + 1
    # Unbalanced angles: they were comparison/shift operators, not generics.
    return commas_ignoring_angles + 1

def search_for_called_function(
self,
caller_function: Document,
Expand Down Expand Up @@ -1054,6 +1116,23 @@ def _method_ref_lhs_start(s: str, dc_idx: int, max_back: int = 512) -> int:
re.MULTILINE,
)

# ---------------------------
# Callee parameter count (for argument-count pre-filter on regular method calls)
# ---------------------------
_callee_sig = extract_method_name_with_params(callee_function.page_content)
if _callee_sig and _callee_sig != "lambda":
_paren_open = _callee_sig.index('(')
_paren_close = _callee_sig.rindex(')')
_params_str = _callee_sig[_paren_open + 1:_paren_close]
_callee_has_varargs = '...' in _params_str
if not _params_str.strip():
_callee_param_count = 0
else:
_callee_param_count = self._count_call_args(_callee_sig, _paren_open, _paren_close)
else:
_callee_param_count = -1
_callee_has_varargs = False

callee_function_source = callee_function.metadata['source']

# CHANGED: get_class_name_from_class_function now returns FQCN (possibly inner).
Expand Down Expand Up @@ -1175,17 +1254,23 @@ def _process_call(start_idx: int, open_paren_pos: int) -> bool:
):
logger.debug(
"__check_identifier_resolved_to_callee_function_package resolved successfully - "
f"callee_function_name={callee_function_name}, identifier_function={ident_snippet}, "
f"target_class_names={target_class_names}, \ncaller_function_source={caller_function.metadata['source']}"
f", \ncaller_function={caller_function.page_content}"
"callee_function_name=%s, identifier_function=%s, "
"target_class_names=%s, \ncaller_function_source=%s"
", \ncaller_function=%s",
callee_function_name, ident_snippet,
target_class_names, caller_function.metadata['source'],
caller_function.page_content,
)
return True

logger.debug(
"__check_identifier_resolved_to_callee_function_package resolved unsuccessfully - "
f"callee_function_name={callee_function_name}, identifier_function={ident_snippet}, "
f"target_class_names={target_class_names}, \ncaller_function_source={caller_function.metadata['source']}"
f", \ncaller_function={caller_function.page_content}"
"callee_function_name=%s, identifier_function=%s, "
"target_class_names=%s, \ncaller_function_source=%s"
", \ncaller_function=%s",
callee_function_name, ident_snippet,
target_class_names, caller_function.metadata['source'],
caller_function.page_content,
)
return False

Expand Down Expand Up @@ -1244,6 +1329,11 @@ def _process_method_ref(dc_idx: int, ref_len: int, make_ctor: bool) -> bool:
if nxt == '{' or nxt == 'throws':
continue

if _callee_param_count >= 0 and not _callee_has_varargs:
call_arg_count = self._count_call_args(caller_function_body, open_paren_pos, close_paren_pos)
if call_arg_count != _callee_param_count:
continue

if _process_call(m.start(), open_paren_pos):
return True

Expand Down
Loading