diff --git a/conanfile.py b/conanfile.py index cbcdf82..212b441 100644 --- a/conanfile.py +++ b/conanfile.py @@ -23,7 +23,7 @@ def requirements(self): self.requires("nlohmann_json/3.11.3") # self.requires("json-schema-validator/2.3.0") self.requires("libxvc/0.1.2") - self.requires("xdaqmetadata/0.1.1") + self.requires("xdaqmetadata/0.1.2") self.requires("oatpp/1.3.0.latest") def layout(self): diff --git a/thorvision/CMakeLists.txt b/thorvision/CMakeLists.txt index 5fb7964..c14ba8c 100644 --- a/thorvision/CMakeLists.txt +++ b/thorvision/CMakeLists.txt @@ -6,6 +6,7 @@ find_package(libxvc REQUIRED) find_package(Qt6 REQUIRED COMPONENTS Qml Quick QuickControls2 Svg) find_package(PkgConfig REQUIRED) pkg_search_module(gstreamer REQUIRED IMPORTED_TARGET gstreamer-1.0>=1.4) +pkg_search_module(gstreamer-codecparsers REQUIRED IMPORTED_TARGET gstreamer-codecparsers-1.0>=1.4) find_package(Python3 REQUIRED COMPONENTS Interpreter) find_package(oatpp REQUIRED) @@ -107,7 +108,7 @@ set_target_properties(ThorVision AUTOMOC ON AUTORCC ON MACOSX_BUNDLE TRUE - WIN32_EXECUTABLE TRUE + WIN32_EXECUTABLE FALSE RUNTIME_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}" INSTALL_RPATH "$ORIGIN;@executable_path/../Frameworks" MACOSX_BUNDLE_GUI_IDENTIFIER "com.KonteX.ThorVision" @@ -134,8 +135,21 @@ target_compile_options(ThorVision ) if(CMAKE_BUILD_TYPE MATCHES "Debug") - target_compile_options(ThorVision PRIVATE -fsanitize=address,undefined) - target_link_options(ThorVision PRIVATE -fsanitize=address,undefined) + if(MSVC) + target_compile_definitions(ThorVision PRIVATE + GSTREAMER_QML_DEBUG_PATH="../../gstreamer_qml" + ) + + # Copy the custom gstreamer_qml folder to the build directory + add_custom_command(TARGET ThorVision POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy_directory + "${CMAKE_SOURCE_DIR}/gstreamer_qml" + "$/../../gstreamer_qml" + ) + elseif(APPLE) + target_compile_options(ThorVision PRIVATE -fsanitize=address,undefined) + target_link_options(ThorVision PRIVATE -fsanitize=address,undefined) + endif() endif() target_link_libraries(ThorVision @@ -204,6 +218,7 @@ if(WIN32) # "gstqsv.dll" # https://gstreamer.freedesktop.org/documentation/qsv/index.html?gi-language=c "gstd3d11.dll" # https://gstreamer.freedesktop.org/documentation/d3d11/index.html?gi-language=c "gstqt6d3d11.dll" # https://gstreamer.freedesktop.org/documentation/qt6d3d11/index.html?gi-language=c + "gstvideorate.dll" # https://gstreamer.freedesktop.org/documentation/videorate/index.html?gi-language=c ) list(TRANSFORM gst_plugins PREPEND "${gst_plugin_dir}/") diff --git a/thorvision/src/CameraItem.cc b/thorvision/src/CameraItem.cc index 7ced196..1fa4311 100644 --- a/thorvision/src/CameraItem.cc +++ b/thorvision/src/CameraItem.cc @@ -62,8 +62,12 @@ CameraItem::CameraItem(Camera *camera, QObject *parent) return fa > fb; }); + static const QMap codec_priority = { + {tr("M-JPEG"), 0}, + {tr("H.265"), 1}, + }; std::sort(_codecs.begin(), _codecs.end(), [&](const QString &a, const QString &b) { - return _codecs.indexOf(a) > _codecs.indexOf(b); + return codec_priority.value(a, 99) < codec_priority.value(b, 99); }); _caps.insert(0, ""); @@ -122,8 +126,8 @@ void CameraItem::set_cap(const QString &cap) ); const auto &gst_cap = _quality_format[{_cap, _codec}]; - _stream->start(gst_cap.media_type); _camera->start(gst_cap); + _stream->start(gst_cap.media_type); } void CameraItem::set_codec(const QString &codec) @@ -148,6 +152,8 @@ void CameraItem::set_codec(const QString &codec) void CameraItem::update_metadata(const XDAQFrameData &metadata) { + // spdlog::info("XDAQ Timestamp: {}", metadata.fpga_timestamp); + _metadata = metadata; emit metadata_changed(); } @@ -172,18 +178,33 @@ void CameraItem::start_recording(RecorderSettings *settings) auto filepath = fs::path(settings->save_paths().at(0).toStdString()) / settings->dir_name().toStdString() / _camera->name(); - xvc::start_jpeg_recording( - GST_PIPELINE(_stream->_pipeline), - filepath, - settings->split_on(), - settings->split_length(), - to_time_unit(settings->split_unit_index()) - ); + if (_codec == tr("H.265")) { + _stream->start_h265_recording( + filepath, + settings->split_on(), + settings->split_length(), + to_time_unit(settings->split_unit_index()) + ); + } else if (_codec == "M-JPEG") { + xvc::start_jpeg_recording( + GST_PIPELINE(_stream->_pipeline), + filepath, + settings->split_on(), + settings->split_length(), + to_time_unit(settings->split_unit_index()) + ); + } else { + spdlog::warn("Unsupported codec: {}", _codec.toStdString()); + } } void CameraItem::stop_recording() { spdlog::info("Stopping recording for camera {}", _camera->id()); - xvc::stop_jpeg_recording(GST_PIPELINE(_stream->_pipeline)); + if (_codec == tr("H.265")) { + _stream->stop_h265_recording(); + } else { + xvc::stop_jpeg_recording(GST_PIPELINE(_stream->_pipeline)); + } } \ No newline at end of file diff --git a/thorvision/src/CameraItem.h b/thorvision/src/CameraItem.h index 4a4c0e7..4d912a7 100644 --- a/thorvision/src/CameraItem.h +++ b/thorvision/src/CameraItem.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "RecorderSettings.h" #include "Stream.h" @@ -70,6 +71,13 @@ class CameraItem : public QObject bool is_streaming() const { return _stream ? _stream->_streaming.load() : false; }; + void cleanup_stream() + { + if (_stream) { + _stream->reset(); + } + } + signals: void name_changed(); void cap_changed(); diff --git a/thorvision/src/CameraModel.h b/thorvision/src/CameraModel.h index 2a7fd34..8731327 100644 --- a/thorvision/src/CameraModel.h +++ b/thorvision/src/CameraModel.h @@ -47,6 +47,18 @@ class CameraModel : public QAbstractListModel bool all_cameras_streaming() const; + const QList &cameras() const { return _cameras; }; + + void cleanup_all_streams() + { + spdlog::info( + "CameraModel::cleanup_all_streams() - cleaning up {} cameras", _cameras.size() + ); + for (auto cam : _cameras) { + cam->cleanup_stream(); + } + } + public: int rowCount(const QModelIndex &parent = QModelIndex()) const override; Q_INVOKABLE QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const override; @@ -63,6 +75,7 @@ class CameraModel : public QAbstractListModel public slots: void onItemAdded(int index, QQuickItem *item); + // void onLoaderLoaded(); private: QList _cameras; diff --git a/thorvision/src/Stream.h b/thorvision/src/Stream.h index d70f14d..84fef0d 100644 --- a/thorvision/src/Stream.h +++ b/thorvision/src/Stream.h @@ -1,8 +1,22 @@ #pragma once +#include + +#include #include +#include +#include +#include +#include +#include +#include #include "xdaqmetadata/metadata_handler.h" +#include "xdaqvc/xvc.h" + + + +namespace fs = std::filesystem; struct Stream : public QObject { Q_OBJECT @@ -22,46 +36,100 @@ struct Stream : public QObject { std::optional _base_time; std::unique_ptr _metadata_handler; + std::atomic _recording_active{false}; + + std::deque _pre_record_buffer; + std::mutex _pre_record_mutex; + static constexpr GstClockTime PRE_RECORD_DURATION = 1 * GST_SECOND; + gulong _buffer_collector_probe_id{0}; + size_t _last_keyframe_index{0}; // Index of the most recent keyframe in _pre_record_buffer + + // FPS monitoring for queue_record + std::atomic _fps_frame_count{0}; + std::atomic _fps_last_report_time{0}; + gulong _fps_monitor_probe_id{0}; + // TODO: media_type static std::string pipeline(std::string_view uri, [[maybe_unused]] std::string_view media_type) { + if (media_type == "video/x-h265") { #ifdef _WIN32 - return fmt::format( - "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " - "jpegparse name=parser ! " - "tee name=t ! " - "queue name=queue_dec leaky=2 ! " - "jpegdec name=dec ! " - "d3d11upload name=upload ! " - "d3d11convert name=conv ! video/x-raw(memory:D3D11Memory), format=(string)RGB ! " - "queue name=queue_sink leaky=2 ! " - "fpsdisplaysink name=sink sync=false text-overlay=false", - uri - ); - // auto dec = gst_element_factory_make("qsvjpegdec", "dec"); - // auto dec = gst_element_factory_make("nvjpegdec", "dec"); - // auto dec = gst_element_factory_make("decodebin", "dec"); + // Windows H.265 pipeline using d3d11h265dec + return fmt::format( + "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " + "h265parse name=parser ! video/x-h265,stream-format=byte-stream,alignment=au ! " + "tee name=t ! " + "queue name=queue_dec leaky=2 ! " + "d3d11h265dec name=dec ! " + "d3d11convert name=conv ! video/x-raw(memory:D3D11Memory), format=(string)RGB ! " + "videorate max-rate=30 ! " + "queue name=queue_sink leaky=2 ! " + "fpsdisplaysink name=sink sync=false text-overlay=false " + "t. ! queue name=queue_record max-size-time=10000000000 max-size-buffers=10 " + "max-size-bytes=0 ! " + "fakesink name=record_sink async=false sync=false", + uri + ); #elif __APPLE__ - return fmt::format( - "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " - "jpegparse name=parser ! " - "tee name=t ! " - "queue name=queue_dec leaky=2 ! " - "vtdec name=dec ! video/x-raw, format=(string)NV12 ! " - "glupload name=upload ! video/x-raw(memory:GLMemory) ! " - "glcolorconvert name=conv ! video/x-raw(memory:GLMemory), format=(string)RGB ! " - "queue name=queue_sink leaky=2 ! " - "fpsdisplaysink name=sink sync=false text-overlay=false", - uri - ); + // macOS H.265 pipeline using vtdec + return fmt::format( + "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " + "h265parse name=parser ! video/x-h265,stream-format=byte-stream,alignment=au ! " + "tee name=t ! " + "queue name=queue_dec leaky=2 ! " + "h265parse ! video/x-h265,stream-format=hvc1,alignment=au ! " + "vtdec name=dec ! video/x-raw, format=(string)NV12 ! " + "glupload name=upload ! video/x-raw(memory:GLMemory) ! " + "glcolorconvert name=conv ! video/x-raw(memory:GLMemory), format=(string)RGB ! " + "queue name=queue_sink leaky=2 ! " + "fpsdisplaysink name=sink sync=false text-overlay=false " + "t. ! queue name=queue_record max-size-time=10000000000 max-size-buffers=10 " + "max-size-bytes=0 ! " + "fakesink name=record_sink async=false sync=false", + uri + ); #endif + } else if (media_type == "image/jpeg") { +#ifdef _WIN32 + return fmt::format( + "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " + "jpegparse name=parser ! " + "tee name=t ! " + "queue name=queue_dec leaky=2 ! " + "jpegdec name=dec ! " + "d3d11upload name=upload ! " + "d3d11convert name=conv ! video/x-raw(memory:D3D11Memory), format=(string)RGB ! " + "queue name=queue_sink leaky=2 ! " + "fpsdisplaysink name=sink sync=false text-overlay=false", + uri + ); + // auto dec = gst_element_factory_make("qsvjpegdec", "dec"); + // auto dec = gst_element_factory_make("nvjpegdec", "dec"); + // auto dec = gst_element_factory_make("decodebin", "dec"); +#elif __APPLE__ + return fmt::format( + "srtclientsrc name=src uri=srt://{} keep-listening=true latency=125 ! " + "jpegparse name=parser ! " + "tee name=t ! " + "queue name=queue_dec leaky=2 ! " + "vtdec name=dec ! video/x-raw, format=(string)NV12 ! " + "glupload name=upload ! video/x-raw(memory:GLMemory) ! " + "glcolorconvert name=conv ! video/x-raw(memory:GLMemory), format=(string)RGB ! " + "queue name=queue_sink leaky=2 ! " + "fpsdisplaysink name=sink sync=false text-overlay=false", + uri + ); +#endif + } else { + spdlog::error("Unsupported media type: {}", media_type); + return ""; + } } static gboolean bus_handler([[maybe_unused]] GstBus *bus, GstMessage *msg, gpointer user_data) { auto stream = static_cast(user_data); if (!stream) { - spdlog::error("Invalid 'Stream' cast in 'bus_handler' callback"); return G_SOURCE_REMOVE; } @@ -101,46 +169,57 @@ struct Stream : public QObject { auto const structure = gst_message_get_structure(msg); if (gst_structure_has_name(structure, "GstBinForwarded")) { GstMessage *forward_msg = nullptr; - gst_structure_get(structure, "message", GST_TYPE_MESSAGE, &forward_msg, nullptr); + if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) { auto element_name = GST_OBJECT_NAME(GST_MESSAGE_SRC(forward_msg)); spdlog::info("EOS from element {}", element_name); + std::string name_str(element_name); - if (fmt::format("{}", element_name) != "filesink") { - spdlog::info("Not filesink EOS, ignore"); - gst_message_unref(forward_msg); - break; - } + // H.265 recording path - splitmux cleanup + if (name_str.find("splitmux") != std::string::npos) { + auto pipeline = GST_BIN(stream->_pipeline); + auto splitmux = gst_bin_get_by_name(pipeline, "splitmux"); - auto pipeline = GST_BIN(stream->_pipeline); - auto tee = gst_bin_get_by_name(pipeline, "t"); - auto queue = gst_bin_get_by_name(pipeline, "queue_record"); - auto parser = gst_bin_get_by_name(pipeline, "record_parser"); - auto filesink = gst_bin_get_by_name(pipeline, "filesink"); + if (splitmux) { + gst_element_set_state(splitmux, GST_STATE_NULL); + gst_bin_remove(pipeline, splitmux); + gst_object_unref(splitmux); + spdlog::info("H.265 recording finalized and splitmux removed"); + } + } + // JPEG recording path - filesink cleanup + else if (name_str == "filesink") { + auto pipeline = GST_BIN(stream->_pipeline); + auto tee = gst_bin_get_by_name(pipeline, "t"); + auto queue = gst_bin_get_by_name(pipeline, "queue_record"); + auto parser = gst_bin_get_by_name(pipeline, "record_parser"); + auto filesink = gst_bin_get_by_name(pipeline, "filesink"); - auto queue_sinkpad = gst_element_get_static_pad(queue, "sink"); - auto tee_srcpad = gst_pad_get_peer(queue_sinkpad); + auto queue_sinkpad = gst_element_get_static_pad(queue, "sink"); + auto tee_srcpad = gst_pad_get_peer(queue_sinkpad); - // TODO: error handling - gst_pad_unlink(tee_srcpad, queue_sinkpad); + gst_pad_unlink(tee_srcpad, queue_sinkpad); - gst_element_set_state(queue, GST_STATE_NULL); - gst_element_set_state(parser, GST_STATE_NULL); - gst_element_set_state(filesink, GST_STATE_NULL); + gst_element_set_state(queue, GST_STATE_NULL); + gst_element_set_state(parser, GST_STATE_NULL); + gst_element_set_state(filesink, GST_STATE_NULL); - gst_bin_remove_many(pipeline, queue, parser, filesink, nullptr); + gst_bin_remove_many(pipeline, queue, parser, filesink, nullptr); - gst_element_release_request_pad(tee, tee_srcpad); + gst_element_release_request_pad(tee, tee_srcpad); - gst_object_unref(tee_srcpad); - gst_object_unref(tee); - gst_object_unref(queue_sinkpad); - gst_object_unref(queue); - gst_object_unref(parser); - gst_object_unref(filesink); + gst_object_unref(tee_srcpad); + gst_object_unref(tee); + gst_object_unref(queue_sinkpad); + gst_object_unref(queue); + gst_object_unref(parser); + gst_object_unref(filesink); - spdlog::debug("Unlinked"); + spdlog::info("JPEG recording finalized"); + } else { + spdlog::debug("EOS from {}, ignoring", name_str); + } } gst_message_unref(forward_msg); } @@ -209,29 +288,74 @@ struct Stream : public QObject { if (!sink) { spdlog::error("Failed to create 'qml6d3d11sink' element"); gst_object_unref(_pipeline); + return; } + // _video_item is GstD3D11Qt6VideoItem created by QML + g_object_set(sink, "widget", _video_item, nullptr); + spdlog::info("D3D11 sink widget set"); #elif __APPLE__ auto sink = gst_element_factory_make("qml6glsink", "sink"); if (!sink) { spdlog::error("Failed to create 'qml6glsink' element"); gst_object_unref(_pipeline); + return; } + g_object_set(sink, "widget", _video_item, nullptr); #endif auto fpsdisplaysink = gst_bin_get_by_name(GST_BIN(_pipeline), "sink"); g_object_set(_pipeline, "message-forward", true, nullptr); - g_object_set(sink, "sync", false, "widget", _video_item, nullptr); + g_object_set(sink, "sync", false, nullptr); g_object_set(fpsdisplaysink, "video-sink", sink, nullptr); auto parser = gst_bin_get_by_name(GST_BIN(_pipeline), "parser"); auto parser_srcpad = gst_element_get_static_pad(parser, "src"); - gst_pad_add_probe( - parser_srcpad, - GST_PAD_PROBE_TYPE_BUFFER, - parse_jpeg_metadata, - _metadata_handler.get(), - nullptr - ); + // Check if the parser is actually jpegparse before adding the JPEG metadata probe + gchar *factory_name = nullptr; + auto factory = gst_element_get_factory(parser); + if (factory) { + factory_name = gst_plugin_feature_get_name(GST_PLUGIN_FEATURE(factory)); + } + + if (factory_name && std::string(factory_name) == "jpegparse") { + gst_pad_add_probe( + parser_srcpad, + GST_PAD_PROBE_TYPE_BUFFER, + parse_jpeg_metadata, + _metadata_handler.get(), + nullptr + ); + } else if (factory_name && std::string(factory_name) == "h265parse") { + gst_pad_add_probe( + parser_srcpad, + GST_PAD_PROBE_TYPE_BUFFER, + parse_h265_metadata, + _metadata_handler.get(), + nullptr + ); + + // Add buffer collector probe on queue_record's src pad for pre-recording + auto queue_record = gst_bin_get_by_name(GST_BIN(_pipeline), "queue_record"); + if (queue_record) { + auto queue_srcpad = gst_element_get_static_pad(queue_record, "src"); + _buffer_collector_probe_id = gst_pad_add_probe( + queue_srcpad, GST_PAD_PROBE_TYPE_BUFFER, buffer_collector_probe, this, nullptr + ); + spdlog::info("Buffer collector probe installed for pre-recording"); + + // Also add FPS monitor probe on queue_record's sink pad to measure incoming frame + // rate + auto queue_sinkpad = gst_element_get_static_pad(queue_record, "sink"); + _fps_monitor_probe_id = gst_pad_add_probe( + queue_sinkpad, GST_PAD_PROBE_TYPE_BUFFER, fps_monitor_probe, this, nullptr + ); + spdlog::info("FPS monitor probe installed on queue_record sink"); + gst_object_unref(queue_sinkpad); + + gst_object_unref(queue_srcpad); + gst_object_unref(queue_record); + } + } gst_object_unref(fpsdisplaysink); gst_object_unref(parser_srcpad); gst_object_unref(parser); @@ -250,53 +374,353 @@ struct Stream : public QObject { static GstPadProbeReturn extract_metadata(GstPad *, GstPadProbeInfo *info, gpointer user_data) { auto stream = static_cast(user_data); + // if (!stream || stream->_destroying.load()) { + // return GST_PAD_PROBE_OK; // Don't access stream + // } auto buffer = GST_PAD_PROBE_INFO_BUFFER(info); - if (!buffer) return GST_PAD_PROBE_DROP; + if (!buffer) { + spdlog::error("Buffer is null"); + return GST_PAD_PROBE_DROP; + } GstMapInfo map; - if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) return GST_PAD_PROBE_DROP; + if (!gst_buffer_map(buffer, &map, GST_MAP_READ)) { + spdlog::error("Failed to map buffer"); + return GST_PAD_PROBE_DROP; + } gst_buffer_unmap(buffer, &map); - if (auto xdaqmetadata = stream->_metadata_handler->safe_deque.check_pts_pop_timestamp( - GST_BUFFER_PTS(buffer) - )) { - emit stream->metadata_received(xdaqmetadata.value_or(XDAQFrameData{0, 0, 0, 0, 0, 0})); + auto xdaqmetadata = + stream->_metadata_handler->safe_deque.check_pts_pop_timestamp(GST_BUFFER_PTS(buffer)); + + if (xdaqmetadata) { + // emit stream->metadata_received(xdaqmetadata.value()); + // Throttle signal emission to ~20 Hz (every 50ms) + static thread_local GstClockTime last_emit_time = 0; + GstClockTime current_pts = GST_BUFFER_PTS(buffer); + + if (last_emit_time == 0 || (GST_BUFFER_PTS_IS_VALID(buffer) && + current_pts - last_emit_time >= 10 * GST_MSECOND)) { + emit stream->metadata_received(xdaqmetadata.value()); + last_emit_time = current_pts; + } } return GST_PAD_PROBE_OK; } + static GstPadProbeReturn fps_monitor_probe( + [[maybe_unused]] GstPad *pad, GstPadProbeInfo *info, gpointer user_data + ) + { + auto stream = static_cast(user_data); + if (!stream) { + return GST_PAD_PROBE_REMOVE; + } + + auto buffer = GST_PAD_PROBE_INFO_BUFFER(info); + + if (!buffer) { + return GST_PAD_PROBE_OK; + } + + stream->_fps_frame_count++; + + GstClockTime now = GST_BUFFER_PTS(buffer); + GstClockTime last_report = stream->_fps_last_report_time.load(); + + // Report every 2 seconds (based on PTS) + if (GST_BUFFER_PTS_IS_VALID(buffer) && + (last_report == 0 || now - last_report >= 60 * GST_SECOND)) { + if (last_report > 0) { + GstClockTime elapsed = now - last_report; + uint64_t frames = stream->_fps_frame_count.load(); + double fps = static_cast(frames) * GST_SECOND / elapsed; + spdlog::info( + "[FPS Monitor] queue_record: {} frames in {:.2f}s = {:.1f} FPS", + frames, + static_cast(elapsed) / GST_SECOND, + fps + ); + } + stream->_fps_frame_count = 0; + stream->_fps_last_report_time = now; + } + + return GST_PAD_PROBE_OK; + } + + static GstPadProbeReturn buffer_collector_probe( + [[maybe_unused]] GstPad *pad, GstPadProbeInfo *info, gpointer user_data + ) + { + auto stream = static_cast(user_data); + if (!stream) { + return GST_PAD_PROBE_REMOVE; + } + + // Replace the frame-counting approach with time-based logging + static thread_local auto last_log_time = std::chrono::steady_clock::now(); + static thread_local uint64_t total_copies = 0; + static thread_local uint64_t total_unrefs = 0; + auto now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - last_log_time); + // Periodic logging every 1 minute + if (elapsed.count() >= 1) { + last_log_time = now; + + auto queue = gst_bin_get_by_name(GST_BIN(stream->_pipeline), "queue_record"); + if (queue) { + guint current_level_buffers; + g_object_get(queue, "current-level-buffers", ¤t_level_buffers, nullptr); + spdlog::info( + "queue_record internal buffers: {}, pre_record_buffer: {}", + current_level_buffers, + stream->_pre_record_buffer.size() + ); + spdlog::info( + "Buffer stats: copies={}, unrefs={}, delta={}, buffer_count={}", + total_copies, + total_unrefs, + total_copies - total_unrefs, + stream->_pre_record_buffer.size() + ); + gst_object_unref(queue); + } + } + + auto buffer = GST_PAD_PROBE_INFO_BUFFER(info); + + // Validate buffer before any operations + if (!buffer || !GST_IS_BUFFER(buffer)) { + return GST_PAD_PROBE_OK; + } + + // Additional validation: check buffer has valid refcount + if (GST_MINI_OBJECT_REFCOUNT_VALUE(buffer) < 1) { + spdlog::warn("Buffer collector: invalid buffer refcount, skipping"); + return GST_PAD_PROBE_OK; + } + + // Only collect buffers when NOT recording + if (!stream->_recording_active.load()) { + std::lock_guard lock(stream->_pre_record_mutex); + + // Check if this is a keyframe (IDR/CRA frame) + // DELTA_UNIT flag is set for P/B frames, NOT set for keyframes + bool is_keyframe = !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT); + + // Add new buffer with copy + GstBuffer *buffer_copy = gst_buffer_copy(buffer); + if (!buffer_copy) { + spdlog::error("Buffer collector: failed to copy buffer"); + return GST_PAD_PROBE_OK; + } + stream->_pre_record_buffer.push_back(buffer_copy); + total_copies++; + + // Track the index of the most recent keyframe + if (is_keyframe) { + stream->_last_keyframe_index = stream->_pre_record_buffer.size() - 1; + spdlog::debug( + "Buffer collector: keyframe detected at index {}", stream->_last_keyframe_index + ); + } + + spdlog::debug( + "Buffer collector: pushed buffer (keyframe={}), buffer size = {}", + is_keyframe, + stream->_pre_record_buffer.size() + ); + + // Remove old buffers beyond PRE_RECORD_DURATION + while (stream->_pre_record_buffer.size() > 1) { + auto oldest = stream->_pre_record_buffer.front(); + + // Validate oldest buffer before accessing + if (!oldest || !GST_IS_BUFFER(oldest)) { + spdlog::warn("Buffer collector: invalid oldest buffer, removing from deque"); + stream->_pre_record_buffer.pop_front(); + if (stream->_last_keyframe_index > 0) { + stream->_last_keyframe_index--; + } + continue; + } + + auto newest = stream->_pre_record_buffer.back(); + + // Validate newest buffer + if (!newest || !GST_IS_BUFFER(newest)) { + break; + } + + // Check if we have valid PTS values + if (GST_BUFFER_PTS_IS_VALID(oldest) && GST_BUFFER_PTS_IS_VALID(newest)) { + if (GST_BUFFER_PTS(newest) - GST_BUFFER_PTS(oldest) > PRE_RECORD_DURATION) { + // Validate refcount before unref + if (GST_MINI_OBJECT_REFCOUNT_VALUE(oldest) >= 1) { + gst_buffer_unref(oldest); + total_unrefs++; + } else { + spdlog::warn( + "Buffer collector: oldest buffer has invalid refcount {}", + GST_MINI_OBJECT_REFCOUNT_VALUE(oldest) + ); + } + stream->_pre_record_buffer.pop_front(); + // Adjust keyframe index since we removed from front + if (stream->_last_keyframe_index > 0) { + stream->_last_keyframe_index--; + } + spdlog::debug( + "Buffer collector: removed old buffer (by PTS), buffer size = {}", + stream->_pre_record_buffer.size() + ); + } else { + break; + } + } else { + // Fallback: limit by count (assume ~30fps, 10 seconds = ~300 frames) + if (stream->_pre_record_buffer.size() > 300) { + // Validate refcount before unref + if (GST_MINI_OBJECT_REFCOUNT_VALUE(oldest) >= 1) { + gst_buffer_unref(oldest); + total_unrefs++; + } else { + spdlog::warn( + "Buffer collector: oldest buffer has invalid refcount {}", + GST_MINI_OBJECT_REFCOUNT_VALUE(oldest) + ); + } + stream->_pre_record_buffer.pop_front(); + // Adjust keyframe index since we removed from front + if (stream->_last_keyframe_index > 0) { + stream->_last_keyframe_index--; + } + spdlog::debug( + "Buffer collector: removed old buffer (by count), buffer size = {}", + stream->_pre_record_buffer.size() + ); + } else { + break; + } + } + } + } + + spdlog::debug( + "Buffer collector: accepted buffer, current size = {}", + stream->_pre_record_buffer.size() + ); + return GST_PAD_PROBE_OK; + } + + void clear_pre_record_buffer() + { + std::lock_guard lock(_pre_record_mutex); + size_t count = _pre_record_buffer.size(); + size_t unreffed = 0; + for (auto buffer : _pre_record_buffer) { + if (buffer && GST_IS_BUFFER(buffer)) { + if (GST_MINI_OBJECT_REFCOUNT_VALUE(buffer) >= 1) { + gst_buffer_unref(buffer); + unreffed++; + } else { + spdlog::warn( + "clear_pre_record_buffer: buffer has invalid refcount {}", + GST_MINI_OBJECT_REFCOUNT_VALUE(buffer) + ); + } + } + } + _pre_record_buffer.clear(); + _last_keyframe_index = 0; + spdlog::debug("Pre-record buffer cleared, had {} frames, unreffed {}", count, unreffed); + } + void reset() { spdlog::info("Stream::reset()"); set_streaming(false); - // if (_pipeline) { - // } + // Clear pre-record buffer + clear_pre_record_buffer(); - auto bus = gst_pipeline_get_bus(_pipeline); - gst_bus_remove_watch(bus); - gst_object_unref(bus); + _metadata_handler = std::make_unique(); if (_pipeline) { + // Remove buffer collector probe and FPS monitor probe if installed + auto queue_record = gst_bin_get_by_name(GST_BIN(_pipeline), "queue_record"); + if (queue_record) { + if (_buffer_collector_probe_id != 0) { + auto queue_srcpad = gst_element_get_static_pad(queue_record, "src"); + gst_pad_remove_probe(queue_srcpad, _buffer_collector_probe_id); + gst_object_unref(queue_srcpad); + _buffer_collector_probe_id = 0; + } + if (_fps_monitor_probe_id != 0) { + auto queue_sinkpad = gst_element_get_static_pad(queue_record, "sink"); + gst_pad_remove_probe(queue_sinkpad, _fps_monitor_probe_id); + gst_object_unref(queue_sinkpad); + _fps_monitor_probe_id = 0; + } + gst_object_unref(queue_record); + } + + // Detach the QML video item from the sink BEFORE destroying the pipeline + // This prevents heap corruption from the qt6d3d11 plugin +#ifdef _WIN32 + auto fpsdisplaysink = gst_bin_get_by_name(GST_BIN(_pipeline), "sink"); + if (fpsdisplaysink) { + GstElement *video_sink = nullptr; + g_object_get(fpsdisplaysink, "video-sink", &video_sink, nullptr); + if (video_sink) { + g_object_set(video_sink, "widget", nullptr, nullptr); + spdlog::info("Detached QML widget from D3D11 sink"); + gst_object_unref(video_sink); + } + gst_object_unref(fpsdisplaysink); + } +#elif __APPLE__ + auto fpsdisplaysink = gst_bin_get_by_name(GST_BIN(_pipeline), "sink"); + if (fpsdisplaysink) { + GstElement *video_sink = nullptr; + g_object_get(fpsdisplaysink, "video-sink", &video_sink, nullptr); + if (video_sink) { + g_object_set(video_sink, "widget", nullptr, nullptr); + spdlog::info("Detached QML widget from GL sink"); + gst_object_unref(video_sink); + } + gst_object_unref(fpsdisplaysink); + } +#endif + + auto bus = gst_pipeline_get_bus(_pipeline); + gst_bus_remove_watch(bus); + gst_object_unref(bus); + gst_element_set_state(GST_ELEMENT(_pipeline), GST_STATE_NULL); gst_object_unref(_pipeline); + _pipeline = nullptr; } } void start(std::string_view media_type) { - if (!_pipeline) { - spdlog::error("Pipeline is null, cannot start stream"); - return; - } spdlog::info("Stream::start()"); - if (_streaming) { - spdlog::warn("Stream is already started, resetting..."); + // ALWAYS reset the pipeline to ensure a fresh connection and correct format + if (_pipeline) { reset(); - init_pipeline(pipeline(fmt::format("{}:{}", "192.168.177.100", _port), media_type)); + } + + // Re-initialize with the correct media_type + init_pipeline(pipeline(fmt::format("{}:{}", "192.168.177.100", _port), media_type)); + + if (!_pipeline) { + spdlog::error("Pipeline creation failed"); + return; } if (gst_element_set_state(GST_ELEMENT(_pipeline), GST_STATE_PLAYING) == @@ -324,12 +748,8 @@ struct Stream : public QObject { ~Stream() { spdlog::info("~Stream()"); - // set_streaming(false); - // if (_pipeline) { - // gst_element_set_state(GST_ELEMENT(_pipeline), GST_STATE_NULL); - // gst_object_unref(_pipeline); - // } + _streaming = false; reset(); } @@ -337,7 +757,515 @@ struct Stream : public QObject { { if (_streaming != now) { _streaming = now; - emit status_changed(_streaming); + emit status_changed(now); + } + } + + struct FileTracker { + std::string base_filepath; + std::vector file_paths; + int max_files; + }; + + static gchararray generate_filename( + [[maybe_unused]] GstElement *splitmux, [[maybe_unused]] guint fragment_id, gpointer udata + ) + { + auto tracker = static_cast(udata); + auto now = std::chrono::system_clock::now(); + auto time_t_now = std::chrono::system_clock::to_time_t(now); + std::tm tm_now; + +#ifdef _WIN32 + localtime_s(&tm_now, &time_t_now); +#else + localtime_r(&time_t_now, &tm_now); +#endif + + auto timestamp = fmt::format("{:%Y-%m-%d_%H-%M-%S}", tm_now); + auto file_path = fmt::format("{}-{}.mkv", tracker->base_filepath, timestamp); + + tracker->file_paths.emplace_back(file_path); + + if (tracker->file_paths.size() > static_cast(tracker->max_files)) { + auto _file_path = tracker->file_paths.front(); + fs::remove(_file_path); + spdlog::debug("Remove file: {}", _file_path.generic_string()); + + _file_path.replace_extension(".bin"); + fs::remove(_file_path); + spdlog::debug("Remove file: {}", _file_path.generic_string()); + + tracker->file_paths.erase(tracker->file_paths.begin()); + } + + return g_strdup(file_path.c_str()); + } + + void start_h265_recording( + const fs::path &filepath, bool continuous = true, int max_size_time = 10, + xvc::TimeUnit unit = xvc::TimeUnit::Minutes, bool loop = false, int max_files = 10 + ) + { + if (!_pipeline) { + spdlog::error("Pipeline is null"); + return; + } + + // Run pipeline modification on Qt main thread to avoid race condition + // with Qt's rendering thread (gstqt6d3d11 plugin) + if (QThread::currentThread() != QCoreApplication::instance()->thread()) { + spdlog::info("start_h265_recording: dispatching to Qt main thread"); + QMetaObject::invokeMethod( + QCoreApplication::instance(), + [this, filepath, continuous, max_size_time, unit, loop, max_files]() { + start_h265_recording_impl( + filepath, continuous, max_size_time, unit, loop, max_files + ); + }, + Qt::BlockingQueuedConnection + ); + return; + } + + start_h265_recording_impl(filepath, continuous, max_size_time, unit, loop, max_files); + } + + void start_h265_recording_impl( + const fs::path &filepath, bool continuous, int max_size_time, xvc::TimeUnit unit, bool loop, + int max_files + ) + { + { + std::lock_guard lock(_pre_record_mutex); + spdlog::info( + "Pre-record buffer state: {} frames, keyframe_index={}", + _pre_record_buffer.size(), + _last_keyframe_index + ); + if (!_pre_record_buffer.empty()) { + auto oldest = _pre_record_buffer.front(); + auto newest = _pre_record_buffer.back(); + if (GST_BUFFER_PTS_IS_VALID(oldest) && GST_BUFFER_PTS_IS_VALID(newest)) { + GstClockTime duration = GST_BUFFER_PTS(newest) - GST_BUFFER_PTS(oldest); + spdlog::info( + "Pre-record buffer PTS range: oldest={} ns, newest={} ns, duration={} ns " + "({:.2f} sec)", + GST_BUFFER_PTS(oldest), + GST_BUFFER_PTS(newest), + duration, + static_cast(duration) / GST_SECOND + ); + } else { + spdlog::warn( + "Pre-record buffer has invalid PTS: oldest_valid={}, newest_valid={}", + GST_BUFFER_PTS_IS_VALID(oldest), + GST_BUFFER_PTS_IS_VALID(newest) + ); + } + } + } + + // Create directory if needed + auto path = filepath.parent_path(); + if (!fs::exists(path)) { + spdlog::info("Create Directory: {}", path.generic_string()); + std::error_code ec; + if (!fs::create_directories(path, ec)) { + spdlog::info( + "Failed to create directory: {}. Error: {}", path.generic_string(), ec.message() + ); + } + } + + int time_seconds = max_size_time; + switch (unit) { + case xvc::TimeUnit::Minutes: time_seconds *= 60; break; + case xvc::TimeUnit::Hours: time_seconds *= 3600; break; + case xvc::TimeUnit::Days: time_seconds *= 86400; break; + default: break; + } + + // Get existing elements + auto queue_record = gst_bin_get_by_name(GST_BIN(_pipeline), "queue_record"); + auto fakesink = gst_bin_get_by_name(GST_BIN(_pipeline), "record_sink"); + + if (!queue_record || !fakesink) { + spdlog::error("Could not find queue_record or record_sink"); + if (queue_record) gst_object_unref(queue_record); + if (fakesink) gst_object_unref(fakesink); + return; + } + + // Create new recording elements + auto record_parser = gst_element_factory_make("h265parse", "record_parser"); + auto cf_parser = gst_element_factory_make("capsfilter", "cf_record_parser"); + auto muxer = gst_element_factory_make("matroskamux", "muxer"); + auto splitmux = gst_element_factory_make("splitmuxsink", "splitmux"); + + if (!record_parser || !cf_parser || !muxer || !splitmux) { + spdlog::error("Failed to create recording elements"); + gst_object_unref(queue_record); + gst_object_unref(fakesink); + if (record_parser) gst_object_unref(record_parser); + if (cf_parser) gst_object_unref(cf_parser); + if (muxer) gst_object_unref(muxer); + if (splitmux) gst_object_unref(splitmux); + return; + } + + // Configure caps filter for hvc1 format + auto caps = gst_caps_new_simple( + "video/x-h265", + "stream-format", + G_TYPE_STRING, + "hvc1", + "alignment", + G_TYPE_STRING, + "au", + nullptr + ); + g_object_set(cf_parser, "caps", caps, nullptr); + gst_caps_unref(caps); + + // Configure muxer + g_object_set(muxer, "timecodescale", 1, "offset-to-zero", TRUE, nullptr); + + + max_files = loop ? max_files : INT_MAX; + + auto tracker = + std::make_unique(FileTracker{filepath.generic_string(), {}, max_files}); + + g_signal_connect_data( + splitmux, + "format-location", + G_CALLBACK(generate_filename), + tracker.release(), + [](gpointer data, GClosure *) { + delete static_cast(data); + spdlog::debug("FileTracker memory successfully released"); + }, + static_cast(0) + ); + + g_object_set( + splitmux, + "max-size-time", + continuous ? max_size_time * GST_SECOND : 0, + "async-finalize", + false, + "muxer", + muxer, + nullptr + ); + + // Add new elements to pipeline + gst_bin_add_many(GST_BIN(_pipeline), record_parser, cf_parser, splitmux, nullptr); + + // Get pads + auto queue_srcpad = gst_element_get_static_pad(queue_record, "src"); + + gst_pad_add_probe( + queue_srcpad, + GST_PAD_PROBE_TYPE_IDLE, + []([[maybe_unused]] GstPad *pad, + [[maybe_unused]] GstPadProbeInfo *info, + gpointer user_data) -> GstPadProbeReturn { + auto stream = static_cast(user_data); + + if (!stream) { + return GST_PAD_PROBE_REMOVE; + } + + // bool expected = false; + // if (!stream->_recording_active.compare_exchange_strong(expected, true)) { + // return GST_PAD_PROBE_REMOVE; + // } + + auto pipeline = GST_BIN(stream->_pipeline); + + auto queue_record = gst_bin_get_by_name(pipeline, "queue_record"); + auto fakesink = gst_bin_get_by_name(pipeline, "record_sink"); + auto record_parser = gst_bin_get_by_name(pipeline, "record_parser"); + auto cf_parser = gst_bin_get_by_name(pipeline, "cf_record_parser"); + auto splitmux = gst_bin_get_by_name(pipeline, "splitmux"); + + if (!queue_record || !fakesink || !record_parser || !cf_parser || !splitmux) { + spdlog::error("start_h265_recording idle probe: elements not found"); + if (queue_record) gst_object_unref(queue_record); + if (fakesink) gst_object_unref(fakesink); + if (record_parser) gst_object_unref(record_parser); + if (cf_parser) gst_object_unref(cf_parser); + if (splitmux) gst_object_unref(splitmux); + return GST_PAD_PROBE_REMOVE; + } + + auto queue_srcpad = gst_element_get_static_pad(queue_record, "src"); + auto fakesink_sinkpad = gst_element_get_static_pad(fakesink, "sink"); + + // Unlink queue from fakesink + gst_pad_unlink(queue_srcpad, fakesink_sinkpad); + + // Remove and destroy fakesink + gst_element_set_state(fakesink, GST_STATE_NULL); + gst_bin_remove(pipeline, fakesink); + + // Link new recording chain: queue_record -> record_parser -> cf_parser -> splitmux + if (!gst_element_link_many( + queue_record, record_parser, cf_parser, splitmux, nullptr + )) { + spdlog::error("Failed to link recording elements"); + } + + // Sync states with parent (pipeline is PLAYING) + gst_element_sync_state_with_parent(record_parser); + gst_element_sync_state_with_parent(cf_parser); + gst_element_sync_state_with_parent(splitmux); + + // Flush pre-recorded buffer to record_parser + { + std::lock_guard lock(stream->_pre_record_mutex); + + size_t start_index = 0; + for (size_t i = 0; i < stream->_pre_record_buffer.size(); ++i) { + auto buf = stream->_pre_record_buffer[i]; + if (buf && !GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) { + start_index = i; + break; + } + } + + size_t frames_to_flush = stream->_pre_record_buffer.size() - start_index; + spdlog::info( + "Flushing {} pre-recorded frames to record_parser (starting from keyframe " + "at index " + "{}, skipping {} frames)", + frames_to_flush, + start_index, + start_index + ); + + auto record_parser_sinkpad = gst_element_get_static_pad(record_parser, "sink"); + if (record_parser_sinkpad) { + // 1. Forward sticky STREAM_START from queue + GstEvent *stream_start_event = + gst_pad_get_sticky_event(queue_srcpad, GST_EVENT_STREAM_START, 0); + if (stream_start_event) { + gst_pad_send_event(record_parser_sinkpad, stream_start_event); + } else { + gst_pad_send_event( + record_parser_sinkpad, gst_event_new_stream_start("pre-record") + ); + } + + // 2. Forward sticky CAPS from queue + GstEvent *caps_event = + gst_pad_get_sticky_event(queue_srcpad, GST_EVENT_CAPS, 0); + if (caps_event) { + gst_pad_send_event(record_parser_sinkpad, caps_event); + } else { + auto input_caps = gst_caps_new_simple( + "video/x-h265", + "stream-format", + G_TYPE_STRING, + "byte-stream", + "alignment", + G_TYPE_STRING, + "au", + nullptr + ); + gst_pad_send_event( + record_parser_sinkpad, gst_event_new_caps(input_caps) + ); + gst_caps_unref(input_caps); + } + + // 3. Forward sticky SEGMENT from queue (This is the magic fix!) + GstEvent *seg_event = + gst_pad_get_sticky_event(queue_srcpad, GST_EVENT_SEGMENT, 0); + if (seg_event) { + gst_pad_send_event(record_parser_sinkpad, seg_event); + spdlog::debug("Pushed original sticky segment from queue"); + } else { + GstSegment segment; + gst_segment_init(&segment, GST_FORMAT_TIME); + gst_pad_send_event( + record_parser_sinkpad, gst_event_new_segment(&segment) + ); + spdlog::warn("No sticky segment found, pushing default"); + } + + // --> Continue with the existing logic to push pre-recorded frames + size_t pushed_count = 0; + for (size_t i = start_index; i < stream->_pre_record_buffer.size(); ++i) { + auto buffer = stream->_pre_record_buffer[i]; + if (buffer && GST_IS_BUFFER(buffer) && + GST_MINI_OBJECT_REFCOUNT_VALUE(buffer) >= 1) { + GstFlowReturn ret = + gst_pad_chain(record_parser_sinkpad, gst_buffer_ref(buffer)); + if (ret != GST_FLOW_OK) { + spdlog::warn( + "Failed to push pre-recorded buffer {}: {}", + i, + gst_flow_get_name(ret) + ); + } else { + pushed_count++; + } + } else if (buffer) { + spdlog::warn("Recording flush: invalid buffer at index {}", i); + } + } + spdlog::info("Successfully pushed {} pre-recorded frames", pushed_count); + gst_object_unref(record_parser_sinkpad); + } else { + spdlog::error("Failed to get record_parser sink pad for pre-record flush"); + } + + // Unref all buffers with validation + size_t unreffed = 0; + for (auto buffer : stream->_pre_record_buffer) { + if (buffer && GST_IS_BUFFER(buffer)) { + if (GST_MINI_OBJECT_REFCOUNT_VALUE(buffer) >= 1) { + gst_buffer_unref(buffer); + unreffed++; + } else { + spdlog::warn( + "Recording flush cleanup: buffer has invalid refcount" + ); + } + } + } + spdlog::debug("Recording flush: unreffed {} buffers", unreffed); + stream->_pre_record_buffer.clear(); + stream->_last_keyframe_index = 0; + } + + gst_object_unref(queue_srcpad); + gst_object_unref(fakesink_sinkpad); + if (queue_record) gst_object_unref(queue_record); + if (record_parser) gst_object_unref(record_parser); + if (cf_parser) gst_object_unref(cf_parser); + if (splitmux) gst_object_unref(splitmux); + if (fakesink) gst_object_unref(fakesink); + + stream->_recording_active = true; + return GST_PAD_PROBE_REMOVE; + }, + this, + nullptr + ); + + gst_object_unref(queue_srcpad); + gst_object_unref(queue_record); + gst_object_unref(fakesink); + } + + void stop_h265_recording() + { + if (!_pipeline) { + spdlog::error("Pipeline is null"); + return; } + + // Run pipeline modification on Qt main thread to avoid race condition + // with Qt's rendering thread (gstqt6d3d11 plugin) + if (QThread::currentThread() != QCoreApplication::instance()->thread()) { + spdlog::info("stop_h265_recording: dispatching to Qt main thread"); + QMetaObject::invokeMethod( + QCoreApplication::instance(), + [this]() { stop_h265_recording_impl(); }, + Qt::BlockingQueuedConnection + ); + return; + } + + stop_h265_recording_impl(); + } + + void stop_h265_recording_impl() + { + _recording_active = false; + + auto queue_record = gst_bin_get_by_name(GST_BIN(_pipeline), "queue_record"); + if (!queue_record) { + spdlog::error("queue_record not found"); + return; + } + + auto queue_srcpad = gst_element_get_static_pad(queue_record, "src"); + gst_object_ref(_pipeline); + + gst_pad_add_probe( + queue_srcpad, + GST_PAD_PROBE_TYPE_IDLE, + []([[maybe_unused]] GstPad *pad, + [[maybe_unused]] GstPadProbeInfo *info, + gpointer user_data) -> GstPadProbeReturn { + auto pipeline = GST_PIPELINE(user_data); + + spdlog::info("stop_h265_recording: idle probe triggered"); + + auto queue_record = gst_bin_get_by_name(GST_BIN(pipeline), "queue_record"); + auto record_parser = gst_bin_get_by_name(GST_BIN(pipeline), "record_parser"); + auto cf_parser = gst_bin_get_by_name(GST_BIN(pipeline), "cf_record_parser"); + auto splitmux = gst_bin_get_by_name(GST_BIN(pipeline), "splitmux"); + + if (!splitmux) { + spdlog::warn("Recording elements not found, may already be stopped"); + if (queue_record) gst_object_unref(queue_record); + if (record_parser) gst_object_unref(record_parser); + if (cf_parser) gst_object_unref(cf_parser); + return GST_PAD_PROBE_REMOVE; + } + + // Unlink the FULL recording chain + gst_element_unlink_many(queue_record, record_parser, cf_parser, splitmux, nullptr); + + // Create and add new fakesink FIRST (so queue has somewhere to go) + auto fakesink = gst_element_factory_make("fakesink", "record_sink"); + g_object_set(fakesink, "async", FALSE, "sync", FALSE, nullptr); + gst_bin_add(GST_BIN(pipeline), fakesink); + + if (!gst_element_link(queue_record, fakesink)) { + spdlog::error("Failed to link queue_record to fakesink"); + } + gst_element_sync_state_with_parent(fakesink); + + // Send EOS to splitmux to finalize the file + auto splitmux_sinkpad = gst_element_get_static_pad(splitmux, "video"); + if (splitmux_sinkpad) { + gst_pad_send_event(splitmux_sinkpad, gst_event_new_eos()); + gst_object_unref(splitmux_sinkpad); + } + + // Set record_parser and cf_parser to NULL and remove them now + // (they're already unlinked) + gst_element_set_state(record_parser, GST_STATE_NULL); + gst_element_set_state(cf_parser, GST_STATE_NULL); + gst_bin_remove_many(GST_BIN(pipeline), record_parser, cf_parser, nullptr); + + // DON'T set splitmux to NULL here - let bus_handler handle EOS + + gst_object_unref(queue_record); + gst_object_unref(record_parser); + gst_object_unref(cf_parser); + gst_object_unref(splitmux); + + spdlog::info("H.265 recording stop initiated, waiting for EOS"); + + return GST_PAD_PROBE_REMOVE; + }, + _pipeline, + [](gpointer data) { + if (data) { + gst_object_unref(GST_OBJECT(data)); + } + } + ); + + gst_object_unref(queue_srcpad); + gst_object_unref(queue_record); } }; \ No newline at end of file diff --git a/thorvision/src/main.cc b/thorvision/src/main.cc index a8c03b9..7fbed21 100644 --- a/thorvision/src/main.cc +++ b/thorvision/src/main.cc @@ -1,176 +1,182 @@ -#include - -#include - -#ifdef _WIN32 -#include -#endif -#include -#include -#include -#include -#include -#include - -#include "CameraModel.h" -#include "HttpServer.h" -#include "Recorder.h" -#include "RecorderSettings.h" -#include "Server.h" -#include "WebSocketClient.h" - -using json = nlohmann::json; - -void setup_gst_plugin_path(const QCoreApplication &app) -{ -#ifdef _WIN32 - auto gst_plugin_dir = - fmt::format("{}/../plugins/gstreamer", app.applicationDirPath().toStdString()); - auto const default_plugin_dir = - "C:\\Program Files\\gstreamer\\1.0\\msvc_x86_64\\lib\\gstreamer-1.0"; -#elif __APPLE__ - auto gst_plugin_dir = - fmt::format("{}/../PlugIns/gstreamer", app.applicationDirPath().toStdString()); - auto const default_plugin_dir = - "/Library/Frameworks/GStreamer.framework/Versions/Current/lib/gstreamer-1.0"; -#endif - - if (std::filesystem::exists(gst_plugin_dir)) { - spdlog::info("set GST_PLUGIN_PATH to: {}", gst_plugin_dir); - g_setenv("GST_PLUGIN_PATH", gst_plugin_dir.c_str(), true); - return; - } - - auto const env_path = g_getenv("GST_PLUGIN_PATH"); - if (env_path && *env_path) { - spdlog::info("Set GST_PLUGIN_PATH to: {}", env_path); - g_setenv("GST_PLUGIN_PATH", env_path, true); - return; - } - - spdlog::info("Set GST_PLUGIN_PATH to default: {}", default_plugin_dir); - g_setenv("GST_PLUGIN_PATH", default_plugin_dir, true); -} - -int main(int argc, char *argv[]) -{ - QGuiApplication app(argc, argv); - - setup_gst_plugin_path(app); - gst_init(&argc, &argv); - -#ifdef _WIN32 - QQuickStyle::setStyle("Fusion"); - QQuickWindow::setGraphicsApi(QSGRendererInterface::Direct3D11); - // register GstD3D11Qt6VideoItem as qml element - if (auto sink = gst_element_factory_make("qml6d3d11sink", nullptr)) { - gst_object_unref(sink); - } -#else - QQuickWindow::setGraphicsApi(QSGRendererInterface::OpenGL); - // register GstGLQt6VideoItem as qml element - if (auto sink = gst_element_factory_make("qml6glsink", nullptr)) { - gst_object_unref(sink); - } -#endif - - auto loop = g_main_loop_new(nullptr, false); - - QQmlApplicationEngine engine; - - auto camera_model = new CameraModel(&app); - auto recorder_settings = new RecorderSettings(&app); - auto recorder = new Recorder(camera_model, recorder_settings, &app); - auto server = new Server(&app); - auto ws_client = new WebSocketClient(&app); - HttpServer http_server(recorder); - - auto root_context = engine.rootContext(); - root_context->setContextProperty("CameraModel", camera_model); - root_context->setContextProperty("Recorder", recorder); - root_context->setContextProperty("RecorderSettings", recorder_settings); - root_context->setContextProperty("Server", server); - - const QUrl url(QStringLiteral("qrc:/qt/qml/App/Theme/src/main.qml")); - QObject::connect( - &engine, - &QQmlApplicationEngine::objectCreationFailed, - &app, - []() { QCoreApplication::exit(-1); }, - Qt::QueuedConnection - ); - - engine.load(url); - if (engine.rootObjects().isEmpty()) return -1; - - QObject::connect(server, &Server::status_change, [camera_model](bool connected) { - if (connected) { - for (auto *cam : Camera::cameras()) { - camera_model->add_camera(cam); - } - } else { - for (auto i = camera_model->rowCount() - 1; i >= 0; --i) { - camera_model->remove_camera(i); - } - } - }); - QObject::connect( - ws_client, - &WebSocketClient::camera_added, - camera_model, - [camera_model](const json &camera_json) { - const auto &camera = Camera::parse(camera_json); - camera_model->add_camera(camera); - } - ); - QObject::connect( - ws_client, - &WebSocketClient::camera_removed, - camera_model, - [camera_model, recorder](const int id) { - auto const index = camera_model->index_of_camera_id(id); - if (index == -1) { - spdlog::error("Camera: id {} not found", id); - return; - } - - auto const model_index = camera_model->index(index); - auto const camera_name = - camera_model->data(model_index, CameraModel::NameRole).toString(); - camera_model->remove_camera(index); - if (recorder->recording()) { - emit camera_model->camera_unplugged_during_recording(camera_name); - } - } - ); - - auto root_object = static_cast(engine.rootObjects().first()); - assert(root_object != nullptr && "[qml] Could not find qml root object"); - - auto video_layout = root_object->findChild("video_layout"); - assert(video_layout != nullptr && "[qml] Could not find video_layout"); - - auto repeater = video_layout->findChild("repeater"); - assert(repeater != nullptr && "[qml] Could not find repeater"); - - QObject::connect( - repeater, - SIGNAL(itemAdded(int, QQuickItem *)), - camera_model, - SLOT(onItemAdded(int, QQuickItem *)) - ); - - std::signal(SIGINT, [](int) { QCoreApplication::quit(); }); - - std::jthread gst_thread([loop]() { - spdlog::info("Run g_main_loop thread"); - g_main_loop_run(loop); - spdlog::info("Quit g_main_loop thread"); - g_main_loop_unref(loop); - }); - - auto result = app.exec(); - g_main_loop_quit(loop); - - return result; +#include + +#include + +#ifdef _WIN32 +#include +#endif +#include +#include +#include +#include +#include +#include + +#include "CameraModel.h" +#include "HttpServer.h" +#include "Recorder.h" +#include "RecorderSettings.h" +#include "Server.h" +#include "WebSocketClient.h" + +using json = nlohmann::json; + +void setup_gst_plugin_path(const QCoreApplication &app) +{ +#ifdef _WIN32 + auto gst_plugin_dir = + fmt::format("{}/../plugins/gstreamer", app.applicationDirPath().toStdString()); + auto const default_plugin_dir = + "C:\\Program Files\\gstreamer\\1.0\\msvc_x86_64\\lib\\gstreamer-1.0"; +#elif __APPLE__ + auto gst_plugin_dir = + fmt::format("{}/../PlugIns/gstreamer", app.applicationDirPath().toStdString()); + auto const default_plugin_dir = + "/Library/Frameworks/GStreamer.framework/Versions/Current/lib/gstreamer-1.0"; +#endif + + if (std::filesystem::exists(gst_plugin_dir)) { + spdlog::info("set GST_PLUGIN_PATH to: {}", gst_plugin_dir); + g_setenv("GST_PLUGIN_PATH", gst_plugin_dir.c_str(), true); + return; + } + + auto const env_path = g_getenv("GST_PLUGIN_PATH"); + if (env_path && *env_path) { + spdlog::info("Set GST_PLUGIN_PATH to: {}", env_path); + g_setenv("GST_PLUGIN_PATH", env_path, true); + return; + } + + spdlog::info("Set GST_PLUGIN_PATH to default: {}", default_plugin_dir); + g_setenv("GST_PLUGIN_PATH", default_plugin_dir, true); +} + +int main(int argc, char *argv[]) +{ + QGuiApplication app(argc, argv); + + setup_gst_plugin_path(app); + gst_init(&argc, &argv); + +#ifdef _WIN32 + QQuickStyle::setStyle("Fusion"); + QQuickWindow::setGraphicsApi(QSGRendererInterface::Direct3D11); + // register GstD3D11Qt6VideoItem as qml element + if (auto sink = gst_element_factory_make("qml6d3d11sink", nullptr)) { + gst_object_unref(sink); + } +#else + QQuickWindow::setGraphicsApi(QSGRendererInterface::OpenGL); + // register GstGLQt6VideoItem as qml element + if (auto sink = gst_element_factory_make("qml6glsink", nullptr)) { + gst_object_unref(sink); + } +#endif + + auto loop = g_main_loop_new(nullptr, false); + + QQmlApplicationEngine engine; + +#ifdef GSTREAMER_QML_DEBUG_PATH + QString qmlPath = QCoreApplication::applicationDirPath() + "/" + GSTREAMER_QML_DEBUG_PATH; + engine.addImportPath(qmlPath); + spdlog::info("Added QML import path: {}", qmlPath.toStdString()); +#endif + + auto camera_model = new CameraModel(&app); + auto recorder_settings = new RecorderSettings(&app); + auto recorder = new Recorder(camera_model, recorder_settings, &app); + auto server = new Server(&app); + auto ws_client = new WebSocketClient(&app); + HttpServer http_server(recorder); + + auto root_context = engine.rootContext(); + root_context->setContextProperty("CameraModel", camera_model); + root_context->setContextProperty("Recorder", recorder); + root_context->setContextProperty("RecorderSettings", recorder_settings); + root_context->setContextProperty("Server", server); + + const QUrl url(QStringLiteral("qrc:/qt/qml/App/Theme/src/main.qml")); + QObject::connect( + &engine, + &QQmlApplicationEngine::objectCreationFailed, + &app, + []() { QCoreApplication::exit(-1); }, + Qt::QueuedConnection + ); + + engine.load(url); + if (engine.rootObjects().isEmpty()) return -1; + + QObject::connect(server, &Server::status_change, [camera_model](bool connected) { + if (connected) { + for (auto *cam : Camera::cameras()) { + camera_model->add_camera(cam); + } + } else { + for (auto i = camera_model->rowCount() - 1; i >= 0; --i) { + camera_model->remove_camera(i); + } + } + }); + QObject::connect( + ws_client, + &WebSocketClient::camera_added, + camera_model, + [camera_model](const json &camera_json) { + const auto &camera = Camera::parse(camera_json); + camera_model->add_camera(camera); + } + ); + QObject::connect( + ws_client, + &WebSocketClient::camera_removed, + camera_model, + [camera_model, recorder](const int id) { + auto const index = camera_model->index_of_camera_id(id); + if (index == -1) { + spdlog::error("Camera: id {} not found", id); + return; + } + + auto const model_index = camera_model->index(index); + auto const camera_name = + camera_model->data(model_index, CameraModel::NameRole).toString(); + camera_model->remove_camera(index); + if (recorder->recording()) { + emit camera_model->camera_unplugged_during_recording(camera_name); + } + } + ); + + auto root_object = static_cast(engine.rootObjects().first()); + assert(root_object != nullptr && "[qml] Could not find qml root object"); + + auto video_layout = root_object->findChild("video_layout"); + assert(video_layout != nullptr && "[qml] Could not find video_layout"); + + auto repeater = video_layout->findChild("repeater"); + assert(repeater != nullptr && "[qml] Could not find repeater"); + + QObject::connect( + repeater, + SIGNAL(itemAdded(int, QQuickItem *)), + camera_model, + SLOT(onItemAdded(int, QQuickItem *)) + ); + + std::signal(SIGINT, [](int) { QCoreApplication::quit(); }); + + std::jthread gst_thread([loop]() { + spdlog::info("Run g_main_loop thread"); + g_main_loop_run(loop); + spdlog::info("Quit g_main_loop thread"); + g_main_loop_unref(loop); + }); + + auto result = app.exec(); + g_main_loop_quit(loop); + + return result; } \ No newline at end of file