import ics import time def transmit_i2c( device, netid, read, slave_addr, control_len, controldata, data_len, data ): msg = ics.SpyMessage() msg.NetworkID = netid & 0xFF msg.NetworkID2 = (netid >> 8) & 0xFF msg.Protocol = ics.SPY_PROTOCOL_I2C msg.StatusBitField = 0 msg.StatusBitField2 = 0 msg.StatusBitField |= ics.SPY_STATUS_NETWORK_MESSAGE_TYPE msg.StatusBitField |= ics.SPY_STATUS_TX_MSG if slave_addr & 0x380: # if 10-bit address msg.StatusBitField |= ics.SPY_STATUS_XTD_FRAME if read: msg.StatusBitField2 |= ics.SPY_STATUS2_I2C_DIR_READ msg.ArbIDOrHeader = 0x10000000 msg.DescriptionID = 0x6869 # arbitrary tx msg "key" msg.NodeID = 0 msg.ArbIDOrHeader |= slave_addr msg.NumberBytesHeader = control_len msg.MiscData = (data_len >> 8) & 0xFF msg.NumberBytesData = data_len & 0xFF if data_len: for x in data[:data_len]: controldata.append(x) msg.ExtraDataPtr = tuple(controldata) msg.ExtraDataPtrEnabled = 1 ics.transmit_messages(device, msg) return msg def are_errors_present(msg): error_flags = 0 error_flags |= ics.SPY_STATUS_GLOBAL_ERR error_flags |= ics.SPY_STATUS_CRC_ERROR error_flags |= ics.SPY_STATUS_CAN_ERROR_PASSIVE error_flags |= ics.SPY_STATUS_HEADERCRC_ERROR error_flags |= ics.SPY_STATUS_INCOMPLETE_FRAME error_flags |= ics.SPY_STATUS_LOST_ARBITRATION error_flags |= ics.SPY_STATUS_UNDEFINED_ERROR error_flags |= ics.SPY_STATUS_CAN_BUS_OFF error_flags |= ics.SPY_STATUS_BUS_RECOVERED error_flags |= ics.SPY_STATUS_BUS_SHORTED_PLUS error_flags |= ics.SPY_STATUS_BUS_SHORTED_GND error_flags |= ics.SPY_STATUS_CHECKSUM_ERROR error_flags |= ics.SPY_STATUS_BAD_MESSAGE_BIT_TIME_ERROR error_flags |= ics.SPY_STATUS_TX_NOMATCH error_flags |= ics.SPY_STATUS_COMM_IN_OVERFLOW error_flags |= ics.SPY_STATUS_EXPECTED_LEN_MISMATCH error_flags |= ics.SPY_STATUS_MSG_NO_MATCH error_flags |= ics.SPY_STATUS_BREAK error_flags |= ics.SPY_STATUS_AVSI_REC_OVERFLOW if (msg.StatusBitField & error_flags) != 0: return True error_flags = 0 error_flags |= ics.SPY_STATUS2_I2C_ERR_TIMEOUT error_flags |= ics.SPY_STATUS2_I2C_ERR_NACK if (msg.StatusBitField2 & error_flags) != 0: return True return False def find_i2c_message(msg, msgs): for msg_rx in msgs: if msg.NetworkID == msg_rx.NetworkID and msg.NetworkID2 == msg_rx.NetworkID2: # check for any possible errors if are_errors_present(msg_rx): return None data = msg_rx.ExtraDataPtr return data return None def wait_for_i2c_msg(device, msg, timeout=2): start = time.time() while time.time() - start <= timeout: time.sleep(0.01) msgs, errors = ics.get_messages(device) data = find_i2c_message(msg, msgs) if data is not None: return data return None def i2c_read( device, netid, slave_addr, control_len, controldata, data_len, data, timeout=2 ): msg = transmit_i2c( device, netid, 1, slave_addr, control_len, controldata, data_len, data ) return wait_for_i2c_msg(device, msg, timeout) def i2c_write( device, netid, slave_addr, control_len, controldata, data_len, data, timeout=2 ): msg = transmit_i2c( device, netid, 0, slave_addr, control_len, controldata, data_len, data ) return wait_for_i2c_msg(device, msg, timeout)