2015-03-10 12 views
5

ich Code schreibe basierend auf „Asynchronous Iteratoren für großen Datensätze“ beschrieben bei https://github.com/websudos/phantom#partial-select-queriesWert Scheibe ist kein Mitglied von play.api.libs.iteratee.Enumerator

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 

import org.joda.time.DateTime 
import org.joda.time.format.DateTimeFormat 
import org.joda.time.format.DateTimeFormatter 

import com.anomaly42.aml.dao.CassandraConnector 
import com.websudos.phantom.CassandraTable 
import com.websudos.phantom.Implicits._ 

object People extends People { 
    def getPersonByUpdatedAt(from:String, to:String, start: Int, limit: Int) = { 
    val dtf:DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ"); 
    val fromDateTime = dtf.parseDateTime(from) 
    val toDateTime = dtf.parseDateTime(to) 

    People.select(_.updated_at, _.firstName).allowFiltering.where(_.updated_at gte fromDateTime).and(_.updated_at lte toDateTime).fetchEnumerator().slice(start, limit).collect 
    } 
} 

ich folgenden Bibliothek Abhängigkeit bin mit:

scalaVersion := "2.11.6" 
libraryDependencies ++= Seq(
    "com.websudos"  %% "phantom-dsl"  % "1.5.4", 
    many more... 
) 

aber ich bekomme folgende Fehler beim Kompilieren vor:

value slice is not a member of play.api.libs.iteratee.Enumerator[(org.joda.time.DateTime, Option[String])] 

Was ich tryin bin Es ist eine Abfrage zu schreiben, die die nächste Anzahl von Ergebnissen zurückbringt, beginnend mit "start", jedes Mal, wenn die Methode getPersonByUpdatedAt() aufgerufen wird.

Antwort

3

Es gibt einige Implementierungsdetails, die hier behandelt werden müssen. Vor allem, wenn Sie nach der Seitennummerierung sind, gibt es möglicherweise einen einfacheren Weg, dies mit einfachen Bereichsabfragen anstelle von gefilterten Daten zu erreichen.

Werfen Sie einen Blick auf CLUSTERING ORDER, dass Anruf an ALLOW FILTERING sollte nicht da sein. Ohne CLUSTERING ORDER ist der voreingestellte Murmur3-Partitionierer nicht wirklich geordert, so dass Sie nicht garantieren können, dass Daten in derselben Reihenfolge abgerufen werden, in der Sie sie geschrieben haben.

Was wahrscheinlich bedeutet, dass Ihre Paginierung überhaupt nicht funktioniert. Nicht zuletzt ist die Verwendung von Enumeratoren wahrscheinlich nicht das, wonach Sie suchen.

Sie sind asynchron, also müssen Sie innerhalb einer Zukunft abbilden, um eine Scheibe zu erhalten, aber das beiseite, Aufzählungen sind nützlich, wenn etwas wie Spark eine ganze Tabelle auf einmal auflädt, zum Beispiel viele viele Ergebnisse.

Um es allen oben, in der Tabelle Personen Summe:

object id extends UUIDColumn(this) with PartitionKey[UUID]// doesn't have to be UUID 
object start extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending 
object end extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending 

Und einfach fetch() verwenden und dann Seq.slice aus der Bibliothek Scala Sammlungen. Das oben Gesagte geht davon aus, dass Sie in aufsteigender Reihenfolge paginieren möchten, z. B. zuerst die älteste.

Sie müssen auch herausfinden, was ein realistischer Partitionsschlüssel sein könnte. Wenn 2 Benutzer gleichzeitig aktualisiert werden, werden im schlimmsten Fall Daten verloren und mit einer FIFO-Warteschlange beendet, z. B. die letzte Aktualisierung zu einer bestimmten Zeit "gewinnt". Ich habe id oben verwendet, aber das ist nicht das, was Sie offensichtlich brauchen.

Und Sie müssen möglicherweise mehrere Tabellen, wo Sie Menschen speichern, so dass Sie alle Fragen abdecken können, die Sie benötigen.

+0

Hallo @flavian helfen, vielen Dank für Ihre Antwort. Meine Datenbank hat Millionen von Datensätzen und deshalb muss ich sie mit Enumerators implementieren. Ich habe die Datumsspalte mit 'CLUSTERING ORDER' definiert, wie von Ihnen erwähnt, aber wenn ich' ALLOW FILTERING' nicht verwende, dann bekomme ich Laufzeitfehler. Es ist mir gelungen, mit dem Kompilierungsfehler mit folgender Anweisung wegzukommen: People.select (_. Updated_at, _.firstName) .allowFiltering.where (_. Updated_at gte fromDateTime) .and (_. Updated_at lte toDateTime) .setFetchSize (pageSize) .fetchEnumerator() run Iteratee.slice (start, pageSize)) –

1

sollten Sie Iteratee und Enumerator von Play Framework verwenden. In Ihrem Fall müssen Sie:

import com.websudos.phantom.iteratee.Iteratee 

val enumerator = People.select(_.updated_at, _.firstName).allowFiltering.where(_.updated_at gte fromDateTime).and(_.updated_at lte toDateTime).fetchEnumerator 

val iteratee = Iteratee.slice[PeopleCaseClass](start, limit) 

enumerator.run(iteratee).map(_.foldLeft(List.empty[PeopleCaseClass])((l,e) => { e :: l })) 

Hope this