Session: Enable logic data acquisition using gstreamer
authorSoeren Apel <soeren@apelpie.net>
Sat, 29 Dec 2018 21:58:57 +0000 (22:58 +0100)
committerSoeren Apel <soeren@apelpie.net>
Fri, 18 Jan 2019 18:40:01 +0000 (19:40 +0100)
For now, reading data from /tmp/dummy_binary

pv/session.cpp
pv/session.hpp

index 7db96ff0abf568d0199d948afc7853bbbf499b9c..d406b9f21ba040c1fb4707c0f261bb7c75b05b33 100644 (file)
 
 #include <libsigrokcxx/libsigrokcxx.hpp>
 
+#ifdef ENABLE_FLOW
+#include <gstreamermm.h>
+#include <libsigrokflow/libsigrokflow.hpp>
+#endif
+
 #ifdef ENABLE_DECODE
 #include <libsigrokdecode/libsigrokdecode.h>
 #include "data/decodesignal.hpp"
@@ -74,6 +79,9 @@ using std::recursive_mutex;
 using std::runtime_error;
 using std::shared_ptr;
 using std::string;
+#ifdef ENABLE_FLOW
+using std::unique_lock;
+#endif
 using std::unique_ptr;
 using std::unordered_set;
 using std::vector;
@@ -91,6 +99,12 @@ using sigrok::Session;
 
 using Glib::VariantBase;
 
+#ifdef ENABLE_FLOW
+using Gst::Bus;
+using Gst::ElementFactory;
+using Gst::Pipeline;
+#endif
+
 namespace pv {
 
 shared_ptr<sigrok::Context> Session::sr_context;
@@ -947,6 +961,35 @@ void Session::sample_thread_proc(function<void (const QString)> error_handler)
 {
        assert(error_handler);
 
+#ifdef ENABLE_FLOW
+       pipeline_ = Pipeline::create();
+
+       source_ = ElementFactory::create_element("filesrc", "source");
+       sink_ = RefPtr<AppSink>::cast_dynamic(ElementFactory::create_element("appsink", "sink"));
+
+       pipeline_->add(source_)->add(sink_);
+       source_->link(sink_);
+
+       source_->set_property("location", Glib::ustring("/tmp/dummy_binary"));
+
+       sink_->set_property("emit-signals", TRUE);
+       sink_->signal_new_sample().connect(sigc::mem_fun(*this, &Session::on_gst_new_sample));
+
+       // Get the bus from the pipeline and add a bus watch to the default main context
+       RefPtr<Bus> bus = pipeline_->get_bus();
+       bus->add_watch(sigc::mem_fun(this, &Session::on_gst_bus_message));
+
+       // Start pipeline and Wait until it finished processing
+       pipeline_done_interrupt_ = false;
+       pipeline_->set_state(Gst::STATE_PLAYING);
+
+       unique_lock<mutex> pipeline_done_lock_(pipeline_done_mutex_);
+       pipeline_done_cond_.wait(pipeline_done_lock_);
+
+       // Let the pipeline free all resources
+       pipeline_->set_state(Gst::STATE_NULL);
+
+#else
        if (!device_)
                return;
 
@@ -993,6 +1036,7 @@ void Session::sample_thread_proc(function<void (const QString)> error_handler)
        // Confirm that SR_DF_END was received
        if (cur_logic_segment_)
                qDebug() << "WARNING: SR_DF_END was not received.";
+#endif
 
        // Optimize memory usage
        free_unused_memory();
@@ -1069,6 +1113,49 @@ void Session::signal_segment_completed()
                segment_completed(segment_id);
 }
 
+#ifdef ENABLE_FLOW
+bool Session::on_gst_bus_message(const Glib::RefPtr<Gst::Bus>& bus, const Glib::RefPtr<Gst::Message>& message)
+{
+       (void)bus;
+
+       if ((message->get_source() == pipeline_) && \
+               ((message->get_message_type() == Gst::MESSAGE_EOS)))
+               pipeline_done_cond_.notify_one();
+
+       // TODO Also evaluate MESSAGE_STREAM_STATUS to receive error notifications
+
+       return true;
+}
+
+Gst::FlowReturn Session::on_gst_new_sample()
+{
+       RefPtr<Gst::Sample> sample = sink_->pull_sample();
+       RefPtr<Gst::Buffer> buf = sample->get_buffer();
+
+       for (uint32_t block_id = 0; block_id < buf->n_memory(); block_id++) {
+               RefPtr<Gst::Memory> buf_mem = buf->get_memory(block_id);
+               Gst::MapInfo mapinfo;
+               buf_mem->map(mapinfo, Gst::MAP_READ);
+
+               shared_ptr<sigrok::Packet> logic_packet =
+                       sr_context->create_logic_packet(mapinfo.get_data(), buf->get_size(), 1);
+
+               try {
+                       feed_in_logic(dynamic_pointer_cast<sigrok::Logic>(logic_packet->payload()));
+               } catch (bad_alloc&) {
+                       out_of_memory_ = true;
+                       device_->stop();
+                       buf_mem->unmap(mapinfo);
+                       return Gst::FLOW_ERROR;
+               }
+
+               buf_mem->unmap(mapinfo);
+       }
+
+       return Gst::FLOW_OK;
+}
+#endif
+
 void Session::feed_in_header()
 {
        // Nothing to do here for now
index 2ee31cfee5afb8ef6de44171322d1d20086e755a..9566b4a06a739efd2d198fb42f4ae6cfddcf71fb 100644 (file)
 #ifndef PULSEVIEW_PV_SESSION_HPP
 #define PULSEVIEW_PV_SESSION_HPP
 
+#ifdef ENABLE_FLOW
+#include <atomic>
+#include <condition_variable>
+#endif
+
 #include <functional>
 #include <map>
 #include <memory>
 #include <QSettings>
 #include <QString>
 
+#ifdef ENABLE_FLOW
+#include <gstreamermm.h>
+#include <libsigrokflow/libsigrokflow.hpp>
+#endif
+
 #include "util.hpp"
 #include "views/viewbase.hpp"
 
+
 using std::function;
 using std::list;
 using std::map;
@@ -46,6 +57,13 @@ using std::shared_ptr;
 using std::string;
 using std::unordered_set;
 
+#ifdef ENABLE_FLOW
+using Glib::RefPtr;
+using Gst::AppSink;
+using Gst::Element;
+using Gst::Pipeline;
+#endif
+
 struct srd_decoder;
 struct srd_channel;
 
@@ -203,6 +221,12 @@ private:
        void signal_new_segment();
        void signal_segment_completed();
 
+#ifdef ENABLE_FLOW
+       bool on_gst_bus_message(const Glib::RefPtr<Gst::Bus>& bus, const Glib::RefPtr<Gst::Message>& message);
+
+       Gst::FlowReturn on_gst_new_sample();
+#endif
+
        void feed_in_header();
 
        void feed_in_meta(shared_ptr<sigrok::Meta> meta);
@@ -272,6 +296,16 @@ private:
        bool out_of_memory_;
        bool data_saved_;
        bool frame_began_;
+
+#ifdef ENABLE_FLOW
+       RefPtr<Pipeline> pipeline_;
+       RefPtr<Element> source_;
+       RefPtr<AppSink> sink_;
+
+       mutable mutex pipeline_done_mutex_;
+       mutable condition_variable pipeline_done_cond_;
+       atomic<bool> pipeline_done_interrupt_;
+#endif
 };
 
 } // namespace pv