Ich habe Spark-Streaming-Job und in diesem einige bin Aggregation, jetzt möchte ich diese Datensätze in HBase einfügen, aber es ist nicht typisch einfügen Ich möchte UPSERT tun, wenn für Zeilenschlüssel verfügbar ist als in Spalte Werte Summe (newvalue + oldvalue) sollte passieren. Hat jemand den Pseudo-Code in Java teilen, wie kann ich das erreichen?Hbase Upsert mit Spark
Antwort
So etwas ...
byte[] rowKey = null; // Provided
Table table = null; // Provided
long newValue = 1000; // Provided
byte[] FAMILY = new byte[]{0}; // Defined
byte[] QUALIFIER = new byte[]{1}; // Defined
try {
Get get = new Get(rowKey);
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER);
newValue += Bytes.bytesToLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(rowKey);
put.addColumn(FAMILY,QUALIFIER,Bytes.toBytes(newValue));
table.put(put);
} catch (Exception e) {
// Handle Exceptions...
}
Wir (Splice-Maschine [Open Source]) haben ein paar ziemlich coole Tutorials Spark-Streaming verwenden, um Daten in HBase.
Überprüfen Sie it aus. könnte interessant sein.
fand ich die Art und Weise unten sind die Pseudo-Code: -
=========== Für UPSERT (Update und Insert) ===========
public void HbaseUpsert (javaRDD < Row> javaRDD) throws IOException, ServiceException {
JavaPairRDD < ImmutableBytesWritable, Put > hbasePuts1 = javaRDD.mapToPair(
new PairFunction < Row, ImmutableBytesWritable, Put >() {
private static final long serialVersionUID = 1L;
public Tuple2 < ImmutableBytesWritable, Put > call(Row row) throws Exception {
if(HbaseConfigurationReader.getInstance()!=null)
{
HTable table = new HTable(HbaseConfigurationReader.getInstance().initializeHbaseConfiguration(), "TEST");
try {
String Column1 = row.getString(1);
long Column2 = row.getLong(2);
Get get = new Get(Bytes.toBytes(row.getString(0)));
Result result = table.get(get);
if (!result.isEmpty()) {
Cell cell = result.getColumnLatestCell(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"));
Column2 += Bytes.toLong(cell.getValueArray(),cell.getValueOffset());
}
Put put = new Put(Bytes.toBytes(row.getString(0)));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column1"), Bytes.toBytes(Column1));
put.add(Bytes.toBytes("cf1"), Bytes.toBytes("Column2"), Bytes.toBytes(Column2));
return new Tuple2 < ImmutableBytesWritable, Put > (new ImmutableBytesWritable(), put);
} catch (Exception e) {
e.printStackTrace();
}
finally {
table.close();
}
}
return null;
}
});
hbasePuts1.saveAsNewAPIHadoopDataset(HbaseConfigurationReader.initializeHbaseConfiguration());
}
============== Für Konfiguration ========== ===== Öffentliche Klasse HbaseConfigurationReader implementiert Serializable {
static Job newAPIJobConfiguration1 =null;
private static Configuration conf =null;
private static HTable table= null;
private static HbaseConfigurationReader instance= null;
private static Log logger= LogFactory.getLog(HbaseConfigurationReader.class);
HbaseConfigurationReader() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { initializeHbaseConfiguration(); }
public static HbaseConfigurationReader getInstance() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException {
if (instance == null) {
instance = new HbaseConfigurationReader();
}
return instance;
} public static Konfiguration initializeHbaseConfiguration() throws MasterNotRunningException, ZooKeeperConnectionException, ServiceException, IOException { if (conf = = null) { conf = HBaseConfiguration.create(); conf.set ("hbase.zookeeper.quorum", "localhost"); conf.set ("hbase.zookeeper.property.clientPort", "2181"); HBaseAdmin.checkHBaseAvailable (conf); Tabelle = neue HTable (conf, "TEST"); conf.set (org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE, "TEST"); versuchen { newAPIJobConfiguration1 = Job.getInstance (conf); newAPIJobConfiguration1.getConfiguration(). Set (TableOutputFormat.OUTPUT_TABLE, "TEST"); newAPIJobConfiguration1.setOutputFormatClass (org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class); } catch (IOException e) { e.printStackTrace(); }
}
else
logger.info("Configuration comes null");
return newAPIJobConfiguration1.getConfiguration();
} }
Auch möchten Sie vielleicht ein HBase Schritt abhängig von der Art prüfen, den Sie hinzufügen ... –
Hallo John, Vielen Dank für Ihre Antwort seine Arbeits aber die Zeit für 500 nehmen mb seine Einnahme etwa 1 Stunde in Funken gibt es irgendeine Weise, die Sie über die Bulk-update Art der Sache wissen? – ankitbeohar90
Ich erkunde Splice Machine, besuchte auch Webinar von "Lambda-in-a-box" und schickte eine E-Mail an "Thomas Ryan" und wartete auf seine Antwort, ich schätze Splice Machine's Arbeit, können Sie mir bitte sagen, welches Tutorial ich sollte check weil ich viele überprüft habe, aber keine finden konnte. Nochmals vielen Dank eine Tonne .... – ankitbeohar90