Accessing Hive from other languages

HiveServer2 makes it possible for other languages to access Hive. Before we begin, let's review the following concepts:

  • Metadata: Hive metadata, including table names, column names, types, partitions, and other objects defined in Hive. It is usually stored in a relational database such as MySQL; Hive's built-in Derby database can be used during the testing phase.

  • Metastore: the Hive metastore service. It stores the metadata described above and serves it to HiveServer2 and other clients.

  • HiveServer2: the Hive server, which exposes the Hive service to clients. It compiles submitted DDL and DML into jobs (for example MapReduce) that run over data in HDFS. Clients can connect with Beeline or via JDBC (that is, from Java code).

  • Beeline: the command-line client used to connect to HiveServer2, analogous to the mysql command-line client or a GUI tool such as Navicat.

Other languages access Hive mainly through HiveServer2. HiveServer2 (HS2) is a service that enables clients to execute queries against Hive. It supports both embedded and remote modes, multi-client concurrency, and authentication, and it is designed to provide better support for open API clients such as JDBC and ODBC.

The default HiveServer2 port is 10000, and you can connect to it with Beeline, JDBC, or ODBC. When HiveServer2 starts, it checks whether hive.metastore.uris is configured: if it is not, HiveServer2 starts an embedded Metastore service and then starts itself; if it is, HiveServer2 connects to the remote Metastore service. This remote-metastore deployment is the most commonly used.
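
For example, a remote metastore is usually pointed to in hive-site.xml. The snippet below is a minimal sketch; the host name is an assumption taken from the JDBC URLs used later in this article, and 9083 is the metastore's default Thrift port:

<property>
    <name>hive.metastore.uris</name>
    <value>thrift://master:9083</value>
</property>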

Accessing Hive from Python

To access Hive from Python 3, install the following dependencies:

  • pip3 install thrift
  • pip3 install PyHive
  • pip3 install sasl
  • pip3 install thrift_sasl

Here is a Python utility class for accessing Hive:

# -*- coding: utf-8 -*-

from pyhive import hive


class HiveClient(object):
	"""docstring for HiveClient"""
	def __init__(self, host='hadoop-master',port=10000,username='hadoop',password='hadoop',database='hadoop',auth='LDAP'):
		""" 
		create connection to hive server2 
		"""  
		self.conn = hive.Connection(host=host,  
			port=port,  
			username=username,  
			password=password,  
			database=database,
			auth=auth) 

	def query(self, sql):
		""" 
		query 
		""" 
		with self.conn.cursor() as cursor: 
			cursor.execute(sql)
			return cursor.fetchall()

	def insert(self, sql):
		"""
		insert action
		"""
		with self.conn.cursor() as cursor:
			cursor.execute(sql)
			# self.conn.commit()
			# self.conn.rollback()

	def close(self):
		""" 
		close connection 
		"""  
		self.conn.close()

To use it, simply import the class, create an instance, pass in your SQL, and call the query method.

# Get a connection
hclient = HiveClient()

# Execute a query
results = hclient.query('show tables')
print(results)

# Close the connection
hclient.close()

Note: in the insert method, I commented out self.conn.commit() and self.conn.rollback(); these are transactional operations for traditional relational databases and are not supported in Hive.

Connecting to Hive from Java

Java is the foundational language of the big data ecosystem, and connecting to Hive from Java is well supported. This section describes how to connect to Hive using JDBC and MyBatis.

1. Connecting with JDBC

Java connects to HiveServer2 through JDBC, just as traditional JDBC code connects to MySQL.

The hive-jdbc dependency is required:

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>

The code follows the usual MySQL-style JDBC routine, using DriverManager.getConnection(url, username, password):

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class HiveConfigModel {

    private String url = "jdbc:hive2://localhost:10000";
    private String username = "hadoop";
    private String password = "hadoop";
    
}

@Test
public void test(){
    // Initialize the configuration
    HiveConfigModel hiveConfigModel = ConfigureContext.getInstance("hive-config.properties")
            .addClass(HiveConfigModel.class)
            .getModelProperties(HiveConfigModel.class);

    try {
        Connection conn = DriverManager.getConnection(hiveConfigModel.getUrl(),
                hiveConfigModel.getUsername(), hiveConfigModel.getPassword());

        String sql = "show tables";
        PreparedStatement preparedStatement = conn.prepareStatement(sql);
        ResultSet rs = preparedStatement.executeQuery();
        List<String> tables = new ArrayList<>();
        while (rs.next()){
            tables.add(rs.getString(1));
        }

        System.out.println(tables);
    } catch (SQLException e) {
        e.printStackTrace();
    }
}

The hive-jdbc jar registers its driver through the JDBC SPI mechanism: the file META-INF/services/java.sql.Driver inside the jar contains the implementation class name:

org.apache.hive.jdbc.HiveDriver

java.sql.DriverManager uses the SPI mechanism to separate the service interface from its implementations and so decouple the two. Here org.apache.hive.jdbc.HiveDriver supplies the implementation of the java.sql.Driver contract. A JDBC client does not need to change its code; it only needs to put a different SPI implementation on the classpath.

DriverManager.getConnection(url, username, password)

This call obtains the connection, provided the driver implementation on the classpath complies with the corresponding SPI specification.
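
A minimal sketch of this decoupling (assuming a HiveServer2 instance at localhost:10000 that accepts the hadoop/hadoop credentials used earlier): the client code is plain JDBC and never references HiveDriver directly.

import java.sql.Connection;
import java.sql.DriverManager;

public class HiveSpiExample {
    public static void main(String[] args) throws Exception {
        // With JDBC 4+, DriverManager scans META-INF/services/java.sql.Driver on the
        // classpath and registers org.apache.hive.jdbc.HiveDriver automatically.
        // Explicit registration is only needed on pre-JDBC-4 setups:
        // Class.forName("org.apache.hive.jdbc.HiveDriver");

        try (Connection conn = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hadoop", "hadoop")) {
            System.out.println("Connected to Hive: " + !conn.isClosed());
        }
    }
}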

2. Integrating MyBatis

MyBatis is commonly used for database access in the DAO layer, and accessing Hive with it works much the same way.

Configuration file sqlconfig.xml:


      
<!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <environments default="production">
        <environment id="production">
            <transactionManager type="JDBC"/>
            <dataSource type="POOLED">
                <property name="driver" value="org.apache.hive.jdbc.HiveDriver"/>
                <property name="url" value="jdbc:hive2://master:10000/default"/>
                <property name="username" value="hadoop"/>
                <property name="password" value="hadoop"/>
            </dataSource>
        </environment>
    </environments>
    <mappers>
        <mapper resource="mapper/hive/test/test.xml"/>
    </mappers>
</configuration>

The mapper code is omitted; the implementation code:

public class TestMapperImpl implements TestMapper {

    private static SqlSessionFactory sqlSessionFactory = HiveSqlSessionFactory.getInstance().getSqlSessionFactory();

    @Override
    public int getTestCount(String dateTime) {
        SqlSession sqlSession = sqlSessionFactory.openSession();
        TestMapper testMapper = sqlSession.getMapper(TestMapper.class);

        int count = testMapper.getTestCount(dateTime);

        sqlSession.close();

        return count;
    }
}
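
The factory and mapper referenced above are not shown in the original. As a rough sketch of what they might look like (names are taken from the implementation above; the SQL binding would live in sqlconfig.xml's mapper file, and in a real project each type sits in its own source file):

import java.io.IOException;
import java.io.InputStream;

import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

// Hypothetical mapper interface matching the call above.
public interface TestMapper {
    int getTestCount(String dateTime);
}

// Minimal sketch of a singleton that builds the SqlSessionFactory from sqlconfig.xml.
public class HiveSqlSessionFactory {

    private static final HiveSqlSessionFactory INSTANCE = new HiveSqlSessionFactory();

    private SqlSessionFactory sqlSessionFactory;

    private HiveSqlSessionFactory() {
        try (InputStream in = Resources.getResourceAsStream("sqlconfig.xml")) {
            sqlSessionFactory = new SqlSessionFactoryBuilder().build(in);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to load sqlconfig.xml", e);
        }
    }

    public static HiveSqlSessionFactory getInstance() {
        return INSTANCE;
    }

    public SqlSessionFactory getSqlSessionFactory() {
        return sqlSessionFactory;
    }
}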

3. Integrating Spring Boot

Staff in the company's departments have very different technical levels; most of them cannot use the big data analysis backend, let alone write SQL. In that case you can build a self-service query system that lets users fetch the data they need through page operations. This usually means using Spring Boot to connect to both MySQL and Hive and generate reports. Here the Druid connection pool is integrated with Hive.


Tasks that need to be done:
  • Anyone can write SQL on a web page to run a Hive query task;
  • The amount of data queried must not be too large: no more than 60 days of data (anything more would be a disaster);
  • After a query task is submitted, the YARN resource status is checked; if resources are tight, the task is rejected;
  • When the backend rejects a task it throws an exception, and the reason for the denial of service is fed back to the front-end page;
  • If someone has run the same query before, the result is saved to MySQL; the next person who runs it is served from MySQL instead of querying Hive again.

1) Required dependencies

To save space, only the main Maven dependencies needed to connect to HiveServer2 are listed; the parent project's Spring Boot dependencies are omitted.

<!-- Version information -->
<properties>
    <hadoop.version>2.6.5</hadoop.version>
    <mybatis.version>3.2.7</mybatis.version>
</properties>
<dependency>
    <groupId>org.mybatis</groupId>
    <artifactId>mybatis</artifactId>
    <version>${mybatis.version}</version>
</dependency>

<!-- Hadoop dependency -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>

<!-- hive-jdbc -->
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.2.1</version>
</dependency>

<!-- Parsing HTML -->
<dependency>
    <groupId>org.jsoup</groupId>
    <artifactId>jsoup</artifactId>
    <version>1.8.3</version>
</dependency>
	

2) The application-test.yml file:

# Spring configuration
spring:
  # data source configuration
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    druid:
      # Primary data source
      master:
        url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # Secondary data source
      slave:
        # Secondary data source switch, off by default
        enabled: true
        url: jdbc:mysql://localhost:3306/test2?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=GMT%2B8
        username: root
        password: root
      # Secondary data source 2
      # ... omitted ...
      # Hive data source
      slave3:
        # Secondary data source switch, off by default
        enabled: true
        driverClassName: org.apache.hive.jdbc.HiveDriver
        url: jdbc:hive2://master:10000/default
        username: hive
        password: hive
      # Initial number of connections
      initialSize: 5
      # Minimum number of idle connections in the pool
      minIdle: 10
      # Maximum number of active connections in the pool
      maxActive: 20
      # Maximum wait time when acquiring a connection, in milliseconds
      maxWait: 60000
      # How often to check for idle connections that should be closed, in milliseconds
      timeBetweenEvictionRunsMillis: 60000
      # Minimum time a connection may stay idle in the pool, in milliseconds
      minEvictableIdleTimeMillis: 300000
      # Maximum time a connection may stay idle in the pool, in milliseconds
      maxEvictableIdleTimeMillis: 900000

Both MySQL and Hive are configured as data sources. The master data source (MySQL) is used by default; you only need to switch data sources at the mapper layer when accessing Hive.

The code follows the usual mapper, service, and controller layering. Real-time and offline YARN resource queues are configured separately, but because those queues can still be overloaded when other departments are using YARN, each query is limited to 60 days of data and is rejected when cluster resource usage reaches 55%. The following shows how these limits are enforced at the controller layer.

The entity class UserModel:

@NoArgsConstructor
@AllArgsConstructor
@Data
@ToString
public class UserModel extends BaseEntity {

    private String userId;
    private Integer count;
}
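
UserModel extends BaseEntity, which is not shown in the original. The date check introduced later relies only on BaseEntity carrying the request parameters beginTime and endTime in a params map, roughly like the following hypothetical sketch:

import java.util.HashMap;
import java.util.Map;

public class BaseEntity {

    /** Extra request parameters, e.g. beginTime / endTime */
    private Map<String, Object> params = new HashMap<>();

    public Map<String, Object> getParams() {
        return params;
    }

    public void setParams(Map<String, Object> params) {
        this.params = params;
    }
}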

3) Cluster resource usage must stay below 55%

Because many controllers that contain business query logic need this defense against oversized queries, a Spring AOP aspect paired with a custom annotation is used to mark the controllers that require the check.

Define the aspect YarnResourceAspect and the associated annotation @YarnResource:

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface YarnResource {

}

@Aspect
@Component
public class YarnResourceAspect {

    private static final Logger log = LoggerFactory.getLogger(YarnResourceAspect.class);

    /**
     * Configure the pointcut
     */
    @Pointcut("@annotation(com.ruoyi.common.annotation.YarnResource)")
    public void yarnResourcePointCut(){}

    /**
     * Check whether YARN resources are available
     */
    @Before("yarnResourcePointCut()")
    public void before(){
        log.info("**************** Checking whether YARN resources are available ****************");
        // YARN resources are insufficient
        if(!YarnClient.yarnResourceOk()){
            throw new InvalidStatusException();
        }
    }
}

4) Obtaining YARN resource usage data

Tasks can be submitted at any time, so whether a Hive query is allowed to run has to be decided from the YARN resource status at the moment of submission, to avoid affecting online jobs.

@Slf4j
public class YarnClient {

    /**
     * Maximum allowed YARN resource usage, in percent
     */
    private static final int YARN_RESOURCE = 55;

    /**
     * @return true: resources are normal; false: resources are tight
     */
    public static boolean yarnResourceOk() {
        try {
            URL url = new URL("http://master:8088/cluster/scheduler");
            HttpURLConnection conn = null;
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setUseCaches(false);
            // Request timeout: 5 seconds
            conn.setConnectTimeout(5000);
            // Set the HTTP headers:
            conn.setRequestProperty("Accept", "*/*");
            conn.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36");
            // Connect and send the HTTP request:
            conn.connect();

            // Check that the HTTP response code is 200:
            if (conn.getResponseCode() != 200) {
                throw new RuntimeException("bad response");
            }
            // Get all the response headers:
            Map<String, List<String>> map = conn.getHeaderFields();
            for (String key : map.keySet()) {
                System.out.println(key + ": " + map.get(key));
            }
            // Get the response content:
            InputStream input = conn.getInputStream();
            byte[] datas = null;

            try {
                // Read data from the input stream
                datas = readInputStream(input);
            } catch (Exception e) {
                e.printStackTrace();
            }
            String result = new String(datas, "UTF-8"); // Convert the bytes to a String

            Document document = Jsoup.parse(result);

            Elements elements = document.getElementsByClass("qstats");

            String[] ratios = elements.text().split("used");

            return Double.valueOf(ratios[3].replace("%", "")) < YARN_RESOURCE;
        } catch (IOException e) {
            log.error("Failed to obtain YARN resources");
        }

        return false;

    }

    private static byte[] readInputStream(InputStream inStream) throws Exception {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        int len = 0;
        while ((len = inStream.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        byte[] data = outStream.toByteArray();
        outStream.close();
        inStream.close();
        return data;
    }
}
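
Scraping the scheduler HTML and taking ratios[3] breaks easily if the page layout changes. The ResourceManager also exposes a JSON REST API; as an alternative sketch (assuming that API is reachable at http://master:8088/ws/v1/cluster/metrics, and reusing YARN_RESOURCE, readInputStream, and log from the class above), the same check could read allocatedMB and totalMB from that endpoint:

    // Requires: import java.util.regex.Matcher; import java.util.regex.Pattern;
    public static boolean yarnResourceOkViaRestApi() {
        try {
            URL url = new URL("http://master:8088/ws/v1/cluster/metrics");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(5000);

            // The endpoint returns JSON such as {"clusterMetrics":{"allocatedMB":...,"totalMB":...}}
            String json = new String(readInputStream(conn.getInputStream()), "UTF-8");

            long allocated = extractLong(json, "allocatedMB");
            long total = extractLong(json, "totalMB");

            return total > 0 && (allocated * 100.0 / total) < YARN_RESOURCE;
        } catch (Exception e) {
            log.error("Failed to obtain YARN resources from the REST API");
            return false;
        }
    }

    // Crude field extraction, to avoid pulling in a JSON library for this sketch.
    private static long extractLong(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*(\\d+)").matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1;
    }
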
5) Mark the controller method with the @YarnResource annotation:
@Controller
@RequestMapping("/hero/hive")
public class HiveController {

    /**
     * HTML file address prefix
     */
    private String prefix = "hero";

    @Autowired
    IUserService iUserService;

    @RequestMapping("")
    @RequiresPermissions("hero:hive:view")
    public String heroHive(){
        return prefix + "/hive";
    }

    @YarnResource
    @RequestMapping("/user")
    @RequiresPermissions("hero:hive:user")
    @ResponseBody
    public TableDataInfo user(UserModel userModel){
        DateCheckUtils.checkInputDate(userModel);

        PageInfo pageInfo = iUserService.queryUser(userModel);
        TableDataInfo tableDataInfo = new TableDataInfo();

        tableDataInfo.setTotal(pageInfo.getTotal());
        tableDataInfo.setRows(pageInfo.getList());

        return tableDataInfo;
    }
}

6) Checking that the query date span does not exceed 60 days

Every time a request enters the controller, the query date range is automatically checked; spans longer than 60 days are rejected, so a single query cannot load so much data that other tasks are starved of resources.

public class DateCheckUtils {

    /**
     * Check the date range carried in the query parameters.
     * @param o
     */
    public static void checkInputDate(BaseEntity o){
        if("".equals(o.getParams().get("beginTime")) && "".equals(o.getParams().get("endTime"))){
            throw new InvalidTaskException();
        }

        String beginTime = "2019-01-01";
        String endTime = DateUtils.getDate();

        if(!"".equals(o.getParams().get("beginTime"))){
            beginTime = String.valueOf(o.getParams().get("beginTime"));
        }

        if(!"".equals(o.getParams().get("endTime"))){
            endTime = String.valueOf(o.getParams().get("endTime"));
        }

        // Reject queries whose time span exceeds two months
        if(DateUtils.getDayBetween(beginTime, endTime) > 60){
            throw new InvalidTaskException();
        }
    }
}

Note that you need to switch data sources when accessing Hive, because other pages still access MySQL.
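
How that switch is implemented depends on the project. One common approach (a sketch only, not necessarily what this project uses) is Spring's AbstractRoutingDataSource keyed by a ThreadLocal, so code can select the Hive data source (slave3 above) before a query and fall back to the MySQL master afterwards:

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

// Holds the key of the data source the current thread should use ("master", "slave3", ...).
class DynamicDataSourceContextHolder {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void setDataSource(String key) { CONTEXT.set(key); }
    static String getDataSource() { return CONTEXT.get(); }
    static void clear() { CONTEXT.remove(); }
}

// Routing data source that resolves the target by the key set above.
class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DynamicDataSourceContextHolder.getDataSource();
    }
}

A Hive query would then call DynamicDataSourceContextHolder.setDataSource("slave3") before invoking the mapper and clear() afterwards, typically wrapped in an AOP aspect triggered by an annotation, much like @YarnResource above.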

7) Every query result is written to MySQL

If the same query has been run before, the result is fetched from MySQL and returned to the page; otherwise the data is retrieved from Hive, saved to MySQL, and then returned. The following code also lives in the controller; it is shown separately here.

// MySQL is used as a cache medium: check it first
PageInfo pageInfo = iToplocationService.queryToplocation(toplocationCountModel);
if(pageInfo.getList().size() > 0){
    log.info("Data exists, obtained directly from mysql...");
    tableDataInfo.setTotal(pageInfo.getTotal());
    tableDataInfo.setRows(pageInfo.getList());
}else if(iToplocationService.queryExistsToplocation(toplocationCountModel) == null){
    log.info("Querying data from hive...");
    PageInfo pageInfo2 = iToplocationService.query(toplocationCountModel);

    // Save to mysql
    log.info("Batch save to mysql...");
    List<ToplocationCountModel> toplocationCountModels = pageInfo2.getList();
    int i = 0;
    while (i < toplocationCountModels.size()){
        if(toplocationCountModels.size() - i > 10000){
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, i + 10000));
        }else{
            iToplocationService.insertToplocation(toplocationCountModels.subList(i, toplocationCountModels.size()));
        }

        i = i + 10000;
    }

The feature is still quite simple and uses nothing fancy; it will be improved gradually.

Conclusion

  1. Successfully integrating Spring Boot lets us bring Hive into our data platform easily and build other Hive-based data products on top of it.
  2. It is only a small step today, but we will do many interesting things based on it.