2016-04-15 7 views

Kurento: strange log "disconnectAll()" generated continuously

I have been working with Kurento for the last few months, and today I suddenly started getting the following log:

[0x00007f40b9f6d700] debug KurentoMediaElementImpl MediaElementImpl.cpp:660 disconnectAll() <kmswebrtcendpoint3> Retry disconnect all <kmshubport3> 

This log is generated continuously and causes 100% CPU usage, creating multiple log files of 100+ MB each.

I also found the following in another log file:

2016-04-15 20:05:47,914497 23751 [0x00007fd903de1700] error KurentoWorkerPool   WorkerPool.cpp:41 workerThreadLoop() Unexpected error while running the server: boost::filesystem::last_write_time: No such file or directory: "/var/log/kurento-media-server/media-server_2016-04-15_20-05-18.00049.pid23751.log" 
2016-04-15 20:05:47,914513 23751 [0x00007fd90f7fe700] debug KurentoMediaSet   MediaSet.cpp:470 async_delete() Destroying HubPort -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/528783df-ac14-47bc-a7ad-8044721563b9_kurento.HubPort 
2016-04-15 20:05:47,914593 23751 [0x00007fd903de1700] debug KurentoWorkerPool   WorkerPool.cpp:37 workerThreadLoop() Working thread starting 
2016-04-15 20:05:47,917583 23751 [0x00007fd903de1700] debug KurentoMediaSet   MediaSet.cpp:470 async_delete() Destroying Composite -> d1975230-7fcb-498e-8d4a-bfc7afbf8bf2_kurento.MediaPipeline/63c84bc4-0d42-4299-ae22-219c0a603837_kurento.Composite 
2016-04-15 20:06:10,067442 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 
2016-04-15 20:10:10,067626 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 
2016-04-15 20:14:10,067789 23751 [0x00007fd9cbfff700] debug KurentoMediaSet   MediaSet.cpp:131 doGarbageCollection() Running garbage collector 

Note:

I am using a customized version of the Java group call example, and I have written a KMS filter in which I use the vadfilter element. It receives the non-silent audio buffers, copies them into a GstBuffer object until silence is detected, and then sends the result to a web service with curl_easy_perform.

Also, when I run sudo netstat -np | grep "CLOSE_WAIT"

I see that connections between the machine I use for the group call and the web service's IP address are listed in the CLOSE_WAIT state, but I am not sure whether they are in CLOSE_WAIT because of the crash in my plugin or because of something else.

P.S.: I used gst_buffer_copy_deep(), gst_buffer_get_memory() and gst_buffer_append_memory() to copy successive audio buffers until silence is detected.
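
Stripped of the GStreamer types, the copying described here amounts to something like the sketch below. It is only an illustration, not the plugin's code: `VoiceAccumulator` and its methods are hypothetical names, and a plain `std::vector` stands in for the `GstBuffer` that the filter actually uses.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the audio_buffer / audio_buffer_null pair:
// collects voiced PCM chunks and hands them over once silence is reported.
class VoiceAccumulator {
public:
    // Append one voiced chunk (what the "buffer" signal would deliver).
    void append(const std::uint8_t *data, std::size_t size) {
        pcm_.insert(pcm_.end(), data, data + size);
    }

    // Called when silence is detected: returns the collected audio and
    // leaves the accumulator empty for the next utterance.
    std::vector<std::uint8_t> take() {
        std::vector<std::uint8_t> out;
        out.swap(pcm_);
        return out;
    }

    std::size_t size() const { return pcm_.size(); }

private:
    std::vector<std::uint8_t> pcm_;
};
```

Because the vector owns its bytes and `take()` transfers them in one step, there is no separate "buffer is null" flag to keep in sync and nothing to free by hand.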

It would be very helpful if someone could point out the cause of the errors I am facing.

There is more: I have a linked post that you can refer to in order to see the errors I am getting. Thank you.

Edit 1:

namespace kurento 
{ 
    namespace module 
    { 
    namespace vadcustomfilter 
    { 
     void VADCustomFilterImpl::newBufferHandler (GstBuffer * buffer) 
     { 
     try 
     { 
      //instead of doing audio_buffer=null we set a flag audio_buffer_null to TRUE when we want to make a new copy of audio_buffer 
      if (!audio_buffer_null) 
      { 
       // dest, source, flag, offset, size 
       //gst_buffer_copy_into (audio_buffer, buffer, GST_BUFFER_COPY_MEMORY, 0, -1); 
       GstMemory *inbuf_memory = gst_buffer_get_memory(buffer,0); 
       gst_buffer_append_memory(audio_buffer,inbuf_memory); 
      } 
      else 
      { 
       audio_buffer = gst_buffer_new(); 
       audio_buffer = gst_buffer_copy_deep (buffer); 
       audio_buffer_null = FALSE; 
      } 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR ("EXCEPTION: new buffer"); 
     } 
     } 


     void VADCustomFilterImpl::busMessage (GstMessage * message) 
     { 
     try 
     { 
      switch (message->type) 
      { 
       case GST_MESSAGE_EOS: 
       { 
        GST_WARNING ("---------------> GST_MESSAGE_EOS"); 
       } 
       break; 

       case GST_MESSAGE_ELEMENT: 
       { 
        const GstStructure *structure = gst_message_get_structure (message); 
        if (structure == NULL) return; 

        const gchar *name = gst_structure_get_name (structure); 
        if (name == NULL || strlen(name) == 0) return; 


        if (strcmp (name, "vadfilter") == 0) 
        { 
         //messages are voice , silence, max_voice_reached 
         const GValue *msg_value = NULL; 
         gint64 silence_buffer = 0; 

         gst_structure_get_int64(structure, "silence", &silence_buffer); 
         silence_buffer /= sizeof(gint16); 

         //silence_buffer is the size of buffer which was emitted in chunks before silence was detected 
         //use to check for min voice size 

         if (silence_buffer > 0) 
         { 
          //min_voice_buffer_size set from java server to set a limit of min buffer size which can be sent to the web service 
          if (audio_buffer != NULL && (silence_buffer > (gint64)min_voice_buffer_size)) 
          { 
           audio_buffer_copy = NULL; 
           audio_buffer_copy = gst_buffer_copy_deep(audio_buffer); 

           push_buffer_in_queue(); 
          } 
          return; 
         } 

         gint64 max_voice = 0; 
         gst_structure_get_int64 (structure, "max_voice_reached", &max_voice); 

         if (audio_buffer != NULL && (max_voice > 0)) 
          { 
          audio_buffer_copy = NULL; 
          audio_buffer_copy = gst_buffer_copy_deep(audio_buffer); 
          push_buffer_in_queue(); 
          } 
         } /* vadfilter */ 
        } /* message_element */ 
        break; 

       default: 
        break; 

       } /* switch */ 
      } 
      catch (std::bad_weak_ptr & e) 
      { 
       GST_ERROR("EXCEPTION: bus message"); 
      } 
     } /* bus message */ 


     void VADCustomFilterImpl::push_buffer_in_queue() 
     { 
     try 
     { 
      GstMapInfo map;   
      if (audio_buffer_copy != NULL) 
      { 

       if (gst_buffer_map (audio_buffer_copy, &map, GST_MAP_READ)) 
       { 
        AUDIO_STREAM *pcm = new AUDIO_STREAM; 
        pcm->size = (unsigned int) map.size; 
        pcm->data = new unsigned char[pcm->size + 1]; 
        std::memcpy (pcm->data, (unsigned char *) map.data, pcm->size); 

        gst_buffer_unmap (audio_buffer_copy, &map); 

        //set the flag for audio_buffer to create a new copy 
        audio_buffer_null = TRUE; 
        fetch_response(pcm); 
        if (pcm->data != NULL) 
        {    
         pcm->size = 0; 
         pcm->data = NULL; 
        } 

        if (pcm != NULL) 
        {   
         delete pcm; 
         pcm = NULL; 
        }    
       } 
       else 
        GST_ERROR("push_buffer_in_queue: Invalid buffer"); 


      } 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR ("EXCEPTION::push_buffer_in_queue "); 
     } 
     } 

     VADCustomFilterImpl::VADCustomFilterImpl (const boost::property_tree::ptree &conf, 
               std::shared_ptr<MediaPipeline> mediaPipeline, 
               int minVoiceBufferSize, 
               int maxVoiceBufferSize, 
               int64_t minSilenceTime, 
               const std::string &url, 
               const std::string &language):FilterImpl 
               (conf, 
               std::dynamic_pointer_cast <MediaObjectImpl> (mediaPipeline)) 
     { 

     try 
     { 
      min_voice_buffer_size = (gsize) minVoiceBufferSize; 
      curl_url = url ; 
      accept_language = (!language.empty()) ? ("Accept-Language: " + language) : ("Accept-Language: en-US"); 

      g_object_set (element, "filter-factory", "vadfilter", NULL); 

      g_object_get(G_OBJECT(element), "filter", &vadfilter, NULL); 

      if (vadfilter == NULL) 
       { 
       throw KurentoException (MEDIA_OBJECT_NOT_AVAILABLE, "Media Object not available"); 
       } 


      g_object_set(G_OBJECT(vadfilter), "minimum-silence-time", (guint64)minSilenceTime, NULL); //2000000000 
      g_object_set(G_OBJECT(vadfilter), "max-voice-buffer-size", (guint)maxVoiceBufferSize, NULL); 

      g_object_unref(vadfilter); 

      g_async_queue_create(); 

      curl_init(); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: constructor... "); 
     } 
     } 

     void VADCustomFilterImpl::postConstructor() 
     { 
     try 
     { 
      GstBus *bus; 

      std::shared_ptr <MediaPipelineImpl> pipe; 

      FilterImpl::postConstructor(); 

      pipe = std::dynamic_pointer_cast <MediaPipelineImpl>(getMediaPipeline()); 

      bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline())); 

      /* register bus message handler */ 
      bus_handler_id = register_signal_handler (G_OBJECT (bus), 
            "message", 
            std::function < 
            void (GstElement *, 
            GstMessage *) > 
            (std:: 
            bind 
            (&VADCustomFilterImpl:: 
            busMessage, this, 
            std::placeholders::_2)), 
            std::dynamic_pointer_cast < 
            VADCustomFilterImpl > 
            (shared_from_this())); 

      /* register new_buffer handler */ 
      remove_silence_id = register_signal_handler(G_OBJECT(vadfilter), 
            "buffer", 
            std::function < 
            void (GstElement *, 
             GstBuffer *) > 
            (std:: 
             bind 
             (&VADCustomFilterImpl:: 
             newBufferHandler, this, 
             std::placeholders:: 
             _2)), 
            std:: 
            dynamic_pointer_cast < 
            VADCustomFilterImpl > 
            (shared_from_this())); 

      if (bus) 
       g_object_unref (bus); 
     } 
      catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: post constructor... "); 
     } 
     } 

     VADCustomFilterImpl::~VADCustomFilterImpl() 
     { 
     std::shared_ptr <MediaPipelineImpl> pipe; 

     try 
     {   
      if (bus_handler_id > 0) 
      { 
      pipe = std::dynamic_pointer_cast <MediaPipelineImpl> (getMediaPipeline()); 
      GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (pipe->getPipeline())); 
      if (bus) 
      { 
       unregister_signal_handler (bus, bus_handler_id); 
       g_object_unref (bus); 
      } 

      } 

      /*if (remove_silence_id > 0) 
      { 
       unregister_signal_handler (element, remove_silence_id); 
      } 
      */ 

      /* curl cleanup */ 
      curl_cleanup(); 

      /* buffer queue cleanup */ 
      if (g_async_buffer_queue) 
      { 
       g_async_queue_unref (g_async_buffer_queue); 
       g_async_buffer_queue = NULL; 
      } 


      /* audio_buffer cleanup */ 
      if (audio_buffer_copy != NULL) 
      { 
       audio_buffer_copy = NULL; 
      } 

      if (audio_buffer != NULL) 
      { 
       audio_buffer = NULL; 
       audio_buffer_null = TRUE; 
      } 

     } 
     catch (std::bad_weak_ptr & e) 
     { 
      remove_silence_id = -1; 
      GST_ERROR("EXCEPTION: destructor "); 
     } 
     } 


     size_t VADCustomFilterImpl::curl_callback (char *contents, size_t size, 
         size_t nmemb, void *userp) 
     { 
     size_t realsize = 0; 
     try 
     { 
      realsize = size * nmemb; /* calculate buffer size */ 
      ((std::string *) userp)->append ((char *) contents, realsize); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_callback"); 
     } 
     return realsize; 
     } 

     int VADCustomFilterImpl::curl_init() 
     { 
     /* init curl handle */ 
     if ((curl_handle = curl_easy_init()) == NULL) 
      return 1; 


     /* set content type */ 
     headers = curl_slist_append (headers, "Accept: application/xml"); 

     headers = curl_slist_append (headers, "Transfer-Encoding: chunked"); 
     headers = curl_slist_append (headers, accept_language.c_str());  

     /* set curl options */ 
     curl_easy_setopt (curl_handle, CURLOPT_VERBOSE, 0); 
     curl_easy_setopt (curl_handle, CURLOPT_POST, 1); 
     curl_easy_setopt (curl_handle, CURLOPT_HTTP_VERSION, CURL_HTTP_VERSION_1_1); 
     curl_easy_setopt (curl_handle, CURLOPT_HTTPHEADER, headers); 
     curl_easy_setopt (curl_handle, CURLOPT_TIMEOUT, 10);   
     curl_easy_setopt (curl_handle, CURLOPT_USERAGENT, "libcurl-agent/1.0"); /* set default user agent */     
     curl_easy_setopt (curl_handle, CURLOPT_URL, curl_url.c_str()); /* set url to fetch */  

     return 0; 
     } 


     int VADCustomFilterImpl::curl_fetch (AUDIO_STREAM * pcm) 
     { 
     curl_op_in_progress = TRUE; 

     if (curl_handle == NULL) 
      return 1; 

     CURLcode rcode; 

     try 
     { 
      response.clear(); 
      curl_easy_setopt (curl_handle, CURLOPT_WRITEFUNCTION, curl_callback); /* set callback function */
      curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, &response);/* pass fetch struct pointer */ 
      curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDSIZE, pcm->size); 
      curl_easy_setopt (curl_handle, CURLOPT_POSTFIELDS, pcm->data); 

      rcode = curl_easy_perform(curl_handle); 

      handle_curl_response(rcode); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_fetch "); 
     } 


     return 1; 
     } 

    void VADCustomFilterImpl::handle_curl_response(CURLcode rcode) 
    { 
     curl_op_in_progress = FALSE; 
     switch (rcode) 
     { 
      case CURLE_OK: 
      { 

       if (!response.empty()) 
       { 
        SendresponseReceivedEvent(response); 
        response.clear(); 
       } 
      } 
       break; 

      case CURLE_OPERATION_TIMEDOUT: 
      case CURLE_HTTP_RETURNED_ERROR: /* HTTP Error, >= 400*/ 
      { 
       GST_ERROR ("ERROR: handle_curl_response: (%s)", curl_easy_strerror (rcode)); 

       goto pop_pcm_from_queue; 
      } 
      break; 

      default: 
       GST_ERROR ("ERROR: Failed to fetch url (%s) - curl said: %s\n", curl_url.c_str(), curl_easy_strerror (rcode)); 
       break; 
     } /* switch */ 



     pop_pcm_from_queue: 
      if (g_async_queue_length (g_async_buffer_queue) > 0) 
       { 
       AUDIO_STREAM *pcm_popped = reinterpret_cast < AUDIO_STREAM * >(g_async_queue_pop (g_async_buffer_queue)); 
       curl_fetch (pcm_popped); 

       } 
      else 
       GST_ERROR ("handle_curl_response: queue is empty, no pcm to pop out"); 
    } 

    void VADCustomFilterImpl::curl_cleanup() 
    { 
     if (curl_handle) 
      curl_easy_cleanup (curl_handle); 

     /* free headers */ 
     if (headers) 
      curl_slist_free_all (headers); 

     curl_url.clear(); 
    } 

    /* fetch the response from the web service; if a fetch is already in progress, push the current audio buffer onto the queue */
    void VADCustomFilterImpl::fetch_response(AUDIO_STREAM * pcm) 
    { 
    //check queue size if > 0 then push else hit curl 
    if (!curl_op_in_progress) 
     { 
     curl_fetch (pcm); 
     } 
    else 
     { 
     try 
     {   
      if (g_async_buffer_queue == NULL) 
       g_async_queue_create(); 

      g_async_queue_push (g_async_buffer_queue, pcm); 
     } 
     catch (std::bad_weak_ptr & e) 
     { 
      GST_ERROR("EXCEPTION: curl_fetch "); 
     } 

     } 
    } 

     void VADCustomFilterImpl::g_async_queue_create() 
     { 
     g_async_buffer_queue = g_async_queue_new(); 
     } 
     }   /* vadcustomfilter */ 
    }  /* module */ 
}  /* kurento */ 

Answer

Given the information you provide here and in this other related question, I am fairly sure that the problem is that your filter is blocking the media flow (probably because it sends HTTP requests). In addition, all the errors you are getting with buffers and mini objects indicate that your filter has bugs in the way it handles the media.
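
One common way to keep a slow HTTP call out of the media path is to hand each finished audio chunk to a separate worker thread. The following is a minimal sketch of that idea using only the C++ standard library. `UploadWorker`, `submit` and the `upload` callback are hypothetical names, not part of Kurento, GStreamer or libcurl, and the callback is assumed to be where a blocking call such as `curl_easy_perform` would run.

```cpp
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: one worker thread that runs a (possibly slow)
// upload callback for each queued chunk, in submission order.
class UploadWorker {
public:
    using Chunk = std::vector<std::uint8_t>;

    explicit UploadWorker(std::function<void(const Chunk &)> upload)
        : upload_(std::move(upload)), thread_([this] { run(); }) {}

    // Drains whatever is still queued, then joins the worker thread.
    ~UploadWorker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        thread_.join();
    }

    // Called from the streaming thread: only takes a lock and moves the data.
    void submit(Chunk chunk) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(chunk));
        }
        cv_.notify_one();
    }

private:
    void run() {
        for (;;) {
            Chunk chunk;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stopping and fully drained
                chunk = std::move(queue_.front());
                queue_.pop();
            }
            upload_(chunk);  // the slow call runs outside the lock
        }
    }

    std::function<void(const Chunk &)> upload_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<Chunk> queue_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the other members exist before it starts
};
```

With this shape, the signal handlers would only call `submit()` and return immediately. Each chunk is owned by the queue until the worker has finished with it, so the streaming thread never frees memory that an upload is still reading.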

You should review your code and perhaps get in touch with an expert to find the cause of the problem. It is hard to guess from the information you have provided alone, without access to the source code. Even with the source code, it could be difficult to determine what is causing the deadlock.

Hi, I have added the source code, please check the edit.