1 background
Use Python to open the Impala channel for automated fetch or as a data source for data analysis.
2 Apache Impala
- Impala is an open source, Hadoop-based analytical database.
- Impala can query data stored in HDFS or HBase.
- Impala uses a dedicated distributed query engine to access data, bypassing MapReduce, and provides higher query performance than Hive.
3 impyla
- Python client based on HiveServer2 implementation of distributed query engine (such as Impala, Hive).
- Full compliance with the DB API 2.0 (PEP 249) specification.
- Use Kerberos, LDAP, SSL.
- Support for converting data to DataFrame for easy integration into Python data stacks (such as Scikit-learn, matplotlib, etc.).
4 classes encapsulate
class Impala(SQL):
DESC_EXEC_SUCCESS = "Executed successfully"
def __init__(self, host, port, database, user, password=None):
""Impala tool class :param host: IP :param port: port: param database: database name :param user: user name :param password: password """
self.host = host
self.port = port
self.database = database
self.user = user
self.password = password
self.connect = None
self.cursor = None
def get_connect(self, timeout=600):
""" Get connection :param timeout: timeout period """
self.connect = connect(
host=self.host, # IP
port=self.port, # port
timeout=timeout, # timeout
database=self.database # database name
)
def get_cursor(self):
""" Get cursor """
self.cursor = self.connect.cursor(
user=self.user # username
)
def close(self):
""" Close the connection """
self.cursor.close()
self.connect.close()
self.cursor = None
self.connect = None
def execute(self, sql, auto_close=True):
""" Execute SQL :param auto_close: Whether to automatically close the connection after execution """
if not self.connect: self.get_connect()
if not self.cursor: self.get_cursor()
self.cursor.execute(sql)
try:
result = self.cursor.fetchall()
except ProgrammingError:
result = self.DESC_EXEC_SUCCESS
if auto_close: self.close()
return result
Copy the code
5 Example
from utils.db.impala import Impala
impala = Impala(
host="10.123.0.11",
port=123456,
database="fields",
user="unclebean"
)
sql = "select 1 as a, 2 as b union all select 3 as a, 4 as b"
result = impala.execute(sql, auto_close=True)
print(result)
Copy the code
[(1, 2), (3, 4)] If you want the result of a row to be a dictionary instead of a tuple, you need to pass dictify=True to retrieve the cursor, as in
sql = "select 1 as a, 2 as b union all select 3 as a, 4 as b"
result = impala.execute(sql, auto_close=True, dictify=True)
print(result)
Copy the code
[{‘a’: 1, ‘b’: 2}, {‘a’: 3, ‘b’: 4}]
6 Tkinter encapsulation to refresh metadata and daily statistics by one click
import os
from datetime import datetime
from menu.menu import EMenu
from utils.db.impala import Impala
class MenuImpala(EMenu):
LABEL_NAME = "Impala"
LABEL_NAME_INVALIDATE_METADATA = "Invalidate Metadata"
LABEL_NAME_COUNT_BY_DAY = "Count By Day"
DESC_SSH_CMD_FAILED = "Execution error"
def __init__(self, master=None, cnf={}, **kw):
super().__init__(master=master, cnf=cnf, **kw)
self.impala = Impala(
host = self.conf.impala.HOST,
port = self.conf.impala.PORT,
database = self.conf.impala.DATABASE,
user = self.conf.impala.USER
)
master.add_cascade(label=self.LABEL_NAME, menu=self) Add the main menu
self.add_command( # Add submenu - Refresh metadata
label=self.LABEL_NAME_INVALIDATE_METADATA,
command=self.invalidate_metadata
)
self.add_command( # Add submenu - Statistics by day volume
label=self.LABEL_NAME_COUNT_BY_DAY,
command=self.count_by_day
)
@EMenu.thread_run(LABEL_NAME_COUNT_BY_DAY)
def count_by_day(self):
"" menu command: Data volume by day ""
table_name = self.get_table_name_from_clip() Get the table name from the clipboard
self.invalidate_table(table_name, auto_close=False) Refresh metadata first
sql = "select data_date,count(1) from {} group by data_date order by data_date desc".format(
table_name
)
self.stdout(sql, with_time="-")
result = self.impala.execute(sql) # Then statistics data volume by day
result = "\n".join([str(row) for row in result])
self.stdout("{} - > {}".format(sql, result), with_time="-")
self.msg_box_info(result)
@EMenu.thread_run(LABEL_NAME_INVALIDATE_METADATA)
def invalidate_metadata(self):
Menu command: Refresh metadata
self.invalidate_table(self.get_table_name_from_clip()) Get the table name from the clipboard, then refresh the metadata
def invalidate_table(self, table_name, auto_close=True):
sql = "invalidate metadata {}".format(table_name)
self.stdout(sql, with_time="-")
result = self.impala.execute(sql=sql, auto_close=auto_close)
self.stdout("{} - > {}".format(sql, result), with_time="-")
def get_table_name_from_clip(self):
table_name = self.paste()
if len(table_name.split(".")) = =1:
table_name = table_name.split("_") [0] + "." + table_name
return table_name
Copy the code
7 Complete Code
Search for ThecleWhogrowsBeans on GitHub