Ich schreibe einen Frühling Batch-Job. Ich implementiere benutzerdefinierte Schreiber mitWie man den Schreiber für jede Jobinstanz im Frühjahr starten lässt Batch
Ich habe Felder, die für jede Instanz eindeutig sein müssen. Aber ich konnte sehen, dass diese Klasse nur einmal initiiert wurde. Rest-Jobs haben dieselbe Instanz der Writer-Klasse. Wo meine benutzerdefinierten Leser und Prozessoren für jeden Job initiiert werden. Unten finden Sie meine Jobkonfigurationen. Wie kann ich das gleiche Verhalten für den Autor erreichen?
@Bean
@Scope("job")
public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) {
FlatFileItemReader faltFileReader = new FlatFileItemReader();
ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader();
Resource[] resArray = new Resource[1];
resArray[0] = new FileSystemResource(new File(fileName));
zipReader.setArchives(resArray);
DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService);
lineMapper.setFieldSetMapper(csvFieldMapper);
faltFileReader.setLineMapper(lineMapper);
zipReader.setDelegate(faltFileReader);
return zipReader;
}
@Bean
@Scope("job")
public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) {
ProducerProcessor processor = new ProducerProcessor();
processor.setS3FileTimeStamp(timeStamp);
return processor;
}
@Bean
@ConfigurationProperties
public ItemWriter<ProducerMessage> writer() {
return new KafkaClientWriter();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader reader, ItemWriter writer,
ItemProcessor processor, @Value("${reader.chunkSize}")
int chunkSize) {
LOGGER.info("Step configuration loaded with chunk size {}", chunkSize);
return stepBuilderFactory.get("step1")
.chunk(chunkSize).reader(reader)
.processor(processor).writer(writer)
.build();
}
@Bean
public StepScope stepScope() {
final StepScope stepScope = new StepScope();
stepScope.setAutoProxy(true);
return stepScope;
}
@Bean
public JobScope jobScope() {
final JobScope jobScope = new JobScope();
return jobScope;
}
@Bean
public Configuration configuration() {
return new Configuration();
}
Ich habe versucht, den Schreiber mit Aufgabenbereich zu machen. Aber in diesem Fall wird open nicht aufgerufen. Hier mache ich einige Initialisierungen.
Ändern Sie den Rückgabetyp in 'KafkaClientWriter' und fügen Sie' @Scope ("job") 'hinzu. –
@M. Denium: Hervorragend. Danke. Es hat funktioniert. Öffne und schließe jetzt, um richtig angerufen zu werden. Warum "@Scope (" job ")' mit dem Rückgabetyp 'ItemWriter' funktionierte nicht? Ich erhielt ein benutzerdefiniertes Feld als null von der write-Methode, die von open gesetzt werden sollte. –