2015-05-25 5 views
13

Ich habe die Möglichkeiten für Websphere MQ als Datenquelle für Spark-Streaming untersucht, da es in einem unserer Anwendungsfälle benötigt wird. Ich habe erfahren, dass MQTT das Protokoll ist, das die Kommunikation von MQ-Datenstrukturen unterstützt, aber da ich ein Neuling bin, um das Streaming auszulösen, brauche ich einige Arbeitsbeispiele für dasselbe. Hat jemand versucht, den MQ mit Spark-Streaming zu verbinden? Bitte ersinne den besten Weg dafür.Websphere MQ als Datenquelle für Apache Spark Streaming

+1

Voting zu schließen, wie Off-Topic, da es nicht in die Überarbeitungsfragen von Stack Overflow passt. Ich würde vorschlagen, diese Fragen zu Architektur und Machbarkeit auf http://mqseries.net oder in einem der anderen Online-MQ-Foren zu stellen. –

+0

Ich denke, es könnte nur ein Phrasierungsproblem sein. Statt der vagen - "Ich habe in diese Sache hineingeschaut. Was ist die beste Lösung?" _ Könnte man eine direkte Frage stellen. _ "Wie lese ich Daten aus Websphere MQ über Apache Spark?" _ Wenn Sie mehr über die WebSphere MQ-Seite der Frage wissen, können Sie weitere Informationen dazu hinzufügen. Unterstützt es SQL? Wie fragen Sie das normalerweise ab? Welche Kunden existieren dafür? Dann kann jemand, der Spark kennt, Ihnen wahrscheinlich helfen. –

Antwort

3

Also, ich bin Entsendung hier den Arbeitscode für CustomMQReceiver, die den Websphere MQ verbindet und liest Daten:

public class CustomMQReciever extends Receiver<String> { String host = null; 
int port = -1; 
String qm=null; 
String qn=null; 
String channel=null; 
transient Gson gson=new Gson(); 
transient MQQueueConnection qCon= null; 

Enumeration enumeration =null; 

public CustomMQReciever(String host , int port, String qm, String channel, String qn) { 
    super(StorageLevel.MEMORY_ONLY_2()); 
    this.host = host; 
    this.port = port; 
    this.qm=qm; 
    this.qn=qn; 
    this.channel=channel; 

} 

public void onStart() { 
    // Start the thread that receives data over a connection 
    new Thread() { 
     @Override public void run() { 
      try { 
       initConnection(); 
       receive(); 
      } 
      catch (JMSException ex) 
      { 
       ex.printStackTrace(); 
      } 
     } 
    }.start(); 
} 
public void onStop() { 
    // There is nothing much to do as the thread calling receive() 
    // is designed to stop by itself isStopped() returns false 
} 

/** Create a MQ connection and receive data until receiver is stopped */ 
private void receive() { 
    System.out.print("Started receiving messages from MQ"); 

    try { 

    JMSMessage receivedMessage= null; 

     while (!isStopped() && enumeration.hasMoreElements()) 
     { 

      receivedMessage= (JMSMessage) enumeration.nextElement(); 
      String userInput = convertStreamToString(receivedMessage); 
      //System.out.println("Received data :'" + userInput + "'"); 
      store(userInput); 
     } 

     // Restart in an attempt to connect again when server is active again 
     //restart("Trying to connect again"); 

     stop("No More Messages To read !"); 
     qCon.close(); 
     System.out.println("Queue Connection is Closed"); 

    } 
    catch(Exception e) 
    { 
     e.printStackTrace(); 
     restart("Trying to connect again"); 
    } 
    catch(Throwable t) { 
     // restart if there is any other error 
     restart("Error receiving data", t); 
    } 
    } 

    public void initConnection() throws JMSException 
{ 
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory(); 
    conFactory.setHostName(host); 
    conFactory.setPort(port); 
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP); 
    conFactory.setQueueManager(qm); 
    conFactory.setChannel(channel); 


    qCon= (MQQueueConnection) conFactory.createQueueConnection(); 
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1); 
    MQQueue queue=(MQQueue) qSession.createQueue(qn); 
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue); 
    qCon.start(); 

    enumeration= browser.getEnumeration(); 
    } 

@Override 
public StorageLevel storageLevel() { 
    return StorageLevel.MEMORY_ONLY_2(); 
} 
} 
1

Ich glaube, Sie JMS verwenden können, um eine Verbindung Websphere MQ zu verbinden, und Apache Camel kann verwendet werden, Verbindung mit Websphere MQ. Sie können einen benutzerdefinierten Empfänger erstellen, wie so (beachten Sie, dass dieses Muster auch ohne JMS verwendet werden könnten):

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String) 
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable { 
    //Transient as this will get passed to the Workers from the Driver 
    @transient 
    var camelContextOption: Option[DefaultCamelContext] = None 

    def onStart() = { 
    camelContextOption = Some(new DefaultCamelContext()) 
    val camelContext = camelContextOption.get 
    val env = new Properties() 
    env.setProperty("java.naming.factory.initial", "???") 
    env.setProperty("java.naming.provider.url", jndiProviderURL) 
    env.setProperty("com.webmethods.jms.clientIDSharing", "true") 
    val namingContext = new InitialContext(env); //using the properties file to create context 

    //Lookup Connection Factory 
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory] 
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)) 

    val builder = new RouteBuilder() { 
     def configure() = { 
      from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10") 
      .process(new Processor() { 
      def process(exchange: Exchange) = { 
       exchange.getIn.getBody match { 
       case s: String => store(s) 
       } 
      } 
      }) 
     } 
     } 
    } 
    builders.foreach(camelContext.addRoutes) 
    camelContext.start() 
    } 

    def onStop() = if(camelContextOption.isDefined) camelContextOption.get.stop() 
} 

Sie können dann eine DSTREAM Ihrer Veranstaltungen erstellen wie folgt:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))