2015-12-14 14 views
5

Ich verwendete Symfony2 mit der RabbitMqBundle, um einen Worker zu erstellen, der Dokumente an ElasticSearch sendet. Das Indizieren von Dokumenten mit einer Rate von eins nach eins ist viel langsamer als die Verwendung der Bulk-API von ElasticSearch. Deshalb habe ich einen Puffer erstellt, der die Dokumente in Gruppen von 1000 nach ES leert. Der Code sieht (ein wenig vereinfacht) wie folgt aus:Führen Sie Funktion in PHP CLI-Skript nach Zeitraum der Inaktivität

Das alles funktioniert ganz nett, aber es gibt ein kleines Problem. Die Warteschlange wird mit Nachrichten mit einer unvorhersehbaren Rate gefüllt. Manchmal 100000 in 5 Minuten, manchmal nicht stundenlang. Wenn beispielsweise 82671 Dokumente in die Warteschlange gestellt werden, werden die letzten 671 Dokumente nicht indexiert, bevor weitere 329 Dokumente empfangen werden, was Stunden dauern kann. Ich möchte in der Lage sein, folgendes zu tun:

Warnung: Sci-Fi Code! Dies wird sich natürlich nicht:

class SearchIndexator 
{ 
    protected $elasticaService; 
    protected $buffer = []; 
    protected $bufferSize = 0; 
    protected $flushTimer; 

    // The maximum number of documents to keep in the buffer. 
    // If the buffer reaches this amount of documents, then the buffers content 
    // is send to elasticsearch for indexation. 
    const MAX_BUFFER_SIZE = 1000; 

    public function __construct(ElasticaService $elasticaService) 
    { 
     $this->elasticaService = $elasticaService; 

     // Highly Sci-fi code 
     $this->flushTimer = new Timer(); 
     // Flush buffer after 5 minutes of inactivity. 
     $this->flushTimer->setTimeout(5 * 60); 
     $this->flushTimer->setCallback([$this, 'flush']); 
    } 

    /** 
    * Destructor 
    * 
    * Flush any documents that remain in the buffer. 
    */ 
    public function __destruct() 
    { 
     $this->flush(); 
    } 

    /** 
    * Add a document to the indexation buffer. 
    */ 
    public function onMessage(array $document) 
    { 
     // Prepare the document for indexation. 
     $this->doHeavyWeightStuff($document); 

     // Create an Elastica document 
     $document = new \Elastica\Document(
      $document['key'], 
      $document 
     ); 

     // Add the document to the buffer. 
     $this->buffer[] = $document; 

     // Flush the buffer when max buffersize has been reached. 
     if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) { 
      $this->flush(); 
     } else { 
      // Start a timer that will flush the buffer after a timeout. 
      $this->initTimer(); 
     } 
    } 

    /** 
    * Send the current buffer to ElasticSearch for indexation. 
    */ 
    public function flush() 
    { 
     // Send documents to ElasticSearch for indexation. 
     if (1 <= $this->bufferSize) { 
      $this->elasticaService->addDocuments($this->buffer); 
     } 

     // Clear buffer 
     $this->buffer = []; 
     $this->bufferSize = 0; 

     // There are no longer messages to be send, stop the timer. 
     $this->flushTimer->stop(); 
    } 

    protected function initTimer() 
    { 
     // Start or restart timer 
     $this->flushTimer->isRunning() 
      ? $this->flushTimer->reset() 
      : $this->flushTimer->start(); 
    } 
} 

Nun, ich weiß über die Grenzen von PHP nicht sein ereignisgesteuert. Aber das ist 2015 und es gibt Lösungen wie ReactPHP, also sollte das möglich sein, oder? Für ØMQ gibt es this function. Was wäre eine Lösung, die für RabbitMQ oder unabhängig von einer Nachrichtenwarteschlangenerweiterung funktioniert?

Lösungen, die ich bin skeptisch:

  1. ist es crysalead/code. Es simuliert einen Timer mit declare(ticks = 1);. Ich bin mir nicht sicher, ob dies ein performanter und solider Ansatz ist. Irgendwelche Ideen?
  2. Ich könnte einen Cronjob ausführen, der alle fünf Minuten eine "FLUSH" -Nachricht in derselben Warteschlange veröffentlicht und dann den Puffer beim Empfang dieser Nachricht explizit löscht, aber das würde betrügen.
+0

Nicht vollständig, was Sie suchen, aber könnte eine gute Lösung sein. Speichern Sie die Uhrzeit, zu der Sie den Befehl 'flush' zuletzt ausgeführt haben, und überprüfen Sie beim Hinzufügen von Dokumenten die Uhrzeit. Wenn es schon länger als 5 Minuten spült. 2. beste Option ist der Cronjob IMHO –

+0

Der Punkt ist, wenn Sie keine Nachrichten für einen langen Zeitraum erhalten, können Sie nicht überprüfen, die Zeit und damit der Puffer wird nicht geleert. Ein Cronjob führt PHP in einem anderen Prozess aus und kann daher nicht auf den Puffer zugreifen. – Xatoo

+0

Also läuft dieser Code in einem lang laufenden PHP-Prozess? Denn in diesem Fall könnten Sie wahrscheinlich Signale verwenden (genau wie Ihre Nummer 1-Option tut) [hier] (http://www.hackingwithphep.com/16/1/1/taking-control-of-php-pcntl_signal)) und [hier] (http: //www.hackingwithphp.com/16/1/2/Timing-your-Signale). Diese Signale sind nicht blockierend, haben sie selbst noch nicht benutzt, aber es könnte genau das sein, was Sie für Ihren Anwendungsfall brauchen. –

Antwort

0

Wie ich in meinem Kommentar erwähnt habe, könnten Sie die Signale verwenden. Mit PHP können Sie Signal-Handler für Ihre Skriptsignale registrieren (z. B. SIGINT, SIGKILL usw.)

Für Ihren Anwendungsfall können Sie das Signal SIGALRM verwenden. Dieses Signal wird Ihr Skript nach einer bestimmten Zeit (die Sie einstellen können) alarmieren. Die positive Seite dieser Signale ist, dass sie nicht blockierend sind. Mit anderen Worten, der normale Betrieb Ihres Skripts wird nicht beeinträchtigt.

function signal_handler($signal) { 
    // You would flush here 
    print "Caught SIGALRM\n"; 
    // Set the SIGALRM timer again or it won't trigger again 
    pcntl_alarm(300); 
} 

// register your handler with the SIGALRM signal 
pcntl_signal(SIGALRM, "signal_handler", true); 
// set the timeout for the SIGALRM signal to 300 seconds 
pcntl_alarm(300); 

// start loop and check for pending signals 
while(pcntl_signal_dispatch() && your_loop_condition) { 
    //Execute your code here 
} 

Hinweis:

Die eingestellte Lösung (Zecken sind seit PHP 5.3 veraltet) Sie nur 1 SIGALRM Signal in Ihrem Skript verwenden können, wenn Sie die Zeit Ihres Signal mit pcntl_alarm der Timer für Ihre Set Alarm wird zurückgesetzt (ohne das Signal zu zünden) auf seinen neu eingestellten Wert.

+0

Ja, das ist dasselbe wie das '' crysalead/code'' Projekt, das ich in meiner Frage erwähnt habe. Aber das verwendet "declare ticks" und ich bezweifle, dass es eine performante Lösung ist, die Ausführung von PHP nach jeder Anweisung zu unterbrechen. Hast du damit Erfahrung? – Xatoo

+0

Der Link, den Sie angegeben haben, enthält auch Links zu einer Seite, die erklärt, dass die Verwendung von Ticks veraltet ist. Die meisten Erwähnungen von Zecken geben an, dass die Verwendung von Zecken in den meisten Fällen ein Anti-Muster ist. Ich bin daher interessiert, ob es eine Alternative gibt. – Xatoo

+0

Ihr Recht, ich habe den Teil übersehen, dass die angegebenen Ticks veraltet waren. Ich habe etwas gegraben und eine nicht veraltete Alternative gefunden. Mit 'pcntl_signal_dispatch()' können Sie selbst bestimmen, wann nach ausstehenden Signalen i.s.o gesucht werden soll. den Hundeführer jeden Tick ausführen. Die angepasste Lösung wird auch leistungsfähiger sein .. Hoffe das hilft. –