2015-11-05 4 views
7

Ich habe ein Problem, von Spark in Hive erzeugte partitionierte Parkettdateien zu lesen. Ich bin in der Lage, die externe Tabelle in Hive zu erstellen, aber wenn ich versuche, ein paar Zeilen auszuwählen, gibt Hive nur eine "OK" -Nachricht ohne Zeilen zurück.Hive liest keine von Spark generierten partitionierten Parkettdateien.

Ich kann die partitionierten Parkett-Dateien korrekt in Spark lesen, also nehme ich an, dass sie korrekt generiert wurden. Ich kann diese Dateien auch lesen, wenn ich eine externe Tabelle in der Struktur ohne Partitionierung erstelle.

Hat jemand einen Vorschlag?

Meine Umwelt ist:

  • Cluster EMR 4.1.0
  • Hive 1.0.0
  • Funken 1.5.0
  • Hue 3.7.1
  • Parkett Dateien in gespeichert werden ein S3-Bucket (s3: // staging-dev/test/ttfourfieldspart2/Jahr = 2013/Monat = 11)

My Spark-Konfigurationsdatei hat die folgenden Parameter (/etc/spark/conf.dist/spark-defaults.conf):

spark.master yarn 
spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/* 
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native 
spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/* 
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native 
spark.eventLog.enabled true 
spark.eventLog.dir hdfs:///var/log/spark/apps 
spark.history.fs.logDirectory hdfs:///var/log/spark/apps 
spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080 
spark.history.ui.port 18080 
spark.shuffle.service.enabled true 
spark.driver.extraJavaOptions -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p' 
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' 
spark.executor.memory 4G 
spark.driver.memory 4G 
spark.dynamicAllocation.enabled true 
spark.dynamicAllocation.maxExecutors 100 
spark.dynamicAllocation.minExecutors 1 

Hive Konfigurationsdatei hat die folgenden Parameter (/ etc/Nest/conf/hive-site.xml):

<configuration> 

<!-- Hive Configuration can either be stored in this file or in the hadoop configuration files --> 
<!-- that are implied by Hadoop setup variables.            --> 
<!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive --> 
<!-- users do not have to edit hadoop configuration files (that may be managed as a centralized --> 
<!-- resource).                     --> 

<!-- Hive Execution Parameters --> 


<property> 
    <name>hbase.zookeeper.quorum</name> 
    <value>ip-10-xx-xxx-xxx.ec2.internal</value> 
    <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description> 
</property> 

<property> 
    <name>hive.execution.engine</name> 
    <value>mr</value> 
</property> 

    <property> 
    <name>fs.defaultFS</name> 
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value> 
    </property> 

<property> 
    <name>hive.metastore.uris</name> 
    <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value> 
    <description>JDBC connect string for a JDBC metastore</description> 
</property> 

<property> 
    <name>javax.jdo.option.ConnectionURL</name> 
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value> 
    <description>username to use against metastore database</description> 
</property> 

<property> 
    <name>javax.jdo.option.ConnectionDriverName</name> 
    <value>org.mariadb.jdbc.Driver</value> 
    <description>username to use against metastore database</description> 
</property> 

<property> 
    <name>javax.jdo.option.ConnectionUserName</name> 
    <value>hive</value> 
    <description>username to use against metastore database</description> 
</property> 

<property> 
    <name>javax.jdo.option.ConnectionPassword</name> 
    <value>1R72JFCDG5XaaDTB</value> 
    <description>password to use against metastore database</description> 
</property> 

    <property> 
    <name>datanucleus.fixedDatastore</name> 
    <value>true</value> 
    </property> 

    <property> 
    <name>mapred.reduce.tasks</name> 
    <value>-1</value> 
    </property> 

    <property> 
    <name>mapred.max.split.size</name> 
    <value>256000000</value> 
    </property> 

    <property> 
    <name>hive.metastore.connect.retries</name> 
    <value>5</value> 
    </property> 

    <property> 
    <name>hive.optimize.sort.dynamic.partition</name> 
    <value>true</value> 
    </property> 

    <property><name>hive.exec.dynamic.partition</name><value>true</value></property> 
    <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property> 
    <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property> 
    <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property> 

</configuration> 

Mein python-Code, der die partitionierten Parkett Datei lautet:

from pyspark import * 
from pyspark.sql import * 
from pyspark.sql.types import * 
from pyspark.sql.functions import * 

df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/') 

Das Parkett Dateischema von Spark gedruckt:

>>> df7.schema 
StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true))) 

>>> df7.printSchema() 
root 
|-- transactionid: string (nullable = true) 
|-- eventts: timestamp (nullable = true) 
|-- year: integer (nullable = true) 
|-- month: integer (nullable = true) 

>>> df7.show(10) 
+--------------------+--------------------+----+-----+ 
|  transactionid|    eventts|year|month| 
+--------------------+--------------------+----+-----+ 
|f7018907-ed3d-49b...|2013-11-21 18:41:...|2013| 11| 
|f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013| 11| 
|02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013| 11| 
|0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013| 11| 
|f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013| 11| 
|3425e4ea-b715-476...|2013-11-21 18:41:...|2013| 11| 
|a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013| 11| 
|d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013| 11| 
|46f2eda5-408e-44e...|2013-11-21 18:41:...|2013| 11| 
|36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013| 11| 
+--------------------+--------------------+----+-----+ 
only showing top 10 rows 

Die create table in Hive:

create external table if not exists t3(
    transactionid string, 
    eventts timestamp) 
partitioned by (year int, month int) 
stored as parquet 
location 's3://staging-dev/test/ttfourfieldspart2/'; 

Wenn ich versuche, einige Zeilen in Hive zu wählen, nicht wahr Zeilen zurücksenden:

hive> select * from t3 limit 10; 
OK 
Time taken: 0.027 seconds 
hive> 

Antwort

9

Ich habe endlich das Problem gefunden. Wenn Sie in Hive Tabellen erstellen, in denen bereits partitionierte Daten in S3 oder HDFS vorhanden sind, müssen Sie einen Befehl ausführen, um den Hive Metastore mit der Partitionsstruktur der Tabelle zu aktualisieren. Werfen Sie einen Blick hier: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

The commands are: 

MSCK REPAIR TABLE table_name; 


And on Hive running in Amazon EMR you can use: 

ALTER TABLE table_name RECOVER PARTITIONS; 
+0

Dieses auch für mich gearbeitet. Brand neue Tabelle, und vor Auswahl würde alle Daten zurückgeben, musste es reparieren ... K ... Danke! – jhnclvr