6

Ich möchte PubSub-Nachrichten aus einem Zweig mithilfe von Google Cloud Dataflow in eine BigQuery-Tabelle einfügen. Alles funktioniert gut, aber in der BigQuery-Tabelle kann ich nicht lesbare Strings wie "߈ " sehen. Dies ist meine Pipeline:PubSub-Nachrichten über Google Cloud Dataflow in BigQuery einfügen

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

und meine einfache StringToRowConverter Funktion ist:

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

Und dies ist die Botschaft, die ich durch eine POST-Anfrage gesendet:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

Was bin ich fehlt ? Vielen Dank!

+0

[Dies] (https://github.com/bomboradata/pubsub-to-bigquery) ist eine Open Source, die Sie verwenden können, um Pub/Sub zu BQ zu leiten – PUG

Antwort

6

Laut https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage ist die JSON-Nutzlast der Pubsub-Nachricht base64-codiert. PubsubIO in Dataflow verwendet standardmäßig den String-UTF8-Codierer. Die Beispielzeichenfolge, die Sie "34gf5ert" zur Verfügung gestellt haben, wenn base64-decodiert und dann als eine UTF-8-Zeichenfolge interpretiert wird, gibt genau "߈ ".

2

Dies ist, wie ich meine PubSub Nachrichten am Auspacken:

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

Hoffnung liegt es an Ihnen nützlich.