Data science Software Course Training in Ameerpet Hyderabad

Thursday, 22 September 2016

MR Lab6: Sorting Using MapReduce

The goal of this lab is to reproduce the following Hive query with plain MapReduce:

hive> select * from emp
        order by sal desc;

emp schema: id, name, sal, sex, dno
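
To make the data flow concrete, here are a few illustrative records (values made up, in the comma-separated format the mapper expects) and the output the job should produce:

input (emp):
101,aaa,40000,m,11
102,bbb,50000,f,12
103,ccc,25000,m,11

output (sorted by sal, descending):
102,bbb,50000,f,12
101,aaa,40000,m,11
103,ccc,25000,m,11

The mapper emits each record keyed by its salary, the shuffle sorts the keys with a custom descending comparator, and the reducer writes the records back without the key.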

SortSalDriver.java
--------------------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortSalDriver
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SortOnValueDescending");
  j.setJarByClass(SortSalDriver.class);

  j.setMapperClass(SortSalMap.class);
  j.setReducerClass(SortSalRed.class);
  // sort the shuffled salary keys in descending order
  j.setSortComparatorClass(SortComparator.class);

  // mapper emits (salary, record); reducer emits (record, nothing)
  j.setMapOutputKeyClass(IntWritable.class);
  j.setMapOutputValueClass(Text.class);
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(NullWritable.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}
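
After packaging the classes into a jar, the job would typically be launched with hadoop jar <jar file> mr.analytics.SortSalDriver <input path> <output path>; the two path arguments fill args[0] and args[1] (paths shown as placeholders, adjust them to your HDFS layout).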
----
SortSalMap.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortSalMap extends
 Mapper<LongWritable,Text,IntWritable,Text>
{
    public void map(LongWritable k, Text v, Context con)
     throws IOException, InterruptedException
     {
        // v --> 101,aaa,40000,m,11
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]);
        // emit the salary as the key so the shuffle sorts on it;
        // the full record rides along as the value
        con.write(new IntWritable(sal), v);
     }
}


---------

SortSalRed.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortSalRed extends Reducer<IntWritable,
 Text,Text,NullWritable>
{
    // keys arrive already sorted descending (via SortComparator);
    // write each original record and drop the salary key
    public void reduce(IntWritable sal, Iterable<Text> vlist,
            Context con)
    throws IOException, InterruptedException
    {
        for(Text rec : vlist)
            con.write(rec, NullWritable.get());
    }

}
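
Note that if several employees share the same salary, they reach the reducer in a single call as one value list and all of them are written out; their relative order within the tie is not defined.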

---------------
SortComparator.java

package mr.analytics;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

 protected SortComparator() {
  // register IntWritable as the key type; 'true' creates key instances
  super(IntWritable.class, true);
 }

 @Override
 public int compare(WritableComparable o1, WritableComparable o2) {
  IntWritable k1 = (IntWritable) o1;
  IntWritable k2 = (IntWritable) o2;
  // negate the natural (ascending) comparison to sort descending
  return -1 * k1.compareTo(k2);
 }


}
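
To see that the comparator really flips the order, here is a quick standalone check (hypothetical helper class, not part of the lab) that can live in the same package:

package mr.analytics;

import org.apache.hadoop.io.IntWritable;

// Hypothetical quick check, not part of the lab: confirm that
// SortComparator reverses the natural order of IntWritable keys.
public class SortComparatorCheck {
    public static void main(String[] args) {
        SortComparator cmp = new SortComparator();
        // naturally 40000 > 25000, so a descending comparator must
        // return a negative value, placing 40000 first
        System.out.println(cmp.compare(new IntWritable(40000),
                                       new IntWritable(25000)));
    }
}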
---------------------------------------------------
Next, sorting on an aggregated value. The Hive equivalent:

hive> select sex, sum(sal) as tot from emp
         group by sex
       order by tot desc;

MapReduce sorts only on the map output key, so this takes two chained jobs: job 1 groups the records by sex and sums the salaries; job 2 re-reads job 1's output with the total as its key and sorts it descending using the same SortComparator.



SortDriver2.java

package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortDriver2
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SumSalBySex");
  j.setJarByClass(SortDriver2.class);

  j.setMapperClass(Map1.class);
  j.setReducerClass(RedForSum.class);
  // job 1 only aggregates; no custom sort comparator is needed here
  j.setMapOutputKeyClass(Text.class);
  j.setMapOutputValueClass(IntWritable.class);
 
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(IntWritable.class);

  // args: input (file1), intermediate output (res1), final output (res2)
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  // run job 1 to completion and stop if it fails; job 2 reads its output
  if (!j.waitForCompletion(true))
   System.exit(1);
 
  Job j2 = new Job(c, "SortOnValueDescending");
  j2.setJarByClass(SortDriver2.class);

  j2.setMapperClass(SortMapper.class);
  j2.setReducerClass(SortReducer.class);
  j2.setSortComparatorClass(SortComparator.class);
  // mapper emits (total, sex); reducer emits (sex, total)
  j2.setMapOutputKeyClass(IntWritable.class);
  j2.setMapOutputValueClass(Text.class);
  j2.setOutputKeyClass(Text.class);
  j2.setOutputValueClass(IntWritable.class);

 
  FileInputFormat.addInputPath(j2, new Path(args[1]));
  FileOutputFormat.setOutputPath(j2, new Path(args[2]));
 
  System.exit(j2.waitForCompletion(true) ? 0:1);
 }

}
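
Between the two jobs, job 1 writes its result with the default TextOutputFormat: one line per group in the form sex<TAB>total, which is exactly what SortMapper below splits on "\t". Illustrative intermediate lines (values made up):

m	85000
f	72000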
-------------

Map1.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// input files: emp1, emp3 (schema: id,name,sal,sex,dno)
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);

     // emit (sex, salary) so the reducer can total salaries per sex
     con.write(new Text(sex), new IntWritable(sal));
 }
}
-----------
RedForSum.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    // total the salaries for one sex
    public void reduce(Text k, Iterable<IntWritable> vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
        for (IntWritable v : vlist)
            tot += v.get();
        con.write(k, new IntWritable(tot));
     }

}
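
Since the per-sex sum is associative, the same class could also be registered as a combiner in the driver (j.setCombinerClass(RedForSum.class)) to pre-aggregate on the map side and cut shuffle traffic; this is optional and not part of the lab as written.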


-------------
SortMapper.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  // input line from job 1: sex<TAB>total
  String[] splits = value.toString().trim().split("\t");
  int tot = Integer.parseInt(splits[1]);
  // emit the total as the key so the shuffle sorts on it
  // (descending, via SortComparator); sex rides along as the value
  context.write(new IntWritable(tot),
          new Text(splits[0]));
 }

}
------------------

SortReducer.java

package mr.analytics;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends
  Reducer<IntWritable, Text, Text, IntWritable> {

 @Override
 public void reduce(IntWritable key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
  // swap key and value back: write (sex, total), now in
  // descending order of total
  for (Text val : values) {
   context.write(val, key);
  }
 }
}
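
A caveat on ordering: a globally sorted result like this relies on all keys reaching one reducer. Hadoop defaults to a single reduce task, so these jobs emit one fully sorted file; with multiple reducers, each part file would be sorted on its own but the overall output would not be.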

------------------------



