Data science Software Course Training in Ameerpet Hyderabad

Thursday, 8 September 2016

MR Lab5: Merging and Merged Aggregations

[training@localhost ~]$ cat emp
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
101,janaki,10000,f,12
[training@localhost ~]$

[training@localhost ~]$ cat > emp2
201,aaa,11,m,90000
202,bbbbb,12,f,100000
203,ccc,13,m,200000
[training@localhost ~]$ cat > emp3
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13
[training@localhost ~]$ hadoop fs -mkdir mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp2 mrlab
[training@localhost ~]$ hadoop fs -copyFromLocal emp3 mrlab
[training@localhost ~]$ hadoop fs -ls mrlab
Found 3 items
-rw-r--r--   1 training supergroup        123 2016-09-07 20:13 /user/training/mrlab/emp
-rw-r--r--   1 training supergroup         61 2016-09-07 20:13 /user/training/mrlab/emp2
-rw-r--r--   1 training supergroup         63 2016-09-07 20:13 /user/training/mrlab/emp3
[training@localhost ~]$
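
Note the two layouts in play: emp and emp3 store records as id,name,sal,sex,dno, while emp2 stores them as id,name,dno,sex,sal. Merging all three therefore needs one mapper per layout, which is exactly what the two classes below provide.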

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp and emp3 (same layout)
// id,name,sal,sex,dno
public class MergeMap1 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 // Pass-through mapper: the record already has the target layout,
 // so emit the whole line as the key with a NullWritable value.
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
    con.write(v, NullWritable.get());
 }
}

-----------------
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp2
// id,name,dno,sex,sal
public class MergeMap2 extends
 Mapper<LongWritable,Text,Text,NullWritable>
{
 // Reorder emp2's columns into the target layout
 // id,name,sal,sex,dno before emitting the record.
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String newLine = w[0]+","+
     w[1]+","+w[4]+","+w[3]+","+w[2];
    con.write(new Text(newLine), NullWritable.get());
 }
}
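
For example, the emp2 record 201,aaa,11,m,90000 is rewritten as 201,aaa,90000,m,11, which is exactly how row 201 appears in the merged output further down.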


------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Merge
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "Merging");
         j.setJarByClass(Merge.class);
         j.setNumReduceTasks(1);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(NullWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); // output

         // emp and emp3 share a layout, so both reuse MergeMap1;
         // emp2 goes through MergeMap2, which reorders its columns.
         MultipleInputs.addInputPath(j,p1,TextInputFormat.class,MergeMap1.class);
         MultipleInputs.addInputPath(j,p2,TextInputFormat.class,MergeMap2.class);
         MultipleInputs.addInputPath(j,p3,TextInputFormat.class,MergeMap1.class);

         FileOutputFormat.setOutputPath(j, p4);

         System.exit(j.waitForCompletion(true) ? 0 : 1);
     }
}
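
Because no reducer class is set, Hadoop runs the default identity reducer, and setNumReduceTasks(1) funnels everything through a single reduce task: the merged records land in one part file, sorted by the map output key, which is why the listing below comes out in sorted order.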


------------
[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.Merge  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/content

[training@localhost ~]$ hadoop fs -cat mrlab/content/part-r-00000
101,janaki,10000,f,12
101,vino,26000,m,11
102,Sri,25000,f,11
103,mohan,13000,m,13
104,lokitha,8000,f,12
105,naga,6000,m,13
201,aaa,90000,m,11
202,bbbbb,100000,f,12
203,ccc,200000,m,13
301,iiiii,1000,m,11
302,uuuuu,10000,m,12
303,jjjjjj,20000,f,13

--------------------------------

hql:

  select sex, sum(sal)
  from (
        select sex, sal from emp1
        union all
        select sex, sal from emp2
        union all
        select sex, sal from emp3
       ) e
  group by sex;
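
The MapReduce equivalent of this query follows: Map1 and Map2 normalize every record to a (sex, sal) pair, and RedForSum adds up the salaries per sex.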
   
package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp and emp3 (same layout)
// id,name,sal,sex,dno
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 // Emit (sex, sal); in this layout the salary is column w[2].
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);

    con.write(new Text(sex), new IntWritable(sal));
 }
}
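
For instance, the emp record 101,vino,26000,m,11 becomes the pair (m, 26000).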

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp2
// id,name,dno,sex,sal
public class Map2 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 // Emit (sex, sal); in emp2 the salary is the last column, w[4].
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[4]);

    con.write(new Text(sex), new IntWritable(sal));
 }
}

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    // Sum every salary in the value list for the given sex key.
    public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
         for (IntWritable v : vlist)
             tot += v.get();
         con.write(k, new IntWritable(tot));
     }
}
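
Since the sum is associative and commutative, the same class could also be reused as a combiner to pre-aggregate map output and cut shuffle traffic. This is an optional tweak, not part of the original lab; a minimal sketch would add one line to the driver below:

j.setCombinerClass(RedForSum.class);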

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergedSum
{
    public static void main(String[] args)
     throws Exception
     {
         Configuration c = new Configuration();
         Job j = new Job(c, "MergedSum");
         j.setJarByClass(MergedSum.class);
         j.setReducerClass(RedForSum.class);
         j.setOutputKeyClass(Text.class);
         j.setOutputValueClass(IntWritable.class);
       
         Path p1 = new Path(args[0]); // emp
         Path p2 = new Path(args[1]); // emp2
         Path p3 = new Path(args[2]); // emp3
         Path p4 = new Path(args[3]); // output

         // As in the merge job, emp and emp3 reuse one mapper (Map1)
         // while emp2 gets its own (Map2).
         MultipleInputs.addInputPath(j,p1,TextInputFormat.class,Map1.class);
         MultipleInputs.addInputPath(j,p2,TextInputFormat.class,Map2.class);
         MultipleInputs.addInputPath(j,p3,TextInputFormat.class,Map1.class);

         FileOutputFormat.setOutputPath(j, p4);

         System.exit(j.waitForCompletion(true) ? 0 : 1);
     }
}
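
One detail worth noting: setOutputKeyClass(Text.class) and setOutputValueClass(IntWritable.class) describe both the map and the reduce output here; because the mappers and the reducer all emit (Text, IntWritable), no separate setMapOutputKeyClass/setMapOutputValueClass calls are needed.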

[training@localhost ~]$ hadoop jar Desktop/myapp.jar  mr.analytics.MergedSum  mrlab/emp mrlab/emp2   mrlab/emp3  mrlab/res1


[training@localhost ~]$ hadoop fs -cat mrlab/res1/part-r-00000
f       163000
m       346000
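
A quick sanity check against the inputs: f = 25000 + 8000 + 10000 + 100000 + 20000 = 163000, and m = 26000 + 13000 + 6000 + 90000 + 200000 + 1000 + 10000 = 346000.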

--------------------------------