From: Soeren Apel Date: Wed, 14 Jun 2017 22:10:09 +0000 (+0200) Subject: Implement logic data muxer thread X-Git-Url: http://git.code-monkey.de/?a=commitdiff_plain;h=27a3f09baf61c7f9b8c07630d34df75ddfdd476b;p=pulseview.git Implement logic data muxer thread Multiple changes in one commit due to complexity: 1) data::decode::Decoder: Make use of the DecodeChannel struct for channel assigments 2) DecodeSignal: Store DecodeChannel list in vector, not map 3) DecodeSignal: Remove unused get_data() 4) Remove boost::optional usage 5) Use DecodeSignal::segment_ as the container for muxed logic data 6) Implement the DecodeSignal::logic_mux_proc thread and its helper method mux_logic_samples() 7) Update DecodeSignal::decode_proc() to interface with the logic muxer thread 8) Remove no longer needed DecodeSignal::wait_for_data() --- diff --git a/pv/data/decode/decoder.cpp b/pv/data/decode/decoder.cpp index ffdbeb9..2a9b980 100644 --- a/pv/data/decode/decoder.cpp +++ b/pv/data/decode/decoder.cpp @@ -25,7 +25,9 @@ #include "decoder.hpp" #include +#include +using pv::data::DecodeChannel; using std::set; using std::map; using std::shared_ptr; @@ -62,14 +64,12 @@ void Decoder::show(bool show) shown_ = show; } -const map >& -Decoder::channels() const +const vector& Decoder::channels() const { return channels_; } -void Decoder::set_channels(map > channels) +void Decoder::set_channels(vector channels) { channels_ = channels; } @@ -88,28 +88,13 @@ void Decoder::set_option(const char *id, GVariant *value) bool Decoder::have_required_channels() const { - for (GSList *l = decoder_->channels; l; l = l->next) { - const srd_channel *const pdch = (const srd_channel*)l->data; - assert(pdch); - if (channels_.find(pdch) == channels_.end()) + for (DecodeChannel *ch : channels_) + if (!ch->assigned_signal && !ch->is_optional) return false; - } return true; } -set< shared_ptr > Decoder::get_data() -{ - set< shared_ptr > data; - for (const auto& channel : channels_) { - shared_ptr b(channel.second); - assert(b); - data.insert(b->logic_data()); - } - - return data; -} - srd_decoder_inst* Decoder::create_decoder_inst(srd_session *session) const { GHashTable *const opt_hash = g_hash_table_new_full(g_str_hash, @@ -138,20 +123,18 @@ srd_decoder_inst* Decoder::create_decoder_inst(srd_session *session) const GHashTable *const channels = g_hash_table_new_full(g_str_hash, g_str_equal, g_free, (GDestroyNotify)g_variant_unref); - for (const auto& channel : channels_) { - shared_ptr b(channel.second); - -// init_pin_states->data[pdch->order] = -// channel.initial_pin_state; + for (DecodeChannel *ch : channels_) { + init_pin_states->data[ch->id] = ch->initial_pin_state; - GVariant *const gvar = g_variant_new_int32(b->index()); + GVariant *const gvar = g_variant_new_int32(ch->id); // id = bit position g_variant_ref_sink(gvar); - g_hash_table_insert(channels, channel.first->id, gvar); + // key is channel name, value is bit position in each sample + g_hash_table_insert(channels, ch->pdch_->id, gvar); } srd_inst_channel_set_all(decoder_inst, channels); -// srd_inst_initial_pins_set_all(decoder_inst, initial_pin_states); + srd_inst_initial_pins_set_all(decoder_inst, init_pin_states); g_array_free(init_pin_states, TRUE); return decoder_inst; diff --git a/pv/data/decode/decoder.hpp b/pv/data/decode/decoder.hpp index 1b65566..4ec193b 100644 --- a/pv/data/decode/decoder.hpp +++ b/pv/data/decode/decoder.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include @@ -30,6 +31,7 @@ using std::map; using std::set; using std::shared_ptr; using std::string; +using std::vector; struct srd_decoder; struct srd_decoder_inst; @@ -40,6 +42,7 @@ namespace pv { namespace data { +struct DecodeChannel; class Logic; class SignalBase; @@ -57,10 +60,8 @@ public: bool shown() const; void show(bool show = true); - const map >& channels() const; - void set_channels(map > channels); + const vector& channels() const; + void set_channels(vector channels); const map& options() const; @@ -70,14 +71,12 @@ public: srd_decoder_inst* create_decoder_inst(srd_session *session) const; - set< shared_ptr > get_data(); - private: const srd_decoder *const decoder_; bool shown_; - map > channels_; + vector channels_; map options_; }; diff --git a/pv/data/decodesignal.cpp b/pv/data/decodesignal.cpp index 14bef5d..cfc98cd 100644 --- a/pv/data/decodesignal.cpp +++ b/pv/data/decodesignal.cpp @@ -31,7 +31,6 @@ #include #include -using boost::optional; using std::lock_guard; using std::make_pair; using std::make_shared; @@ -59,7 +58,6 @@ DecodeSignal::DecodeSignal(pv::Session &session) : logic_mux_data_invalid_(false), start_time_(0), samplerate_(0), - sample_count_(0), annotation_count_(0), samples_decoded_(0), frame_complete_(false) @@ -103,10 +101,11 @@ void DecodeSignal::stack_decoder(srd_decoder *decoder) if (stack_.size() == 1) set_name(QString::fromUtf8(decoder->name)); - // Include the newly created decode channels in the channel list + // Include the newly created decode channels in the channel lists update_channel_list(); auto_assign_signals(); + commit_decoder_channels(); begin_decode(); } @@ -148,7 +147,6 @@ bool DecodeSignal::toggle_decoder_visibility(int index) void DecodeSignal::reset_decode() { - sample_count_ = 0; annotation_count_ = 0; frame_complete_ = false; samples_decoded_ = 0; @@ -173,6 +171,18 @@ void DecodeSignal::begin_decode() reset_decode(); + if (stack_.size() == 0) { + error_message_ = tr("No decoders"); + return; + } + + assert(channels_.size() > 0); + + if (get_assigned_signal_count() == 0) { + error_message_ = tr("There are no channels assigned to this decoder"); + return; + } + // Check that all decoders have the required channels for (const shared_ptr &dec : stack_) if (!dec->have_required_channels()) { @@ -210,8 +220,24 @@ void DecodeSignal::begin_decode() } } - // Make sure the logic output data is complete and up-to-date - logic_mux_thread_ = std::thread(&DecodeSignal::logic_mux_proc, this); + // TODO Currently we assume all channels have the same sample rate + auto any_channel = find_if(channels_.begin(), channels_.end(), + [](data::DecodeChannel ch) { return ch.assigned_signal; }); + shared_ptr first_segment = + any_channel->assigned_signal->logic_data()->logic_segments().front(); + samplerate_ = first_segment->samplerate(); + + // Free the logic data and its segment(s) if it needs to be updated + if (logic_mux_data_invalid_) + logic_mux_data_.reset(); + + if (!logic_mux_data_) { + const int64_t ch_count = get_assigned_signal_count(); + const int64_t unit_size = (ch_count + 7) / 8; + logic_mux_data_ = make_shared(ch_count); + segment_ = make_shared(*logic_mux_data_, unit_size, samplerate_); + logic_mux_data_->push_segment(segment_); + } // Update the samplerate and start time start_time_ = segment_->start_time(); @@ -219,6 +245,11 @@ void DecodeSignal::begin_decode() if (samplerate_ == 0.0) samplerate_ = 1.0; + // Make sure the logic output data is complete and up-to-date + logic_mux_interrupt_ = false; + logic_mux_thread_ = std::thread(&DecodeSignal::logic_mux_proc, this); + + // Decode the muxed logic data decode_interrupt_ = false; decode_thread_ = std::thread(&DecodeSignal::decode_proc, this); } @@ -252,6 +283,7 @@ void DecodeSignal::auto_assign_signals() if (new_assignment) { logic_mux_data_invalid_ = true; + commit_decoder_channels(); channels_updated(); } } @@ -264,11 +296,18 @@ void DecodeSignal::assign_signal(const uint16_t channel_id, const SignalBase *si logic_mux_data_invalid_ = true; } + commit_decoder_channels(); channels_updated(); - begin_decode(); } +int DecodeSignal::get_assigned_signal_count() const +{ + // Count all channels that have a signal assigned to them + return count_if(channels_.begin(), channels_.end(), + [](data::DecodeChannel ch) { return ch.assigned_signal; }); +} + void DecodeSignal::set_initial_pin_state(const uint16_t channel_id, const int init_state) { for (data::DecodeChannel &ch : channels_) @@ -466,39 +505,130 @@ void DecodeSignal::update_channel_list() channels_updated(); } -void DecodeSignal::logic_mux_proc() +void DecodeSignal::commit_decoder_channels() { + // Submit channel list to every decoder, containing only the relevant channels + for (shared_ptr dec : stack_) { + vector channel_list; + for (data::DecodeChannel &ch : channels_) + if (ch.decoder_ == dec) + channel_list.push_back(&ch); + + dec->set_channels(channel_list); + } } -optional DecodeSignal::wait_for_data() const +void DecodeSignal::mux_logic_samples(const int64_t start, const int64_t end) { - unique_lock input_lock(input_mutex_); + // Enforce end to be greater than start + if (end <= start) + return; + + // Fetch all segments and their data + // TODO Currently, we assume only a single segment exists + vector > segments; + vector signal_data; + vector signal_in_bytepos; + vector signal_in_bitpos; + + for (data::DecodeChannel &ch : channels_) + if (ch.assigned_signal) { + const shared_ptr logic_data = ch.assigned_signal->logic_data(); + const shared_ptr segment = logic_data->logic_segments().front(); + segments.push_back(segment); + signal_data.push_back(segment->get_samples(start, end)); + + const int bitpos = ch.assigned_signal->logic_bit_index(); + signal_in_bytepos.push_back(bitpos / 8); + signal_in_bitpos.push_back(bitpos % 8); + } + + // Perform the muxing of signal data into the output data + uint8_t* output = new uint8_t[(end - start) * segment_->unit_size()]; + unsigned int signal_count = signal_data.size(); - // Do wait if we decoded all samples but we're still capturing - // Do not wait if we're done capturing - while (!decode_interrupt_ && !frame_complete_ && - (samples_decoded_ >= sample_count_) && - (session_.get_capture_state() != Session::Stopped)) { + for (int64_t sample_cnt = 0; sample_cnt < (end - start); sample_cnt++) { + int bitpos = 0; + uint8_t bytepos = 0; - decode_input_cond_.wait(input_lock); + const int out_sample_pos = sample_cnt * segment_->unit_size(); + for (unsigned int i = 0; i < segment_->unit_size(); i++) + output[out_sample_pos + i] = 0; + + for (unsigned int i = 0; i < signal_count; i++) { + const int in_sample_pos = sample_cnt * segments[i]->unit_size(); + const uint8_t in_sample = 1 & + ((signal_data[i][in_sample_pos + signal_in_bytepos[i]]) >> (signal_in_bitpos[i])); + + const uint8_t out_sample = output[out_sample_pos + bytepos]; + + output[out_sample_pos + bytepos] = out_sample | (in_sample << bitpos); + + bitpos++; + if (bitpos > 7) { + bitpos = 0; + bytepos++; + } + } } - // Return value is valid if we're not aborting the decode, - return boost::make_optional(!decode_interrupt_ && - // and there's more work to do... - (samples_decoded_ < sample_count_ || !frame_complete_) && - // and if the end of the data hasn't been reached yet - (!((samples_decoded_ >= sample_count_) && (session_.get_capture_state() == Session::Stopped))), - sample_count_); + segment_->append_payload(output, (end - start) * segment_->unit_size()); + delete[] output; + + for (const uint8_t* data : signal_data) + delete[] data; +} + +void DecodeSignal::logic_mux_proc() +{ + do { + + const uint64_t input_sample_count = get_working_sample_count(); + const uint64_t output_sample_count = segment_->get_sample_count(); + + const uint64_t samples_to_process = + (input_sample_count > output_sample_count) ? + (input_sample_count - output_sample_count) : 0; + + // Process the samples if necessary... + if (samples_to_process > 0) { + const uint64_t unit_size = segment_->unit_size(); + const uint64_t chunk_sample_count = DecodeChunkLength / unit_size; + + uint64_t processed_samples = 0; + do { + const uint64_t start_sample = output_sample_count + processed_samples; + const uint64_t sample_count = + min(samples_to_process - processed_samples, chunk_sample_count); + + mux_logic_samples(start_sample, start_sample + sample_count); + processed_samples += sample_count; + + // ...and process the newly muxed logic data + decode_input_cond_.notify_one(); + } while (processed_samples < samples_to_process); + } + + if (session_.get_capture_state() != Session::Stopped) { + // Wait for more input + unique_lock logic_mux_lock(logic_mux_mutex_); + logic_mux_cond_.wait(logic_mux_lock); + } + } while ((session_.get_capture_state() != Session::Stopped) && !logic_mux_interrupt_); + + // No more input data and session is stopped, let the decode thread + // process any pending data, terminate and release the global SRD mutex + // in order to let other decoders run + decode_input_cond_.notify_one(); } void DecodeSignal::decode_data( const int64_t abs_start_samplenum, const int64_t sample_count, srd_session *const session) { - const unsigned int unit_size = segment_->unit_size(); - const unsigned int chunk_sample_count = DecodeChunkLength / unit_size; + const int64_t unit_size = segment_->unit_size(); + const int64_t chunk_sample_count = DecodeChunkLength / unit_size; for (int64_t i = abs_start_samplenum; !decode_interrupt_ && (i < (abs_start_samplenum + sample_count)); @@ -526,7 +656,6 @@ void DecodeSignal::decode_data( void DecodeSignal::decode_proc() { - optional sample_count; srd_session *session; srd_decoder_inst *prev_di = nullptr; @@ -553,12 +682,6 @@ void DecodeSignal::decode_proc() prev_di = di; } - // Get the initial sample count - { - unique_lock input_lock(input_mutex_); - sample_count = sample_count_ = get_working_sample_count(); - } - // Start the session srd_session_metadata_set(session, SRD_CONF_SAMPLERATE, g_variant_new_uint64(samplerate_)); @@ -568,11 +691,32 @@ void DecodeSignal::decode_proc() srd_session_start(session); - int64_t abs_start_samplenum = 0; + uint64_t sample_count; + uint64_t abs_start_samplenum = 0; do { - decode_data(abs_start_samplenum, *sample_count, session); - abs_start_samplenum = *sample_count; - } while (error_message_.isEmpty() && (sample_count = wait_for_data())); + // Keep processing new samples until we exhaust the input data + do { + { + lock_guard input_lock(input_mutex_); + sample_count = segment_->get_sample_count() - abs_start_samplenum; + } + + if (sample_count > 0) { + decode_data(abs_start_samplenum, sample_count, session); + abs_start_samplenum += sample_count; + } + } while (error_message_.isEmpty() && (sample_count > 0)); + + // Terminate if we exhausted the input data + if ((int64_t)segment_->get_sample_count() == get_working_sample_count()) + break; + + if (error_message_.isEmpty()) { + // Wait for new input data or an interrupt request + unique_lock input_wait_lock(input_mutex_); + decode_input_cond_.wait(input_wait_lock); + } + } while (error_message_.isEmpty() && !decode_interrupt_); // Make sure all annotations are known to the frontend new_annotations(); diff --git a/pv/data/decodesignal.hpp b/pv/data/decodesignal.hpp index 9c8d382..b9cd430 100644 --- a/pv/data/decodesignal.hpp +++ b/pv/data/decodesignal.hpp @@ -25,8 +25,6 @@ #include #include -#include - #include #include @@ -101,6 +99,7 @@ public: const vector get_channels() const; void auto_assign_signals(); void assign_signal(const uint16_t channel_id, const SignalBase *signal); + int get_assigned_signal_count() const; void set_initial_pin_state(const uint16_t channel_id, const int init_state); @@ -140,9 +139,11 @@ public: private: void update_channel_list(); - void logic_mux_proc(); + void commit_decoder_channels(); - boost::optional wait_for_data() const; + void mux_logic_samples(const int64_t start, const int64_t end); + + void logic_mux_proc(); void decode_data(const int64_t abs_start_samplenum, const int64_t sample_count, srd_session *const session); @@ -172,7 +173,7 @@ private: pv::util::Timestamp start_time_; double samplerate_; - int64_t sample_count_, annotation_count_, samples_decoded_; + int64_t annotation_count_, samples_decoded_; vector< shared_ptr > stack_; map rows_; @@ -186,7 +187,7 @@ private: */ static mutex global_srd_mutex_; - mutable mutex input_mutex_, output_mutex_; + mutable mutex input_mutex_, output_mutex_, logic_mux_mutex_; mutable condition_variable decode_input_cond_, logic_mux_cond_; bool frame_complete_; diff --git a/pv/data/signalbase.cpp b/pv/data/signalbase.cpp index 621a3dd..09e4400 100644 --- a/pv/data/signalbase.cpp +++ b/pv/data/signalbase.cpp @@ -33,6 +33,7 @@ using std::dynamic_pointer_cast; using std::make_shared; using std::shared_ptr; using std::tie; +using std::unique_lock; namespace pv { namespace data { @@ -51,9 +52,7 @@ SignalBase::SignalBase(shared_ptr channel, ChannelType channel_ SignalBase::~SignalBase() { - // Wait for the currently ongoing conversion to finish - if (conversion_thread_.joinable()) - conversion_thread_.join(); + stop_conversion(); } shared_ptr SignalBase::channel() const @@ -101,7 +100,15 @@ SignalBase::ChannelType SignalBase::type() const unsigned int SignalBase::index() const { - return (channel_) ? channel_->index() : (unsigned int)-1; + return (channel_) ? channel_->index() : 0; +} + +unsigned int SignalBase::logic_bit_index() const +{ + if (channel_type_ == LogicChannel) + return channel_->index(); + else + return 0; } QColor SignalBase::colour() const @@ -170,9 +177,7 @@ shared_ptr SignalBase::logic_data() const void SignalBase::set_conversion_type(ConversionType t) { if (conversion_type_ != NoConversion) { - // Wait for the currently ongoing conversion to finish - if (conversion_thread_.joinable()) - conversion_thread_.join(); + stop_conversion(); // Discard converted data converted_data_.reset(); @@ -180,20 +185,7 @@ void SignalBase::set_conversion_type(ConversionType t) conversion_type_ = t; - if ((channel_type_ == AnalogChannel) && - ((conversion_type_ == A2LConversionByTreshold) || - (conversion_type_ == A2LConversionBySchmittTrigger))) { - - shared_ptr analog_data = dynamic_pointer_cast(data_); - - if (analog_data->analog_segments().size() > 0) { - AnalogSegment *asegment = analog_data->analog_segments().front().get(); - - // Begin conversion of existing sample data - // TODO Support for multiple segments is missing - on_samples_added(asegment, 0, 0); - } - } + start_conversion(); conversion_type_changed(t); } @@ -237,104 +229,142 @@ uint8_t SignalBase::convert_a2l_schmitt_trigger(float lo_thr, float hi_thr, return state; } -void SignalBase::conversion_thread_proc(QObject* segment, uint64_t start_sample, - uint64_t end_sample) +void SignalBase::conversion_thread_proc(QObject* segment) { // TODO Support for multiple segments is missing - if ((channel_type_ == AnalogChannel) && - ((conversion_type_ == A2LConversionByTreshold) || - (conversion_type_ == A2LConversionBySchmittTrigger))) { + uint64_t start_sample, end_sample; + start_sample = end_sample = 0; - AnalogSegment *asegment = qobject_cast(segment); - - // Create the logic data container if needed - shared_ptr logic_data; - if (!converted_data_) { - logic_data = make_shared(1); // Contains only one channel - converted_data_ = logic_data; - } else - logic_data = dynamic_pointer_cast(converted_data_); - - // Create the initial logic data segment if needed - if (logic_data->segments().size() == 0) { - shared_ptr lsegment = - make_shared(*logic_data.get(), 1, asegment->samplerate()); - logic_data->push_segment(lsegment); - } + do { + if ((channel_type_ == AnalogChannel) && + ((conversion_type_ == A2LConversionByTreshold) || + (conversion_type_ == A2LConversionBySchmittTrigger))) { + + AnalogSegment *asegment = qobject_cast(segment); + + // Create the logic data container if needed + shared_ptr logic_data; + if (!converted_data_) { + logic_data = make_shared(1); // Contains only one channel + converted_data_ = logic_data; + } else + logic_data = dynamic_pointer_cast(converted_data_); + + // Create the initial logic data segment if needed + if (logic_data->segments().size() == 0) { + shared_ptr lsegment = + make_shared(*logic_data.get(), 1, asegment->samplerate()); + logic_data->push_segment(lsegment); + } - LogicSegment *lsegment = dynamic_cast(logic_data->segments().front().get()); + LogicSegment *lsegment = dynamic_cast(logic_data->segments().front().get()); - // start_sample=end_sample=0 means we need to figure out the unprocessed range - if ((start_sample == 0) && (end_sample == 0)) { start_sample = lsegment->get_sample_count(); end_sample = asegment->get_sample_count(); - } - - if (start_sample == end_sample) - return; // Nothing to do - - float min_v, max_v; - tie(min_v, max_v) = asegment->get_min_max(); - - vector lsamples; - lsamples.reserve(ConversionBlockSize); - - uint64_t i = start_sample; - if (conversion_type_ == A2LConversionByTreshold) { - const float threshold = (min_v + max_v) * 0.5; // middle between min and max - - // Convert as many sample blocks as we can - while ((end_sample - i) > ConversionBlockSize) { - const float* asamples = asegment->get_samples(i, i + ConversionBlockSize); - for (uint32_t j = 0; j < ConversionBlockSize; j++) - lsamples.push_back(convert_a2l_threshold(threshold, asamples[j])); - lsegment->append_payload(lsamples.data(), lsamples.size()); - i += ConversionBlockSize; - lsamples.clear(); - delete[] asamples; + if (end_sample > start_sample) { + float min_v, max_v; + tie(min_v, max_v) = asegment->get_min_max(); + + vector lsamples; + lsamples.reserve(ConversionBlockSize); + + uint64_t i = start_sample; + + if (conversion_type_ == A2LConversionByTreshold) { + const float threshold = (min_v + max_v) * 0.5; // middle between min and max + + // Convert as many sample blocks as we can + while ((end_sample - i) > ConversionBlockSize) { + const float* asamples = asegment->get_samples(i, i + ConversionBlockSize); + for (uint32_t j = 0; j < ConversionBlockSize; j++) + lsamples.push_back(convert_a2l_threshold(threshold, asamples[j])); + lsegment->append_payload(lsamples.data(), lsamples.size()); + samples_added(lsegment, i, i + ConversionBlockSize); + i += ConversionBlockSize; + lsamples.clear(); + delete[] asamples; + } + + // Convert remaining samples + const float* asamples = asegment->get_samples(i, end_sample); + for (uint32_t j = 0; j < (end_sample - i); j++) + lsamples.push_back(convert_a2l_threshold(threshold, asamples[j])); + lsegment->append_payload(lsamples.data(), lsamples.size()); + samples_added(lsegment, i, end_sample); + delete[] asamples; + } + + if (conversion_type_ == A2LConversionBySchmittTrigger) { + const float amplitude = max_v - min_v; + const float lo_thr = min_v + (amplitude * 0.1); // 10% above min + const float hi_thr = max_v - (amplitude * 0.1); // 10% below max + uint8_t state = 0; // TODO Use value of logic sample n-1 instead of 0 + + // Convert as many sample blocks as we can + while ((end_sample - i) > ConversionBlockSize) { + const float* asamples = asegment->get_samples(i, i + ConversionBlockSize); + for (uint32_t j = 0; j < ConversionBlockSize; j++) + lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state)); + lsegment->append_payload(lsamples.data(), lsamples.size()); + samples_added(lsegment, i, i + ConversionBlockSize); + i += ConversionBlockSize; + lsamples.clear(); + delete[] asamples; + } + + // Convert remaining samples + const float* asamples = asegment->get_samples(i, end_sample); + for (uint32_t j = 0; j < (end_sample - i); j++) + lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state)); + lsegment->append_payload(lsamples.data(), lsamples.size()); + samples_added(lsegment, i, end_sample); + delete[] asamples; + } + + // If acquisition is ongoing, start-/endsample may have changed + end_sample = asegment->get_sample_count(); } + } - // Convert remaining samples - const float* asamples = asegment->get_samples(i, end_sample); - for (uint32_t j = 0; j < (end_sample - i); j++) - lsamples.push_back(convert_a2l_threshold(threshold, asamples[j])); - lsegment->append_payload(lsamples.data(), lsamples.size()); - delete[] asamples; - - samples_added(lsegment, start_sample, end_sample); + if (!conversion_interrupt_ && (start_sample == end_sample)) { + unique_lock input_lock(conversion_input_mutex_); + conversion_input_cond_.wait(input_lock); } + } while (!conversion_interrupt_); +} - if (conversion_type_ == A2LConversionBySchmittTrigger) { - const float amplitude = max_v - min_v; - const float lo_thr = min_v + (amplitude * 0.1); // 10% above min - const float hi_thr = max_v - (amplitude * 0.1); // 10% below max - uint8_t state = 0; // TODO Use value of logic sample n-1 instead of 0 - - // Convert as many sample blocks as we can - while ((end_sample - i) > ConversionBlockSize) { - const float* asamples = asegment->get_samples(i, i + ConversionBlockSize); - for (uint32_t j = 0; j < ConversionBlockSize; j++) - lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state)); - lsegment->append_payload(lsamples.data(), lsamples.size()); - i += ConversionBlockSize; - lsamples.clear(); - delete[] asamples; - } +void SignalBase::start_conversion() +{ + stop_conversion(); + + if ((channel_type_ == AnalogChannel) && + ((conversion_type_ == A2LConversionByTreshold) || + (conversion_type_ == A2LConversionBySchmittTrigger))) { + + shared_ptr analog_data = dynamic_pointer_cast(data_); - // Convert remaining samples - const float* asamples = asegment->get_samples(i, end_sample); - for (uint32_t j = 0; j < (end_sample - i); j++) - lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state)); - lsegment->append_payload(lsamples.data(), lsamples.size()); - delete[] asamples; + if (analog_data->analog_segments().size() > 0) { + // TODO Support for multiple segments is missing + AnalogSegment *asegment = analog_data->analog_segments().front().get(); - samples_added(lsegment, start_sample, end_sample); + conversion_interrupt_ = false; + conversion_thread_ = std::thread( + &SignalBase::conversion_thread_proc, this, asegment); } } } +void SignalBase::stop_conversion() +{ + // Stop conversion so we can restart it from the beginning + conversion_interrupt_ = true; + conversion_input_cond_.notify_one(); + if (conversion_thread_.joinable()) + conversion_thread_.join(); +} + void SignalBase::on_samples_cleared() { if (converted_data_) @@ -347,14 +377,13 @@ void SignalBase::on_samples_added(QObject* segment, uint64_t start_sample, uint64_t end_sample) { if (conversion_type_ != NoConversion) { - - // Wait for the currently ongoing conversion to finish - if (conversion_thread_.joinable()) - conversion_thread_.join(); - - conversion_thread_ = std::thread( - &SignalBase::conversion_thread_proc, this, - segment, start_sample, end_sample); + if (conversion_thread_.joinable()) { + // Notify the conversion thread since it's running + conversion_input_cond_.notify_one(); + } else { + // Start the conversion thread + start_conversion(); + } } samples_added(segment, start_sample, end_sample); @@ -362,26 +391,11 @@ void SignalBase::on_samples_added(QObject* segment, uint64_t start_sample, void SignalBase::on_capture_state_changed(int state) { - return; - if (state == Session::Stopped) { - // Make sure that all data is converted - - if ((channel_type_ == AnalogChannel) && - ((conversion_type_ == A2LConversionByTreshold) || - (conversion_type_ == A2LConversionBySchmittTrigger))) { - - shared_ptr analog_data = dynamic_pointer_cast(data_); - - if (analog_data->analog_segments().size() > 0) { - // TODO Support for multiple segments is missing - AnalogSegment *asegment = analog_data->analog_segments().front().get(); - - if (conversion_thread_.joinable()) - conversion_thread_.join(); - - conversion_thread_ = std::thread( - &SignalBase::conversion_thread_proc, this, asegment, 0, 0); - } + if (state == Session::Running) { + if (conversion_type_ != NoConversion) { + // Restart conversion + stop_conversion(); + start_conversion(); } } } diff --git a/pv/data/signalbase.hpp b/pv/data/signalbase.hpp index 90a1355..322bdf4 100644 --- a/pv/data/signalbase.hpp +++ b/pv/data/signalbase.hpp @@ -21,6 +21,8 @@ #ifndef PULSEVIEW_PV_DATA_SIGNALBASE_HPP #define PULSEVIEW_PV_DATA_SIGNALBASE_HPP +#include +#include #include #include @@ -30,6 +32,9 @@ #include +using std::atomic; +using std::condition_variable; +using std::mutex; using std::shared_ptr; namespace sigrok { @@ -94,10 +99,19 @@ public: ChannelType type() const; /** - * Gets the index number of this channel. + * Gets the index number of this channel, i.e. a unique ID assigned by + * the device driver. */ unsigned int index() const; + /** + * Returns which bit of a given sample for this signal represents the + * signal itself. This is relevant for compound signals like logic, + * rather meaningless for everything else but provided in case there + * is a conversion active that provides a digital signal using bit #0. + */ + unsigned int logic_bit_index() const; + /** * Gets the name of this signal. */ @@ -161,8 +175,10 @@ private: uint8_t convert_a2l_schmitt_trigger(float lo_thr, float hi_thr, float value, uint8_t &state); - void conversion_thread_proc(QObject* segment, uint64_t start_sample, - uint64_t end_sample); + void conversion_thread_proc(QObject* segment); + + void start_conversion(); + void stop_conversion(); Q_SIGNALS: void enabled_changed(const bool &value); @@ -194,6 +210,9 @@ protected: int conversion_type_; std::thread conversion_thread_; + atomic conversion_interrupt_; + mutex conversion_input_mutex_; + condition_variable conversion_input_cond_; QString internal_name_, name_; QColor colour_, bgcolour_;