2016-07-21 10 views
0

Hallo ich muss eine Tabelle in Phoenix von einem Funken Job erstellen. Ich habe 2 Möglichkeiten unten versucht, aber keiner von ihnen funktioniert, scheint, das wird immer noch nicht unterstützt.create table in phoenix von spark

1) Dataframe.write erfordert nach wie vor, dass die Tabellen

df.write.format("org.apache.phoenix.spark").mode("overwrite").option("table", schemaName.toUpperCase + "." + tableName.toUpperCase ).option("zkUrl", hbaseQuorum).save() 

2), wenn wir nach Phoenix durch JDBC verbinden, und versuchen, die CREATE statemnt ausführen, dann erhalten wir einen Parsing-Fehler (gleiche erstellen zuvor

existieren arbeitet in Phoenix)

var ddlCode="create table test (mykey integer not null primary key, mycolumn varchar) " 

val driver = "org.apache.phoenix.jdbc.PhoenixDriver" 
val jdbcConnProps = new Properties() 
jdbcConnProps.setProperty("driver", driver); 
val jdbcConnString = "jdbc:phoenix:hostname:2181/hbase-unsecure" 
sqlContext.read.jdbc(jdbcConnString, ddlCode, jdbcConnProps) 

Fehler: org.apache.phoenix.exception.PhoenixParserException: eRROR 601 (42P00): Syntaxfehler. Begegnet „create“ in Zeile 1, Spalte 15.

Wer mit ähnlichen Herausforderungen, die es anders machen verwaltet?

+0

Haben Sie einen Ausweg für das oben genannte Problem gefunden? Ich habe auch nach dem gesucht. – pjain

Antwort

0

Ich habe endlich in einer Lösung dafür gearbeitet. Grundsätzlich glaube ich, dass ich falsch lag, indem ich versuchte, die SQLContext-Lesemethode dafür zu verwenden. Ich denke, dass diese Methode nur dazu dient, Datenquellen zu "lesen". Die Art und Weise zu arbeiten war es im Grunde eine JDBC-Standardverbindung gegen Phoenix zu öffnen:

var ddlCode="create table test (mykey integer not null primary key, mycolumn varchar) " 
val jdbcConnString = "jdbc:hostname:2181/hbase-unsecure" 
val user="USER" 
val pass="PASS" 
var connection:Connection = null 
Class.forName(driver) 
connection = DriverManager.getConnection(jdbcConnString, user, pass) 
val statement = connection.createStatement() 
statement.executeUpdate(ddlCode)