Data science Software Course Training in Ameerpet Hyderabad

Data science Software Course Training in Ameerpet Hyderabad

Thursday, 22 September 2016

MR Lab6 : Sorting Using MapReduce

hive> select * from emp
        order by sal desc;

emp ----> id, name, sal, sex, dno

SortSalDriver.java
--------------------------
package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortSalDriver
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SortOnValueDescending");
  j.setJarByClass(SortSalDriver.class);

  j.setMapperClass(SortSalMap.class);
  j.setReducerClass(SortSalRed.class);
  j.setSortComparatorClass(SortComparator.class);
  j.setOutputKeyClass(IntWritable.class);
  j.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  System.exit(j.waitForCompletion(true) ? 0:1);
 
 }

}
----
SortSalMap.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortSalMap extends
Mapper<LongWritable,Text,IntWritable,Text>
{
    public void map(LongWritable k,
             Text v , Context con)
     throws IOException, InterruptedException
     {
         //   v  --> 101,aaa,40000,m,11
        String line = v.toString();
        String[] w = line.split(",");
        int sal = Integer.parseInt(w[2]);
        con.write(new IntWritable(sal),v);
     }
}


---------

SortSalRed.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortSalRed extends Reducer<IntWritable,
 Text,Text,NullWritable>
{
    public void reduce(IntWritable sal, Iterable<Text> vlist,
            Context con)
    throws IOException, InterruptedException
    {
        for(Text rec: vlist)
            con.write(rec,NullWritable.get());
    }

}

---------------
SortComparator.java

package mr.analytics;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class SortComparator extends WritableComparator {

 protected SortComparator() {
  super(IntWritable.class, true);
  // TODO Auto-generated constructor stub
 }

 @Override
 public int compare(WritableComparable o1, WritableComparable o2) {
  IntWritable k1 = (IntWritable) o1;
  IntWritable k2 = (IntWritable) o2;
  int cmp = k1.compareTo(k2);
  return -1* cmp;
 }


}
---------------------------------------------------
hive> select sex, sum(sal) as tot from emp
         group by sex
       order by tot desc;



package mr.analytics;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortDriver2
{

 public static void main(String[] args) throws Exception {
 
  Configuration c = new Configuration();
  Job j = new Job(c, "SortOnValueDescending");
  j.setJarByClass(SortDriver2.class);

  j.setMapperClass(Map1.class);
  j.setReducerClass(RedForSum.class);
  //j.setSortComparatorClass(SortComparator.class);
  j.setMapOutputKeyClass(Text.class);
  j.setMapOutputValueClass(IntWritable.class);
 
  j.setOutputKeyClass(Text.class);
  j.setOutputValueClass(IntWritable.class);

  //   file1   res1   res2
 
  FileInputFormat.addInputPath(j, new Path(args[0]));
  FileOutputFormat.setOutputPath(j, new Path(args[1]));
  j.waitForCompletion(true);
 
  Job j2 = new Job(c, "SortOnValueDescending");
  j2.setJarByClass(SortDriver2.class);

  j2.setMapperClass(SortMapper.class);
  j2.setReducerClass(SortReducer.class);
  j2.setSortComparatorClass(SortComparator.class);
  j2.setOutputKeyClass(IntWritable.class);
  j2.setOutputValueClass(Text.class);

 
  FileInputFormat.addInputPath(j2, new Path(args[1]));
  FileOutputFormat.setOutputPath(j2, new Path(args[2]));
 
  System.exit(j2.waitForCompletion(true) ? 0:1);
 }

}
-------------

Map1.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// emp1, emp3
// id,name,sal,sex,dno
public class Map1 extends
 Mapper<LongWritable,Text,Text,IntWritable>
{
 public void map(LongWritable k,
       Text v, Context con)
 throws IOException,InterruptedException
 {
     String line = v.toString();
     String[] w = line.split(",");
     String sex = w[3];
     int sal = Integer.parseInt(w[2]);
   
    con.write(new Text(sex), new IntWritable(sal));
 }
}
-----------
RedForSum.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;

public class RedForSum extends Reducer<Text,IntWritable,
 Text,IntWritable>
{
    public void reduce(Text k, Iterable<IntWritable>vlist, Context con)
     throws IOException, InterruptedException
     {
        int tot = 0;
         for (IntWritable v:vlist)
             tot+=v.get();
         con.write(k, new IntWritable(tot));
     }

}


-------------
SortMapper.java

package mr.analytics;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
 public void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {
  //   sex     sal
   
  String[] splits = value.toString().trim().split("\t");
  int tot = Integer.parseInt(splits[1]);
  context.write(new IntWritable(tot),
          new Text(splits[0]));
 }

}
------------------

SortReducer.java

package mr.analytics;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends
  Reducer<IntWritable, Text, Text, IntWritable> {

 @Override
 public void reduce(IntWritable key, Iterable<Text> values, Context context)
   throws IOException, InterruptedException {
            
   
  for (Text val : values) {
   context.write(val, key);
  }
 }
}

------------------------




1 comment:

  1. Thank you for offering such unique content. We are very happy to receive articles from you. Please update the latest content on Hadoop. One of the recommended blogs for newbies and Hadoop professionals, with great intent.

    Hadoop training
    Hadoop training in hyderabad
    Hadoop training in usa

    ReplyDelete