MR从HDFS处理数据写入到Hbase

785次阅读
没有评论

共计 3497 个字符,预计需要花费 9 分钟才能阅读完成。

需求:使用 mr 从 hdfs 读取数据处理之后写入到 Hbase

package pers.nebo.mrtohbase;
/**
 * @ author fnb
 * @ email [email protected]
 * @ date  2019/12/17
 * @ des :
 */
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


/**
 *
 * mr to hbase 的输入与输出 有 job 设置,* tableMapper 为从 hbase 读取数据
 * tableReduce 为往 Hbase 写入数据
 *   写入的表名 通过 job 设置为 tablename (一个) 则 reduce 函数中 为 NullWritable.get
 *   写入的表名 为多个 则 job 设置为 MultiTableOutputFormat  通过 reduce 函数指定 tableName
 *
 *
 *
 */
public class MR2Hbase {

    public static class Map extends Mapper<LongWritable, Text, Text, LongWritable>{

        // 该步骤同一般 Map 程序
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 将读入行转化为字符串
            String line = value.toString();
            // 切分字符串
            String[] words = StringUtils.split(line," ");
            // 将单词写入 context
            for(String word:words) {context.write(new Text(word), new LongWritable(1));
            }
        }

    }

    public static class Reduce extends TableReducer<Text, LongWritable, NullWritable>{

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,Context context)
                throws IOException, InterruptedException {
            long count =0;
            for(LongWritable value : values) {count += value.get();
            }
            // 实例化 Put,将单词作为主键
            Put put = new Put(Bytes.toBytes(key.toString()));
            // 列族为 content,key 为 result,value 为 count
            put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("result"), Bytes.toBytes(String.valueOf(count)));
            context.write(NullWritable.get(), put);
        }

    }

    // 创建表
    public static void createTable(String tablename) throws Exception{

        // 设置配置文件
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master:2181,slave1:2181,slave2:2181");

        Connection  conn = ConnectionFactory.createConnection(conf);

        //hbase 客户端实例
        Admin admin = conn.getAdmin();

        // 指定表名
        TableName name = TableName.valueOf(tablename);

        // 向表描述里添加表名
        HTableDescriptor desc = new HTableDescriptor(name);

        // 指定列族名和版本数
        HColumnDescriptor content = new HColumnDescriptor("content");
        content.setMaxVersions(3);

        // 向表描述里添加列族
        desc.addFamily(content);
        // 判断表是否存在
        if(admin.tableExists(name)){System.out.println("table exists,trying recreate table !");
            admin.disableTable(name);
            admin.deleteTable(name);
        }
        System.out.println("Create new table:"+ tablename);

        // 创建表
        admin.createTable(desc);

        admin.close();}

    public static void main(String[] args) throws Exception{

        String tablename = "WC";
        // 创建表
        createTable(tablename);

        // 配置文件
        Configuration conf = new Configuration();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        Job job = Job.getInstance(conf);

        // 设置整个 job 所调用类的 jar 包路径
        job.setJarByClass(MR2Hbase.class);

        // 设置该作业所使用的 mapper 和 reducer 类
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // 指定 mapper 输出数据的 k - v 类型,和 reduce 输出类型一样,可缺省
        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定 reduce 输出到 Hbase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定输入数据存放路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 指定输出到 HBase
        job.setOutputFormatClass(TableOutputFormat.class);

        // 将 job 提交给集群运行,参数为 true 表示提示运行进度
        System.exit(job.waitForCompletion(true)?0:1);

    }

}

IT 内容具有时效性,未避免更新后未同步,请点击查看最新内容:MR 从 HDFS 处理数据写入到 Hbase
文章首发于:http://nebofeng.com/2021/01/12/mrtohbase/

正文完
欢迎关注个人公众号, 内含各种工具及大厂内推码合集
post-qrcode
 0
HTML文本

本文链接:

nebofeng
版权声明:本站原创文章,由 nebofeng 于2021-01-12发表,共计3497字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码