2016-06-08 4 views
1

Ich habe zwei Dateien für ein Projekt eingerichtet mit Celery und Pydoop, tasks.py und HDFSStorage.pyPydoop Anruf nicht funktioniert, wenn sie innerhalb Sellerie Aufgabe

# tasks.py 

from celery import Celery 
from celery import shared_task 
from celery.utils.log import get_task_logger 
from HDFSStorage import HDFSStorage 

app = Celery('tasks', broker='amqp://[email protected]//') 
logger = get_task_logger(__name__) 
fs = HDFSStorage() 
print fs.exists("/myfile.txt") 

@shared_task 
def add(x,y): 
    logger.info('Adding {0} + {1}'.format(x, y)) 
    logger.info('Checking if file exists') 
    fs.exists("/myfile.txt") 
    logger.info('Done checking if file exists') 
    return x+y 

# HDFSStorage.py 

import pydoop 
from pydoop.hdfs import hdfs 

class HDFSStorage(): 
    def __init__(self): 
     self.client = hdfs(host="master", port=54310, user="oskar") 

    def exists(self, name): 
     return self.client.exists(name) 

Running Sellerie startet mit dem fs.exists() Aufruf außerhalb der Aufgabe und gibt True als erwartet.

$ celery -A tasks worker -l info 
True 
[2016-06-08 15:54:15,298: WARNING/MainProcess] /usr/local/lib/python2.7/dist-packages/ce 
lery/apps/worker.py:161: CDeprecationWarning: 
Starting from version 3.2 Celery will refuse to accept pickle by default. 

The pickle serializer is a security concern as it may give attackers 
the ability to execute any command. It's important to secure 
your broker from unauthorized access when using pickle, so we think 
that enabling pickle should require a deliberate action and not be 
the default choice. 

If you depend on pickle then you should set a setting to disable this 
warning and to be sure that everything will continue working 
when you upgrade to Celery 3.2:: 

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml'] 

You must only enable the serializers that you will actually use. 


    warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED)) 

-------------- [email protected] v3.1.23 (Cipater) 
---- **** ----- 
--- * *** * -- Linux-3.19.0-32-generic-x86_64-with-LinuxMint-17.3-rosa 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   tasks:0x7f510d3162d0 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  disabled:// 
- *** --- * --- .> concurrency: 4 (prefork) 
-- ******* ---- 
--- ***** ----- [queues] 
-------------- .> celery   exchange=celery(direct) key=celery 


[tasks] 
    . tasks.add 

[2016-06-08 15:54:15,371: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672/ 
/
[2016-06-08 15:54:15,382: INFO/MainProcess] mingle: searching for neighbors 
[2016-06-08 15:54:16,395: INFO/MainProcess] mingle: all alone 
[2016-06-08 15:54:16,412: WARNING/MainProcess] [email protected] ready. 
[2016-06-08 15:54:19,736: INFO/MainProcess] Events of group {task} enabled by remote. 

jedoch die Aufgabe ausgeführt wird, die die gleiche fs.exists() Anruf hat wird aus einem unbekannten Grund stecken.

$ python 
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
>>> from tasks import add 
True 
>>> print add.delay(5,4).get() 

[2016-06-08 15:54:32,833: INFO/MainProcess] Received task: tasks.add[a50409a8-f82d-4376- 
ace2-442a09c9ed4f] 
[2016-06-08 15:54:32,834: INFO/Worker-2] tasks.add[a50409a8-f82d-4376-ace2-442a09c9ed4f] 
: Adding 5 + 3 
[2016-06-08 15:54:32,834: INFO/Worker-2] tasks.add[a50409a8-f82d-4376-ace2-442a09c9ed4f] 
: Checking if file exists 

die fs.exists() Anruf in der Aufgabe macht richtig die Aufgabe Finish Entfernen.

Was mache ich falsch? Was macht Sellerie nicht mit Pydoop arbeiten?

Antwort

1

Die HDFSStorage Instanz muss innerhalb der Aufgabe erstellt werden

# tasks.py 

from celery import Celery 
from celery import shared_task 
from celery.utils.log import get_task_logger 
from HDFSStorage import HDFSStorage 

app = Celery('tasks', broker='amqp://[email protected]//') 
logger = get_task_logger(__name__) 

@shared_task 
def add(x,y): 
    fs = HDFSStorage() 
    logger.info('Adding {0} + {1}'.format(x, y)) 
    logger.info('Checking if file exists') 
    fs.exists("/myfile.txt") 
    logger.info('Done checking if file exists') 
    return x+y