can-utils/tests/test_isotpdump.py

219 lines
7.0 KiB
Python

# SPDX-License-Identifier: GPL-2.0-only
import subprocess
import time
import os
import pytest
import signal
import pty
import select
# Note: bin_path and interface fixtures are provided implicitly by conftest.py
def check_isotp_support(bin_path, interface):
"""
Helper to check if ISO-TP kernel module is loaded/available.
Reused logic to prevent failing tests on systems without can-isotp.
"""
isotpsend = os.path.join(bin_path, "isotpsend")
if not os.path.exists(isotpsend):
return False
try:
proc = subprocess.Popen(
[isotpsend, "-s", "123", "-d", "321", interface],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
try:
outs, errs = proc.communicate(input="00", timeout=0.5)
except subprocess.TimeoutExpired:
proc.kill()
outs, errs = proc.communicate()
if "socket: Protocol not supported" in errs or "socket: Protocol not supported" in outs:
return False
return True
except Exception:
return False
def test_isotpdump_usage(bin_path):
"""
Test usage/help output for isotpdump.
Manual Reproduction:
Run: ./isotpdump -h
Expect: Output containing "Usage: isotpdump".
"""
isotpdump = os.path.join(bin_path, "isotpdump")
if not os.path.exists(isotpdump):
pytest.skip("isotpdump binary not found")
# Running with -h usually triggers error output about invalid option 'h'
# or just prints usage.
result = subprocess.run(
[isotpdump, "-h"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
assert "Usage: isotpdump" in result.stderr or "Usage: isotpdump" in result.stdout
@pytest.mark.parametrize("payload_hex, desc", [
("11 22 33", "Single Frame"),
("00 11 22 33 44 55 66 77 88 99 AA BB CC DD EE", "Multi Frame (Segmentation)")
])
def test_isotpdump_traffic(bin_path, can_interface, payload_hex, desc):
"""
Test that isotpdump correctly captures and decodes ISO-TP traffic.
Description:
1. Start isotpdump in background using a PTY.
It uses a raw CAN socket to monitor traffic and interprets ISO-TP headers.
2. Start isotprecv (Background) to handle Flow Control (FC).
3. Run isotpsend to generate traffic.
4. Read dump output continuously.
5. Stop dump with SIGINT.
6. Verify dump output contains the payload bytes.
Manual Reproduction:
1. Terminal 1: ./isotpdump -s 123 -d 321 vcan0
2. Terminal 2: ./isotprecv -s 321 -d 123 vcan0
3. Terminal 3: echo "11 22 33" | ./isotpsend -s 123 -d 321 vcan0
"""
isotpdump = os.path.join(bin_path, "isotpdump")
isotpsend = os.path.join(bin_path, "isotpsend")
isotprecv = os.path.join(bin_path, "isotprecv")
for tool in [isotpdump, isotpsend, isotprecv]:
if not os.path.exists(tool):
pytest.skip(f"{tool} not found")
if not check_isotp_support(bin_path, can_interface):
pytest.skip("ISO-TP kernel support missing")
SRC_ID = "123"
DST_ID = "321"
print(f"\nDEBUG: Starting test '{desc}'")
# 1. Start Dump with PTY
master_fd, slave_fd = pty.openpty()
# -s SourceID, -d DestID
dump_cmd = [isotpdump, "-s", SRC_ID, "-d", DST_ID, can_interface]
print(f"DEBUG: Starting dump: {' '.join(dump_cmd)}")
dump_proc = subprocess.Popen(
dump_cmd,
stdin=slave_fd,
stdout=slave_fd,
stderr=slave_fd,
close_fds=True
)
os.close(slave_fd)
output_bytes = b""
try:
time.sleep(1.0) # Wait for startup
if dump_proc.poll() is not None:
pytest.fail(f"isotpdump process died during startup. RC={dump_proc.returncode}")
# 2. Start Receiver (isotprecv)
# Required for Multi-Frame Flow Control
recv_cmd = [isotprecv, "-s", DST_ID, "-d", SRC_ID, can_interface]
recv_proc = subprocess.Popen(
recv_cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
text=True
)
try:
time.sleep(1.0)
if recv_proc.poll() is not None:
_, err = recv_proc.communicate()
pytest.fail(f"isotprecv failed to start. RC={recv_proc.returncode}. Stderr: {err}")
# 3. Send Data (isotpsend)
send_cmd = [isotpsend, "-s", SRC_ID, "-d", DST_ID, can_interface]
print(f"DEBUG: Sending data: {' '.join(send_cmd)} with payload '{payload_hex}'")
subprocess.run(
send_cmd,
input=payload_hex,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True,
timeout=2
)
# 4. Capture dump output
start_wait = time.time()
# isotpdump output format is raw-ish but includes data bytes.
# Example: "vcan0 123 [3] 11 22 33"
expected_parts = payload_hex.split()
print("DEBUG: Waiting for data in dump output...")
while time.time() - start_wait < 4:
r, _, _ = select.select([master_fd], [], [], 0.1)
if master_fd in r:
try:
chunk = os.read(master_fd, 4096)
if chunk:
output_bytes += chunk
except OSError:
pass
current_out = output_bytes.decode('utf-8', errors='replace')
# Check if we have the data
if all(part in current_out for part in expected_parts):
print("DEBUG: Found all expected parts in output.")
break
if dump_proc.poll() is not None:
break
finally:
if recv_proc.poll() is None:
recv_proc.terminate()
recv_proc.wait()
finally:
if dump_proc.poll() is None:
print("DEBUG: Sending SIGINT to dump.")
dump_proc.send_signal(signal.SIGINT)
try:
end_time = time.time() + 1
while time.time() < end_time:
r, _, _ = select.select([master_fd], [], [], 0.1)
if master_fd in r:
try:
chunk = os.read(master_fd, 4096)
except OSError:
break
if dump_proc.poll() is not None:
break
finally:
if dump_proc.poll() is None:
dump_proc.kill()
dump_proc.wait()
os.close(master_fd)
output = output_bytes.decode('utf-8', errors='replace')
print(f"--- Dump Output ({desc}) ---\n{output}")
expected_parts = payload_hex.split()
missing_bytes = [b for b in expected_parts if b not in output]
assert not missing_bytes, f"Missing bytes in dump output: {missing_bytes}. Output:\n{output}"