Using Pig on a Hadoop cluster, I have a huge bag of huge tuples to which I regularly add fields as I continue to work on this project, and several UDFs which use various fields from it. I want to be able to call a UDF on just a few fields from each tuple and reconnect the result to that particular tuple. Doing a join to reconnect the records using unique IDs takes forever on billions of records.
I think there should be a way to do this all inside the GENERATE statement, but I can't find the right syntax.
Here is some toy code using a Python UDF to get the idea across.
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',')
AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas = FOREACH byJumper GENERATE
jumps.jumper, jumps.attempt, jumps.distance, jumps.location,
myfuncs.conv2sigma(jumps.distance);
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
This is producing bags of tuples with single fields in each tuple, rather than tuples of the form I expect.
The input data is:
- people (with unique integer IDs),
- their long jump attempts (with unique-to-that-person integer IDs),
- the distance they jumped,
- the location they were jumping at the time.
For each jump we want to generate how many standard deviations (sigmas) the jumper was from their average; later we'll correlate sigmas by location to see where jumpers do best. We need to calculate the average and standard deviation for each person, then a 'sigma' for each jump, and store the data with the new sigma field attached.
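To pin down the intended result, here is a plain-Python sketch (run outside Pig, Python 3) of the computation just described, assuming the rows are already in memory as (jumper, attempt, distance, location) tuples. The function name sigmas_by_jumper is made up for illustration. It uses the population standard deviation (dividing by n) and returns None as the sigma for a jumper whose distances have no spread.

```python
from collections import defaultdict
from math import sqrt


def sigmas_by_jumper(rows):
    """rows: iterable of (jumper, attempt, distance, location) tuples.
    Returns (jumper, attempt, distance, location, sigma) tuples."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)  # group by jumper, like GROUP jumps BY jumper

    out = []
    for jumper, jumps in sorted(groups.items()):
        distances = [r[2] for r in jumps]
        n = len(distances)
        mean = sum(distances) / n
        # Population standard deviation (divide by n).
        sd = sqrt(sum((d - mean) ** 2 for d in distances) / n)
        for (j, attempt, distance, location) in jumps:
            # Sigma is undefined when every jump has the same distance.
            sigma = (distance - mean) / sd if sd > 0 else None
            out.append((j, attempt, distance, location, sigma))
    return out
```

Each output tuple keeps all the original fields and appends the sigma, which is exactly the shape the Pig script is supposed to produce.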
The question is: how do we change this to output tuples like (jumper:int, attempt:int, distance:double, location:chararray, sigma:double)?
I have tried FLATTEN in various ways and it only gets me enormous cross products. I can change my UDF to take in jumper and attempt and output a triple, then do a JOIN, but in the real world this solution is enormously impractical because of the size of the data sets.
Here's the supporting code and data if you want to try it at home:
jumper.py (a quick, not thoughtful, implementation; the only important thing here is that it takes a bag as input and produces a bag as output, with one output tuple corresponding to each input tuple):
#!/usr/local/bin/python
# we're forced to use python 2.5.2 :(
from math import sqrt

@outputSchema("y:bag{t:tuple(sigma:double)}")
def conv2sigma(bag):
    # bag arrives as a list of one-field tuples holding the distances
    s = 0.0
    n = 0
    dd = []
    print('conv2sigma input bag:')
    print(bag)
    for word in bag:
        d = float(word[0])
        dd.append(d)
        n += 1
        s += d
    a = s / n  # mean distance
    ss = 0
    for d in dd:
        ss += (d - a)**2
    # note: square root of the raw sum of squares (not divided by n),
    # so this is not the textbook standard deviation
    sd = sqrt(ss)
    outputBag = []
    for d in dd:
        # one single-field output tuple per input tuple, in input order
        outputBag.append( ( (d - a)/sd, ) )
    print('conv2sigma output bag:')
    print(outputBag)
    return outputBag
The input file jumps.csv:
0,0,5,a
0,1,6,b
0,2,7,c
0,3,5,a
0,4,8,c
0,5,7,b
0,6,6,b
0,7,7,c
0,8,5,a
1,0,6,a
1,1,5,a
1,2,7,b
1,3,4,a
1,4,5,a
1,5,7,b
1,6,8,c
1,7,9,c
1,8,5,a
1,9,4,a
1,10,5,a
1,11,6,b
1,12,8,c
1,13,8,b
2,0,7,b
2,1,5,a
2,2,6,b
2,3,5,a
2,4,7,c
2,5,5,a
2,6,6,c
2,7,5,a
2,8,7,b
2,9,5,a
2,10,6,b
The output produced as written now:
{(0),(0),(0),(0),(0),(0),(0),(0),(0)},{(1),(2),(3),(4),(5),(6),(7),(8),(0)},{(6.0),(7.0),(5.0),(8.0),(7.0),(6.0),(7.0),(5.0),(5.0)},{(b),(c),(a),(c),(b),(b),(c),(a),(a)},{(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(0.5751081237516715),(0.25160980414135625),(-0.07188851546895898),(0.25160980414135625),(-0.39538683507927425),(-0.39538683507927425)}
{(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1),(1)},{(8),(0),(1),(2),(3),(4),(5),(6),(7),(9),(10),(11),(12),(13)},{(5.0),(6.0),(5.0),(7.0),(4.0),(5.0),(7.0),(8.0),(9.0),(4.0),(5.0),(6.0),(8.0),(8.0)},{(a),(a),(a),(b),(a),(a),(b),(c),(c),(a),(a),(b),(c),(b)},{(-0.20716308289978433),(-0.03655819109996196),(-0.20716308289978433),(0.1340467006998604),(-0.3777679746996067),(-0.20716308289978433),(0.1340467006998604),(0.30465159249968277),(0.4752564842995052),(-0.3777679746996067),(-0.20716308289978433),(-0.03655819109996196),(0.30465159249968277),(0.30465159249968277)}
{(2),(2),(2),(2),(2),(2),(2),(2),(2),(2),(2)},{(0),(1),(2),(3),(4),(5),(6),(7),(8),(9),(10)},{(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0),(5.0),(7.0),(5.0),(6.0)},{(b),(a),(b),(a),(c),(a),(c),(a),(b),(a),(b)},{(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684),(-0.2960782627318961),(0.4276686017238498),(-0.2960782627318961),(0.06579516949597684)}
Each output tuple is a collection of bags, and each bag contains tuples with single entries from one field, which is not what we want.
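The shape follows from bag projection: jumps.jumper keeps one field from every tuple in the bag, and the result is still a bag, now of one-field tuples. Listing several projections in GENERATE therefore yields a row of separate bags. A minimal Python model (project is a hypothetical helper, with the bag represented as a list of tuples):

```python
def project(bag, index):
    """Model of Pig's bag projection `bag.field`: keep one field from every
    tuple, so the result is still a bag, now of one-field tuples."""
    return [(t[index],) for t in bag]


jumps = [(0, 1, 6.0, 'b'), (0, 2, 7.0, 'c')]  # (jumper, attempt, distance, location)
print(project(jumps, 0), project(jumps, 2))
# [(0,), (0,)] [(6.0,), (7.0,)]
```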
Answer1:
You will need to do this in two steps. Each jump has its own sigma value, so in order to properly associate each sigma with the correct jump, you will either need to pass the IDs to the sigma-computing UDF and then join the results back in (bad idea), or compute the summary statistics (mean and standard deviation) first and then derive each sigma from them. Here's how:
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',')
AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
jumperSummaries =
FOREACH byJumper
GENERATE
group AS jumper,
FLATTEN(jumps.(attempt, distance, location)),
myfuncs.mean(jumps.distance) AS mean,
myfuncs.stddev(jumps.distance) AS stddev;
sigmas =
FOREACH jumperSummaries
GENERATE
jumper,
attempt,
distance,
location,
myfuncs.sigma(distance, mean, stddev) AS sigma;
The FLATTEN ungroups all the jumps and gives you back your original input, except that now every record also carries a copy of the mean and standard deviation for that jumper, which you can then use to compute the sigma for each jump row by row.
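The effect of FLATTEN next to non-bag fields can be modeled in a few lines of Python for one grouped row: one output row per tuple in the bag, with the group key and the scalars copied onto each. The helper name and the toy mean/stddev values are assumptions for illustration only.

```python
def flatten_with_scalars(group, bag, *scalars):
    """Model of GENERATE group, FLATTEN(bag), s1, s2, ... for ONE grouped row:
    one output row per bag tuple, with group key and scalars copied onto it."""
    return [(group,) + tuple(t) + tuple(scalars) for t in bag]


# Jumper 0 with toy distances 5.0 and 7.0, so mean 6.0 and population stddev 1.0.
rows = flatten_with_scalars(0, [(0, 5.0, 'a'), (1, 7.0, 'b')], 6.0, 1.0)
# rows == [(0, 0, 5.0, 'a', 6.0, 1.0), (0, 1, 7.0, 'b', 6.0, 1.0)]

# Second FOREACH: each row now has everything it needs to compute its own sigma.
sigmas = [(j, att, d, loc, (d - m) / s) for (j, att, d, loc, m, s) in rows]
# sigmas == [(0, 0, 5.0, 'a', -1.0), (0, 1, 7.0, 'b', 1.0)]
```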
Note that while this is conceptually two steps, it still takes only one MapReduce job.
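The script calls myfuncs.mean, myfuncs.stddev and myfuncs.sigma, which the question's jumper.py does not define. A hypothetical Jython sketch of them is below; it assumes, as jumper.py does, that Pig's Jython engine provides the outputSchema decorator and passes a bag as a list of tuples. The try/except fallback only exists so the file can be imported and tested outside Pig, and stddev here is the population standard deviation.

```python
from math import sqrt

# Pig's Jython engine supplies outputSchema; this no-op fallback is only
# for importing and testing the file outside Pig.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            return func
        return wrap


@outputSchema("mean:double")
def mean(bag):
    # bag is a list of one-field tuples, e.g. [(5.0,), (7.0,)]
    values = [float(t[0]) for t in bag]
    return sum(values) / len(values)


@outputSchema("stddev:double")
def stddev(bag):
    # Population standard deviation (divides by n).
    values = [float(t[0]) for t in bag]
    m = sum(values) / len(values)
    return sqrt(sum((v - m) ** 2 for v in values) / len(values))


@outputSchema("sigma:double")
def sigma(distance, mu, sd):
    # Return None (null in Pig) when the jumper's distances have no spread.
    if not sd:
        return None
    return (float(distance) - mu) / sd
```

Only plain arithmetic on scalars happens in sigma, so it runs once per flattened row without needing the rest of the bag.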
Answer2:
For comparison with WinnieNicklaus' answer, and to draw comments, here is the solution I came up with:
Register 'jumper.py' using jython as myfuncs;
jumps = LOAD 'jumps.csv' USING PigStorage(',')
AS (jumper:int, attempt:int, distance:double, location:chararray);
byJumper = GROUP jumps by jumper;
sigmas0 = FOREACH byJumper
GENERATE
FLATTEN(jumps),
FLATTEN(myfuncs.conv2sigma(jumps.(jumper,attempt,distance)));
sigmas1 = FILTER sigmas0 BY jumper == s_id AND attempt == s_att;
sigmas = FOREACH sigmas1
GENERATE jumper, attempt, distance, location, sigma;
rmf sigmas
STORE sigmas INTO 'sigmas' USING PigStorage(',');
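This script passes jumps.(jumper,attempt,distance) to conv2sigma and filters on s_id, s_att and sigma, so it presumes a modified UDF that echoes the IDs back alongside each sigma. A hypothetical Jython sketch of such a variant follows; the output schema names are taken from the FILTER, and it uses the population standard deviation rather than the square root of the raw sum of squares in the question's version.

```python
from math import sqrt

# Pig's Jython engine supplies outputSchema; no-op fallback for testing outside Pig.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            return func
        return wrap


@outputSchema("y:bag{t:tuple(s_id:int, s_att:int, sigma:double)}")
def conv2sigma(bag):
    # Each input tuple is (jumper, attempt, distance).
    rows = [(int(t[0]), int(t[1]), float(t[2])) for t in bag]
    n = len(rows)
    m = sum(d for _, _, d in rows) / n
    sd = sqrt(sum((d - m) ** 2 for _, _, d in rows) / n)
    out = []
    for jumper, attempt, d in rows:
        s = (d - m) / sd if sd else None
        # Echo the IDs so the FILTER can match each sigma to its own jump.
        out.append((jumper, attempt, s))
    return out
```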
The first FOREACH creates a (potentially large) cross product, sigmas0; the FILTER then throws away the "incorrect" elements of the product, and the last FOREACH generates the desired fields. This is how JOIN is often described academically.
This seems like it may be slow, but it still results in a single MapReduce job and seems to be fast in practice.
The huge win for me is that it allows my UDF to do arbitrarily complicated things and return arbitrarily many tuples which are rejoined to the input data.
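The cross-product-then-filter pattern can be modeled in Python for one grouped row. Because the FOREACH runs over byJumper, the cross product is built separately for each jumper, so its size is the square of that jumper's number of attempts rather than anything on the scale of the whole data set. The helper name is hypothetical; the field positions follow the script's FILTER.

```python
from itertools import product


def cross_then_filter(jumps, sigmas):
    """Model of Answer2 for ONE grouped row.
    jumps:  (jumper, attempt, distance, location) tuples
    sigmas: (s_id, s_att, sigma) tuples returned by the UDF"""
    # FLATTEN(jumps), FLATTEN(udf(...)): every jump paired with every sigma.
    rows = [j + s for j, s in product(jumps, sigmas)]
    # FILTER ... BY jumper == s_id AND attempt == s_att
    kept = [r for r in rows if r[0] == r[4] and r[1] == r[5]]
    # GENERATE jumper, attempt, distance, location, sigma
    return [r[:4] + (r[6],) for r in kept]
```

With n attempts the intermediate list has n * n rows and the filter keeps n of them, which is the same result an equi-join on (jumper, attempt) would give within that group.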