Problems encountered
Under regulations issued by the state, we must encrypt users' private data, such as mobile phone numbers and IP addresses, before storing it. Our data warehouse sits in AWS S3 and is queried through Presto. We often need to locate records belonging to a specific user, but the user data is already encrypted and Presto has no built-in way to decrypt it, so decrypting programmatically took a lot of effort. That is where custom plugins come in.
Basic version
I used the following AWS EMR environments:

```
JDK: 1.8.0_92   EMR: emr-5.30.1   Presto: 0.232
JDK: 1.8.0_201  EMR: emr-5.24.0   Presto: 0.219
```
By the way, if you copy my code, first check which Presto line you are running. There are two Presto distributions out there that are incompatible with each other: com.facebook.presto, which I use here, and io.prestosql. My program builds against Presto SPI 0.232, and in my tests it is also compatible with lower versions such as 0.219.
Write a program
File directory
```
G:
├─.idea
└─src
    ├─main
    │   ├─java
    │   │   └─com
    │   │       └─yi
    │   │           └─udfs
    │   │               ├─model
    │   │               ├─scalar
    │   │               │   ├─cryptography
    │   │               │   ├─date
    │   │               │   ├─geo
    │   │               │   ├─json
    │   │               │   └─other
    │   │               └─utils
    │   └─resources
    │       └─META-INF
    │           └─services
    └─test
        └─java
            └─com
                └─yi
```
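The `META-INF/services` directory in the tree is what lets Presto discover the plugin at startup: following the standard Java `ServiceLoader` convention, it holds a file named after the SPI interface whose content is the implementing class. A sketch of what that file plausibly contains, assuming the package layout above (the actual file is not reproduced in this post):

```
# src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin
com.yi.udfs.PrestoPlugin
```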
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yi</groupId>
<artifactId>presto-third-udfs</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<presto.version>0.232</presto.version>
<slice.version>0.38</slice.version>
<joda.version>2.8.2</joda.version>
<airlift.version>0.189</airlift.version>
<commons-codec.version>1.15</commons-codec.version>
<junit.version>4.12</junit.version>
<slf4j.version>1.7.25</slf4j.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.facebook.presto/presto-spi -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>${presto.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<version>${slice.version}</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>json</artifactId>
<version>${airlift.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
<!-- Logger -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
</dependencies>
<build>
<finalName>presto-third-udfs</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>io.airlift:log</include>
<include>joda-time:joda-time</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.airlift.log</pattern>
<shadedPattern>io.airlift.log.shaded</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Next is PrestoPlugin, which implements the Plugin interface of the Presto SPI so that Presto loads it at startup. The program scans all classes under the com.yi.udfs.scalar package and registers those carrying the function annotations.
package com.yi.udfs;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
/**
 * @author YI
 * @description
 * @date create in 2021/8/17 9:38
 */
public class PrestoPlugin implements Plugin {
    private static Logger logger = LoggerFactory.getLogger(PrestoPlugin.class);

    /**
     * Load the function classes
     * @return the class objects
     */
    @Override
    public Set<Class<?>> getFunctions() {
        try {
            List<Class<?>> classes = getFunctionClasses();
            Set<Class<?>> set = Sets.newHashSet();
            for (Class<?> clazz : classes) {
                if (clazz.getName().startsWith("com.yi.udfs.scalar")) {
                    logger.info("Loading function: " + clazz);
                    set.add(clazz);
                }
            }
            return ImmutableSet.<Class<?>>builder().addAll(set).build();
        } catch (IOException e) {
            logger.error("Cannot load classes from jar files!", e);
            return ImmutableSet.of();
        }
    }

    /**
     * Get the class objects by scanning the jar that contains this class
     * @return class objects
     * @throws IOException if the jar cannot be read
     */
    private List<Class<?>> getFunctionClasses() throws IOException {
        List<Class<?>> classes = Lists.newArrayList();
        String classResource = this.getClass().getName().replace(".", "/") + ".class";
        String jarURLFile = Objects.requireNonNull(
                Thread.currentThread().getContextClassLoader().getResource(classResource)).getFile();
        int jarEnd = jarURLFile.indexOf('!');
        // The resource path is in URL format; convert it to the actual file location
        String jarLocation = jarURLFile.substring(0, jarEnd);
        jarLocation = new URL(jarLocation).getFile();
        ZipInputStream zip = new ZipInputStream(new FileInputStream(jarLocation));
        for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
            if (entry.getName().endsWith(".class") && !entry.isDirectory()) {
                String className = entry.getName().replace("/", ".");
                // Strip the ".class" suffix
                className = className.substring(0, className.length() - 6);
                try {
                    classes.add(Class.forName(className));
                } catch (ClassNotFoundException e) {
                    logger.error("Unable to load class {}, exception: {}", className, e);
                }
            }
        }
        return classes;
    }
}
```
Then comes AESFunctions, the class responsible for AES encryption and decryption; its functions are what we actually call from Presto.
package com.yi.udfs.scalar.cryptography;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.yi.udfs.utils.AESUtil;
import com.yi.udfs.utils.StringUtil;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
/**
 * @author YI
 * @description AES-128-ECB encryption and decryption
 * @date create in 2021/8/17 11:00
 */
public class AESFunctions {

    @Description("aes encrypt")
    @ScalarFunction("aes_encrypt")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice aesEncrypt(@SqlType(StandardTypes.VARCHAR) Slice cSrc,
                                   @SqlType(StandardTypes.VARCHAR) Slice cKey) throws Exception {
        if (cSrc == null || StringUtil.empty(cSrc.toStringUtf8())) {
            return cSrc;
        }
        // Encrypt
        String enString = AESUtil.Encrypt(cSrc.toStringUtf8(), cKey);
        return Slices.utf8Slice(enString);
    }

    @Description("aes decrypt")
    @ScalarFunction("aes_decrypt")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice aesDecrypt(@SqlType(StandardTypes.VARCHAR) Slice cSrc,
                                   @SqlType(StandardTypes.VARCHAR) Slice cKey) {
        if (cSrc == null || StringUtil.empty(cSrc.toStringUtf8())) {
            return cSrc;
        }
        // Decrypt
        String deString = AESUtil.Decrypt(cSrc.toStringUtf8(), cKey);
        return Slices.utf8Slice(deString);
    }
}
```
@Description: a comment on the custom function so people know what it does. @ScalarFunction: declares the SQL name the function is registered under. @SqlType on the method declares the return type; on a parameter, it declares that parameter's type. Slice is Presto's counterpart of a Java String for VARCHAR values.
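The helper class AESUtil is referenced above but its source is not included in this post. A minimal sketch of what it plausibly looks like, assuming AES-128-ECB with PKCS5 padding and Base64-encoded ciphertext as stated in the class comment (the real version receives the key as a Slice; this sketch takes a String for simplicity, and the method names Encrypt/Decrypt follow the calls above):

```java
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch of the AESUtil helper: AES-128-ECB, PKCS5 padding,
// ciphertext carried as a Base64 string. The key must be 16 bytes.
class AESUtil {
    public static String Encrypt(String src, String key) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "AES");
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, keySpec);
        byte[] encrypted = cipher.doFinal(src.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(encrypted);
    }

    public static String Decrypt(String src, String key) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "AES");
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, keySpec);
        byte[] decrypted = cipher.doFinal(Base64.getDecoder().decode(src));
        return new String(decrypted, StandardCharsets.UTF_8);
    }
}
```

Note that ECB mode encrypts identical plaintext blocks to identical ciphertext blocks; it also makes the output deterministic, which is precisely what allows exact-match lookups on encrypted columns, but it is weaker than an IV-based mode.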
Deployment
1. If you are also using AWS EMR and the cluster is already running, the jar can only be rolled out one node at a time. Upload the jar to S3 first, then on each node download it to the home directory and move it into Presto's plugin directory (the hadoop user has no permission to write to the target directory directly, so the move needs sudo):

```
/usr/bin/aws s3 cp s3://kylin-data/presto-third-udfs.jar /home/hadoop/
sudo mv /home/hadoop/presto-third-udfs.jar /usr/lib/presto/plugin/hive-hadoop2/
```

Afterwards restart Presto on every node; the restart procedure is documented here: https://aws.amazon.com/cn/premiumsupport/knowledge-center/restart-service-emr/
2. If you want the plugin loaded automatically when the cluster starts, add a bootstrap action. Again taking AWS EMR as the example, I wrote the following script and put it on S3:
```bash
#!/bin/bash
aws s3 cp s3://kylin-data/presto-third-udfs.jar /home/hadoop/
sudo mkdir -p /usr/lib/presto/plugin/hive-hadoop2/
sudo cp /home/hadoop/presto-third-udfs.jar /usr/lib/presto/plugin/hive-hadoop2/
exit 0
```
If you create the cluster from the web console, add this script there as a bootstrap action.
Starting the cluster from an application (Java)
```java
RunJobFlowRequest request = new RunJobFlowRequest()
        // Cluster name
        .withName(emrName)
        // EMR release
        .withReleaseLabel("emr-5.30.1")
        // Cluster work steps, executed automatically after the cluster starts
        .withSteps(stepConfigs)
        // Components to install in the cluster: Hive, Hadoop, ...
        .withApplications(applications)
        // EMR log path
        .withLogUri("s3://apm-event-source-data/")
        // IAM role for cluster-related AWS services
        .withServiceRole("EMR_DefaultRole")
        // IAM role for EC2 instances in the cluster
        .withJobFlowRole("EMR_EC2_DefaultRole")
        .withBootstrapActions(new BootstrapActionConfig()
                .withName("Install custom plugin")
                .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                        .withPath("s3://kylin-data/copyJarFile.sh")))
        ......
```
The key part is the bootstrap action, which copies our plugin to the target directory while each node bootstraps:
```java
.withBootstrapActions(new BootstrapActionConfig()
        .withName("Install custom plugin")
        .withScriptBootstrapAction(new ScriptBootstrapActionConfig()
                .withPath("s3://kylin-data/copyJarFile.sh")))
```
Usage
Shared encryption/decryption key: J4NwAAAAAAAAAAAA
Presto encryption and decryption
```sql
-- Encrypt
select aes_encrypt('Hello world', 'J4NwAAAAAAAAAAAA');
-- Decrypt
select aes_decrypt('KbAC1EtPwRSbNmS9oaBSuA==', 'J4NwAAAAAAAAAAAA');
-- Output: Hello world
```
Hive uses AES encryption and decryption functions as follows:
```sql
-- Encrypt
SELECT base64(aes_encrypt('Hello world', 'J4NwAAAAAAAAAAAA'));
-- Decrypt
SELECT aes_decrypt(unbase64('KbAC1EtPwRSbNmS9oaBSuA=='), 'J4NwAAAAAAAAAAAA');
-- Output: Hello world
```
Personal blog: A Leaf autumn Maple (hwy.ac.cn)