2016-06-26 11 views
2

Ich habe Luftstrom konfiguriert und erstellt einige Dags und subDags, die mehrere Operatoren aufrufen.Luftstrom erhalten Ergebnis nach der Ausführung eines Bedieners

Mein Problem ist, dass, wenn ein Operator ausgeführt wird und den Job beendet, möchte ich die Ergebnisse in einer Python-Struktur zurück erhalten. Zum Beispiel:

File1.py

... 
    ... 
    sub_dag_one=SubDagOperator(subdag=subdag_accessHive(
PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, STEP, macros,path, 
     ), 
     task_id=DELP_DAG_NAME, 
     dag=dag, 
    ) 

File2.py

from airflow import DAG 
    from airflow.operators import HiveOperator 
def subdag_callHive(parent, child, args, step, 
         user_defined_macros, path 
         ): 
     dag_subdag = DAG(
      dag_id='%s.%s' % (parent, child), 
      default_args=args, 
      schedule_interval="@daily", 
      template_searchpath=path, 
      user_defined_macros=user_defined_macros, 
     ) 

     # some work... 

     HiveOperator(
      task_id='some_id', 
      hiveconf_jinja_translate=True, 
      hql='select field1 from public.mytable limit 4;', 
      trigger_rule='all_done', 
      dag=dag_subdag, 
     ) 

     return dag_subdag 

Die Funktion subdag_callHive aus einem anderen Python-Skript aufgerufen wird, wo die Haupt Dag definiert ist und alle anderen Parameter benötigt.

Ich möchte nur in der Lage sein, das Ergebnis aus dem HiveOperator (* wählen * von public.mytable Grenze 4; *), die in diesem Fall 4 Werte wäre.

der zurück dag_subdag ist ein Objekt < Klasse 'airflow.models.DAG> und enthält alle Attribute/auf den Aufruf übergebenen Daten, aber keine Informationen darüber, was der HiveOperator tat.

Ist das möglich? Wenn ja, wie kann es erreicht werden?

Antwort

4

Sie können Hooks für Ihre Bedürfnisse verwenden. Grundsätzlich macht der HiveOperator dasselbe, er ruft Hive Hooks an, die mehrere Methoden haben, um mit Ergebnissen zu arbeiten.

Verwenden Sie einen PythonOperator, um eine Funktion aufzurufen, die dann einen Hive-Hook startet.

Folgendes Beispiel könnte Ihnen helfen.

Code Snippet:

callHook = PythonOperator(
    task_id='foo', 
    python_callable=do_work, 
    dag=dag 
) 

def do_work(): 
    hiveserver = HiveServer2Hook() 
    hql = "SELECT COUNT(*) FROM foo.bar" 
    row_count = hiveserver.get_records(hql, schema='foo') 
    print row_count[0][0] 

Alle verfügbaren Methoden sind hier zu finden: https://github.com/apache/incubator-airflow/blob/master/airflow/hooks/hive_hooks.py