再学HBase,关于HBase的一些基础知识 | 如何集成在java中

kun1790051360 / 2025-01-20 / 原文

HBase简介

  • 在使用方面:HBase是一种数据仓库,是基于hdfs的nosql数据源,数据都是存放在hdfs上的,不需要像hive一样再去运行MapReduce进行长时间运算。

特点:

  • 在phonenix/hive的集成下才可以支持sql,本身是有自己的dql语言的。
  • 具有一级索引rowKey,基于一级索引查询
  • hbase的表都是物理表,有独立的物理数据结构,查询时把数据加载到内存,提高查询效率

缺点

  • hbase是依赖于zk和hdfs的,需要zk协调服务,配置管理,维护元数据命名空间等问题;需要hdfs来存储hbase的数据

关于CRUD:

查:HBase是如何锁定数据的:通过rowKey、列族、列名、时间戳(这个一般不需要手动写)来找到准确的cell

改:注意HBase是基于hdfs的,所以他是不能修改数据的。因此修改数据的时候我们会通过时间戳版本来标记新数据。

关于分片:

HBase具有强大的分布式功能,可以通过rowKey分片,也可以通过column family分片。如图例,通过这种灵活的分片方式可以把他分成六个分片

关于要求:hbase的数据库被称为namespace;在创建时仅需要声明column family即可;rowKey的排序是按字典顺序存储

 

关于java集成hbase

<dependency>
     <groupId>org.apache.hbase</groupId>
     <artifactId>hbase-client</artifactId>
     <version>2.2.7</version>
</dependency>

链接hbase

private static Connection getConn(){
        //获取hbase链接
        Configuration conf = new Configuration();
        //指定hbase使用的zk地址
        //注意:需要在执行hbase hava代码的机器上配置zk和hbase集群的主机名和ip的映射关系
        conf.set("hbase.zookeeper.quorum","192.168.88.95:2181");
        //指定hbase在hdfs上的根目录
        conf.set("hbase.rootdir","hdfs://192.168.88.95:9000/hbase");
        //创建HBase数据库链接
        Connection co = null;
        try{
            co = ConnectionFactory.createConnection(conf);
        }catch (IOException e){
            System.out.println("获取链接失败:"+e.getMessage());
        }
        return co;
    }

创建表,只需要提供表名namespace和列簇column family

public static void createTable(String tableName,String... cfs) throws Exception {
        Admin admin = conn.getAdmin();
        ArrayList<ColumnFamilyDescriptor> cfArr = new ArrayList<ColumnFamilyDescriptor>();
        for (String cf : cfs) {
            ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(cf))
                    .build();
            cfArr.add(cfDesc);
        }
        TableDescriptor tableDesc = TableDescriptorBuilder
                .newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(cfArr)
                .build();
        admin.createTable(tableDesc);
        admin.close();
    }

添加单元格数据,需指明4个元素:namespace,column family,column,value。这里timestamp是默认本机时间

    public static void put2HBaseCell(String tableName,String rowKey,String columnFamily,String column,String value)throws Exception{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
        table.put(put);
        table.close();
    }

批量添加数据,这里的put和上述添加单元格数据的put一致,主要是减少IO来增快插入速度

    public static void put2HBaseList(String tableName, List<Put> list)throws Exception{
        Table table = conn.getTable(TableName.valueOf(tableName));
        table.put(list);
        table.close();
    }

根据rowKey获取具体的KV数据对

    public static Map<String,String> getFromHBase(String tableName,String rowKey)throws IOException{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        List<Cell> cells = result.listCells();
        HashMap<String, String> resMap = new HashMap<String, String>();
        for (Cell cell: cells) {
            //
            byte[] column_bytes = CellUtil.cloneQualifier(cell);
            //
            byte[] value_bytes = CellUtil.cloneValue(cell);
            resMap.put(new String(column_bytes),new String(value_bytes));
        }
        return resMap;
    }

 

 总结:单例模式创建的HBase工具类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * HBase工具类
 * Created by LJK
 */
public class HBaseUtil {
    private HBaseUtil(){}

    private static Connection conn = getConn();

    private static Connection getConn(){
        //获取hbase链接
        Configuration conf = new Configuration();
        //指定hbase使用的zk地址
        //注意:需要在执行hbase hava代码的机器上配置zk和hbase集群的主机名和ip的映射关系
        conf.set("hbase.zookeeper.quorum","192.168.88.95:2181");
        //指定hbase在hdfs上的根目录
        conf.set("hbase.rootdir","hdfs://192.168.88.95:9000/hbase");
        //创建HBase数据库链接
        Connection co = null;
        try{
            co = ConnectionFactory.createConnection(conf);
        }catch (IOException e){
            System.out.println("获取链接失败:"+e.getMessage());
        }
        return co;
    }

    /**
     * 对外提供的方法
     * @return
     */
    public static Connection getInstance(){
        return conn;
    }

    /**
     * 创建表
     * @param tableName
     * @param cfs
     */
    public static void createTable(String tableName,String... cfs) throws Exception {
        Admin admin = conn.getAdmin();
        ArrayList<ColumnFamilyDescriptor> cfArr = new ArrayList<ColumnFamilyDescriptor>();
        for (String cf : cfs) {
            ColumnFamilyDescriptor cfDesc = ColumnFamilyDescriptorBuilder
                    .newBuilder(Bytes.toBytes(cf))
                    .build();
            cfArr.add(cfDesc);
        }
        TableDescriptor tableDesc = TableDescriptorBuilder
                .newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(cfArr)
                .build();
        admin.createTable(tableDesc);
        admin.close();
    }

    /**
     * 添加一个单元格(列)的数据
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param value
     * @throws Exception
     */
    public static void put2HBaseCell(String tableName,String rowKey,String columnFamily,String column,String value)throws Exception{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(column),Bytes.toBytes(value));
        table.put(put);
        table.close();
    }

    /**
     * 向hbase中添加一批数据
     * @param tableName
     * @param list
     * @throws Exception
     */
    public static void put2HBaseList(String tableName, List<Put> list)throws Exception{
        Table table = conn.getTable(TableName.valueOf(tableName));
        table.put(list);
        table.close();
    }

    /**
     * 根据Rowkey获取数据
     * @param tableName
     * @param rowKey
     * @return
     * @throws IOException
     */
    public static Map<String,String> getFromHBase(String tableName,String rowKey)throws IOException{
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        List<Cell> cells = result.listCells();
        HashMap<String, String> resMap = new HashMap<String, String>();
        for (Cell cell: cells) {
            //
            byte[] column_bytes = CellUtil.cloneQualifier(cell);
            //
            byte[] value_bytes = CellUtil.cloneValue(cell);
            resMap.put(new String(column_bytes),new String(value_bytes));
        }
        return resMap;
    }
}