Reading a few rows from BigQuery as a side input, getting None


I am having trouble with side inputs, specifically BigQuery ones, in Dataflow pipelines, even after going on Coursera and looking at the samples.

Right now, I have a pipeline that reads files in a GCS bucket, gets their filenames, then transforms each file and writes a given number of rows to BigQuery. I am trying to figure out how to map each filename to a specific "key" from BigQuery.

<pre class="lang-py prettyprint-override">result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)

# create each input PCollection name
variables = ['p{}'.format(i) for i in range(len(result))]
</pre>

Based on the result, I build a tuple of all the filenames (filename1, filename2…) and create a query dynamically:

<pre class="lang-py prettyprint-override">bqquery = "SELECT FILENAME, FILE_ID from 'project:dataset.table' where FILENAME IN (filename tuple)"
</pre>

I thought I would do this because there will be approximately 20 files at once, so it makes sense to get the data from BigQuery once instead of fetching the file_id inside the for loop.

So I did:

<pre class="lang-py prettyprint-override">Bqcollection = p | 'Get File_Id' >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
</pre>

But the result I get is None.

<pre class="lang-py prettyprint-override">for i in range(len(result)):
    current_file = result[i].path
    # query inside for loop
    # bqquery = "SELECT FILE_ID from 'project:dataset.table' where FILENAME = '{0}'".format(current_file)
    # file_id = p | 'GetFile_id_{0}'.format(i) >> beam.io.Read(beam.io.BigQuerySource(query=bqquery))
    globals()[variables[i]] = (
        p
        | read_labels[i] >> ReadFromText(result[i].path)
        | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), current_file))
</pre>

I also tried running the query inside the for loop to get just one filename at a time (see the commented-out code), but that didn't work either. Ultimately, I want to change beam.ParDo(AddFilenamesFn(), current_file) to beam.ParDo(AddFilenamesFn(), file_id), so that I add the file ID instead of the actual filename.

[Note: the labels mentioned in the code (e.g. read_labels[i]) are just step labels for Dataflow.]

I assume I'm missing something pretty basic about PCollections, but I'm not sure.


I think the easiest solution, taking into account the code in the previous <a href="https://stackoverflow.com/questions/53404579/dataflow-apache-beam-how-to-access-current-filename-when-passing-in-pattern" rel="nofollow">question</a>, would be to run the queries inside the AddFilenamesFn ParDo within the for loop. Keep in mind that beam.io.Read(beam.io.BigQuerySource(query=bqquery)) reads rows as a source at the start of a pipeline and is not meant for an intermediate step; applying it returns a PCollection, not the rows themselves. So, with the approach I propose, you can use the Python Client Library directly (google-cloud-bigquery>0.27.0):

<pre class="lang-py prettyprint-override">class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()
        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
            file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}
</pre>

This is the most straightforward solution to implement, but it has a drawback. Instead of running all ~20 possible queries at the start of the pipeline, we run a query for each line/record. For example, if a single file has 3,000 elements, the same query will be launched 3,000 times. However, each distinct query should actually be executed only once, and subsequent "repeats" will hit the <a href="https://cloud.google.com/bigquery/docs/cached-results#disabling_retrieval_of_cached_results" rel="nofollow">cache</a>. Also note that cached queries do not count towards the interactive query <a href="https://cloud.google.com/bigquery/quotas#query_jobs" rel="nofollow">limit</a>.
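If relying on BigQuery's result cache feels fragile, the lookup could also be memoized on the worker so that each file name is queried at most once per DoFn instance. The following is only a minimal sketch: `FileIdCache` and its `fetch` callable are hypothetical names introduced for illustration (they are not part of Beam or the BigQuery client), and `fake_fetch` stands in for the `client.query(...)` call so the snippet runs without credentials.

```python
class FileIdCache(object):
    """Memoize FILENAME -> FILE_ID lookups (hypothetical helper)."""

    def __init__(self, fetch):
        # `fetch` is any callable that takes a file name and returns its ID,
        # for instance a small wrapper around the BigQuery query shown above.
        self._fetch = fetch
        self._ids = {}

    def get(self, file_name):
        # Only call `fetch` the first time a given file name is seen.
        if file_name not in self._ids:
            self._ids[file_name] = self._fetch(file_name)
        return self._ids[file_name]


# Stand-in for the BigQuery lookup; it records every call it receives.
calls = []


def fake_fetch(file_name):
    calls.append(file_name)
    mapping = {'countries1.csv': 'COUNTRIES ONE',
               'countries2.csv': 'COUNTRIES TWO'}
    return mapping.get(file_name)


cache = FileIdCache(fake_fetch)
ids = [cache.get('countries1.csv') for _ in range(3)]
print(ids, len(calls))
```

Here the stand-in is invoked only once even though the ID is requested three times. In AddFilenamesFn, such a cache could be kept on the DoFn instance, with `process` calling `cache.get(file_name)` instead of issuing the query for every record.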

I used the same files as in my previous <a href="https://stackoverflow.com/a/53459687/6121516" rel="nofollow">answer</a>:

<pre class="lang-sh prettyprint-override">$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain
$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france
</pre>

and added a new table:

<pre class="lang-sh prettyprint-override">bq mk test.file_mapping FILENAME:STRING,FILE_ID:STRING
bq query --use_legacy_sql=false 'INSERT INTO test.file_mapping (FILENAME, FILE_ID) values ("countries1.csv", "COUNTRIES ONE"), ("countries2.csv", "COUNTRIES TWO")'
</pre>

<a href="https://i.stack.imgur.com/m8qrR.png" rel="nofollow"><img alt="enter image description here" src="https://i.stack.imgur.com/m8qrR.png" /></a>

and the output is:

<pre class="lang-sh prettyprint-override">INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'1,sweden'}
INFO:root:{'filename': u'COUNTRIES ONE', 'row': u'2,spain'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'id,country'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'3,italy'}
INFO:root:{'filename': u'COUNTRIES TWO', 'row': u'4,france'}
</pre>

Another solution would be to load the whole table with beam.io.BigQuerySource() and materialize it as a side input (depending on its size, this can of course be problematic) or, as you say, break it down into N queries and save each one into a different side input. Then you could select the appropriate one for each record and pass it as an additional input to AddFilenamesFn. It would be interesting to try to write that one, too.
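The side-input variant could look roughly like the sketch below. It assumes `mapping_rows` is a PCollection of dicts with FILENAME and FILE_ID keys (for example, the rows read from the mapping table with the BigQuery source) and that the table is small enough to fit in memory as a dict. The names `to_key_value`, `add_file_id` and `attach_file_ids` are made up for this example; treat it as an untested outline rather than a drop-in replacement.

```python
def to_key_value(row):
    """Turn a mapping-table row (a dict) into a (FILENAME, FILE_ID) pair."""
    return row['FILENAME'], row['FILE_ID']


def add_file_id(element, file_path, file_ids):
    """Tag a text line with the FILE_ID found in the side-input dict."""
    file_name = file_path.split('/')[-1]
    # Falls back to None when the mapping has no entry for this file.
    return {'filename': file_ids.get(file_name), 'row': element}


def attach_file_ids(p, file_paths, mapping_rows):
    """Read every file and attach its FILE_ID using a dict side input."""
    import apache_beam as beam  # imported lazily, like bigquery in the DoFn

    # Materialize the whole mapping once as a dict available to every worker.
    file_ids = beam.pvalue.AsDict(
        mapping_rows | 'Mapping to KV' >> beam.Map(to_key_value))

    # One branch per file, each with unique step labels.
    branches = [
        (p
         | 'Read file {}'.format(i) >> beam.io.ReadFromText(path)
         | 'Add file id {}'.format(i) >> beam.Map(add_file_id, path, file_ids))
        for i, path in enumerate(file_paths)
    ]
    return branches | 'Flatten PCollections' >> beam.Flatten()
```

With this layout the mapping table is read once for the whole pipeline, and each record only performs a dictionary lookup. `attach_file_ids` would be called with the pipeline object, the list of matched file paths, and the PCollection holding the mapping rows.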

Full code for my first proposed solution:

<pre class="lang-py prettyprint-override">import argparse, logging
from functools import reduce  # built in on Python 2, needs this import on Python 3
from operator import add

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem


class GCSFileReader:
    """Helper class to read gcs files"""
    def __init__(self, gcs):
        self.gcs = gcs


class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with file id (retrieved from BigQuery) and row"""
    def process(self, element, file_path):
        from google.cloud import bigquery

        client = bigquery.Client()
        file_name = file_path.split("/")[-1]

        query_job = client.query("""
            SELECT FILE_ID
            FROM test.file_mapping
            WHERE FILENAME = '{0}'
            LIMIT 1""".format(file_name))

        results = query_job.result()

        for row in results:
            file_id = row.FILE_ID

        yield {'filename':file_id, 'row':element}


# just logging output to visualize results
def write_res(element):
    logging.info(element)
    return element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    gcs = GCSFileSystem(PipelineOptions(pipeline_args))
    gcs_reader = GCSFileReader(gcs)

    # in my case I am looking for files that start with 'countries'
    BUCKET='BUCKET_NAME'
    result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
    result = reduce(add, result)

    # create each input PCollection name and unique step labels
    variables = ['p{}'.format(i) for i in range(len(result))]
    read_labels = ['Read file {}'.format(i) for i in range(len(result))]
    add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

    # load each input file into a separate PCollection and add filename to each row
    for i in range(len(result)):
        globals()[variables[i]] = (
            p
            | read_labels[i] >> ReadFromText(result[i].path)
            | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path))

    # flatten all PCollections into a single one
    merged = (
        [globals()[variables[i]] for i in range(len(result))]
        | 'Flatten PCollections' >> beam.Flatten()
        | 'Write results' >> beam.Map(write_res))

    p.run()


if __name__ == '__main__':
    # show the INFO-level messages emitted by write_res
    logging.getLogger().setLevel(logging.INFO)
    run()
</pre>

