HBase Learning Path (5): Operating on HBase with MapReduce

Original author: 扎心了,老铁 · Source: https://www.cnblogs.com/qingyunzong/category/1187868.html

Writing data from HDFS into HBase with MapReduce

Suppose HDFS holds a file student.txt in which each line is a comma-separated record of student id, name, sex, age, and department:

```
95002,刘晨,女,19,IS
95017,王风娟,女,18,IS
95018,王一,女,19,IS
95013,冯伟,男,21,CS
95014,王小丽,女,19,CS
95019,邢小丽,女,19,IS
95020,赵钱,男,21,IS
95003,王敏,女,22,MA
95004,张立,男,19,IS
95012,孙花,女,20,CS
95010,孔小涛,男,19,CS
95005,刘刚,男,18,MA
95006,孙庆,男,23,CS
95007,易思玲,女,19,MA
95008,李娜,女,18,CS
95021,周二,男,17,MA
95022,郑明,男,20,MA
95001,李勇,男,20,CS
95011,包小柏,男,18,MA
95009,梦圆圆,女,18,MA
95015,王君,男,18,MA
```

The task is to write the data in this file into an HBase table.
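
The job below assumes the target table student, with a single column family info, already exists. If it does not, it can be created up front; this is a minimal sketch using the HBase 1.x-style Admin API and the same ZooKeeper quorum as the job itself (the class name CreateStudentTable is just for illustration):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Illustrative helper, not part of the original article's code.
public class CreateStudentTable {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("student");
            if (!admin.tableExists(name)) {
                // One column family "info" is enough for the five fields.
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}
```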

The MapReduce implementation is as follows:

```
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHDFSDataToHbaseMR extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHDFSDataToHbaseMR(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        // Alternatively, load the cluster settings from the config files:
        // conf.addResource("config/core-site.xml");
        // conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHDFSDataToHbaseMR.class);

        job.setMapperClass(HDFSToHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Wire the reducer to the HBase table "student"; the final false
        // means the HBase dependency jars are not added to the job.
        TableMapReduceUtil.initTableReducerJob("student", HDFSToHbaseReducer.class, job, null, null, null, null, false);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        Path inputPath = new Path("/student/input/");
        Path outputPath = new Path("/student/output/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileInputFormat.addInputPath(job, inputPath);
        // Not strictly required: the reducer writes to HBase via
        // TableOutputFormat, but the original job sets an HDFS path too.
        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);

        return isDone ? 0 : 1;
    }

    // The mapper simply forwards each input line as the key;
    // all parsing happens in the reducer.
    public static class HDFSToHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Each key is one CSV line, e.g.: 95015,王君,男,18,MA
     */
    public static class HDFSToHbaseReducer extends TableReducer<Text, NullWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {

            String[] split = key.toString().split(",");

            // The student id (first field) becomes the row key.
            Put put = new Put(split[0].getBytes());

            put.addColumn("info".getBytes(), "name".getBytes(), split[1].getBytes());
            put.addColumn("info".getBytes(), "sex".getBytes(), split[2].getBytes());
            put.addColumn("info".getBytes(), "age".getBytes(), split[3].getBytes());
            put.addColumn("info".getBytes(), "department".getBytes(), split[4].getBytes());

            context.write(NullWritable.get(), put);
        }
    }
}
```
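
After packaging the class into a jar and launching it with hadoop jar, each line of student.txt should appear in the student table as one row keyed by the student id. A quick way to spot-check the load is a plain client-side scan; the sketch below assumes the same quorum settings as above (VerifyStudentLoad is an illustrative name, not part of the original code):

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyStudentLoad {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("student"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            // Print each row key together with the stored name and age.
            for (Result r : scanner) {
                System.out.println(Bytes.toString(r.getRow()) + "\t"
                        + Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))) + "\t"
                        + Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("age"))));
            }
        }
    }
}
```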

Reading data from HBase with MapReduce and writing the average age to HDFS
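
The job scans the student table built above, reading only the info:age column. The mapper emits every age under the single fixed key "age", so all values are routed to one reduce call, which sums and counts them to produce the global average. With larger tables this funnels everything through one reducer, which is acceptable here because only a single aggregate is being computed.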

```
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReadHbaseDataToHDFS extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new ReadHbaseDataToHDFS(), args);
        System.exit(run);
    }

    @Override
    public int run(String[] arg0) throws Exception {

        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://myha01/");
        conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        FileSystem fs = FileSystem.get(conf);

        // Alternatively, load the cluster settings from the config files:
        // conf.addResource("config/core-site.xml");
        // conf.addResource("config/hdfs-site.xml");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadHbaseDataToHDFS.class);

        // Fetch only the column the job actually needs: info:age
        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),    // table to read from
                scan,                    // scan defining which cells to fetch
                HbaseToHDFSMapper.class, // mapper class
                Text.class,              // mapper output key type
                IntWritable.class,       // mapper output value type
                job,                     // the job object
                false                    // do not add dependency jars
                );

        job.setReducerClass(HbaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path outputPath = new Path("/student/avg/");

        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }

        FileOutputFormat.setOutputPath(job, outputPath);

        boolean isDone = job.waitForCompletion(true);

        return isDone ? 0 : 1;
    }

    public static class HbaseToHDFSMapper extends TableMapper<Text, IntWritable> {

        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        // key is the HBase row key; value is the Result holding all
        // cells fetched for that row by the scan.
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {

            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());

            if (isContainsColumn) {

                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                System.out.println("listCells:\t" + listCells); // debug output
                Cell cell = listCells.get(0);
                System.out.println("cells:\t" + cell); // debug output

                byte[] cloneValue = CellUtil.cloneValue(cell);
                String ageValue = Bytes.toString(cloneValue);
                outValue.set(Integer.parseInt(ageValue));

                context.write(outKey, outValue);
            }
        }
    }

    public static class HbaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

        DoubleWritable outValue = new DoubleWritable();

        // All ages arrive under the single key "age", so this one reduce
        // call sees every value and can compute the global average.
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int count = 0;
            int sum = 0;
            for (IntWritable value : values) {
                count++;
                sum += value.get();
            }

            double avgAge = sum * 1.0 / count;
            outValue.set(avgAge);
            context.write(key, outValue);
        }
    }
}
```
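
The result lands in /student/avg/ as a tab-separated text file (part-r-00000 by default). With the 21 sample rows above the ages sum to 405, so 405 / 21 ≈ 19.2857, and the single output line should read approximately:

```
age	19.285714285714285
```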
