From 3506c0c8dc7073f1c218842b8010bb11195f08df Mon Sep 17 00:00:00 2001 From: Wojciech Olech Date: Wed, 18 Oct 2023 17:59:42 +0200 Subject: [PATCH] UART: Added embedded-io traits tests --- testbins/test-hal-uart/src/uart_tests.rs | 101 +++++++++++++++++++---- tests/requirements/test/test_hal_uart.py | 99 +++++++++++++++++----- 2 files changed, 163 insertions(+), 37 deletions(-) diff --git a/testbins/test-hal-uart/src/uart_tests.rs b/testbins/test-hal-uart/src/uart_tests.rs index 69f95df6..332b746d 100644 --- a/testbins/test-hal-uart/src/uart_tests.rs +++ b/testbins/test-hal-uart/src/uart_tests.rs @@ -4,8 +4,8 @@ use core::cell::RefCell; use aerugo::hal::drivers::nvic::{Interrupt, NVIC}; use aerugo::hal::drivers::uart::config::LoopbackMode; -use aerugo::hal::drivers::uart::reader::Reader; -use aerugo::hal::drivers::uart::writer::Writer; +use aerugo::hal::drivers::uart::reader::{Read, Reader}; +use aerugo::hal::drivers::uart::writer::{Write, Writer}; use aerugo::hal::drivers::uart::{ Config as UartConfig, Error, Interrupt as UartInterrupt, NotConfigured, ParityBit, ReceiverConfig, UART, @@ -23,12 +23,14 @@ use heapless::Vec; /// * Test state-specific configuration methods /// * Test transmission and reception via loopback in blocking mode /// * Test transmission and reception with test host in interrupt mode +/// * Test transmission and reception with test host using embedded-io traits implementation pub fn test_uart(uart: UART, nvic: NVIC) { let uart = test_uart_configuration(uart); let uart = test_uart_state_transition(uart); let uart = test_reader_writer(uart); let uart = test_uart_local_loopback(uart); - test_uart_io(uart, nvic); + let uart = test_uart_io(uart, nvic); + test_embedded_io(uart); write_str("All UART functional tests finished successfully."); } @@ -237,7 +239,7 @@ const TEST_DATA_LENGTH: usize = 1024; static UART_RX_BUFFER: Mutex>> = Mutex::new(RefCell::new(Vec::new())); -fn test_uart_io(uart: UART, mut nvic: NVIC) { +fn test_uart_io(uart: UART, mut nvic: NVIC) -> UART { let uart_config = UartConfig::new(57600, 12.MHz()).unwrap(); let mut uart = uart.into_bidirectional( uart_config, @@ -247,7 +249,6 @@ fn test_uart_io(uart: UART, mut nvic: NVIC) { ); // Enable all RX-related interrupts. - uart.disable_all_interrupts(); uart.enable_interrupt(UartInterrupt::RxReady); uart.enable_interrupt(UartInterrupt::FramingError); uart.enable_interrupt(UartInterrupt::OverrunError); @@ -259,18 +260,23 @@ fn test_uart_io(uart: UART, mut nvic: NVIC) { // Safety: This is safe, because UART IRQ is disabled. unsafe { UART_READER_STORAGE.replace(reader) }; - // From this point, accessing UART_READER_STORAGE from non-IRQ code is unsafe. + // From this point, accessing UART_READER_STORAGE from non-IRQ code is UNSAFE. nvic.enable(Interrupt::UART4); // Run all I/O tests now. perform_uart_handshake(&mut writer); test_data_reception(); - test_data_transmission(writer); + test_data_transmission(&mut writer); - // This should be the last test, so disable everything. + // From this point, accessing UART_READER_STORAGE from non-IRQ code is SAFE. nvic.disable(Interrupt::UART4); uart.disable_all_interrupts(); - uart.disable(); + + let reader = unsafe { UART_READER_STORAGE.take().unwrap() }; + uart.put_reader(reader); + uart.put_writer(writer); + + uart.disable() } fn perform_uart_handshake(writer: &mut Writer) { @@ -284,8 +290,7 @@ fn perform_uart_handshake(writer: &mut Writer) { let buffer = buffer_ref.borrow(); panic!( "unexpected handshake message received, expected {:?}, got {:?}", - expected_message, - buffer.as_slice() + expected_message, buffer ) }) } @@ -313,7 +318,7 @@ fn wait_for_handshake_message(expected_message: &[u8]) -> bool { return; } - if buffer.as_slice() == expected_message { + if buffer == expected_message { handshake_received = true; buffer.clear(); } @@ -351,9 +356,9 @@ fn test_data_reception() { write_str("Data reception test successful!"); } -fn test_data_transmission(mut writer: Writer) { +fn test_data_transmission(writer: &mut Writer) { let tx_data: Vec = (0..TEST_DATA_LENGTH).map(|i| i as u8).collect(); - writer.transmit_bytes(tx_data.as_slice(), u32::MAX).unwrap(); + writer.transmit_bytes(&tx_data, u32::MAX).unwrap(); write_str("Test data transmitted!"); } @@ -381,9 +386,75 @@ fn UART4() { UART_RX_BUFFER.lock(|buffer_ref| { buffer_ref .get_mut() - // Safety: This is safe, because we verified that receiver is ready. + // Safety: This is safe, because we verified that the receiver is ready. .push(unsafe { reader.get_received_byte() }) .unwrap() }); } } + +fn test_embedded_io(uart: UART) -> UART { + let uart_config = UartConfig::new(57600, 12.MHz()).unwrap(); + let mut uart = uart.into_bidirectional( + uart_config, + ReceiverConfig { + rx_filter_enabled: true, + }, + ); + + let mut reader = uart.take_reader().unwrap(); + let mut writer = uart.take_writer().unwrap(); + + write_str("Starting embedded-io traits tests."); + + test_embedded_io_read(&mut reader); + test_embedded_io_write(&mut writer); + + write_str("Embedded-io tests successful!"); + + uart.put_reader(reader); + uart.put_writer(writer); + uart.disable() +} + +fn test_embedded_io_read(reader: &mut Reader) { + let mut data_buffer: [u8; TEST_DATA_LENGTH] = [0; TEST_DATA_LENGTH]; + + write_str("Starting embedded-io read test..."); + + reader.read_exact(&mut data_buffer).unwrap(); + + for (index, &value) in data_buffer.iter().enumerate() { + let expected_value = 0xFF - (index as u8); + if value != expected_value { + panic!( + "Received invalid byte @ index {}, expected {}, got {}", + index, expected_value, value + ); + } + } + + write_str("Embedded-io read test successful!"); +} + +fn test_embedded_io_write(writer: &mut Writer) { + let tx_data: Vec = + (0..TEST_DATA_LENGTH).map(|i| 0xFF - (i as u8)).collect(); + writer.write_all(&tx_data).unwrap(); + + write_str("Embedded-io test data chunk sent!"); + + let some_var = 123; + let some_hex = 0xBEEF; + let some_str = "hello world"; + let some_char = 'a'; + + writer + .write_fmt(format_args!( + "Formatting test: {}, {:#08X}, {}, {}!", + some_var, some_hex, some_str, some_char + )) + .unwrap(); + + write_str("Embedded-io formatted string sent!"); +} diff --git a/tests/requirements/test/test_hal_uart.py b/tests/requirements/test/test_hal_uart.py index 7a0f5035..f5c7ff70 100644 --- a/tests/requirements/test/test_hal_uart.py +++ b/tests/requirements/test/test_hal_uart.py @@ -51,7 +51,7 @@ def main() -> None: uart = RemoteUARTConnection(ssh, uart_config) if not uart.open_uart(): logging.critical("TEST FAILED, could not establish UART connection!") - sys.exit(100) + sys.exit(1000) perform_uart_io_tests(uart, rtt, ssh, 1024) @@ -72,33 +72,16 @@ def perform_uart_io_tests( uart: RemoteUARTConnection, rtt: CalldwellRTTClient, ssh: SSHClient, - data_length: int, + test_data_length: int, ) -> None: """Tests UART I/O using UART interface that's connected to target board""" # Perform a simple handshake to make sure that the connection is established correctly # (baudrates are OK, etc.) perform_uart_handshake(uart, rtt, ssh) + perform_uart_standard_io_test(uart, rtt, ssh, test_data_length) + perform_uart_embedded_io_test(uart, rtt, ssh, test_data_length) - # Reception test (test host -> MCU) with large chunk of data - uart.write_bytes(bytes([i % 0x100 for i in range(data_length)])) - wait_for_messages(rtt, ssh, ["Data reception test successful!", "Test data transmitted!"]) - - # Transmission test (MCU -> test host) with large chunk of data - reception_result = uart.read_exact_bytes(data_length) - if reception_result.is_err: - logging.critical("TEST FAILED, could not receive all the data from target board!") - sys.exit(50) - - received_data = reception_result.unwrap() - for byte, i in zip(received_data, range(data_length)): - if byte != (expected := i % 0x100): - logging.critical( - f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}", - ) - sys.exit(60) - - logging.info("Data transmission test successful!") - logging.info("All I/O test successful!") + logging.info("All I/O tests successful!") def perform_uart_handshake( @@ -151,6 +134,78 @@ def perform_uart_handshake( logging.info("UART handshake successful!") +def perform_uart_standard_io_test( + uart: RemoteUARTConnection, + rtt: CalldwellRTTClient, + ssh: SSHClient, + test_data_length: int, +) -> None: + """Performs I/O tests using standard UART implementation""" + # Reception test (test host -> MCU) with large chunk of data + uart.write_bytes(bytes([i % 0x100 for i in range(test_data_length)])) + wait_for_messages(rtt, ssh, ["Data reception test successful!", "Test data transmitted!"]) + + # Transmission test (MCU -> test host) with large chunk of data + reception_result = uart.read_exact_bytes(test_data_length) + if reception_result.is_err: + logging.critical("TEST FAILED, could not receive all the data from target board!") + sys.exit(50) + + received_data = reception_result.unwrap() + for byte, i in zip(received_data, range(test_data_length)): + if byte != (expected := i % 0x100): + logging.critical( + f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}", + ) + sys.exit(60) + + logging.info("Data transmission test successful!") + + +def perform_uart_embedded_io_test( + uart: RemoteUARTConnection, + rtt: CalldwellRTTClient, + ssh: SSHClient, + test_data_length: int, +) -> None: + """Performs I/o test using embedded-io traits implementation""" + wait_for_messages(rtt, ssh, ["Starting embedded-io traits tests."]) + wait_for_messages(rtt, ssh, ["Starting embedded-io read test..."]) + + uart.write_bytes(bytes([0xFF - (i % 0x100) for i in range(test_data_length)])) + + wait_for_messages(rtt, ssh, ["Embedded-io read test successful!"]) + + wait_for_messages(rtt, ssh, ["Embedded-io test data chunk sent!"]) + + chunk_reception_result = uart.read_exact_bytes(test_data_length) + if chunk_reception_result.is_err: + logging.critical("TEST FAILED, could not receive all the data from target board!") + sys.exit(70) + + received_data_chunk = chunk_reception_result.unwrap() + for byte, i in zip(received_data_chunk, range(test_data_length)): + if byte != (expected := 0xFF - (i % 0x100)): + logging.critical( + f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}", + ) + sys.exit(80) + + wait_for_messages(rtt, ssh, ["Embedded-io formatted string sent!"]) + + string_reception_result = uart.read_string(b"!") + if string_reception_result.is_err: + logging.critical("TEST FAILED, could not receive formatted string from target board!") + sys.exit(90) + + expected_string = "Formatting test: 123, 0x00BEEF, hello world, a!" + if (received_string := string_reception_result.unwrap()) != expected_string: + logging.critical(f"TEST FAILED, expected {expected_string}, got {received_string}") + sys.exit(100) + + wait_for_messages(rtt, ssh, ["Embedded-io tests successful!"]) + + if __name__ == "__main__": init_default_logger() main()