2016-07-08 15 views
0

Ich habe einen Code, der viele Knoten und Beziehungen einfügen:Wie auf Neo4j mit Python Batch-Insertion zu tun

from neo4jrestclient.client import GraphDatabase 
from neo4jrestclient import client 
import psycopg2 

db = GraphDatabase("http://127.0.0.1:7474",username="neo4j", password="1234") 

conn = psycopg2.connect("\ 
    dbname='bdTrmmTest'\ 
    user='postgres'\ 
    host='127.0.0.1'\ 
    password='1234'\ 
    "); 

inicio = 0 

while(inicio <= 4429640): 
    c = conn.cursor() 
    c.execute("SELECT p.latitude, p.longitude, h.precipitacaoh, h.datah, h.horah FROM pontos AS p, historico AS h WHERE p.gid = h.gidgeo_fk LIMIT 1640 OFFSET %d"%(inicio)) 

    sensorlatlong = db.labels.create("LaLo") 
    sensorprecip = db.labels.create("Precipitacao") 
    sensordata = db.labels.create("Data") 
    sensorhora = db.labels.create("Hora") 

    records = c.fetchall() 

    for i in records: 
     s2 = db.nodes.create(precipitacao=i[2]) 
     sensorprecip.add(s2) 
     s5 = db.nodes.create(horah=i[4]) 
     sensorhora.add(s5) 
     s5.relationships.create("REGISTROU",s2) 
     q = 'MATCH (s:LaLo) WHERE s.latitude = "%s" AND s.longitude = "%s" RETURN s'%(str(i[0]),str(i[1])) 
     results = db.query(q, returns=(client.Node)) 
     q2 = 'MATCH (s:LaLo)-->(d:Data)-->(h:Hora)-->(p:Precipitacao) WHERE s.latitude = "%s" AND s.longitude = "%s" AND d.datah = "%s" RETURN d'%(str(i[0]), str(i[1]), str(i[3])) 
     results1 = db.query(q2, returns=(client.Node)) 
     if (len(results) > 0): 
      n = results[0].pop() 
      if(len(results1) > 0): 
       n1 = results1[0].pop() 
       n1.relationships.create("AS", s5) 
      else: 
       s4 = db.nodes.create(datah=i[3]) 
       sensordata.add(s4) 
       n.relationships.create("EM", s4) 
       s4.relationships.create("AS",s5) 
     else: 
      s3 = db.nodes.create(latitude=i[0],longitude=i[1]) 
      sensorlatlong.add(s3) 
      if(len(results1) > 0): 
       n1 = results1[0].pop() 
       n1.relationships.create("AS", s5) 
      else: 
       s4 = db.nodes.create(datah=i[3]) 
       sensordata.add(s4) 
       s3.relationships.create("EM", s4) 
       s4.relationships.create("AS",s5) 

    inicio = inicio+1640 

Aber es dauert so viele Tage einzufügen. Wie fügt Batch in diesen Code ein, um die Einfügezeit zu verringern? Ich lese diesen Beitrag http://jexp.de/blog/2012/10/parallel-batch-inserter-with-neo4j/, aber es ist in Java.

Antwort

0

Ich habe Neo4j nicht von Python verwendet, aber ich bin mir ziemlich sicher, dass der Client genauso funktioniert wie in anderen Sprachen, und das bedeutet, dass der Code viele verschiedene HTTP-Verbindungen generiert und den Low-Level-Knoten manipuliert und Beziehungsendpunkte. Das bedeutet viel Latenz.

Es erzeugt auch viele unterschiedliche Abfragen, weil es anstelle der Verwendung von parametrisierten Abfragen Zeichenketten ersetzend, und Neo4j muss jeden von ihnen analysieren.

Sie wären viel besser mit einer kleinen Anzahl von parametrisierten Cypher-Abfragen, oder sogar einer.

Wenn ich die documentation for neo4jrestclient richtig gelesen habe, denke ich, es so ähnlich aussehen:

c.execute("SELECT p.latitude, p.longitude, h.precipitacaoh, h.datah, h.horah FROM pontos AS p, historico AS h WHERE p.gid = h.gidgeo_fk LIMIT 1640 OFFSET %d"%(inicio)) 

records = c.fetchall() 

q = """ 
MERGE (lalo:LaLo {latitude: {latitude}, longitude: {longitude}}) 
WITH lalo 
MERGE (lalo)-[:EM]->(data:Data {datah: {datah}}) 
WITH data 
CREATE (data)-[:AS]->(hora:Hora {horah: {horah}}) 
CREATE (hora)-[:REGISTROU]->(:Precipitacao {precipitacao: {precipitacao}}) 
""" 

for i in records: 
    params = { 
     "latitude": str(i[0]), 
     "longitude": str(i[1]), 
     "precipitacao": i[2], 
     "datah": i[3], 
     "horah": i[4], 
    } 
    db.query(q=q, params=params) 

Natürlich wird es schneller laufen, wenn Sie Indizes, so dass Sie diese zuerst erstellen bräuchten (zumindest die erste 2), beispielsweise unmittelbar vor der Schleife oder außerhalb des Prozesses:

CREATE INDEX ON :LaLo(latitude) 
CREATE INDEX ON :LaLo(longitude) 
CREATE INDEX ON :Data(datah) 

das letzte, was Sie tun können, um Dinge zu beschleunigen ist use transactions, schreibt so in den Reihen passieren.

  1. Öffnen Sie eine Transaktion

    tx = db.transaction(for_query=True) 
    
  2. Anfügen (zum Beispiel) bis tausend Anfragen an (oder weniger, wenn Sie das Ende der Zeilen erreichen)

    params = // ... 
    tx.append(q=q, params=params) 
    
  3. Übernehmen Sie die Transaktion

    tx.execute() 
    
  4. wiederholen, bis Sie haben aus Zeilen aus der SQL-Datenbank ausführen