Introduction: how Flink is used in machine learning feature engineering to solve the difficulty of bringing features online, and how SQL + Python UDFs are used in production practice.

The author of this article, Chen Yisheng, introduces the upgrade of the machine learning feature system on the Banyu platform: how migrating the architecture from Spark to Flink solved the difficulty of bringing features online, and how SQL + Python UDFs are used in production practice. The main contents are:

  • Preface
  • The old version feature system V1
  • The new version feature system V2
  • Summary

One, preface

At Banyu, we use machine learning in multiple online scenarios to improve the user experience. For example, in Banyu Picture Book, we recommend posts to users based on their post browsing history; in the conversion backend, we recommend courses that users may be interested in based on their picture book purchase records.

Features are the inputs to machine learning models. Whether features can be processed efficiently from data sources and then accessed efficiently by online services determines whether we can use machine learning reliably in production. We therefore built a feature system to solve this problem systematically. At present, Banyu's machine learning feature system runs close to 100 features and supports the online feature retrieval needs of models across multiple lines of business.

Below, we introduce the evolution of the feature system at Banyu and the trade-offs involved.

Two, the old version feature system V1

Feature system V1 consists of three core components: feature pipeline, feature warehouse, and feature service. The overall architecture is shown in the figure below:

The feature pipeline includes a stream feature pipeline and a batch feature pipeline. They consume stream and batch data sources respectively, preprocess the data into features (a step called feature engineering), and write the features into the feature warehouse.

  • The batch feature pipeline is implemented in Spark, scheduled by DolphinScheduler, and runs on the YARN cluster.
  • For consistency of the technology stack, the stream feature pipeline is implemented with Spark Structured Streaming and, like the batch feature pipeline, runs on the YARN cluster.

The feature warehouse uses an appropriate storage component (Redis) and data structure (Hashes) to provide low-latency feature access to model services. Redis was chosen as the storage because:

  • Banyu has rich experience in using Redis;
  • Industry feature store solutions, including the DoorDash Feature Store [1] and Feast [2], use Redis.

The feature service hides the storage and data structures of the feature warehouse, exposes the RPC interface GetFeatures(EntityName, FeatureNames), and provides low-latency feature queries. In implementation, this interface basically corresponds to the Redis operation HMGET EntityName FeatureName_1 … FeatureName_N.
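
As an illustration, here is a minimal sketch of this lookup, assuming the redis-py client and a hypothetical key layout in which each entity name maps to a Redis Hash from feature names to values:

import redis

# Placeholder connection; the real feature warehouse address differs.
client = redis.Redis(host="localhost", port=6379)

def get_features(entity_name, feature_names):
    # HMGET returns values in the same order as the requested fields;
    # features missing from the Hash come back as None.
    values = client.hmget(entity_name, feature_names)
    return {
        name: value.decode() if value is not None else None
        for name, value in zip(feature_names, values)
    }

# Example call with illustrative entity and feature names.
features = get_features("user:42", ["posts_viewed_7d", "books_purchased"])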

There are several problems with this version of the feature system:

  • Algorithm engineers lack control, resulting in low iteration efficiency. The problem stems from the system's technology stack and the company's organizational structure. The feature pipeline has the highest iteration demands in the entire system: as soon as a model has a new feature requirement, a Spark task must be modified or written. Writing Spark tasks requires some knowledge of Java or Scala, which are not common skills for algorithm engineers, so the big data team is in charge of it. The big data team is responsible for many data requirements and usually has a long queue of scheduled tasks. As a result, launching a new feature involves frequent cross-department communication, and iteration efficiency is low.
  • The feature pipeline performs only lightweight feature engineering, which reduces the efficiency of online inference. Because the feature pipelines are written by big data engineers rather than algorithm engineers, complex data preprocessing carries higher communication costs, so the preprocessing done in the pipeline stays relatively light; the rest is pushed into the model service or even inside the model itself, which increases model inference latency.

To solve these problems, feature system V2 sets out two design goals:

  • Return control to algorithm engineers to improve iteration efficiency;
  • Move heavier feature engineering into the feature pipeline to improve the efficiency of online inference.

Three, the new version feature system V2

In architecture, the only difference between feature system V2 and feature system V1 is that the feature pipeline is split into three parts: the feature generation pipeline, the feature source, and the feature injection pipeline. It is worth noting that both pipelines were migrated from Spark to Flink, in line with the evolution of the company's data infrastructure. The overall architecture of feature system V2 is shown in the figure below:

1. Feature generation pipeline

The feature generation pipeline reads a raw data source, processes it into features, and writes the features to the specified feature source (rather than to the feature warehouse).

  • If a pipeline uses a stream data source as its raw data source, it is a stream feature generation pipeline.
  • If a pipeline uses a batch data source as its raw data source, it is a batch feature generation pipeline.

The logic of the feature generation pipeline is written solely by the algorithm engineer. The batch feature generation pipeline is written in HiveQL and scheduled by DolphinScheduler. The stream feature generation pipeline is implemented using PyFlink, as shown below:

Algorithm engineers follow these steps (a minimal sketch of the resulting script appears after the list):

  1. Declare the Flink task's source (source.sql) and define the feature engineering logic (transform.sql) in Flink SQL;
  2. (Optional) Implement in Python any UDFs required by the feature engineering logic (UDF_def.py);
  3. Generate the executable PyFlink task script (run.py) with the in-house code generation tool;
  4. Debug the PyFlink script locally in the Docker environment provided by the platform to verify that it runs correctly;
  5. Submit the code to the repository that manages feature pipelines for review by the AI platform team; approved scripts are deployed to Banyu's real-time computing platform, completing the launch of the feature generation pipeline.
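
For illustration, here is a minimal sketch of what a generated run.py might look like, combining the SQL declarations with a Python UDF; the connector options, table names, and UDF logic below are all hypothetical, not the actual pipeline:

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

# Hypothetical UDF (the kind of logic that would live in UDF_def.py).
@udf(result_type=DataTypes.DOUBLE())
def normalize_score(raw_score: float) -> float:
    return raw_score / 100.0

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("normalize_score", normalize_score)

# Equivalent of source.sql: declare the stream source (options illustrative).
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        raw_score DOUBLE
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

# Equivalent of transform.sql: apply the UDF as part of the feature logic.
# A real script would insert the result into a sink table (the feature source).
t_env.execute_sql(
    "SELECT user_id, normalize_score(raw_score) AS score_feature FROM user_events"
)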

This process ensures that:

  • Algorithm engineers have autonomy over bringing features online;
  • Platform engineers control the code quality of the feature generation pipelines and can refactor them when necessary without involving the algorithm engineers.

2. Feature source

The feature source stores features processed from raw data sources, and serves as the bridge between algorithm engineers and AI platform engineers. Algorithm engineers are responsible only for implementing the feature engineering logic, processing raw data into features, and writing them to the feature source; the rest is left to the AI platform. Platform engineers implement the feature injection pipeline, write the features into the feature warehouse, and expose them externally through the feature service.

3. Feature injection pipeline

The feature injection pipeline reads features from the feature source and writes them to the feature warehouse. Since the Flink community lacks native support for a Redis sink, we implemented StreamRedisSink and BatchRedisSink ourselves by extending RichSinkFunction [3], which meets our needs well.

BatchRedisSink achieves batch writing through a simple combination of Flink operator state [4] and Redis pipelining [5], drawing heavily on the BufferingSink example in the Flink documentation. Batching significantly reduces the number of requests to the Redis server and increases throughput; write efficiency is about 7x higher than line-by-line insertion [6]. A brief implementation of BatchRedisSink follows: flush implements the core logic of batch writing to Redis, while checkpointedState, bufferedElements, snapshotState, and initializeState implement the element buffering logic using Flink's stateful operator support.

import java.sql.Timestamp

import scala.collection.mutable.ListBuffer

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import redis.clients.jedis.{Jedis, JedisPool}

class BatchRedisSink(
    pipelineBatchSize: Int
) extends RichSinkFunction[(String, Timestamp, Map[String, String])]
    with CheckpointedFunction {

  @transient private var checkpointedState
      : ListState[(String, java.util.Map[String, String])] = _

  private val bufferedElements
      : ListBuffer[(String, java.util.Map[String, String])] =
    ListBuffer.empty[(String, java.util.Map[String, String])]

  private var jedisPool: JedisPool = _

  override def invoke(
      value: (String, Timestamp, Map[String, String]),
      context: SinkFunction.Context
  ): Unit = {
    import scala.collection.JavaConverters._

    val (key, _, featureKVs) = value
    bufferedElements += (key -> featureKVs.asJava)

    // Flush once the buffer reaches the configured batch size.
    if (bufferedElements.size == pipelineBatchSize) {
      flush()
    }
  }

  // Core batching logic: send all buffered HMSETs in one Redis pipeline.
  private def flush(): Unit = {
    var jedis: Jedis = null
    try {
      jedis = jedisPool.getResource
      val pipeline = jedis.pipelined()
      for ((key, hash) <- bufferedElements) {
        pipeline.hmset(key, hash)
      }
      pipeline.sync()
    } catch { ... } finally { ... }
    bufferedElements.clear()
  }

  // Snapshot the buffer into operator state at checkpoint time.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor =
      new ListStateDescriptor[(String, java.util.Map[String, String])](
        "buffered-elements",
        TypeInformation.of(
          new TypeHint[(String, java.util.Map[String, String])]() {}
        )
      )

    checkpointedState =
      context.getOperatorStateStore.getListState(descriptor)

    import scala.collection.JavaConverters._

    // On recovery, restore the elements buffered at the last checkpoint.
    if (context.isRestored) {
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }

  override def open(parameters: Configuration): Unit = {
    try {
      jedisPool = new JedisPool(...)
    } catch { ... }
  }

  override def close(): Unit = {
    flush()
    if (jedisPool != null) {
      jedisPool.close()
    }
  }
}

Feature system V2 satisfies our design goals well:

  • Since writing a feature generation pipeline requires only SQL and Python, two tools that algorithm engineers are very familiar with, they take sole responsibility for writing and launching feature generation pipelines without relying on the big data team, which greatly improves iteration efficiency. Once familiar with the process, an algorithm engineer can usually write, debug, and bring a stream feature online in less than half an hour, a process that previously could take days depending on the big data team's schedule;
  • For the same reason, algorithm engineers can perform heavier feature engineering when necessary, reducing the burden on model services and the models themselves and improving the efficiency of online model inference.

Four, summary

Feature system V1 solved the problem of getting features online at all; on that basis, feature system V2 solves the problem of getting features online easily. During the evolution of the feature system, we summarized several lessons about platform development:

  • Platforms should provide the tools users want to use. This matches the experience of Uber's ML platform team in promoting their platform internally [7]. Algorithm engineers are most productive in Python and SQL and are generally unfamiliar with Java and Scala. So if you want algorithm engineers to write feature pipelines themselves, the platform should support writing feature pipelines in Python and SQL, rather than asking algorithm engineers to learn Java and Scala or handing the work over to the big data team.
  • Platforms should provide easy-to-use local debugging tools. We provide a Docker environment that bundles Kafka and Flink, allowing users to debug PyFlink scripts locally without waiting for the pipeline to be deployed to a test environment.
  • Platforms should encourage user autonomy while keeping a tight grip on quality through automated inspections or code reviews.
