2016-08-08 7 views
1

Ich habe einen Variations-Autoencoder mit Tensorflow auf einer einzelnen Maschine implementiert. Jetzt versuche ich es auf meinem Cluster mit dem verteilten Mechanismus Tensorflow laufen zu lassen. Aber das folgende Problem hatte mich mehrere Tage lang festgenagelt.Ausführen von Distributed Tensorflow mit InvalidArgumentError: Sie müssen einen Wert für den Platzhalter Tensor 'Placeholder' mit dem Datentyp float eingeben

Traceback (most recent call last): 
    File "/home/yama/mfs/ZhuSuan/examples/vae.py", line 265, in <module> 
    print('>> Test log likelihood = {}'.format(np.mean(test_lls))) 
    File "/usr/lib/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 942, in managed_session 
    self.stop(close_summary_writer=close_summary_writer) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 768, in stop 
    stop_grace_period_secs=self._stop_grace_secs) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 322, in join 
    six.reraise(*self._exc_info_to_raise) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 267, in stop_on_exception 
    yield 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/coordinator.py", line 411, in run 
    self.run_loop() 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/training/supervisor.py", line 972, in run_loop 
    self._sv.global_step]) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 372, in run 
    run_metadata_ptr) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 636, in _run 
    feed_dict_string, options, run_metadata) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 708, in _do_run 
    target_list, options, run_metadata) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/client/session.py", line 728, in _do_call 
    raise type(e)(node_def, op, message) 
tensorflow.python.framework.errors.InvalidArgumentError: You must feed a value for placeholder tensor 'Placeholder' with dtype float 
    [[Node: Placeholder = Placeholder[dtype=DT_FLOAT, shape=[], _device="/job:worker/replica:0/task:0/gpu:0"]()]] 
    [[Node: model_1/fully_connected_10/Relu_G88 = _Recv[client_terminated=false, recv_device="/job:worker/replica:0/task:0/cpu:0", send_device="/job:worker/replica:0/task:0/gpu:0", send_device_incarnation=3964479821165574552, tensor_name="edge_694_model_1/fully_connected_10/Relu", tensor_type=DT_FLOAT, _device="/job:worker/replica:0/task:0/cpu:0"]()]] 
Caused by op u'Placeholder', defined at: 
    File "/home/yama/mfs/ZhuSuan/examples/vae.py", line 201, in <module> 
    x = tf.placeholder(tf.float32, shape=(None, x_train.shape[1])) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/array_ops.py", line 895, in placeholder 
    name=name) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/gen_array_ops.py", line 1238, in _placeholder 
    name=name) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/ops/op_def_library.py", line 704, in apply_op 
    op_def=op_def) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 2260, in create_op 
    original_op=self._default_original_op, op_def=op_def) 
    File "/mfs/yama/tensorflow/local/lib/python2.7/site-packages/tensorflow/python/framework/ops.py", line 1230, in __init__ 
    self._traceback = _extract_stack() 

Hier ist mein Code, ich die Hauptfunktion nur der Einfachheit halber fügen:

if __name__ == "__main__": 
    tf.set_random_seed(1234) 

    # Load MNIST 
    data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 
          'data', 'mnist.pkl.gz') 
    x_train, t_train, x_valid, t_valid, x_test, t_test = \ 
     dataset.load_mnist_realval(data_path) 
    x_train = np.vstack([x_train, x_valid]) 
    np.random.seed(1234) 
    x_test = np.random.binomial(1, x_test, size=x_test.shape).astype('float32') 

    # Define hyper-parametere 
    n_z = 40 

    # Define training/evaluation parameters 
    lb_samples = 1 
    ll_samples = 5000 
    epoches = 10 
    batch_size = 100 
    test_batch_size = 100 
    iters = x_train.shape[0] // batch_size 
    test_iters = x_test.shape[0] // test_batch_size 
    test_freq = 10 

    ps_hosts = FLAGS.ps_hosts.split(",") 
    worker_hosts = FLAGS.worker_hosts.split(",") 

    # Create a cluster from the parameter server and worker hosts. 
    clusterSpec = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) 

    print("Create and start a server for the local task.") 
    # Create and start a server for the local task. 
    server = tf.train.Server(clusterSpec, 
          job_name=FLAGS.job_name, 
          task_index=FLAGS.task_index) 

    print("Start ps and worker server") 
    if FLAGS.job_name == "ps": 
     server.join() 
    elif FLAGS.job_name == "worker": 
     #set distributed device 
     with tf.device(tf.train.replica_device_setter(
      worker_device="/job:worker/task:%d" % FLAGS.task_index, 
      cluster=clusterSpec)): 

      print("Build the training computation graph") 
      # Build the training computation graph 
      x = tf.placeholder(tf.float32, shape=(None, x_train.shape[1])) 
      optimizer = tf.train.AdamOptimizer(learning_rate=0.001, epsilon=1e-4) 
      with tf.variable_scope("model") as scope: 
       with pt.defaults_scope(phase=pt.Phase.train): 
        train_model = M1(n_z, x_train.shape[1]) 
        train_vz_mean, train_vz_logstd = q_net(x, n_z) 
        train_variational = ReparameterizedNormal(
         train_vz_mean, train_vz_logstd) 
        grads, lower_bound = advi(
         train_model, x, train_variational, lb_samples, optimizer) 
        infer = optimizer.apply_gradients(grads) 

      print("Build the evaluation computation graph") 
      # Build the evaluation computation graph 
      with tf.variable_scope("model", reuse=True) as scope: 
       with pt.defaults_scope(phase=pt.Phase.test): 
        eval_model = M1(n_z, x_train.shape[1]) 
        eval_vz_mean, eval_vz_logstd = q_net(x, n_z) 
        eval_variational = ReparameterizedNormal(
         eval_vz_mean, eval_vz_logstd) 
        eval_lower_bound = is_loglikelihood(
         eval_model, x, eval_variational, lb_samples) 
        eval_log_likelihood = is_loglikelihood(
         eval_model, x, eval_variational, ll_samples) 

      global_step = tf.Variable(0) 
      saver = tf.train.Saver() 
      summary_op = tf.merge_all_summaries() 
      init_op = tf.initialize_all_variables() 

     # Create a "supervisor", which oversees the training process. 
     sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), 
           logdir=LogDir, 
           init_op=init_op, 
           summary_op=summary_op, 
           saver=saver, 
           global_step=global_step, 
           save_model_secs=600) 
     # Run the inference 
     with sv.managed_session(server.target) as sess: 
      epoch = 0 
      while not sv.should_stop() and epoch < epoches: 
      #for epoch in range(1, epoches + 1): 
       np.random.shuffle(x_train) 
       lbs = [] 
       for t in range(iters): 
        x_batch = x_train[t * batch_size:(t + 1) * batch_size] 
        x_batch = np.random.binomial(n=1, p=x_batch, size=x_batch.shape).astype('float32') 
        _, lb = sess.run([infer, lower_bound], feed_dict={x: x_batch}) 
        lbs.append(lb) 
       if epoch % test_freq == 0: 
        test_lbs = [] 
        test_lls = [] 
        for t in range(test_iters): 
         test_x_batch = x_test[ 
          t * test_batch_size: (t + 1) * test_batch_size] 
         test_lb, test_ll = sess.run(
          [eval_lower_bound, eval_log_likelihood], 
          feed_dict={x: test_x_batch} 
         ) 
         test_lbs.append(test_lb) 
         test_lls.append(test_ll) 
        print('>> Test lower bound = {}'.format(np.mean(test_lbs))) 
        print('>> Test log likelihood = {}'.format(np.mean(test_lls))) 
     sv.stop() 

Ich habe versucht meinen Code für mehrere Tage zu korrigieren, aber alle meine Bemühungen sind gescheitert. Auf der Suche nach Ihrer Hilfe!

Antwort

4

Die wahrscheinlichste Ursache dieser Ausnahme ist, dass eine der Operationen, dass die tf.train.Supervisor läuft im Hintergrund auf dem tf.placeholder() Tensor x abhängt, aber nicht über genügend Informationen, um einen Wert für sie zu füttern.

Der wahrscheinlichste Schuldige ist summary_op = tf.merge_all_summaries(), weil der Bibliothekscode oft Werte zusammenfasst, die von den Trainingsdaten abhängen. Um zu verhindern, die Supervisor von Zusammenfassungen in den Hintergrund zu sammeln, übergeben summary_op=None zum tf.train.Supervisor Konstruktor:

 # Create a "supervisor", which oversees the training process. 
     sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), 
           logdir=LogDir, 
           init_op=init_op, 
           summary_op=None, 
           saver=saver, 
           global_step=global_step, 
           save_model_secs=600) 

Danach tun, werden Sie alternative Vorkehrungen treffen müssen, um Zusammenfassungen zu sammeln. Der einfachste Weg, dies zu tun ist, in regelmäßigen Abständen summary_op an sess.run() zu übergeben, dann das Ergebnis an sv.summary_computed() übergeben.

+0

Hallo, mrry. Ich habe deinen Rat auf verschiedene Arten versucht. – sproblvem

+0

Entschuldigung, die letzten Kommentare sind nicht vollständig. Ich habe deine Lösung versucht, der Fehler bleibt bestehen. Wenn das Programm sess.run (summary_op) und sv.summary_computed() aufruft. Das Fehlerprotokoll erinnert mich immer noch daran, dass "Sie einen Wert für den Platzhalter Tensor einspeisen müssen". Oder wenn ich einfach summary_op = None setze, ohne sess.run (summary_op) in regelmäßigen Abständen zu starten, bleibt das Programm hängen. Irgendwelche weiteren Ratschläge? Danke und auf der Suche nach Ihrer Antwort. – sproblvem

0

Ich hatte das gleiche genaue Problem. Nach mrry Vorschlag konnte ich diese Arbeit wird durch:

  1. Zusammenfassung Protokollierung im Supervisor Deaktivieren von summary_op = Keine Einstellung (als mrry vorgeschlagen)
  2. meine eigene summary_op anlegen und weitergeben(), um sess.run zusammen mit dem Rest der zu evaluierenden Ops. Behalten Sie die resultierende Zusammenfassung bei, sagen wir, sie heißt "my_summary".
  3. Erstellen eines eigenen Übersichtsschreibers. Nennen Sie es mit 'my_summary', zB: summary_writer.add_summary (Zusammenfassung, epoch_count)

Um zu klären, ich habe nicht mrry Vorschlag verwenden sess.run zu tun (summary_op) und sv.summary_computed(), aber statt hat zusammen mit den anderen Operationen den summary_op ausgeführt und dann die Zusammenfassung selbst geschrieben. Vielleicht möchten Sie auch die Zusammenfassung schreiben, um Chef zu sein.

Im Grunde genommen müssen Sie die Zusammenfassungsdienste des Supervisors vollständig umgehen. Scheint wie eine überraschende Einschränkung/ein Fehler von Supervisor, da es nicht ungewöhnlich ist, Dinge zu protokollieren, die von der Eingabe abhängen (die in einem Platzhalter lebt). Zum Beispiel in meinem Netzwerk (ein Autoencoder) hängen die Kosten von der Eingabe ab.

0

Kam über eine ähnliche Sache. Der Chef ging mit der oben erwähnten Fehlermeldung unter. Da ich jedoch die MonitoredTrainingSession anstelle eines selbst erstellten Supervisors verwendete, konnte ich das Problem lösen, indem ich die Standardzusammenfassung deaktivierte. Zum Deaktivieren müssen Sie dem Konstruktor der MonitoredTrainingSession

angeben. Danach lief alles glatt! Code on Github