When your system is publishing billions of messages a day in real time from MySQL to Kafka, how do you manage the pattern information in that data? When your system is connected to hundreds of services, you are dealing with thousands of different patterns, and manual management is not feasible. There must be an automated solution to deal with schema changes from upstream data sources to all downstream consumers. Confluent’s Schema Registry and Kafka Connect are good choices, but they weren’t available when we started building the Yelp data pipeline. Hence Schematizer.
What is Schematizer?
An important part of Yelp’s data pipeline design is to stereotype all data, meaning that all data flowing through the data pipeline must conform to some predefined schema, rather than be formatted arbitrarily. Why emphasize this point? Because we want all data consumers to be able to anticipate the data format they are going to fetch, there will be no significant downstream impact if upstream data producers decide to change the data schema they publish. The uniform pattern representation also makes it easy to integrate systems that use different data formats.
Schematizer is a schema storage service for tracking and managing schemas used in all data pipelines, as well as providing automated documentation support. We use Apache Avro to express patterns. Avro has many of the features we need in data pipelines, especially schema evolution, which is one of the key factors in decoupling data producers and consumers. Each message that flows through the data pipeline is serialized with Avro schema. To reduce the size of the message, we do not put all the schema information in the message, but just the schema ID. Data consumers can use ids to retrieve schema information from Schematizer and deserialize messages at run time. Schematizer is the only reliable source of all predefined schema information.
Manage patterns differently
Schematizer organizes and manages schemas in two ways: from the perspective of a data producer and a data consumer.
The first approach groups schemas based on data generation information, with each group defined by namespace and data source. Producers must provide namespace and data source information when registering a schema with Schematizer. For example, a service that intends to publish database data to a data pipeline can use the service name as a namespace and the table name as a data source.
Group schemas by namespace and data source
The second method groups data by destination information. For example, a Redshift cluster or MySQL database is a data destination that corresponds to one or more data producers, and each data producer is associated with one or more schemas, which correspond to the namespace and data source defined in the first method.
Look at the picture and group patterns according to the individual data destination
These two methods allow us to retrieve and correlate patterns for different needs. For example, a program might want to know what topics it publishes data to, and another service might want to know where data in its Redshift cluster comes from.
The registration model
The data pipeline requires that all data published to it be modeled and serialized with a predefined Avro schema. Therefore, when a data producer is ready to publish data to a data pipeline, the first thing it does is register the schema with Schematizer, and the most common approach is to register an Avro schema directly.
For data producers that do not have or cannot create Avro schemas, schema converters can also be added to Schematizer to convert non-AvRO schemas to Avro schemas. One example is MySQLStreamer, which is a service that publishes data from a MySQL database to a data pipeline, knowing only the MySQL table schema. Schematizer can transform MySQL table schema definitions into the corresponding Avro schema. However, if the data producer changes the schema definition, it must be re-registered.
Will changes in upstream patterns affect downstream services?
A common pain point for all data pipeline services is how to deal with upstream schema changes. Often this requires a lot of communication and coordination between upstream producers and downstream consumers. Yelp is not immune. We also have batch tasks and systems that process data generated by other batch tasks and systems. Every upstream schema change is painful, it can cause downstream services to crash, and the whole process is very labor-intensive.
We solve this problem with schema compatibility. During schema registration, Schematizer determines the mapping between a Topic and a new schema based on schema compatibility. Only compatible patterns can continue with older topics. If an incompatible schema is registered, Schematizer registers a new Topic for the new schema with the same namespace and data source. How does Schematizer determine compatibility? The answer is Avro Resolution rules. Avro interpretation rules ensure that messages packaged in a new version of the schema can be unpackaged in an old version of the schema, and vice versa, within the same Topic.
View images incompatible patterns will assign different topics
Most of the data in Yelp’s data pipeline currently comes from MySQLStreamer. For example, if we want to add a field to a business table, MySQLStreamer registers the new schema with Schematizer. Since such changes are compatible according to Avro interpretation rules, Schematizer creates a new Avro schema and assigns it the namespace and the old Topic corresponding to the data source. Changing a field from int to vARCHar is an incompatible change, and Schematizer creates a new Topic for the new schema.
By ensuring schema compatibility within a Topic, downstream data consumers can safely use the old schema to process any data in the Topic without worrying about any problems such as crashes caused by data schema changes. They can also connect to new topics at the right time according to their own needs. This makes the whole system more automated, reducing human involvement when patterns change.
In addition to Avro interpreting rules, we also defined our own rules in Schematizer to support some data piping capabilities. The primary key field of the schema is used for log compression in the data pipeline. Since the primary key for logging compression must be the same for the same Topic, any changes to the primary key are considered incompatible, causing Schematizer to create a new Topic for the new schema. Moreover, when the non-pii (Personally Identifiable Information) pattern began to contain manually Identifiable fields, such a change was also considered incompatible. Manually unreadable data and manually readable data must be stored separately, which simplifies the secure implementation of manually readable data and prevents downstream consumers from accidentally reading data that they do not have permission to read.
Review images
The logical flow to determine whether a new Topic is needed
It is worth mentioning that the schema registration process is idempotent. If you register the same schema multiple times, only the first time will generate a new schema, and the rest will return the registered schema directly. This makes it very easy for applications and services to initialize their Avro schema. Many applications and services define Avro schemas in files or code, but they cannot write out schema ids because they are controlled by Schematizer. So an application can call the schema registration interface to register the schema directly, get the schema information back if it already exists, or register it if it doesn’t, killing two birds with one foot.
Streamline all pattern change event processing
In order for the data pipeline to handle schema change events in a fully pipelined manner, Schematizer generates schema migration plans for downstream systems based on information about the current schema and the new schema. Currently Schematizer can only generate schema migration plans for Redshift tables. For downstream systems that apply data from the data pipeline to the Redshift cluster, the schema migration plan can be directly retrieved and executed when the schema changes, and the new schema information can be automatically retrieved without any human intervention. This capability is easily extended, and schema migration plan generators are easily replaced, so in the future we will add more schema migration plan generators to support more schema types, or switch to better algorithms to generate migration plans.
Schematizer knows all the data producers and consumers
In addition to managing registered schemas, Schematizer keeps track of all data producers and consumers, including which teams and services are responsible for producing or consuming what data, how often data is published, and so on. We can use this information to effectively find and negotiate with teams when human intervention is needed. This information also helps us to monitor and identify outdated patterns and topics so that they can be deprecated or removed. This simplifies compatibility verification when a new schema is registered. Schematizer can skip over obsolete patterns and only check the compatibility of the new pattern with the remaining valid patterns within a Topic.
All data producers and consumers must provide this information at startup. Initially we just wanted to keep it in Schematizer, but it turned out that this information was very useful for exploratory analysis and early warning, and we decided to write it to a separate Kafka Topic outside of the data pipeline. The data can then be processed by Redshift and Splunk, as well as imported into Schematizer and displayed through a front-end Web interface. We used Yelp’s own asynchronous, non-blocking Kafka producer that writes data through CLOGs so that it doesn’t interfere with the producer’s ability to publish data. In addition, this avoids circular dependencies, which can occur when a normal producer registers multiple times with the same information.
Which Kafka Topic should I use? Schematizer takes care of these details
Unlike regular Kafka producers, data producers in a data pipeline do not need to know in advance which Kafka Topic they should send data to. Because Schematizer specifies the mapping between registered schemas and topics, data producers can get the right Topic information from Schematizer and publish the data simply by providing the schema information they use to serialize their data. Abstracting the Topic information away makes the interface easier to use.
A similar mechanism applies to data consumers. While it is possible to assign them specific topics to consume, a more common use case is for Schematizer to provide the right Topic based on information about the groups of interest to the data consumer. The various grouping mechanisms were introduced in the previous sections of this article. Data consumption can either specify a namespace and data source, or specify a data destination, and Schematizer will find the corresponding Topic within that group. This mechanism is particularly effective for scenarios in which a set of topics of interest to data consumers may change due to incompatible schema changes. It frees the data consumer from having to track every Topic in the group.
The schema is good, the documentation is better!
Schemas format the data, but may not provide enough information for those who want to know exactly what the data means. We notice that the people who are using the data are often not the ones who are producing the data, so they don’t know where to find useful information to make sense of the data they are using. Because Schematizer is responsible for managing all the schemas in the data pipeline, it is appropriate to store the description of the data there as well.
Watson, the knowledge miner, makes a grand entrance
Schematizer requires the schema’s registrar to provide documentation along with the schema, and Schematizer extracts the document information and stores it. To make patterns and data documents available to teams within Yelp, we developed Watson, a Webapp that employees across the company can use to mine data content. Watson is actually a visual front end to Schematizer, and it gets data from Schematizer through several RESTful apis.
Watson provides valuable information about the state of the data pipeline: existing namespaces, data sources, and associated Avro schema information. Most importantly, Watson provides an easy way to view all data sources and schema information managed by Schematizer.
Documentation doesn’t just fall out of the sky
Most of the data that flows through our data pipeline currently comes from databases. We use the SQLAlchemy model to document the data sources and schemas for this data. At Yelp, SQLAlchemy describes all the models in our database. In addition to docString, SQLAlchemy also allows users to add additional information to the fields of the model. Therefore, it is a natural choice for us to document the purpose and meaning of individual data models and fields.
SQLAlchemy also introduced an owner field to keep track of the maintainers and experts for each model. We think the person who generated the data is the best person to provide the documentation. In addition, this approach encourages people to keep their real data models and descriptions in sync at all times.
class BizModel(Base):
__yelp_owner__ = Ownership(
teams=[TEAM_OWNERS['biz_team'],
members=[],
contacts=[]
)
__table_name__ = 'my_biz_table'
__doc__ = 'Business information.'
id = Column(Integer, primary_key=True, doc=r"""ID of the business.""")
name = Column(String(64), doc=r"""Name of the business.""")Copy the code
A simple SQLAlchemy model containing documents and owner information
However, developers do not always remember to provide documentation when working on the SQLAlchemy model. To prevent this from happening, we developed auto-validation to enforce that all models must provide complete property descriptions and documentation, which is a hard-and-fast standard. Whenever a new model is added, the validation will fail if the required documentation information is incomplete or there is no owner information. These auto-validation features help us take a big step toward our goal of 100% document coverage.
Extract high-quality documents for Watson
Once the data model is documented, we can import it into Schematizer and eventually expose it through Watson. Before diving into the Specific extraction process, let’s take a look at another important module in the process: The Application Specific Transformer, or AST. As its name implies, the AST inputs message flows from one or more data pipe topics, processes message patterns and packets using transformation logic, and outputs the transformed messages to other data pipe topics. Transformation modules that provide specific transformation processing can be chained together, so multiple modules can be combined to do very detailed transformation work.
We use the many transformation modules in the AST to generate more understandable data based on the SQLAlchemy model. Because modules can be concatenated, for now we simply create a transformation module that extracts document and owner information from the SQLAlchemy model and adds it to the existing transformation chain. In this way, the documentation and owner information for all models is automatically extracted and imported into Schematizer through the existing pipeline. The implementation process is fairly simple and seamless into the pipeline, so high-quality documentation can be generated very efficiently.
View the conversion module in picture AST
As mentioned above, the AST now has transformation modules that generate more meaningful information for the user. The bit-flag conversion module interprets the different bits of an integer field. Similarly, the Enum field conversion module converts Enum values into a readable literal representation. Another benefit of these transformation modules is that they also result in self-interpreting and self-generating documentation patterns, and therefore better documentation.
Collaboration, contribution and retrieval
Developer documentation is not the last thing we’ll cover. Watson also provides features that allow end users to work together to help make Yelp’s data more readable.
The first function is tagging. Watson allows users to label and categorize arbitrary data sources. A data source may be a MySQL database table or a data model. For example, a Business data source can be labeled “Business Information,” while a User Information source can be labeled “User Information.” End users can label all related data sources the same, organizing them in a way that makes the most sense to them. Tagging gives us a deeper understanding of how our data sources are related to each other.
View the picture for the Business data source labeled “Business Info”
Another feature Watson provides is the ability to add annotations. End users, especially non-technical people, can use this method to provide their own documentation for a data source or field. Business analysts, for example, often have valuable insights into using data, and they can use comments to share miscellaneous, boundary-bound use cases, and time-sensitive information.
The biggest need for Watson by end users is retrieval. We implemented a simple search engine in Watson that allows users to retrieve data from various aspects such as schema, Topic, and data model description. We chose the Whoosh Python package instead of Elasticsearch in the search background because it helps us to get the development done quickly. Whoosh’s performance is adequate for our current volume of searches. As the data scale increases, we will consider switching to other, more scalable engines in the future.
conclusion
Schematizer is an important part of Yelp’s data pipeline. Its schema registration operations underlie many of the important functions of the data pipeline, including mitigating the impact on downstream consumer programs and services when upstream data producers change schemas. Schematizer also manages the Topic assignment of data releases, freeing users from such details as which Topic to use. Finally, it requires that all data written to the data pipeline be documented, which promotes knowledge sharing across the company. Watson makes it easy for all Yelp employees to get the most timely information.
We’ve taken a close look at Schematizer and its front-end documentation system, Watson. Next, we’ll take a closer look at the stream processor: Paastorm. Please stay tuned!
This is the third article in a series on Yelp’s real-time streaming data infrastructure. This series will take a deep look at how we stream changes to our MySQL database in real time in a “sure only once” way, how we automatically track table schema changes, how we process and transform streams, and ultimately how we store this data in a data warehouse like Redshift or Salesforce. This article is about Schematizer, Yelp’s schema storage service.
More Than Just a Schema Store More Than Just a Schema Store
Read more:
Yelp Real-time Streaming Technology Part 1: Billions of Messages a Day: Yelp’s Real-time Data Pipeline
Yelp Real-time Streaming Technology Part 2: Using MySQLStreamer to send database changes to Kafka
Review images
Tmall Interactive Creative Competition, interactive technology form is not limited, professional review panel comments, generous bonus temptation, winning teams can obtain complete business plan support, become long-term partners of Tmall. Click “Read the original” to sign up!
Big data chatter
ID: BigdataTina2016
View pictures ▲ Long press the QR code to identify attention
Focusing on big data and machine learning,
Share cutting-edge technology, exchange deep thinking.
Welcome to the community!