2016-06-01 12 views
0

Ich habe gerade angefangen mit akka Streams (2.4.6) und slick (3.1.1) in Scala (2.11.7) zu arbeiten. Ich benutze Intellij (nicht sicher, ob das wichtig ist, aber ich dachte, ich würde es dort rauswerfen).Scala - fromPublisher ist kein Mitglied von object akka.stream.scaladsl.Quelle

val scanner: DatabasePublisher[Stage] = db.stream(action.transactionally.withStatementParameters(fetchSize = 5000)) 
val source = Source.fromPublisher(scanner) 

das Problem ist in der zweiten Zeile. Intellij sagt mir, dass es nicht das Symbol „fromPublisher“ auflösen kann ... Ich dachte, es war IJ sein finnicky aber als ich das ich bekam zu bauen:

Error:(38, 10) value fromPublisher is not a member of object akka.stream.scaladsl.Source 
    Source.fromPublisher(scanner) 
     ^

eine Ahnung von dem, was ich falsch machen könnte?

Ich habe die ganze Nacht damit gekämpft und bin bereit, mir die Haare auszuziehen.

Danke!

+0

könnten Sie das vollständige Code-Snippet zeigen – Sky

Antwort

1

Siehe Arbeitsbeispiel:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import slick.backend.DatabasePublisher 
import slick.driver.H2Driver.api._ 

import scala.concurrent.Await 
import scala.concurrent.duration._ 

case class Emp(id: Int, name: String) 

object Demo extends App { 

    implicit val system = ActorSystem("Sys") 

    val db = Database.forConfig("h2mem1") 

    val empTableQuery = TableQuery[EmployeeTable] 
    val insertQuery = empTableQuery ++= Seq(Emp(1, "emp1"), Emp(2, "emp2"), Emp(3, "emp3"), Emp(4, "emp4")) 
    val action = DBIO.seq(empTableQuery.schema.create, insertQuery) 

    //create schema and insert record 
    Await.result(db.run(action), 1000 second) 
    // print db record 
    Await.result(db.run(empTableQuery.result), 1000 second).foreach(println) 
    val publisher: DatabasePublisher[Emp] = db.stream(empTableQuery.result) 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    //consume using stream 
    println("Steaming data::::::::") 
    val source = Source.fromPublisher(publisher).map(emp => emp.id + " : " + emp.name).runForeach(println) 

    class EmployeeTable(tag: Tag) extends Table[Emp](tag, "emp") { 
    val id = column[Int]("id", O.PrimaryKey) 
    val name = column[String]("name") 

    def * = (id, name) <>(Emp.tupled, Emp.unapply) 
    } 


    source.onComplete(_ => system.terminate()) 

} 

build.sbt

scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "mysql" % "mysql-connector-java" % "5.1.36", 
    "com.typesafe.slick" %% "slick-hikaricp" % "3.1.1", 
    "ch.qos.logback" % "logback-classic" % "1.1.3", 
    "com.typesafe.slick" %% "slick" % "3.1.1", 
    "com.typesafe.akka" %% "akka-stream" % "2.4.6", 
    "org.scalatest" %% "scalatest" % "2.2.5" % "test", 
    "com.h2database" % "h2" % "1.4.187" 
) 

application.conf

h2mem1 = { 
    url = "jdbc:h2:mem:test1" 
    driver = org.h2.Driver 
    connectionPool = disabled 
    keepAliveConnection = true 
} 
0

Danke alle auf meine Frage zu beantworten. Ich habe das Problem gefunden. Es war nicht der Code.

Das Problem war in Intellij-Bibliotheken eingerichtet.

Ich kompilierte das Projekt mit SBT und es funktionierte und lief gut, trotz der Tatsache, dass IJ war über alles werfen.

Also nachdem ich stocherte ich um ging ich in Projektstruktur -> SDK und ich bemerkte, dass in den SDK-Bibliotheken, akka-stream-experimentelle 1.0 Glas und eine ganze Reihe von anderen Junk.

Ich löschte diese (links die JDK 8 Gläser). Und jetzt funktioniert alles!

Danke trotzdem!