2009-10-29 12 views
7

Ich muss asynchrone Nachrichtenwarteschlangen dynamisch in Java erstellen. Mein Anwendungsfall ist das Senden von E-Mails über mehrere SMTP-Server: Ich muss erzwingen, dass E-Mails an denselben SMTP-Server sequenziell verarbeitet werden, aber E-Mails an verschiedene SMTP-Server können gleichzeitig verarbeitet werden. Ich habe JMS in der Vergangenheit verwendet, aber soweit ich sehen kann, erlaubt es nur die Erstellung von Kompilierungswarteschlangen, während ich zur Laufzeit Warteschlangen erstellen muss (eine Warteschlange für jeden SMTP-Server).Dynamisch asynchrone Nachrichtenwarteschlangen in Java erstellen

Fehle ich etwas in Bezug auf JMS oder gibt es ein anderes Tool/Vorschlag, das ich mir ansehen sollte?

+0

Sind Sie JMS mit speziell oder ist dies etwas, das Sie java.util.concurrent und seine ExecutorServices tun können? –

+0

Ich verwende JMS nicht speziell, deshalb werde ich die ExecutorServices anschauen, danke. – Zecrates

Antwort

6

Ich stimme Adam zu, der Anwendungsfall klingt wie JMS ist Overhead. Java integrierte Funktionalität ausreichend:

package de.mhaller; 

import java.util.ArrayDeque; 
import java.util.ArrayList; 
import java.util.Deque; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingDeque; 

import org.junit.Assert; 
import org.junit.Test; 

public class Mailer { 

    @Test 
    public void testMailer() throws Exception { 
     ExecutorService executor = Executors.newCachedThreadPool(); 
     ArrayList<Mail> log = new ArrayList<Mail>(); 
     LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>(); 

     // TODO: Put mails to be sent into the incoming queue 
     incoming.offer(new Mail("[email protected]", "localhost")); 
     incoming.offer(new Mail("[email protected]", "otherhost")); 
     incoming.offer(new Mail("[email protected]", "otherhost")); 
     incoming.offer(new Mail("[email protected]", "localhost")); 

     Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>(); 
     while (!incoming.isEmpty()) { 
      Mail mail = incoming.pollFirst(); 
      Mailserver mailserver = findMailserver(mail); 
      if (!queues.containsKey(mailserver)) { 
       ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>(); 
       queues.put(mailserver, serverQueue); 
       executor.execute(new SendMail(mailserver, serverQueue)); 
      } 
      Queue<Mail> slot = queues.get(mailserver); 
      slot.offer(mail); 
     } 

     assertMailSentWithCorrectServer(log); 
    } 

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) { 
     for (Mail mail : log) { 
      if (!mail.server.equals(mail.sentBy.mailserver)) { 
       Assert.fail("Mail sent by wrong server: " + mail); 
      } 
     } 
    } 

    private Mailserver findMailserver(Mail mail) { 
     // TODO: Your lookup logic which server to use 
     return new Mailserver(mail.server); 
    } 

    private static class Mail { 
     String recipient; 
     String server; 
     SendMail sentBy; 

     public Mail(String recipient, String server) { 
      this.recipient = recipient; 
      this.server = server; 
     } 

     @Override 
     public String toString() { 
      return "mail for " + recipient; 
     } 
    } 

    public static class SendMail implements Runnable { 

     private final Deque<Mail> queue; 
     private final Mailserver mailserver; 

     public SendMail(Mailserver mailserver, Deque<Mail> queue) { 
      this.mailserver = mailserver; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      while (!queue.isEmpty()) { 
       Mail mail = queue.pollFirst(); 
       // TODO: Use SMTP to send the mail via mailserver 
       System.out.println(this + " sent " + mail + " via " + mailserver); 
       mail.sentBy = this; 
      } 
     } 

    } 

    public static class Mailserver { 
     String hostname; 

     public Mailserver(String hostname) { 
      this.hostname = hostname; 
     } 

     @Override 
     public String toString() { 
      return hostname; 
     } 

     @Override 
     public int hashCode() { 
      return hostname.hashCode(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      return hostname.equals(((Mailserver) obj).hostname); 
     } 

    } 

} 
1

JMS selbst als eine Spezifikation ist eher leise auf das Problem. Bei den meisten Implementierungen können Sie dies tun, nur nicht über JMS selbst, sondern über ihre eigene API. Aber Sie können nicht etwas Formales wie eine MDB an eine dynamische Warteschlange anschließen. Stattdessen müssen Sie Ihre eigenen Verbindungen und Listener verwalten.

1

Das letzte Mal, als wir uns dies in einer WebSphere-Umgebung angesehen haben, war es überraschend schwierig/unmöglich, Warteschlangen dynamisch zu erstellen (temporäre Warteschlangen sind für Sie zu kurzlebig, denke ich). Obwohl APIs zum Erstellen von Warteschlangen vorhanden waren, mussten sie später neu gestartet werden, um aktiv zu werden. Dann gibt es das MDB-Problem allusused.

Wie wäre es mit einem dreckigen Workaround, der auf dem Sprichwort basiert, dass alle Probleme durch eine zusätzliche Indirektion gelöst werden können, die davon ausgeht, dass die verfügbaren Drucker vergleichsweise klein sind.

Erstellen Sie die Warteschlangen Printer01 bis Printer99 (oder eine kleinere Nummer). Habe eine "Datenbank", die Warteschlangen echten Druckern zuordnet. Wenn Anfragen für Drucker kommen, können Sie der Mapping-Tabelle hinzufügen. Sie haben vielleicht einen Overhead von MDBs, die auf Warteschlangen schauen, die nie benutzt werden, aber wenn Ihre potentielle Anzahl von Druckern nicht groß ist, können Sie es sich vielleicht leisten?

0

eine Warteschlange erstellen für jede Ihrer SMTP-Sever und Limit Warteschlange Verbraucher (MDB oder ein Zuhörer Nachricht) auf 1

0

ich dies mit activemq getan habe - ich gepostet tatsächlich eine Frage zu diesem Thema zu der Zeit, als Ich hatte ähnliche Bedenken (die JMS-Dokumentation erklärte zu dem Zeitpunkt, dass dies nicht unterstützt wurde) und wurde versichert, dass dies unterstützt wird.

+0

Haben Sie einen Link zu Ihrer Frage oder zu einer Dokumentation, in der beschrieben wird, wie dies erreicht wird? – Zecrates