2016-03-23 2 views
0

Ich schreibe einen Frühling Batch-Job. Ich implementiere benutzerdefinierte Schreiber mitWie man den Schreiber für jede Jobinstanz im Frühjahr starten lässt Batch

Ich habe Felder, die für jede Instanz eindeutig sein müssen. Aber ich konnte sehen, dass diese Klasse nur einmal initiiert wurde. Rest-Jobs haben dieselbe Instanz der Writer-Klasse. Wo meine benutzerdefinierten Leser und Prozessoren für jeden Job initiiert werden. Unten finden Sie meine Jobkonfigurationen. Wie kann ich das gleiche Verhalten für den Autor erreichen?

@Bean 
     @Scope("job") 
     public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) { 
      FlatFileItemReader faltFileReader = new FlatFileItemReader(); 
      ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader(); 
      Resource[] resArray = new Resource[1]; 
      resArray[0] = new FileSystemResource(new File(fileName)); 
      zipReader.setArchives(resArray); 
      DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>(); 
      lineMapper.setLineTokenizer(new DelimitedLineTokenizer()); 
      CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService); 
      lineMapper.setFieldSetMapper(csvFieldMapper); 
      faltFileReader.setLineMapper(lineMapper); 
      zipReader.setDelegate(faltFileReader); 
      return zipReader; 
     } 

     @Bean 
     @Scope("job") 
     public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) { 
      ProducerProcessor processor = new ProducerProcessor(); 
      processor.setS3FileTimeStamp(timeStamp); 
      return processor; 
     } 

     @Bean 
     @ConfigurationProperties 
     public ItemWriter<ProducerMessage> writer() { 
      return new KafkaClientWriter(); 
     } 

     @Bean 
     public Step step1(StepBuilderFactory stepBuilderFactory, 
          ItemReader reader, ItemWriter writer, 
          ItemProcessor processor, @Value("${reader.chunkSize}") 
          int chunkSize) { 
      LOGGER.info("Step configuration loaded with chunk size {}", chunkSize); 
      return stepBuilderFactory.get("step1") 
        .chunk(chunkSize).reader(reader) 
        .processor(processor).writer(writer) 
        .build(); 
     } 

     @Bean 
     public StepScope stepScope() { 
      final StepScope stepScope = new StepScope(); 
      stepScope.setAutoProxy(true); 
      return stepScope; 
     } 

     @Bean 
     public JobScope jobScope() { 
      final JobScope jobScope = new JobScope(); 
      return jobScope; 
     } 

     @Bean 
     public Configuration configuration() { 
      return new Configuration(); 
     } 

Ich habe versucht, den Schreiber mit Aufgabenbereich zu machen. Aber in diesem Fall wird open nicht aufgerufen. Hier mache ich einige Initialisierungen.

+0

Ändern Sie den Rückgabetyp in 'KafkaClientWriter' und fügen Sie' @Scope ("job") 'hinzu. –

+0

@M. Denium: Hervorragend. Danke. Es hat funktioniert. Öffne und schließe jetzt, um richtig angerufen zu werden. Warum "@Scope (" job ")' mit dem Rückgabetyp 'ItemWriter ' funktionierte nicht? Ich erhielt ein benutzerdefiniertes Feld als null von der write-Methode, die von open gesetzt werden sollte. –

Antwort

0

Bei der java-basierten Konfiguration und einem Bereichs-Proxy wird der Rückgabetyp der Methode erkannt und dafür ein Proxy erstellt. Wenn Sie also ItemWriter zurückgeben, erhalten Sie einen JDK-Proxy, der nur ItemWriter implementiert, während Ihre open-Methode auf der ItemStream-Schnittstelle ist. Da diese Schnittstelle nicht auf dem Proxy enthalten ist, gibt es keine Möglichkeit, die Methode aufzurufen.

Entweder ändern Sie den Rückgabetyp auf KafkaClientWriter oder ItemStreamWriter< ProducerMessage> (vorausgesetzt, die KafkaCLientWriter implementiert diese Methode). Als nächstes fügen Sie @Scope("job") hinzu, und Sie sollten Ihre open-Methode erneut mit einem korrekt beschränkten Schreiber aufgerufen haben.