Bindings: Python: Add formatDisk

master
Thomas Stoddard 2026-03-03 20:38:13 +00:00 committed by Kyle Schwarz
parent 224e840841
commit 0aa7d338fd
8 changed files with 304 additions and 0 deletions

View File

@ -41,6 +41,7 @@ pybind11_add_module(icsneopy
icsneopy/core/macseccfg.cpp
icsneopy/flexray/flexray.cpp
icsneopy/disk/diskdriver.cpp
icsneopy/disk/diskdetails.cpp
icsneopy/device/chipid.cpp
icsneopy/device/versionreport.cpp
icsneopy/device/device.cpp

View File

@ -5,6 +5,7 @@
#include "icsneo/device/device.h"
#include "icsneo/device/extensions/deviceextension.h"
#include "icsneo/disk/diskdetails.h"
#include <fstream>
@ -62,6 +63,11 @@ void init_device(pybind11::module_& m) {
.def("write_macsec_config", &Device::writeMACsecConfig, pybind11::call_guard<pybind11::gil_scoped_release>())
.def("send_eth_phy_msg", &Device::sendEthPhyMsg, pybind11::arg("message"), pybind11::arg("timeout") = std::chrono::milliseconds(50), pybind11::call_guard<pybind11::gil_scoped_release>())
.def("get_chip_versions", &Device::getChipVersions, pybind11::arg("refreshComponents") = true, pybind11::call_guard<pybind11::gil_scoped_release>())
.def("supports_disk_formatting", &Device::supportsDiskFormatting, pybind11::call_guard<pybind11::gil_scoped_release>())
.def("get_disk_count", &Device::getDiskCount, pybind11::call_guard<pybind11::gil_scoped_release>())
.def("get_disk_details", &Device::getDiskDetails, pybind11::arg("timeout") = std::chrono::milliseconds(100), pybind11::call_guard<pybind11::gil_scoped_release>())
.def("force_disk_config_update", &Device::forceDiskConfigUpdate, pybind11::arg("config"), pybind11::call_guard<pybind11::gil_scoped_release>())
.def("format_disk", [](Device& device, const DiskDetails& config) -> bool { return device.formatDisk(config); }, pybind11::arg("config"), pybind11::call_guard<pybind11::gil_scoped_release>())
.def_readonly("settings", &Device::settings);
}

View File

@ -0,0 +1,29 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include "icsneo/disk/diskdetails.h"
namespace icsneo {
void init_diskdetails(pybind11::module_& m) {
pybind11::enum_<DiskLayout>(m, "DiskLayout")
.value("Spanned", DiskLayout::Spanned)
.value("RAID0", DiskLayout::RAID0);
pybind11::classh<DiskInfo>(m, "DiskInfo")
.def(pybind11::init())
.def_readwrite("present", &DiskInfo::present)
.def_readwrite("initialized", &DiskInfo::initialized)
.def_readwrite("formatted", &DiskInfo::formatted)
.def_readwrite("sectors", &DiskInfo::sectors)
.def_readwrite("bytes_per_sector", &DiskInfo::bytesPerSector)
.def("size", &DiskInfo::size);
pybind11::classh<DiskDetails>(m, "DiskDetails")
.def(pybind11::init())
.def_readwrite("layout", &DiskDetails::layout)
.def_readwrite("full_format", &DiskDetails::fullFormat)
.def_readwrite("disks", &DiskDetails::disks);
}
} // namespace icsneo

View File

@ -25,6 +25,7 @@ void init_ethernetstatusmessage(pybind11::module_&);
void init_macsecconfig(pybind11::module_&);
void init_scriptstatusmessage(pybind11::module_&);
void init_diskdriver(pybind11::module_&);
void init_diskdetails(pybind11::module_&);
void init_deviceextension(pybind11::module_&);
void init_chipid(pybind11::module_&);
void init_versionreport(pybind11::module_&);
@ -67,6 +68,7 @@ PYBIND11_MODULE(icsneopy, m) {
init_messagefilter(m);
init_messagecallback(m);
init_diskdriver(m);
init_diskdetails(m);
init_flexray(m);
init_ethphymessage(m);
init_chipid(m);

View File

@ -15,6 +15,7 @@ option(LIBICSNEO_BUILD_CPP_FLEXRAY_EXAMPLE "Build the FlexRay example." ON)
option(LIBICSNEO_BUILD_CPP_SPI_EXAMPLE "Build the SPI example." ON)
option(LIBICSNEO_BUILD_CPP_MUTEX_EXAMPLE "Build the NetworkMutex example." ON)
option(LIBICSNEO_BUILD_CPP_ANALOG_OUT_EXAMPLE "Build the analog output example." ON)
option(LIBICSNEO_BUILD_CPP_DISKFORMAT_EXAMPLE "Build the disk format example." ON)
add_compile_options(${LIBICSNEO_COMPILER_WARNINGS})
@ -85,3 +86,7 @@ endif()
if(LIBICSNEO_BUILD_CPP_ANALOG_OUT_EXAMPLE)
add_subdirectory(cpp/analog_out)
endif()
if(LIBICSNEO_BUILD_CPP_DISKFORMAT_EXAMPLE)
add_subdirectory(cpp/diskformat)
endif()

View File

@ -0,0 +1,2 @@
add_executable(libicsneocpp-diskformat-example src/DiskFormatExample.cpp)
target_link_libraries(libicsneocpp-diskformat-example icsneocpp)

View File

@ -0,0 +1,145 @@
#include <iostream>
#include <string>
#include "icsneo/icsneocpp.h"
#include "icsneo/disk/diskdetails.h"
int main() {
std::cout << "Running libicsneo " << icsneo::GetVersion() << std::endl;
std::cout << "\nFinding devices... " << std::flush;
auto devices = icsneo::FindAllDevices();
std::cout << "OK, " << devices.size() << " device" << (devices.size() == 1 ? "" : "s") << " found" << std::endl;
if(devices.empty()) {
std::cout << "error: no devices found" << std::endl;
return -1;
}
// List devices and let the user pick one
for(size_t i = 0; i < devices.size(); i++) {
std::cout << " [" << i << "] " << devices[i]->describe() << std::endl;
}
size_t choice = 0;
if(devices.size() > 1) {
std::cout << "Select a device [0-" << (devices.size() - 1) << "]: ";
std::cin >> choice;
if(choice >= devices.size()) {
std::cout << "error: invalid selection" << std::endl;
return -1;
}
}
auto& device = devices[choice];
std::cout << "\nOpening " << device->describe() << "... " << std::flush;
if(!device->open()) {
std::cout << "FAIL" << std::endl;
std::cout << "error: " << icsneo::GetLastError() << std::endl;
return -1;
}
std::cout << "OK" << std::endl;
// Check that this device supports disk formatting
if(!device->supportsDiskFormatting()) {
std::cout << "error: " << device->describe() << " does not support disk formatting" << std::endl;
device->close();
return -1;
}
std::cout << "Disk count: " << device->getDiskCount() << std::endl;
// Query the current disk state from the device
std::cout << "\nQuerying disk details... " << std::flush;
auto details = device->getDiskDetails();
if(!details) {
std::cout << "FAIL" << std::endl;
std::cout << "error: " << icsneo::GetLastError() << std::endl;
device->close();
return -1;
}
std::cout << "OK" << std::endl;
std::cout << " Layout : " << (details->layout == icsneo::DiskLayout::RAID0 ? "RAID0" : "Spanned") << std::endl;
for(size_t i = 0; i < details->disks.size(); i++) {
const auto& disk = details->disks[i];
std::cout << " Disk [" << i << "]:" << std::endl;
std::cout << " Present : " << (disk.present ? "yes" : "no") << std::endl;
std::cout << " Initialized : " << (disk.initialized ? "yes" : "no") << std::endl;
std::cout << " Formatted : " << (disk.formatted ? "yes" : "no") << std::endl;
if(disk.present) {
uint64_t bytes = disk.size();
std::cout << " Size : " << (bytes / (1024 * 1024)) << " MB"
<< " (" << disk.sectors << " sectors x " << disk.bytesPerSector << " bytes)" << std::endl;
}
}
// Build a format configuration.
// We keep the existing layout and re-use the disk geometry reported by the device.
// The 'formatted' flag must be true for each disk you want the device to format.
icsneo::DiskDetails formatConfig;
formatConfig.layout = details->layout;
formatConfig.fullFormat = false; // Quick format; set to true for a full (slow) format
formatConfig.disks = details->disks;
// Mark all present disks for formatting
bool anyPresent = false;
for(auto& disk : formatConfig.disks) {
if(disk.present) {
disk.formatted = true;
anyPresent = true;
}
}
if(!anyPresent) {
std::cout << "\nerror: no disks are present in the device" << std::endl;
device->close();
return -1;
}
std::cout << "\nThis will format the disk(s) in " << device->describe() << "." << std::endl;
std::cout << "All existing data will be lost. Continue? [y/N]: ";
std::string confirm;
std::cin >> confirm;
if(confirm != "y" && confirm != "Y") {
std::cout << "Aborted." << std::endl;
device->close();
return 0;
}
std::cout << "\nStarting format..." << std::endl;
// Progress callback — called every 500 ms while formatting
auto progressHandler = [](uint64_t sectorsFormatted, uint64_t sectorsTotal) -> icsneo::Device::DiskFormatDirective {
double pct = sectorsTotal > 0 ? (100.0 * sectorsFormatted / sectorsTotal) : 0.0;
std::cout << "\r Progress: " << sectorsFormatted << " / " << sectorsTotal
<< " sectors (" << static_cast<int>(pct) << "%)" << std::flush;
return icsneo::Device::DiskFormatDirective::Continue;
};
bool success = device->formatDisk(formatConfig, progressHandler);
std::cout << std::endl; // newline after progress line
if(!success) {
std::cout << "error: format failed: " << icsneo::GetLastError() << std::endl;
device->close();
return -1;
}
std::cout << "Format complete!" << std::endl;
// Verify by re-querying disk details
std::cout << "\nVerifying disk state after format... " << std::flush;
auto postDetails = device->getDiskDetails();
if(!postDetails) {
std::cout << "FAIL (could not re-query disk details)" << std::endl;
} else {
std::cout << "OK" << std::endl;
for(size_t i = 0; i < postDetails->disks.size(); i++) {
const auto& disk = postDetails->disks[i];
std::cout << " Disk [" << i << "] formatted: " << (disk.formatted ? "yes" : "no") << std::endl;
}
}
device->close();
return 0;
}

View File

@ -0,0 +1,114 @@
import sys
import icsneopy
def disk_format_example():
devices = icsneopy.find_all_devices()
if not devices:
print("error: no devices found")
return False
print(f"Found {len(devices)} device(s):")
for i, d in enumerate(devices):
print(f" [{i}] {d}")
if len(devices) == 1:
choice = 0
else:
try:
choice = int(input(f"Select a device [0-{len(devices)-1}]: "))
except (ValueError, EOFError):
print("error: invalid selection")
return False
if choice < 0 or choice >= len(devices):
print("error: invalid selection")
return False
device = devices[choice]
print(f"\nOpening {device}... ", end="", flush=True)
if not device.open():
print("FAIL")
print(f"error: {icsneopy.get_last_error().describe()}")
return False
print("OK")
if not device.supports_disk_formatting():
print(f"error: {device} does not support disk formatting")
device.close()
return False
print(f"Disk count: {device.get_disk_count()}")
# Query current disk state
print("\nQuerying disk details... ", end="", flush=True)
details = device.get_disk_details()
if details is None:
print("FAIL")
print(f"error: {icsneopy.get_last_error().describe()}")
device.close()
return False
print("OK")
layout_name = "RAID0" if details.layout == icsneopy.DiskLayout.RAID0 else "Spanned"
print(f" Layout : {layout_name}")
for i, disk in enumerate(details.disks):
print(f" Disk [{i}]:")
print(f" Present : {'yes' if disk.present else 'no'}")
print(f" Initialized : {'yes' if disk.initialized else 'no'}")
print(f" Formatted : {'yes' if disk.formatted else 'no'}")
if disk.present:
mb = disk.size() // (1024 * 1024)
print(f" Size : {mb} MB ({disk.sectors} sectors x {disk.bytes_per_sector} bytes)")
any_present = any(d.present for d in details.disks)
if not any_present:
print("\nerror: no disks are present in the device")
device.close()
return False
# Build format config from the queried state
fmt = icsneopy.DiskDetails()
fmt.layout = details.layout
fmt.full_format = False # Quick format; set True for a full (slow) format
fmt.disks = details.disks
for disk in fmt.disks:
if disk.present:
disk.formatted = True # mark for formatting
confirm = input(
f"\nThis will format the disk(s) in {device}.\n"
"All existing data will be lost. Continue? [y/N]: "
).strip()
if confirm.lower() != "y":
print("Aborted.")
device.close()
return True
print("\nStarting format...")
state = {"total": 0}
ok = device.format_disk(fmt)
print() # newline after progress line
if not ok:
print(f"error: format failed: {icsneopy.get_last_error().describe()}")
device.close()
return False
print("Format complete!")
# Verify
print("\nVerifying disk state after format... ", end="", flush=True)
post = device.get_disk_details()
if post is None:
print("FAIL (could not re-query disk details)")
else:
print("OK")
for i, disk in enumerate(post.disks):
print(f" Disk [{i}] formatted: {'yes' if disk.formatted else 'no'}")
device.close()
return True
if __name__ == "__main__":
sys.exit(0 if disk_format_example() else 1)