2014-03-01 6 views
25

This is not working anymore, Scrapy API hat sich geändert.Führen Sie eine Scrapy-Spinne in einer Sellerie-Aufgabe

Jetzt die Dokumentation Feature einen Weg zu "", aber ich bekomme die ReactorNotRestartable Fehler.

Meine Aufgabe:

from celery import Task 

from twisted.internet import reactor 

from scrapy.crawler import Crawler 
from scrapy import log, signals 
from scrapy.utils.project import get_project_settings 

from .spiders import MySpider 



class MyTask(Task): 
    def run(self, *args, **kwargs): 
     spider = MySpider 
     settings = get_project_settings() 
     crawler = Crawler(settings) 
     crawler.signals.connect(reactor.stop, signal=signals.spider_closed) 
     crawler.configure() 
     crawler.crawl(spider) 
     crawler.start() 

     log.start() 
     reactor.run() 
+0

Welche Version von Scrapy verwenden Sie? – Talvalin

+0

@Talvalin 'Scrapy == 0.22.2' –

+1

@shirkey Ich beziehe mich auf diese Frage im ersten Link –

Antwort

28

Die verdrehte Drossel kann nicht neu gestartet werden. Eine Arbeit um für diese ist die Sellerie Aufgabe zu lassen, für jeden Crawl einen neuen Kind-Prozess gabeln Sie wie vorgeschlagen in dem folgenden Posten ausführen mögen:

Running Scrapy spiders in a Celery task

Diese rund um den „Reaktor wird nicht neu starten-fähig sein Ausgabe "durch Nutzung des Multiprocessing-Pakets. Aber das Problem dabei ist, dass die Problemumgehung mit der neuesten Sellerie-Version veraltet ist, weil Sie stattdessen in ein anderes Problem geraten, bei dem ein Daemon-Prozess keine Unterprozesse erzeugen kann. Damit die Problemumgehung funktioniert, müssen Sie die Sellerieversion verwenden.

Ja, und die Scrapy API hat sich geändert. Aber mit kleinen Änderungen (Crawler importieren statt CrawlerProcess). Sie können die Problemumgehung verwenden, indem Sie in der Sellerieversion herunterfahren. Celery Issue #1709

Hier ist mein aktualisiert Crawl-Skript, das durch die Verwendung von Billard statt Multiprozessing mit neueren Sellerie Versionen funktioniert:

Die Sellerie Ausgabe finden Sie hier

from scrapy.crawler import Crawler 
from scrapy.conf import settings 
from myspider import MySpider 
from scrapy import log, project 
from twisted.internet import reactor 
from billiard import Process 
from scrapy.utils.project import get_project_settings 

class UrlCrawlerScript(Process): 
    def __init__(self, spider): 
     Process.__init__(self) 
     settings = get_project_settings() 
     self.crawler = Crawler(settings) 
     self.crawler.configure() 
     self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed) 
     self.spider = spider 

    def run(self): 
     self.crawler.crawl(self.spider) 
     self.crawler.start() 
     reactor.run() 

def run_spider(url): 
    spider = MySpider(url) 
    crawler = UrlCrawlerScript(spider) 
    crawler.start() 
    crawler.join() 

Bearbeiten: Durch Lesen der Sellerie Ausgabe #1709 schlagen sie vor, Billiard anstelle von Multiprocessing zu verwenden, damit die Subprozessbeschränkung aufgehoben wird. Mit anderen Worten, wir sollten versuchen billiard und sehen, ob es funktioniert!

Bearbeiten 2: Ja, mit billiard funktioniert mein Skript mit dem neuesten Sellerie-Build! Siehe mein aktuelles Skript.

+0

funktioniert wie ein Charme, danke! –

+2

Hinweis - Ich musste die 'self.crawler.signals.connect (reactor.stop, signal = signals.spider_closed)' Zeile außerhalb der Initialisierungsprüfung verschieben oder der zweite Durchlauf würde hängen bleiben. Verschieben es macht es gut in meinem Projekt. Auch, da 'scrapy.project' abgeschrieben wird, verwendete' current_thread' von billiard ein Initialisierungsflag pro Thread. Das hat auch super funktioniert. – jlovison

+1

jlovison, kannst du bitte die Änderungen teilen, die du an current_thread vorgenommen hast? und wo hast du signals.spider_closed platziert? danke im voraus –

9

The Twisted Reaktor kann nicht neu gestartet werden, so dass, sobald eine Spinne läuft beendet und crawler stoppt den Reaktor implizit, dass Arbeiter nutzlos ist.

Wie in den Antworten zu dieser anderen Frage geschrieben, müssen Sie nur den Arbeiter töten, der Ihre Spinne laufen ließ und sie durch eine neue ersetzen, die verhindert, dass der Reaktor mehr als einmal gestartet und gestoppt wird. Um dies zu tun, nur ein:

CELERYD_MAX_TASKS_PER_CHILD = 1 

Der Nachteil ist, dass Sie nicht wirklich sind der Twisted-Reaktor zu seinem vollen Potential mit und Verschwendung von Ressourcen mehrere Reaktoren ausgeführt wird, als ein Reaktor auf einmal mehrere Spinne läuft in ein einzelner Prozess. Ein besserer Ansatz besteht darin, einen Reaktor pro Arbeiter (oder sogar einen Reaktor weltweit) zu betreiben und ihn nicht berühren zu lassen.

Ich arbeite an diesem für ein sehr ähnliches Projekt, also werde ich diesen Beitrag aktualisieren, wenn ich irgendwelche Fortschritte mache.

+4

Ich bin an Ihrer Problemumgehung interessiert. Bitte lassen Sie uns wissen, wenn Sie sich etwas einfallen lassen. –

+0

Sehr interezessting, Bist du damit irgendwo hingekommen? – gerosalesc

-2

Ich würde sagen, dieser Ansatz ist sehr ineffizient, wenn Sie eine Menge Aufgaben zu bearbeiten haben. Da Sellerie mit Gewinde versehen ist, wird jede Aufgabe in einem eigenen Thread ausgeführt. Nehmen wir an, mit RabbitMQ als Broker können Sie> 10K q/s passieren. Mit Sellerie würde dies möglicherweise 10K Threads Overhead verursachen! Ich würde empfehlen, hier keinen Sellerie zu verwenden. Greifen Sie stattdessen direkt auf den Broker zu!

+1

Greifen Sie direkt auf den Broker zu? Was meinen Sie? –

2

Um ReactorNotRestartable Fehler zu vermeiden, wenn Scrapy in Sellerie Tasks Warteschlange ausgeführt wird, habe ich Threads verwendet. Derselbe Ansatz wurde verwendet, um den Twisted-Reaktor mehrmals in einer App zu betreiben. Scrapy verwendet auch Twisted, also können wir das auch machen. Hier

ist der Code:

from threading import Thread 
from scrapy.crawler import CrawlerProcess 
import scrapy 

class MySpider(scrapy.Spider): 
    name = 'my_spider' 


class MyCrawler: 

    spider_settings = {} 

    def run_crawler(self): 

     process = CrawlerProcess(self.spider_settings) 
     process.crawl(MySpider) 
     Thread(target=process.start).start() 

nicht CELERYD_CONCURRENCY für Sellerie zu erhöhen Vergessen.

CELERYD_CONCURRENCY = 10 

funktioniert gut für mich.

Dies blockiert den laufenden Prozess nicht, aber scrapy Best Practice ist ohnehin die Verarbeitung von Daten in Callbacks. Gehen Sie einfach so vor:

for crawler in process.crawlers: 
    crawler.spider.save_result_callback = some_callback 
    crawler.spider.save_result_callback_params = some_callback_params 

Thread(target=process.start).start()