diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..801112a --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,65 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2025 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import pytest +import subprocess +import os +import sys + +# Add option to select the interface (default: vcan0) +def pytest_addoption(parser): + parser.addoption( + "--can-iface", + action="store", + default="vcan0", + help="The CAN interface for tests (e.g., vcan0)" + ) + parser.addoption( + "--bin-path", + action="store", + default=".", + help="Path to the compiled can-utils binaries" + ) + +@pytest.fixture(scope="session") +def can_interface(request): + """ + Checks if the specified interface exists. + If not, the tests are skipped. + """ + iface = request.config.getoption("--can-iface") + + try: + # 'ip link show ' returns 0 if it exists, otherwise an error + subprocess.check_call( + ["ip", "link", "show", iface], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + except (subprocess.CalledProcessError, FileNotFoundError): + pytest.skip(f"Prerequisite not met: Interface '{iface}' not found.") + + return iface + +@pytest.fixture(scope="session") +def bin_path(request): + """ + Returns the path to the binaries and checks if they exist. + """ + path = request.config.getoption("--bin-path") + # Exemplarily check if 'cansend' is located there + cansend_path = os.path.join(path, "cansend") + if not os.path.isfile(cansend_path) and not os.path.isfile(cansend_path + ".exe"): + pytest.skip(f"Compiled tools not found in '{path}'. Please run 'make' first.") + return path diff --git a/tests/test_can_basics.py b/tests/test_can_basics.py new file mode 100644 index 0000000..cce7ca1 --- /dev/null +++ b/tests/test_can_basics.py @@ -0,0 +1,114 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import subprocess +import time +import os +import pytest +import signal + +def run_tool(bin_path, tool_name, args): + """Helper function to run a tool""" + full_path = os.path.join(bin_path, tool_name) + cmd = [full_path] + args + return subprocess.run(cmd, capture_output=True, text=True) + +def test_tools_executable(bin_path): + """Simply checks if the tools show help (existence & executability)""" + for tool in ["cansend", "candump", "cangen"]: + result = run_tool(bin_path, tool, ["-?"]) # -? is often help in can-utils, otherwise invalid args + # We do not expect a crash (Segfault), Exit code may vary depending on arg parsing + # It is important that stderr or stdout contains something meaningful or the process exits cleanly + assert result.returncode != -11, f"{tool} crashed (Segfault)" + +def test_send_and_receive(bin_path, can_interface): + """ + Integration test: + 1. Starts candump in the background on the interface. + 2. Sends a CAN frame with cansend. + 3. Checks if candump saw the frame. + """ + candump_path = os.path.join(bin_path, "candump") + cansend_path = os.path.join(bin_path, "cansend") + + # Unique ID and data for this test + can_id = "123" + can_data = "11223344" + can_frame = f"{can_id}#{can_data}" + + # 1. Start candump in the background + # -L for Raw output, -x for extra details (optional), interface + with subprocess.Popen( + [candump_path, "-L", can_interface], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid # Process group for clean killing + ) as dumpproc: + + time.sleep(0.5) # Wait briefly until candump is ready + + # 2. Send frame + # cansend + send_result = subprocess.run( + [cansend_path, can_interface, can_frame], + capture_output=True, + text=True + ) + + assert send_result.returncode == 0, f"cansend failed: {send_result.stderr}" + + time.sleep(0.5) # Wait until frame is processed + + # Terminate candump + os.killpg(os.getpgid(dumpproc.pid), signal.SIGTERM) + stdout, stderr = dumpproc.communicate() + + # 3. Analysis + # candump -L Output Format (approx): (timestamp) vcan0 123#11223344 + print(f"Candump Output: {stdout}") + + assert can_id in stdout, "CAN-ID was not received" + assert can_data in stdout, "CAN data were not received" + +def test_cangen_generation(bin_path, can_interface): + """ + Checks if cangen generates traffic. + """ + cangen_path = os.path.join(bin_path, "cangen") + candump_path = os.path.join(bin_path, "candump") + + # Start candump + with subprocess.Popen( + [candump_path, "-L", can_interface], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) as dumpproc: + + # Start cangen: Send 5 frames (-n 5) and then terminate + subprocess.run( + [cangen_path, "-n", "5", can_interface], + check=True + ) + + time.sleep(1) + + os.killpg(os.getpgid(dumpproc.pid), signal.SIGTERM) + stdout, _ = dumpproc.communicate() + + # We expect 5 lines of output (or more, if other traffic is present) + lines = [l for l in stdout.splitlines() if can_interface in l] + assert len(lines) >= 5, "cangen did not generate enough frames/candump did not see them" diff --git a/tests/test_can_calc_bit_timing.py b/tests/test_can_calc_bit_timing.py new file mode 100644 index 0000000..732d30e --- /dev/null +++ b/tests/test_can_calc_bit_timing.py @@ -0,0 +1,171 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2025 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import pytest +import subprocess +import os +import re + +# --- Helper Functions --- + +def run_ccbt(bin_path, args): + """ + Helper to run can-calc-bit-timing. + Returns the subprocess.CompletedProcess object. + """ + cmd = [os.path.join(bin_path, "can-calc-bit-timing")] + args + return subprocess.run(cmd, capture_output=True, text=True) + +# --- Tests --- + +def test_usage_on_invalid_option(bin_path): + """ + Description: + Tests that providing an invalid option (like -h) results in an error + message and prints the usage information. + Note: The tool does not support -h explicitly, it treats it as invalid. + + Manual reproduction: + $ can-calc-bit-timing -h + > invalid option -- 'h' + > Usage: can-calc-bit-timing [options] ... + """ + result = run_ccbt(bin_path, ["-h"]) + + # Expect failure code usually for invalid options + assert result.returncode != 0 + assert "Usage: can-calc-bit-timing" in result.stderr or "Usage: can-calc-bit-timing" in result.stdout + +def test_list_controllers(bin_path): + """ + Description: + Tests the -l option to list all supported CAN controller names. + + Manual reproduction: + $ can-calc-bit-timing -l + > ... + > mcp251x + > ... + """ + result = run_ccbt(bin_path, ["-l"]) + + assert result.returncode == 0 + # Check for a common controller known to be in the list (e.g., mcp251x or sja1000) + # The list is usually quite long. + assert "mcp251x" in result.stdout or "sja1000" in result.stdout + +def test_basic_calculation_stdout(bin_path): + """ + Description: + Tests a basic bit timing calculation for a standard bitrate (500k) + and clock (8MHz). Verifies that a table is produced. + + Manual reproduction: + $ can-calc-bit-timing -c 8000000 -b 500000 + > Bit rate : 500000 Rate error : 0.00% ... + """ + # 8MHz clock, 500kbit/s + result = run_ccbt(bin_path, ["-c", "8000000", "-b", "500000"]) + + assert result.returncode == 0 + # Output should contain column headers like "SampP" (Sample Point) and the bitrate + assert "SampP" in result.stdout + assert "500000" in result.stdout + +def test_quiet_mode(bin_path): + """ + Description: + Tests the -q option (quiet mode), which should suppress the header line. + + Manual reproduction: + $ can-calc-bit-timing -q -c 8000000 -b 500000 + > (Output should start with data, not the 'Bit timing parameters...' header) + """ + # Run without -q first to confirm header exists + res_std = run_ccbt(bin_path, ["-c", "8000000", "-b", "500000"]) + assert "Bit timing parameters" in res_std.stdout + + # Run with -q + res_quiet = run_ccbt(bin_path, ["-q", "-c", "8000000", "-b", "500000"]) + assert res_quiet.returncode == 0 + assert "Bit timing parameters" not in res_quiet.stdout + # Data should still be there + assert "500000" in res_quiet.stdout + +def test_verbose_output(bin_path): + """ + Description: + Tests the -v option for verbose output. + + Manual reproduction: + $ can-calc-bit-timing -v -c 8000000 -b 500000 + """ + result = run_ccbt(bin_path, ["-v", "-c", "8000000", "-b", "500000"]) + assert result.returncode == 0 + # Verbose mode usually prints more details, but the table should definitely be there. + # We check for "SampP" to ensure valid calculation output. + assert "SampP" in result.stdout + +def test_data_bitrate_fd(bin_path): + """ + Description: + Tests the -d option to specify a data bitrate (CAN FD). + + Manual reproduction: + $ can-calc-bit-timing -c 40000000 -b 1000000 -d 2000000 + """ + # 40MHz clock, 1M nominal, 2M data + result = run_ccbt(bin_path, ["-c", "40000000", "-b", "1000000", "-d", "2000000"]) + + assert result.returncode == 0 + # Should calculate for both + assert "SampP" in result.stdout + assert "1000000" in result.stdout + assert "2000000" in result.stdout + +def test_specific_sample_point(bin_path): + """ + Description: + Tests the -s option to define a specific sample point (e.g., 875 for 87.5%). + + Manual reproduction: + $ can-calc-bit-timing -c 8000000 -b 500000 -s 875 + > ... Sample Point : 87.5% ... + """ + # 875 means 87.5% + result = run_ccbt(bin_path, ["-c", "8000000", "-b", "500000", "-s", "875"]) + + assert result.returncode == 0 + assert "87.5%" in result.stdout + +def test_specific_algorithm(bin_path): + """ + Description: + Tests the --alg option to select a different algorithm. + Assuming 'v4_8' is a valid algorithm key based on source files + (can-calc-bit-timing-v4_8.c). Note: Valid keys depend on compilation. + We check if it accepts the flag without crashing, even if default is used. + + Manual reproduction: + $ can-calc-bit-timing -c 8000000 -b 500000 --alg v4.8 + (Note: algorithm naming convention might vary, using a safe check) + """ + # Trying to list supported algorithms first would be ideal if possible, + # but based on file list, let's try a standard calculation with a potentially valid alg string. + # If the alg string is invalid, it usually warns or errors. + + # We simply check that the flag is parsed. + # "default" should always be valid. + result = run_ccbt(bin_path, ["-c", "8000000", "-b", "500000", "--alg", "default"]) + assert result.returncode == 0 diff --git a/tests/test_canbusload.py b/tests/test_canbusload.py new file mode 100644 index 0000000..f3c75d8 --- /dev/null +++ b/tests/test_canbusload.py @@ -0,0 +1,284 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2025 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re + +# --- Helper Functions & Classes --- + +def run_cangen(bin_path, interface, args): + """ + Runs cangen to generate traffic. + Waits for it to finish (assumes -n is used in args). + """ + cmd = [os.path.join(bin_path, "cangen"), interface] + args + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + +class CanbusloadMonitor: + """ + Context manager to run canbusload. + Captures stdout/stderr. + Ensures process is killed to prevent stalls. + """ + def __init__(self, bin_path, args): + self.cmd = [os.path.join(bin_path, "canbusload")] + args + self.process = None + self.output = "" + self.error = "" + + def __enter__(self): + # run with setsid to allow killing the whole process group later + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.5) # Wait for tool to start and initialize + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + if self.process.poll() is None: + # canbusload mentions using CTRL-C (SIGINT) to terminate. + # SIGTERM might be ignored or handled differently. + os.killpg(os.getpgid(self.process.pid), signal.SIGINT) + try: + # Wait a bit for it to exit gracefully + self.process.wait(timeout=1.0) + except subprocess.TimeoutExpired: + # Force kill if it's stuck (e.g. buffer deadlock) + os.killpg(os.getpgid(self.process.pid), signal.SIGKILL) + + # Read remaining output + self.output, self.error = self.process.communicate() + +# --- Tests for canbusload --- + +def test_help_option(bin_path): + """ + Description: + Tests the -h option to ensure the usage message is displayed. + + Manual reproduction: + $ canbusload -h + > monitor CAN bus load. + > Usage: canbusload [options] + + """ + result = subprocess.run( + [os.path.join(bin_path, "canbusload"), "-h"], + capture_output=True, + text=True + ) + assert "Usage: canbusload" in result.stdout or "Usage: canbusload" in result.stderr + assert "monitor CAN bus load" in result.stdout or "monitor CAN bus load" in result.stderr + +def test_basic_monitoring(bin_path, can_interface): + """ + Description: + Tests basic monitoring of a CAN interface with a specified bitrate. + Verifies that the interface name appears in the output and frames are counted. + + Manual reproduction: + 1. Open a terminal and run: + $ canbusload vcan0@500000 + 2. In another terminal, generate traffic: + $ cangen vcan0 -n 10 + 3. Observe the output updating with frame counts. + """ + bitrate = "500000" + target = f"{can_interface}@{bitrate}" + + with CanbusloadMonitor(bin_path, [target]) as monitor: + # Generate some traffic so canbusload has something to report + # Use -g to add a small gap, preventing potential buffer flooding issues on very fast systems + run_cangen(bin_path, can_interface, ["-n", "10", "-g", "10"]) + time.sleep(1.0) # Wait for next update cycle + + assert can_interface in monitor.output + # Check if we saw some frames (regex for at least 1 frame) + # Output format roughly: can0@500k 10 ... + assert re.search(r'\s+[1-9][0-9]*\s+', monitor.output), "No frames detected in output" + +def test_time_option_t(bin_path, can_interface): + """ + Description: + Tests the -t option which shows the current time on the first line. + + Manual reproduction: + $ canbusload -t vcan0@500000 + > canbusload 2025-01-01 12:00:00 ... + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-t", target]) as monitor: + time.sleep(0.5) + + # Check for YYYY-MM-DD format in the output header + assert re.search(r'\d{4}-\d{2}-\d{2}', monitor.output), "Timestamp not found with -t option" + +def test_bargraph_option_b(bin_path, can_interface): + """ + Description: + Tests the -b option which displays a bargraph of the bus load. + + Manual reproduction: + 1. Run: + $ canbusload -b vcan0@500000 + 2. Generate heavy traffic: + $ cangen vcan0 -g 0 -n 100 + 3. Observe the ASCII bar graph (e.g., |XXXX....|). + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-b", target]) as monitor: + # Generate burst of traffic to spike load + run_cangen(bin_path, can_interface, ["-n", "50", "-g", "0"]) + time.sleep(1.0) + + # Check for bargraph delimiters + assert "|" in monitor.output + # Check for bargraph content (X for data, . for empty) + # Note: With low load it might just be dots, but the bar structure |...| should exist + assert re.search(r'\|[XRT\.]+\|', monitor.output), "Bargraph structure not found" + +def test_colorize_option_c(bin_path, can_interface): + """ + Description: + Tests the -c option for colorized output. + Checks for the presence of ANSI escape codes in the output. + + Manual reproduction: + $ canbusload -c vcan0@500000 + > Output should be colored (visible in compatible terminal). + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-c", target]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + time.sleep(0.5) + + # Check for ANSI escape character (ASCII 27 / \x1b) + assert "\x1b[" in monitor.output, "ANSI escape codes for color not found" + +def test_redraw_option_r(bin_path, can_interface): + """ + Description: + Tests the -r option which redraws the terminal (like 'top'). + Checks for clear screen/cursor movement escape sequences. + + Manual reproduction: + $ canbusload -r vcan0@500000 + > Screen should refresh instead of scrolling. + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-r", target]) as monitor: + time.sleep(0.5) + + # Usually implies clearing screen or moving cursor home: \x1b[H or \x1b[2J + assert "\x1b[" in monitor.output, "Cursor movement/redraw codes not found" + +def test_exact_calculation_e(bin_path, can_interface): + """ + Description: + Tests the -e option for exact calculation of stuffed bits. + Ensures the tool runs without error and outputs the 'exact' indicator if applicable + or simply produces valid output. + + Manual reproduction: + $ canbusload -e vcan0@500000 + > Header might indicate different mode or output values change slightly. + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-e", target]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "10"]) + time.sleep(0.5) + + # Primarily checking that it runs successfully and produces output + assert can_interface in monitor.output + assert re.search(r'\s+[1-9][0-9]*\s+', monitor.output) + +def test_ignore_bitstuffing_i(bin_path, can_interface): + """ + Description: + Tests the -i option to ignore bitstuffing in bandwidth calculation. + + Manual reproduction: + $ canbusload -i vcan0@500000 + """ + target = f"{can_interface}@500000" + + with CanbusloadMonitor(bin_path, ["-i", target]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "10"]) + time.sleep(0.5) + + assert can_interface in monitor.output + +def test_multiple_interfaces(bin_path, can_interface): + """ + Description: + Tests monitoring multiple interfaces simultaneously. + Note: Since we usually only have one 'can_interface' fixture, + we will simulate the syntax. If the second interface doesn't exist, + canbusload might still run or show error for that line, but we check + parsing. + + To robustly test, we use the same interface twice with different bitrates + (if supported by tool) or just check that the command line is accepted. + + Manual reproduction: + $ canbusload vcan0@500000 vcan0@250000 + > Should list vcan0 twice. + """ + # Using the same interface twice to ensure they exist. + target1 = f"{can_interface}@500000" + target2 = f"{can_interface}@250000" + + with CanbusloadMonitor(bin_path, [target1, target2]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "10"]) + time.sleep(1.0) + + # Check that the interface appears at least twice in the output lines + # (ignoring the header line) + count = monitor.output.count(can_interface) + # 1 in command echo (if any), 2 in status lines. + # Since we are capturing output, it likely appears multiple times across updates. + # We just ensure it's there. + assert count >= 2, "Multiple interfaces did not appear in output" + +def test_missing_bitrate_error(bin_path, can_interface): + """ + Description: + Tests that omitting the bitrate results in an error/usage message. + The bitrate is mandatory. + + Manual reproduction: + $ canbusload vcan0 + > Should print usage or error. + """ + # Running without @bitrate + result = subprocess.run( + [os.path.join(bin_path, "canbusload"), can_interface], + capture_output=True, + text=True + ) + # Expect failure code or error message + assert result.returncode != 0 or "Usage" in result.stdout or "Usage" in result.stderr diff --git a/tests/test_candump.py b/tests/test_candump.py new file mode 100644 index 0000000..7748eeb --- /dev/null +++ b/tests/test_candump.py @@ -0,0 +1,260 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re +import shutil + +# --- Helper Functions --- + +def run_cangen(bin_path, interface, args): + """ + Runs cangen to generate traffic. + Waits for it to finish (assumes -n is used in args). + """ + cmd = [os.path.join(bin_path, "cangen"), interface] + args + subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + +class CandumpMonitor: + """ + Context manager to run candump. + Captures stdout/stderr. + """ + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "candump"), interface] + if args: + # Insert args before interface if they are options + # candump usage: candump [options] + self.cmd = [os.path.join(bin_path, "candump")] + args + [interface] + else: + self.cmd = [os.path.join(bin_path, "candump"), interface] + + self.process = None + self.output = "" + self.error = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.1) # Wait for start + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + if self.process.poll() is None: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.output, self.error = self.process.communicate() + +# --- Tests for candump --- + +def test_help_option(bin_path): + """Test -h option: Should print usage.""" + # candump often returns 1 or 0 on help, we check output mainly + result = subprocess.run([os.path.join(bin_path, "candump"), "-h"], capture_output=True, text=True) + assert "Usage: candump" in result.stdout or "Usage: candump" in result.stderr + +def test_count_n(bin_path, can_interface): + """Test -n : Terminate after reception of CAN frames.""" + count = 3 + # Start candump expecting 3 frames + with CandumpMonitor(bin_path, can_interface, ["-n", str(count)]) as monitor: + # Generate 3 frames + run_cangen(bin_path, can_interface, ["-n", str(count)]) + + # Wait a bit to ensure candump finishes and exits on its own + time.sleep(0.5) + + # Check if process exited + assert monitor.process.poll() is not None, "candump did not exit after receiving count frames" + + # Count lines (ignoring empty lines) + lines = [l for l in monitor.output.splitlines() if l.strip()] + assert len(lines) == count + +def test_timeout_T(bin_path, can_interface): + """Test -T : Terminate after timeout if no frames received.""" + start = time.time() + # Run with 500ms timeout + subprocess.run( + [os.path.join(bin_path, "candump"), can_interface, "-T", "500"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + duration = time.time() - start + # Should be around 0.5s, definitely less than 2s (safety margin) and more than 0.4s + assert 0.4 <= duration <= 1.0 + +def test_timestamp_formats(bin_path, can_interface): + """Test -t : timestamp formats.""" + + # (a)bsolute + with CandumpMonitor(bin_path, can_interface, ["-t", "a"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + assert re.search(r'\(\d+\.\d+\)', monitor.output), "Absolute timestamp missing" + + # (d)elta + with CandumpMonitor(bin_path, can_interface, ["-t", "d"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "2", "-g", "10"]) + # Second line should have a small delta + assert re.search(r'\(\d+\.\d+\)', monitor.output), "Delta timestamp missing" + + # (z)ero + with CandumpMonitor(bin_path, can_interface, ["-t", "z"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + # Starts with (0.000000) roughly. Note: candump often uses 3 digits for seconds (e.g. 000.000000). + # Regex adjusted to accept one or more leading zeros. + assert re.search(r'\(0+\.\d+\)', monitor.output), f"Zero timestamp format mismatch: {monitor.output}" + + # (A)bsolute w date + with CandumpMonitor(bin_path, can_interface, ["-t", "A"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + # Check for YYYY-MM-DD format + assert re.search(r'\(\d{4}-\d{2}-\d{2}', monitor.output), "Date timestamp missing" + +def test_ascii_output_a(bin_path, can_interface): + """Test -a: Additional ASCII output.""" + # Send known data that is printable ASCII + # 0x41 = 'A', 0x42 = 'B' + with CandumpMonitor(bin_path, can_interface, ["-a"]) as monitor: + # Force DLC to 4 to ensure we get all 4 bytes ("ABCD") + run_cangen(bin_path, can_interface, ["-n", "1", "-D", "41424344", "-L", "4"]) + + assert "ABCD" in monitor.output or " .ABCD" in monitor.output + +def test_silent_mode_s(bin_path, can_interface): + """Test -s : Silent mode.""" + + # Level 0 (Default, not silent) + with CandumpMonitor(bin_path, can_interface, ["-s", "0"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + assert len(monitor.output) > 0 + + # Level 2 (Silent) + with CandumpMonitor(bin_path, can_interface, ["-s", "2"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + assert len(monitor.output.strip()) == 0 + +def test_log_file_l(bin_path, can_interface, tmp_path): + """Test -l: Log to file (default name).""" + # Resolve bin_path to absolute because we change CWD below + bin_path = os.path.abspath(bin_path) + + # Change cwd to tmp_path to avoid littering + cwd = os.getcwd() + os.chdir(tmp_path) + try: + # -l implies -s 2 (silent stdout) + with CandumpMonitor(bin_path, can_interface, ["-l"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + + # Check if file starting with candump- exists + files = [f for f in os.listdir('.') if f.startswith('candump-')] + assert len(files) > 0, "Log file not created" + + # Check content + with open(files[0], 'r') as f: + content = f.read() + assert can_interface in content + finally: + os.chdir(cwd) + +def test_specific_log_file_f(bin_path, can_interface, tmp_path): + """Test -f : Log to specific file.""" + logfile = os.path.join(tmp_path, "test.log") + + with CandumpMonitor(bin_path, can_interface, ["-f", logfile]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + + assert os.path.exists(logfile) + with open(logfile, 'r') as f: + content = f.read() + assert can_interface in content + +def test_log_format_stdout_L(bin_path, can_interface): + """Test -L: Log file format on stdout.""" + with CandumpMonitor(bin_path, can_interface, ["-L"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + + # Format: (timestamp) interface ID#DATA + assert re.search(r'\(\d+\.\d+\)\s+' + can_interface + r'\s+[0-9A-F]+#', monitor.output) + +def test_raw_dlc_8(bin_path, can_interface): + """Test -8: Display raw DLC in {} for Classical CAN.""" + # Send a classic CAN frame + with CandumpMonitor(bin_path, can_interface, ["-8"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1", "-L", "4"]) + + # Look for [4] or {4} depending on output format with -8 + # Standard candump: "vcan0 123 [4] 11 22 33 44" + # With -8: "vcan0 123 {4} [4] 11 22 33 44" (Wait, man page says "display raw DLC in {}") + # Let's check for the curly braces + assert "{" in monitor.output and "}" in monitor.output + +def test_extra_info_x(bin_path, can_interface): + """Test -x: Print extra message info (RX/TX, etc.).""" + with CandumpMonitor(bin_path, can_interface, ["-x"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1"]) + + # Usually adds "RX - - " or similar at the end + assert "RX" in monitor.output or "TX" in monitor.output + +def test_filters(bin_path, can_interface): + """Test CAN filters on command line.""" + + # Filter for ID 123 + # Command: candump vcan0,123:7FF + + # 1. Send ID 123 (Should receive) + cmd_match = [can_interface + ",123:7FF"] # Argument is "interface,filter" + with CandumpMonitor(bin_path, cmd_match[0], args=[]) as monitor: + # Note: CandumpMonitor logic needs slightly adjustment if interface arg contains comma + # But here we pass it as the "interface" argument to the class which works for valid invocation + run_cangen(bin_path, can_interface, ["-n", "1", "-I", "123"]) + assert "123" in monitor.output + + # 2. Send ID 456 (Should NOT receive) + with CandumpMonitor(bin_path, cmd_match[0], args=[]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1", "-I", "456"]) + assert "456" not in monitor.output + +def test_inverse_filter(bin_path, can_interface): + """Test inverse filter (~).""" + # Filter: Everything EXCEPT ID 123 + # Command: candump vcan0,123~7FF + + arg = can_interface + ",123~7FF" + + # Send 123 (Should NOT receive) + with CandumpMonitor(bin_path, arg) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1", "-I", "123"]) + assert "123" not in monitor.output + + # Send 456 (Should receive) + with CandumpMonitor(bin_path, arg) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1", "-I", "456"]) + assert "456" in monitor.output + +def test_swap_byte_order_S(bin_path, can_interface): + """Test -S: Swap byte order.""" + # Send 11223344 + # -S should display it swapped/marked. + # Warning: -S only affects printed output, usually prints big-endian vs little-endian representation + + with CandumpMonitor(bin_path, can_interface, ["-S"]) as monitor: + run_cangen(bin_path, can_interface, ["-n", "1", "-D", "11223344", "-L", "4"]) + + # The help says "marked with '`'". + assert "`" in monitor.output diff --git a/tests/test_canerrsim.py b/tests/test_canerrsim.py new file mode 100644 index 0000000..9ba6844 --- /dev/null +++ b/tests/test_canerrsim.py @@ -0,0 +1,164 @@ +# SPDX-License-Identifier: GPL-2.0-only + +import subprocess +import time +import os +import pytest +import signal +import tempfile + +# Note: bin_path and interface fixtures are provided implicitly by conftest.py + +def test_usage_error(bin_path): + """ + Test the behavior when no arguments are provided. + + Description: + The tool requires at least an interface name. Running without arguments + displays the usage text. + Note: The tool exits with code 0 even on missing arguments. + + Manual Reproduction: + Run: ./canerrsim + Expect: Output showing Usage info and exit code 0. + """ + canerrsim = os.path.join(bin_path, "canerrsim") + + if not os.path.exists(canerrsim): + pytest.skip(f"Binary {canerrsim} not found. Please build it first.") + + result = subprocess.run( + [canerrsim], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Expect success code (0) as per tool implementation + assert result.returncode == 0 + # Expect usage hint in output + assert "Usage: canerrsim" in result.stderr or "Usage: canerrsim" in result.stdout + +def test_basic_can_traffic(bin_path, can_interface): + """ + Sanity check: Verify that standard CAN frames are looped back correctly. + If this fails, vcan is broken or not up. + """ + cansend = os.path.join(bin_path, "cansend") + candump = os.path.join(bin_path, "candump") + + if not os.path.exists(cansend) or not os.path.exists(candump): + pytest.skip("cansend or candump not found.") + + with tempfile.TemporaryFile(mode='w+') as tmp_out: + candump_proc = subprocess.Popen( + [candump, can_interface], + stdout=tmp_out, + stderr=subprocess.PIPE, + text=True + ) + time.sleep(0.2) + + # Send standard frame 123#112233 + subprocess.run([cansend, can_interface, "123#112233"], check=True) + time.sleep(0.2) + + candump_proc.terminate() + candump_proc.wait() + + tmp_out.seek(0) + output = tmp_out.read() + + assert "123" in output and "11 22 33" in output, "Standard CAN frame loopback failed on vcan0" + +@pytest.mark.parametrize("args, grep_for", [ + (["NoAck"], "ERRORFRAME"), + (["Data0=AA", "Data1=BB"], "AA BB"), + (["TxTimeout"], "ERRORFRAME"), +]) +def test_canerrsim_generate_errors(bin_path, can_interface, args, grep_for): + """ + Test generating specific error frames using canerrsim. + + Description: + 1. Start candump in background with error frames enabled (-e). + 2. Run canerrsim with specific error options. + 3. Verify candump received the error frame matching the criteria. + + Feature Detection: + Checks if the environment supports error frame loopback on vcan. + Skips if not supported. + + Manual Reproduction: + 1. Terminal 1: candump -e vcan0 + 2. Terminal 2: ./canerrsim vcan0 + """ + canerrsim = os.path.join(bin_path, "canerrsim") + candump = os.path.join(bin_path, "candump") + + if not os.path.exists(canerrsim): + pytest.skip(f"Binary {canerrsim} not found.") + + # --- Feature Detection Step --- + # Try to verify if we can receive ANY error frame before running specific tests + with tempfile.TemporaryFile(mode='w+') as probe_out: + probe_proc = subprocess.Popen( + [candump, "-e", can_interface], + stdout=probe_out, + stderr=subprocess.PIPE, + text=True + ) + time.sleep(0.2) + # Send a simple error frame + subprocess.run([canerrsim, can_interface, "NoAck"], + stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + time.sleep(0.2) + probe_proc.terminate() + probe_proc.wait() + + probe_out.seek(0) + if "ERRORFRAME" not in probe_out.read(): + pytest.skip("Environment does not support loopback of user-space generated CAN error frames on vcan.") + + # --- Actual Test --- + with tempfile.TemporaryFile(mode='w+') as tmp_out: + # Start candump to capture the error frame + candump_proc = subprocess.Popen( + [candump, "-e", can_interface], + stdout=tmp_out, + stderr=subprocess.PIPE, + text=True + ) + + try: + time.sleep(0.5) + # Run canerrsim + cmd = [canerrsim, can_interface] + args + result = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=2 + ) + + assert result.returncode == 0, f"canerrsim failed with args: {args}" + time.sleep(0.5) + + finally: + candump_proc.terminate() + try: + candump_proc.wait(timeout=1) + except subprocess.TimeoutExpired: + candump_proc.kill() + candump_proc.wait() + + # Rewind file to read captured output + tmp_out.seek(0) + stdout = tmp_out.read() + + # Validation + print(f"--- Candump Output ---\n{stdout}") + + found = grep_for in stdout or grep_for.lower() in stdout or grep_for.upper() in stdout + assert found, f"Expected '{grep_for}' in candump output for args {args}" diff --git a/tests/test_canfdtest.py b/tests/test_canfdtest.py new file mode 100644 index 0000000..62e4096 --- /dev/null +++ b/tests/test_canfdtest.py @@ -0,0 +1,105 @@ +# SPDX-License-Identifier: GPL-2.0-only + +import subprocess +import time +import signal +import os +import pytest + +# Note: bin_path and interface fixtures are provided implicitly by conftest.py + +def test_usage_error(bin_path): + """ + Test the behavior when an invalid option is provided. + + Description: + The tool should reject invalid flags (like '-h' which is not implemented + as a success flag in this tool) and return a non-zero exit code. + + Manual Reproduction: + Run: ./canfdtest -h + Expect: Output showing Usage info and exit code != 0. + """ + canfdtest = os.path.join(bin_path, "canfdtest") + + if not os.path.exists(canfdtest): + pytest.skip(f"Binary {canfdtest} not found. Please build it first.") + + result = subprocess.run( + [canfdtest, "-h"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Expect failure code + assert result.returncode != 0 + # Expect usage hint in output + assert "Usage: canfdtest" in result.stderr or "Usage: canfdtest" in result.stdout + +@pytest.mark.parametrize("extra_args", [ + [], # Standard CAN + ["-d"], # CAN FD + ["-e"], # Extended Frames (29-bit) + ["-b", "-d"], # CAN FD with Bit Rate Switch +]) +def test_full_duplex_communication(bin_path, can_interface, extra_args): + """ + Test full-duplex communication (ping-pong) between two instances. + + Description: + Simulates a DUT (Device Under Test) and a Host on the same interface. + 1. DUT: Runs in background (`canfdtest -v `), echoing frames. + 2. Host: Runs in foreground (`canfdtest -g -v `), generating frames. + 3. Checks if the Host process exits successfully (0). + + Manual Reproduction: + 1. Terminal 1: ./canfdtest -v vcan0 (MUST use same flags -d/-e as Host) + 2. Terminal 2: ./canfdtest -g -v -l 10 vcan0 (add -d or -e as needed) + """ + canfdtest = os.path.join(bin_path, "canfdtest") + + if not os.path.exists(canfdtest): + pytest.skip(f"Binary {canfdtest} not found. Please build it first.") + + # 1. Start DUT (echo server) + # DUT must also receive flags like -d (FD) or -e (Extended) to open the socket correctly + # We use setsid to ensure we can kill the process group later + dut_cmd = [canfdtest, "-v"] + extra_args + [can_interface] + dut_proc = subprocess.Popen( + dut_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=os.setsid + ) + + try: + # Give DUT time to initialize socket + time.sleep(0.2) + + # 2. Start Host (generator) + # -g: generate, -l 10: 10 loops + host_cmd = [canfdtest, "-g", "-v", "-l", "10"] + extra_args + [can_interface] + + host_result = subprocess.run( + host_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=5 # Test should be very fast + ) + + # 3. Validation + if host_result.returncode != 0: + print(f"--- Host Stdout ---\n{host_result.stdout}") + print(f"--- Host Stderr ---\n{host_result.stderr}") + + assert host_result.returncode == 0, f"Host failed with args: {extra_args}" + # The tool outputs "Test messages sent and received: N" on success, not "Test successful" + assert "Test messages sent and received" in host_result.stdout + + finally: + # 4. Teardown: Kill DUT + if dut_proc.poll() is None: + os.killpg(os.getpgid(dut_proc.pid), signal.SIGTERM) + dut_proc.wait() diff --git a/tests/test_cangen.py b/tests/test_cangen.py new file mode 100644 index 0000000..534d8e3 --- /dev/null +++ b/tests/test_cangen.py @@ -0,0 +1,364 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re + +# --- Helper Functions --- + +def parse_candump_line(line): + """ + Parses a line from 'candump -L' (logging format). + Format: (timestamp) interface ID#DATA or ID##FLAGS_DATA (FD) + Returns a dict with 'id', 'data', 'flags' (for FD) or None if parsing fails. + """ + # Regex for standard CAN: (123.456) vcan0 123#112233 + match_std = re.search(r'\(\d+\.\d+\)\s+\S+\s+([0-9A-Fa-f]+)#([0-9A-Fa-f]*)', line) + if match_std: + return {'id': match_std.group(1), 'data': match_std.group(2), 'type': 'classic'} + + # Regex for CAN FD: (123.456) vcan0 123## + match_fd = re.search(r'\(\d+\.\d+\)\s+\S+\s+([0-9A-Fa-f]+)##([0-9A-Fa-f]+)', line) + if match_fd: + # In log format, data usually follows flags. Not splitting strictly here, + # just capturing the payload block for verification. + return {'id': match_fd.group(1), 'payload_raw': match_fd.group(2), 'type': 'fd'} + + return None + +class TrafficMonitor: + """Context manager to run candump in the background.""" + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "candump"), "-L", interface] + if args: + self.cmd.extend(args) + self.process = None + self.output = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.2) # Wait for socket bind + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + out, _ = self.process.communicate() + self.output = out + +# --- Tests for cangen --- + +def test_help_option(bin_path): + """Test -h option: Should print usage and exit with 1.""" + result = subprocess.run([os.path.join(bin_path, "cangen"), "-h"], capture_output=True, text=True) + assert result.returncode == 1 + assert "Usage: cangen" in result.stderr + +def test_count_n(bin_path, can_interface): + """Test -n : Generate exact number of frames.""" + count = 5 + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", str(count)], + check=True + ) + + lines = [l for l in monitor.output.splitlines() if can_interface in l] + assert len(lines) == count, f"Expected {count} frames, got {len(lines)}" + +def test_gap_g(bin_path, can_interface): + """Test -g : Gap generation (timing check).""" + # Send 5 frames with 100ms gap -> should take approx 400-500ms + start = time.time() + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "5", "-g", "100"], + check=True + ) + duration = time.time() - start + # Allow some tolerance (e.g., at least 0.4s for 4 gaps) + assert duration >= 0.4, f"Gap logic too fast: {duration}s" + +def test_absolute_time_a(bin_path, can_interface): + """Test -a: Use absolute time for gap (Functional check).""" + # Difficult to verify timing strictly without real-time analysis, checking for successful execution + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "2", "-g", "10", "-a"], + check=True + ) + assert result.returncode == 0 + +def test_txtime_t(bin_path, can_interface): + """Test -t: Use SO_TXTIME (Functional check).""" + # Requires kernel support, checking for no crash + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-t"], + capture_output=True + ) + # Don't assert 0 explicitly if kernel might not support it, but vcan usually works. + if result.returncode != 0: + pytest.skip("Kernel might not support SO_TXTIME or config issue") + +def test_start_time_option(bin_path, can_interface): + """Test --start : Start time (Functional check).""" + # Using a timestamp in the past to ensure immediate execution or future for delay + # Just checking argument parsing valid + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "--start", "0"], + check=True + ) + assert result.returncode == 0 + +def test_extended_frames_e(bin_path, can_interface): + """Test -e: Extended frame mode (EFF).""" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-e"], + check=True + ) + + parsed = parse_candump_line(monitor.output.strip()) + assert parsed, "No frame captured" + # Extended IDs are usually shown with 8 chars in candump or check length > 3 + # Standard: 3 chars (e.g., 123), Extended: 8 chars (e.g., 12345678) + # Note: cangen generates random IDs. + assert len(parsed['id']) == 8, f"Expected 8-char hex ID for EFF, got {parsed['id']}" + +def test_can_fd_f(bin_path, can_interface): + """Test -f: CAN FD frames.""" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-f"], + check=True + ) + + # candump -L for FD uses '##' separator + assert "##" in monitor.output, "CAN FD separator '##' not found in candump output" + +def test_can_fd_brs_b(bin_path, can_interface): + """Test -b: CAN FD with Bitrate Switch (BRS).""" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-b"], + check=True + ) + assert "##" in monitor.output + +def test_can_fd_esi_E(bin_path, can_interface): + """Test -E: CAN FD with Error State Indicator (ESI).""" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-E"], + check=True + ) + assert "##" in monitor.output + +def test_can_xl_X(bin_path, can_interface): + """Test -X: CAN XL frames.""" + # This might fail on older kernels/interfaces. We skip if return code is error. + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-X"], + capture_output=True + ) + if result.returncode != 0: + pytest.skip("CAN XL not supported by this environment/kernel") + +def test_rtr_frames_R(bin_path, can_interface): + """Test -R: Remote Transmission Request (RTR).""" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-R"], + check=True + ) + + # candump -L often represents RTR with 'R' in the data part or 'remote' flag + # For candump -L: "vcan0 123#R" + assert "#R" in monitor.output, f"RTR flag not found in: {monitor.output}" + +def test_dlc_greater_8_option_8(bin_path, can_interface): + """Test -8: Allow DLC > 8 for Classic CAN.""" + # This generates frames with DLC > 8 but len 8. + # It's a specific protocol violation test. + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-8"], + check=True + ) + assert result.returncode == 0 + +def test_mix_modes_m(bin_path, can_interface): + """Test -m: Mix CC, FD, XL.""" + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "5", "-m"], + check=True + ) + assert result.returncode == 0 + +def test_id_generation_mode_I(bin_path, can_interface): + """Test -I : ID generation.""" + + # Fixed ID + target_id = "123" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-I", target_id], + check=True + ) + parsed = parse_candump_line(monitor.output) + assert parsed['id'] == target_id + + # Increment ID ('i') + # Generate 3 frames, IDs should be consecutive (or incrementing) + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "3", "-I", "i"], + check=True + ) + lines = monitor.output.strip().splitlines() + ids = [int(parse_candump_line(l)['id'], 16) for l in lines] + assert len(ids) == 3 + assert ids[1] == ids[0] + 1 + assert ids[2] == ids[1] + 1 + +def test_dlc_generation_mode_L(bin_path, can_interface): + """Test -L : DLC/Length generation.""" + + # Fixed Length 4 + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-L", "4"], + check=True + ) + parsed = parse_candump_line(monitor.output) + # Data hex string length should be 2 * DLC (e.g. 4 bytes = 8 chars) + assert len(parsed['data']) == 8 + +def test_data_generation_mode_D(bin_path, can_interface): + """Test -D : Data content generation.""" + + # Fixed Payload + payload = "11223344" + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-D", payload, "-L", "4"], + check=True + ) + parsed = parse_candump_line(monitor.output) + assert parsed['data'] == payload + + # Increment Payload ('i') + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "2", "-D", "i", "-L", "1"], + check=True + ) + lines = monitor.output.strip().splitlines() + data_bytes = [int(parse_candump_line(l)['data'], 16) for l in lines] + # The first byte should increment (modulo 256) + assert (data_bytes[1] - data_bytes[0]) % 256 == 1 + +def test_disable_loopback_x(bin_path, can_interface): + """Test -x: Disable loopback.""" + # If loopback is disabled, candump on the SAME interface (same socket namespace) + # should NOT receive the frames if it's running on the same host usually. + # Note: On vcan, local loopback is handled by the driver. + # cangen -x sets CAN_RAW_LOOPBACK = 0. + + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "3", "-x"], + check=True + ) + + # We expect output to be empty because candump is a local socket on the same node + # and we disabled loopback for the sender. + assert monitor.output.strip() == "", "candump received frames despite -x (disable loopback)" + +def test_poll_p(bin_path, can_interface): + """Test -p : Poll on ENOBUFS.""" + # Functional test, hard to provoke ENOBUFS on empty vcan + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-p", "10"], + check=True + ) + assert result.returncode == 0 + +def test_priority_P(bin_path, can_interface): + """Test -P : Set socket priority.""" + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-P", "5"], + check=True + ) + assert result.returncode == 0 + +def test_ignore_enobufs_i(bin_path, can_interface): + """Test -i: Ignore ENOBUFS.""" + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-i"], + check=True + ) + assert result.returncode == 0 + +def test_burst_c(bin_path, can_interface): + """Test -c : Burst count.""" + count = 6 + burst = 3 + # Sending 6 frames in bursts of 3. Total should still be 6. + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", str(count), "-c", str(burst)], + check=True + ) + lines = [l for l in monitor.output.splitlines() if can_interface in l] + assert len(lines) == count + +def test_verbose_v(bin_path, can_interface): + """Test -v: Verbose output to stdout.""" + # -v prints ascii art/dots, -v -v prints details + result = subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "1", "-v", "-v"], + capture_output=True, + text=True + ) + # Output should contain the Interface and ID + assert can_interface in result.stdout + # The output format is like: " vcan0 737 [3] EF 32 23" + # It does not contain labels like "ID:", "data:", "DLC:". + # Check for the presence of brackets which indicate DLC in this format. + assert "[" in result.stdout and "]" in result.stdout + +def test_random_id_flags(bin_path, can_interface): + """Test ID generation flags: r (random), e (even), o (odd).""" + # Test Even + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "5", "-I", "e"], + check=True + ) + lines = monitor.output.strip().splitlines() + for l in lines: + pid = int(parse_candump_line(l)['id'], 16) + assert pid % 2 == 0, f"ID {pid} is not even" + + # Test Odd + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "cangen"), can_interface, "-n", "5", "-I", "o"], + check=True + ) + lines = monitor.output.strip().splitlines() + for l in lines: + pid = int(parse_candump_line(l)['id'], 16) + assert pid % 2 != 0, f"ID {pid} is not odd" diff --git a/tests/test_canlogserver.py b/tests/test_canlogserver.py new file mode 100644 index 0000000..dd7c3a0 --- /dev/null +++ b/tests/test_canlogserver.py @@ -0,0 +1,286 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import socket +import select + +# --- Helper Functions --- + +def get_free_port(): + """Finds a free TCP port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + # Bind to port 0 lets the OS choose an available port + s.bind(('localhost', 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + # Get the port chosen by the OS + return s.getsockname()[1] + +class CanLogServerMonitor: + """ + Context manager to run canlogserver. + """ + def __init__(self, bin_path, interface, args=None, port=None): + self.cmd = [os.path.join(bin_path, "canlogserver")] + + # Determine port: use provided one or find a free one + if port is None: + self.port = get_free_port() + else: + self.port = port + + # Explicitly add port argument + self.cmd.extend(["-p", str(self.port)]) + + if args: + self.cmd.extend(args) + + self.cmd.append(interface) + + self.process = None + self.client_socket = None + + def __enter__(self): + # Redirect stderr to DEVNULL to prevent buffer filling deadlocks + # If the server writes too much to stderr and we don't read it, it hangs. + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.5) # Wait for server startup + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.client_socket: + try: + self.client_socket.close() + except OSError: + pass + + if self.process: + # Check if process is still running before attempting to kill + if self.process.poll() is None: + try: + # Send SIGTERM to the process group + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + # Wait with timeout to avoid infinite hang + self.process.wait(timeout=2) + except (ProcessLookupError, subprocess.TimeoutExpired): + # Force kill if it doesn't exit nicely + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGKILL) + self.process.wait(timeout=1) + except (ProcessLookupError, subprocess.TimeoutExpired): + pass + + def connect(self): + """Connects to the canlogserver via TCP.""" + retries = 10 + for i in range(retries): + try: + self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.client_socket.connect(('localhost', self.port)) + return True + except ConnectionRefusedError: + time.sleep(0.2) + return False + + def read_data(self, timeout=2.0): + """Reads data from the TCP socket until data arrives or timeout.""" + if not self.client_socket: + return "" + + start_time = time.time() + received_data = "" + + while time.time() - start_time < timeout: + try: + ready = select.select([self.client_socket], [], [], 0.1) + if ready[0]: + chunk = self.client_socket.recv(4096).decode('utf-8', errors='ignore') + if chunk: + received_data += chunk + if len(chunk) > 0: + time.sleep(0.1) + ready_more = select.select([self.client_socket], [], [], 0) + if ready_more[0]: + received_data += self.client_socket.recv(4096).decode('utf-8', errors='ignore') + break + else: + # Connection closed + break + except OSError: + break + + return received_data + +def run_cansend(bin_path, interface, frame): + """Executes cansend.""" + subprocess.run([os.path.join(bin_path, "cansend"), interface, frame], check=True) + +# --- Tests for canlogserver --- + +def test_help_option(bin_path): + """ + Tests the help option '-h'. + Manual reproduction: $ ./canlogserver -h + """ + result = subprocess.run([os.path.join(bin_path, "canlogserver"), "-h"], capture_output=True, text=True) + assert "Usage: canlogserver" in result.stderr or "Usage: canlogserver" in result.stdout + +def test_basic_logging(bin_path, can_interface): + """ + Tests basic logging functionality. + Manual reproduction: + $ ./canlogserver vcan0 & + $ nc localhost 28700 + $ ./cansend vcan0 123#11 + """ + with CanLogServerMonitor(bin_path, can_interface) as server: + assert server.connect(), f"Could not connect to canlogserver on port {server.port}" + run_cansend(bin_path, can_interface, "123#112233") + data = server.read_data(timeout=2.0) + assert "123" in data + assert "112233" in data + +def test_custom_port(bin_path, can_interface): + """ + Tests the custom port option '-p'. + Manual reproduction: ./canlogserver -p 28701 vcan0 + """ + custom_port = get_free_port() + with CanLogServerMonitor(bin_path, can_interface, port=custom_port) as server: + assert server.connect(), f"Could not connect to canlogserver on port {custom_port}" + run_cansend(bin_path, can_interface, "456#AA") + data = server.read_data(timeout=2.0) + assert "456" in data + +def test_id_filter_mask_value(bin_path, can_interface): + """ + Tests ID filtering with mask (-m) and value (-v). + Manual reproduction: ./canlogserver -m 0x7FF -v 0x123 vcan0 + """ + args = ["-m", "0x7FF", "-v", "0x123"] + with CanLogServerMonitor(bin_path, can_interface, args=args) as server: + assert server.connect() + run_cansend(bin_path, can_interface, "123#11") + data = server.read_data(timeout=2.0) + assert "123" in data + + run_cansend(bin_path, can_interface, "456#22") + data = server.read_data(timeout=1.0) + assert "456" not in data + +def test_id_filter_invert(bin_path, can_interface): + """ + Tests the inverted filter option '-i'. + Manual reproduction: ./canlogserver -m 0x7FF -v 0x123 -i 1 vcan0 + """ + args = ["-m", "0x7FF", "-v", "0x123", "-i", "1"] + with CanLogServerMonitor(bin_path, can_interface, args=args) as server: + assert server.connect() + run_cansend(bin_path, can_interface, "123#11") + data = server.read_data(timeout=1.0) + assert "123" not in data + + run_cansend(bin_path, can_interface, "456#22") + data = server.read_data(timeout=2.0) + assert "456" in data + +def test_multiple_interfaces(bin_path, can_interface): + """ + Tests passing multiple interfaces. + Manual reproduction: ./canlogserver vcan0 vcan0 + """ + with CanLogServerMonitor(bin_path, can_interface, args=[can_interface]) as server: + assert server.connect(), "Could not connect to canlogserver with multiple interfaces" + +def test_error_frame_mask(bin_path, can_interface): + """ + Tests the error frame mask option '-e'. + Manual reproduction: ./canlogserver -e 0xFFFFFFFF vcan0 + """ + args = ["-e", "0xFFFFFFFF"] + with CanLogServerMonitor(bin_path, can_interface, args=args) as server: + assert server.connect() + run_cansend(bin_path, can_interface, "123#11") + data = server.read_data(timeout=2.0) + assert "123" in data + +def test_signal_sigint_shutdown(bin_path, can_interface): + """ + Tests the server shutdown on SIGINT (Ctrl-C) during 'accept()'. + + Verifies that the server terminates correctly when receiving SIGINT while + waiting for a client connection. + + Manual reproduction: + 1. Start server: $ ./canlogserver vcan0 + 2. Press Ctrl-C (send SIGINT). + 3. Check exit code (should be 130). + """ + with CanLogServerMonitor(bin_path, can_interface) as server: + assert server.process.poll() is None + try: + os.killpg(os.getpgid(server.process.pid), signal.SIGINT) + except ProcessLookupError: + pytest.fail("Server process not found for SIGINT") + + try: + server.process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + pytest.fail("Server did not exit on SIGINT (Infinite Accept Loop Bug)") + + assert server.process.returncode == 130 + +def test_bind_retry_shutdown(bin_path, can_interface): + """ + Tests shutdown behavior when the server is stuck in the bind retry loop. + + This reproduces the bug where the server would print dots endlessly + and ignore SIGINT if the port was already in use. + + Manual reproduction: + 1. Open a listening socket on port 28700: $ nc -l 28700 & + 2. Start server: $ ./canlogserver -p 28700 vcan0 + -> Server prints dots ............ + 3. Press Ctrl-C. + -> Server should exit immediately, not hang. + """ + # 1. Occupy a port locally + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('localhost', 0)) + s.listen(1) + busy_port = s.getsockname()[1] + + # 2. Start canlogserver trying to bind to the same port + with CanLogServerMonitor(bin_path, can_interface, port=busy_port) as server: + # Allow time for server to start and enter the retry loop + time.sleep(1.0) + + # 3. Send SIGINT + # The server should be stuck in the bind loop printing dots. + try: + os.killpg(os.getpgid(server.process.pid), signal.SIGINT) + except ProcessLookupError: + pytest.fail("Server process died unexpectedly") + + # 4. Wait for graceful exit + try: + server.process.wait(timeout=2.0) + except subprocess.TimeoutExpired: + pytest.fail("Server hung in bind retry loop (Infinite Bind Loop Bug)") + + # Check expected exit code (128 + 2 = 130) + assert server.process.returncode == 130 diff --git a/tests/test_canplayer.py b/tests/test_canplayer.py new file mode 100644 index 0000000..082508e --- /dev/null +++ b/tests/test_canplayer.py @@ -0,0 +1,245 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re + +# --- Helper Functions --- + +class TrafficMonitor: + """ + Context manager to run candump in the background. + Captures the bus traffic to verify what canplayer sends. + """ + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "candump"), "-L", interface] + if args: + self.cmd.extend(args) + self.process = None + self.output = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + # Give candump a moment to bind to the socket + time.sleep(0.2) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.output, _ = self.process.communicate() + +def create_logfile(path, content): + """Creates a temporary CAN log file.""" + with open(path, 'w') as f: + f.write(content) + return path + +# --- Tests for canplayer --- + +def test_help_option(bin_path): + """Test -h option: Should print usage.""" + # canplayer might exit with 0 or 1, check output primarily + result = subprocess.run([os.path.join(bin_path, "canplayer"), "-h"], capture_output=True, text=True) + assert "Usage: canplayer" in result.stdout or "Usage: canplayer" in result.stderr + +def test_replay_file(bin_path, can_interface, tmp_path): + """Test -I: Replay a simple log file.""" + log_content = f"(1600000000.000000) {can_interface} 123#112233\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile], + check=True + ) + + assert "123#112233" in monitor.output + +def test_stdin_input(bin_path, can_interface): + """Test input via stdin (default behavior without -I).""" + log_content = f"(1600000000.000000) {can_interface} 123#AABBCC\n" + + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer")], + input=log_content, + text=True, + check=True + ) + + assert "123#AABBCC" in monitor.output + +def test_interface_mapping(bin_path, can_interface, tmp_path): + """Test interface assignment (e.g., vcan0=can99).""" + # Log file contains 'can99', but we map it to our real 'can_interface' + fake_iface = "can99" + log_content = f"(1600000000.000000) {fake_iface} 123#DEADBEEF\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + # Assignment syntax: dest=src (send frames received from src on dest) + # We want to send ON can_interface frames that came FROM fake_iface + mapping = f"{can_interface}={fake_iface}" + + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, mapping], + check=True + ) + + # Monitor should see it on can_interface + assert "123#DEADBEEF" in monitor.output + +def test_loop_l(bin_path, can_interface, tmp_path): + """Test -l : Loop playback.""" + log_content = f"(1600000000.000000) {can_interface} 123#01\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + count = 3 + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-l", str(count)], + check=True + ) + + # Should appear 3 times + occurrences = monitor.output.count("123#01") + assert occurrences == count + +def test_ignore_timestamps_t(bin_path, can_interface, tmp_path): + """Test -t: Ignore timestamps (send immediately).""" + # Create log with 2 seconds delay between frames + # If -t works, execution should be instant, not taking >2 seconds + log_content = ( + f"(100.000000) {can_interface} 123#01\n" + f"(102.000000) {can_interface} 123#02\n" + ) + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + start = time.time() + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-t"], + check=True + ) + duration = time.time() - start + + # Should be very fast, definitely under 1 second + assert duration < 1.0 + +def test_skip_gaps_s(bin_path, can_interface, tmp_path): + """Test -s : Skip gaps in timestamps > 's' seconds.""" + # Gap of 2 seconds in log file + log_content = ( + f"(100.000000) {can_interface} 123#01\n" + f"(102.000000) {can_interface} 123#02\n" + ) + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + # Tell canplayer to skip gaps > 1s (-s 1) + start = time.time() + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-s", "1"], + check=True + ) + duration = time.time() - start + + # Should skip the 2s wait + assert duration < 1.5 + +def test_terminate_n(bin_path, can_interface, tmp_path): + """Test -n : Terminate after sending count frames.""" + # Timestamps fixed to 6 decimal places (microseconds) + log_content = ( + f"(100.000000) {can_interface} 123#01\n" + f"(100.001000) {can_interface} 123#02\n" + f"(100.002000) {can_interface} 123#03\n" + ) + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + # Process only 2 frames + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-n", "2"], + check=True + ) + + assert "123#01" in monitor.output + assert "123#02" in monitor.output + assert "123#03" not in monitor.output + +def test_verbose_v(bin_path, can_interface, tmp_path): + """Test -v: Verbose output.""" + log_content = f"(1600000000.000000) {can_interface} 123#11\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + result = subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-v"], + capture_output=True, + text=True, + check=True + ) + + # Verbose mode usually prints the frame being sent to stdout + assert "123" in result.stdout and "11" in result.stdout + +def test_disable_loopback_x(bin_path, can_interface, tmp_path): + """Test -x: Disable local loopback.""" + log_content = f"(1600000000.000000) {can_interface} 123#FF\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + # If loopback is disabled, a local candump should NOT see the frame + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-x"], + check=True + ) + + # Expect empty output (or at least the frame shouldn't be there) + assert "123#FF" not in monitor.output + +def test_gap_g(bin_path, can_interface, tmp_path): + """Test -g : Gap generation (functional check).""" + # -g adds a fixed gap. Difficult to measure precisely without affecting test stability, + # but we can check it runs successfully. + # Timestamps fixed to 6 decimal places + log_content = f"(100.000000) {can_interface} 123#11\n(100.000000) {can_interface} 123#22\n" + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile, "-g", "10"], + check=True + ) + +def test_parsing_bad_lines(bin_path, can_interface, tmp_path): + """Test that lines not starting with '(' are ignored.""" + # Timestamps fixed to 6 decimal places + log_content = ( + "This is a comment line\n" + f"(100.000000) {can_interface} 123#AA\n" + "Another invalid line\n" + ) + logfile = create_logfile(os.path.join(tmp_path, "test.log"), log_content) + + with TrafficMonitor(bin_path, can_interface) as monitor: + subprocess.run( + [os.path.join(bin_path, "canplayer"), "-I", logfile], + check=True + ) + + # Should only process the valid frame + assert "123#AA" in monitor.output + # Ensure it didn't crash or error on invalid lines (return code check is implicit in check=True) diff --git a/tests/test_cansend.py b/tests/test_cansend.py new file mode 100644 index 0000000..bf1153b --- /dev/null +++ b/tests/test_cansend.py @@ -0,0 +1,205 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re + +# --- Helper Functions --- + +class TrafficMonitor: + """ + Context manager to run candump in the background. + Captures the bus traffic to verify what cansend sends. + """ + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "candump"), "-L", interface] + if args: + self.cmd.extend(args) + self.process = None + self.output = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + # Give candump a moment to bind to the socket + time.sleep(0.2) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.output, _ = self.process.communicate() + +def run_cansend(bin_path, interface, frame): + """Executes cansend with the given frame string.""" + cmd = [os.path.join(bin_path, "cansend"), interface, frame] + subprocess.run(cmd, check=True) + +# --- Tests for cansend --- + +def test_help_option(bin_path): + """Test -h option: Should print usage.""" + # cansend might exit with 1 on help (since it expects args), check output + result = subprocess.run([os.path.join(bin_path, "cansend"), "-h"], capture_output=True, text=True) + # The output provided by user shows usage on stdout or stderr + assert "Usage: " in result.stdout or "Usage: " in result.stderr + assert "cansend" in result.stdout or "cansend" in result.stderr + +def test_classic_can_simple(bin_path, can_interface): + """Test standard frame: #{data}""" + # 123#DEADBEEF + frame = "123#DEADBEEF" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + assert "123#DEADBEEF" in monitor.output + +def test_classic_can_dots(bin_path, can_interface): + """Test data with dots: 5A1#11.2233.44556677.88""" + frame_input = "5A1#11.2233.44556677.88" + # candump -L output will be without dots: 5A1#1122334455667788 + expected = "5A1#1122334455667788" + + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame_input) + + assert expected in monitor.output + +def test_classic_can_no_data(bin_path, can_interface): + """Test frame with no data: 5AA#""" + frame = "5AA#" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # Expect ID with empty data. candump -L format: 5AA# (or 5AA# with nothing following) + # Regex ensures 5AA# is at end of line or followed by space + assert re.search(r'5AA#(\s|$)', monitor.output) + +def test_classic_can_extended_id(bin_path, can_interface): + """Test Extended Frame Format (EFF): 1F334455#11...""" + frame = "1F334455#112233" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + assert "1F334455#112233" in monitor.output + +def test_classic_can_rtr(bin_path, can_interface): + """Test RTR frame: #R""" + frame = "123#R" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # candump -L typically represents RTR via 'R' in output or specific formatting + # e.g., 123#R + assert "123#R" in monitor.output + +def test_classic_can_rtr_len(bin_path, can_interface): + """Test RTR with length: #R{len}""" + # 00000123#R3 -> ID 123 (EFF padded?), len 3 + # Note: 00000123 is 8 chars -> EFF. + frame = "00000123#R3" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # Check for EFF ID and RTR marker. + # candump -L output for RTR might differ slightly depending on version, + # but usually preserves the #R syntax if length matches. + assert "00000123#R" in monitor.output + +def test_classic_can_explicit_dlc(bin_path, can_interface): + """Test explicit DLC: #{data}_{dlc}""" + # 1F334455#1122334455667788_B (DLC 11) + # This sets the DLC field > 8, but payload is 8 bytes. + # Classic CAN allows DLC > 8 (interpreted as 8 usually, but passed on wire). + frame = "1F334455#1122334455667788_B" + + # We need candump to show the DLC to verify this, or just check it sends successfully. + # Standard candump -L doesn't always show DLC > 8 unless -8 is used. + # But here we verify 'cansend' doesn't crash and sends *something*. + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + assert "1F334455#1122334455667788" in monitor.output + +def test_classic_can_rtr_len_dlc(bin_path, can_interface): + """Test RTR with len and DLC: #R{len}_{dlc}""" + # 333#R8_E (Len 8, DLC 14) + frame = "333#R8_E" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + assert "333#R" in monitor.output + +def test_can_fd_flags_only(bin_path, can_interface): + """Test CAN FD flags only: ##""" + # 123##1 (Flags: 1 = BRS) + frame = "123##1" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # candump -L for FD uses ## + # Output should contain 123##1 + # Note: Kernel/Driver often adds CANFD_FDF (0x04) flag automatically, + # so we might see 5 (1|4) instead of 1. + assert "123##1" in monitor.output or "123##5" in monitor.output + +def test_can_fd_flags_data(bin_path, can_interface): + """Test CAN FD with flags and data: ##{data}""" + # 213##311223344 (Flags 3, Data 11223344) + # Flags 3 = BRS (1) | ESI (2) + frame = "213##311223344" + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # Note: Kernel/Driver often adds CANFD_FDF (0x04) flag automatically, + # so we might see 7 (3|4) instead of 3. + assert "213##311223344" in monitor.output or "213##711223344" in monitor.output + +def test_can_xl_frame(bin_path, can_interface): + """Test CAN XL frame (if supported).""" + # Example: 45123#81:00:12345678#11223344.556677 + # Prio: 123, Flags: 81, SDT: 00, AF: 12345678, Data... + # Note: VCID is separate? The example in help says: + # #... -> 45123 means VCID 45, Prio 123? + frame = "45123#81:00:12345678#11223344556677" + + # This might fail on kernels without CAN XL support. + # We wrap in try/except or check return code to skip. + try: + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansend(bin_path, can_interface, frame) + + # If it worked, check output. XL output format in candump might vary. + # But we expect the hex data to appear. + assert "11223344556677" in monitor.output + except subprocess.CalledProcessError: + pytest.skip("CAN XL frame sending failed (kernel support missing?)") + +def test_invalid_syntax(bin_path, can_interface): + """Test invalid syntax handling.""" + # Missing separator + frame = "123DEADBEEF" + cmd = [os.path.join(bin_path, "cansend"), can_interface, frame] + + # Should exit with error code + result = subprocess.run(cmd, capture_output=True, text=True) + assert result.returncode != 0 + +def test_missing_args(bin_path): + """Test missing arguments.""" + cmd = [os.path.join(bin_path, "cansend")] + result = subprocess.run(cmd, capture_output=True, text=True) + assert result.returncode != 0 diff --git a/tests/test_cansequence.py b/tests/test_cansequence.py new file mode 100644 index 0000000..ce0d184 --- /dev/null +++ b/tests/test_cansequence.py @@ -0,0 +1,192 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import re + +# --- Helper Functions --- + +class TrafficMonitor: + """ + Context manager to run candump in the background. + Captures the bus traffic to verify what cansequence sends. + """ + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "candump"), "-L", interface] + if args: + self.cmd.extend(args) + self.process = None + self.output = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.2) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.output, _ = self.process.communicate() + +def run_cansequence_sender(bin_path, interface, count=None, args=None): + """Runs cansequence in sender mode.""" + cmd = [os.path.join(bin_path, "cansequence"), interface] + if count is not None: + cmd.append(f"--loop={count}") + if args: + cmd.extend(args) + + subprocess.run(cmd, check=True) + +class CanSequenceReceiver: + """Context manager for cansequence receiver.""" + def __init__(self, bin_path, interface, args=None): + self.cmd = [os.path.join(bin_path, "cansequence"), interface, "--receive"] + if args: + self.cmd.extend(args) + self.process = None + self.stdout = "" + self.stderr = "" + + def __enter__(self): + self.process = subprocess.Popen( + self.cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + preexec_fn=os.setsid + ) + time.sleep(0.2) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + if self.process.poll() is None: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + self.stdout, self.stderr = self.process.communicate() + +# --- Tests for cansequence --- + +def test_help_option(bin_path): + """Test -h option.""" + result = subprocess.run([os.path.join(bin_path, "cansequence"), "-h"], capture_output=True, text=True) + assert "Usage: cansequence" in result.stdout or "Usage: cansequence" in result.stderr + +def test_sender_payload_increment(bin_path, can_interface): + """ + Test 1: Sender + Candump. + Verify that cansequence sends frames with incrementing payload. + """ + count = 5 + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansequence_sender(bin_path, can_interface, count=count) + + # Parse output to find payloads + # candump format: (timestamp) iface ID#DATA + # cansequence default ID is 2. Payload is sequence number (hex). + # e.g. 00, 01, 02... + + lines = monitor.output.strip().splitlines() + assert len(lines) == count + + # Extract data bytes. Assuming standard classic CAN frame. + # Regex to capture the first byte of data: ID#... + data_bytes = [] + for line in lines: + match = re.search(r'#([0-9A-Fa-f]{2})', line) + if match: + data_bytes.append(int(match.group(1), 16)) + + # Verify increment + for i in range(len(data_bytes)): + assert data_bytes[i] == i % 256, f"Payload mismatch at index {i}" + +def test_sender_receiver_success(bin_path, can_interface): + """ + Test 2: Sender + Receiver. + Verify that receiver accepts the stream from sender without error. + """ + # Start receiver + with CanSequenceReceiver(bin_path, can_interface, args=["-v"]) as receiver: + # Run sender + run_cansequence_sender(bin_path, can_interface, count=10) + + # Give receiver a moment to process + time.sleep(0.5) + + # Receiver should NOT have exited with error (if we didn't use -q) + # and shouldn't have printed "sequence mismatch" errors. + # Note: cansequence prints to stderr usually on error. + + assert "sequence number mismatch" not in receiver.stderr + assert "sequence number mismatch" not in receiver.stdout + +def test_receiver_detects_error(bin_path, can_interface): + """ + Test 3: Cansend (Injection) + Receiver. + Verify receiver detects missing sequence number. + """ + # Use -q 1 to quit immediately on first error + with CanSequenceReceiver(bin_path, can_interface, args=["--quit", "1"]) as receiver: + + # Manually send sequence 0 (valid) + # Sequence number is usually little endian integer in payload. + # ID 2 is default for cansequence. + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "002#00"], check=True) + time.sleep(0.1) + + # Manually send sequence 2 (skipping 1) -> Invalid! + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "002#02"], check=True) + time.sleep(0.5) + + # Check if receiver exited + assert receiver.process.poll() is not None, "Receiver did not exit on sequence error" + assert receiver.process.returncode != 0 + +def test_extended_id(bin_path, can_interface): + """Test sending extended frames (-e).""" + with TrafficMonitor(bin_path, can_interface) as monitor: + # Send 1 frame, extended mode + run_cansequence_sender(bin_path, can_interface, count=1, args=["-e"]) + + # Output should contain 8-character ID (padded) or match extended syntax. + # Default ID is 2, so extended is usually 00000002. + assert "00000002#" in monitor.output or "2#" in monitor.output + # Depending on candump formatting, we might verify extended flag logic if needed, + # but existence of traffic is the main check here. + +def test_custom_identifier(bin_path, can_interface): + """Test custom CAN ID (-i).""" + custom_id = "0x123" # Hex string ensures strtoul interprets as hex + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansequence_sender(bin_path, can_interface, count=1, args=["-i", custom_id]) + + # Check for ID 123 + assert "123#" in monitor.output + +def test_can_fd_mode(bin_path, can_interface): + """Test CAN-FD mode (-f).""" + # Requires hardware/vcan support for FD + try: + with TrafficMonitor(bin_path, can_interface) as monitor: + run_cansequence_sender(bin_path, can_interface, count=1, args=["-f"]) + + # FD frames usually appear with ## in candump -L (if not strict classic view) + # or we verify the output exists. + assert "##" in monitor.output or "#" in monitor.output + except subprocess.CalledProcessError: + pytest.skip("CAN-FD not supported or failed") diff --git a/tests/test_cansniffer.py b/tests/test_cansniffer.py new file mode 100644 index 0000000..586cdb0 --- /dev/null +++ b/tests/test_cansniffer.py @@ -0,0 +1,252 @@ +# SPDX-License-Identifier: GPL-2.0-only +# +# Copyright (c) 2023 Linux CAN project +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License for more details. + +import pytest +import subprocess +import os +import time +import signal +import select +import shutil +import re + +# --- Helper Functions --- + +class CanSnifferMonitor: + """ + Context manager to run cansniffer. + Captures stdout/stderr non-blocking way. + """ + def __init__(self, bin_path, interface, args=None): + # CRITICAL: Use stdbuf -o0 to force unbuffered stdout. + # cansniffer uses printf, which is fully buffered when writing to a pipe. + # Without stdbuf, we would detect empty output until 4KB of data accumulates. + if not shutil.which("stdbuf"): + pytest.fail("stdbuf utility (coreutils) is required for testing cansniffer TUI") + + self.cmd = ["stdbuf", "-o0", os.path.join(bin_path, "cansniffer"), interface] + if args: + self.cmd.extend(args) + self.process = None + self.output_buffer = "" + + def __enter__(self): + # We need bufsize=0 or unbuffered to get output in real-time + self.process = subprocess.Popen( + self.cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=0, + preexec_fn=os.setsid + ) + time.sleep(0.5) # Wait for initialization (clearing screen etc) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.process: + try: + os.killpg(os.getpgid(self.process.pid), signal.SIGTERM) + except ProcessLookupError: + pass + + def send_input(self, text): + """Sends commands to cansniffer stdin.""" + if self.process and self.process.stdin: + self.process.stdin.write(text) + self.process.stdin.flush() + time.sleep(0.5) # Allow processing time (increased for stability) + + def read_output(self, timeout=1.0): + """ + Reads available output from stdout without blocking forever. + """ + collected = "" + start = time.time() + while time.time() - start < timeout: + # Check if there is data to read + reads = [self.process.stdout.fileno()] + ret = select.select(reads, [], [], 0.1) + + if ret[0]: + # Read chunks + chunk = self.process.stdout.read(1024) + if not chunk: + break + collected += chunk + else: + # If we have collected something and no new data is coming fast, break early + # to speed up tests, unless we are waiting for specific timeout + if collected and (time.time() - start > 0.3): + break + + self.output_buffer += collected + return collected + + def clear_buffer(self): + """Discards currently available output.""" + self.read_output(timeout=0.2) + +# --- Tests for cansniffer --- + +def test_help_option(bin_path): + """Test -? option (cansniffer uses -? for help).""" + # Note: The help text says '-?' prints help. + result = subprocess.run([os.path.join(bin_path, "cansniffer"), "-?"], capture_output=True, text=True) + # cansniffer usually returns error code because ? is invalid opt for standard parsers + assert "Usage: cansniffer" in result.stdout or "Usage: cansniffer" in result.stderr + +def test_basic_sniffing(bin_path, can_interface): + """Test if cansniffer shows traffic.""" + with CanSnifferMonitor(bin_path, can_interface) as sniffer: + # Generate traffic: ID 123 + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "123#112233"], check=True) + + # Read output + out = sniffer.read_output(timeout=2.0) + + # Check if 123 appears. + assert "123" in out + +def test_filtering_add(bin_path, can_interface): + """Test adding specific ID filter (+ID).""" + # Start with -q (quiet, all IDs deactivated) + with CanSnifferMonitor(bin_path, can_interface, args=["-q"]) as sniffer: + # 1. Send ID 123 -> Should NOT appear (quiet mode) + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "123#AA"], check=True) + out = sniffer.read_output(timeout=1.0) + assert "123" not in out + + # 2. Add filter +123 via stdin + sniffer.send_input("+123\n") + + # 3. Send ID 123 again -> Should appear now + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "123#BB"], check=True) + out = sniffer.read_output(timeout=2.0) + assert "123" in out + +def test_filtering_remove(bin_path, can_interface): + """Test removing specific ID filter (-ID).""" + # Start normal mode (sniffs all) + with CanSnifferMonitor(bin_path, can_interface) as sniffer: + # 1. Send ID 456 -> Should appear + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "456#11"], check=True) + out = sniffer.read_output(timeout=1.0) + assert "456" in out + + # 2. Remove 456 via stdin + sniffer.send_input("-456\n") + + # Clear buffer to ignore previous output about 456 + sniffer.clear_buffer() + + # Send a different ID to force a screen refresh / activity on a visible ID + # This ensures cansniffer produces NEW output + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "789#AA"], check=True) + time.sleep(0.2) + + # 3. Send ID 456 again with NEW unique data (CC) + # Using CC is safer than 22 as numbers might appear in timestamps + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "456#CC"], check=True) + + # Read subsequent output + out = sniffer.read_output(timeout=1.5) + + # Verify 789 is there (sanity check that we captured output) + assert "789" in out + + # Verify 456 (and specifically its new data CC) is NOT in the new block. + assert "CC" not in out + +def test_binary_mode_b(bin_path, can_interface): + """Test binary mode output (-b).""" + with CanSnifferMonitor(bin_path, can_interface, args=["-b"]) as sniffer: + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "123#DEADBEEF"], check=True) + out = sniffer.read_output(timeout=1.0) + # In binary mode (-b), data bytes are often visualized as bits/dots. + # But the ID "123" should still be visible. + assert "123" in out + +def test_color_mode_c(bin_path, can_interface): + """Test color mode (-c).""" + # Color output depends on terminal capabilities usually, but cansniffer -c forces it. + with CanSnifferMonitor(bin_path, can_interface, args=["-c"]) as sniffer: + subprocess.run([os.path.join(bin_path, "cansend"), can_interface, "123#11"], check=True) + out = sniffer.read_output(timeout=1.0) + # Color mode uses ANSI escape sequences (e.g. \033[...m) + # Note: In Python string literals, \x1b is ESC. + assert "\x1b[" in out or "\033[" in out + +def test_clear_screen_space(bin_path, can_interface): + """Test clearing screen ().""" + with CanSnifferMonitor(bin_path, can_interface) as sniffer: + # Give it a moment to startup + time.sleep(0.5) + + # Clear screen command + sniffer.send_input("\n") # Just enter sometimes works, or Space+Enter + sniffer.send_input(" \n") + + # If it clears, it emits ANSI clear sequence "\033[2J" or "\033[H" (Home) + out = sniffer.read_output(timeout=1.0) + + # Check for Common ANSI escape sequences for clearing/home + # \033[2J = Clear Screen, \033[H = Cursor Home + assert "\x1b[2J" in out or "\x1b[H" in out or "\x1b[" in out + +def test_timeout_t(bin_path, can_interface): + """Test timeout for ID display (-t).""" + # -t