Cutting down bag to pass to udf

Using Pig on a Hadoop cluster, I have a huge bag of huge tuples which I regularly add fields to as I continue to work on this project, and several UDFs which use various fields from it. I want to be able to call a UDF on just a few fields from each tuple and <strong>reconnect the result to that particular tuple</strong>. Doing a join to reconnect the records using unique ids takes forever on billions of records.

I think there should be a way to do this all inside the GENERATE statement, but I can't find the right syntax.

Here is some toy code using a Python UDF to get the idea across.

Register '' using jython as myfuncs; jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray); byJumper = GROUP jumps by jumper; sigmas = FOREACH byJumper GENERATE jumps.jumper, jumps.attempt, jumps.distance, jumps.location, myfuncs.conv2sigma(jumps.distance); rmf sigmas STORE sigmas INTO 'sigmas' USING PigStorage(',');

This is producing bags of tuples with single fields in each tuple, rather than tuples of the form I expect.

The input data is

    <li>people (with unique integer IDs),</li> <li>their long jump attempts (with unique-to-that-person integer IDs),</li> <li>the distance they jumped,</li> <li>the location they were jumping at the time.</li> </ul>

    For each jump we want to generate how many standard deviations (sigmas) the jumper was from their average, then later we'll correlate sigmas by location to see where jumpers do the best. We need to calculate the average and standard deviation for each person then a 'sigma' for each jump, and store the data with the new sigma field attached.

    The question is:

    How do we change this to output tuples like (jumper:int, attempt:int, distance:double, location:chararray, sigma:double)?

    I have tried FLATTEN in various ways and it only gets me enormous cross-products. I can change my UDF to take in jumper and attempt and output a triple then do a JOIN, but in the real world this solution is enormously impractical because of the size of the data sets.

    Here's the supporting code and data if you want to try it at home: (a quick, not thoughtful, implementation -- the only important thing here is that it takes a bag input and produces a bag output with one output tuple corresponding to each input tuple)

    #!/usr/local/bin/python # we're forced to use python 2.5.2 :-( from math import sqrt @outputSchema("y:bag{t:tuple(sigma:double)}") def conv2sigma(bag): s = 0.0 n = 0 dd = [] print('conv2sigma input bag:') print(bag) for word in bag: d = float(word[0]) dd.append(d) n += 1 s += d a = s / n ss = 0 for d in dd: ss += (d-a)**2 sd = sqrt(ss) outputBag = [] for d in dd: outputBag.append( ( (d-a)/sd, ) ) print('conv2sigma output bag:') print(outputBag) return outputBag

    The input file jumps.csv:

    0,0,5,a 0,1,6,b 0,2,7,c 0,3,5,a 0,4,8,c 0,5,7,b 0,6,6,b 0,7,7,c 0,8,5,a 1,0,6,a 1,1,5,a 1,2,7,b 1,3,4,a 1,4,5,a 1,5,7,b 1,6,8,c 1,7,9,c 1,8,5,a 1,9,4,a 1,10,5,a 1,11,6,b 1,12,8,c 1,13,8,b 2,0,7,b 2,1,5,a 2,2,6,b 2,3,5,a 2,4,7,c 2,5,5,a 2,6,6,c 2,7,5,a 2,8,7,b 2,9,5,a 2,10,6,b

    The output produced as written now:

    {(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)} {(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)} {(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}

    Each output tuple is a collection of bags, and each bag contains tuples with single entries from one field, which is not what we want.


    You will need to do this in two steps. Each jump has its own sigma value, so in order to properly associate each sigma with the correct jump, you will either need to pass the IDs to the sigma-computing UDF and then join the results back in (bad idea), or compute the summary statistics first (mean and standard deviation) and then later derive the sigma from that. Here's how:

    jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray); byJumper = GROUP jumps by jumper; jumperSummaries = FOREACH byJumper GENERATE group AS jumper, FLATTEN(jumps.(attempt, distance, location)), myfuncs.mean(jumps.distance) AS mean, myfunds.stddev(jumps.distance) AS stddev; sigmas = FOREACH jumperSummaries GENERATE jumper, attempt, distance, location, myfuncs.sigma(distance, mean, stddev) AS sigma;

    The FLATTEN ungroups all the jumps and gives you back your original input, except now every record also has copied the mean and standard deviation for that jumper, which you can then use to compute the sigma for each jump row-by-row.

    Note that while this is conceptually two steps, it still only takes one map-reduce job.


    For comparison with WinnieNicklaus' answer, and to draw comments, here is the solution I came up with:

    Register '' using jython as myfuncs; jumps = LOAD 'jumps.csv' USING PigStorage(',') AS (jumper:int, attempt:int, distance:double, location:chararray); byJumper = GROUP jumps by jumper; sigmas0 = FOREACH byJumper GENERATE FLATTEN(jumps), FLATTEN(myfuncs.conv2sigma(jumps.(jumper,attempt,distance))); sigmas1 = FILTER sigmas0 BY jumper == s_id AND attempt == s_att; sigmas = FOREACH sigmas1 GENERATE jumper, attempt, distance, location, sigma; rmf sigmas STORE sigmas INTO 'sigmas' USING PigStorage(',');

    The first FOREACH creates a (potentially large) cross product sigma0, filters out the "incorrect" elements of the product and generates the desired fields. JOIN is often academically described this way.

    This seems like it may be slow.

    But it still results in a single Map-Reduce job, and seems to be fast in practice.

    The huge win for me is that it allows my UDF to do arbitrarily complicated things and return arbitrarily many tuples which are rejoined to the input data.

    人吐槽 人点赞



用户名: 密码:
验证码: 匿名发表


查看评论:Cutting down bag to pass to udf