I. Design concept
1. Trading space for time
1) Multi-level caching and static content
Client page cache (HTTP headers Expires/Cache-Control, Last-Modified (on a 304 the server returns no body and the client keeps using its cached copy, which reduces traffic), ETag)
Reverse proxy cache
Application-side cache (Memcached)
In-memory database
Buffer and cache mechanisms (databases, middleware, etc.)
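As a rough illustration of the client page cache item above, the sketch below shows how a server might answer a revalidation request with 304 and no body when the client's ETag still matches. The names `make_etag` and `respond` and the `max-age=60` value are assumptions made for this example and do not come from any particular framework.

```python
import hashlib
from typing import Dict, Optional, Tuple


def make_etag(body: bytes) -> str:
    """Derive an ETag from the response body (hypothetical helper)."""
    return '"' + hashlib.sha256(body).hexdigest()[:16] + '"'


def respond(body: bytes, if_none_match: Optional[str]) -> Tuple[int, Dict[str, str], bytes]:
    """Return (status, headers, body) for a GET, honouring If-None-Match."""
    etag = make_etag(body)
    headers = {"ETag": etag, "Cache-Control": "max-age=60"}
    if if_none_match == etag:
        # The client copy is still valid: no body is sent, which saves traffic.
        return 304, headers, b""
    return 200, headers, body


page = b"<html>product list</html>"
status, headers, _ = respond(page, None)                  # first visit
revalidated, _, payload = respond(page, headers["ETag"])  # later revalidation
```

When the page content changes, the computed ETag changes as well, so the same request falls back to a full 200 response.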
2) Indexes
Hash, B-tree, inverted, bitmap
A hash index combines array addressing with linked-list insertion and gives fast data access.
A B-tree index suits query-oriented scenarios: it avoids repeated I/O and improves query efficiency.
An inverted index is the most effective structure for mapping words to documents and is widely used in search.
A bitmap is a very simple and fast data structure that optimizes storage space and speed together (without trading space for time), and suits computations over large amounts of data.
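To make the bitmap idea concrete, here is a minimal sketch that keeps one bitmap per (column, value) pair and answers a two-condition query with a single bitwise AND. The class `BitmapIndex`, the helper `row_ids` and the sample rows are hypothetical; Python integers are used as bit sets purely for brevity.

```python
from collections import defaultdict


class BitmapIndex:
    """One bitmap (a Python int used as a bit set) per (column, value) pair."""

    def __init__(self):
        self._bitmaps = defaultdict(int)

    def add(self, row_id: int, column: str, value: str) -> None:
        self._bitmaps[(column, value)] |= 1 << row_id

    def bitmap(self, column: str, value: str) -> int:
        return self._bitmaps.get((column, value), 0)


def row_ids(bitmap: int) -> list:
    """Expand a bitmap back into the sorted list of row ids it contains."""
    return [i for i in range(bitmap.bit_length()) if (bitmap >> i) & 1]


index = BitmapIndex()
rows = [("shoes", "red"), ("shoes", "blue"), ("hats", "red"), ("shoes", "red")]
for rid, (category, color) in enumerate(rows):
    index.add(rid, "category", category)
    index.add(rid, "color", color)

# category = 'shoes' AND color = 'red' is a single bitwise AND.
red_shoes = row_ids(index.bitmap("category", "shoes") & index.bitmap("color", "red"))
```

Here `red_shoes` contains rows 0 and 3, the only rows that satisfy both conditions.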
2. Parallel and distributed computing
1) Task segmentation, divide and conquer (MR)
Large-scale data has a degree of locality, and the locality principle is used to divide and conquer massive computations.
The MR model is a shared-nothing architecture in which the data set is distributed across the nodes. During processing, each node runs the map phase on data read from nearby local storage, combines, shuffles and sorts the map output, and then distributes it to the reduce nodes. This avoids transferring large volumes of data and improves processing efficiency.
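The map, combine, shuffle and reduce steps can be sketched in a single process as a word count. This is only an illustration of the data flow, not a distributed implementation; the function names and the use of `zlib.crc32` as the partitioning hash are assumptions made for the example.

```python
import zlib
from collections import Counter, defaultdict


def map_and_combine(chunk: str) -> Counter:
    """Map: emit (word, 1); combine: pre-aggregate locally on the node."""
    return Counter(word.lower() for word in chunk.split())


def shuffle(partials, n_reducers: int):
    """Route every key to one reducer using a stable hash of the key."""
    buckets = [defaultdict(list) for _ in range(n_reducers)]
    for partial in partials:
        for word, count in partial.items():
            buckets[zlib.crc32(word.encode()) % n_reducers][word].append(count)
    return buckets


def reduce_bucket(bucket) -> dict:
    """Reduce: sum the partial counts for each key, in sorted key order."""
    return {word: sum(counts) for word, counts in sorted(bucket.items())}


chunks = ["a b a", "b c", "a c c"]  # data held locally by three nodes
partials = [map_and_combine(c) for c in chunks]
result = {}
for bucket in shuffle(partials, n_reducers=2):
    result.update(reduce_bucket(bucket))
```

Because the combine step runs where the data lives, only small per-node summaries cross the network in the shuffle, which is the saving the text describes.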
2) Multi-process, multi-thread parallel execution (MPP)
Parallel computing means using multiple computing resources simultaneously to solve a computing problem. It is an effective way to raise the computing speed and processing capacity of a computer system. The basic idea is to have multiple processors, processes or threads solve the same problem cooperatively: the problem is decomposed into several parts, and each part is computed in parallel by an independent processor.
Unlike MR, it is based on decomposing the problem rather than the data.
3. Multi-dimensional availability
1) Load balancing, disaster recovery and backup
As platform concurrency grows, nodes are added to form clusters, and load balancing devices distribute the requests. Load balancers usually provide failure detection as well as load distribution. In addition, to improve availability, disaster recovery backups are required so that node failures do not make the service unavailable. Backups can be online or offline, and the backup policy can be chosen according to the availability requirements.
2) Read-write separation
Read/write separation applies to databases. As system concurrency grows, separating writes from reads is an important way to improve the availability of data access. Read/write separation does raise data consistency concerns; in the CAP trade-off of distributed systems, availability is usually given more weight than consistency.
3) Dependencies
Modules in the platform should be coupled as loosely as possible. They can interact through message components, asynchronously where possible. The main flow and the secondary flows of the data should be clearly distinguished, and the main and secondary flows should be decoupled asynchronously.
Asynchronous processing often needs an acknowledgement mechanism (confirm, ack) to ensure that data has been received or processed.
However, in some scenarios the request has been processed but the acknowledgement is not returned for some other reason (such as network instability). The request is then retransmitted, so request handling must be designed to be idempotent.
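One common way to make a handler idempotent is to key each request by an identifier and remember the result of the first execution. The sketch below assumes the sender attaches a unique request id; `PaymentService` and `credit` are hypothetical names, and a real system would keep the seen-request table in durable storage.

```python
class PaymentService:
    """Applies each request at most once, keyed by a caller-supplied request id."""

    def __init__(self):
        self.balance = 0
        self._seen = {}  # request_id -> result returned the first time

    def credit(self, request_id: str, amount: int) -> int:
        if request_id in self._seen:
            # Retransmission: return the stored result, do not apply again.
            return self._seen[request_id]
        self.balance += amount
        self._seen[request_id] = self.balance
        return self.balance


service = PaymentService()
first = service.credit("req-1", 100)
retry = service.credit("req-1", 100)  # the ack was lost, so the sender retransmits
other = service.credit("req-2", 50)
```

The retransmitted `req-1` returns the same result as the first call and leaves the balance unchanged.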
4) Monitoring
Monitoring is also an important means of improving the availability of the whole platform. The platform should be monitored across multiple dimensions, and modules should be transparent at run time so that they run as white boxes.
4. Scalability
1) Splitting
Splitting covers both services and databases.
System resources are always limited. If a long business operation is executed in one blocking pass, resources cannot be released to other processes in time under heavy concurrency, and system throughput stays low.
The business logic needs to be split into logical segments and run in an asynchronous, non-blocking way to raise system throughput.
As data volume and concurrency grow, read/write separation alone can no longer meet the concurrency requirements, and the data needs to be sharded, by database and by table. Sharding in turn requires routing logic for the data.
2) Statelessness
For scalability, modules should ideally be stateless, so that overall throughput can be raised by adding nodes.
5. Optimize resource utilization
1) Limited system capacity
System capacity is limited, and so is the concurrency it can withstand. The architecture must include flow control to keep the system from crashing under unexpected attacks or bursts of concurrent traffic. Flow control measures can queue requests and, when the load exceeds the expected range, raise an alarm or discard requests.
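A minimal sketch of queue-based flow control, assuming a fixed limit on pending requests: requests beyond the limit are rejected and counted so that an alarm could be raised. `AdmissionQueue` and its `max_pending` parameter are hypothetical names chosen for the example.

```python
import queue


class AdmissionQueue:
    """Bounded request queue: accept up to max_pending, reject the rest."""

    def __init__(self, max_pending: int):
        self._pending = queue.Queue(maxsize=max_pending)
        self.rejected = 0  # could feed an alarm when it grows

    def submit(self, request) -> bool:
        try:
            self._pending.put_nowait(request)
            return True
        except queue.Full:
            self.rejected += 1  # over capacity: discard instead of crashing
            return False

    def take(self):
        return self._pending.get_nowait()


gate = AdmissionQueue(max_pending=2)
accepted = [gate.submit(f"req-{i}") for i in range(3)]
next_request = gate.take()
```

With a limit of two, the third request is turned away while the first two wait in order to be processed.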
2) Atomic operation and concurrency control
Access to shared resources needs concurrency control to prevent conflicts. Some operations also need to be transactional to ensure consistency. Atomic operations and concurrency control should therefore be considered when designing a transaction system.
Commonly used high-performance concurrency control techniques include optimistic locking, latches, mutexes, copy-on-write and CAS. Multi-version concurrency control (MVCC) is an important means of ensuring consistency and is widely used in database design.
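Optimistic locking can be sketched as a read followed by a conditional write that succeeds only if the version is unchanged, with a retry on conflict. Python has no user-level CAS instruction, so in this sketch a short lock stands in for the atomic compare-and-set; `VersionedCell` and `increment` are hypothetical names.

```python
import threading


class VersionedCell:
    """A value with a version number; updates succeed only on a version match."""

    def __init__(self, value):
        self._value, self._version = value, 0
        self._lock = threading.Lock()  # stands in for an atomic CAS instruction

    def read(self):
        with self._lock:
            return self._value, self._version

    def compare_and_set(self, expected_version: int, new_value) -> bool:
        with self._lock:
            if self._version != expected_version:
                return False  # another writer got there first
            self._value, self._version = new_value, self._version + 1
            return True


def increment(cell: VersionedCell) -> None:
    while True:  # optimistic retry loop
        value, version = cell.read()
        if cell.compare_and_set(version, value + 1):
            return


cell = VersionedCell(0)
workers = [threading.Thread(target=lambda: [increment(cell) for _ in range(200)])
           for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
total, version = cell.read()
```

No increment is lost even though the workers never hold a lock across the read-modify-write sequence; conflicting writers simply retry.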
3) Adopt different strategies based on different logic
The platform contains different types of business logic, some compute-intensive and some I/O-intensive. Within the same type, different business logic also consumes different amounts of resources, so different logic calls for different strategies.
For I/O-bound logic, an event-driven, asynchronous, non-blocking model can be adopted. A single-threaded model reduces the overhead of thread switching; with multiple threads, spinning can be used to reduce thread switching (as in the Oracle latch design). For compute-bound logic, take full advantage of multithreading.
Within the same invocation model, resources are allocated per service by setting different numbers of compute nodes or threads, so that traffic is split by service and higher-priority services are handled first.
4) Fault tolerance and isolation
When some service modules fail, to limit the impact on the concurrent processing of normal requests, it is sometimes necessary to handle the abnormal requests through a separate channel, or even to disable the failing modules automatically for a time.
Some request failures are occasional and temporary (for example, caused by network instability), so request retries need to be considered.
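A retry for temporary failures might look like the following sketch, which retries a bounded number of times with exponential backoff. The exception type `TransientError`, the function `call_with_retry` and its default values are assumptions made for the example; the `sleep` parameter is injectable so the behaviour can be observed without waiting.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures that are worth retrying."""


def call_with_retry(fn, attempts=3, base_delay=0.1, sleep=time.sleep):
    for attempt in range(attempts):
        try:
            return fn()
        except TransientError:
            if attempt == attempts - 1:
                raise  # retries exhausted: surface the failure
            sleep(base_delay * 2 ** attempt)  # exponential backoff


calls = {"n": 0}
delays = []


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("network unstable")
    return "ok"


outcome = call_with_retry(flaky, sleep=delays.append)
```

Retries are only safe when the retried operation is idempotent, and only errors known to be temporary should be retried.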
5) Resource release
System resources are limited. When using resources, release them at the end of the request, no matter whether the request takes a normal path or an abnormal path. In this way, resources can be reclaimed in time for other requests.
When designing the communication architecture, timeout control usually needs to be considered as well.
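Release on both the normal and the abnormal path, combined with a timeout on acquisition, can be sketched with a context manager. `ResourcePool` is a hypothetical class for illustration; the strings stand in for real connections.

```python
import queue
from contextlib import contextmanager


class ResourcePool:
    def __init__(self, resources):
        self._free = queue.Queue()
        for resource in resources:
            self._free.put(resource)

    @contextmanager
    def acquire(self, timeout: float):
        # Raises queue.Empty if nothing becomes free within `timeout` seconds.
        resource = self._free.get(timeout=timeout)
        try:
            yield resource
        finally:
            self._free.put(resource)  # runs on success and on exception

    def available(self) -> int:
        return self._free.qsize()


pool = ResourcePool(["conn-1"])
try:
    with pool.acquire(timeout=0.05) as conn:
        raise RuntimeError("request failed")
except RuntimeError:
    pass
after_error = pool.available()

timed_out = False
with pool.acquire(timeout=0.05):
    try:
        with pool.acquire(timeout=0.05):
            pass
    except queue.Empty:
        timed_out = True
```

The failed request still returns its connection to the pool, and a caller that cannot obtain a connection gives up after the timeout instead of blocking indefinitely.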
II. Static architecture blueprint
The whole architecture is a layered distributed architecture. Vertically it consists of the CDN, load balancing/reverse proxy, web application, business layer, basic service layer and data storage layer. Horizontally it covers configuration management, deployment and monitoring for the entire platform.
III. Architecture analysis
1. CDN
A CDN redirects each user request in real time to the nearest service node, based on combined information such as network traffic, node connectivity and load, distance to the user and response time. The goal is to let users fetch content from a nearby node, relieve Internet congestion and speed up responses when users visit the site.
Large e-commerce platforms generally need a CDN for network acceleration. Large platforms such as Taobao and Jingdong run self-built CDNs. Small and medium-sized enterprises can work with third-party CDN vendors such as Lanxun, Wangzhu and Kuaiwang.
When choosing a CDN vendor, consider how long it has been operating, whether it has expandable bandwidth, whether traffic and bandwidth options are flexible, whether its nodes are stable, and whether it is cost-effective.
2. Load balancing and reverse proxy
A large platform spans many business domains, and different domains have different clusters. DNS can be used to distribute or round-robin domain name resolution. DNS is simple to implement but inflexible because of caching. Layer 4 distribution is generally done with commercial hardware such as F5 or NetScaler, or with the open source software load balancer LVS. Redundancy is of course taken into account (for example LVS + Keepalived), using an active/standby mode.
After layer 4 distributes traffic to a service cluster, servers such as Nginx or HAProxy perform layer 7 load balancing or reverse proxying and distribute requests to the application nodes in the cluster.
Choosing a load balancer involves several factors: whether it meets the concurrency and performance requirements, how session persistence is handled, which load balancing algorithms it offers, whether it supports compression, and how much memory its cache consumes. Several commonly used load balancers are described below.
LVS is a high-performance, highly concurrent, scalable and reliable Linux load balancer that works at layer 4. It supports several forwarding modes (NAT, DR and IP tunneling), of which IP tunneling supports load balancing across a WAN. It supports hot standby (Keepalived or Heartbeat) and depends heavily on the network environment.
Nginx works at layer 7. It is load balancing/reverse proxy software with an event-driven, asynchronous, non-blocking, multi-process architecture that supports high concurrency. HTTP traffic can be split by domain name, directory structure and regular-expression rules. It detects backend server faults through the port, for example from the status code or timeout of the server's response, and resubmits a failed request to another node; the drawback is that detection by URL is not supported. Sticky sessions can be implemented with the IP hash algorithm, and the cookie-based extension nginx-sticky-module also supports sticky sessions.
HAProxy supports layer 4 and layer 7 load balancing, session persistence and cookie-based routing. It supports backend health checks by URL and offers a rich set of load balancing algorithms, such as round-robin (RR) and weighted.
Images need a separate domain name and an independent or distributed image server, for example MogileFS. Varnish can be added in front of the image server as an image cache.
3. App access layer
The application layer runs in JBoss or Tomcat containers and represents independent systems, such as the front-end shop, user self-service and the back-end systems.
Protocol interface: HTTP, JSON
Servlet 3.0 can be used to make servlets asynchronous and improve the throughput of the whole system.
HTTP requests pass through Nginx and are distributed to an App node by the load balancing algorithm, so this layer is relatively easy to scale out.
Besides keeping a small amount of user information in cookies (a cookie generally cannot exceed 4 KB), the App access layer holds user-related session data. However, some reverse proxies or load balancers do not support sticky sessions, or the availability requirement for access is high (when an app access node goes down, its sessions are lost). Centralized session storage should therefore be considered, which makes the app access layer stateless. When the number of users grows, more application nodes can then be added to scale horizontally.
The centralized Session storage must meet the following requirements:
A. Efficient communication protocol
B. Distributed cache of session, supporting node scaling, data redundancy and backup and data migration
C. Session expiration management
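Requirement C, session expiration management, can be illustrated with an in-memory stand-in for the centralized store. This is a sketch only: `SessionStore`, its sliding-expiration policy and the injectable `clock` are assumptions made for the example, and a real deployment would use a distributed cache that also covers requirements A and B.

```python
import time


class SessionStore:
    """In-memory stand-in for a centralized session store with expiry."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl, self._clock = ttl_seconds, clock
        self._data = {}  # session_id -> (expires_at, attributes)

    def put(self, session_id: str, attributes: dict) -> None:
        self._data[session_id] = (self._clock() + self._ttl, dict(attributes))

    def get(self, session_id: str):
        entry = self._data.get(session_id)
        if entry is None or entry[0] <= self._clock():
            self._data.pop(session_id, None)  # drop expired sessions lazily
            return None
        self.put(session_id, entry[1])  # sliding expiration on each access
        return entry[1]


now = [0.0]  # fake clock so the example does not need to wait
store = SessionStore(ttl_seconds=30, clock=lambda: now[0])
store.put("sid-1", {"user": "alice"})
now[0] = 20.0
still_there = store.get("sid-1")  # refreshed, now expires at t=50
now[0] = 49.0
refreshed = store.get("sid-1")
now[0] = 200.0
expired = store.get("sid-1")
```

Because any app node can read the session through the store, losing a node no longer loses the user's session.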
4. Business services
A business service represents the services provided by one business domain. For e-commerce the domains include users, goods, orders, red envelopes, payment and so on, and each domain provides its own services.
Each domain forms a module. Good module division and interface design are very important, and generally follow the principles of high cohesion and convergent interfaces.
This improves the availability of the whole system. Depending on the size of the application the modules can be deployed together, but for large-scale applications they are usually deployed independently.
High concurrency:
The business layer exposes its external protocol as NIO-based RPC; mature NIO communication frameworks such as Netty and MINA can be used.
Availability:
To improve the availability of module services, each module is deployed on multiple nodes for redundancy, with automatic load forwarding and failover.
VIP + Heartbeat can be used initially; the system currently has a separate HA component implemented with ZooKeeper (which has advantages over the original solution).
Consistency, transaction:
For consistency in a distributed system, availability is satisfied first as far as possible; consistency can be restored through reconciliation so that the system reaches an eventually consistent state.
5. Basic service middleware
1) Communication components
Communication components are used for calls between services inside the business system. On a high-concurrency e-commerce platform they must meet requirements for high concurrency and high throughput.
The whole communication component includes two parts: client and server.
The client maintains a connection pool for each server. Once the connections are initialized, the client can issue RPC calls to the server concurrently. The long-lived connections in the pool are maintained with heartbeats, and a request timeout is set.
Maintenance of a long-lived connection has two phases: sending the request and receiving the response. If an IOException occurs while sending the request, the connection is marked invalid. If a SocketTimeoutException occurs while receiving the response and a timeout has been set, the exception is returned directly and the timed-out requests on the connection are cleared. Otherwise heartbeat packets continue to be sent (because a packet may have been lost, a ping is sent once pingInterval has elapsed). If the ping fails (an IOException is raised), the connection is faulty and is marked invalid. If the ping succeeds, the connection is reliable and the read continues. Invalid connections are purged from the connection pool.
Each connection has its own thread for receiving responses. Clients can make RPC calls synchronously (wait, notify) or asynchronously.
Serialization uses the more efficient Hessian format.
The server uses the event-driven NIO framework MINA to support high-concurrency, high-throughput requests.
2) Router
In most database sharding solutions, to improve database throughput, tables are first split vertically into different databases. Then, when a table grows beyond a certain size, it is split horizontally. The same applies here. A client that accesses the database locates the data by user ID. One splitting approach is to hash the user ID with consistent hashing; its drawback is that data must be migrated when a node becomes invalid, and the service is unavailable during the migration. The other approach is to maintain a routing table. If every biz client maintains connection pools to all shards, the result is a full-connection problem; one solution is to shard at the business service layer as well, so that each business node maintains connections to only one shard.
See figure (Router)
The routing component is implemented as follows (for availability, high performance and high concurrency):
For performance reasons, MongoDB is used to maintain the mapping between user ID and shard. To ensure availability, it is deployed as a replica set cluster.
Biz shards and database shards correspond one to one, so each biz shard accesses only one database shard.
Biz nodes register themselves in the ZooKeeper /bizs/shard/ directory.
The router watches the status of these nodes in ZooKeeper and caches the online biz nodes.
When a client asks the router for a biz node, the router first looks up the user's shard in MongoDB and then picks a biz node from its cache using the RR algorithm.
To address router availability and concurrent throughput, routers are deployed redundantly. The client watches the /Routers node in ZooKeeper and caches the list of online router nodes.
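The lookup path described above (user ID to shard, then round-robin over the online biz nodes of that shard) can be sketched as follows. Plain dictionaries stand in for the MongoDB mapping and for the node list cached from ZooKeeper; the `Router` class, the node names and the user IDs are hypothetical.

```python
import itertools


class Router:
    def __init__(self, user_to_shard: dict, online_biz: dict):
        self._user_to_shard = user_to_shard        # stands in for the MongoDB mapping
        self._rr = {shard: itertools.cycle(nodes)  # cached online biz nodes per shard
                    for shard, nodes in online_biz.items()}

    def biz_for(self, user_id: int) -> str:
        shard = self._user_to_shard[user_id]
        return next(self._rr[shard])  # round-robin within the shard


router = Router(
    user_to_shard={1001: "shard0", 1002: "shard1"},
    online_biz={"shard0": ["biz-a", "biz-b"], "shard1": ["biz-c"]},
)
picks = [router.biz_for(1001) for _ in range(3)]
single = router.biz_for(1002)
```

Requests for user 1001 alternate between the two biz nodes of its shard, while user 1002 always reaches the single node of the other shard. A real router would rebuild the cycles whenever ZooKeeper reports a membership change.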
3) HA
The traditional way to implement HA is virtual IP failover, combined with Heartbeat, Keepalived and similar tools.
Keepalived uses VRRP and works with layer 4 load balancing: by monitoring VRRP packets it performs failover, providing redundant hot standby for LVS. Linux Heartbeat provides high availability for network or host services. HAProxy and Nginx forward packets at layer 7, so Heartbeat is the better fit for HAProxy, Nginx and similar services.
In a distributed cluster, ZooKeeper can be used for distributed coordination to maintain the cluster member list and send failure notifications. Clients can use a hash algorithm or round-robin for load balancing. ZooKeeper's distributed lock mechanism supports master-master and master-slave modes.
4) Messaging
MQ components are used for asynchronous interaction between the platform's systems.
When designing message service components, consider message consistency, persistence, availability and a sound monitoring system.
Two open source messaging middleware products dominate in the industry: RabbitMQ and Kafka.
RabbitMQ follows the AMQP protocol and is written in Erlang, a language designed for high concurrency. Kafka is a publish-subscribe messaging system that LinkedIn open-sourced in December 2010; it is designed for active streaming data and large-volume data processing.
Where message consistency requirements are high, an acknowledgement mechanism is needed for both producing and consuming messages. However, acknowledgements lost to network problems and other causes may lead to duplicate messages, which can be detected and filtered at the business level by making processing idempotent. RabbitMQ takes this approach. Another mechanism, used by the distributed messaging middleware Kafka, is for the consumer to pull messages from the broker with an LSN, fetching batches starting from a given LSN point in the broker, which removes the need for an acknowledgement mechanism.
Messages in the broker can be kept in memory or persisted to storage, depending on the reliability requirements of the messages weighed against performance.
To meet availability and throughput requirements, both cluster and active/standby modes are used in practice. RabbitMQ offers ordinary clusters and the more highly available mirrored queues. Kafka uses ZooKeeper to manage the brokers and consumers in a cluster, and topics can be registered in ZooKeeper. Through ZooKeeper's coordination mechanism, a producer keeps the broker information for its topics and can send messages to brokers at random or in round-robin fashion. A producer can also choose a shard based on message semantics, so that a message is sent to a particular shard of a broker.
In general, RabbitMQ is used for real-time messaging with high reliability requirements, and Kafka is used mainly for active streaming data and large-volume data processing.
5) Cache and buffer
Cache system
In high-concurrency, high-performance scenarios, a cache reduces the load on back-end systems by absorbing most of the reads, which can greatly improve system throughput; a cache is often placed in front of database storage, for example.
Introducing a cache inevitably brings its own problems, such as the cache hit ratio, jitter caused by cache node failures, and consistency between cache and storage.
Compared with storage, the cache holds a limited amount of data, so ideally it holds the system's hot data; common algorithms such as LRU can be used to evict old data. As the system grows, a single cache node is no longer enough and a distributed cache has to be built. A distributed cache generally uses consistent hashing, which greatly narrows the range of keys disturbed when a single node fails. Where availability requirements are high, each node needs a backup. Because the same data exists in both cache and storage, consistency issues are unavoidable. If strong consistency is required, update the cache at the same time as the database. If the requirement is weaker, setting a cache expiration time is enough.
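The effect of consistent hashing on node failure can be seen in a small sketch: when one cache node disappears, only the keys that lived on that node move. The `HashRing` class, the choice of MD5 as the ring hash, the 100 virtual nodes and the node names are assumptions made for this illustration.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16)


class HashRing:
    def __init__(self, nodes, vnodes: int = 100):
        # Each node owns `vnodes` points on the ring to even out the load.
        self._ring = sorted((_hash(f"{node}#{i}"), node)
                            for node in nodes for i in range(vnodes))
        self._points = [point for point, _ in self._ring]

    def node_for(self, key: str) -> str:
        # First ring point clockwise from the key's hash, wrapping around.
        idx = bisect.bisect(self._points, _hash(key)) % len(self._ring)
        return self._ring[idx][1]


keys = [f"user:{i}" for i in range(1000)]
before = HashRing(["cache-a", "cache-b", "cache-c"])
after = HashRing(["cache-a", "cache-b"])  # cache-c has failed
moved = [k for k in keys if before.node_for(k) != after.node_for(k)]
```

Every key in `moved` was previously served by the failed node; keys on the surviving nodes keep their placement, which is what limits the jitter. With plain modulo hashing, most keys would be remapped instead.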
Memcached is a high-speed distributed cache server with a relatively simple protocol; its event handling is based on libevent.
The cache is used in the client of the Router system: hot data is cached on the client, and when a lookup misses, the client goes to the Router system.
In-memory databases such as Redis and MongoDB are now used more often as caches. Redis offers a richer data manipulation API than Memcached. Redis and MongoDB both persist data while Memcached does not, so Memcached is better suited to caching data held in relational databases.
Buffer system
In high-speed write scenarios, some data has to be written to databases that are split by database and table, but the reliability requirement for that data is not very high. To reduce write pressure on the database, writes can be batched.
A memory region is allocated; when the data reaches a threshold of that region, such as 80%, it is sorted in memory (memory operations are relatively fast) and then flushed in batches.
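A minimal sketch of such a write buffer follows. It assumes rows are tagged with a target shard and that sorting means grouping rows by that shard before the batch is written; `WriteBuffer`, `threshold_pct` and the `flush_fn` callback are hypothetical names.

```python
class WriteBuffer:
    def __init__(self, capacity: int, flush_fn, threshold_pct: int = 80):
        self._limit = max(1, capacity * threshold_pct // 100)
        self._flush_fn = flush_fn
        self._rows = []

    def write(self, shard: str, row) -> None:
        self._rows.append((shard, row))
        if len(self._rows) >= self._limit:
            self.flush()

    def flush(self) -> None:
        if not self._rows:
            return
        # Sort in memory so rows for the same shard are written together.
        batch, self._rows = sorted(self._rows, key=lambda r: r[0]), []
        self._flush_fn(batch)


flushed = []
buf = WriteBuffer(capacity=5, flush_fn=flushed.append)
for i, shard in enumerate(["db2", "db1", "db2", "db1", "db1"]):
    buf.write(shard, i)
```

With a capacity of 5 and an 80% threshold, the first four rows are flushed as one sorted batch and the fifth stays in memory until the next flush. Anything still buffered is lost if the process dies, which is why this only suits data with modest reliability requirements.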
6) Search
Search is a very important function in an e-commerce platform. It mainly comprises category navigation for search terms, auto-suggestion and search ranking.
Open source enterprise search engines are mainly Lucene and Sphinx. Which engine is better is not discussed here. Beyond the basic functions it must support, a search engine should be evaluated on the following non-functional points:
A. Whether it supports distributed indexing and search to cope with massive data, and supports read/write separation to improve availability
B. Real-time indexing
C. Performance
Solr is a high-performance full-text search server based on Lucene. It provides a richer query language than Lucene, is configurable and extensible, and exposes an XML/JSON interface over HTTP.
Since Solr4, SolrCloud mode supports distributed indexing and automatic sharding of data. Search performance is improved through the master-slave (leader, replica) arrangement within each shard. ZooKeeper manages the cluster, including leader election, to ensure cluster availability.
A Lucene index reader works on a snapshot of the index, so after an index commit a new snapshot must be opened before newly added content can be searched. Index commits are expensive, which makes real-time index search inefficient.
Before Solr4, real-time index search was implemented by combining a full index on file with an incremental index in memory, as shown in the figure below.
Solr4 provides an NRT soft commit solution. A soft commit makes the latest index changes searchable without committing the index. However, those changes are not synced to disk, and data that has not been committed is lost if a node fails. A regular commit is therefore still needed periodically.
In the platform, indexing and data storage are asynchronous, which greatly improves availability and throughput. Only certain attribute fields are indexed, together with the identifying key of the stored data, which reduces the index size. The data itself is stored in HBase, a distributed store. HBase does not support secondary index search, but combined with Solr it can support multi-dimensional search and statistics.
To keep the index consistent with the data in HBase, that is, to ensure that all data stored in HBase gets indexed, a confirm mechanism can be used: a queue of data awaiting indexing is created before indexing, and an entry is removed from the queue once the stored data has been indexed.
7) Log collection
During the whole transaction process, a large number of logs are generated. These logs need to be collected and stored in a distributed storage system for centralized query and analysis.
A log system needs three basic components: an Agent (wraps a data source and sends its data to the Collector), a Collector (receives data from multiple agents, aggregates it and loads it into the back-end Store), and a Store (a central storage system that must be scalable and reliable and should support the currently very popular HDFS).
Cloudera's Flume and Facebook's Scribe are the most widely used open source log collection systems in the industry. The current version of Flume, FlumeNG, changed the architecture substantially.
In the design or technical selection of a log collection system, the following characteristics are usually required:
A. It bridges application systems and analysis systems and decouples them
B. Distributed and highly scalable: when the data volume grows, it scales horizontally by adding nodes
The log collection system can scale at every tier; data processing is stateless, so scalability is easy to achieve.
C. Near real-time
In some scenarios that require high timeliness, you need to collect logs and analyze data in a timely manner.
Log files are generally rolled by time or by size, so the system must detect newly generated log files in real time, tail them promptly, and support batched sending for transmission efficiency. A batch is sent when either the message count or the time interval is reached.
D. Fault tolerance
For fault tolerance, when the back-end storage system crashes Scribe writes data to local disk, and when the storage system recovers Scribe reloads the logs into it.
FlumeNG implements load balancing and failover through the Sink Processor. Multiple sinks can form a Sink Group, and a Sink Processor is responsible for activating one sink from the specified group. The Sink Processor can balance load across all sinks in the group, and can fail over to another sink when one fails.
E. Transaction support
Scribe does not consider transaction support.
Flume supports transactions through the acknowledgement mechanism, as shown in the figure below.
Generally, messages are extracted and sent in batches. Message confirmation is the confirmation of a batch of data, which can greatly improve the efficiency of data transmission.
F. Recoverability
Depending on the reliability requirement, a FlumeNG channel can be memory-based or use file persistence. A memory-based channel has higher throughput, but its data is lost and cannot be recovered when the node goes down. With file persistence, data can be recovered after an outage.
G. Regular and quantitative archiving of data
After collection, the data is generally stored in a distributed file system such as Hadoop. To make later processing and analysis easier, the files in the distributed file system need to be rolled on a schedule (TimeTrigger) or by size (SizeTrigger).
8) Data synchronization
A transaction system usually needs to synchronize heterogeneous data sources: data files to relational databases, data files to distributed databases, and relational databases to distributed databases. Where data lives is decided by performance and business requirements. Data is written to local files for performance, since files are written sequentially and efficiently. Data is synchronized to relational databases mainly to serve queries. Distributed databases hold ever larger volumes of data that relational databases cannot store or serve queries for.
Throughput, fault tolerance, reliability and consistency should be considered in the design of data synchronization.
Synchronization is divided into real-time incremental synchronization and offline full synchronization. Both are described below.
Real-time incremental synchronization usually tails files to track changes in real time and exports them to the database in batches or with multiple threads, with an architecture similar to a log collection framework. It needs a confirmation mechanism with two aspects.
First, the channel confirms to the agent that it has received a batch of records by returning the LSN to the agent. When the agent recovers from a failure, it can resume tailing from that LSN. A small number of duplicate records is possible (the agent goes down while the channel is confirming and never receives the confirmation), and these have to be handled in the business scenario.
Second, sync confirms to the channel that a batch has been written to the database, so that the channel can delete the confirmed messages.
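The first confirmation, between agent and channel, can be sketched as follows. The agent resumes from the last acknowledged LSN, so a lost acknowledgement causes a resend; in this sketch the channel keys records by LSN so the duplicate is harmless. The `Agent` and `Channel` classes and the `ack_lost` switch are hypothetical and exist only to simulate the failure case.

```python
class Channel:
    def __init__(self):
        self.records = {}  # lsn -> line; keying by LSN drops duplicates

    def receive(self, batch):
        for lsn, line in batch:
            self.records[lsn] = line
        return batch[-1][0]  # acknowledge the highest LSN received


class Agent:
    def __init__(self, lines, batch_size: int):
        self._lines, self._batch_size = lines, batch_size
        self.acked_lsn = 0  # checkpoint: everything up to here is confirmed
        self.sent = 0

    def send_next(self, channel: Channel, ack_lost: bool = False) -> None:
        start = self.acked_lsn
        batch = list(enumerate(self._lines[start:start + self._batch_size],
                               start=start + 1))
        if not batch:
            return
        self.sent += len(batch)
        ack = channel.receive(batch)
        if not ack_lost:
            self.acked_lsn = ack


channel = Channel()
agent = Agent(["l1", "l2", "l3", "l4"], batch_size=2)
agent.send_next(channel, ack_lost=True)  # delivered, but the ack never arrives
agent.send_next(channel)                 # resend from the old checkpoint
agent.send_next(channel)
```

Six records are sent for four lines of input, yet the channel ends up with exactly four, one per LSN.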
For reliability, the channel can use file persistence.
See below
Offline full synchronization follows the principles of trading space for time and divide and conquer, to shorten synchronization time and improve efficiency.
The source data, for example in MySQL, is split into slices. Multiple threads read the source concurrently and multiple threads write concurrently in batches to a distributed database such as HBase. A channel acts as the buffer between reading and writing for better decoupling, and can be file-based or memory-based. See below:
For splitting the source data: if the source is a file, it can be split by setting a block size per file name.
For a relational database, the usual need is to synchronize only the data of a recent period offline (for example, syncing the day's orders to HBase in the early morning). Splitting the data by row count would require multiple threads to scan the whole table (even with an index on the time column, the lookups still go back to the table). For tables holding large amounts of data, the I/O is very high and efficiency very low. The solution is to partition the table on the time field (matching the synchronization period) and export one partition at a time.
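The offline full synchronization described above (sliced source, concurrent readers, a channel as buffer, concurrent batch writers) can be sketched with threads and a bounded queue. A Python list stands in for one exported partition and a callback stands in for the batch write to the target store; `full_sync` and its parameters are hypothetical names.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


def full_sync(source, slice_size, write_batch, n_readers=2, n_writers=2):
    channel = queue.Queue(maxsize=4)  # bounded buffer between readers and writers
    slices = [(lo, min(lo + slice_size, len(source)))
              for lo in range(0, len(source), slice_size)]

    def read_slice(bounds):
        lo, hi = bounds
        channel.put(source[lo:hi])  # blocks while the channel is full

    def write_loop():
        while True:
            batch = channel.get()
            if batch is None:  # sentinel: no more data
                return
            write_batch(batch)

    writers = [threading.Thread(target=write_loop) for _ in range(n_writers)]
    for w in writers:
        w.start()
    with ThreadPoolExecutor(max_workers=n_readers) as readers:
        list(readers.map(read_slice, slices))  # wait until every slice is read
    for _ in writers:
        channel.put(None)
    for w in writers:
        w.join()


written, lock = [], threading.Lock()


def write_batch(batch):
    with lock:  # the target list is shared by all writer threads
        written.extend(batch)


full_sync(list(range(10)), slice_size=3, write_batch=write_batch)
```

The bounded channel lets readers and writers run at different speeds without unbounded memory growth. Batches may arrive in any order, so the target must not depend on write order.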
9) Data analysis
Data analysis has evolved from traditional parallel processing on relational database clusters, through in-memory near-real-time computing, to today's analysis of massive data on Hadoop. It is widely used in large e-commerce sites, for traffic statistics, recommendation engines, trend analysis, user behavior analysis, data mining classifiers, distributed indexing and so on.
Parallel processing clusters include the commercial EMC Greenplum, a PostgreSQL-based distributed database for large-scale data storage whose architecture uses MPP (massively parallel processing).
For in-memory computing there is SAP HANA, and the open source NoSQL in-memory database MongoDB also supports MapReduce for data analysis.
Hadoop is now widely used by Internet companies. With irreplaceable advantages in scalability, robustness, computing performance and cost, it has become the mainstream big data analysis platform for Internet enterprises.
Hadoop uses the MapReduce distributed processing framework to process large-scale data and scales well. However, the biggest disadvantage of MapReduce is that it cannot serve real-time scenarios, so it is mainly used for offline analysis.
Programming data analysis directly against the MapReduce model is inefficient to develop. Hive, which runs on Hadoop, makes data analysis similar to writing SQL: after SQL parsing and execution plan generation, MapReduce tasks are generated and executed. This greatly improves development efficiency and allows analysis in an ad-hoc fashion (the computation happens when the query is issued).
Distributed data analysis based on the MapReduce model is all offline analysis, and execution is a brute-force scan, so mechanisms such as indexes cannot be used. Cloudera Impala is a high-performance real-time analysis platform based on the MPP parallel programming model with Hadoop as its underlying storage, and it significantly reduces the latency of data analysis.
The current version of Hadoop is Hadoop 1.0. On the one hand, the original MapReduce framework has the JobTracker as a single point of failure. On the other hand, the JobTracker does both resource management and task scheduling, which creates obvious bottlenecks in scalability, memory consumption, threading model, reliability and performance. YARN in Hadoop 2.0 restructures the whole framework and separates resource management from task scheduling, solving this problem at the architecture level.
See Yarn architecture
10) Real-time computing
In the Internet field, real-time computing is widely used for real-time monitoring and analysis, flow control, risk control and similar tasks. The large volume of daily logs and exception information generated by e-commerce platform systems and applications needs to be filtered and analyzed in real time to determine whether an early warning is needed.
At the same time, the system needs self-protection mechanisms, such as flow control on modules, to prevent a breakdown caused by unexpected load. When traffic is too heavy, requests can be rejected or diverted. Some businesses also need risk control; for example, a lottery business needs to restrict and release numbers according to the system's real-time sales.
Computing was originally done on a single node. With the explosion of system information and the growing complexity of computation, single-node computing can no longer meet real-time requirements and multi-node distributed computing is needed, which is how distributed real-time computing platforms emerged.
The real-time computing discussed here is actually stream computing. Its conceptual predecessor is CEP (complex event processing), with related open source products such as Esper. Distributed stream computing products in the industry include Yahoo S4 and Twitter Storm, of which Storm is the most widely used open source product.
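As an illustration of the rejection mechanism, here is a minimal token bucket sketch. The token bucket is one common flow-control technique chosen for this example; the text does not say which algorithm the platform uses, and the class name, the parameters and the injectable `clock` are assumptions.

```python
import time


class TokenBucket:
    """Admit at most `rate` requests per second, with bursts up to `capacity`."""

    def __init__(self, rate, capacity, clock=time.monotonic):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum number of stored tokens
        self.tokens = capacity      # start with a full bucket
        self.clock = clock          # injectable so the sketch can be tested
        self.last = clock()

    def allow(self):
        """Return True to admit the request, False to reject (or divert) it."""
        now = self.clock()
        # Refill in proportion to the elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A caller would check `allow()` before doing the work and return an error or redirect the request when it yields False.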
For the real-time computing platform, the following factors should be considered in the architectural design:
1. Scalability
As the service volume increases, the amount of computation increases. You can add nodes for processing.
2. High performance, low latency
High performance and low latency are required so that messages are processed quickly and computation is real-time, from the moment data flows into the computing platform to the moment results are output.
3. Reliability
Ensure that each data message is processed completely once.
4. Fault tolerance
The system automatically manages node failures and is transparent to applications.
Twitter's Storm does well in all of the above aspects. Here is a brief introduction to the Storm architecture.
The entire cluster is managed through ZooKeeper.
The client submits the topology to Nimbus.
Nimbus creates a local directory for this topology, calculates and assigns tasks based on the topology configuration, and creates an assignments node on ZooKeeper to store the mapping between tasks and Supervisor machine nodes.
It creates a taskbeats node on ZooKeeper to monitor the heartbeats of tasks, and then starts the topology.
The Supervisor obtains its assigned tasks from ZooKeeper and starts multiple Workers. Each Worker spawns tasks, with one thread per task. Connections between tasks are initialized from the topology information and are managed through ZeroMQ. At that point the whole topology is up and running.
A Tuple is the basic processing unit of a stream, that is, a message. Tuples are sent and received by tasks as follows:
Sending a Tuple. The Worker provides a transfer function that the current task uses to send a tuple to other tasks. With the destination taskid and the tuple as parameters, the tuple data is serialized and placed on the transfer queue.
Before version 0.8, this queue was a LinkedBlockingQueue; from 0.8 on, it is a DisruptorQueue.
From version 0.8 on, each Worker is bound to an inbound transfer queue, which receives messages, and an outbound queue, which sends messages.
When sending, a single thread pulls data from the transfer queue and sends the tuple to other Workers via ZeroMQ.
Receiving a Tuple. Each Worker listens on a ZeroMQ TCP port to receive messages, takes a message (taskid, tuple) from the DisruptorQueue, and routes it to a task for execution based on the destination taskid and the tuple value. Each tuple can be emitted to a direct stream or to a regular stream. When a tuple is about to be emitted, the stream grouping function (stream id -> component id -> outbound tasks) determines the destination of the tuple.
From the above analysis it can be seen that Storm's architecture supports scalability, fault tolerance and high performance. For reliability, Storm's acker component uses an XOR algorithm to ensure that every message is fully processed without sacrificing performance.
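The XOR idea can be sketched as follows. This is a simplified illustration rather than Storm's actual code: the class and method names are hypothetical, and small integers stand in for the random 64-bit tuple ids. The tracker keeps one value per root tuple; every tuple id is XORed in once when the tuple is emitted and once when it is acked, so the value returns to zero exactly when every emitted tuple has been acked.

```python
class Acker:
    """Track completion of a tuple tree with one XOR value per root tuple."""

    def __init__(self):
        self.pending = {}  # root id -> running XOR of outstanding tuple ids

    def track(self, root_id, tuple_id):
        """Record the root tuple emitted by the source."""
        self.pending[root_id] = tuple_id

    def ack(self, root_id, acked_id, emitted_ids=()):
        """A task finished `acked_id` and emitted `emitted_ids` anchored to it.

        The acked id and the new ids are folded in together, so the value
        cannot reach zero while child tuples are still outstanding.
        Returns True once the whole tree is fully processed.
        """
        value = self.pending[root_id] ^ acked_id
        for tuple_id in emitted_ids:
            value ^= tuple_id
        if value == 0:
            del self.pending[root_id]
            return True
        self.pending[root_id] = value
        return False
```

The state per root tuple is a single integer regardless of how large the tuple tree grows, which is why the check costs so little.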
11) Real-time push
There are many application scenarios of real-time push, such as system monitoring, dynamic real-time curve drawing, mobile phone message push, web real-time chat and so on.
There are many technologies for real-time push, such as Comet and Websocket.
Comet is a server push technology based on long server connections. There are two types of Comet:
Long polling: the server holds the request after receiving it and returns when there is an update; the client then opens a new connection.
Streaming: the server does not close the connection after each data transmission. The connection is closed only on a communication error or when it is rebuilt (some firewalls are configured to drop connections that stay open too long, so the server can set a timeout, after which it notifies the client to re-establish the connection and closes the original one).
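The server side of long polling can be sketched without any HTTP machinery. In this minimal in-process illustration, `LongPollChannel`, `publish` and `poll` are hypothetical names; a real server would call `poll` inside a request handler and write the result as the HTTP response.

```python
import threading


class LongPollChannel:
    """Hold a poll until a newer version is published or the timeout expires."""

    def __init__(self):
        self._cond = threading.Condition()
        self._version = 0
        self._data = None

    def publish(self, data):
        """Store an update and wake every waiting poll."""
        with self._cond:
            self._version += 1
            self._data = data
            self._cond.notify_all()

    def poll(self, last_seen, timeout):
        """Return (version, data) for an update newer than `last_seen`.

        On timeout, return (last_seen, None) so the client simply polls again.
        """
        with self._cond:
            self._cond.wait_for(lambda: self._version > last_seen, timeout=timeout)
            if self._version > last_seen:
                return self._version, self._data
            return last_seen, None
```

The timeout plays the role described above: the server answers before an intermediary drops the idle connection, and the client reconnects.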
Websocket: long connection, full duplex communication
It is a new protocol introduced with HTML5 that enables bidirectional communication between browser and server. With the WebSocket API, the browser and the server need only one handshake to form a fast two-way channel between them, so that data can be transmitted quickly in both directions.
Socket.IO is a Node.js WebSocket library, including client-side JS and server-side Node.js, used to quickly build real-time web applications.
12) Recommendation engines
To be added
6. Data storage
Database storage is generally divided into the following types: relational (transactional) databases represented by Oracle and MySQL; key-value databases represented by Redis and memcachedb; document databases such as MongoDB; columnar distributed databases such as HBase, Cassandra and Dynamo; and others such as graph databases, object databases and XML databases. Each type of database suits a different business domain. This section analyzes the performance and availability of related products from three angles: in-memory, relational and distributed.
1) In-memory database
In-memory databases aim at high concurrency and high performance and are less strict about transactions. The open source NoSQL databases MongoDB and Redis are taken as examples.
Ø mongo
Communication mode
Multithreaded: the main thread listens for new connections and, after accepting one, starts a new thread to perform the data operations (IO switching).
Data structure
Database -> collection -> record
MongoDB divides data storage by namespace. A collection is a namespace, and an index is also a namespace.
Data in the same namespace is divided into extents, which are linked by a doubly linked list.
Each extent holds the individual records, which are also linked by a doubly linked list.
Each record holds not only the space occupied by its data but possibly some extra padding, so that the record does not have to move when an update makes it larger.
Indexes are implemented as B-trees.
If journaling is enabled, there are also files that record all of your operations.
Persistent storage
mmap maps a file into the memory address space. The file can then be operated on directly in that address space without calling write or read, which gives high performance.
MongoDB calls mmap to map data from disk into memory, so there must be a mechanism to flush data to disk to ensure reliability. How often data is flushed depends on the syncdelay parameter.
The journal is MongoDB's redo log, and the oplog is the binlog used for replication. With the journal turned on, at most about 100 ms of data is lost even in a power outage, which most applications can tolerate. From version 1.9.2 on, MongoDB enables the journal by default to ensure data safety. The journal flush interval can be changed within the range of 2 to 300 ms using the --journalCommitInterval option. The oplog and the data are flushed to disk every 60 seconds. For replication, the oplog can be copied to the Secondary node directly from memory without waiting for it to be flushed to disk.
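The mmap mechanism can be tried with Python's standard `mmap` module. The sketch below is only an illustration of memory-mapped file access and has no connection to MongoDB's own code; the function name `update_in_place` is hypothetical.

```python
import mmap


def update_in_place(path, offset, payload):
    """Overwrite len(payload) bytes of an existing file through a memory map."""
    with open(path, "r+b") as f:
        # Length 0 maps the whole file; the file must not be empty.
        with mmap.mmap(f.fileno(), 0) as mapped:
            # A plain slice assignment replaces explicit seek/write calls.
            mapped[offset:offset + len(payload)] = payload
            # Changes live in memory until flushed, which is why a flush
            # policy is needed for reliability.
            mapped.flush()
```

The explicit `flush()` corresponds to the periodic flush discussed above: without it, the moment at which modified pages reach the disk is left to the operating system.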
Transaction support
MongoDB supports atomic operations only on a single record.
HA cluster
Replica Sets are commonly used. A primary is elected automatically by an election algorithm, which ensures availability while also meeting strong consistency requirements.
Of course, for a large amount of data, mongodb also provides Sharding architecture for data segmentation.
Ø Redis
Rich data structure, high response speed, memory operation
Communication mode
Since all operations are in memory, the logic executes very quickly, so a single-threaded mode is used to avoid CPU context-switching overhead (the logic processing thread and the main thread are the same thread).
Reactor pattern, with its own implementation of a multiplexing NIO mechanism (epoll, select, kqueue, etc.)
A single thread handles multiple tasks
Data structure
Hash + bucket structure. When the chains grow too long, the table is migrated: a new hash table twice the original size is allocated and the data is moved over (expand + rehash).
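The expand-and-rehash step can be sketched with a small chained hash table. This is an illustration under assumptions, not Redis's implementation: the class name, the initial size and the trigger (more entries than buckets) are chosen for the example, and everything is moved in a single pass for brevity.

```python
class ChainedHash:
    """Hash table with chained buckets that doubles in size when it fills up."""

    def __init__(self, size=4):
        self.buckets = [[] for _ in range(size)]
        self.count = 0

    def _bucket(self, key, buckets=None):
        buckets = self.buckets if buckets is None else buckets
        return buckets[hash(key) % len(buckets)]

    def put(self, key, value):
        bucket = self._bucket(key)
        for i, (k, _) in enumerate(bucket):
            if k == key:
                bucket[i] = (key, value)      # overwrite an existing key
                return
        bucket.append((key, value))
        self.count += 1
        if self.count > len(self.buckets):    # chains are getting long
            self._expand()

    def get(self, key, default=None):
        for k, v in self._bucket(key):
            if k == key:
                return v
        return default

    def _expand(self):
        """Allocate a table of twice the size and rehash every entry into it."""
        bigger = [[] for _ in range(len(self.buckets) * 2)]
        for bucket in self.buckets:
            for k, v in bucket:
                self._bucket(k, bigger).append((k, v))
        self.buckets = bigger
```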
Persistent storage
A. Full persistence, RDB (traverse redisDB and read the keys and values in the buckets). The save command blocks the main thread, whereas bgsave starts a child process to persist the snapshot and generate the RDB file.
On shutdown, a save is invoked.
A bgsave is triggered when the data has changed a configured number of times within a configured number of seconds.
On sync: the master receives the sync command sent by a slave.
B. Incremental persistence, AOF (similar to a redo log). Operations are first written to a log buffer and then flushed to the log file; the call returns to the client only after the flush to the file.
AOF files and RDB files must be merged periodically. (Changes made while a snapshot is in progress are written to the AOF buffer first; after the child process finishes the snapshot <memory snapshot>, the changed part in the AOF buffer and the full image data are merged.)
Under highly concurrent access, RDB mode makes service performance jitter noticeably. AOF has a lower performance overhead than RDB, but the time needed to reload into memory on recovery is proportional to the amount of data.
HA cluster
The general solution is master/slave failover using HA software, so that when the master Redis fails, service switches quickly to the slave Redis. Replication keeps master and slave data in sync. This setup also allows reads and writes to be separated.
One problem with replication is that when the network is unstable, a disconnection (including a brief one) between the Slave and the Master causes the Master to regenerate all in-memory data as an RDB file and transfer it to the Slave. After receiving the RDB file, the Slave clears its own memory and reloads the RDB file. This is inefficient; in the later version Redis 2.8 the author implemented partial replication.
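The append-then-replay idea behind AOF can be sketched with a small key-value store. This is an illustration only: the JSON-lines log format and the names `AppendOnlyStore` and `execute` are assumptions and bear no relation to Redis's actual file format.

```python
import json
import os


class AppendOnlyStore:
    """Key-value store that logs each operation before applying it."""

    def __init__(self, path):
        self.path = path
        self.data = {}
        if os.path.exists(path):
            # Recovery replays the whole log, so its cost grows with log size.
            with open(path) as f:
                for line in f:
                    self._apply(json.loads(line))
        self.log = open(path, "a")

    def _apply(self, op):
        if op[0] == "set":
            self.data[op[1]] = op[2]
        elif op[0] == "del":
            self.data.pop(op[1], None)

    def execute(self, *op):
        """Append the operation, force it to disk, then apply it in memory."""
        self.log.write(json.dumps(op) + "\n")
        self.log.flush()
        os.fsync(self.log.fileno())   # return only after the log is on disk
        self._apply(op)

    def close(self):
        self.log.close()
```

Reopening the same path rebuilds the in-memory state by replaying every logged operation, which is why reload time on recovery grows with the amount of logged data.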
2) Relational database
A relational database must satisfy both transactional and concurrency requirements. Taking MySQL as an example, this section describes its architecture and design principles, performance considerations, and how it meets availability requirements.
Ø MySQL architecture principles (InnoDB)
Architecturally, MySQL is divided into a server layer and a storage engine layer.
The server layer is the same for all storage engines and includes connection/thread handling, query processing (parser, optimizer), and other system tasks. The storage engine layer has many implementations: MySQL provides a pluggable storage engine structure and supports a variety of engines, of which the most widely used are InnoDB and MyISAM. InnoDB mainly targets OLTP applications and supports transactions. MyISAM does not support transactions, uses table locks, and is fast for OLAP operations.
The following introduces the InnoDB storage engine.
In terms of thread handling, MySQL has a multi-threaded architecture consisting of a master thread, a lock monitor thread, an error monitor thread and multiple IO threads, and it opens one thread to serve each connection. The IO threads are divided into the insert buffer thread, which reduces random IO; the log thread for the Oracle-like redo log used for transaction control; and multiple write and multiple read threads that move data between disk and memory.
In terms of memory allocation, there are the InnoDB buffer pool and the log buffer. The InnoDB buffer pool contains the insert buffer, data pages, index pages, the data dictionary and the adaptive hash index. The log buffer caches transaction log records to improve performance.
In terms of data structures, InnoDB has tablespaces, segments, extents, pages/blocks and rows. The index structure is a B+tree, with secondary indexes and a primary key index. The leaf nodes of a secondary index hold the primary key (PK), and the leaf nodes of the primary key index point to the stored data. The B+tree storage structure serves random-query IO well. It is divided into data pages and secondary index pages. Modifying secondary index pages involves random operations, so to improve write performance the changes are first written sequentially to the insert buffer and then merged into the secondary index pages by a background thread at a certain frequency. To ensure database consistency (between memory and the data files on disk) and to shorten instance recovery time, relational databases also have a checkpoint function, which writes the older dirty pages in the buffer (those with older LSNs) to disk in proportion, so that the part of the redo log before that LSN can be overwritten and recycled. During failure recovery, only the log from that LSN onward needs to be replayed.
In terms of transaction support, a relational database must satisfy the four ACID properties. Different transaction isolation levels are defined according to the requirements for concurrency and data visibility, and they depend on a locking mechanism for contended resources, where deadlocks must be avoided. MySQL performs concurrency control in both the server layer and the storage engine layer, mainly through read-write locks; by granularity there are different levels of locks (table locks, row locks, page locks, MVCC). To improve concurrency, multi-version concurrency control (MVCC) is used to support transaction isolation. It is implemented on top of undo, and the undo segment is also used for transaction rollback. MySQL uses the redo log to ensure write performance and failure recovery: a modification only changes memory and records the change in the transaction log (sequential IO), instead of persisting the modified data itself to disk every time (random IO), which greatly improves performance.
In terms of reliability, the InnoDB storage engine provides the doublewrite mechanism to guard against errors when flushing pages to storage and to solve the problem of partially written (half-written) pages.
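The two-step lookup through a secondary index can be sketched as follows. This is a toy model under assumptions, not InnoDB's structure: a dict stands in for the primary key index, a sorted list of (key, primary key) pairs stands in for the ordered leaves of the secondary index, and the table and column names are hypothetical.

```python
import bisect


class OrderTable:
    """Toy table with a primary key index and one secondary index on `day`."""

    def __init__(self):
        self.by_pk = {}     # primary key -> full row (stands in for PK index leaves)
        self.by_day = []    # sorted (day, pk) pairs (stands in for secondary leaves)

    def insert(self, pk, day, amount):
        self.by_pk[pk] = {"id": pk, "day": day, "amount": amount}
        bisect.insort(self.by_day, (day, pk))

    def find_by_day(self, day):
        """Range-scan the secondary index, then fetch each row by primary key."""
        start = bisect.bisect_left(self.by_day, (day,))
        rows = []
        for indexed_day, pk in self.by_day[start:]:
            if indexed_day != day:
                break
            rows.append(self.by_pk[pk])   # second lookup: back to the table
        return rows
```

Every match costs one extra lookup by primary key, which is the "back to the table" cost that makes index scans over very large ranges expensive.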
Ø For MySQL under high concurrency, performance can be tuned along several dimensions.
A. Hardware level
Logs and data should be stored separately. Logs are written sequentially and should use RAID 1+0 with buffered IO. Data is read and written randomly and should use direct IO to avoid the overhead of the file system cache.
Storage capability: SAS disks in RAID (for the RAID card cache: disable the read cache, disable the disk cache, disable read-ahead, and use only the write-back buffer, though the battery's charge and discharge cycles must be taken into account). If the data size is not large, data can of course be stored on high-speed devices such as Fusion-io and SSD.
For data writes, control the frequency of dirty page flushing; for data reads, control the cache hit rate. On that basis, estimate the IOPS the system requires and calculate the number of hard disks needed. Fusion-io can deliver more than 100,000 IOPS, while an ordinary hard disk delivers about 150.
On the CPU side, disable NUMA for a single instance. MySQL does not make good use of multiple cores; with multiple instances, each instance can be bound to CPUs.
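The disk-count estimate mentioned at the hardware level can be written as a small calculation. This is a back-of-the-envelope sketch under stated assumptions: every cache miss and every write costs exactly one disk IO, RAID write penalties are ignored, and the function and parameter names are hypothetical. The default of 150 IOPS per disk is the figure quoted in the text.

```python
import math


def disks_needed(read_qps, write_qps, cache_hit_rate, iops_per_disk=150):
    """Estimate how many disks are needed to absorb the physical IO.

    read_qps, write_qps -- logical reads and writes per second
    cache_hit_rate      -- fraction of reads served from the buffer pool
    iops_per_disk       -- sustained IOPS of one disk
    """
    physical_reads = read_qps * (1 - cache_hit_rate)   # only misses reach disk
    return math.ceil((physical_reads + write_qps) / iops_per_disk)
```

For example, 10,000 reads per second at a 95% hit rate plus 500 writes per second is about 1,000 physical IOPS, or 7 disks at 150 IOPS each. Raising the hit rate reduces the requirement directly, which is why the buffer pool size matters so much.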
B. Operating system level
Kernel and socket optimization, network optimization (bonding), file system, IO scheduling
InnoDB is mainly used in OLTP applications, which are generally IO intensive. On top of improved IO capacity, InnoDB should make full use of caching. Things to consider:
While leaving enough memory for the system, make the InnoDB buffer pool as large as possible, usually 3/4 of physical memory.
Use the file system cache only for the transaction log. Set swappiness = 0 so that the file system cache is released when memory is tight.
IO scheduling optimization: reduce unnecessary blocking and the latency of random IO access (CFQ, Deadline, NOOP)
C. Server and storage engine levels (connection management, network management, table management, log)
Includes cache/buffer, connection, and IO
D. Application level (for example index design and schema optimization with appropriate redundancy; fixing CPU and memory problems caused by SQL queries, reducing lock scope, reducing lookups back to the table, and using covering indexes)
Ø High availability practices
Master-master and master-slave modes are supported. In master-master mode, one node acts as the master and handles reads and writes, while the other acts as a standby for disaster recovery. In master-slave mode, one node acts as the master and handles write operations.
You can use HA software to detect failures and switch between active and standby nodes. You can also use ZooKeeper as the cluster coordination service for finer-grained customization.
In distributed systems, consistency during a database master/standby switchover is always a problem. The following approaches can be used:
A. Clustering, such as Oracle RAC, which has the disadvantage of being more complex.
B. Shared SAN storage. Data files and log files are kept on the shared storage, so data stays consistent during the active/standby switchover and is not lost.
C. Log synchronization, the common approach for synchronizing data between master and standby. It allows hot standby with good real-time performance, but some data may not yet be synchronized at switchover, which causes consistency problems. To handle this, record an operation log while operating on the primary database; when switching to the standby, check against the operation log and fill in the data that has not been synchronized.
D. Another approach is to switch the standby database onto the storage holding the primary's redo log, to ensure that no data is lost.
Master-slave replication in MySQL is not very efficient, mainly because transactions strictly preserve order. MySQL replication consists of two stages, log IO and relay log apply, both of which are single-threaded serial operations. To optimize replication, minimize the impact of IO. From MySQL 5.6 on, however, parallel replication across different databases is supported.
Ø Access patterns based on different business requirements
In platform business, different businesses have different access requirements. Take the typical user and order businesses: the number of users is generally bounded, while orders keep growing. The user tables are first split across databases, with one master and multiple read replicas per shard. For orders, most queries are users looking up their own orders, so the order database should also be sharded by user, again with one master and multiple read replicas.
In terms of hardware storage, the transaction log is written sequentially, where flash has little advantage over hard disks, so RAID card storage with a battery-protected write cache is used. Data files, for both users and orders, see a lot of random reads and writes. Memory is one answer; in addition, high-speed flash IO devices such as the PCIe card Fusion-io can be used. Flash also suits single-threaded loads such as master-slave replication, where Fusion-io cards can be configured on slave nodes to reduce replication latency.
The order business keeps growing, the storage capacity of PCIe cards is limited, and the hot data of the order business is only the most recent period (for example, the last three months). Two solutions are listed here. One is Flashcache, an open source hybrid storage approach based on flash and hard disk that keeps hot data in flash. The other is to periodically export old data to the distributed database HBase. When a user queries the order list, recent data comes from MySQL and old data is queried from HBase. This of course requires a good HBase rowkey design to meet the query requirements.
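Routing by user with one master and several read replicas per shard can be sketched as follows. This is a minimal illustration: the class name, the modulo sharding rule, the round-robin choice of replica and the node names are all assumptions, not the platform's actual scheme.

```python
import itertools


class ShardRouter:
    """Pick a database node from the user id and the kind of operation."""

    def __init__(self, shards):
        # shards: list of (master_name, [replica_names]); names are hypothetical.
        self.masters = [master for master, _ in shards]
        # Fall back to the master for reads when a shard has no replicas.
        self.replicas = [itertools.cycle(reps or [master]) for master, reps in shards]

    def shard_of(self, user_id):
        return user_id % len(self.masters)    # simple modulo sharding (assumption)

    def node_for(self, user_id, write=False):
        """Writes go to the shard's master; reads rotate over its replicas."""
        index = self.shard_of(user_id)
        if write:
            return self.masters[index]
        return next(self.replicas[index])
```

Because orders are routed by the same user id, a user's order list can be served from a single shard instead of querying all of them.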
3) Distributed database
For highly concurrent data access, traditional relational databases offer read-write separation, which brings data consistency problems, and data sharding. For ever larger data volumes, traditional databases use database and table sharding, which is complex to implement and requires continued migration and maintenance later on. For high availability and scaling, traditional databases use active/standby, master/slave and multi-master schemes, but scalability is poor: adding nodes or recovering from downtime requires data migration. The distributed database HBase offers a fairly complete solution to these problems and suits highly concurrent access to massive data.
Ø HBase
Column-based efficient storage reduces IO
Typical queries do not need every field in a row; most need only a few fields.
In a row-oriented storage system, every query retrieves all the data and then selects the required fields.
A column-oriented storage system can query a column individually, greatly reducing IO.
Improve compression efficiency
The high similarity of data in the same column increases the compression efficiency
Many Hbase features are determined by column storage
Strong consistent data access
MVCC
HBase implements consistent data access using MVCC.
An HBase write goes through several stages: writing the HLog, writing the memStore, and updating the MVCC.
Only when the MVCC has been updated is the write to the memStore truly successful. Transaction isolation is controlled by the MVCC; for example, a read cannot see data that other threads have not yet committed.
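The visibility rule can be sketched with a write number and a read point. This is a simplified model rather than HBase's actual classes: `Mvcc`, `MemStore` and their methods are hypothetical names, and a plain list stands in for the memStore.

```python
class Mvcc:
    """Hand out write numbers and advance the read point over completed writes."""

    def __init__(self):
        self.next_write = 0
        self.read_point = 0       # highest write number visible to readers
        self.completed = set()

    def begin(self):
        self.next_write += 1
        return self.next_write

    def complete(self, number):
        self.completed.add(number)
        # Advance only across a gap-free run of completed writes.
        while self.read_point + 1 in self.completed:
            self.read_point += 1
            self.completed.remove(self.read_point)


class MemStore:
    def __init__(self):
        self.mvcc = Mvcc()
        self.cells = []           # (key, value, write number)

    def put(self, key, value):
        number = self.mvcc.begin()
        self.cells.append((key, value, number))
        return number             # the caller completes it once the log is written

    def get(self, key):
        visible = [(n, v) for k, v, n in self.cells
                   if k == key and n <= self.mvcc.read_point]
        return max(visible)[1] if visible else None
```

A cell is already in the memStore after `put`, but readers cannot see it until its write number, and every earlier one, has been completed.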
Highly reliable
HBase stores its data on the Hadoop Distributed File System (HDFS), which provides redundancy.
When a Region node goes down, a reliable recovery mechanism protects the in-memory data that has not yet been flushed to files.
Scalable, automatic segmentation, migration
The target Region Server is located through ZooKeeper, and then the Region is located.
When a Region Server is added, it registers itself with the Master, which then distributes Regions evenly.
Availability
There is a single point of failure: after a Region Server goes down, the Regions it maintained cannot be accessed for a short period, until failover takes effect.
The Master maintains the health status of each Region Server and the distribution of Regions.
ZooKeeper's Paxos-style voting mechanism elects the next Master. Even if the Master is down, Region reads and writes are not affected; the Master only plays an automatic maintenance role.
HDFS is a distributed storage engine with high reliability and zero data loss.
The HDFS NameNode is a single point of failure (SPOF).
A split mechanism is provided to avoid a single Region being accessed too frequently and putting heavy pressure on one machine.
HBase writes are based on an LSM-tree architecture. As data is appended, the number of HFiles grows, so HBase provides compaction of HFiles to clear expired data and improve query performance.
Schema free
HBase does not have a strict schema like a relational database. You can add and delete fields in the schema freely.
HBase distributed databases do not support secondary indexes. Currently, only indexes on rowkeys are supported. Therefore, the design of rowkeys is critical to query performance.
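A rowkey for the order-history case mentioned earlier could be built as follows. This is one possible design sketched under assumptions, not a prescribed scheme: the key layout, the field widths and the function name are hypothetical, and timestamps are assumed to be milliseconds with at most 13 digits.

```python
MAX_TS = 10**13 - 1   # largest 13-digit millisecond timestamp (assumption)


def order_rowkey(user_id, ts_ms, order_id):
    """Build a rowkey that groups a user's orders and sorts the newest first.

    Zero-padding makes lexicographic order match numeric order, and storing
    MAX_TS - ts_ms reverses time so that recent orders come first in a scan.
    """
    return f"{user_id:010d}:{MAX_TS - ts_ms:013d}:{order_id}"
```

With this layout, a prefix scan on the padded user id returns one user's orders in reverse time order, so a query such as "my recent orders" needs no secondary index.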
7. Manage and deploy configurations
Unified configuration library
Deployment platform
8. Monitoring and statistics
A large distributed system involves many kinds of devices, such as network switches, ordinary PCs, various network cards, hard disks and memory, as well as monitoring at the application and business level. When the numbers are very large, the probability of errors grows. Some monitoring also has strict timeliness requirements, in some cases down to the second. In the large volume of data flowing through, abnormal data must be filtered out, and sometimes complex contextual computation is needed to decide whether to raise an alarm. The performance, throughput and availability of the monitoring platform itself are therefore important, and a unified, integrated monitoring platform should be planned to monitor the system at every level.
Platform data classification
Application service levels: application events, service logs, audit logs, request logs, exceptions, request business metrics, and performance metrics
System level: CPU, Memory, network, and IO
Timeliness requirement
Threshold, alarm:
Real-time computing:
Near-real-time computing at minute level
Offline analysis by hour and day
Real-time query
Architecture
The Agent on each node can receive logs and application events and can collect data through probes. The principle for Agent data collection is that it runs asynchronously and in isolation from the business application's flow, so that it does not affect the transaction flow.
Data is gathered by a collector cluster and distributed to different computing clusters according to its type. Data that is not time-sensitive, such as hourly statistics, goes into the Hadoop cluster. Data that traces the flow of requests and needs to be queryable goes into the Solr cluster for indexing. Data that must be computed in real time to generate alarms is processed in the Storm cluster.
After the computing clusters have processed the data, the results are stored in MySQL or HBase.
The monitoring web application can push real-time monitoring results to the browser and can also provide APIs for presenting and searching the results.