2016-07-26 66 views
0

Ich habe einen FTP-Eingangsadapter, der auf mehreren Tomcat-Instanzen bereitgestellt wird. Das Problem ist, dass alle Instanzen Dateien aus dem Remote-Verzeichnis ziehen, aber ich brauche eine einzelne Datei, um nur einmal nicht von allen Instanzen gezogen werden.FTP-Eingangsadapter in einer Clusterumgebung

Gibt es einen Weg dies zu erreichen?

Dies ist der Adapter Definition:

<int-sftp:inbound-channel-adapter id="inboundMeasuremntFtpReceiveAdapter" session-factory="inboundMeasurementSftpSession" 
    auto-startup="${ftp_measurement_autostart}" 
    local-directory="#{'${dir_interface_home}' + '${dir_interface}' + '${dir_inbound_measurement}' + '${dir_data}'}" 
    channel="inboundMeasuremntSftpReceiveChannel" 
    remote-directory="${ftp_measurement_remote_dir}" filename-pattern="*.txt" > 
    <int:poller id="inboundMeasurementSftpPoller" trigger="inboundMeasurementFtpTrigger" max-messages-per-poll="-1" 
       error-channel="inboundMeasurementSftpErrorEnrichChannel"> 
    </int:poller> 
</int-sftp:inbound-channel-adapter> 



Metadata store class 
     package com.deere.componentdatafiles.nondeerefile; 
     import java.io.Closeable; 
     import java.io.Flushable; 
     import java.io.IOException; 
     import net.spy.memcached.AddrUtil; 
     import net.spy.memcached.MemcachedClient; 
     import org.springframework.beans.factory.DisposableBean; 
     import org.springframework.beans.factory.InitializingBean; 
     import org.springframework.integration.metadata.ConcurrentMetadataStore; 
     public class MemcacheMetadatastore implements ConcurrentMetadataStore,InitializingBean, DisposableBean, Closeable, Flushable { 
     MemcachedClient cache = null; 
     public MemcacheMetadatastore() { 
       super();try { 
    cache = new MemcachedClient(AddrUtil.getAddresses("URL")); 
       } catch (IOException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace();} } 
     @Override 
      public String get(String key) { 
       // TODO Auto-generated method stub 
       return (String) cache.get(key); 

      } 

      @Override 
      public void put(String key, String value) { 

       putIfAbsent(key, value); 
      } 
     @Override 
      public String remove(String key) { 
       // TODO Auto-generated method stub 
       return cache.delete(key).toString(); 
      } 
     @Override 
      public void flush() throws IOException { 
       // TODO Auto-generated method stub 
     } 
     @Override 
      public void close() throws IOException { 
       flush(); 
      } 
     @Override 
      public void destroy() throws Exception { 
       flush(); 
      } 
     @Override 
      public void afterPropertiesSet() throws Exception { 
       // TODO Auto-generated method stub 
     } 
     @Override 
      public String putIfAbsent(String key, String value) { 

       String fileValue = get(key); 
       if (fileValue == null && "".equals(fileValue)) { 
        cache.set(key, 0, value); 
        return null; 
       } 
       return fileValue; 
      } 
      @Override 
      public boolean replace(String arg0, String arg1, String arg2) { 
       // TODO Auto-generated method stub 
       return false; 
      } 
     } 

Konfiguration

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp" 
    xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream" 
    xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-4.0.xsd 
     http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd 
     http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"> 

<bean id="ftpClientFactory" 
     class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory"> 
     <property name="host" value="****"/> 
     <property name="username" value="****"/> 
     <property name="password" value="****"/> 
     <property name="clientMode" value="0"/> 
     <property name="fileType" value="2"/> 
     <property name="bufferSize" value="100000"/> 
    </bean> 


    <int-ftp:inbound-channel-adapter id="ftpInbound" 
     channel="receiveChannel" session-factory="ftpClientFactory" 
     auto-create-local-directory="true" delete-remote-files="false" 
     remote-directory="/OUT/SDI402_CARATT_JD" remote-file-separator="/" 
     filter="compositeFilter" 
     local-directory="." 
     local-filter="acceptAll"> 
     <int:poller fixed-rate="5000" max-messages-per-poll="1"> 
        <!-- <int:transactional synchronization-factory="syncFactory"/> --> 
       </int:poller> 
    </int-ftp:inbound-channel-adapter> 

    <int:channel id="receiveChannel" /> 
    <int:channel id="afterSuccessDeleteChannel" /> 
     <int:service-activator 
     id="nonDeereXmlServiceActivator" input-channel="receiveChannel" 
     ref="controllerListener" method="listen1" /> 

    <int:service-activator input-channel="receiveChannel" expression="T(java.lang.System).out.println(payload.toString())"> 
     <int:request-handler-advice-chain> 
      <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> 
       <property name="onSuccessExpression" value="payload.delete()" /> 
       <property name="successChannel" ref="afterSuccessDeleteChannel" /> 
       <property name="onFailureExpression" value="payload.rename('/tmp/bad/' + payload.name)" /> 
      </bean> 
     </int:request-handler-advice-chain> 
    </int:service-activator> 

<!-- <int:transformer input-channel="afterSuccessDeleteChannel" output-channel="stdout" 
     expression="'Removal of ' + inputMessage.payload.absolutePath + ' after transfer ' + (payload ? 'succeeded' : 'failed')" /> 

    <int-stream:stdout-channel-adapter id="stdout" append-newline="true"/> --> 

    <bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter" /> 

    <bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter"> 
     <constructor-arg> 
      <list> 
       <bean class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter"> 
        <!-- <constructor-arg value="*.xml" /> --> 
       </bean> 
       <bean class="org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter"> 
        <constructor-arg name="store" ref="metadataStore"/> 
        <constructor-arg value="foo/bar/"/> 
       </bean> 
      </list> 
     </constructor-arg> 
    </bean> 

    <bean name="metadataStore" class="com.deere.componentdatafiles.nondeerefile.MemcacheMetadatastore"> 
     <!-- <property name="baseDirectory" value="/tmp/"/> --> 
    </bean> 

</beans> 

Antwort

0

Sie benötigen einen FtpPersistentAcceptOnceFileListFilter mit einigen gemeinsamen Speicher, wie redis zu verwenden, Mongo, Tierpfleger. Siehe the documentation.

+0

Kann ich es erreichen, ohne einen neuen Server mit Hilfe von PropertiesPersistingMetadataStore hinzuzufügen? –

+0

Sie benötigen eine Art gemeinsamen Speicher für einen Cluster - das PPMS ist nicht geeignet. Es ist nur für die Verwendung in einem lokalen Dateisystem gedacht. Es gibt keinen Auslösemechanismus, um zu sagen, dass sich die Datei in einem freigegebenen Dateisystem geändert hat. Sie könnten Ihren eigenen Metadatenspeicher schreiben, um den Status an die anderen Mitglieder des Clusters zu übertragen. –

+0

Danke Gary Ich versuche Metadaten-Store mit Memcache-Server –