How to aggregate CSV lines with Apache Camel?

Question:

I have a CSV similar to this:

    County   City   Area   Street
    county1  city1  area1  street1
    county1  city1  area2  street2
    county1  city1  area3  street7
    county1  city2  area2  street2
    county1  city2  area6  street1
    county2  city1  area3  street3
    county2  city1  area3  street2
    ...

During the CSV parsing, I need to aggregate the same County/City to create a final structure like this:

    county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
    county1/city2: [ [area2, street2], [area6, street1] ]
    county2/city1: [ [area3, street3], [area3, street2] ]

Basically, this is a grouping by county/city.

I tried different things with Camel; this is the latest attempt:

    class CsvAppender {
        CsvRow append(CsvRow existing, CsvRow next) {
            next.previous = existing
            next
        }
    }

    @CsvRecord(separator = "\\t")
    class CsvRow {
        @DataField(pos = 1)
        private String county

        @DataField(pos = 2)
        private String city

        @DataField(pos = 3)
        private String area

        @DataField(pos = 4)
        private String street

        CsvRow previous

        boolean sameAggregateWithPrevious() {
            previous?.county == county && previous?.city == city
        }

        public String toString() {
            "${county} ${city} ${area} ${street}"
        }
    }

    class CsvRouteBuilder extends RouteBuilder {

        void configure() {
            CsvAppender appender = new CsvAppender()

            Closure predicate = { exchange ->
                def body = exchange.getIn().getBody(CsvRow.class)
                def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
                def nextAggregate = exchange.getIn().getHeader('NextAggregate')

                if (!currentAggregate) {
                    currentAggregate = body.previous ? [ body.previous ] : []
                    nextAggregate = []
                } else if (exchange.getIn().getHeader('AggregateComplete')) {
                    currentAggregate = nextAggregate
                    nextAggregate = []
                }

                def aggregateComplete = body.sameAggregateWithPrevious()
                if (aggregateComplete) {
                    nextAggregate << body
                } else {
                    currentAggregate << body
                }
                exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                             'NextAggregate': nextAggregate,
                                             'AggregateComplete': aggregateComplete])
                aggregateComplete
            }

            from("file:/tmp/folder?noop=true")
                .split(body().tokenize('\n')).streaming()
                .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append"))
                    .completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate'))
                })
                .convertBodyTo(String.class)
                .to("jms:myCsvSplitter")
        }
    }

However, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks too verbose.

Any idea how to aggregate the CSV file properly?

Answer1:

I've got some rough but working code that should hopefully be good enough to help you along. It's in Java rather than Groovy, because my Groovy isn't up to much, but it should be easy enough to translate.

Firstly the aggregator:

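    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.camel.Exchange;
    // Camel 2.x package; in Camel 3+ the interface is org.apache.camel.AggregationStrategy.
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    public class MyAgregationStrategy implements AggregationStrategy {

        @Override
        @SuppressWarnings("unchecked")
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            CsvRow newBody = (CsvRow) newExchange.getIn().getBody();
            Map<String, List<CsvRow>> map = null;

            if (oldExchange == null) {
                // First row: create the map and carry it in the new exchange.
                map = new HashMap<String, List<CsvRow>>();
                List<CsvRow> list = new ArrayList<CsvRow>();
                list.add(newBody);
                map.put(newBody.getCounty(), list);
                newExchange.getIn().setBody(map);
                return newExchange;
            } else {
                // Later rows: append to this county's list, creating it if needed.
                map = oldExchange.getIn().getBody(Map.class);
                List<CsvRow> list = map.get(newBody.getCounty());
                if (list == null) {
                    list = new ArrayList<CsvRow>();
                }
                list.add(newBody);
                map.put(newBody.getCounty(), list);

                // Propagate the splitter's completion flag so the route can test it.
                oldExchange.setProperty("CamelSplitComplete", newExchange.getProperty("CamelSplitComplete"));
                return oldExchange;
            }
        }
    }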

This stores the rows in lists inside a map, keyed by county.
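The question asks for grouping by county and city together, whereas the strategy above keys by county only. A minimal sketch of a composite key, in plain Java without Camel, might look like the following. The class and method names (CountyCityGrouper, keyOf, add, group) are hypothetical, and the sketch assumes tab-separated lines with the four columns shown in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Camel): groups rows by a "county/city" key.
final class CountyCityGrouper {

    // Builds the composite key, e.g. "county1/city1".
    static String keyOf(String county, String city) {
        return county + "/" + city;
    }

    // Adds one row to the running map, creating the group's list on first use.
    static void add(Map<String, List<List<String>>> groups,
                    String county, String city, String area, String street) {
        groups.computeIfAbsent(keyOf(county, city), k -> new ArrayList<>())
              .add(Arrays.asList(area, street));
    }

    // Groups tab-separated lines; assumes the columns are county, city, area, street.
    static Map<String, List<List<String>>> group(List<String> lines) {
        // LinkedHashMap keeps groups in the order they first appear in the file.
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            if (fields.length < 4) {
                continue; // skip blank or short lines
            }
            add(groups, fields[0], fields[1], fields[2], fields[3]);
        }
        return groups;
    }
}
```

Inside the aggregation strategy, the same idea would amount to using a county-plus-city string as the map key in place of newBody.getCounty(), assuming CsvRow also exposes a getter for the city.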

Then the route:

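    import java.util.Map;

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.model.dataformat.BindyType;

    public class MyRouteBuilder extends RouteBuilder {

        @Override
        public void configure() throws Exception {
            from("file:/c:/dev/test?noop=true")
                // Split the file into lines and parse each one into a CsvRow.
                .split(body().tokenize("\n"))
                .log("Read line ${body}")
                .unmarshal()
                .bindy(BindyType.Csv, CsvRow.class)
                // Collect every row into one aggregate; complete when the split is done.
                .aggregate(constant(true), new MyAgregationStrategy())
                    .completionPredicate(simple("${property.CamelSplitComplete} == true"))
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        Map<?, ?> results = (Map<?, ?>) exchange.getIn().getBody();
                        System.out.println("Got results for " + results.size() + " counties");
                    }
                });
        }
    }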

It uses the CamelSplitComplete property to detect when the splitting has finished. In the processor at the end, you can then do what you like with the map. Alternatively, you can change the aggregation strategy to aggregate the results however you need.
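If the file is sorted by county and city, as the sample in the question appears to be, one alternative is to emit each group as soon as the key changes instead of holding the whole file in one map. The following is only a sketch of that idea in plain Java, outside Camel; ConsecutiveGroupEmitter and its sink callback are hypothetical names, and the approach would split a group in two if the input were not sorted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical helper: emits a finished group whenever the county/city key changes.
// Assumes rows with the same county/city are adjacent in the input.
final class ConsecutiveGroupEmitter {
    private final BiConsumer<String, List<String[]>> sink;
    private String currentKey;
    private List<String[]> currentRows = new ArrayList<>();

    ConsecutiveGroupEmitter(BiConsumer<String, List<String[]>> sink) {
        this.sink = sink;
    }

    // Feed one parsed row; the previous group is emitted first if the key changed.
    void accept(String county, String city, String area, String street) {
        String key = county + "/" + city;
        if (currentKey != null && !currentKey.equals(key)) {
            flush();
        }
        currentKey = key;
        currentRows.add(new String[] {area, street});
    }

    // Emit the pending group, if any. Call once after the last row.
    void flush() {
        if (currentKey != null) {
            sink.accept(currentKey, currentRows);
            currentRows = new ArrayList<>();
            currentKey = null;
        }
    }
}
```

The final flush() plays the same role as the CamelSplitComplete check in the route: it marks the end of the input so that the last group is not lost.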

Hope this helps.
