Ich arbeite an einem Spark-Programm, das die Eingabe von der RDD übernimmt und ein paar Sabol-Regeln beim Lesen aus einer drl-Datei ausführt.Spark- und Drools-Integration (Lesen von Regeln aus einer Drl-Datei)
in der TFL-Datei i eine Regel haben, dass überall dort, wo das hz Attribut des Objekts 0 ist es von 1.
der Zähler Attribut erhöhen sollte ich keine Ahnung, warum das nicht funktioniert, gibt es mir eine Ausgabe von 0 für alle Daten im Stream (Ja, es gibt Daten mit hz-Attribut gleich 0 und ja, ich kann alle Attribute drucken und verifizieren, dass auch für sie Zähler 0 ist)
Ich verwende die KieSessionFactory Klasse, die ich hier auf einem git Hub-Projekt gefunden https://github.com/mganta/sprue/blob/master/src/main/java/com/cloudera/sprue/KieSessionFactory.java
Aber ich bin mir ziemlich sicher, dass dieser Teil nicht wo Das Problem ist, es liest nur aus der Drl-Datei und wendet die Regeln an.
unten ist mein scala Code: (Ich habe das Teil markiert, wo ich das Problem denken liegt, aber zuerst bitte einen Blick auf die TFL-Datei nehmen)
package com.streams.Scala_Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.{ DStream, InputDStream, ConstantInputDStream }
import org.apache.spark.streaming.kafka.v09.KafkaUtils
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.sql.functions.avg
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.producer._
import org.apache.kafka.common.serialization.{ Deserializer, Serializer }
import org.apache.kafka.common.serialization.StringSerializer
import org.kie.api.runtime.StatelessKieSession
//import KieSessionFactory.getKieSession;
//import Sensor
object scala_consumer extends Serializable {
// schema for sensor data
class Sensor(resid_1: String, date_1: String, time_1: String, hz_1: Double, disp_1: Double, flo_1: Double, sedPPM_1: Double, psi_1: Double, chlPPM_1: Double, counter_1: Int) extends Serializable
{
var resid = resid_1
var date = date_1
var time = time_1
var hz = hz_1
var disp = disp_1
var flo = flo_1
var sedPPM = sedPPM_1
var psi = psi_1
var chlPPM = chlPPM_1
var counter = counter_1
def IncrementCounter (param: Int) =
{
counter = counter + param
}
}
// function to parse line of sensor data into Sensor class
def parseSensor(str: String): Sensor = {
val p = str.split(",")
//println("printing p: " + p)
new Sensor(p(0), p(1), p(2), p(3).toDouble, p(4).toDouble, p(5).toDouble, p(6).toDouble, p(7).toDouble, p(8).toDouble, 0)
}
var counter = 0
val timeout = 10 // Terminate after N seconds
val batchSeconds = 2 // Size of batch intervals
def main(args: Array[String]): Unit = {
val brokers = "maprdemo:9092" // not needed for MapR Streams, needed for Kafka
val groupId = "testgroup"
val offsetReset = "latest"
val batchInterval = "2"
val pollTimeout = "1000"
val topics = "/user/vipulrajan/streaming/original:sensor"
val topica = "/user/vipulrajan/streaming/fail:test"
val xlsFileName = "./src/main/Rules.drl"
val sparkConf = new SparkConf().setAppName("SensorStream").setMaster("local[1]").set("spark.testing.memory", "536870912")
.set("spark.streaming.backpressure.enabled", "true")
.set("spark.streaming.receiver.maxRate", Integer.toString(2000000))
.set("spark.streaming.kafka.maxRatePerPartition", Integer.toString(2000000));
val ssc = new StreamingContext(sparkConf, Seconds(batchInterval.toInt))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset,
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
"spark.kafka.poll.time" -> pollTimeout
)
val producerConf = new ProducerConf(
bootstrapServers = brokers.split(",").toList
)
val messages = KafkaUtils.createDirectStream[String, String](ssc, kafkaParams, topicsSet)
val values: DStream[String] = messages.map(_._2)
println("message values received")
//values.print(10)
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
values.foreachRDD(x => try{
print("did 1\n") //markers for manual and minor debugging
val myData = x.mapPartitions(s => {s.map(sens => {parseSensor(sens)})})
//myData.collect().foreach(println)
//println(youData.date)
print("did 2\n")
val evalData = myData.mapPartitions(s => {
val ksession = KieSessionFactory.getKieSession(xlsFileName)
val retData = s.map(sens => {ksession.execute(sens); sens;})
retData
})
evalData.foreach(t => {println(t.counter)})
print("did 3\n")
}
catch{case e1: ArrayIndexOutOfBoundsException => println("exception in line ")})
///////////*************************PART THAT COULD BE CAUSING A PROBLEM**************************/////////////
println("filtered alert messages ")
// Start the computation
ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()
}
}
die TFL-Datei
package droolsexample
import com.streams.Scala_Consumer.Sensor;
import scala.com.streams.Scala_Consumer.Sensor; //imported because my rules file lies in the src/main folder
//and code lies in src/main/scala
// declare any global variables here
dialect "java"
rule "Counter Incrementer"
when
sens : Sensor (hz == 0)
then
sens.IncrementCounter(1);
end
Ich habe versucht, eine xls-Datei anstelle der drl-Datei zu verwenden, ich habe versucht, die Klasse in Java und das Objekt in scala zu erstellen. Ich habe viele andere Dinge versucht, aber alles, was ich in der Ausgabe bekomme, ist eine Warnung:
6/06/27 16: 38: 30.462 Executor Taskstart worker-0 WARN AbstractKieModule: Keine Dateien gefunden für KieBase defaultKieBase
und wenn ich die Zählerwerte drucke, bekomme ich alle Nullen. Jemand zur Rettung?
Wenn eine einfache Regel 'Regel x wenn dann System.out.println (" Hallo "); Ende ist in dieser Datei und nicht feuern dann am wahrscheinlichsten, Sie erstellen die Wissensbasis nicht korrekt. Denken Sie, dass der Import von zwei verschiedenen Klassen mit demselben einfachen Namen eine gute Idee ist? – laune
Ich habe versucht, sie auch einzeln zu importieren. Ich habe auch versucht, nur zu drucken ("Hallo"), das hat auch nicht funktioniert. Es tut mir leid, aber ich habe keine Ahnung, welche Wissensbasis ist, ich würde es googeln, aber wenn Sie einen Link oder eine Ressource haben, wäre ich wirklich dankbar, wenn Sie hier posten könnten :) –