Using multiple input pipelines in TensorFlow

I know how to use an input pipeline to read data from files:

input = ... # Read from file loss = network(input) # build a network train_op = ... # Using SGD or other algorithms to train the network.

But how can I switch between multiple input pipelines? Say, if I want to train a network for 1000 batches on the training set from the training pipeline, then validate it on a validation set from another pipeline, then keep training, then validate, then train, ..., and so forth.

It's easy to implement this with feed_dict. I also know how to use checkpoints to achieve this, just like in the cifar-10 example. But it's kind of cumbersome: I need to dump the model to disk then read it from disk again.

Can I just switch between two input pipelines (one for training data, one for validation data) to achieve this? Reading 1000 batches from the training data queue, then a few batched from the validation data queue, and so forth. If it is possible, how to do it?


Not sure if this is exactly what you are looking for, but I am doing training and validation in the same code in two separate loops. My code reads numeric and string data from .CSV files, not images. I am reading from two separate CSV files, one for training and one for validation. I'm sure you can generalize it to read from two 'sets' of files, rather than just single files, as the code is there.

Here are the code snippets in case it helps. Note that this code first reads everything as string and then converts the necessary cells into floats, just given my own requirements. If your data is purely numeric, you should just set the defaults to floats and all should be easier. Also, there are a couple of lines in there that drop Weights and Biases into a CSV file AND serialize them into the TF checkpoint file, depending on which way you'd prefer.

#first define the defaults: rDefaults = [['a'] for row in range((TD+TS+TL))] # this function reads line-by-line from CSV and separates cells into chunks: def read_from_csv(filename_queue): reader = tf.TextLineReader(skip_header_lines=False) _, csv_row = reader.read(filename_queue) data = tf.decode_csv(csv_row, record_defaults=rDefaults) dateLbl = tf.slice(data, [0], [TD]) features = tf.string_to_number(tf.slice(data, [TD], [TS]), tf.float32) label = tf.string_to_number(tf.slice(data, [TD+TS], [TL]), tf.float32) return dateLbl, features, label #this function loads the above lines and spits them out as batches of N: def input_pipeline(fName, batch_size, num_epochs=None): filename_queue = tf.train.string_input_producer( [fName], num_epochs=num_epochs, shuffle=True) dateLbl, features, label = read_from_csv(filename_queue) min_after_dequeue = 10000 capacity = min_after_dequeue + 3 * batch_size # max of how much to load into memory dateLbl_batch, feature_batch, label_batch = tf.train.shuffle_batch( [dateLbl, features, label], batch_size=batch_size, capacity=capacity, min_after_dequeue=min_after_dequeue) return dateLbl_batch, feature_batch, label_batch # These are the TRAINING features, labels, and meta-data to be loaded from the train file: dateLbl, features, labels = input_pipeline(fileNameTrain, batch_size, try_epochs) # These are the TESTING features, labels, and meta-data to be loaded from the test file: dateLblTest, featuresTest, labelsTest = input_pipeline(fileNameTest, batch_size, 1) # 1 epoch here regardless of training # then you define the model, start the session, blah blah # fire up the queue: coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) #This is the TRAINING loop: try: while not coord.should_stop(): dateLbl_batch, feature_batch, label_batch = sess.run([dateLbl, features, labels]) _, acc, summary = sess.run([train_step, accuracyTrain, merged_summary_op], feed_dict={x: feature_batch, y_: label_batch, keep_prob: dropout, learning_rate: lRate}) except tf.errors.OutOfRangeError: # (so done reading the file(s)) # by the way, this dumps weights and biases into a CSV file, since you asked for that np.savetxt(fPath + fIndex + '_weights.csv', sess.run(W), # and this serializes weight and biases into the TF-formatted protobuf: # tf.train.Saver({'varW': W, 'varB': b}).save(sess, fileNameCheck) finally: coord.request_stop() # now re-start the runners for the testing file: coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) try: while not coord.should_stop(): # so now this line reads features, labels, and meta-data, but this time from the training file: dateLbl_batch, feature_batch, label_batch = sess.run([dateLblTest, featuresTest, labelsTest]) guessY = tf.argmax(y, 1).eval({x: feature_batch, keep_prob: 1}) trueY = tf.argmax(label_batch, 1).eval() accuracy = round(tf.reduce_mean(tf.cast(tf.equal(guessY, trueY), tf.float32)).eval(), 2) except tf.errors.OutOfRangeError: acCumTest /= i finally: coord.request_stop() coord.join(threads)

This may differ from what you are trying to do in the sense that it first completes the Training loop and THEN restarts the queues for the Testing loop. Not sure how you'd do this if you want to go back and fourth, but you can try to experiment with the two functions defined above by passing them the relevant file names (or lists) interchangeably.

Also I'm not sure if re-starting the queues after training is the best way to go, but it works for me. Would love to see a better example out there, as most TF examples use some built-in wrappers around the MNIST dataset to do the training in one go...


  • SQL IN Statement splitting parameter
  • Does user input go to the controller or model?
  • Javascript positive lookbehind alternative
  • Why is this implementation of a queue with two stacks immutable and thread safe?
  • Is it possible to group results together after, well, grouping them together?
  • How the preg_match handles the delimiter when \\Q..\\E used?
  • Macos Hadoop 3.1.1 - Failed to start namenode. java.io.IOException: Could not parse line: “Filesyste
  • make - define multiple variables in the same eval call
  • How to escape string for SQLite FTS query
  • What's the right way for summary write to avoid overlapping on tensorboard when restore a model
  • new spark.sql.shuffle.partitions value not used after checkpointing
  • “Complex Header” not responsive in current DataTables.net build?
  • Variant from android-autofittextview library : scaling makes strange behaviour
  • Checkpointing In ALS Spark Scala
  • in batch how do i use taskkill properly
  • Local Development, Apache vs Developer - file permissions
  • Excel's Macro-Recorder usage
  • jQuery ready not fired after rails link_to is clicked
  • Sencha Touch 2.0 Controller refs attribute not working?
  • Yii2: Config params vs. const/define
  • Algorithm for a smudge tool?
  • When to use `image` and when to use `Matrix` in Emgu CV?
  • Using $this when not in object context
  • What is the “return” in scheme?
  • How do I fake an specific browser client when using Java's Net library?
  • How reduce the height of an mschart by breaking up the y-axis
  • Perl system calls when running as another user using sudo
  • How to handle AllServersUnavailable Exception
  • Which linear programming package should I use for high numbers of constraints and “warm starts” [clo
  • VBA Convert delimiter text file to Excel
  • Unanticipated behavior
  • using conditional logic : check if record exists; if it does, update it, if not, create it
  • Windows forms listbox.selecteditem displaying “System.Data.DataRowView” instead of actual value
  • Can't mass-assign protected attributes when import data from csv file
  • Reading document lines to the user (python)
  • Binding checkboxes to object values in AngularJs
  • Unable to use reactive element in my shiny app
  • Net Present Value in Excel for Grouped Recurring CF
  • jQuery Masonry / Isotope and fluid images: Momentary overlap on window resize
  • How to load view controller without button in storyboard?