Skip to content

Commit

Permalink
UART: Added embedded-io traits tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SteelPh0enix committed Oct 23, 2023
1 parent 7111e32 commit 3506c0c
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 37 deletions.
101 changes: 86 additions & 15 deletions testbins/test-hal-uart/src/uart_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use core::cell::RefCell;

use aerugo::hal::drivers::nvic::{Interrupt, NVIC};
use aerugo::hal::drivers::uart::config::LoopbackMode;
use aerugo::hal::drivers::uart::reader::Reader;
use aerugo::hal::drivers::uart::writer::Writer;
use aerugo::hal::drivers::uart::reader::{Read, Reader};
use aerugo::hal::drivers::uart::writer::{Write, Writer};
use aerugo::hal::drivers::uart::{
Config as UartConfig, Error, Interrupt as UartInterrupt, NotConfigured, ParityBit,
ReceiverConfig, UART,
Expand All @@ -23,12 +23,14 @@ use heapless::Vec;
/// * Test state-specific configuration methods
/// * Test transmission and reception via loopback in blocking mode
/// * Test transmission and reception with test host in interrupt mode
/// * Test transmission and reception with test host using embedded-io traits implementation
pub fn test_uart(uart: UART<UART4, NotConfigured>, nvic: NVIC) {
let uart = test_uart_configuration(uart);
let uart = test_uart_state_transition(uart);
let uart = test_reader_writer(uart);
let uart = test_uart_local_loopback(uart);
test_uart_io(uart, nvic);
let uart = test_uart_io(uart, nvic);
test_embedded_io(uart);
write_str("All UART functional tests finished successfully.");
}

Expand Down Expand Up @@ -237,7 +239,7 @@ const TEST_DATA_LENGTH: usize = 1024;
static UART_RX_BUFFER: Mutex<RefCell<Vec<u8, TEST_DATA_LENGTH>>> =
Mutex::new(RefCell::new(Vec::new()));

fn test_uart_io(uart: UART<UART4, NotConfigured>, mut nvic: NVIC) {
fn test_uart_io(uart: UART<UART4, NotConfigured>, mut nvic: NVIC) -> UART<UART4, NotConfigured> {
let uart_config = UartConfig::new(57600, 12.MHz()).unwrap();
let mut uart = uart.into_bidirectional(
uart_config,
Expand All @@ -247,7 +249,6 @@ fn test_uart_io(uart: UART<UART4, NotConfigured>, mut nvic: NVIC) {
);

// Enable all RX-related interrupts.
uart.disable_all_interrupts();
uart.enable_interrupt(UartInterrupt::RxReady);
uart.enable_interrupt(UartInterrupt::FramingError);
uart.enable_interrupt(UartInterrupt::OverrunError);
Expand All @@ -259,18 +260,23 @@ fn test_uart_io(uart: UART<UART4, NotConfigured>, mut nvic: NVIC) {
// Safety: This is safe, because UART IRQ is disabled.
unsafe { UART_READER_STORAGE.replace(reader) };

// From this point, accessing UART_READER_STORAGE from non-IRQ code is unsafe.
// From this point, accessing UART_READER_STORAGE from non-IRQ code is UNSAFE.
nvic.enable(Interrupt::UART4);

// Run all I/O tests now.
perform_uart_handshake(&mut writer);
test_data_reception();
test_data_transmission(writer);
test_data_transmission(&mut writer);

// This should be the last test, so disable everything.
// From this point, accessing UART_READER_STORAGE from non-IRQ code is SAFE.
nvic.disable(Interrupt::UART4);
uart.disable_all_interrupts();
uart.disable();

let reader = unsafe { UART_READER_STORAGE.take().unwrap() };
uart.put_reader(reader);
uart.put_writer(writer);

uart.disable()
}

fn perform_uart_handshake(writer: &mut Writer<UART4>) {
Expand All @@ -284,8 +290,7 @@ fn perform_uart_handshake(writer: &mut Writer<UART4>) {
let buffer = buffer_ref.borrow();
panic!(
"unexpected handshake message received, expected {:?}, got {:?}",
expected_message,
buffer.as_slice()
expected_message, buffer
)
})
}
Expand Down Expand Up @@ -313,7 +318,7 @@ fn wait_for_handshake_message(expected_message: &[u8]) -> bool {
return;
}

if buffer.as_slice() == expected_message {
if buffer == expected_message {
handshake_received = true;
buffer.clear();
}
Expand Down Expand Up @@ -351,9 +356,9 @@ fn test_data_reception() {
write_str("Data reception test successful!");
}

fn test_data_transmission(mut writer: Writer<UART4>) {
fn test_data_transmission(writer: &mut Writer<UART4>) {
let tx_data: Vec<u8, TEST_DATA_LENGTH> = (0..TEST_DATA_LENGTH).map(|i| i as u8).collect();
writer.transmit_bytes(tx_data.as_slice(), u32::MAX).unwrap();
writer.transmit_bytes(&tx_data, u32::MAX).unwrap();
write_str("Test data transmitted!");
}

Expand Down Expand Up @@ -381,9 +386,75 @@ fn UART4() {
UART_RX_BUFFER.lock(|buffer_ref| {
buffer_ref
.get_mut()
// Safety: This is safe, because we verified that receiver is ready.
// Safety: This is safe, because we verified that the receiver is ready.
.push(unsafe { reader.get_received_byte() })
.unwrap()
});
}
}

fn test_embedded_io(uart: UART<UART4, NotConfigured>) -> UART<UART4, NotConfigured> {
let uart_config = UartConfig::new(57600, 12.MHz()).unwrap();
let mut uart = uart.into_bidirectional(
uart_config,
ReceiverConfig {
rx_filter_enabled: true,
},
);

let mut reader = uart.take_reader().unwrap();
let mut writer = uart.take_writer().unwrap();

write_str("Starting embedded-io traits tests.");

test_embedded_io_read(&mut reader);
test_embedded_io_write(&mut writer);

write_str("Embedded-io tests successful!");

uart.put_reader(reader);
uart.put_writer(writer);
uart.disable()
}

fn test_embedded_io_read(reader: &mut Reader<UART4>) {
let mut data_buffer: [u8; TEST_DATA_LENGTH] = [0; TEST_DATA_LENGTH];

write_str("Starting embedded-io read test...");

reader.read_exact(&mut data_buffer).unwrap();

for (index, &value) in data_buffer.iter().enumerate() {
let expected_value = 0xFF - (index as u8);
if value != expected_value {
panic!(
"Received invalid byte @ index {}, expected {}, got {}",
index, expected_value, value
);
}
}

write_str("Embedded-io read test successful!");
}

fn test_embedded_io_write(writer: &mut Writer<UART4>) {
let tx_data: Vec<u8, TEST_DATA_LENGTH> =
(0..TEST_DATA_LENGTH).map(|i| 0xFF - (i as u8)).collect();
writer.write_all(&tx_data).unwrap();

write_str("Embedded-io test data chunk sent!");

let some_var = 123;
let some_hex = 0xBEEF;
let some_str = "hello world";
let some_char = 'a';

writer
.write_fmt(format_args!(
"Formatting test: {}, {:#08X}, {}, {}!",
some_var, some_hex, some_str, some_char
))
.unwrap();

write_str("Embedded-io formatted string sent!");
}
99 changes: 77 additions & 22 deletions tests/requirements/test/test_hal_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def main() -> None:
uart = RemoteUARTConnection(ssh, uart_config)
if not uart.open_uart():
logging.critical("TEST FAILED, could not establish UART connection!")
sys.exit(100)
sys.exit(1000)

perform_uart_io_tests(uart, rtt, ssh, 1024)

Expand All @@ -72,33 +72,16 @@ def perform_uart_io_tests(
uart: RemoteUARTConnection,
rtt: CalldwellRTTClient,
ssh: SSHClient,
data_length: int,
test_data_length: int,
) -> None:
"""Tests UART I/O using UART interface that's connected to target board"""
# Perform a simple handshake to make sure that the connection is established correctly
# (baudrates are OK, etc.)
perform_uart_handshake(uart, rtt, ssh)
perform_uart_standard_io_test(uart, rtt, ssh, test_data_length)
perform_uart_embedded_io_test(uart, rtt, ssh, test_data_length)

# Reception test (test host -> MCU) with large chunk of data
uart.write_bytes(bytes([i % 0x100 for i in range(data_length)]))
wait_for_messages(rtt, ssh, ["Data reception test successful!", "Test data transmitted!"])

# Transmission test (MCU -> test host) with large chunk of data
reception_result = uart.read_exact_bytes(data_length)
if reception_result.is_err:
logging.critical("TEST FAILED, could not receive all the data from target board!")
sys.exit(50)

received_data = reception_result.unwrap()
for byte, i in zip(received_data, range(data_length)):
if byte != (expected := i % 0x100):
logging.critical(
f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}",
)
sys.exit(60)

logging.info("Data transmission test successful!")
logging.info("All I/O test successful!")
logging.info("All I/O tests successful!")


def perform_uart_handshake(
Expand Down Expand Up @@ -151,6 +134,78 @@ def perform_uart_handshake(
logging.info("UART handshake successful!")


def perform_uart_standard_io_test(
uart: RemoteUARTConnection,
rtt: CalldwellRTTClient,
ssh: SSHClient,
test_data_length: int,
) -> None:
"""Performs I/O tests using standard UART implementation"""
# Reception test (test host -> MCU) with large chunk of data
uart.write_bytes(bytes([i % 0x100 for i in range(test_data_length)]))
wait_for_messages(rtt, ssh, ["Data reception test successful!", "Test data transmitted!"])

# Transmission test (MCU -> test host) with large chunk of data
reception_result = uart.read_exact_bytes(test_data_length)
if reception_result.is_err:
logging.critical("TEST FAILED, could not receive all the data from target board!")
sys.exit(50)

received_data = reception_result.unwrap()
for byte, i in zip(received_data, range(test_data_length)):
if byte != (expected := i % 0x100):
logging.critical(
f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}",
)
sys.exit(60)

logging.info("Data transmission test successful!")


def perform_uart_embedded_io_test(
uart: RemoteUARTConnection,
rtt: CalldwellRTTClient,
ssh: SSHClient,
test_data_length: int,
) -> None:
"""Performs I/o test using embedded-io traits implementation"""
wait_for_messages(rtt, ssh, ["Starting embedded-io traits tests."])
wait_for_messages(rtt, ssh, ["Starting embedded-io read test..."])

uart.write_bytes(bytes([0xFF - (i % 0x100) for i in range(test_data_length)]))

wait_for_messages(rtt, ssh, ["Embedded-io read test successful!"])

wait_for_messages(rtt, ssh, ["Embedded-io test data chunk sent!"])

chunk_reception_result = uart.read_exact_bytes(test_data_length)
if chunk_reception_result.is_err:
logging.critical("TEST FAILED, could not receive all the data from target board!")
sys.exit(70)

received_data_chunk = chunk_reception_result.unwrap()
for byte, i in zip(received_data_chunk, range(test_data_length)):
if byte != (expected := 0xFF - (i % 0x100)):
logging.critical(
f"TEST FAILED, unexpected byte @ index {i}, expected {expected}, got {byte}",
)
sys.exit(80)

wait_for_messages(rtt, ssh, ["Embedded-io formatted string sent!"])

string_reception_result = uart.read_string(b"!")
if string_reception_result.is_err:
logging.critical("TEST FAILED, could not receive formatted string from target board!")
sys.exit(90)

expected_string = "Formatting test: 123, 0x00BEEF, hello world, a!"
if (received_string := string_reception_result.unwrap()) != expected_string:
logging.critical(f"TEST FAILED, expected {expected_string}, got {received_string}")
sys.exit(100)

wait_for_messages(rtt, ssh, ["Embedded-io tests successful!"])


if __name__ == "__main__":
init_default_logger()
main()

0 comments on commit 3506c0c

Please sign in to comment.