Ich verwendete Symfony2 mit der RabbitMqBundle, um einen Worker zu erstellen, der Dokumente an ElasticSearch sendet. Das Indizieren von Dokumenten mit einer Rate von eins nach eins ist viel langsamer als die Verwendung der Bulk-API von ElasticSearch. Deshalb habe ich einen Puffer erstellt, der die Dokumente in Gruppen von 1000 nach ES leert. Der Code sieht (ein wenig vereinfacht) wie folgt aus:Führen Sie Funktion in PHP CLI-Skript nach Zeitraum der Inaktivität
Das alles funktioniert ganz nett, aber es gibt ein kleines Problem. Die Warteschlange wird mit Nachrichten mit einer unvorhersehbaren Rate gefüllt. Manchmal 100000 in 5 Minuten, manchmal nicht stundenlang. Wenn beispielsweise 82671 Dokumente in die Warteschlange gestellt werden, werden die letzten 671 Dokumente nicht indexiert, bevor weitere 329 Dokumente empfangen werden, was Stunden dauern kann. Ich möchte in der Lage sein, folgendes zu tun:
Warnung: Sci-Fi Code! Dies wird sich natürlich nicht:
class SearchIndexator
{
protected $elasticaService;
protected $buffer = [];
protected $bufferSize = 0;
protected $flushTimer;
// The maximum number of documents to keep in the buffer.
// If the buffer reaches this amount of documents, then the buffers content
// is send to elasticsearch for indexation.
const MAX_BUFFER_SIZE = 1000;
public function __construct(ElasticaService $elasticaService)
{
$this->elasticaService = $elasticaService;
// Highly Sci-fi code
$this->flushTimer = new Timer();
// Flush buffer after 5 minutes of inactivity.
$this->flushTimer->setTimeout(5 * 60);
$this->flushTimer->setCallback([$this, 'flush']);
}
/**
* Destructor
*
* Flush any documents that remain in the buffer.
*/
public function __destruct()
{
$this->flush();
}
/**
* Add a document to the indexation buffer.
*/
public function onMessage(array $document)
{
// Prepare the document for indexation.
$this->doHeavyWeightStuff($document);
// Create an Elastica document
$document = new \Elastica\Document(
$document['key'],
$document
);
// Add the document to the buffer.
$this->buffer[] = $document;
// Flush the buffer when max buffersize has been reached.
if (self::MAX_BUFFER_SIZE <= ++$this->bufferSize) {
$this->flush();
} else {
// Start a timer that will flush the buffer after a timeout.
$this->initTimer();
}
}
/**
* Send the current buffer to ElasticSearch for indexation.
*/
public function flush()
{
// Send documents to ElasticSearch for indexation.
if (1 <= $this->bufferSize) {
$this->elasticaService->addDocuments($this->buffer);
}
// Clear buffer
$this->buffer = [];
$this->bufferSize = 0;
// There are no longer messages to be send, stop the timer.
$this->flushTimer->stop();
}
protected function initTimer()
{
// Start or restart timer
$this->flushTimer->isRunning()
? $this->flushTimer->reset()
: $this->flushTimer->start();
}
}
Nun, ich weiß über die Grenzen von PHP nicht sein ereignisgesteuert. Aber das ist 2015 und es gibt Lösungen wie ReactPHP, also sollte das möglich sein, oder? Für ØMQ gibt es this function. Was wäre eine Lösung, die für RabbitMQ oder unabhängig von einer Nachrichtenwarteschlangenerweiterung funktioniert?
Lösungen, die ich bin skeptisch:
- ist es crysalead/code. Es simuliert einen Timer mit
declare(ticks = 1);
. Ich bin mir nicht sicher, ob dies ein performanter und solider Ansatz ist. Irgendwelche Ideen? - Ich könnte einen Cronjob ausführen, der alle fünf Minuten eine "FLUSH" -Nachricht in derselben Warteschlange veröffentlicht und dann den Puffer beim Empfang dieser Nachricht explizit löscht, aber das würde betrügen.
Nicht vollständig, was Sie suchen, aber könnte eine gute Lösung sein. Speichern Sie die Uhrzeit, zu der Sie den Befehl 'flush' zuletzt ausgeführt haben, und überprüfen Sie beim Hinzufügen von Dokumenten die Uhrzeit. Wenn es schon länger als 5 Minuten spült. 2. beste Option ist der Cronjob IMHO –
Der Punkt ist, wenn Sie keine Nachrichten für einen langen Zeitraum erhalten, können Sie nicht überprüfen, die Zeit und damit der Puffer wird nicht geleert. Ein Cronjob führt PHP in einem anderen Prozess aus und kann daher nicht auf den Puffer zugreifen. – Xatoo
Also läuft dieser Code in einem lang laufenden PHP-Prozess? Denn in diesem Fall könnten Sie wahrscheinlich Signale verwenden (genau wie Ihre Nummer 1-Option tut) [hier] (http://www.hackingwithphep.com/16/1/1/taking-control-of-php-pcntl_signal)) und [hier] (http: //www.hackingwithphp.com/16/1/2/Timing-your-Signale). Diese Signale sind nicht blockierend, haben sie selbst noch nicht benutzt, aber es könnte genau das sein, was Sie für Ihren Anwendungsfall brauchen. –