2016-06-09 8 views
1

Mein Ziel ist es, eine einfache benutzerdefinierte Senke zu erstellen, die Daten aus einer Pipeline empfangen kann, die dann für verschiedene Anwendungen (Aufzeichnung, Broadcasting, interne Pufferung usw.) verwendet werden soll.QtGstreamer Appsink: hängt und langsame/unbrauchbare Samples

In meinem ersten Versuch ist die Idee, eine Http (s)/Udp/etc. streaming über Http wieder, also verwende ich souphttpsrc, eine Warteschlange und QHttp, um die Daten an einen oder mehrere Clients zu liefern.

Die Anwendungen scheint zu funktionieren, da ich die Pipeline mit meiner benutzerdefinierten Senke (die standardmäßig ignoriert jedes Beispiel, bis mindestens ein Client verbunden ist) funktioniert, so kopiere ich einfach die Beispiele in der Clientantwort.

Was wirklich seltsam ist, ist, dass die Geschwindigkeit des Downloads wirklich wirklich wirklich langsamer ist (5-10 KB/s statt 2-300KB/s, wie erwartet) als die eingehende Streaming-Geschwindigkeit und darüber hinaus scheinen die Daten vollständig zu sein unbrauchbar. Ich habe versucht, ein Dekodebin in der Pipeline vor der Senke einzufügen und die Geschwindigkeit erreicht 33 MBit/s, so würde ich Leistungsprobleme durch den Queued Aufruf der Methode im Hauptthread ausgeschlossen, verantwortlich für das Senden der eigentlichen Probe über das Netzwerk; Selbst in diesem Fall scheinen die Daten nur Müll zu sein.

An zweiter Stelle wenn ich ein T-Stück (oder multiqueue) hinzufüge, um parallel einen autovideosink und/oder eine autoaudiosink zu laufen, bleibt die Pipeline beim Start stecken (dies passiert nicht nur mit zwei Autosinks) und wieder der Defekt ist das gleiche, wenn ich versuche, nur zwei benutzerdefinierte Appsink mit dem Tee zu verknüpfen.

Das Debuggen des Codes scheint, dass newSample() nie aufgerufen wird, wenn die Pipeline hängt, selbst wenn die letzte Nachricht, die auf dem Bus empfangen wird, die Änderung des Spielstatus der benutzerdefinierten Senke ist.

Vielen Dank!

Hier den Code:

QGst :: FlowReturn MultiHttpSink :: newSample() {

 QGst::SamplePtr sample = pullSample(); 
     char data[sample->buffer()->size()]; 
     qDebug() << "New Sample: size " << sample->buffer()->size(); 
     qDebug() << sample->buffer()->duration().toTime(); 
     qDebug() << sample->buffer()->presentationTimeStamp().toTime(); 

     sample->buffer()->extract(0, &data, sample->buffer()->size()); 
     for (auto it = resources.begin(); it != resources.end(); it++) 
      QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, data)); 

     return QGst::FlowOk; 
    } 


    void Server::setupServer() { 
     server = new QHttpServer(app); 
     server->listen(QHostAddress::Any, 8080, [&](QHttpRequest* req, QHttpResponse * res) { 

      res->setStatusCode(qhttp::ESTATUS_OK); // http status 200 
      res->addHeader("Content-Type", "video/mp2t"); 

      m_sink.addAudience(res); 
     }); 

     if (!server->isListening()) { 
      fprintf(stderr, "failed. can not listen at port 8080!\n"); 
      throw std::exception(); 
     } 

    void Server::setupPipeline() { 

     /* source pipeline */ 
     QString pipe1Descr = QString(
          "souphttpsrc location=\"%1\" ! " 
       "queue ! tee name=splitter " 
       "splitter.! decodebin ! autoaudiosink " 
       "splitter.! decodebin ! autovideosink " 
       "splitter.! appsink name=\"mysink\" " 
          ).arg("URL"); 
     pipeline1 = QGst::Parse::launch(pipe1Descr).dynamicCast<QGst::Pipeline>(); 
     m_sink.setElement(pipeline1->getElementByName("mysink")); 

     QGlib::connect(pipeline1->bus(), "message", this, &Server::onBusMessage); 
     pipeline1->bus()->addSignalWatch(); 
     /* start playing */ 
     pipeline1->setState(QGst::StatePlaying); 

    } 



    QString stateString(QGst::State state) { 
     switch (state) { 
      case 0: 
       return "Void Pending"; 
      case 1: 
       return "Null"; 
      case 2: 
       return "Ready"; 
      case 3: 
       return "Playing"; 
      case 4: 
       return "Paused"; 
     } 
     return ""; 
    } 

    QString streamStateString(QGst::StreamStatusType type) { 
     switch (type) { 
      case 0: 
       return "Create"; 
      case 1: 
       return "Enter"; 
      case 2: 
       return "Leave"; 
      case 3: 
       return "Destroy"; 
      case 4: 
       return "Start"; 
      case 5: 
       return "Pause"; 
      case 6: 
       return "Stop"; 
     } 
    } 

    void Server::onBusMessage(const QGst::MessagePtr& message) { 

     switch (message->type()) { 
      case QGst::MessageEos: 
       app->quit(); 
       break; 
      case QGst::MessageError: 
       qCritical() << message.staticCast<QGst::ErrorMessage>()->error(); 
       qCritical() << message.staticCast<QGst::ErrorMessage>()->debugMessage(); 
       break; 
      case QGst::MessageStateChanged: 
      { 
       QGlib::RefPointer<QGst::StateChangedMessage> msg = message.staticCast<QGst::StateChangedMessage>(); 
       qDebug() << msg->source()->name() <<": State changed from " << stateString(msg->oldState()) 
         << " -> " << stateString(msg->newState()) << " Transitioning to " << stateString(msg->pendingState()); 
       if(msg->source()->name().compare("pipeline0")==0){ 
        if(msg->newState() == QGst::StatePaused){ 
         need_reset = true; 
         playing = false; 
        } else { 
         need_reset = false; 
         if(msg->newState() == QGst::StatePlaying){ 
          playing = true; 
         } 
        } 
       } 
       break; 
      } 
      case QGst::MessageStreamStatus: 
      { 
       QGlib::RefPointer<QGst::StreamStatusMessage> msg = message.staticCast<QGst::StreamStatusMessage>(); 
       qDebug() << "Stream Status: " << streamStateString(msg->statusType()); 
       break; 
      } 
      default: 
       qDebug() << "Unhandles Bus Message Type: " << message->typeName(); 
       break; 
     } 
    } 

    void Server::writeToHttpResource(QPointer<qhttp::server::QHttpResponse> res, QByteArray data) { 
     qDebug() << "Writing data..."; 
     res->write(data); 
    } 

UPDATE

Das Debug-Protokoll von GStreamer ist etwas Nützliches Berichterstattung ...

0:00:00.733339246 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:00:00.733350762 [335m 8248[00m  0x274a8f0 [37mDEBUG [00m [00m     tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f69d000fb80 
    0:00:00.733353629 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f69d0015120, maxsize:4103 offset:0 size:4096 
    0:00:00.733361430 [335m 8248[00m  0x274a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f69d000fb80, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 15582, offset_end none, flags 0x0 

Die Größe der ankommenden Netzwerkpakete ist 4096, aber die Größe des Puffers ist 1430 die meiste Zeit ... Untersuchung .... ich denke, ich sollte auf den Puffer auf eine andere Weise zugreifen, um Zugriff auf Rohdaten zu erhalten.

Die Größe der Netzwerkpakete ist 1430, 4096 der Pufferbereich zu sein scheint ...

0:00:29.316301105 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes 

Warum die Übertragung ist langsam und sinnlos bleibt vorerst ein Geheimnis. Warum hängt es auch!

UPDATE 2

Ich habe auch versucht auf diese Weise, gleiche Ergebnisse (und gleiche Größen berichteten):

QGst::FlowReturn MultiHttpSink::newSample() { 

     QGst::SamplePtr sample = pullSample(); 
     QGst::BufferPtr buffer = sample->buffer(); 

     qDebug() << "New Sample: size " << buffer->size(); 

     QGst::MapInfo map; 

     if (!buffer->map(map, QGst::MapRead)) 
      return QGst::FlowError; 

     qDebug() << "New Buffer Map: size " << map.size(); 


     for (auto it = resources.begin(); it != resources.end(); it++) 
      QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data())); 

     buffer->unmap(map); 

     return QGst::FlowOk; 
    } 

Hier ist der Ausgang der letzten Version, die gleiche wie zuvor:

... ... 
New Sample: size 1430 
New Buffer Map: size 1430 
QTime("23:34:33.709") 
QTime("23:34:33.709") 
Writing data... 
New Sample: size 4096 
New Buffer Map: size 4096 
QTime("23:34:33.709") 
QTime("23:34:33.709") 
Writing data... 
... ... 

UPDATE 3

Dies ist das Protokoll zwischen dem einzelnen Datenblock:

New Sample: size 1430 
    New Buffer Map: size 1430 
    QTime("23:34:33.709") 
    QTime("23:34:33.709") 
    0:04:28.221638796 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3566:gst_base_sink_chain_unlocked:<mysink2>[00m object unref after render 0x7f894400f850 
    0:04:28.221653137 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<mysink2:sink>[00m called chainfunction &gst_base_sink_chain with buffer 0x7f894400f850, returned ok 
    0:04:28.221667131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m     tee gsttee.c:778:gst_tee_chain:<splitter>[00m handled buffer ok 
    0:04:28.221676164 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<splitter:sink>[00m called chainfunction &gst_tee_chain with buffer 0x7f894400f850, returned ok 
    0:04:28.221687738 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m  queue_dataflow gstqueue.c:1482:gst_queue_loop:<queue0>[00m queue is empty 
    0:04:28.222861204 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:04:28.222896720 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944012fe0, maxsize:4103 offset:0 size:4096 
    0:04:28.222926663 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes 
    0:04:28.222956032 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2316:gst_base_src_do_sync:<souphttpsrc0>[00m no sync needed 
    0:04:28.222968263 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2520:gst_base_src_get_range:<souphttpsrc0>[00m buffer ok 
    0:04:28.222978663 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<queue0:sink>[00m calling chainfunction &gst_queue_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223004500 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<queue0:sink>[00m called chainfunction &gst_queue_chain with buffer 0x7f894400f630, returned ok 
    0:04:28.223013700 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m  queue_dataflow gstqueue.c:1494:gst_queue_loop:<queue0>[00m queue is not empty 
    0:04:28.223031507 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2355:gst_base_src_update_length:<souphttpsrc0>[00m reading offset 20790622, length 4096, size -1, segment.stop -1, maxsize -1 
    0:04:28.223066379 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2456:gst_base_src_get_range:<souphttpsrc0>[00m calling create offset 20790622 length 4096, time 0 
    0:04:28.223056071 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<splitter:sink>[00m calling chainfunction &gst_tee_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223111831 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m     tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f894400f630 
    0:04:28.223121952 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:04:28.223139071 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223152343 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944015120, maxsize:4103 offset:0 size:4096 
    0:04:28.223179317 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3409:gst_base_sink_chain_unlocked:<mysink2>[00m got times start: 99:99:99.999999999, end: 99:99:99.999999999 
    0:04:28.223208511 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:1958:gst_base_sink_get_sync_times:<mysink2>[00m got times start: 99:99:99.999999999, stop: 99:99:99.999999999, do_sync 0 
    0:04:28.223222869 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944015120 
    0:04:28.223238437 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;04m    default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1) 
    0:04:28.223281513 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;04m    default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1) 
    0:04:28.223298205 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3520:gst_base_sink_chain_unlocked:<mysink2>[00m rendering object 0x7f894400f630 
    0:04:28.223312428 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:946:gst_base_sink_set_last_buffer_unlocked:<mysink2>[00m setting last buffer to 0x7f894400f630 
    0:04:28.223321131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944014080 
    0:04:28.223344396 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;34m   GST_CAPS gstpad.c:2641:gst_pad_has_current_caps:<mysink2:sink>[00m check current pad caps (NULL) 
    0:04:28.223355939 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:760:gst_app_sink_render:<mysink2>[00m pushing render buffer 0x7f894400f630 on queue (0) 
    0:04:28.223369213 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:1286:gst_app_sink_pull_sample:<mysink2>[00m trying to grab a buffer 
    0:04:28.223377762 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:706:dequeue_buffer:<mysink2>[00m dequeued buffer 0x7f894400f630 
    0:04:28.223386131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:1301:gst_app_sink_pull_sample:<mysink2>[00m we have a buffer 0x7f894400f630 
    New Sample: size 1430 
    New Buffer Map: size 1430 
    QTime("23:34:33.709") 
    QTime("23:34:33.709") 

Antwort

0

Ich habe endlich den mysteriösen Bug gefunden!

QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data())); 

Auch wenn dieser Code kompiliert es läuft schlecht, da dies:

Q_ARG(QByteArray, (char *)map.data()) 

Mit QByteArray (char *) ist falsch in diesem Fall, da wir sie nicht aus einem String initialisiert werden, sondern aus ein Stück Rohdaten. Geschrieben wie es war, wurde das Stück die ganze Zeit nach der neuen Zeilenkombination durchsucht, die zufällig den Datenblock schnitt.

Um dies zu vermeiden, ist der zu verwendende Konstruktor QByteArray (char * ptr, int size).

Dies ist die richtige Version des Verfahrens:

QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<GssClientRequest>, *it), Q_ARG(QByteArray, QByteArray((char *) map.data(), map.size()))); 

waren keine so harte Fehler am Ende zu besiegen;)

Danke für die Unterstützung!

1

Add Warteschlange vor decodebin und nach T-Stück, T-Kontext ausgeführt werden muss.

"souphttpsrc location = \" % 1 \“! " "queue! Tee name = splitter" "Splitter.! Warteschlange! Decodebin! Autoaudiosink" "Splitter.! Warteschlange! Decodebin! Autovideosink" " splitter.! appsink name = \ "mysink \" "

letzter splitter, der mit appsink verbunden ist, benötigt keine warteschlange, er wird im kontext des source plugins erstellt, erstellt gsttask.

+0

Danke für die Wiederholung Ich hatte die Warteschlange wie vorgeschlagen hinzugefügt, aber das war nicht das Problem, ich werde die Lösung unten schreiben! – Gianks

+0

Aus Neugier wollte ich wissen, funktioniert es für Sie, ohne Warteschlange hinzuzufügen, nur mit der Lösung, die Sie unten erwähnt. –

+0

Nein, dieser Teil funktioniert immer noch nicht, Pipeline bleibt stecken, wenn ich noch etwas hinzufüge meine appsink.Ich werde eine neue Frage zu SO öffnen. – Gianks