2016-04-25 4 views
0

Ich lese von MongoDB mit einer benutzerdefinierten Iteamreader-Bean. Mein Leser gibt Daten gemäß der im Leser definierten Seitengröße (50) zurück. Aber der Prozessor bekommt nur die ersten 31 Datenzeilen von 50. Ich habe verschiedene Chunk-Größen ausprobiert, aber einige, wie der Prozessor nur die ersten 31 Zeilen bekommt.spring batch: itemprocessor erhält nicht alle vom Leser gelesenen Daten

Bitte helfen Sie mir den Fehler in finden ... Ich habe versucht, Hörer, aber nicht in der Lage Problem zu finden ..

---- Config XML ----

<?xml version="1.0" encoding="UTF-8"?> 

    <context:property-placeholder location="classpath:application.properties" /> 

    <context:component-scan base-package="com.xxx.yyy.batch.kernel" /> 
    <context:component-scan base-package="com.xxx.yyy.batch.dao" /> 

    <context:annotation-config /> 

    <!-- Enable Annotation based Declarative Transaction Management --> 
    <tx:annotation-driven proxy-target-class="true" 
     transaction-manager="transactionManager" /> 

    <!-- Creating TransactionManager Bean, since JDBC we are creating of type 
     DataSourceTransactionManager --> 
    <bean id="transactionManager" 
     class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> 
     <property name="dataSource" ref="dataSource" /> 
    </bean> 

    <batch:job id="txnLogJob" job-repository="jobRepository" 
     restartable="true"> 
     <batch:step id="txnload"> 
      <tasklet allow-start-if-complete="true"> 
       <chunk reader="txnLogItemReader" writer="txnLogItemWriter" 
        processor="txnLogProcessor" commit-interval="20" />    
      </tasklet> 
     </batch:step> 

     <batch:listeners> 
      <batch:listener ref="completionListener" /> 
     </batch:listeners> 
    </batch:job> 

    <bean id="completionListener" 
     class="com.xxx.yyy.batch.listeners.JobCompletionNotificationListener" /> 


    <bean id="jobParametersDAOImpl" class="com.xxx.yyy.batch.dao.JobParametersDAOImpl" /> 

    <bean id="batchLoader" class="com.xxx.yyy.batch.kernel.BatchLoader" /> 

    <bean id="batchjobParameter" class="com.xxx.yyy.batch.dao.Batch_Job_Parameters" /> 



    <bean id="txnLogItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" 
     scope="step"> 
     <property name="shouldDeleteIfExists" value="true" /> 
     <property name="resource" value="file:target/test-outputs/output.txt" /> 
     <property name="lineAggregator"> 
      <bean 
       class="org.springframework.batch.item.file.transform.PassThroughLineAggregator" /> 
     </property> 
    </bean> 


    <bean id="txnLogProcessor" 
     class="com.xxx.yyy.batch.processor.MessageContextItemProcessor" /> 

    <bean id="jobLauncher" 
     class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> 
     <property name="jobRepository" ref="jobRepository" /> 
    </bean> 

    <bean id="jobRepository" 
     class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> 
     <property name="databaseType" value="MYSQL" /> 
     <property name="dataSource" ref="dataSource" /> 
     <property name="transactionManager" ref="transactionManager" /> 
    </bean> 

    <bean id="dataSource" class="com.xxx.yyy.common.DataSource" 
     destroy-method="close"> 
     <property name="driverClassName" value="${jdbc.driverClassName}" /> 
     <property name="url" value="${jdbc.url}" /> 
     <property name="username" value="${jdbc.username}" /> 
     <property name="password" value="${jdbc.password}" /> 
     <property name="connectionProperties" value="${jdbc.connectionProperties}" /> 
     <property name="initialSize" value="${jdbc.initialSize}" /> 
     <property name="maxTotal" value="${jdbc.maxTotal}" /> 
     <property name="maxIdle" value="${jdbc.maxIdle}" /> 
     <property name="minIdle" value="${jdbc.minIdle}" /> 
     <property name="maxWaitMillis" value="${jdbc.maxWaitMillis}" /> 
     <property name="testOnBorrow" value="${jdbc.testOnBorrow}" /> 
     <property name="testWhileIdle" value="${jdbc.testWhileIdle}" /> 
     <property name="testOnReturn" value="${jdbc.testOnReturn}" /> 
     <property name="validationQuery" value="${jdbc.validationQuery}" /> 
    </bean> 

</beans> 

benutzerdefinierte Leser Bohne:

@Bean 
    public MongoItemReader<MessageContext> txnLogItemReader() { 
     MongoItemReader<MessageContext> reader = new MongoItemReader<MessageContext>(); 
     reader.setPageSize(50); 
     reader.setCollection("txnlog"); 
     reader.setTemplate(mongoTemplate); 

     String query = null ; 
     query = "{ \"audit_info.created_on\": { $gt: { \"$date\" : ?0 }, $lte: { \"$date\" : ?1 } }, " 
         + "$and: [ { \"processing_status\": { $in: [?2] } } ] }" ; 




     reader.setQuery(query);   

     //Timestamp to_date_timestamp = jobParametersDAOImpl.getCurrentTimeStamp() ;   

     Batch_Job_Parameters job_param = jobParametersDAOImpl.getBatchJobParameters() ; 
     String from_date = job_param.getFrom_date().toString() ; 
     String [] splitstr = from_date.split(" ") ; 
     from_date = splitstr[0]+"T"+splitstr[1]+"00Z" ; 

     String to_date = job_param.getTo_date().toString() ; 
     splitstr = to_date.split(" ") ; 
     to_date = splitstr[0]+"T"+splitstr[1]+"00Z" ; 

     List<Object> parameterValues = new ArrayList<Object>() ; 
     parameterValues.add(from_date) ; 
     parameterValues.add(to_date) ;  
     parameterValues.add(job_param.getTxnlog_status_list()) ;  

     reader.setParameterValues(parameterValues); 

     reader.setTargetType(com....MessageContext.class); 
     Map<String,Direction> sorts = new HashMap<String,Direction>() ; 
     sorts.put("audit_info.created_on", org.springframework.data.domain.Sort.Direction.ASC) ; 
     reader.setSort(sorts); 

     return reader; 
    } 

Antwort

0

aktualisiert Antwort da verschob sich die Frage konzentrieren

Ich hätte die MessageContextReadConverter nicht zurückgeben null und stattdessen Validierung in der Processor. Wenn Processornull zurückgibt, erhöht es nur die Anzahl der Filter, anstatt die Step zu verwirren und zu denken, dass keine weiteren Zeilen zu verarbeiten sind.

+0

MongoDB enthält mehr als 500 Zeilen und wie ich bereits erwähnt habe, kann ich alle 50 gelesenen Zeilen sehen. –

+0

MongoDB enthält mehr als 500 Zeilen, ich kann alle 50 Zeilen lesen. Ich habe Listener (Schritt, Chunk, lesen) in der Konfiguration. Ausgabe des Zuhörers ist: „vor dem Schritt“ „vor chunk“ 50 Zeilen von Daten „vor lesen“ „nach dem Lesen“ 19mal: „vor lesen“ „nach dem Lesen“ 20 Zeilen von Daten durch den Prozessor und writer verarbeitet "after chunk" "vor chunk" 10mal: "vor lesen" "nach dem lesen" durch den Prozessor und writer 10 Datenzeilen verarbeitet "vor lesen" "after chunk" "nach dem Schritt" –

0

Ich habe implementiert MessageContextReadConverter implementiert Converter und ich gebe Null zurück, falls die Konvertierung nicht durchgeführt wird. Im Fall von null übergibt die read() -Methode keine Elemente mehr an Processor/Writer. Issue is Converter lässt keine Ausnahme zu. Suchen, wie man diesen Teil auflöst.