From 9bc6fc2996255514e0d49e3da0655e2f93f14d8c Mon Sep 17 00:00:00 2001
From: Tilman Sauerbeck
Date: Sun, 5 Jan 2020 22:22:41 +0100
Subject: [PATCH] common: Add the logger module.

---
 SConscript.libcommon.rs |   1 +
 src/common/lib.rs       |   1 +
 src/common/logger.rs    | 550 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 552 insertions(+)
 create mode 100644 src/common/logger.rs

diff --git a/SConscript.libcommon.rs b/SConscript.libcommon.rs
index 23f8277..afbf463 100644
--- a/SConscript.libcommon.rs
+++ b/SConscript.libcommon.rs
@@ -27,6 +27,7 @@ source_files = [
     'src/common/shell.rs',
     'src/common/yencode.rs',
     'src/common/varint.rs',
+    'src/common/logger.rs',
 ]
 
 libcommon_rlib = env.Rustc('libcommon.rlib', source_files[0])
diff --git a/src/common/lib.rs b/src/common/lib.rs
index a9c4f90..e7fb361 100644
--- a/src/common/lib.rs
+++ b/src/common/lib.rs
@@ -49,3 +49,4 @@ pub mod mx25l;
 pub mod shell;
 pub mod yencode;
 pub mod varint;
+pub mod logger;
diff --git a/src/common/logger.rs b/src/common/logger.rs
new file mode 100644
index 0000000..be06327
--- /dev/null
+++ b/src/common/logger.rs
@@ -0,0 +1,550 @@
+/*
+ * Copyright (c) 2017-2020 Tilman Sauerbeck (tilman at code-monkey de)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining
+ * a copy of this software and associated documentation files (the
+ * "Software"), to deal in the Software without restriction, including
+ * without limitation the rights to use, copy, modify, merge, publish,
+ * distribute, sublicense, and/or sell copies of the Software, and to
+ * permit persons to whom the Software is furnished to do so, subject to
+ * the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+ * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+ * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+ * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+use gps::TimeAndPos;
+use storage::Storage;
+use varint;
+use systick::elapsed_ms;
+
+pub const MEMORY_SIZE: usize = 2 << 20;
+const SECTOR_SIZE: usize = 4 << 10;
+
+const NUM_SECTORS: usize = MEMORY_SIZE / SECTOR_SIZE;
+
+enum SectorFlag {
+    InUse = 1 << 0,
+    DataRecord = 1 << 1,
+
+    // Overrides InUse. The idea is to have erased flash sectors
+    // (0xff..ff) be detected as not in use.
+    NotInUse = 1 << 7,
+}
+
+#[repr(C)]
+#[derive(Clone, Copy)]
+struct SectorHeader {
+    flags: u16,        // Combination of SectorFlag items.
+    recording_id: u16, // Zero is considered invalid.
+    start_time: u32,   // UNIX time. Only present if flag DataRecord isn't set.
+}
+
+impl SectorHeader {
+    fn new() -> SectorHeader {
+        SectorHeader {
+            flags: 0,
+            recording_id: 0,
+            start_time: 0,
+        }
+    }
+
+    fn is_in_use(&self) -> bool {
+        let mask = (SectorFlag::InUse as u16) | (SectorFlag::NotInUse as u16);
+        let value = SectorFlag::InUse as u16;
+
+        (self.flags & mask) == value
+    }
+
+    fn starts_recording(&self) -> bool {
+        let mask = (SectorFlag::InUse as u16)
+            | (SectorFlag::DataRecord as u16)
+            | (SectorFlag::NotInUse as u16);
+        let value = SectorFlag::InUse as u16;
+
+        (self.flags & mask) == value
+    }
+}
+
+#[derive(Clone, Copy)]
+struct InFlight {
+    d_time_s: u32,
+    d_lat: i32,
+    d_lon: i32,
+}
+
+impl InFlight {
+    fn new() -> InFlight {
+        InFlight {
+            d_time_s: 0,
+            d_lat: 0,
+            d_lon: 0,
+        }
+    }
+}
+
+pub struct Logger<'a> {
+    storage: &'a mut dyn Storage,
+
+    recording_id: u16, // Zero is considered invalid.
+
+    // The index of the first sector of the currently running recording.
+    // Only written in start_recording().
+    first_sector: u16,
+
+    recording_started: u32,
+
+    // The number of slots filled in in_flight.
+    num_in_flight: usize,
+
+    // Deltas not yet written out to write_buffer.
+    //
+    // Limiting ourselves to 7 items here means we can use
+    // 0xff as a padding byte.
+    in_flight: [InFlight; 7],
+
+    sector_header: [SectorHeader; NUM_SECTORS],
+
+    sectors_written: u16,
+
+    write_buffer_offset: usize,
+    write_buffer: [u8; SECTOR_SIZE],
+}
+
+struct SectorHeaderIter<'a> {
+    sector_header: &'a [SectorHeader; NUM_SECTORS],
+    it_front: usize,
+    it_back: usize,
+    indices: [u16; NUM_SECTORS],
+}
+
+fn cmp_sector_header_indices(a: u16, b: u16,
+                             sector_header: &[SectorHeader]) -> i32 {
+    let header_a = &sector_header[a as usize];
+    let header_b = &sector_header[b as usize];
+
+    // Latest entries come first.
+    if header_a.start_time > header_b.start_time {
+        -1
+    } else if header_a.start_time < header_b.start_time {
+        1
+    } else {
+        0
+    }
+}
+
+fn downheap(heap: &mut [u16], mut index: usize, num_elts: usize,
+            sector_header: &[SectorHeader]) {
+    let orig = heap[index];
+
+    loop {
+        let mut worker = index * 2;
+
+        if worker < num_elts &&
+           cmp_sector_header_indices(heap[worker], heap[worker + 1],
+                                     sector_header) < 0 {
+            worker += 1;
+        }
+
+        if worker > num_elts ||
+           cmp_sector_header_indices(orig, heap[worker],
+                                     sector_header) >= 0 {
+            break;
+        }
+
+        heap[index] = heap[worker];
+        index = worker;
+    }
+
+    heap[index] = orig;
+}
+
+impl<'a> SectorHeaderIter<'a> {
+    fn new(logger: &'a Logger) -> SectorHeaderIter<'a> {
+        let mut iter = SectorHeaderIter {
+            sector_header: &logger.sector_header,
+            it_front: 0,
+            it_back: NUM_SECTORS,
+            indices: [0; NUM_SECTORS]
+        };
+
+        let mut num_used = 0;
+
+        // Put the indices of the used directory entries at the beginning
+        // of the array. Ignore the unused ones since we are not going
+        // to sort them anyway.
+        for i in 0..NUM_SECTORS {
+            let sector_header = &iter.sector_header[i];
+
+            if sector_header.starts_recording() {
+                iter.indices[num_used] = i as u16;
+                num_used += 1;
+            }
+        }
+
+        let num_elts_to_sort = num_used;
+
+        if num_elts_to_sort != 0 {
+            // Sort the used directory entries.
+            for i in (1..((num_elts_to_sort + 1) / 2) + 1).rev() {
+                downheap(&mut iter.indices, i - 1, num_elts_to_sort - 1,
+                         iter.sector_header);
+            }
+
+            for i in (1..num_elts_to_sort).rev() {
+                let t = iter.indices[0];
+                iter.indices[0] = iter.indices[i];
+                iter.indices[i] = t;
+
+                downheap(&mut iter.indices, 0, i - 1, iter.sector_header);
+            }
+        }
+
+        // Now put the indices of the unused directory entries in the array.
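+        // Unused entries go in right after the sector that starts the
+        // latest recording (wrapping around), so once the used entries
+        // are exhausted the iterator yields the next free slot first.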
+        if num_used == 0 {
+            for i in 0..NUM_SECTORS {
+                iter.indices[i] = i as u16;
+            }
+        } else {
+            let latest_used = iter.indices[0] as usize;
+            let mut offset_unused = num_used;
+
+            // First put the entries that come after the latest one in use...
+            for i in (latest_used + 1)..NUM_SECTORS {
+                let sector_header = &iter.sector_header[i];
+
+                if !sector_header.is_in_use() {
+                    iter.indices[offset_unused] = i as u16;
+                    offset_unused += 1;
+                }
+            }
+
+            // ... then wrap around if necessary.
+            for i in 0..latest_used {
+                let sector_header = &iter.sector_header[i];
+
+                if !sector_header.is_in_use() {
+                    iter.indices[offset_unused] = i as u16;
+                    offset_unused += 1;
+                }
+            }
+        }
+
+        // XXX:
+        // Need to handle those sectors that don't start recordings
+        // but that are still used.
+
+        iter
+    }
+}
+
+impl<'a> Iterator for SectorHeaderIter<'a> {
+    type Item = usize;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.it_front == self.it_back {
+            None
+        } else {
+            let next_index = self.indices[self.it_front] as usize;
+
+            self.it_front += 1;
+
+            Some(next_index)
+        }
+    }
+}
+
+impl<'a> DoubleEndedIterator for SectorHeaderIter<'a> {
+    fn next_back(&mut self) -> Option<Self::Item> {
+        if self.it_back == self.it_front {
+            None
+        } else {
+            self.it_back -= 1;
+
+            let next_index = self.indices[self.it_back] as usize;
+
+            Some(next_index)
+        }
+    }
+}
+
+fn normalize_angle(mut angle: i32) -> i32 {
+    let deg90 = 90 * 60 * 10000;
+    let deg180 = deg90 << 1;
+    let deg360 = deg180 << 1;
+
+    while angle >= deg180 {
+        angle -= deg360;
+    }
+
+    while angle <= -deg180 {
+        angle += deg360;
+    }
+
+    angle
+}
+
+fn max<T>(a: T, b: T) -> T
+    where T: PartialOrd {
+    if a > b {
+        a
+    } else {
+        b
+    }
+}
+
+impl<'a> Logger<'a> {
+    pub fn new(storage: &'a mut dyn Storage) -> Logger {
+        Logger {
+            storage: storage,
+
+            recording_id: 0,
+            first_sector: 0,
+            recording_started: 0,
+            num_in_flight: 0,
+
+            in_flight: [InFlight::new(); 7],
+            sector_header: [SectorHeader::new(); NUM_SECTORS],
+
+            sectors_written: 0,
+
+            write_buffer_offset: 0,
+            write_buffer: [0xff; SECTOR_SIZE],
+        }
+    }
+
+    pub fn init(&mut self) {
+        // Reading the directory entries one by one means
+        // we won't need as large a buffer on the stack.
+        for i in 0..NUM_SECTORS {
+            let address = i * SECTOR_SIZE;
+            let mut chunk = [0u8; 4];
+
+            self.storage.read(address, &mut chunk);
+
+            let sector_header_ptr: *mut SectorHeader =
+                &mut self.sector_header[i];
+
+            unsafe {
+                core::ptr::copy(chunk.as_ptr(),
+                                sector_header_ptr as *mut u8,
+                                chunk.len());
+            }
+        }
+    }
+
+    fn prepare_write_buffer(&mut self, is_initial: bool) {
+        self.write_buffer = [0xff; SECTOR_SIZE];
+
+        let flags = if is_initial {
+            (SectorFlag::InUse as u16)
+        } else {
+            (SectorFlag::InUse as u16) | (SectorFlag::DataRecord as u16)
+        };
+
+        // Write sector header.
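+        // Layout: flags (u16 LE), recording id (u16 LE), and, only in the
+        // first sector of a recording, the start time (u32 LE).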
+        self.write_buffer[0..2].copy_from_slice(&flags.to_le_bytes());
+        self.write_buffer[2..4].copy_from_slice(&self.recording_id.to_le_bytes());
+
+        self.write_buffer_offset = 4;
+
+        if is_initial {
+            let start = self.write_buffer_offset;
+            let end = start + 4;
+
+            self.write_buffer[start..end].copy_from_slice(
+                &self.recording_started.to_le_bytes());
+
+            self.write_buffer_offset += 4;
+        }
+    }
+
+    pub fn start_recording(&mut self, tap: &TimeAndPos) -> u16 {
+        self.find_next_record_slot();
+
+        self.sectors_written = 0;
+        self.recording_started = tap.unix_time;
+        self.num_in_flight = 0;
+
+        self.prepare_write_buffer(true);
+
+        self.write_packet(0, tap.latitude, tap.longitude);
+
+        self.recording_id
+    }
+
+    pub fn log(&mut self, prev_tap: &TimeAndPos, tap: &TimeAndPos) {
+        let d_time_ms = elapsed_ms(tap.system_time, prev_tap.system_time);
+
+        // We know that our hardware cannot deliver updates more often
+        // than once a second. However, when there's a delay in evaluating
+        // the hardware's messages, we will end up with intervals like
+        // 1050ms and 950ms (the latter will "make up" for the slowness
+        // in the former). To avoid logging deltas of 0 seconds, we round
+        // the intervals to full seconds.
+        let d_time_s = (d_time_ms + 500) / 1000;
+
+        let d_lat = tap.latitude - prev_tap.latitude;
+        let d_lon = tap.longitude - prev_tap.longitude;
+
+        if self.write_packet(d_time_s, d_lat, d_lon) {
+            self.flush_in_flight(false);
+        }
+    }
+
+    pub fn stop_recording(&mut self, tap: &TimeAndPos) -> u16 {
+        // Mark the end of the points stream.
+        self.write_packet(0xffffffff, 0, 0);
+        self.flush_in_flight(true);
+
+        // Write footer.
+        let duration = (tap.unix_time - self.recording_started) as u16;
+
+        let start = self.write_buffer_offset;
+        let end = start + 2;
+        let dst = &mut self.write_buffer[start..end];
+
+        dst.copy_from_slice(&duration.to_le_bytes());
+
+        let this_sector = self.first_sector + self.sectors_written;
+
+        self.storage.write(this_sector as usize * SECTOR_SIZE,
+                           &self.write_buffer);
+
+        self.sectors_written + 1
+    }
+
+    fn sector_header_iter(&self) -> SectorHeaderIter {
+        SectorHeaderIter::new(self)
+    }
+
+    fn find_next_record_slot(&mut self) {
+        let mut candidate_index = 0;
+        let mut max_recording_id = 0;
+
+        for index in self.sector_header_iter() {
+            candidate_index = index;
+
+            let sector_header = &self.sector_header[index];
+
+            if !sector_header.is_in_use() {
+                // Due to our sorting we know that there will be no more
+                // used directory entries following. At this point
+                // we aren't interested in unused ones, so break the loop.
+                break;
+            }
+
+            max_recording_id =
+                max(max_recording_id, sector_header.recording_id);
+        }
+
+        self.first_sector = candidate_index as u16;
+        self.recording_id = max_recording_id.wrapping_add(1);
+    }
+
+    fn write_packet(&mut self, d_time_s: u32, d_lat: i32, d_lon: i32) -> bool {
+        {
+            let in_flight = &mut self.in_flight[self.num_in_flight];
+
+            in_flight.d_time_s = d_time_s;
+            in_flight.d_lat = normalize_angle(d_lat);
+            in_flight.d_lon = normalize_angle(d_lon);
+        }
+
+        self.num_in_flight += 1;
+
+        self.num_in_flight == self.in_flight.len()
+    }
+
+    // Flushes the "in flight" items to the write buffer.
+    //
+    // @param is_final @c true iff this is the final flush in this recording.
+    //
+    // @note May only be called if logger.num_in_flight is greater than zero.
+    fn flush_in_flight(&mut self, is_final: bool) {
+        let mut flags = 0u8;
+
+        // Normally our items will have a time delta of one second.
+        // Mark the ones that differ from that.
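+        // Batch layout: one flag byte, then per point a varint time delta
+        // (present only when its flag bit is set) followed by the
+        // varint-encoded latitude and longitude deltas.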
+        for i in 0..self.num_in_flight {
+            if self.in_flight[i].d_time_s != 1 {
+                flags |= 1 << i;
+            }
+        }
+
+        let mut buffer = [0u8; 128];
+        let mut offset = 0;
+
+        buffer[offset] = flags;
+        offset += 1;
+
+        for i in 0..(self.num_in_flight - 1) {
+            let in_flight = &self.in_flight[i];
+
+            // Only write the time delta for the atypical cases.
+            if (flags & (1 << i)) != 0 {
+                offset +=
+                    varint::write_u32(&mut buffer[offset..], in_flight.d_time_s);
+            }
+
+            offset +=
+                varint::write_s32(&mut buffer[offset..], in_flight.d_lat);
+
+            offset +=
+                varint::write_s32(&mut buffer[offset..], in_flight.d_lon);
+        }
+
+        let i = self.num_in_flight - 1;
+        let in_flight = &self.in_flight[i];
+
+        // Only write the time delta for the atypical cases.
+        if (flags & (1 << i)) != 0 {
+            offset +=
+                varint::write_u32(&mut buffer[offset..], in_flight.d_time_s);
+        }
+
+        // The final point is an end-of-stream marker and doesn't store
+        // d_lat or d_lon.
+        if !is_final {
+            offset +=
+                varint::write_s32(&mut buffer[offset..], in_flight.d_lat);
+
+            offset +=
+                varint::write_s32(&mut buffer[offset..], in_flight.d_lon);
+        }
+
+        self.num_in_flight = 0;
+
+        let num_bytes_written = offset;
+
+        let remaining = self.write_buffer.len() - self.write_buffer_offset;
+
+        if remaining < num_bytes_written {
+            // We may use 0xff as padding bytes, since 0xff isn't a valid
+            // first byte in a points batch. prepare_write_buffer() fills
+            // our buffer with 0xff, so we don't need to do anything here.
+            let this_sector = self.first_sector + self.sectors_written;
+
+            self.storage.write(this_sector as usize * SECTOR_SIZE,
+                               &self.write_buffer);
+
+            self.sectors_written += 1;
+
+            self.prepare_write_buffer(false);
+        }
+
+        let start = self.write_buffer_offset;
+        let end = start + num_bytes_written;
+        let dst = &mut self.write_buffer[start..end];
+
+        dst.copy_from_slice(&buffer[0..num_bytes_written]);
+
+        self.write_buffer_offset += num_bytes_written;
+    }
+}
-- 
2.30.2
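
For reference, a minimal usage sketch of the new API (not part of the patch).
It assumes the library is linked as the `common` crate with `gps`, `storage`
and `logger` reachable as public modules, and that the caller already holds
two GPS fixes; the driver function and its name are illustrative only.

    use common::gps::TimeAndPos;
    use common::logger::Logger;
    use common::storage::Storage;

    // Illustrative driver: `storage` is whatever backend implements
    // storage::Storage (presumably the mx25l flash driver on the device,
    // or a RAM-backed fake on the host).
    fn record_two_points(storage: &mut dyn Storage,
                         first: &TimeAndPos,
                         second: &TimeAndPos) -> u16 {
        let mut logger = Logger::new(storage);

        // Read the sector headers from flash so the logger knows which
        // sectors are already in use.
        logger.init();

        // start_recording() logs the first point itself and returns the
        // id assigned to the new recording.
        let recording_id = logger.start_recording(first);

        // Every further fix is stored as a delta against its predecessor.
        logger.log(first, second);

        // stop_recording() writes the end-of-stream marker, flushes the
        // remaining deltas and commits the final sector to flash.
        logger.stop_recording(second);

        recording_id
    }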