This article shows how to write MapReduce jobs in Java that operate on HBase:
- use MapReduce to import HDFS files into HBase
- back up data from HBase to HDFS
- import data from HBase into MySQL

Create a project

First, create a Maven project in your IDE.
The full pom file is as follows.

Pom file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cfl</groupId>
    <artifactId>mapreduce_hbase_demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Note: make sure the Hadoop and HBase versions you use are compatible with each other!

Log tracking

Create a log4j.properties file with the following content:

# OFF,systemOut,logFile,logDailyFile,logRollingFile,logMail,logDB,ALL
log4j.rootLogger=ALL,systemOut

log4j.appender.systemOut=org.apache.log4j.ConsoleAppender
log4j.appender.systemOut.layout=org.apache.log4j.PatternLayout
log4j.appender.systemOut.layout.ConversionPattern=[%-5p][%-22d{yyyy/MM/dd HH:mm:ssS}][%l]%n%m%n
log4j.appender.systemOut.Threshold=INFO
log4j.appender.systemOut.ImmediateFlush=TRUE
log4j.appender.systemOut.Target=System.out

Next, place the following Hadoop configuration files on your project's classpath:
- core-site.xml
- hdfs-site.xml
- mapred-site.xml
- yarn-site.xml
- slaves
And the HBase configuration files:
- hbase-site.xml
- regionservers
(A quick way to check that these files are actually being picked up is sketched right below.)
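This is only a sanity-check sketch, assuming the files above are on the classpath; ConfigCheck is a hypothetical helper class, not part of the demo project:

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Print a few well-known keys to verify that core-site.xml and hbase-site.xml
// on the classpath are actually being loaded and merged.
public class ConfigCheck {
    public static void main(String[] args) {
        // merges hbase-site.xml over the Hadoop defaults
        Configuration conf = HBaseConfiguration.create();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
    }
}

If the printed values are the built-in defaults (file:/// and localhost) instead of your cluster's addresses, the configuration files are not on the classpath.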

Export data from HBase to HDFS
package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce on HBase: write data from HBase out to HDFS.
 */
public class ImpHDFSFromHBase extends Configured implements Tool {

    public static class MyTableMapper extends TableMapper<NullWritable, Text> {
        private Text text = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String name = null;
            String num = null;
            String fee = null;
            for (Cell cell : value.listCells()) {
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("name")) {
                    name = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("num")) {
                    num = Bytes.toString(CellUtil.cloneValue(cell));
                }
                if (Bytes.toString(CellUtil.cloneQualifier(cell)).equals("fee")) {
                    fee = Bytes.toString(CellUtil.cloneValue(cell));
                }
            }
            text.set(name + "\t" + num + "\t" + fee); // tab-separated output line
            context.write(NullWritable.get(), text);
        }
    }

    public static class MyReduce extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                context.write(NullWritable.get(), value);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        cfg.set("mapred.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "back up free courses from HBase to HDFS");
        job.setJarByClass(ImpHDFSFromHBase.class);
        // query the free courses (the fee value "免费" means "free")
        Scan scan = new Scan();
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);
        TableMapReduceUtil.initTableMapperJob(args[0], scan, MyTableMapper.class, NullWritable.class, Text.class, job);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // return 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new ImpHDFSFromHBase(), args));
    }
}
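In run(), args[0] is the source HBase table and args[1] is the HDFS output directory. A minimal launcher sketch, where the table name "course" and the output path are hypothetical example values:

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.util.ToolRunner;

// Hypothetical launcher: substitute your own table name and output directory.
public class RunExport {
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(
                new ImpHDFSFromHBase(),
                new String[]{"course", "/user/hadoop/output_course"});
        System.exit(exitCode);
    }
}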
Import HDFS files into HBase

First, there is a data file in HDFS; each line holds a course name, a student count and a fee, separated by whitespace.
[screenshot of the data file]
For example, the path is /user/hadoop/input.
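The import job below assumes the target table and its info column family already exist. A minimal sketch for creating them with the HBase client API, where the table name "course" is only a hypothetical example:

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Create the target table ("course" is a hypothetical name) with the "info"
// column family used by the import job.
public class CreateCourseTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName tableName = TableName.valueOf("course");
            if (!admin.tableExists(tableName)) {
                HTableDescriptor desc = new HTableDescriptor(tableName);
                desc.addFamily(new HColumnDescriptor("info"));
                admin.createTable(desc);
            }
        }
    }
}

The import job itself is as follows.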

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * MapReduce on HBase: read an HDFS file and store it into HBase.
 */
public class ImpHBaseFormHDFS extends Configured implements Tool {

    /**
     * LongWritable            offset of a line within the file
     * Text                    content of one line of the file
     * ImmutableBytesWritable  the row key
     * Put                     one row of data
     */
    public static class HDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private ImmutableBytesWritable rowkey = new ImmutableBytesWritable(); // row key
        private byte[] info = Bytes.toBytes("info"); // column family
        private byte[] name = Bytes.toBytes("name"); // column: course name
        private byte[] num = Bytes.toBytes("num");   // column: number of students
        private byte[] fee = Bytes.toBytes("fee");   // column: fee

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split("\\s+"); // split on whitespace (one or more)
            if (strings.length == 3) {
                rowkey.set(Bytes.toBytes(strings[0])); // use the course name as the row key
                Put put = new Put(Bytes.toBytes(strings[0]));
                put.addColumn(info, name, Bytes.toBytes(strings[0]));
                put.addColumn(info, num, Bytes.toBytes(strings[1]));
                put.addColumn(info, fee, Bytes.toBytes(strings[2]));
                context.write(rowkey, put);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // Configuration reads the hadoop core-site.xml file
        Configuration cfg = new Configuration();
        // set the name of the generated jar
        cfg.set("mapred.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "import courses into HBase");
        job.setJarByClass(ImpHBaseFormHDFS.class);
        job.setMapperClass(HDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // TableMapReduceUtil reads the hadoop and hbase configuration files and merges them
        TableMapReduceUtil.initTableReducerJob(
                args[1], // output table
                null,    // reducer class
                job);
        job.setNumReduceTasks(1); // at least one, adjust as required
        // return 0 on success, 1 on failure
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int n = ToolRunner.run(new ImpHBaseFormHDFS(), args);
        System.out.println(n);
    }
}

Before running, add the program arguments to the project's run configuration: args[0] is the HDFS input path and args[1] is the target HBase table.
[screenshot of the run configuration]
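Once the job finishes, you can confirm the rows landed in HBase with a simple client-side scan. This is only a sketch, again assuming the hypothetical table name "course":

package com.cfl.mapreduce.hbase;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Scan the table ("course" is a hypothetical name) and print each row's
// name/num/fee columns to confirm the import worked.
public class ScanCourseTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("course"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result r : scanner) {
                String name = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")));
                String num = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("num")));
                String fee = Bytes.toString(r.getValue(Bytes.toBytes("info"), Bytes.toBytes("fee")));
                System.out.println(name + "\t" + num + "\t" + fee);
            }
        }
    }
}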

Import data from HBase into MySQL
package com.cfl.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * MapReduce on HBase: import data from HBase into MySQL.
 * The map side does the distributed lookup of the matching records.
 * The reduce side collects the map output, connects to MySQL and stores the data
 * (this way MySQL is connected only once, which is more efficient).
 * If the connection and inserts were done in map, every map task would open its
 * own connection, which is inefficient.
 */
public class HBaseToMySql extends Configured implements Tool {

    public static void addTmpJar(String jarPath, Configuration conf) throws IOException {
        System.setProperty("path.separator", ":");
        FileSystem fs = FileSystem.getLocal(conf);
        String newJarPath = new Path(jarPath).makeQualified(fs).toString();
        String tmpjars = conf.get("tmpjars");
        if (tmpjars == null || tmpjars.length() == 0) {
            conf.set("tmpjars", newJarPath);
        } else {
            conf.set("tmpjars", tmpjars + "," + newJarPath);
        }
    }

    public static class ReadMap extends TableMapper<NullWritable, Text> {
        private Text sql = new Text();

        // get the value of a column
        private String getValue(String qualifier, Result result) {
            return Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes(qualifier)));
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String name = getValue("name", value);
            String numStr = getValue("num", value);
            String pay = getValue("fee", value);
            int num = Integer.parseInt(numStr);
            String str = "insert into tb_course(name,num,pay) values('" + name + "'," + num + ",'" + pay + "')";
            sql.set(str);
            context.write(NullWritable.get(), sql);
        }
    }

    public static class WriteReduce extends Reducer<NullWritable, Text, NullWritable, NullWritable> {
        private Connection conn = null;
        private Statement st = null;

        // connect to mysql
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection("jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
                st = conn.createStatement();
            } catch (SQLException e) {
                throw new InterruptedException(e.getMessage());
            } catch (ClassNotFoundException e) {
                throw new InterruptedException(e.getMessage());
            }
        }

        // produce no output, just insert the data
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                try {
                    st.executeUpdate(v.toString());
                } catch (SQLException e) {
                    throw new InterruptedException(e.getMessage());
                }
            }
        }

        // close the connection
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            try {
                if (st != null) {
                    st.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration cfg = getConf();
        addTmpJar(args[0], cfg);
        cfg.set("mapreduce.job.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(cfg, "import paid courses from HBase into MySQL DB");
        job.setJarByClass(HBaseToMySql.class);
        // query the courses whose fee contains "K币" (K coins)
        Scan scan = new Scan();
        //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
        scan.setFilter(filter);
        TableMapReduceUtil.initTableMapperJob(args[1], scan, ReadMap.class, NullWritable.class, Text.class, job);
        job.setReducerClass(WriteReduce.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ToolRunner.run(new HBaseToMySql(), args));
    }
}
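The reducer assumes the kgc database already contains a tb_course table with name, num and pay columns. A minimal JDBC sketch for creating it, where the column types are assumptions rather than something taken from the article:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Create the tb_course table used by WriteReduce. The column types
// (VARCHAR/INT) are assumptions; adjust them to match your actual data.
public class CreateCourseTableMySql {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.19.95:3306/kgc", "root", "root");
             Statement st = conn.createStatement()) {
            st.executeUpdate("create table if not exists tb_course ("
                    + "name varchar(100), "
                    + "num int, "
                    + "pay varchar(50))");
        }
    }
}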

Importing the data from HBase into MySQL requires a third-party jar (the MySQL driver). The addTmpJar() method above adds one third-party jar to the job, because submitting it directly with a Windows path causes errors; to add several third-party jars, just call addTmpJar() once per jar. Besides this approach, you can also submit third-party jars such as the MySQL driver jar with the -libjars option.
Note: when you submit a third-party jar with -libjars, it is not one of your program arguments; Hadoop reads it itself (through ToolRunner's option parsing), so the remaining argument indexes shift down by one and run() simplifies to:

@Override
public int run(String[] args) throws Exception {
    Configuration cfg = getConf();
    cfg.set("mapreduce.job.jar", "E:\\code\\workspace_idea\\hadoopproject\\hadoop_mapreduce_demo\\target\\hadoop_mapreduce_demo-1.0-SNAPSHOT.jar");
    Job job = Job.getInstance(cfg, "import paid courses from HBase into MySQL DB");
    job.setJarByClass(HBaseToMySql.class);
    // query the courses whose fee contains "K币" (K coins)
    Scan scan = new Scan();
    //Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, new RegexStringComparator("K币"));
    Filter filter = new SingleColumnValueFilter(Bytes.toBytes("info"), Bytes.toBytes("fee"), CompareFilter.CompareOp.EQUAL, Bytes.toBytes("免费"));
    scan.setFilter(filter);
    TableMapReduceUtil.initTableMapperJob(args[0], scan, ReadMap.class, NullWritable.class, Text.class, job);
    job.setReducerClass(WriteReduce.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(NullWritable.class);
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
}


