libicsneo/examples/python/t1s/t1s_symbol_decoding.py

283 lines
9.0 KiB
Python

"""
10BASE-T1S Symbol Decoding Example
Demonstrates T1S bus symbol decoding and analysis
"""
import icsneopy
import time
from enum import IntEnum
class T1SSymbol(IntEnum):
"""10BASE-T1S Symbol Types"""
SSD = 0x04
ESDOK = 0x07
BEACON = 0x08
ESD = 0x0D
ESDERR = 0x11
SYNC = 0x18
ESDJAB = 0x19
SILENCE = 0x1F
@classmethod
def get_name(cls, value):
"""Get human-readable name for symbol value."""
try:
return cls(value).name
except ValueError:
if 0x00 <= value <= 0x0F:
return f"DATA(0x{value:X})"
return f"UNKNOWN(0x{value:02X})"
def get_user_confirmation(prompt):
"""Get yes/no confirmation from user."""
response = input(f"{prompt} (y/n): ").strip().lower()
return response == 'y'
def configure_t1s_decoding(device, network, enable_symbols, enable_beacons):
"""Configure T1S bus decoding settings."""
settings = device.get_settings()
if not settings:
raise RuntimeError("Failed to get device settings")
print(f"\nConfiguring T1S decoding on network {network}...")
if not settings.set_t1s_bus_decoding_all(network, enable_symbols):
raise RuntimeError("Failed to set T1S symbol decoding")
if enable_symbols:
print(" ✓ Enabled decoding of all T1S symbols")
else:
print(" • T1S symbol decoding disabled")
if not settings.set_t1s_bus_decoding_beacons(network, enable_beacons):
raise RuntimeError("Failed to set T1S beacon decoding")
if enable_beacons:
print(" ✓ Enabled T1S beacon decoding")
else:
print(" • T1S beacon decoding disabled")
if not device.set_settings(settings):
raise RuntimeError("Failed to apply settings to device")
print(" ✓ Settings applied successfully")
def setup_symbol_monitoring(device, network):
"""Setup callback to monitor and decode T1S symbols."""
state = {
'symbol_count': 0,
'beacon_count': 0,
'wake_count': 0,
'burst_count': 0,
'symbol_stats': {},
'data_frame_count': 0
}
def symbol_handler(msg):
"""Handle incoming T1S messages."""
if not isinstance(msg, icsneopy.EthernetMessage):
return
if not msg.isT1S:
return
timestamp_ms = msg.timestamp / 1000000.0
if msg.isT1SSymbol:
num_symbols = len(msg.data)
print(f"[{timestamp_ms:12.3f} ms] T1S Symbols", end="")
if num_symbols > 0:
print(f" ({num_symbols} symbol{'s' if num_symbols > 1 else ''})", end="")
print(f" | Node ID: {msg.t1sNodeId}")
for i, symbol_value in enumerate(msg.data):
symbol_name = T1SSymbol.get_name(symbol_value)
state['symbol_count'] += 1
if symbol_name not in state['symbol_stats']:
state['symbol_stats'][symbol_name] = 0
state['symbol_stats'][symbol_name] += 1
if symbol_value == T1SSymbol.BEACON:
state['beacon_count'] += 1
print(f" [{i}] {symbol_name:10s} = 0x{symbol_value:02X}")
if num_symbols == 0 and msg.t1sSymbolType != 0:
symbol_value = msg.t1sSymbolType
symbol_name = T1SSymbol.get_name(symbol_value)
state['symbol_count'] += 1
if symbol_name not in state['symbol_stats']:
state['symbol_stats'][symbol_name] = 0
state['symbol_stats'][symbol_name] += 1
if symbol_value == T1SSymbol.BEACON:
state['beacon_count'] += 1
print(f" {symbol_name:10s} = 0x{symbol_value:02X} (from t1sSymbolType field)")
elif msg.isT1SBurst:
state['burst_count'] += 1
print(f"[{timestamp_ms:12.3f} ms] BURST | "
f"Node ID: {msg.t1sNodeId} | "
f"Burst Count: {msg.t1sBurstCount}")
elif msg.isT1SWake:
state['wake_count'] += 1
print(f"[{timestamp_ms:12.3f} ms] WAKE signal detected | "
f"Node ID: {msg.t1sNodeId}")
else:
state['data_frame_count'] += 1
print(f"[{timestamp_ms:12.3f} ms] T1S Data Frame | "
f"Length: {len(msg.data)} bytes | "
f"Node ID: {msg.t1sNodeId}")
if msg.data and len(msg.data) > 0:
preview = ' '.join([f"{b:02X}" for b in msg.data[:16]])
if len(msg.data) > 16:
preview += " ..."
print(f" Data: {preview}")
frame_filter = icsneopy.MessageFilter(network)
callback = icsneopy.MessageCallback(symbol_handler, frame_filter)
device.add_message_callback(callback)
return state
def print_statistics(state):
"""Print monitoring statistics."""
print("\n" + "=" * 70)
print("T1S SYMBOL DECODING STATISTICS")
print("=" * 70)
print(f"Total Symbols: {state['symbol_count']}")
print(f"Total Beacons: {state['beacon_count']}")
print(f"Total Wake Signals: {state['wake_count']}")
print(f"Total Bursts: {state['burst_count']}")
print(f"Total Data Frames: {state['data_frame_count']}")
if state['symbol_stats']:
print("\n" + "-" * 70)
print("Symbol Type Breakdown:")
print("-" * 70)
for symbol_name, count in sorted(state['symbol_stats'].items(),
key=lambda x: x[1], reverse=True):
print(f" {symbol_name:20s}{count:>10d}")
print("=" * 70)
def main():
"""Main T1S symbol decoding example."""
device = None
try:
MONITOR_NETWORK = icsneopy.Network.NetID.AE_02
MONITOR_DURATION = 30
print("\n" + "=" * 70)
print("10BASE-T1S SYMBOL DECODING EXAMPLE")
print("=" * 70)
print(f"libicsneo {icsneopy.get_version()}")
print("=" * 70)
print("\nFinding devices... ", end="", flush=True)
devices = icsneopy.find_all_devices()
print(f"OK, {len(devices)} device{'s' if len(devices) != 1 else ''} found")
if not devices:
print("No devices found!")
return 1
for d in devices:
print(f" {d}")
device = None
for d in devices:
if d.get_type() == icsneopy.DeviceType.RADComet3:
device = d
break
if not device and devices:
device = devices[0]
if not device:
print("No suitable device found!")
return 1
print(f"\nSelected device: {device}")
print(f"Serial: {device.get_serial()}")
print("\n" + "-" * 70)
print("T1S DECODING CONFIGURATION")
print("-" * 70)
enable_symbols = get_user_confirmation("Enable T1S symbol decoding (all symbols)")
enable_beacons = get_user_confirmation("Enable T1S beacon decoding")
print("-" * 70)
print("\nOpening device... ", end="", flush=True)
if not device.open():
print("✗ Failed")
return 1
print("")
print("Enabling message polling... ", end="", flush=True)
if not device.enable_message_polling():
print("✗ Failed")
device.close()
return 1
device.set_polling_message_limit(100000)
print("")
configure_t1s_decoding(device, MONITOR_NETWORK, enable_symbols, enable_beacons)
print("Going online... ", end="", flush=True)
if not device.go_online():
print("✗ Failed")
device.close()
return 1
print("")
state = setup_symbol_monitoring(device, MONITOR_NETWORK)
print("\n" + "-" * 70)
print(f"Monitoring T1S traffic for {MONITOR_DURATION} seconds...")
print("-" * 70)
start_time = time.time()
while time.time() - start_time < MONITOR_DURATION:
device.get_messages()
time.sleep(0.01)
print("\n" + "-" * 70)
print("Closing device... ", end="", flush=True)
device.close()
time.sleep(0.1)
print("")
print_statistics(state)
except KeyboardInterrupt:
print("\n\nMonitoring interrupted by user")
if 'state' in locals():
print_statistics(state)
except Exception as e:
print(f"\nError: {e}")
import traceback
traceback.print_exc()
return 1
finally:
if device and device.is_open():
device.close()
return 0
if __name__ == "__main__":
exit(main())