对 RTIC 框架的探索
Jens
Posted on April 12, 2024
本文尝试通过 The RTIC Book, Tjäder 2021: A Zero-Cost Abstraction for Memory Safe Concurrency, Eriksson et al. 2013: Real-Time For the Masses, Step 1: Programming API and Static Priority SRP Kernel Primitives 以及RTIC 源码, 对 RTIC 框架的原理和实现进行简单的解释与分析.
1. RTIC 简介
RTIC (Real-time Interrupt-driven Concurrency) "实时中断驱动并发框架" 旨在使用硬件支持的优先级中断 (特别是 ARM 中的 NVIC 机制, 允许低优先级 ISR 被高优先级 ISR 抢占) 以及 Rust 异步执行器来安全实现 zero-cost 并行抽象, 我们可以把它理解成一个 "调度器".
RTIC 使用了基于优先级的临界区(Priority-based Critical Sections) 保证了高效的无竞争内存共享 (Data-race-free Memory Sharing), 并在编译时能够保证运行期间无死锁 (Deadlock-free Execution)
RTIC 还能够保证所有任务都能够在一个栈中运行.
2. SRP 规则
参考 Eriksson et al. 2013
SRP (Stack Resource Policy) 是一个使不同优先级能够共享同一个执行栈的资源分配规则, 是 Priority Ceiling Protocol (PCP) 的改进方法. RTIC 使用该方法实现对共享资源的分配和申请这些资源的作业调度.
- 作业 (Job)
- 将作业(Job, $J$) 定义: 为一个有限待执行指令序列
- 作业 "抢占性" (preemptive level)
定义为:
- 资源 (Resource, $R$)
- 将所有 有可能 访问资源 $R$ 的任务集合定义为: $L(R)$
- 将资源$R$的 "当前最大访问优先级" (current ceiling)
定义为: 所有可能访问资源
R
工作(Job) 优先级的最大值
- 系统 "当前最大访问优先级" (current system ceiling)
- 这个 ceiling 是动态变化的, 根据执行任务和任务对资源的占有有关
- 将系统 "当前最大访问优先级" $\prod$ 为: 当前执行作业优先级与所有被占有且未完成资源的 "当前访问最大值" 的最大值 其中 $J$ 为当前执行的作业, 为所有被占有且未完成的资源. 执行算法:
- 在锁住资源时, 将满足下面条件的任务停止执行:
- i.e. 能够成功持锁的任务必须 严格大于 当前的 Priority Ceiling
- 在释放资源时, 将之前停止的任务恢复
直观图像如下: (From Emil Fresk: RTIC: Real Time Interrupt Driven Concurrency Talk , Slides)
其中 Job1
, Job2
共享这个资源, Job3
不使用这个资源. Job1
, Job2
对资源进行访问时, 蓝色线条及以下优先级的任务 (除了正在访问的任务外) 都被禁止运行. 可以看到在 Job3
运行完毕后应该让 Job2
运行, 但是因为 Job2
可能会 访问 Job1
正在访问的资源, 它被禁止运行, 直到 Job1
释放资源后, 才被立即执行.
常见的 RTOS 不能使用这样硬件加速的 SAP 抢占式调度的原因是: 他们使用的线程模型 (Threading Model) 并在编译时静态分析出“任务-资源”依赖关系. 而 RTIC 的设计使得静态分析出每个资源的 ceiling
变得可能.
3. ARM Cortex-M 中断机制
在 ARM 架构下, BASEPRI
(Base Priority) 寄存器就能够实现 Priority Ceiling 算法, 它会将低于这个寄存器的所有中断全部屏蔽. 我们只需要在进入临界区和离开临界区时修改这个寄存器即可.
这使用到了 ARM 的 NVIC (Nested Vectored Interrupt Controller) 机制, 这是硬件实现的中断优先级机制, 允许高优先级中断 ISR 抢占正在执行的低优先级中断 ISR. 这样的机制使得硬件加速的抢占式优先级调度成为可能.
3.1 NVIC 模块
3.2 中断向量表 (Vector Table)
在 ARM 中, 中断 (Interrupts) 被认为是一种特殊的 异常 (Exception). 异常和中断的 ISR 地址被保存在 中断向量表 (Vector Table) 中, 其格式如下 (Cortex-m7)
我们也可以从 cortex-m-rt
中对 .vector_table
段的定义了解它的格式
/* ## Sections in FLASH */
/* ### Vector table */
.vector_table ORIGIN(FLASH) :
{
__vector_table = .;
/* Initial Stack Pointer (SP) value.
* We mask the bottom three bits to force 8-byte alignment.
* Despite having an assert for this later, it's possible that a separate
* linker script could override _stack_start after the assert is checked.
*/
LONG(_stack_start & 0xFFFFFFF8);
/* Reset vector */
KEEP(*(.vector_table.reset_vector)); /* this is the `__RESET_VECTOR` symbol */
/* Exceptions */
__exceptions = .; /* start of exceptions */
KEEP(*(.vector_table.exceptions)); /* this is the `__EXCEPTIONS` symbol */
__eexceptions = .; /* end of exceptions */
/* Device specific interrupts */
KEEP(*(.vector_table.interrupts)); /* this is the `__INTERRUPTS` symbol */
} > FLASH
我们从 __vector_table
地址开始, 一次对每一个入口进行讲解
-
V[0]
: 初始化栈指针, 这会在设备启动或重置时设置栈指针SP
-
V[1]
: 重置行为入口地址, 如果设备被重置, 其指向的函数入口将会被执行 -
V[2]
:NMI
(Non-maskable Interrupt) ISR -
V[3]
:HardFault
异常, 在无法恢复的错误发生时会被触发- 它相当于一个
catch all
- 其可以细化为
BusFault
,UsageFaule
,Memory Management Fault
等
- 它相当于一个
-
V[4]
:Memory Management Fault
异常, 被违反 MMU 规则的内存访问触发 -
V[5]
:BusFault
异常- 例如对为对其的地址进行访问等
-
V[6]
:UsageFaule
异常- 例如 除零异常 等
-
V[11]
:SVCall
异常, 在使用系统调用 (Superviosr Call) 时会被触发 -
V[14]
:PendSV
异常, 用于在多个 IRQ 发生时延迟回到用户态 (Thread Mode) 的上下文切换来 避免 IRQ 延迟- 参考这篇文章
-
V[15]
:SysTick
异常, 用于系统时钟 -
V[16-255]
:IRQ
异常, 外部设备中断触发的异常
在 cortex-m-rt
中, 我们定义了 __EXCEPTIONS
, __RESET
, __INTERRUPTS
等数据结构并将他们放入 .vector_table
段对向量表进行初始化. 并且将中断/异常名称相同的函数作为其入口. 我们如果希望重写中断, 仅需要覆盖定义这些函数即可
PROVIDE(NonMaskableInt = DefaultHandler);
PROVIDE(MemoryManagement = DefaultHandler);
PROVIDE(BusFault = DefaultHandler);
PROVIDE(UsageFault = DefaultHandler);
PROVIDE(SecureFault = DefaultHandler);
PROVIDE(SVCall = DefaultHandler);
PROVIDE(DebugMonitor = DefaultHandler);
PROVIDE(PendSV = DefaultHandler);
PROVIDE(SysTick = DefaultHandler);
4. Rust 异步状态机
在 Rust 中 async/await 语法用于编写可以执行 非阻塞 的异步代码. 这对于 I/O 绑定任务尤其有用, 例如从文件中读取数据或进行网络请求, 在这种情况下, 在等待操作完成时阻塞程序的执行. Rust 中的 async/await 语法建立在状态机之上, 状态机是一个用于管理异步操作执行状态的概念. 我们将一个异步状态机定义为 Future
, 对状态机的一次执行通过调用 Future::poll
实现. 如果状态机执行完成, 会返回 Poll::Ready(_)
(其中包含计算结果), 如果未执行完成, 会返回 Poll::Pending
, 代表这个状态机需要再次调用 Future::poll
进行执行.
调用 Rust 的 async
关键词修饰的函数 async fn foo(...) -> T
会被认为返回了 Future<Output = T>
, 这个 Future
对象是 Rust 编译器自动生成的匿名对象.
编译器在 async
函数的每一个 .await
或 yield
语句后生成一个状态, 执行这个语句之前会保存下一个状态所需的所有变量, 状态会保存在 Future
中.
5. 任务 (Tasks)
RTIC 将任务分为 硬件任务 (Hardware Task) 和 软件任务 (Software Task).
任务通过 #[task]
修饰, 对于硬件任务, 需要通过 binds
指定其中断源, 对于软件任务就不需要. 我们需要在每个任务的 #[task]
中声明其优先级 priority
, 局部资源 loacal
和 共享资源 shared
.
所有任务都由中断驱动, 因为 BASEPRI
作为 SRP 调度的. 我们通过保存和恢复 BASEPRI
维护调度的正确性, 每次对执行器的调用都被包裹在 rtic::cortex_basepri::run
中, 如下
#[inline(always)]
pub fn run<F>(priority: u8, f: F)
where
F: FnOnce(),
{
if priority == 1 {
// If the priority of this interrupt is `1` then BASEPRI can only be `0`
f();
unsafe { basepri::write(0) }
} else {
let initial = basepri::read();
f();
unsafe { basepri::write(initial) }
}
}
5.1 硬件任务 (Hardware Tasks)
硬件任务会绑定一个中断源, 在中断源被触发时, 硬件任务会被以 ISR 被执行. 我们要求硬件任务必须 完整执行, i.e. , "run-to-completion".
对于硬件任务
#[task(binds = SPI3)]
fn hw_task(_cx: hw_task::Context) {
print!("hello world");
}
其生成的代码为
#[no_mangle]
unsafe fn SPI3() {
const PRIORITY: u8 = 1u8;
rtic::export::run(PRIORITY, || hw_task(hw_task::Context::new()));
}
fn hw_task(_cx: hw_task::Context) {
print!("hello world");
}
5.2 软件任务 (Software Tasks)
软件任务被定义为一个 async
函数, 这说明软件任务使用了 Rust 的异步状态机 实现. 那么这个状态机什么时候执行, 被谁执行, 又怎样实现抢占式优先级调度呢?
每一个软件任务都会被分配一个 Executor
(执行器), 这个执行器包装了对这个软件任务 Future
的一次执行.
5.2.1 Executor
执行器
Future
作为一个状态机, 能够在一次执行中改变其状态. 我们将每一次 .await
或 yield
作为一次保存状态的位置, 在这里我们主动的将 CPU 执行时间让出. 在这两个语句之间的代码会被一次 Future::poll
执行.
Future::poll
返回一个 Poll<T>
对象, 它为一个枚举类型, Poll::Pending
表示状态机未达到完成状态, 需要再次被 poll
, Poll::Ready(T)
表示状态机执行完成, 其内容 T
为执行结果.
RTIC 创建了一个 AsyncTaskExecutor
来保存任务的 调度执行状态 (是否可以运行, 是否正在运行)和 状态机状态 (状态节点, 需要的局部变量).
/// Executor for an async task.
pub struct AsyncTaskExecutor<F: Future> {
// `task` is protected by the `running` flag.
task: UnsafeCell<MaybeUninit<F>>,
running: AtomicBool,
pending: AtomicBool,
}
其中, 我们通过 running
表示任务是否活跃, 仅在初始化 task
前 和 任务执行完成 (返回 Poll::Ready(_)
) 后为 false
. 如果 running == false
, 则可以修改 task
成员.
因为 软件任务 为一个异步函数, 返回实现 Future
的对象, 通过调用这个任务函数我们就能够获得对应的 Future
. 这样可以将这个 Future
通过 AsyncTaskExecutor::spawn
移动到 AsyncTaskExecutor
中 (所有权可以避免一个 Future
被多个 Executor
使用).
执行器通过对该软件任务的一次执行后返回执行结果. Rust 的 Future::poll(self, cx) -> Poll
接口需要传入一个 Context
对象, 这个对象会被用于在能任务能够继续被执行是唤醒执行器. 比如: 如果任务需要等待一个延迟, 需要在返回 Poll::Pending
前设置一个时钟延迟, 在延迟到期后回调 Context::waker::wake
函数唤醒这个执行器使它能够继续执行 (不一定马上执行).
AsyncTaskExecutor::poll
函数包装了 Future::poll
函数, 在执行前检查任务是否活跃 (AsyncTaskExecutor::is_running
), 如果活跃就执行. 如果任务执行完成 (返回 Poll::Ready(_)
) 就将状态设置为不活跃 (is_running = false
). 函数还接受一个自定义 wake
闭包, 使得调用函数能够自定义唤醒行为.
impl<F: Future> AsyncTaskExecutor<F> {
/// Poll the future in the executor.
#[inline(always)]
pub fn poll(&self, wake: fn()) {
if self.is_running() && self.check_and_clear_pending() {
let waker = unsafe { Waker::from_raw(RawWaker::new(wake as *const (), &WAKER_VTABLE)) };
let mut cx = Context::from_waker(&waker);
let future = unsafe { Pin::new_unchecked(&mut *(self.task.get() as *mut F)) };
match future.poll(&mut cx) {
Poll::Ready(_) => {
self.running.store(false, Ordering::Release);
}
Poll::Pending => {}
}
}
}
}
我们需要注意 RTIC 的执行器是全局的, 它的创建十分巧妙. 我们在全局声明这个执行器的指针 AsyncTaskExecutorPtr
, 此时这个指针为空.
/// Pointer to executor holder.
pub struct AsyncTaskExecutorPtr {
// Void pointer.
ptr: AtomicPtr<()>,
}
impl AsyncTaskExecutorPtr {
pub const fn new() -> Self {
Self {
ptr: AtomicPtr::new(core::ptr::null_mut()),
}
}
}
这是它的生成代码
// Generate executor definition and priority in global scope
for (name, _) in app.software_tasks.iter() {
let exec_name = util::internal_task_ident(name, "EXEC");
items.push(quote!(
#[allow(non_upper_case_globals)]
static #exec_name: rtic::export::executor::AsyncTaskExecutorPtr =
rtic::export::executor::AsyncTaskExecutorPtr::new();
));
}
我们在 main
入口中创建一个执行器, 并将这个执行器, 并使用 AsyncTaskExecutorPtr::set_in_main
传入这个执行器在 main
函数栈贞中的地址初始化这个全局指针.
impl AsyncTaskExecutorPtr {
#[inline(always)]
pub fn set_in_main<F: Future>(&self, executor: &ManuallyDrop<AsyncTaskExecutor<F>>) {
self.ptr.store(executor as *const _ as _, Ordering::Relaxed);
}
}
我们可以在 rtic-macros::main::codegen
中看到它的代码: 创建 executor
, 调用 set_in_main
传入它的地址初始化
for (name, _) in app.software_tasks.iter() {
let exec_name = util::internal_task_ident(name, "EXEC");
let new_n_args = util::new_n_args_ident(app.software_tasks[name].inputs.len());
executor_allocations.push(quote!(
let executor = ::core::mem::ManuallyDrop::new(rtic::export::executor::AsyncTaskExecutor::#new_n_args(#name));
executors_size += ::core::mem::size_of_val(&executor);
#exec_name.set_in_main(&executor);
));
}
尽管执行器在 main
中创建, 是它的全局变量, 它在整个运行过程中不会被销毁. 因为 main
永远不会退出, 执行完初始化后它会进入循环. 因为它的优先级最低, 当有任务时总会被其他任务抢占, 它可以用来执行 idle
后台任务 (如果有的话) 或者 无限循环.
#[no_mangle]
unsafe extern "C" fn main() -> ! { /* ... */}
它的布局如下, 可以在 rtic-macros::main::codegen
中查看
quote!(
#(#extra_mods_stmts)*
#[doc(hidden)]
#[no_mangle]
unsafe extern "C" fn #main() -> ! {
#(#assertion_stmts)*
#(#pre_init_stmts)*
#[inline(never)]
fn __rtic_init_resources<F>(f: F) where F: FnOnce() {
f();
}
// Generate allocations for async executors.
let mut executors_size = 0;
#(#executor_allocations)*
#(#msp_check)*
// Wrap late_init_stmts in a function to ensure that stack space is reclaimed.
__rtic_init_resources(||{
let (shared_resources, local_resources) = #init_name(#init_name::Context::new(#init_args));
#(#post_init_stmts)*
});
#call_idle
}
)
5.2.2 Dispatcher
分发器
所有优先级相同的任务会被分配到一个 Dispatcher
(分法器) 中, 每一个 Dispatcher
会与一个独一的中断源进行绑定, 注意这个中断源不能够是硬件任务绑定过的中断源. 我们这里讨论 优先级 > 0 的 Dispatcher
, 因为 0-优先级 Dispatcher
被认为是一个后台任务, 不通过中断驱动.
关于刚才这个限制的检查代码能够在 rtio-macro::syntax::check::app
定义中找到
// check that dispatchers are not used as hardware tasks
for task in app.hardware_tasks.values() {
let binds = &task.args.binds;
if app.args.dispatchers.contains_key(binds) {
return Err(parse::Error::new(
binds.span(),
"dispatcher interrupts can't be used as hardware tasks",
));
}
}
每一个 Dispatcher
负责使用轮转调度当前优先级的所有任务.
在静态代码分析时, rtic-macros::syntax::analyze::app
将相同优先级的任务收集成为一个 Channel
, 使用 BTreeMap<Priority, Channel>
进行 优先级 - 任务 的 1:n 映射.
let mut channels = Channels::new();
for (name, spawnee) in &app.software_tasks {
let spawnee_prio = spawnee.args.priority;
let channel = channels.entry(spawnee_prio).or_default();
channel.tasks.insert(name.clone());
// All inputs are send as we do not know from where they may be spawned.
spawnee.inputs.iter().for_each(|input| {
send_types.insert(input.ty.clone());
});
}
这个 Dispatcher
的布局如下. 其中 dispatcher_name
为中断名, 因为使用 cortex-m-rt
库, 中断名同名函数自动会被作为这个中断的 ISR.
let doc = format!("Interrupt handler to dispatch async tasks at priority {level}");
let attribute = &interrupts.get(&level).expect("UNREACHABLE").1.attrs;
let config = handler_config(app, analysis, dispatcher_name.clone());
items.push(quote!(
#[allow(non_snake_case)]
#[doc = #doc]
#[no_mangle]
#(#attribute)*
#(#config)*
unsafe fn #dispatcher_name() {
/// The priority of this interrupt handler
const PRIORITY: u8 = #level;
rtic::export::run(PRIORITY, || {
#(#stmts)*
});
}
));
其中 stmsts
是对当前优先级的所有软件任务执行语句.
for name in channel.tasks.iter() {
let exec_name = util::internal_task_ident(name, "EXEC");
let from_ptr_n_args =
util::from_ptr_n_args_ident(app.software_tasks[name].inputs.len());
stmts.push(quote!(
let exec = rtic::export::executor::AsyncTaskExecutor::#from_ptr_n_args(#name, &#exec_name);
exec.poll(|| {
let exec = rtic::export::executor::AsyncTaskExecutor::#from_ptr_n_args(#name, &#exec_name);
exec.set_pending();
#pend_interrupt
});
));
}
我们看到 Dispatcher
执行是在 AsyncTaskExecutor::poll
上的一个 wrapper, 这个 wrapper 定义了唤醒行为, 这个行为包括
- 设置
AsyncExecutor::pending
值, 使得执行器被唤醒时继续执行 - 触发当前
Dispatcher
中断源的中断, 达到重新执行这个Dispatcher
的效果
由于唤醒时会触发中断, 会使得 Dispatcher
再次被调用, 直到任务结束. 如果软件任务是一个无限循环, 那么 Dispatcher
永远不会退出, 如果软件任务只需要被运行一次, 在它结束之后如果再次被唤醒, 因为 AsyncTaskExecutor::poll
执行时的检查条件 self.is_running() && self.check_and_clear_pending()
, 它将不会被执行.
let pend_interrupt = if level > 0 {
let int_mod = interrupt_mod(app);
quote!(rtic::export::pend(#int_mod::#dispatcher_name);)
} else {
// For 0 priority tasks we don't need to pend anything
quote!()
};
这里触发中断通过 cortex_m::periphral::NVIC
对应的中断 bit 写入实现
/// Sets the given `interrupt` as pending
///
/// This is a convenience function around
/// [`NVIC::pend`](../cortex_m/peripheral/struct.NVIC.html#method.pend)
pub fn pend<I>(interrupt: I)
where
I: InterruptNumber,
{
NVIC::pend(interrupt);
}
Dispatcher
的执行流程
Waker::wake
-> HW Interrupt -> dispatcher
ISR -> AsyncTaskExecutor::poll
-> Future::poll
-> ISR Done
5.3 idle
任务 和 0-优先级任务
idle
作为一个 0-优先级 任务, 在 main
函数完成初始化后一直运行. 因为 main
函数不能够退出, idle
任务也不能够退出. 并且 idle
任务和 0-优先级任务不能够同时存在, 因为 idle
任务永不退出, 不能与 Dispatcher
兼容. 在 rt-macro::analyze::app
中能够找到其逻辑
// Check 0-priority async software tasks and idle dependency
for (name, task) in &app.software_tasks {
if task.args.priority == 0 {
// If there is a 0-priority task, there must be no idle
if app.idle.is_some() {
error.push(syn::Error::new(
name.span(),
format!(
"Async task {:?} has priority 0, but `#[idle]` is defined. 0-priority async tasks are only allowed if there is no `#[idle]`.",
name.to_string(),
)
));
}
}
}
在 main
函数中, call_idle
块 (最后部分) 被定义为如下
let call_idle = if let Some(idle) = &app.idle {
let name = &idle.name;
quote!(#name(#name::Context::new()))
} else if analysis.channels.get(&0).is_some() {
let dispatcher = util::zero_prio_dispatcher_ident();
quote!(#dispatcher();)
} else {
quote!(loop {})
};
如果有 0-优先级 软件任务, 就会调用 0-Priority Dispatcher
. 它的工作是循环对 0-优先级 任务进行 poll
, 它与 idle
本质一样, 也不会返回.
它的代码可以在 rtic-macros::async_dispatchers::codegen
中找到
if level > 0 {
// generate code for non-zero dispatcher
} else {
items.push(quote!(
#[allow(non_snake_case)]
unsafe fn #dispatcher_name() -> ! {
loop {
#(#stmts)*
}
}
));
}
5.4 init
任务
init
任务被用户定义, 在 main
函数中被执行. 它初始化所有任务的 本地资源 (local resources) 和 共享资源 (shared resources), 并返回一个元组.
在整个过程中, 中断被关闭, i.e. 执行流不会被打断.
6. 资源
RTIC 的资源分为 "局部资源" 和 "共享资源". 他们使用 #[local]
和 #[shared]
修饰的结构体来标识
#[shared]
struct Shared {
shared1: u32,
shared2: u32,
}
#[local]
struct Local {
foo_local_res_1: u32,
bar_local_res_1: u32,
}
6.1 局部资源 (Local Resources)
局部资源只能够被一个任务访问, 通过 #[task(local = [...])]
声明. 这里有两种声明方式
- 在
app
范围使用#[shared]
修饰的结构体定义, 如上 - 在任务范围使用
#[task(local = [foo: FooType = FooType{ ... }])]
因为局部资源可以在 init
任务 (处于临界区, 互斥访问) 中被初始化, 后交还给指定的任务, 局部资源需要实现 Send
接口, 因为在 init
和目标任务之间, 这个变量穿过了线程边界 (thread boundary)
[!info]
Send
andSync
Send
是指类型能在线程之间传递, 如Arc
,Mutex
等.Sync
是指类型能在线程之间共享,T impls Sync
iff&T impls Send
.
6.2 共享资源 (Shared Resources)
共享资源通过在 #[task(shared = [...])]
声明.
对于仅在同一优先级之间共享的资源, 因为为单核模型, 它们只能顺序执行, 互不抢占, 所以我们能够不使用 #[lock_free]
修饰这个共享变量. 它的互斥访问通过 Rust 的所属权语义来保障, i.e. 在一个时刻, 一个变量只可能存在以下两种的引用情况
- 多个非可变引用 (Immutable Reference)
&T
- 一个可变引用 (Mutable Reference)
&mut T
如果在不同优先级间共享的变量被 #[lock_free]
修饰将不能通过编译, 这会在 rtic-macros::syntax::analyze
中被检查
// Check that lock_free resources are correct
for lf_res in lock_free.iter() {
for (task, tr, _, priority) in task_resources_list.iter() {
for r in tr {
// Get all uses of resources annotated lock_free
if lf_res == r {
// Check so async tasks do not use lock free resources
if app.software_tasks.get(task).is_some() {
error.push(syn::Error::new(
r.span(),
format!(
"Lock free shared resource {:?} is used by an async tasks, which is forbidden",
r.to_string(),
),
));
}
// HashMap returns the previous existing object if old.key == new.key
if let Some(lf_res) = lf_hash.insert(
r.to_string(),
(task, r, priority)
) {
// Check if priority differ, if it does, append to
// list of resources which will be annotated with errors
if priority != lf_res.2 {
lf_res_with_error.push(lf_res.1);
lf_res_with_error.push(r);
}
// If the resource already violates lock free properties
if lf_res_with_error.contains(&r) {
lf_res_with_error.push(lf_res.1);
lf_res_with_error.push(r);
}
}
}
}
}
}
对于可能被抢占的共享变量的访问, 我们需要临界区保护. RTIC 使用了基于优先级的临界区(Priority-based Critical Sections) 保证了高效的无竞争内存共享(Data-race-free Memory Sharing)
传入每个任务的 Context
中的共享变量实现了 Mutex
接口, 我们通过 lock
调用传入闭包进行对共享变量的访问和修改. 闭包内形成了一个临界区. lock
API 的实现采用了之前提到的 Priority Ceiling Protocol 进行同步, 并使用 SRP 进行调度.
我们能看到 rtic::cortex_basepri::lock
的实现, 通过资源在 lock
实现时的 ceiling
, 将 BASEPRI
在临界区设置为访问当前资源的 ceiling
. 如果希望在最高优先级执行, 使用 critical_section::with
方法关闭所有中断, 进入全局临界区.
#[inline(always)]
pub unsafe fn lock<T, R>(
ptr: *mut T,
ceiling: u8,
nvic_prio_bits: u8,
f: impl FnOnce(&mut T) -> R,
) -> R {
if ceiling == (1 << nvic_prio_bits) {
critical_section::with(|_| f(&mut *ptr))
} else {
let current = basepri::read();
basepri_max::write(cortex_logical2hw(ceiling, nvic_prio_bits));
let r = f(&mut *ptr);
basepri::write(current);
r
}
}
这里并不是共享资源 lock
的实现, 仅是硬件资源提供的抽象. 对于共享资源的 lock
实现, 我们能够在 rtic-macros::codegen::bindings::cortex::inpl_mutex
中找到, 它将通过静态分析得到的资源 ceiling
作为常量 CEILING
保存在资源的 lock
函数实现里, 调用硬件提供的 lock
函数.
/// Generates a `Mutex` implementation
#[allow(clippy::too_many_arguments)]
pub fn impl_mutex(
app: &App,
_analysis: &CodegenAnalysis,
cfgs: &[Attribute],
resources_prefix: bool,
name: &Ident,
ty: &TokenStream2,
ceiling: u8,
ptr: &TokenStream2,
) -> TokenStream2 {
let path = if resources_prefix {
quote!(shared_resources::#name)
} else {
quote!(#name)
};
let device = &app.args.device;
quote!(
#(#cfgs)*
impl<'a> rtic::Mutex for #path<'a> {
type T = #ty;
#[inline(always)]
fn lock<RTIC_INTERNAL_R>(&mut self, f: impl FnOnce(&mut #ty) -> RTIC_INTERNAL_R) -> RTIC_INTERNAL_R {
/// Priority ceiling
const CEILING: u8 = #ceiling;
unsafe {
rtic::export::lock(
#ptr,
CEILING,
#device::NVIC_PRIO_BITS,
f,
)
}
}
}
)
}
7. Rust Macros
Rust macro 分为 声明性(Declarative) Macro 和 步骤性(Procedural) Macro, 更多资讯请参考 The Little Book of Rust Macros
由于声明性 macro 的功能较弱, 与 C 中的 #define
类似 (但是工作在 AST 上, 而不是 pre-processor 的直接替换文本, 提高了安全性和可读性), RTIC 使用步骤性 macro 对代码进行分析.
编译器在词法分析 (Lexical Analysis) 后会生成 Token Stream, 步骤性 Macro 就是在这里进行的, 它通过截取词法分析的 Token Stream, 将其转化为新的 Token Stream 再交还给编译器, 对于后续分析是透明的. 当然, 我们并不需要完成从 Token Stream 到 AST 的词法分析 (Syntax Analysis) 过程, "there is a crate for that": syn
能够通过 syn::parse(input)
得到 一个 Rust AST.
关于编译步骤, 你可以通过下图有一个高层次的了解 (From Geek for Geeks: Phases of a Compiler)
有关 RTIC 的静态分析代码可以在 rtic-macros
中找到, 我们先了解它的大致工作流程
#[proc_macro_attribute]
pub fn app(_args: TokenStream, _input: TokenStream) -> TokenStream {
let (mut app, analysis) = match syntax::parse(_args, _input) {
Err(e) => return e.to_compile_error().into(),
Ok(x) => x,
};
// Modify app based on backend before continuing
if let Err(e) = preprocess::app(&mut app, &analysis) {
return e.to_compile_error().into();
}
let app = app;
// App is not mutable after this point
if let Err(e) = check::app(&app, &analysis) {
return e.to_compile_error().into();
}
let analysis = analyze::app(analysis, &app);
let ts = codegen::app(&app, &analysis);
// write to disk ...
}
可以通过下图直观的了解 (From *Tjäder 2021 , Figure 3.6)
Rust syn
库能够将 Token 流转换成 AST (Abstract Syntax Tree), 我们将在 AST 层面探讨对 RTIC 代码的静态分析 .
8. Priority Ceiling 的计算方法
经过 parsing 后, 我们收集到下面的结构体
pub struct App {
/// The arguments to the `#[app]` attribute
pub args: AppArgs,
/// The name of the `const` item on which the `#[app]` attribute has been placed
pub name: Ident,
/// The `#[init]` function
pub init: Init,
/// The `#[idle]` function
pub idle: Option<Idle>,
/// Resources shared between tasks defined in `#[shared]`
pub shared_resources: Map<SharedResource>,
pub shared_resources_vis: syn::Visibility,
/// Task local resources defined in `#[local]`
pub local_resources: Map<LocalResource>,
pub local_resources_vis: syn::Visibility,
/// User imports
pub user_imports: Vec<ItemUse>,
/// User code
pub user_code: Vec<Item>,
/// Hardware tasks: `#[task(binds = ..)]`s
pub hardware_tasks: Map<HardwareTask>,
/// Async software tasks: `#[task]`
pub software_tasks: Map<SoftwareTask>,
}
要对每个资源进行分析, 我们需要首先收集 任务-资源 关系. 我们将所有的资源都收集到 task_resource_list
中. 我们将 app.init
, app.idle
, app.software_tasks
和 app.hardware_tasks
中所有的中的资源收集成一个 (Task Name, Shared Resources, Local Resources, Priority)
集合.
为得到每个共享资源的 priority ceiling
, 我们现在分析资源占有的情况. 定义 Access
结构体, 保存独占访问 x
和 不可变访问 &x
.
pub enum Access {
/// `[x]`, a mutable resource
Exclusive,
/// `[&x]`, a static non-mutable resource
Shared,
}
我们能够通过下面的 parse 代码了解其定义, 其中:
-
Expr::Path(
ExprPath
)
是一个完整变量名, 如std::mem::replace
-
Expr::Reference(
ExprReference
)]
是一个变量引用, 如&a
或&mut a
- 在代码中检查了
r.mutability
, 不允许可变引用的变量, 因为它不可能被共享
- 在代码中检查了
let (access, path) = match e {
Expr::Path(e) => (Access::Exclusive, e.path),
Expr::Reference(ref r) if r.mutability.is_none() => match &*r.expr {
Expr::Path(e) => (Access::Shared, e.path.clone()),
_ => return err,
},
_ => return err,
};
我们定义资源的所属情况 Ownership
-
Ownership::Owned
: 资源被一个任务独享, 我们保存这个任务的优先级 -
Ownership::CoOwned
: 资源被多个 相同优先级 的任务共享, 我们保存这个优先级 -
Ownership::Contended
: 资源可能被多个 不同优先级 任务抢占, 需要保存资源的 priority ceiling
pub enum Ownership {
/// Owned by a single task
Owned {
/// Priority of the task that owns this resource
priority: u8,
},
/// "Co-owned" by more than one task; all of them have the same priority
CoOwned {
/// Priority of the tasks that co-own this resource
priority: u8,
},
/// Contended by more than one task; the tasks have different priorities
Contended {
/// Priority ceiling
ceiling: u8,
},
}
我们通过遍历所有任务的资源访问情况, 算出 PCP (Priority Ceiling Protocol) 中每个任务需要的 Priority Ceiling:
- 对于每个新发现的资源
- 认为它是被一个任务独占的, 初始化为
Owned
类型 - 设置 资源优先级 为该 任务优先级
- 认为它是被一个任务独占的, 初始化为
- 对于重复发现的资源 (任何类型)
- 它一定 不是 被独占的的资源, 我们比较 资源优先级 与 (重复发现的) 任务优先级
- 不等, 认为是
Contended
(不同优先级任务共享), 并更新 资源优先级. - 相等, 认为是
CoOwned
(相同优先级任务共享)- 但不能确定一定是仅同优先级共享, 下一回合可能发现其他优先级转为
Contended
- 但不能确定一定是仅同优先级共享, 下一回合可能发现其他优先级转为
let mut used_shared_resource = IndexSet::new();
let mut ownerships = Ownerships::new();
let mut sync_types = SyncTypes::new();
for (prio, name, access) in app.shared_resource_accesses() {
let res = app.shared_resources.get(name).expect("UNREACHABLE");
// This shared resource is used
used_shared_resource.insert(name.clone());
if let Some(priority) = prio {
if let Some(ownership) = ownerships.get_mut(name) {
match *ownership {
Ownership::Owned { priority: ceiling }
| Ownership::CoOwned { priority: ceiling }
| Ownership::Contended { ceiling }
if priority != ceiling =>
{
*ownership = Ownership::Contended {
ceiling: cmp::max(ceiling, priority),
};
if access.is_shared() {
sync_types.insert(res.ty.clone());
}
}
Ownership::Owned { priority: ceil } if ceil == priority => {
*ownership = Ownership::CoOwned { priority };
}
_ => {}
}
} else {
ownerships.insert(name.clone(), Ownership::Owned { priority });
}
}
}
至此我们已经静态计算出每一个资源的 Priority Ceiling 了
9. RTIC 启动过程
RTIC 的入口函数为 main
, 它的工作为初始化和执行后台任务. 这个函数永不退出, 我们将 执行器 分配在 main
函数的栈贞中.
main
函数的主要工作为
- 关闭中断, 进入临界区
- 设置中断源优先级, 启用中断源 (中断仍然关闭)
- 调用
init
初始化资源 - 开启中断, 离开临界区
- 执行后台任务 (0-优先级)
关闭中断的代码能够在 rtic-macros::codegen::pre_init::codegen
中找到, 使用 interrupt::disable()
将中断关闭.
启用中断源的代码能够在 rtic-macros::codegen::bindings::pre_init_enable_interrupts
中找到.
它使用 NVIC
设置外部中断 (外部中断被认为是特殊的异常), 使用 SBC
设置系统异常.
我们对 interrupt_ids
和 hardware_tasks
中的中断进行, 其中 interrupt_ids
保存了 软件任务 对应的 Dispatcher
的中断源.
因为(外部)中断默认关闭, 我们通过 NVIC
设置外部中断后, 需要通过 unmask
开启中断源.
for (&priority, name) in interrupt_ids.chain(app.hardware_tasks.values().filter_map(|task| {
if is_exception(&task.args.binds) {
// We do exceptions in another pass
None
} else {
Some((&task.args.priority, &task.args.binds))
}
})) {
stmts.push(quote!(
core.NVIC.set_priority(
#rt_err::#interrupt::#name,
rtic::export::cortex_logical2hw(#priority, #nvic_prio_bits),
);
));
// NOTE unmask the interrupt *after* setting its priority: changing the priority of a pended
// interrupt is implementation defined
stmts.push(quote!(rtic::export::NVIC::unmask(#rt_err::#interrupt::#name);));
}
这里的 异常 被认为是 能够定义优先级的系统异常, 它包含以下异常
MemoryManagement
BusFault
UsageFault
SecureFault
SVCall
DebugMonitor
PendSV
-
SysTick
因为这些异常被认为是系统异常, 他们通过SCB
(System Configuration Block) 进行设置, 并默认开启
// Set exception priorities
for (name, priority) in app.hardware_tasks.values().filter_map(|task| {
if is_exception(&task.args.binds) {
Some((&task.args.binds, task.args.priority))
} else {
None
}
}) {
stmts.push(quote!(core.SCB.set_priority(
rtic::export::SystemHandler::#name,
rtic::export::cortex_logical2hw(#priority, #nvic_prio_bits),
);));
}
我们接下来分配软件任务的执行器. 我们提到, 我们为 每一个 软件任务分配一个执行器, 它的分配先通过在 main
栈上创建 AsyncTaskExecutor
, 再将它的地址复值到全局指针 AsyncTaskExecutorPtr
中, 使得所有函数都能够通过这个变量得到执行器.
for (name, _) in app.software_tasks.iter() {
let exec_name = util::internal_task_ident(name, "EXEC");
let new_n_args = util::new_n_args_ident(app.software_tasks[name].inputs.len());
executor_allocations.push(quote!(
let executor = ::core::mem::ManuallyDrop::new(rtic::export::executor::AsyncTaskExecutor::#new_n_args(#name));
executors_size += ::core::mem::size_of_val(&executor);
#exec_name.set_in_main(&executor);
));
}
我们接下来调用 init
任务函数对资源进行初始化. 注意到这里我们并不再 main
的执行栈上创建资源, 而是将 init
初始化资源的值拷贝到资源的全局变量中.
这里使用闭包并且不允许内联, 这样使得编译器对 let (shared_resources, local_resources)
分配的栈空间能够回收, 减少 main
调用栈的空间占用.
#[inline(never)]
fn __rtic_init_resources<F>(f: F) where F: FnOnce() {
f();
}
// Wrap late_init_stmts in a function to ensure that stack space is reclaimed.
__rtic_init_resources(||{
let (shared_resources, local_resources) = #init_name(#init_name::Context::new(#init_args));
#(#post_init_stmts)*
});
在获得了初始化的 shared_resources
和 local_resources
后, 通过拷贝的方式复制到对应资源的全局变量中. 注意这里可能对位置敏感类型产生影响.
完成拷贝后, 开启中断, 离开临界区
/// Generates code that runs after `#[init]` returns
pub fn codegen(app: &App, analysis: &Analysis) -> Vec<TokenStream2> {
let mut stmts = vec![];
// Initialize shared resources
for (name, res) in &app.shared_resources {
let mangled_name = util::static_shared_resource_ident(name);
// If it's live
let cfgs = res.cfgs.clone();
if analysis.shared_resources.get(name).is_some() {
stmts.push(quote!(
// We include the cfgs
#(#cfgs)*
// Resource is a RacyCell<MaybeUninit<T>>
// - `get_mut` to obtain a raw pointer to `MaybeUninit<T>`
// - `write` the defined value for the late resource T
#mangled_name.get_mut().write(core::mem::MaybeUninit::new(shared_resources.#name));
));
}
}
// Initialize local resources
for (name, res) in &app.local_resources {
let mangled_name = util::static_local_resource_ident(name);
// If it's live
let cfgs = res.cfgs.clone();
if analysis.local_resources.get(name).is_some() {
stmts.push(quote!(
// We include the cfgs
#(#cfgs)*
// Resource is a RacyCell<MaybeUninit<T>>
// - `get_mut` to obtain a raw pointer to `MaybeUninit<T>`
// - `write` the defined value for the late resource T
#mangled_name.get_mut().write(core::mem::MaybeUninit::new(local_resources.#name));
));
}
}
// Enable the interrupts -- this completes the `init`-ialization phase
stmts.push(quote!(rtic::export::interrupt::enable();));
stmts
}
现在初始化完成, 进入后台任务执行, 通过调用 idle
函数或 zero_prio_dispatcher
.
10. 总结
RTIC 是一个 Rust 语言编写的使用硬件调度加速的实时框架, 支持优先级抢占式调度. 它能够进行硬件调度的多亏于在代码定义时对每个任务占有的局部和共享资源的声明和任务优先级的静态定义. 这些信息能被用来分析出每个资源临界区时需要组织执行的任务. 它通过 ARM 的 NVIC 机制实现硬件电路级别的优先级抢占, 通过 BASEPRIO
寄存器实现阻塞互斥访问的任务, 以满足 Priority Ceiling Protocol, 实现对资源无锁访问的调度顺序.
Posted on April 12, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
November 29, 2024