Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Thursday, 8 September 2016

MR Lab5: Merging, MergedAggregations

[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$

[training@localhost ~]$ cat > emp2
201,aaa,11,m,90000
202,bbbbb,12,f,100000
203,ccc,13,m,200000
[training@localhost ~]$ cat > emp3
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp2 mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp3 mrlab
[training@localhost ~]$ hadoop fs -ls mrlab
Found 3 items
-rw-r--r--   1 training supergroup        123 2016-09-07 20:13 /user/training/mrlab/emp
-rw-r--r--   1 training supergroup         61 2016-09-07 20:13 /user/training/mrlab/emp2
-rw-r--r--   1 training supergroup         63 2016-09-07 20:13 /user/training/mrlab/emp3
[training@localhost ~]$

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

// emp1, emp3
// id,name,sal,sex,dno
public class MergeMap1 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
    con.write(v, NullWritable.get());
 }
}

-----------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;

// emp2
// id,name,dno,sex,sal
public class MergeMap2 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String newLine = w[0]+","+
     w[1]+","+w[4]+","+w[3]+","+w[2];
    con.write(new Text(newLine), NullWritable.get());
 }
}


------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(Merge.class);
         j.setNumReduceTasks(1);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(NullWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); //output
MultipleInputs.addInputPath(j,p1,TextInputFormat.class,MergeMap1.class);
MultipleInputs.addInputPath(j,p2,TextInputFormat.class,MergeMap2.class);
MultipleInputs.addInputPath(j,p3,TextInputFormat.class,MergeMap1.class);

FileOutputFormat.setOutputPath(j, p4);       

System.exit(j.waitForCompletion(true) ? 0:1);
     }

}


------------
[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.Merge  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/content

[training@localhost ~]$ hadoop fs -cat mrlab/content/part-r-00000
101,janaki,10000,f,12
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
201,aaa,90000,m,11
202,bbbbb,100000,f,12
203,ccc,200000,m,13
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13

--------------------------------

hql:

  select sex, sum(sal) from (
     select sex, sal from emp1
         union all
     select sex, sal from emp2
         union all
 select sex, sal from emp3 )
         e  group by sex;
   
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp1, emp3
// id,name,sal,sex,dno
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp2
// id,name,dno,sex,sal
public class Map2 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[4]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    public void reduce(Text k, Iterable<IntWritable>vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
         for (IntWritable v:vlist)
             tot+=v.get();
         con.write(k, new IntWritable(tot));
     }

}

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergedSum
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(MergedSum.class);
         j.setReducerClass(RedForSum.class);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(IntWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); //output
MultipleInputs.addInputPath(j,p1,TextInputFormat.class,Map1.class);
MultipleInputs.addInputPath(j,p2,TextInputFormat.class,Map2.class);
MultipleInputs.addInputPath(j,p3,TextInputFormat.class,Map1.class);

FileOutputFormat.setOutputPath(j, p4);       

System.exit(j.waitForCompletion(true) ? 0:1);
     }

}




  
[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.MergedSum  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/res1


[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
f       163000
m       346000

--------------------------------









1 comment: