Implemented threaded decode
authorJoel Holdsworth <joel@airwebreathe.org.uk>
Sun, 2 Mar 2014 17:27:12 +0000 (17:27 +0000)
committerJoel Holdsworth <joel@airwebreathe.org.uk>
Sun, 2 Mar 2014 21:32:03 +0000 (21:32 +0000)
pv/data/decoderstack.cpp
pv/data/decoderstack.h

index 3bb13284b425503c56786df5337e7c8018dfbb9d..87ac32613f954e8fdbaa4333513e4b3211266dbd 100644 (file)
@@ -38,7 +38,9 @@
 
 using boost::lock_guard;
 using boost::mutex;
+using boost::optional;
 using boost::shared_ptr;
+using boost::unique_lock;
 using std::deque;
 using std::make_pair;
 using std::max;
@@ -62,9 +64,16 @@ mutex DecoderStack::_global_decode_mutex;
 DecoderStack::DecoderStack(pv::SigSession &session,
        const srd_decoder *const dec) :
        _session(session),
+       _sample_count(0),
+       _frame_complete(false),
        _samples_decoded(0)
 {
-       connect(&_session, SIGNAL(frame_began()), this, SLOT(on_new_frame()));
+       connect(&_session, SIGNAL(frame_began()),
+               this, SLOT(on_new_frame()));
+       connect(&_session, SIGNAL(data_received()),
+               this, SLOT(on_data_received()));
+       connect(&_session, SIGNAL(frame_ended()),
+               this, SLOT(on_frame_ended()));
 
        _stack.push_back(shared_ptr<decode::Decoder>(
                new decode::Decoder(dec)));
@@ -164,6 +173,8 @@ QString DecoderStack::error_message()
 
 void DecoderStack::clear()
 {
+       _sample_count = 0;
+       _frame_complete = false;
        _samples_decoded = 0;
        _error_message = QString();
        _rows.clear();
@@ -225,6 +236,13 @@ void DecoderStack::begin_decode()
        if (!data)
                return;
 
+       // Check we have a snapshot of data
+       const deque< shared_ptr<pv::data::LogicSnapshot> > &snapshots =
+               data->get_snapshots();
+       if (snapshots.empty())
+               return;
+       _snapshot = snapshots.front();
+
        // Get the samplerate and start time
        _start_time = data->get_start_time();
        _samplerate = data->samplerate();
@@ -247,16 +265,26 @@ uint64_t DecoderStack::get_max_sample_count() const
        return max_sample_count;
 }
 
+optional<int64_t> DecoderStack::wait_for_data() const
+{
+       unique_lock<mutex> input_lock(_input_mutex);
+       while(!boost::this_thread::interruption_requested() &&
+               !_frame_complete && _samples_decoded >= _sample_count)
+               _input_cond.wait(input_lock);
+       return boost::make_optional(
+               !boost::this_thread::interruption_requested() &&
+               (_samples_decoded < _sample_count || !_frame_complete),
+               _sample_count);
+}
+
 void DecoderStack::decode_data(
-       const shared_ptr<pv::data::LogicSnapshot> &snapshot,
+       const int64_t sample_count, const unsigned int unit_size,
        srd_session *const session)
 {
        uint8_t chunk[DecodeChunkLength];
 
-       const int64_t sample_count = snapshot->get_sample_count();
-       const unsigned int unit_size = snapshot->unit_size();
        const unsigned int chunk_sample_count =
-               DecodeChunkLength / snapshot->unit_size();
+               DecodeChunkLength / _snapshot->unit_size();
 
        for (int64_t i = 0;
                !boost::this_thread::interruption_requested() &&
@@ -267,7 +295,7 @@ void DecoderStack::decode_data(
 
                const int64_t chunk_end = min(
                        i + chunk_sample_count, sample_count);
-               snapshot->get_samples(chunk, i, chunk_end);
+               _snapshot->get_samples(chunk, i, chunk_end);
 
                if (srd_session_send(session, i, i + sample_count, chunk,
                                (chunk_end - i) * unit_size) != SRD_OK) {
@@ -285,16 +313,12 @@ void DecoderStack::decode_data(
 
 void DecoderStack::decode_proc(shared_ptr<data::Logic> data)
 {
+       optional<int64_t> sample_count;
        srd_session *session;
        srd_decoder_inst *prev_di = NULL;
 
        assert(data);
-
-       // Check we have a snapshot of data
-       const deque< shared_ptr<pv::data::LogicSnapshot> > &snapshots =
-               data->get_snapshots();
-       if (snapshots.empty())
-               return;
+       assert(_snapshot);
 
        // Check that all decoders have the required probes
        BOOST_FOREACH(const shared_ptr<decode::Decoder> &dec, _stack)
@@ -306,8 +330,7 @@ void DecoderStack::decode_proc(shared_ptr<data::Logic> data)
        assert(session);
 
        // Create the decoders
-       const shared_ptr<pv::data::LogicSnapshot> &snapshot = snapshots.front();
-       const unsigned int unit_size = snapshot->unit_size();
+       const unsigned int unit_size = _snapshot->unit_size();
 
        BOOST_FOREACH(const shared_ptr<decode::Decoder> &dec, _stack)
        {
@@ -326,6 +349,12 @@ void DecoderStack::decode_proc(shared_ptr<data::Logic> data)
                prev_di = di;
        }
 
+       // Get the intial sample count
+       {
+               unique_lock<mutex> input_lock(_input_mutex);
+               sample_count = _sample_count = _snapshot->get_sample_count();
+       }
+
        // Start the session
        srd_session_metadata_set(session, SRD_CONF_SAMPLERATE,
                g_variant_new_uint64((uint64_t)_samplerate));
@@ -335,7 +364,9 @@ void DecoderStack::decode_proc(shared_ptr<data::Logic> data)
 
        srd_session_start(session);
 
-       decode_data(snapshot, session);
+       do {
+               decode_data(*sample_count, unit_size, session);
+       } while(_error_message.isEmpty() && (sample_count = wait_for_data()));
 
        // Destroy the session
        srd_session_destroy(session);
@@ -391,5 +422,23 @@ void DecoderStack::on_new_frame()
        begin_decode();
 }
 
+void DecoderStack::on_data_received()
+{
+       {
+               unique_lock<mutex> lock(_input_mutex);
+               _sample_count = _snapshot->get_sample_count();
+       }
+       _input_cond.notify_one();
+}
+
+void DecoderStack::on_frame_ended()
+{
+       {
+               unique_lock<mutex> lock(_input_mutex);
+               _frame_complete = true;
+       }
+       _input_cond.notify_one();
+}
+
 } // namespace data
 } // namespace pv
index 308dce6edde0ad8b480bf65beba7cc6b6ca7cac9..073f2692565da5db9f0024d74ae4f5cb9581c48e 100644 (file)
@@ -25,6 +25,7 @@
 
 #include <list>
 
+#include <boost/optional.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
 
@@ -103,9 +104,10 @@ public:
        void begin_decode();
 
 private:
-       void decode_data(
-               const boost::shared_ptr<pv::data::LogicSnapshot> &snapshot,
-               srd_session *const session);
+       boost::optional<int64_t> wait_for_data() const;
+
+       void decode_data(const int64_t sample_count,
+               const unsigned int unit_size, srd_session *const session);
 
        void decode_proc(boost::shared_ptr<data::Logic> data);
 
@@ -115,6 +117,10 @@ private:
 private slots:
        void on_new_frame();
 
+       void on_data_received();
+
+       void on_frame_ended();
+
 signals:
        void new_decode_data();
 
@@ -131,6 +137,13 @@ private:
 
        std::list< boost::shared_ptr<decode::Decoder> > _stack;
 
+       boost::shared_ptr<pv::data::LogicSnapshot> _snapshot;
+
+       mutable boost::mutex _input_mutex;
+       mutable boost::condition_variable _input_cond;
+       int64_t _sample_count;
+       bool _frame_complete;
+
        mutable boost::mutex _output_mutex;
        int64_t _samples_decoded;