2016-06-27 15 views
0

Ich arbeite an einem Spark-Programm, das die Eingabe von der RDD übernimmt und ein paar Sabol-Regeln beim Lesen aus einer drl-Datei ausführt.Spark- und Drools-Integration (Lesen von Regeln aus einer Drl-Datei)

in der TFL-Datei i eine Regel haben, dass überall dort, wo das hz Attribut des Objekts 0 ist es von 1.

der Zähler Attribut erhöhen sollte ich keine Ahnung, warum das nicht funktioniert, gibt es mir eine Ausgabe von 0 für alle Daten im Stream (Ja, es gibt Daten mit hz-Attribut gleich 0 und ja, ich kann alle Attribute drucken und verifizieren, dass auch für sie Zähler 0 ist)

Ich verwende die KieSessionFactory Klasse, die ich hier auf einem git Hub-Projekt gefunden https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java

Aber ich bin mir ziemlich sicher, dass dieser Teil nicht wo Das Problem ist, es liest nur aus der Drl-Datei und wendet die Regeln an.

unten ist mein scala Code: (Ich habe das Teil markiert, wo ich das Problem denken liegt, aber zuerst bitte einen Blick auf die TFL-Datei nehmen)

package com.streams.Scala_Consumer 

import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.SparkContext._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream } 
import org.apache.spark.streaming.kafka.v09.KafkaUtils 
import org.apache.spark.streaming.{ Seconds, StreamingContext } 
import org.apache.spark.sql.functions.avg 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.kafka.producer._ 
import org.apache.kafka.common.serialization.{ Deserializer, Serializer } 
import org.apache.kafka.common.serialization.StringSerializer 
import org.kie.api.runtime.StatelessKieSession 
//import KieSessionFactory.getKieSession; 
//import Sensor 

object scala_consumer extends Serializable { 

// schema for sensor data 
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable 
{ 
var resid = resid_1 
var date = date_1 
var time = time_1 
var hz = hz_1 
var disp = disp_1 
var flo = flo_1 
var sedPPM = sedPPM_1 
var psi = psi_1 
var chlPPM = chlPPM_1 
var counter = counter_1 

def IncrementCounter (param: Int) = 
{ 
    counter = counter + param 
} 
} 

// function to parse line of sensor data into Sensor class 
def parseSensor(str: String): Sensor = { 
    val p = str.split(",") 
    //println("printing p: " + p) 
    new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0) 
} 

var counter = 0 
val timeout = 10 // Terminate after N seconds 
val batchSeconds = 2 // Size of batch intervals 

def main(args: Array[String]): Unit = { 

    val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka 
    val groupId = "testgroup" 
    val offsetReset = "latest" 
    val batchInterval = "2" 
    val pollTimeout = "1000" 
    val topics = "/user/vipulrajan/streaming/original:sensor" 
    val topica = "/user/vipulrajan/streaming/fail:test" 
    val xlsFileName = "./src/main/Rules.drl" 

    val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912") 
                    .set("spark.streaming.backpressure.enabled", "true") 
            .set("spark.streaming.receiver.maxRate", Integer.toString(2000000)) 
            .set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000)); 

    val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt)) 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String](
     ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers, 
     ConsumerConfig.GROUP_ID_CONFIG -> groupId, 
     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> 
      "org.apache.kafka.common.serialization.StringDeserializer", 
     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset, 
     ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false", 
     "spark.kafka.poll.time" -> pollTimeout 
    ) 

    val producerConf = new ProducerConf(
     bootstrapServers = brokers.split(",").toList 
    ) 

    val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet) 

    val values: DStream[String] = messages.map(_._2) 
    println("message values received") 
    //values.print(10) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    values.foreachRDD(x => try{ 
           print("did 1\n")  //markers for manual and minor debugging 
           val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})}) 
           //myData.collect().foreach(println) 
           //println(youData.date) 
           print("did 2\n") 
           val evalData = myData.mapPartitions(s => { 
           val ksession = KieSessionFactory.getKieSession(xlsFileName) 
           val retData = s.map(sens => {ksession.execute(sens); sens;}) 
           retData 
           }) 
           evalData.foreach(t => {println(t.counter)}) 
           print("did 3\n") 
           } 

    catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")}) 
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************///////////// 
    println("filtered alert messages ") 

    // Start the computation 
    ssc.start() 
    // Wait for the computation to terminate 
    ssc.awaitTermination() 

} 
} 

die TFL-Datei

package droolsexample 

import com.streams.Scala_Consumer.Sensor; 
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder 
              //and code lies in src/main/scala 

// declare any global variables here 
dialect "java" 
rule "Counter Incrementer" 

when 
    sens : Sensor (hz == 0) 

then 
    sens.IncrementCounter(1); 
end 

Ich habe versucht, eine xls-Datei anstelle der drl-Datei zu verwenden, ich habe versucht, die Klasse in Java und das Objekt in scala zu erstellen. Ich habe viele andere Dinge versucht, aber alles, was ich in der Ausgabe bekomme, ist eine Warnung:

6/06/27 16: 38: 30.462 Executor Taskstart worker-0 WARN AbstractKieModule: Keine Dateien gefunden für KieBase defaultKieBase

und wenn ich die Zählerwerte drucke, bekomme ich alle Nullen. Jemand zur Rettung?

+0

Wenn eine einfache Regel 'Regel x wenn dann System.out.println (" Hallo "); Ende ist in dieser Datei und nicht feuern dann am wahrscheinlichsten, Sie erstellen die Wissensbasis nicht korrekt. Denken Sie, dass der Import von zwei verschiedenen Klassen mit demselben einfachen Namen eine gute Idee ist? – laune

+0

Ich habe versucht, sie auch einzeln zu importieren. Ich habe auch versucht, nur zu drucken ("Hallo"), das hat auch nicht funktioniert. Es tut mir leid, aber ich habe keine Ahnung, welche Wissensbasis ist, ich würde es googeln, aber wenn Sie einen Link oder eine Ressource haben, wäre ich wirklich dankbar, wenn Sie hier posten könnten :) –

Antwort

1

Wenn Sie die Funke übermitteln und Ihr JAR zur Ausführung übergeben, stellen Sie sicher, dass andere Abhängigkeits-JARs von KIE usw. ebenfalls in derselben JAR enthalten sind, und führen Sie sie dann mit Spark-Submit aus.

alternative zwei getrennte Projekte haben ein, wo Sie Ihre Funkenprogramm ahve eine andere ist Ihr KIE Projekt, so dass Sie zwei Gläser haben und Sie es soetwas wie unten ausgeführt:

nohup funken einreichen --conf " spark.driver.extraJavaOptions -Dlog4j.configuration = Datei: /log4j.properties "\ - Warteschlange abc \ --master Garn \ --deploy-Modus-Cluster \ --jars drools-kie-project-0.0. 1-SNAPSHOT.jar --class com.abc.DroolsSparkJob SparkcallingDrools-0.0.1-SNAPSHOT.jar \ -inputdatei/user/hive/warehouse/abc/* -output/user/bienenstock/warehouse/drools-Op> app .log &