Back

Explore Courses Blog Tutorials Interview Questions
+1 vote
2 views
in Machine Learning by (4.2k points)

tl;dr

How can I evaluate a validation set after every K training iterations, using separate queues for training and validation data, without resorting to separate tf.Sessions in multiple processes? There doesn't seem to be a clean way to achieve this, given my particular problem, and my current workaround (which I thought would work) gives me undefined behavior. Help!

The whole story

I want to evaluate a validation set every K training iterations, and I cannot figure out how to implement this properly in TensorFlow. This should be one of the most common operations, yet it feels that TensorFlow's API/architecture is working against me here or is at least making things unnecessarily difficult.

My assumptions are:

  • [A1] The multi-process model for training/validation as described here https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines is not applicable to my problem, as I have to assume there is not enough GPU memory available to load the variables twice.
  • [A2] I want to evaluate on the validation set every K training iterations.
  • [A3] Both training and validation data cannot be simply read from disk, but are generated on-the-fly. This makes it impossible to reliably pre-compute the size of the validation set in advance.
  • [A4] The validation set is too large to pre-compute and store onto disk.
  • [A5] The effective validation set size is not necessarily a multiple of the batch size.

The training input pipeline is set up as follows:

  • A tf.train.slice_input_producer() generates a (shuffled) list of filenames, each referring to raw input data.
  • A custom data generation function generates a variable number of training exemplars/labels from each chunk of raw input data.
  • The generated training exemplars/labels are queued via tf.train.shuffle_batch() before being fed into the network.

Due to [A3], [A4], [A5], the validation input pipeline is set up in an almost identical way, except that the final input queue is generated via tf.train.batch(), since shuffling is not desirable. Due to the above assumptions, a feed_dict based approach is also infeasible, and also seemingly incompatible with using a higher level function such as tf.train.batch.

However, a straightforward implementation using two different sets of queues for training and validation does not work. As far as I understand, I have two options:

  • [B1] Set the num_epochs argument of the validation tf.train.slice_input_producer to None.

    In this case, the validation set is cycled through endlessly, but I would need to know the size of the validation set in advance to explicitly limit the number of batches to evaluate per run through the validation set. Furthermore, if the validation set size is not divisible by the batch size, I will always pull a bit more in the last batch. As this would shift the order of evaluation of the validation data each time, this is not acceptable.

  • [B2] Set the num_epochs argument of the validation tf.train.slice_input_producer to 1, and additionally set the allow_smaller_final_batch argument of the tf.train.batch function to True.

    In this case, the validation set is cycled through exactly once, after which the respective queue is closed forever. By default, this will make evaluating the validation set two or more times impossible. Since I do not know of a good way to reopen a queue in TensorFlow, I need to work around this limitation.

Due to the greater limitations of option [B1], I chose to work around the issues of option [B2] instead. The (pseudo-)code outlining my current approach is as follows:

The training loop should be fairly canonical. Every K iterations, a function to evaluate the validation set is called. Note that I only start the queues that have a name starting with "train_"; these is the queue set up for collecting generated training data. In order to do this, I created two helper functions, get_queues_by_name and start_queue_runners.

def train_loop(train_ops, vali_ops, ...):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("train")
        threads = start_queue_runners(sess, coord, queues)

        try:
            for step in range(start_iteration, num_train_iterations):
                # Runs the session on validation set
                if step % K == 0:
                    validation_results = run_validation(vali_ops, snapshot_file)

                # TRAINING:
                # ...

        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

The helper functions look like this:

def get_queues_by_name(name):
    """Retrieves all queues that contain the string given by 'name'"""
    all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
    return [q for q in all_queues if name in q.name]


def start_queue_runners(session, coordinator, queues):
    """Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
    with session.graph.as_default():
        threads = []
        for queue in queues:
            log("Queue", "Starting queue '%s'" % queue.name, level=2)
            threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
    return threads

In the run_validation function, my chosen workaround against the issue of a closed queue is to create a new tf.Session. I also only start the threads associated with the queue collecting validation set data.

def run_validation(ops, snapshot_file):  # Called inside train_loop()
    results = None
    loader = tf.train.Saver()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("eval")
        threads = start_queue_runners(sess, coord, queues)

        # Performs the inference in batches
        try:
            # Evaluate validation set:
            results = eval_in_batches(ops, sess)
        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

    return results

I do not know whether creating a new tf.Session here is a good idea, but it seems like the only way to accomplish restarting the validation queue. Ideally, I also wouldn't want to re-load the model snapshot, as this seems conceptually unnecessary.

The issue with this code is that I see erratic/undefined behavior during running, such as NaN's or Inf's appearing inside the network during validation set evaluation. This seems to occur predominantly when the validation set queue is being filled at the same time as the training set queue is still being filled (since the training queue is open during validation set evaluation). For example, this very often happens if I evaluate the validation set at iteration 0 (when both queues still need to be filled). It almost seems as if the training/validation queues share some global state, although they are running in a different session.

Can someone explain why this is happening, and how I can solve this more cleanly while taking my above assumptions [A1]-[A5] into account?

1 Answer

+2 votes
by (6.8k points)
  • In training, I want to use a RandomShuffleQueue which makes it even more complicated. I think I'll simply ignore the matter and once the reader thread that enqueues tensors into the queue finishes, I will let the training stop, so I loose the remaining up-to capability things for this epoch and simply use it for the consequent epoch. Maybe to create it settled I check in the train-thread that I still scan from the queue till there are solely min_after_dequeue things left.
  • In evaluation, I want to use the same graph and the same session. I can use tf.cond to read from another separate queue instead of the RandomShuffleQueue. Or I could use feed_dict in evaluation. If I would use a separate queue, I would use a FIFO queue and carefully track that I do the right amount of steps. I could also introduce another dummy tensor which I enqueue into the queue which gives me an end_of_epoch flag or so, so then I know in the eval-thread when to stop.
Since, it is a part of production systems, undergoing a Machine Learning Training will be quite useful when it comes to validating the data queue.

Browse Categories

...