Implement logic data muxer thread
authorSoeren Apel <soeren@apelpie.net>
Wed, 14 Jun 2017 22:10:09 +0000 (00:10 +0200)
committerUwe Hermann <uwe@hermann-uwe.de>
Wed, 5 Jul 2017 22:37:07 +0000 (00:37 +0200)
Multiple changes in one commit due to complexity:
1) data::decode::Decoder: Make use of the DecodeChannel struct for
channel assigments

2) DecodeSignal: Store DecodeChannel list in vector, not map

3) DecodeSignal: Remove unused get_data()

4) Remove boost::optional usage

5) Use DecodeSignal::segment_ as the container for muxed
logic data

6) Implement the DecodeSignal::logic_mux_proc thread and its
helper method mux_logic_samples()

7) Update DecodeSignal::decode_proc() to interface with the
logic muxer thread

8) Remove no longer needed DecodeSignal::wait_for_data()

pv/data/decode/decoder.cpp
pv/data/decode/decoder.hpp
pv/data/decodesignal.cpp
pv/data/decodesignal.hpp
pv/data/signalbase.cpp
pv/data/signalbase.hpp

index ffdbeb93bc3835eee32594045af1e79f32260467..2a9b980b14ef58039641f22df519cb291b6acdb7 100644 (file)
@@ -25,7 +25,9 @@
 #include "decoder.hpp"
 
 #include <pv/data/signalbase.hpp>
+#include <pv/data/decodesignal.hpp>
 
+using pv::data::DecodeChannel;
 using std::set;
 using std::map;
 using std::shared_ptr;
@@ -62,14 +64,12 @@ void Decoder::show(bool show)
        shown_ = show;
 }
 
-const map<const srd_channel*, shared_ptr<data::SignalBase> >&
-Decoder::channels() const
+const vector<DecodeChannel*>& Decoder::channels() const
 {
        return channels_;
 }
 
-void Decoder::set_channels(map<const srd_channel*,
-       shared_ptr<data::SignalBase> > channels)
+void Decoder::set_channels(vector<DecodeChannel*> channels)
 {
        channels_ = channels;
 }
@@ -88,28 +88,13 @@ void Decoder::set_option(const char *id, GVariant *value)
 
 bool Decoder::have_required_channels() const
 {
-       for (GSList *l = decoder_->channels; l; l = l->next) {
-               const srd_channel *const pdch = (const srd_channel*)l->data;
-               assert(pdch);
-               if (channels_.find(pdch) == channels_.end())
+       for (DecodeChannel *ch : channels_)
+               if (!ch->assigned_signal && !ch->is_optional)
                        return false;
-       }
 
        return true;
 }
 
-set< shared_ptr<pv::data::Logic> > Decoder::get_data()
-{
-       set< shared_ptr<pv::data::Logic> > data;
-       for (const auto& channel : channels_) {
-               shared_ptr<data::SignalBase> b(channel.second);
-               assert(b);
-               data.insert(b->logic_data());
-       }
-
-       return data;
-}
-
 srd_decoder_inst* Decoder::create_decoder_inst(srd_session *session) const
 {
        GHashTable *const opt_hash = g_hash_table_new_full(g_str_hash,
@@ -138,20 +123,18 @@ srd_decoder_inst* Decoder::create_decoder_inst(srd_session *session) const
        GHashTable *const channels = g_hash_table_new_full(g_str_hash,
                g_str_equal, g_free, (GDestroyNotify)g_variant_unref);
 
-       for (const auto& channel : channels_) {
-               shared_ptr<data::SignalBase> b(channel.second);
-
-//             init_pin_states->data[pdch->order] =
-//                     channel.initial_pin_state;
+       for (DecodeChannel *ch : channels_) {
+               init_pin_states->data[ch->id] = ch->initial_pin_state;
 
-               GVariant *const gvar = g_variant_new_int32(b->index());
+               GVariant *const gvar = g_variant_new_int32(ch->id);  // id = bit position
                g_variant_ref_sink(gvar);
-               g_hash_table_insert(channels, channel.first->id, gvar);
+               // key is channel name, value is bit position in each sample
+               g_hash_table_insert(channels, ch->pdch_->id, gvar);
        }
 
        srd_inst_channel_set_all(decoder_inst, channels);
 
-//     srd_inst_initial_pins_set_all(decoder_inst, initial_pin_states);
+       srd_inst_initial_pins_set_all(decoder_inst, init_pin_states);
        g_array_free(init_pin_states, TRUE);
 
        return decoder_inst;
index 1b655662bcb707dfe5fd83dbc847164e81bc04a2..4ec193b1df24dc73d61936e8e5d71bfb278b6316 100644 (file)
@@ -23,6 +23,7 @@
 #include <map>
 #include <memory>
 #include <set>
+#include <vector>
 
 #include <glib.h>
 
@@ -30,6 +31,7 @@ using std::map;
 using std::set;
 using std::shared_ptr;
 using std::string;
+using std::vector;
 
 struct srd_decoder;
 struct srd_decoder_inst;
@@ -40,6 +42,7 @@ namespace pv {
 
 namespace data {
 
+struct DecodeChannel;
 class Logic;
 class SignalBase;
 
@@ -57,10 +60,8 @@ public:
        bool shown() const;
        void show(bool show = true);
 
-       const map<const srd_channel*,
-               shared_ptr<data::SignalBase> >& channels() const;
-       void set_channels(map<const srd_channel*,
-               shared_ptr<data::SignalBase> > channels);
+       const vector<data::DecodeChannel*>& channels() const;
+       void set_channels(vector<data::DecodeChannel*> channels);
 
        const map<string, GVariant*>& options() const;
 
@@ -70,14 +71,12 @@ public:
 
        srd_decoder_inst* create_decoder_inst(srd_session *session) const;
 
-       set< shared_ptr<pv::data::Logic> > get_data();
-
 private:
        const srd_decoder *const decoder_;
 
        bool shown_;
 
-       map<const srd_channel*, shared_ptr<pv::data::SignalBase> > channels_;
+       vector<data::DecodeChannel*> channels_;
        map<string, GVariant*> options_;
 };
 
index 14bef5d45bcecc6a89cad2345a6b15f145eecbe1..cfc98cd55bb2cfc48706d9400a4d01df5d7c2631 100644 (file)
@@ -31,7 +31,6 @@
 #include <pv/data/decode/row.hpp>
 #include <pv/session.hpp>
 
-using boost::optional;
 using std::lock_guard;
 using std::make_pair;
 using std::make_shared;
@@ -59,7 +58,6 @@ DecodeSignal::DecodeSignal(pv::Session &session) :
        logic_mux_data_invalid_(false),
        start_time_(0),
        samplerate_(0),
-       sample_count_(0),
        annotation_count_(0),
        samples_decoded_(0),
        frame_complete_(false)
@@ -103,10 +101,11 @@ void DecodeSignal::stack_decoder(srd_decoder *decoder)
        if (stack_.size() == 1)
                set_name(QString::fromUtf8(decoder->name));
 
-       // Include the newly created decode channels in the channel list
+       // Include the newly created decode channels in the channel lists
        update_channel_list();
 
        auto_assign_signals();
+       commit_decoder_channels();
        begin_decode();
 }
 
@@ -148,7 +147,6 @@ bool DecodeSignal::toggle_decoder_visibility(int index)
 
 void DecodeSignal::reset_decode()
 {
-       sample_count_ = 0;
        annotation_count_ = 0;
        frame_complete_ = false;
        samples_decoded_ = 0;
@@ -173,6 +171,18 @@ void DecodeSignal::begin_decode()
 
        reset_decode();
 
+       if (stack_.size() == 0) {
+               error_message_ = tr("No decoders");
+               return;
+       }
+
+       assert(channels_.size() > 0);
+
+       if (get_assigned_signal_count() == 0) {
+               error_message_ = tr("There are no channels assigned to this decoder");
+               return;
+       }
+
        // Check that all decoders have the required channels
        for (const shared_ptr<decode::Decoder> &dec : stack_)
                if (!dec->have_required_channels()) {
@@ -210,8 +220,24 @@ void DecodeSignal::begin_decode()
                }
        }
 
-       // Make sure the logic output data is complete and up-to-date
-       logic_mux_thread_ = std::thread(&DecodeSignal::logic_mux_proc, this);
+       // TODO Currently we assume all channels have the same sample rate
+       auto any_channel = find_if(channels_.begin(), channels_.end(),
+               [](data::DecodeChannel ch) { return ch.assigned_signal; });
+       shared_ptr<LogicSegment> first_segment =
+                       any_channel->assigned_signal->logic_data()->logic_segments().front();
+       samplerate_ = first_segment->samplerate();
+
+       // Free the logic data and its segment(s) if it needs to be updated
+       if (logic_mux_data_invalid_)
+               logic_mux_data_.reset();
+
+       if (!logic_mux_data_) {
+               const int64_t ch_count = get_assigned_signal_count();
+               const int64_t unit_size = (ch_count + 7) / 8;
+               logic_mux_data_ = make_shared<Logic>(ch_count);
+               segment_ = make_shared<LogicSegment>(*logic_mux_data_, unit_size, samplerate_);
+               logic_mux_data_->push_segment(segment_);
+       }
 
        // Update the samplerate and start time
        start_time_ = segment_->start_time();
@@ -219,6 +245,11 @@ void DecodeSignal::begin_decode()
        if (samplerate_ == 0.0)
                samplerate_ = 1.0;
 
+       // Make sure the logic output data is complete and up-to-date
+       logic_mux_interrupt_ = false;
+       logic_mux_thread_ = std::thread(&DecodeSignal::logic_mux_proc, this);
+
+       // Decode the muxed logic data
        decode_interrupt_ = false;
        decode_thread_ = std::thread(&DecodeSignal::decode_proc, this);
 }
@@ -252,6 +283,7 @@ void DecodeSignal::auto_assign_signals()
 
        if (new_assignment) {
                logic_mux_data_invalid_ = true;
+               commit_decoder_channels();
                channels_updated();
        }
 }
@@ -264,11 +296,18 @@ void DecodeSignal::assign_signal(const uint16_t channel_id, const SignalBase *si
                        logic_mux_data_invalid_ = true;
                }
 
+       commit_decoder_channels();
        channels_updated();
-
        begin_decode();
 }
 
+int DecodeSignal::get_assigned_signal_count() const
+{
+       // Count all channels that have a signal assigned to them
+       return count_if(channels_.begin(), channels_.end(),
+               [](data::DecodeChannel ch) { return ch.assigned_signal; });
+}
+
 void DecodeSignal::set_initial_pin_state(const uint16_t channel_id, const int init_state)
 {
        for (data::DecodeChannel &ch : channels_)
@@ -466,39 +505,130 @@ void DecodeSignal::update_channel_list()
        channels_updated();
 }
 
-void DecodeSignal::logic_mux_proc()
+void DecodeSignal::commit_decoder_channels()
 {
+       // Submit channel list to every decoder, containing only the relevant channels
+       for (shared_ptr<decode::Decoder> dec : stack_) {
+               vector<data::DecodeChannel*> channel_list;
 
+               for (data::DecodeChannel &ch : channels_)
+                       if (ch.decoder_ == dec)
+                               channel_list.push_back(&ch);
+
+               dec->set_channels(channel_list);
+       }
 }
 
-optional<int64_t> DecodeSignal::wait_for_data() const
+void DecodeSignal::mux_logic_samples(const int64_t start, const int64_t end)
 {
-       unique_lock<mutex> input_lock(input_mutex_);
+       // Enforce end to be greater than start
+       if (end <= start)
+               return;
+
+       // Fetch all segments and their data
+       // TODO Currently, we assume only a single segment exists
+       vector<shared_ptr<LogicSegment> > segments;
+       vector<const uint8_t*> signal_data;
+       vector<uint8_t> signal_in_bytepos;
+       vector<uint8_t> signal_in_bitpos;
+
+       for (data::DecodeChannel &ch : channels_)
+               if (ch.assigned_signal) {
+                       const shared_ptr<Logic> logic_data = ch.assigned_signal->logic_data();
+                       const shared_ptr<LogicSegment> segment = logic_data->logic_segments().front();
+                       segments.push_back(segment);
+                       signal_data.push_back(segment->get_samples(start, end));
+
+                       const int bitpos = ch.assigned_signal->logic_bit_index();
+                       signal_in_bytepos.push_back(bitpos / 8);
+                       signal_in_bitpos.push_back(bitpos % 8);
+               }
+
+       // Perform the muxing of signal data into the output data
+       uint8_t* output = new uint8_t[(end - start) * segment_->unit_size()];
+       unsigned int signal_count = signal_data.size();
 
-       // Do wait if we decoded all samples but we're still capturing
-       // Do not wait if we're done capturing
-       while (!decode_interrupt_ && !frame_complete_ &&
-               (samples_decoded_ >= sample_count_) &&
-               (session_.get_capture_state() != Session::Stopped)) {
+       for (int64_t sample_cnt = 0; sample_cnt < (end - start); sample_cnt++) {
+               int bitpos = 0;
+               uint8_t bytepos = 0;
 
-               decode_input_cond_.wait(input_lock);
+               const int out_sample_pos = sample_cnt * segment_->unit_size();
+               for (unsigned int i = 0; i < segment_->unit_size(); i++)
+                       output[out_sample_pos + i] = 0;
+
+               for (unsigned int i = 0; i < signal_count; i++) {
+                       const int in_sample_pos = sample_cnt * segments[i]->unit_size();
+                       const uint8_t in_sample = 1 &
+                               ((signal_data[i][in_sample_pos + signal_in_bytepos[i]]) >> (signal_in_bitpos[i]));
+
+                       const uint8_t out_sample = output[out_sample_pos + bytepos];
+
+                       output[out_sample_pos + bytepos] = out_sample | (in_sample << bitpos);
+
+                       bitpos++;
+                       if (bitpos > 7) {
+                               bitpos = 0;
+                               bytepos++;
+                       }
+               }
        }
 
-       // Return value is valid if we're not aborting the decode,
-       return boost::make_optional(!decode_interrupt_ &&
-               // and there's more work to do...
-               (samples_decoded_ < sample_count_ || !frame_complete_) &&
-               // and if the end of the data hasn't been reached yet
-               (!((samples_decoded_ >= sample_count_) && (session_.get_capture_state() == Session::Stopped))),
-               sample_count_);
+       segment_->append_payload(output, (end - start) * segment_->unit_size());
+       delete[] output;
+
+       for (const uint8_t* data : signal_data)
+               delete[] data;
+}
+
+void DecodeSignal::logic_mux_proc()
+{
+       do {
+
+               const uint64_t input_sample_count = get_working_sample_count();
+               const uint64_t output_sample_count = segment_->get_sample_count();
+
+               const uint64_t samples_to_process =
+                       (input_sample_count > output_sample_count) ?
+                       (input_sample_count - output_sample_count) : 0;
+
+               // Process the samples if necessary...
+               if (samples_to_process > 0) {
+                       const uint64_t unit_size = segment_->unit_size();
+                       const uint64_t chunk_sample_count = DecodeChunkLength / unit_size;
+
+                       uint64_t processed_samples = 0;
+                       do {
+                               const uint64_t start_sample = output_sample_count + processed_samples;
+                               const uint64_t sample_count =
+                                       min(samples_to_process - processed_samples,     chunk_sample_count);
+
+                               mux_logic_samples(start_sample, start_sample + sample_count);
+                               processed_samples += sample_count;
+
+                               // ...and process the newly muxed logic data
+                               decode_input_cond_.notify_one();
+                       } while (processed_samples < samples_to_process);
+               }
+
+               if (session_.get_capture_state() != Session::Stopped) {
+                       // Wait for more input
+                       unique_lock<mutex> logic_mux_lock(logic_mux_mutex_);
+                       logic_mux_cond_.wait(logic_mux_lock);
+               }
+       } while ((session_.get_capture_state() != Session::Stopped) && !logic_mux_interrupt_);
+
+       // No more input data and session is stopped, let the decode thread
+       // process any pending data, terminate and release the global SRD mutex
+       // in order to let other decoders run
+       decode_input_cond_.notify_one();
 }
 
 void DecodeSignal::decode_data(
        const int64_t abs_start_samplenum, const int64_t sample_count,
        srd_session *const session)
 {
-       const unsigned int unit_size = segment_->unit_size();
-       const unsigned int chunk_sample_count = DecodeChunkLength / unit_size;
+       const int64_t unit_size = segment_->unit_size();
+       const int64_t chunk_sample_count = DecodeChunkLength / unit_size;
 
        for (int64_t i = abs_start_samplenum;
                !decode_interrupt_ && (i < (abs_start_samplenum + sample_count));
@@ -526,7 +656,6 @@ void DecodeSignal::decode_data(
 
 void DecodeSignal::decode_proc()
 {
-       optional<int64_t> sample_count;
        srd_session *session;
        srd_decoder_inst *prev_di = nullptr;
 
@@ -553,12 +682,6 @@ void DecodeSignal::decode_proc()
                prev_di = di;
        }
 
-       // Get the initial sample count
-       {
-               unique_lock<mutex> input_lock(input_mutex_);
-               sample_count = sample_count_ = get_working_sample_count();
-       }
-
        // Start the session
        srd_session_metadata_set(session, SRD_CONF_SAMPLERATE,
                g_variant_new_uint64(samplerate_));
@@ -568,11 +691,32 @@ void DecodeSignal::decode_proc()
 
        srd_session_start(session);
 
-       int64_t abs_start_samplenum = 0;
+       uint64_t sample_count;
+       uint64_t abs_start_samplenum = 0;
        do {
-               decode_data(abs_start_samplenum, *sample_count, session);
-               abs_start_samplenum = *sample_count;
-       } while (error_message_.isEmpty() && (sample_count = wait_for_data()));
+               // Keep processing new samples until we exhaust the input data
+               do {
+                       {
+                               lock_guard<mutex> input_lock(input_mutex_);
+                               sample_count = segment_->get_sample_count() - abs_start_samplenum;
+                       }
+
+                       if (sample_count > 0) {
+                               decode_data(abs_start_samplenum, sample_count, session);
+                               abs_start_samplenum += sample_count;
+                       }
+               } while (error_message_.isEmpty() && (sample_count > 0));
+
+               // Terminate if we exhausted the input data
+               if ((int64_t)segment_->get_sample_count() == get_working_sample_count())
+                       break;
+
+               if (error_message_.isEmpty()) {
+                       // Wait for new input data or an interrupt request
+                       unique_lock<mutex> input_wait_lock(input_mutex_);
+                       decode_input_cond_.wait(input_wait_lock);
+               }
+       } while (error_message_.isEmpty() && !decode_interrupt_);
 
        // Make sure all annotations are known to the frontend
        new_annotations();
index 9c8d382c808a74087f9566cae9535f66fd4ced78..b9cd430aa227c60cab911efdeee1a85e3ce56af0 100644 (file)
@@ -25,8 +25,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include <boost/optional.hpp>
-
 #include <QSettings>
 #include <QString>
 
@@ -101,6 +99,7 @@ public:
        const vector<data::DecodeChannel> get_channels() const;
        void auto_assign_signals();
        void assign_signal(const uint16_t channel_id, const SignalBase *signal);
+       int get_assigned_signal_count() const;
 
        void set_initial_pin_state(const uint16_t channel_id, const int init_state);
 
@@ -140,9 +139,11 @@ public:
 private:
        void update_channel_list();
 
-       void logic_mux_proc();
+       void commit_decoder_channels();
 
-       boost::optional<int64_t> wait_for_data() const;
+       void mux_logic_samples(const int64_t start, const int64_t end);
+
+       void logic_mux_proc();
 
        void decode_data(const int64_t abs_start_samplenum, const int64_t sample_count,
                srd_session *const session);
@@ -172,7 +173,7 @@ private:
        pv::util::Timestamp start_time_;
        double samplerate_;
 
-       int64_t sample_count_, annotation_count_, samples_decoded_;
+       int64_t annotation_count_, samples_decoded_;
 
        vector< shared_ptr<decode::Decoder> > stack_;
        map<const decode::Row, decode::RowData> rows_;
@@ -186,7 +187,7 @@ private:
         */
        static mutex global_srd_mutex_;
 
-       mutable mutex input_mutex_, output_mutex_;
+       mutable mutex input_mutex_, output_mutex_, logic_mux_mutex_;
        mutable condition_variable decode_input_cond_, logic_mux_cond_;
        bool frame_complete_;
 
index 621a3dda3560216fa3508594ccda5ff5e55deaae..09e4400e20a9d1584ca88dc6caa7d8974ddd3bb4 100644 (file)
@@ -33,6 +33,7 @@ using std::dynamic_pointer_cast;
 using std::make_shared;
 using std::shared_ptr;
 using std::tie;
+using std::unique_lock;
 
 namespace pv {
 namespace data {
@@ -51,9 +52,7 @@ SignalBase::SignalBase(shared_ptr<sigrok::Channel> channel, ChannelType channel_
 
 SignalBase::~SignalBase()
 {
-       // Wait for the currently ongoing conversion to finish
-       if (conversion_thread_.joinable())
-               conversion_thread_.join();
+       stop_conversion();
 }
 
 shared_ptr<sigrok::Channel> SignalBase::channel() const
@@ -101,7 +100,15 @@ SignalBase::ChannelType SignalBase::type() const
 
 unsigned int SignalBase::index() const
 {
-       return (channel_) ? channel_->index() : (unsigned int)-1;
+       return (channel_) ? channel_->index() : 0;
+}
+
+unsigned int SignalBase::logic_bit_index() const
+{
+       if (channel_type_ == LogicChannel)
+               return channel_->index();
+       else
+               return 0;
 }
 
 QColor SignalBase::colour() const
@@ -170,9 +177,7 @@ shared_ptr<data::Logic> SignalBase::logic_data() const
 void SignalBase::set_conversion_type(ConversionType t)
 {
        if (conversion_type_ != NoConversion) {
-               // Wait for the currently ongoing conversion to finish
-               if (conversion_thread_.joinable())
-                       conversion_thread_.join();
+               stop_conversion();
 
                // Discard converted data
                converted_data_.reset();
@@ -180,20 +185,7 @@ void SignalBase::set_conversion_type(ConversionType t)
 
        conversion_type_ = t;
 
-       if ((channel_type_ == AnalogChannel) &&
-               ((conversion_type_ == A2LConversionByTreshold) ||
-               (conversion_type_ == A2LConversionBySchmittTrigger))) {
-
-               shared_ptr<Analog> analog_data = dynamic_pointer_cast<Analog>(data_);
-
-               if (analog_data->analog_segments().size() > 0) {
-                       AnalogSegment *asegment = analog_data->analog_segments().front().get();
-
-                       // Begin conversion of existing sample data
-                       // TODO Support for multiple segments is missing
-                       on_samples_added(asegment, 0, 0);
-               }
-       }
+       start_conversion();
 
        conversion_type_changed(t);
 }
@@ -237,104 +229,142 @@ uint8_t SignalBase::convert_a2l_schmitt_trigger(float lo_thr, float hi_thr,
        return state;
 }
 
-void SignalBase::conversion_thread_proc(QObject* segment, uint64_t start_sample,
-       uint64_t end_sample)
+void SignalBase::conversion_thread_proc(QObject* segment)
 {
        // TODO Support for multiple segments is missing
 
-       if ((channel_type_ == AnalogChannel) &&
-               ((conversion_type_ == A2LConversionByTreshold) ||
-               (conversion_type_ == A2LConversionBySchmittTrigger))) {
+       uint64_t start_sample, end_sample;
+       start_sample = end_sample = 0;
 
-               AnalogSegment *asegment = qobject_cast<AnalogSegment*>(segment);
-
-               // Create the logic data container if needed
-               shared_ptr<Logic> logic_data;
-               if (!converted_data_) {
-                       logic_data = make_shared<Logic>(1);  // Contains only one channel
-                       converted_data_ = logic_data;
-               } else
-                        logic_data = dynamic_pointer_cast<Logic>(converted_data_);
-
-               // Create the initial logic data segment if needed
-               if (logic_data->segments().size() == 0) {
-                       shared_ptr<LogicSegment> lsegment =
-                               make_shared<LogicSegment>(*logic_data.get(), 1, asegment->samplerate());
-                       logic_data->push_segment(lsegment);
-               }
+       do {
+               if ((channel_type_ == AnalogChannel) &&
+                       ((conversion_type_ == A2LConversionByTreshold) ||
+                       (conversion_type_ == A2LConversionBySchmittTrigger))) {
+
+                       AnalogSegment *asegment = qobject_cast<AnalogSegment*>(segment);
+
+                       // Create the logic data container if needed
+                       shared_ptr<Logic> logic_data;
+                       if (!converted_data_) {
+                               logic_data = make_shared<Logic>(1);  // Contains only one channel
+                               converted_data_ = logic_data;
+                       } else
+                                logic_data = dynamic_pointer_cast<Logic>(converted_data_);
+
+                       // Create the initial logic data segment if needed
+                       if (logic_data->segments().size() == 0) {
+                               shared_ptr<LogicSegment> lsegment =
+                                       make_shared<LogicSegment>(*logic_data.get(), 1, asegment->samplerate());
+                               logic_data->push_segment(lsegment);
+                       }
 
-               LogicSegment *lsegment = dynamic_cast<LogicSegment*>(logic_data->segments().front().get());
+                       LogicSegment *lsegment = dynamic_cast<LogicSegment*>(logic_data->segments().front().get());
 
-               // start_sample=end_sample=0 means we need to figure out the unprocessed range
-               if ((start_sample == 0) && (end_sample == 0)) {
                        start_sample = lsegment->get_sample_count();
                        end_sample = asegment->get_sample_count();
-               }
-
-               if (start_sample == end_sample)
-                       return;  // Nothing to do
-
-               float min_v, max_v;
-               tie(min_v, max_v) = asegment->get_min_max();
-
-               vector<uint8_t> lsamples;
-               lsamples.reserve(ConversionBlockSize);
-
-               uint64_t i = start_sample;
 
-               if (conversion_type_ == A2LConversionByTreshold) {
-                       const float threshold = (min_v + max_v) * 0.5;  // middle between min and max
-
-                       // Convert as many sample blocks as we can
-                       while ((end_sample - i) > ConversionBlockSize) {
-                               const float* asamples = asegment->get_samples(i, i + ConversionBlockSize);
-                               for (uint32_t j = 0; j < ConversionBlockSize; j++)
-                                       lsamples.push_back(convert_a2l_threshold(threshold, asamples[j]));
-                               lsegment->append_payload(lsamples.data(), lsamples.size());
-                               i += ConversionBlockSize;
-                               lsamples.clear();
-                               delete[] asamples;
+                       if (end_sample > start_sample) {
+                               float min_v, max_v;
+                               tie(min_v, max_v) = asegment->get_min_max();
+
+                               vector<uint8_t> lsamples;
+                               lsamples.reserve(ConversionBlockSize);
+
+                               uint64_t i = start_sample;
+
+                               if (conversion_type_ == A2LConversionByTreshold) {
+                                       const float threshold = (min_v + max_v) * 0.5;  // middle between min and max
+
+                                       // Convert as many sample blocks as we can
+                                       while ((end_sample - i) > ConversionBlockSize) {
+                                               const float* asamples = asegment->get_samples(i, i + ConversionBlockSize);
+                                               for (uint32_t j = 0; j < ConversionBlockSize; j++)
+                                                       lsamples.push_back(convert_a2l_threshold(threshold, asamples[j]));
+                                               lsegment->append_payload(lsamples.data(), lsamples.size());
+                                               samples_added(lsegment, i, i + ConversionBlockSize);
+                                               i += ConversionBlockSize;
+                                               lsamples.clear();
+                                               delete[] asamples;
+                                       }
+
+                                       // Convert remaining samples
+                                       const float* asamples = asegment->get_samples(i, end_sample);
+                                       for (uint32_t j = 0; j < (end_sample - i); j++)
+                                               lsamples.push_back(convert_a2l_threshold(threshold, asamples[j]));
+                                       lsegment->append_payload(lsamples.data(), lsamples.size());
+                                       samples_added(lsegment, i, end_sample);
+                                       delete[] asamples;
+                               }
+
+                               if (conversion_type_ == A2LConversionBySchmittTrigger) {
+                                       const float amplitude = max_v - min_v;
+                                       const float lo_thr = min_v + (amplitude * 0.1);  // 10% above min
+                                       const float hi_thr = max_v - (amplitude * 0.1);  // 10% below max
+                                       uint8_t state = 0;  // TODO Use value of logic sample n-1 instead of 0
+
+                                       // Convert as many sample blocks as we can
+                                       while ((end_sample - i) > ConversionBlockSize) {
+                                               const float* asamples = asegment->get_samples(i, i + ConversionBlockSize);
+                                               for (uint32_t j = 0; j < ConversionBlockSize; j++)
+                                                       lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state));
+                                               lsegment->append_payload(lsamples.data(), lsamples.size());
+                                               samples_added(lsegment, i, i + ConversionBlockSize);
+                                               i += ConversionBlockSize;
+                                               lsamples.clear();
+                                               delete[] asamples;
+                                       }
+
+                                       // Convert remaining samples
+                                       const float* asamples = asegment->get_samples(i, end_sample);
+                                       for (uint32_t j = 0; j < (end_sample - i); j++)
+                                               lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state));
+                                       lsegment->append_payload(lsamples.data(), lsamples.size());
+                                       samples_added(lsegment, i, end_sample);
+                                       delete[] asamples;
+                               }
+
+                               // If acquisition is ongoing, start-/endsample may have changed
+                               end_sample = asegment->get_sample_count();
                        }
+               }
 
-                       // Convert remaining samples
-                       const float* asamples = asegment->get_samples(i, end_sample);
-                       for (uint32_t j = 0; j < (end_sample - i); j++)
-                               lsamples.push_back(convert_a2l_threshold(threshold, asamples[j]));
-                       lsegment->append_payload(lsamples.data(), lsamples.size());
-                       delete[] asamples;
-
-                       samples_added(lsegment, start_sample, end_sample);
+               if (!conversion_interrupt_ && (start_sample == end_sample)) {
+                       unique_lock<mutex> input_lock(conversion_input_mutex_);
+                       conversion_input_cond_.wait(input_lock);
                }
+       } while (!conversion_interrupt_);
+}
 
-               if (conversion_type_ == A2LConversionBySchmittTrigger) {
-                       const float amplitude = max_v - min_v;
-                       const float lo_thr = min_v + (amplitude * 0.1);  // 10% above min
-                       const float hi_thr = max_v - (amplitude * 0.1);  // 10% below max
-                       uint8_t state = 0;  // TODO Use value of logic sample n-1 instead of 0
-
-                       // Convert as many sample blocks as we can
-                       while ((end_sample - i) > ConversionBlockSize) {
-                               const float* asamples = asegment->get_samples(i, i + ConversionBlockSize);
-                               for (uint32_t j = 0; j < ConversionBlockSize; j++)
-                                       lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state));
-                               lsegment->append_payload(lsamples.data(), lsamples.size());
-                               i += ConversionBlockSize;
-                               lsamples.clear();
-                               delete[] asamples;
-                       }
+void SignalBase::start_conversion()
+{
+       stop_conversion();
+
+       if ((channel_type_ == AnalogChannel) &&
+               ((conversion_type_ == A2LConversionByTreshold) ||
+               (conversion_type_ == A2LConversionBySchmittTrigger))) {
+
+               shared_ptr<Analog> analog_data = dynamic_pointer_cast<Analog>(data_);
 
-                       // Convert remaining samples
-                       const float* asamples = asegment->get_samples(i, end_sample);
-                       for (uint32_t j = 0; j < (end_sample - i); j++)
-                               lsamples.push_back(convert_a2l_schmitt_trigger(lo_thr, hi_thr, asamples[j], state));
-                       lsegment->append_payload(lsamples.data(), lsamples.size());
-                       delete[] asamples;
+               if (analog_data->analog_segments().size() > 0) {
+                       // TODO Support for multiple segments is missing
+                       AnalogSegment *asegment = analog_data->analog_segments().front().get();
 
-                       samples_added(lsegment, start_sample, end_sample);
+                       conversion_interrupt_ = false;
+                       conversion_thread_ = std::thread(
+                               &SignalBase::conversion_thread_proc, this, asegment);
                }
        }
 }
 
+void SignalBase::stop_conversion()
+{
+       // Stop conversion so we can restart it from the beginning
+       conversion_interrupt_ = true;
+       conversion_input_cond_.notify_one();
+       if (conversion_thread_.joinable())
+               conversion_thread_.join();
+}
+
 void SignalBase::on_samples_cleared()
 {
        if (converted_data_)
@@ -347,14 +377,13 @@ void SignalBase::on_samples_added(QObject* segment, uint64_t start_sample,
        uint64_t end_sample)
 {
        if (conversion_type_ != NoConversion) {
-
-               // Wait for the currently ongoing conversion to finish
-               if (conversion_thread_.joinable())
-                       conversion_thread_.join();
-
-               conversion_thread_ = std::thread(
-                       &SignalBase::conversion_thread_proc, this,
-                       segment, start_sample, end_sample);
+               if (conversion_thread_.joinable()) {
+                       // Notify the conversion thread since it's running
+                       conversion_input_cond_.notify_one();
+               } else {
+                       // Start the conversion thread
+                       start_conversion();
+               }
        }
 
        samples_added(segment, start_sample, end_sample);
@@ -362,26 +391,11 @@ void SignalBase::on_samples_added(QObject* segment, uint64_t start_sample,
 
 void SignalBase::on_capture_state_changed(int state)
 {
-       return;
-       if (state == Session::Stopped) {
-               // Make sure that all data is converted
-
-               if ((channel_type_ == AnalogChannel) &&
-                       ((conversion_type_ == A2LConversionByTreshold) ||
-                       (conversion_type_ == A2LConversionBySchmittTrigger))) {
-
-                       shared_ptr<Analog> analog_data = dynamic_pointer_cast<Analog>(data_);
-
-                       if (analog_data->analog_segments().size() > 0) {
-                               // TODO Support for multiple segments is missing
-                               AnalogSegment *asegment = analog_data->analog_segments().front().get();
-
-                               if (conversion_thread_.joinable())
-                                       conversion_thread_.join();
-
-                               conversion_thread_ = std::thread(
-                                       &SignalBase::conversion_thread_proc, this, asegment, 0, 0);
-                       }
+       if (state == Session::Running) {
+               if (conversion_type_ != NoConversion) {
+                       // Restart conversion
+                       stop_conversion();
+                       start_conversion();
                }
        }
 }
index 90a13555fb41938db36eb6063aef096bc6da20ef..322bdf48e8c3a026d2156aa6ab75e05b024e9935 100644 (file)
@@ -21,6 +21,8 @@
 #ifndef PULSEVIEW_PV_DATA_SIGNALBASE_HPP
 #define PULSEVIEW_PV_DATA_SIGNALBASE_HPP
 
+#include <atomic>
+#include <condition_variable>
 #include <thread>
 
 #include <QColor>
@@ -30,6 +32,9 @@
 
 #include <libsigrokcxx/libsigrokcxx.hpp>
 
+using std::atomic;
+using std::condition_variable;
+using std::mutex;
 using std::shared_ptr;
 
 namespace sigrok {
@@ -94,10 +99,19 @@ public:
        ChannelType type() const;
 
        /**
-        * Gets the index number of this channel.
+        * Gets the index number of this channel, i.e. a unique ID assigned by
+        * the device driver.
         */
        unsigned int index() const;
 
+       /**
+        * Returns which bit of a given sample for this signal represents the
+        * signal itself. This is relevant for compound signals like logic,
+        * rather meaningless for everything else but provided in case there
+        * is a conversion active that provides a digital signal using bit #0.
+        */
+       unsigned int logic_bit_index() const;
+
        /**
         * Gets the name of this signal.
         */
@@ -161,8 +175,10 @@ private:
        uint8_t convert_a2l_schmitt_trigger(float lo_thr, float hi_thr,
                float value, uint8_t &state);
 
-       void conversion_thread_proc(QObject* segment, uint64_t start_sample,
-               uint64_t end_sample);
+       void conversion_thread_proc(QObject* segment);
+
+       void start_conversion();
+       void stop_conversion();
 
 Q_SIGNALS:
        void enabled_changed(const bool &value);
@@ -194,6 +210,9 @@ protected:
        int conversion_type_;
 
        std::thread conversion_thread_;
+       atomic<bool> conversion_interrupt_;
+       mutex conversion_input_mutex_;
+       condition_variable conversion_input_cond_;
 
        QString internal_name_, name_;
        QColor colour_, bgcolour_;