4419

How to use FileIO.writeDynamic() in Apache Beam 2.6 to write to multiple output paths?

Question:

I am using Apache Beam 2.6 to read from a single Kafka topic and write the output to Google Cloud Storage (GCS). Now I want to alter the pipeline so that it is reading multiple topics and writing them out as gs://bucket/topic/...

When reading only a single topic I used TextIO in the last step of my pipeline:

TextIO.write() .to( new DateNamedFiles( String.format("gs://bucket/data%s/", suffix), currentMillisString)) .withWindowedWrites() .withTempDirectory( FileBasedSink.convertToFileResourceIfPossible( String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))) .withNumShards(1));

<a href="https://stackoverflow.com/a/50241407/5497956" rel="nofollow">This</a> is a similar question, which code I tried to adapt.

FileIO.<EventType, Event>writeDynamic() .by( new SerializableFunction<Event, EventType>() { @Override public EventType apply(Event input) { return EventType.TRANSFER; // should return real type here, just a dummy } }) .via( Contextful.fn( new SerializableFunction<Event, String>() { @Override public String apply(Event input) { return "Dummy"; // should return the Event converted to a String } }), TextIO.sink()) .to(DynamicFileDestinations.constant(new DateNamedFiles("gs://bucket/tmp%s/%s/", currentMillisString), new SerializableFunction<String, String>() { @Override public String apply(String input) { return null; // Not sure what this should exactly, but it needs to // include the EventType into the path } })) .withTempDirectory( FileBasedSink.convertToFileResourceIfPossible( String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString))) .withNumShards(1))

The <a href="https://github.com/apache/beam/blob/release-2.6.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L169" rel="nofollow">official JavaDoc</a> contains example code which seem to have outdated method signatures. (The .via method seems to have switched the order of the arguments). I' furthermore stumbled across the example in FileIO which confused me - shouldn't TransactionType and Transaction <a href="https://github.com/apache/beam/blob/release-2.6.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L296" rel="nofollow">in this line</a> change places?

Answer1:

After a night of sleep and a fresh start I figured out the solution, I used the functional Java 8 style as it makes the code shorter (and more readable):

.apply( FileIO.<String, Event>writeDynamic() .by((SerializableFunction<Event, String>) input -> input.getTopic()) .via( Contextful.fn( (SerializableFunction<Event, String>) input -> input.getPayload()), TextIO.sink()) .to(String.format("gs://bucket/data%s/", suffix) .withNaming(type -> FileNaming.getNaming(type, "", currentMillisString)) .withDestinationCoder(StringUtf8Coder.of()) .withTempDirectory( String.format("gs://bucket/tmp%s/%s/", suffix, currentMillisString)) .withNumShards(1));

Explanation:

<ul><li>Event is a Java POJO containing the payload of the Kafka message and the topic it belongs to, it is parsed in a ParDo after the KafkaIO step</li> <li>suffix is a either dev or empty and set by environment variables</li> <li>currentMillisStringcontains the timestamp when the whole pipeline was launched so that new files don't overwrite old files on GCS when a pipeline gets restarted</li> <li>

FileNaming implements a custom naming and receives the type of the event (the topic) in it's constructor, it uses a custom formatter to write to daily partitioned "sub-folders" on GCS:

class FileNaming implements FileIO.Write.FileNaming { static FileNaming getNaming(String topic, String suffix, String currentMillisString) { return new FileNaming(topic, suffix, currentMillisString); } private static final DateTimeFormatter FORMATTER = DateTimeFormat .forPattern("yyyy-MM-dd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("Europe/Zurich"))); private final String topic; private final String suffix; private final String currentMillisString; private String filenamePrefixForWindow(IntervalWindow window) { return String.format( "%s/%s/%s_", topic, FORMATTER.print(window.start()), currentMillisString); } private FileNaming(String topic, String suffix, String currentMillisString) { this.topic = topic; this.suffix = suffix; this.currentMillisString = currentMillisString; } @Override public String getFilename( BoundedWindow window, PaneInfo pane, int numShards, int shardIndex, Compression compression) { IntervalWindow intervalWindow = (IntervalWindow) window; String filenamePrefix = filenamePrefixForWindow(intervalWindow); String filename = String.format( "pane-%d-%s-%05d-of-%05d%s", pane.getIndex(), pane.getTiming().toString().toLowerCase(), shardIndex, numShards, suffix); String fullName = filenamePrefix + filename; return fullName; } } </li> </ul>

Recommend

  • Akka stream sliding window to control reduce emit to sink by SourceQueue
  • How to put contents of stream into a val?
  • Send IoT Hub Cloud-to-Device message from Stream Analytics Output (Using Event Hub endpoint)
  • how do you build gstreamer's gst-launch pipelines?
  • Make variadic function which takes arbitary number of void functors
  • Recovering state consistency in Flink when using Kafka as EventStore
  • Directshow RenderStream “the parameter is incorrect”
  • Shiny print reactive df to console (glimpse(myreactive_df) for debugging purposes?
  • Decompress file from Boost filtering_streambuf to std::vector?
  • Flutter: Display content from paginated API with dynamic ListView
  • How to hide or disable in-function printed message
  • Confluent Control Center Interceptor
  • Is there a way to configure multiple Serilog RollingFiles through appSetting configuration
  • Maximum Flow in Directed Graph
  • Boost Fusion: validate adapted struct member ordering at compile time
  • Img height on auto height div
  • FAILURE: Error Domain=NSURLErrorDomain Code=-1004 “Could not connect to the server.”
  • Multi-dimensional regression with Keras
  • NHibernate Lazy Loading Behaviour
  • Testing Rails module without loading ActiveRecord
  • How to emulate integrated numeric keypad cursor keys in linux
  • AngularJS - ngBind and Bootstrap Switch
  • python struct.pack(): pack multiple datas in a list or a tuple
  • Flink: How to write DataSet to a variable instead of to a file
  • jQuery file download plugin
  • How to call BeanFactoryPostProcessor.postProcessBeanFactory method when use Spring with XML configur
  • Is a .txt file created in VB different than one I'd randomly create?
  • What do I do with this error when I run tests in rails?
  • if some function is not optimized does it mean that all functions where it is declared are not optim
  • Why doesn't a local variable live long enough for thread::scoped?
  • Why does the font in these TD elements render at different sizes?
  • NUnit 3.0 TestCase const custom object arguments
  • Default parameter as generic type
  • Time complexity of a program which involves multiple variables
  • Windows forms listbox.selecteditem displaying “System.Data.DataRowView” instead of actual value
  • Buffer size for converting unsigned long to string
  • Hits per day in Google Big Query
  • How to get Windows thread pool to call class member function?
  • LevelDB C iterator
  • How can i traverse a binary tree from right to left in java?