2015-09-08 11 views
7

Während Apache Flink mit dem folgenden Code:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() { 

    @Override 
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception { 
     List<String> top5 = Ordering.natural().greatestOf(iterable, 5); 
     collector.collect(top5); 
    } 
}).flatten(); 

Ich habe diese Ausnahme

Caused by: java.lang.UnsupportedOperationException 
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41) 
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) 
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) 
    at java.lang.Thread.run(Thread.java:745) 

Wie kann ich die UnmodifiableCollection mit Flink ue?

Antwort

9

Das Problem ist, dass die Standardeinstellung CollectionSerializer von Kryo die Sammlung nicht erneut deserialisieren kann, weil es nicht änderbar ist (der .add() Aufruf schlägt fehl).

Um das Problem zu beheben, können wir das UnmodifiableCollectionsSerializer aus dem Projekt kryo-serializers verwenden. Flink hängt transitiv vom Projekt ab, daher muss es nicht als Abhängigkeit hinzugefügt werden.

Als nächstes müssen wir den Serializer mit Flink Kryo Instanzen registrieren.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); 
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection"); 
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class); 

Normalerweise müssen wir Class.forName() nicht verlangen einen Serializer Registrierung, aber in diesem Fall ist java.util.Collections$UnmodifiableCollection Paket sichtbar, so dass sie nicht direkt auf die Klasse zugreifen.

+4

danke für diese aufschlussreiche und schnelle Antwort. Ihre Antwortgeschwindigkeit ist atemberaubend :-) –