Ich habe einen FTP-Eingangsadapter, der auf mehreren Tomcat-Instanzen bereitgestellt wird. Das Problem ist, dass alle Instanzen Dateien aus dem Remote-Verzeichnis ziehen, aber ich brauche eine einzelne Datei, um nur einmal nicht von allen Instanzen gezogen werden.FTP-Eingangsadapter in einer Clusterumgebung
Gibt es einen Weg dies zu erreichen?
Dies ist der Adapter Definition:
<int-sftp:inbound-channel-adapter id="inboundMeasuremntFtpReceiveAdapter" session-factory="inboundMeasurementSftpSession"
auto-startup="${ftp_measurement_autostart}"
local-directory="#{'${dir_interface_home}' + '${dir_interface}' + '${dir_inbound_measurement}' + '${dir_data}'}"
channel="inboundMeasuremntSftpReceiveChannel"
remote-directory="${ftp_measurement_remote_dir}" filename-pattern="*.txt" >
<int:poller id="inboundMeasurementSftpPoller" trigger="inboundMeasurementFtpTrigger" max-messages-per-poll="-1"
error-channel="inboundMeasurementSftpErrorEnrichChannel">
</int:poller>
</int-sftp:inbound-channel-adapter>
Metadata store class
package com.deere.componentdatafiles.nondeerefile;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
public class MemcacheMetadatastore implements ConcurrentMetadataStore,InitializingBean, DisposableBean, Closeable, Flushable {
MemcachedClient cache = null;
public MemcacheMetadatastore() {
super();try {
cache = new MemcachedClient(AddrUtil.getAddresses("URL"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();} }
@Override
public String get(String key) {
// TODO Auto-generated method stub
return (String) cache.get(key);
}
@Override
public void put(String key, String value) {
putIfAbsent(key, value);
}
@Override
public String remove(String key) {
// TODO Auto-generated method stub
return cache.delete(key).toString();
}
@Override
public void flush() throws IOException {
// TODO Auto-generated method stub
}
@Override
public void close() throws IOException {
flush();
}
@Override
public void destroy() throws Exception {
flush();
}
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
}
@Override
public String putIfAbsent(String key, String value) {
String fileValue = get(key);
if (fileValue == null && "".equals(fileValue)) {
cache.set(key, 0, value);
return null;
}
return fileValue;
}
@Override
public boolean replace(String arg0, String arg1, String arg2) {
// TODO Auto-generated method stub
return false;
}
}
Konfiguration
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-4.0.xsd
http://www.springframework.org/schema/integration/sftp http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/ftp http://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd">
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="****"/>
<property name="username" value="****"/>
<property name="password" value="****"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="receiveChannel" session-factory="ftpClientFactory"
auto-create-local-directory="true" delete-remote-files="false"
remote-directory="/OUT/SDI402_CARATT_JD" remote-file-separator="/"
filter="compositeFilter"
local-directory="."
local-filter="acceptAll">
<int:poller fixed-rate="5000" max-messages-per-poll="1">
<!-- <int:transactional synchronization-factory="syncFactory"/> -->
</int:poller>
</int-ftp:inbound-channel-adapter>
<int:channel id="receiveChannel" />
<int:channel id="afterSuccessDeleteChannel" />
<int:service-activator
id="nonDeereXmlServiceActivator" input-channel="receiveChannel"
ref="controllerListener" method="listen1" />
<int:service-activator input-channel="receiveChannel" expression="T(java.lang.System).out.println(payload.toString())">
<int:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpression" value="payload.delete()" />
<property name="successChannel" ref="afterSuccessDeleteChannel" />
<property name="onFailureExpression" value="payload.rename('/tmp/bad/' + payload.name)" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
<!-- <int:transformer input-channel="afterSuccessDeleteChannel" output-channel="stdout"
expression="'Removal of ' + inputMessage.payload.absolutePath + ' after transfer ' + (payload ? 'succeeded' : 'failed')" />
<int-stream:stdout-channel-adapter id="stdout" append-newline="true"/> -->
<bean id="acceptAll" class="org.springframework.integration.file.filters.AcceptAllFileListFilter" />
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<!-- <constructor-arg value="*.xml" /> -->
</bean>
<bean class="org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter">
<constructor-arg name="store" ref="metadataStore"/>
<constructor-arg value="foo/bar/"/>
</bean>
</list>
</constructor-arg>
</bean>
<bean name="metadataStore" class="com.deere.componentdatafiles.nondeerefile.MemcacheMetadatastore">
<!-- <property name="baseDirectory" value="/tmp/"/> -->
</bean>
</beans>
Kann ich es erreichen, ohne einen neuen Server mit Hilfe von PropertiesPersistingMetadataStore hinzuzufügen? –
Sie benötigen eine Art gemeinsamen Speicher für einen Cluster - das PPMS ist nicht geeignet. Es ist nur für die Verwendung in einem lokalen Dateisystem gedacht. Es gibt keinen Auslösemechanismus, um zu sagen, dass sich die Datei in einem freigegebenen Dateisystem geändert hat. Sie könnten Ihren eigenen Metadatenspeicher schreiben, um den Status an die anderen Mitglieder des Clusters zu übertragen. –
Danke Gary Ich versuche Metadaten-Store mit Memcache-Server –