Introduction
One of the biggest problems in web development with Python is performance, and solving the C10K problem is a struggle. Asynchronous frameworks such as Tornado, Twisted, and Gevent address performance, and they do deliver some improvement, but they also bring all sorts of odd problems that are hard to solve.
In Python 3.6, the official asynchronous coroutine library asyncio became a stable part of the standard library. A number of asynchronous frameworks have since emerged that build on asyncio, preserving convenience while improving performance.
One of the earlier asynchronous frameworks is aiohttp, which provides both a server and a client and wraps asyncio well. However, its development style differs from that of Flask, the most popular microframework; Flask is simple, lightweight, and efficient to develop with.
Microservices are currently the most popular development model because they address complexity, improve development efficiency, and ease deployment.
Combining these advantages, this project builds on Sanic and integrates several popular libraries to build microservices. Sanic is an asynchronous coroutine framework similar to Flask: simple, lightweight, and high-performance.
This project is a microservice framework based on Sanic.
Features
- Uses the Sanic asynchronous framework: simple, lightweight, and efficient.
- Uses uvloop as the core engine, which lets Sanic handle concurrency comparably to Golang in many cases.
- Uses asyncpg as the database driver to connect to the database and execute SQL statements.
- Uses aiohttp as the client to access other microservices.
- Uses Peewee as the ORM, but only for model design and migration.
- Uses OpenTracing for the distributed tracing system.
- Uses unittest for unit testing, with mocks to avoid accessing other microservices.
- Uses Swagger as the API standard, so API documentation can be generated automatically.
Usage
Project address
sanic-ms:`https://github.com/songcser/sanic-ms`
Example:`https://github.com/songcser/sanic-ms/tree/master/examples`
Swagger API
Zipkin Server
Server
Sanic is a high-performance asynchronous framework, but improper use causes blocking: IO requests should go through asynchronous libraries, so be careful when adding libraries. Sanic uses uvloop as its asynchronous driver; uvloop is written in Cython on top of libuv and is reported to perform better than Node.js.
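To make the blocking concern concrete, here is a minimal sketch using only the standard library. It assumes a hypothetical synchronous call, blocking_io, that cannot be replaced by an asynchronous library, and hands it to the default thread pool so the event loop stays free to serve other requests. The function names are illustrative and are not part of sanic-ms.

```python
import asyncio
import time


def blocking_io(seconds):
    # Hypothetical stand-in for a synchronous library call.
    time.sleep(seconds)
    return seconds


async def handler(seconds):
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool executor
    # instead of calling it directly on the event loop.
    return await loop.run_in_executor(None, blocking_io, seconds)


async def main():
    # Both handlers wait at the same time rather than one after the other.
    return await asyncio.gather(handler(0.05), handler(0.05))
```

Calling blocking_io directly inside handler would stall every other coroutine on the loop for the duration of the call, which is the failure mode described above.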
Functional description
Before server start
@app.listener('before_server_start')
async def before_server_start(app, loop):
    # Queue that buffers spans until the consumer task reports them
    queue = asyncio.Queue()
    app.queue = queue
    loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))
    # Register the OpenTracing tracer used for log tracing
    reporter = AioReporter(queue=queue)
    tracer = BasicTracer(recorder=reporter)
    tracer.register_required_propagators()
    opentracing.tracer = tracer
    # Create the DB connection pool
    app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
- Create a DB connection pool
- Create a client connection
- Create a queue whose spans are consumed for log tracing
- Create the opentracing.tracer for log tracing
Middleware
@app.middleware('request')
async def cros(request):
    if request.method == 'POST' or request.method == 'PUT':
        request['data'] = request.json
    span = before_request(request)
    request['span'] = span


@app.middleware('response')
async def cors_res(request, response):
    span = request['span'] if 'span' in request else None
    if response is None:
        return response
    result = {'code': 0}
    if not isinstance(response, HTTPResponse):
        if isinstance(response, tuple) and len(response) == 2:
            result.update({
                'data': response[0],
                'pagination': response[1]
            })
        else:
            result.update({'data': response})
        response = json(result)
        if span:
            span.set_tag('http.status_code', "200")
    if span:
        span.set_tag('component', request.app.name)
        span.finish()
    return response
- Create a span for log tracing
- Wrap the response in a uniform format
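The wrapping step can be isolated as a plain function. The sketch below mirrors the logic of the response middleware shown in this section, without Sanic; the name wrap_result is hypothetical and only illustrates the envelope shape.

```python
def wrap_result(result):
    """Wrap a handler's raw return value in the uniform envelope."""
    body = {'code': 0}
    if isinstance(result, tuple) and len(result) == 2:
        # A (data, pagination) tuple is split into two keys.
        data, pagination = result
        body.update({'data': data, 'pagination': pagination})
    else:
        body['data'] = result
    return body
```

A handler that returns {'id': 1} therefore reaches the caller as {'code': 0, 'data': {'id': 1}}, and a handler that returns a two-element tuple gets an extra pagination key.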
Exception handling
Thrown exceptions are handled and returned in a uniform format.
Task
Create a task that consumes the spans in the queue, for log tracing.
Asynchronous processing
Because the framework is asynchronous, some IO requests can be processed in parallel.
Example:
async def async_request(datas):
    # Await all pending requests concurrently, then write each result
    # back into its record under the given key
    results = await asyncio.gather(*[data[2] for data in datas])
    for index, obj in enumerate(results):
        data = datas[index]
        data[0][data[1]] = results[index]


@user_bp.get('/<id:int>')
@doc.summary("get user info")
@doc.description("get user info by id")
@doc.produces(Users)
async def get_users_list(request, id):
    async with request.app.db.acquire(request) as cur:
        # fetchrow returns a single record, which is indexed by column below
        record = await cur.fetchrow(
            """ SELECT * FROM users WHERE id = $1 """, id)
        datas = [
            [record, 'city_id', get_city_by_id(request, record['city_id'])],
            [record, 'role_id', get_role_by_id(request, record['role_id'])]
        ]
        await async_request(datas)
        return record
get_city_by_id and get_role_by_id are processed in parallel.
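The same pattern can be tried without a database or other services. This is a minimal sketch in which the two lookups are stubbed with asyncio.sleep; the stub bodies and the expand_user helper are assumptions for illustration only.

```python
import asyncio


async def get_city_by_id(city_id):
    await asyncio.sleep(0.01)  # stand-in for an HTTP call to the city service
    return {'id': city_id, 'name': 'city-%d' % city_id}


async def get_role_by_id(role_id):
    await asyncio.sleep(0.01)  # stand-in for an HTTP call to the role service
    return {'id': role_id, 'name': 'role-%d' % role_id}


async def expand_user(record):
    record = dict(record)
    # gather schedules both coroutines at once and returns results in order.
    city, role = await asyncio.gather(
        get_city_by_id(record['city_id']),
        get_role_by_id(record['role_id']),
    )
    record['city_id'] = city
    record['role_id'] = role
    return record
```

Because both coroutines are awaited together, the total wait is roughly that of the slower call rather than the sum of the two.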
Related links
sanic: https://github.com/channelcat/sanic
Model design & ORM
Peewee is a simple and small ORM. It has few (but expressive) concepts, making it easy to learn and intuitive to use.
Peewee is used only for model design and migration; database operations use asyncpg.
Example:
# models.py
class Users(Model):
    id = PrimaryKeyField()
    create_time = DateTimeField(verbose_name='create time',
                                default=datetime.datetime.utcnow)
    name = CharField(max_length=128, verbose_name="user's name")
    age = IntegerField(null=False, verbose_name="user's age")
    sex = CharField(max_length=32, verbose_name="user's sex")
    city_id = IntegerField(verbose_name='city for user', help_text=CityApi)
    role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)

    class Meta:
        db_table = 'users'


# migrations.py
from sanic_ms.migrations import MigrationModel, info, db


class UserMigration(MigrationModel):
    _model = Users

    # @info(version="v1")
    # def migrate_v1(self):
    #     migrate(self.add_column('sex'))


def migrations():
    try:
        um = UserMigration()
        with db.transaction():
            um.auto_migrate()
            print("Success Migration")
    except Exception as e:
        raise e


if __name__ == '__main__':
    migrations()
- Run the command python migrations.py
- The migrate_v1 function adds the field sex; the field must first be added to the model
- The info decorator creates a migrate_record table to record migrations. version must be unique within each model and is used to record whether a migration has already been executed; author and datetime can also be recorded
- Migration function names must start with migrate_
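The version bookkeeping described above can be sketched in a few lines. This is not the sanic-ms implementation: the info decorator, MigrationRunner, and the in-memory set standing in for the migrate_record table are all hypothetical, and only illustrate how a unique version can keep a migration from running twice.

```python
def info(version):
    """Attach a version label to a migration function (hypothetical)."""
    def decorator(func):
        func.version = version
        return func
    return decorator


class MigrationRunner:
    def __init__(self):
        self.applied = set()  # stand-in for the migrate_record table
        self.log = []

    def run(self, migration):
        # Only methods whose names start with migrate_ are considered.
        for name in sorted(dir(migration)):
            if not name.startswith('migrate_'):
                continue
            func = getattr(migration, name)
            version = getattr(func, 'version', name)
            if version in self.applied:
                continue  # already executed, skip it
            func()
            self.applied.add(version)
            self.log.append(version)


class UserMigration:
    def __init__(self):
        self.columns = ['id', 'name']

    @info(version='v1')
    def migrate_v1(self):
        self.columns.append('sex')
```

Running the same migration object through the runner twice applies migrate_v1 only once, because its version is already recorded.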
Related links
peewee:http://docs.peewee-orm.com/en/latest/
Database operations
asyncpg is the fastest driver among common Python, NodeJS and Go implementations
asyncpg is used as the database driver; the project wraps database connections and performs database operations through it.
One reason for not using an ORM for database operations is performance: an ORM adds overhead and cannot take advantage of the high-performance asyncpg library. The other is that an individual microservice is very simple and its table structure is not complex, so plain SQL statements are enough and there is no need to introduce an ORM. Peewee is used only for model design.
Example:
sql = "SELECT * FROM users WHERE name=$1"
name = "test"
async with request.app.db.acquire(request) as cur:
    data = await cur.fetchrow(sql, name)

async with request.app.db.transaction(request) as cur:
    data = await cur.fetchrow(sql, name)
- The acquire() function is non-transactional, which can improve efficiency when only queries are involved
- The transaction() function is transactional and must be used for inserts, deletes, and updates
- The request argument is passed in so that a span can be obtained for log tracing
- TODO: database read/write separation
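Why writes need the transactional path can be shown with a small, self-contained sketch. It deliberately swaps asyncpg for the standard library sqlite3 module with an in-memory database, so it runs without a PostgreSQL server; insert_users and count_users are hypothetical helpers, not sanic-ms functions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()


def insert_users(conn, names):
    """Insert all names in one transaction, or none of them."""
    try:
        # As a context manager the connection commits on success
        # and rolls back if an exception escapes the block.
        with conn:
            for name in names:
                if not name:
                    raise ValueError("empty name")
                conn.execute("INSERT INTO users(name) VALUES (?)", (name,))
    except ValueError:
        return False
    return True


def count_users(conn):
    # A read-only query needs no explicit transaction handling.
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
```

If the second insert in a batch fails, the first one is rolled back as well, which is the guarantee the transactional path is meant to provide for inserts, deletes, and updates.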
Related links
asyncpg:https://github.com/MagicStack/asyncpg
benchmarks:https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/
Client
The client wraps the aiohttp client in a thin layer and is used for access between microservices.
Don't create a session per request. Most likely you need a session per application which everyone requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.
Example:
@app.listener('before_server_start')
async def before_server_start(app, loop):
    # One client per application, created once and shared by all requests
    app.client = Client(loop, url='http://host:port')


async def get_role_by_id(request, id):
    cli = request.app.client.cli(request)
    async with cli.get('/roles/{}'.format(id)) as res:
        return await res.json()


@app.listener('before_server_stop')
async def before_server_stop(app, loop):
    app.client.close()
Multiple clients can be created for accessing different microservices, so that each client maintains its own keep-alive connections.
Related links
aiohttp:http://aiohttp.readthedocs.io/en/stable/client.html
Logging & distributed tracing system
Logging uses the official logging module, configured through the file logging.yml; Sanic 0.6.0 or above is required. JsonFormatter converts logs to JSON format for input to ES.
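As a rough illustration of what a JSON log formatter does, here is a minimal sketch built on the standard logging module. The class name SimpleJsonFormatter and the chosen keys are assumptions; the JsonFormatter used by the project may emit different fields.

```python
import json
import logging


class SimpleJsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object (hypothetical)."""

    def format(self, record):
        payload = {
            'time': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            # getMessage() applies the %-style arguments to the message.
            'message': record.getMessage(),
        }
        return json.dumps(payload)
```

Attaching it with handler.setFormatter(SimpleJsonFormatter()) makes every line the handler writes a self-contained JSON document, which a log collector can forward without extra parsing rules.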
Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.
The logger decorator
@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)
async def get_city_by_id(request, id):
    cli = request.app.client.cli(request)
- type: the log type, such as method or route
- category: the log category. The default value is the app name
- detail: log details
- description: the log description. The default value is the function's docstring
- tracing: whether to trace the log. The default value is True
- level: the log level. The default value is INFO
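A decorator of this kind can be sketched with the standard library alone. The version below is a simplified, hypothetical log_call that supports only category, description, and level; it is not the sanic-ms logger decorator and does no tracing.

```python
import asyncio
import functools
import logging


def log_call(category='app', description=None, level=logging.INFO):
    """Log each call to an async function before awaiting it (hypothetical)."""
    def decorator(func):
        log = logging.getLogger(category)

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Fall back to the docstring when no description is given.
            log.log(level, 'call %s: %s',
                    func.__name__, description or func.__doc__)
            return await func(*args, **kwargs)
        return wrapper
    return decorator


@log_call(category='test', description='des')
async def get_city_by_id(city_id):
    """Fetch a city (stubbed)."""
    return {'id': city_id}
```

functools.wraps keeps the wrapped function's name and docstring, so anything that inspects the handler still sees get_city_by_id.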
Distributed tracing system
- OpenTracing builds on distributed tracing systems such as Dapper and Zipkin and establishes a unified standard.
- OpenTracing traces each request and records every microservice the request passes through, forming a chain, which is crucial for analyzing performance bottlenecks in microservices.
- The OpenTracing framework is used, but spans are converted to Zipkin format for output. Most distributed tracing systems communicate over Thrift for performance reasons; in the spirit of a simple, RESTful style, RPC communication is not used here. Spans are output as logs, which can be collected with Fluentd, Logstash, or another log collector and fed into Zipkin. Zipkin supports HTTP input.
- A generated span is first put on a queue without blocking, and a task consumes the spans from the queue. An upper limit on the sampling frequency can be added later.
- Tracing is added to both the DB and the client.
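The queue-and-task arrangement for spans can be sketched with asyncio alone. In this minimal example a span is just a dictionary, and the consumer appends to a list where a real one would log or forward the span; consume, record_span, and the span fields are assumptions for illustration.

```python
import asyncio
import time


async def consume(queue, sink):
    # Runs as a background task and drains spans as they arrive.
    while True:
        span = await queue.get()
        sink.append(span)  # a real consumer would log or forward the span
        queue.task_done()


def record_span(queue, name, start, end):
    # put_nowait never awaits, so the request path is not blocked.
    queue.put_nowait({'name': name, 'duration': end - start})


async def main():
    queue = asyncio.Queue()
    sink = []
    task = asyncio.create_task(consume(queue, sink))
    for name in ('db.query', 'http.get'):
        start = time.monotonic()
        await asyncio.sleep(0)  # stand-in for the traced operation
        record_span(queue, name, start, time.monotonic())
    await queue.join()  # wait until every queued span has been consumed
    task.cancel()
    return sink
```

The request handler only pays for an in-memory enqueue, while the cost of reporting is carried by the background task.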
Related links
opentracing:https://github.com/opentracing/opentracing-python
zipkin:https://github.com/openzipkin/zipkin
jaeger:https://uber.github.io/jaeger/
API
API documentation uses the Swagger standard.
Example:
from sanic_ms import doc


@user_bp.post('/')
@doc.summary('create user')
@doc.description('create user info')
@doc.consumes(Users)
@doc.produces({'id': int})
async def create_user(request):
    data = request['data']
    async with request.app.db.transaction(request) as cur:
        # Four columns, so four placeholders and four arguments
        record = await cur.fetchrow(
            """ INSERT INTO users(name, age, city_id, role_id)
                VALUES($1, $2, $3, $4)
                RETURNING id
            """, data['name'], data['age'], data['city_id'], data['role_id']
        )
        return {'id': record['id']}
- summary: a short summary of the API
- description: a detailed description
- consumes: the body data of the request
- produces: the data returned in the response
- tag: the API tag
- The arguments passed to consumes and produces can be Peewee models. The model is parsed to generate the API data, and the help_text parameter on a field indicates the referenced object
- http://host:port/openapi/spec.json serves the generated JSON data
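One way decorators like these can work is by attaching metadata to the handler and reading it back when the document is generated. The sketch below is a simplified, hypothetical version: doc_field, describe, and the api_doc attribute are invented for illustration, and the output is not a complete Swagger document.

```python
TYPE_NAMES = {int: 'integer', str: 'string', float: 'number', bool: 'boolean'}


def doc_field(key, value):
    """Store one piece of API metadata on the decorated handler."""
    def decorator(func):
        meta = getattr(func, 'api_doc', None)
        if meta is None:
            meta = func.api_doc = {}
        meta[key] = value
        return func
    return decorator


def summary(text):
    return doc_field('summary', text)


def produces(schema):
    return doc_field('produces', schema)


def describe(handler):
    # Turn the collected metadata into a JSON-friendly description.
    meta = getattr(handler, 'api_doc', {})
    schema = meta.get('produces', {})
    return {
        'summary': meta.get('summary', ''),
        'responses': {name: TYPE_NAMES[tp] for name, tp in schema.items()},
    }


@summary('create user')
@produces({'id': int})
async def create_user(request):
    return {'id': 1}
```

Because the decorators return the original function, they can be stacked in any order above a route without changing how the handler runs.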
Related links
swagger:https://swagger.io/
Response data
Instead of returning Sanic's response object, return the raw data directly. The middleware processes the returned data and wraps it in a uniform format; the specific format can be found in [see]
Unit testing
Unit tests use unittest. The mock is a self-written MockClient, because unittest does not yet have an asyncio mock and Sanic's test interface also sends real requests. pytest may be used later.
Example:
import ujson

from sanic_ms.tests import APITestCase
from server import app


class TestCase(APITestCase):
    _app = app
    _blueprint = 'visit'

    def setUp(self):
        super(TestCase, self).setUp()
        # Register mock responses so that no real service is contacted
        self._mock.get('/cities/1',
                       payload={'id': 1, 'name': 'shanghai'})
        self._mock.get('/roles/1',
                       payload={'id': 1, 'name': 'shanghai'})

    def test_create_user(self):
        data = {
            'name': 'test',
            'age': 2,
            'city_id': 1,
            'role_id': 1,
        }
        res = self.client.create_user(data=data)
        body = ujson.loads(res.text)
        self.assertEqual(res.status, 200)
- _blueprint is the name of the blueprint
- In the setUp function, _mock registers the mock responses so that the real server is not accessed; payload is the body returned
- Each test calls the handler through the client variable, with data as the body, params as the path parameters, and any other arguments as route parameters
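For readers on Python 3.8 or later, the standard library now includes unittest.mock.AsyncMock, which can stand in for a coroutine without a hand-written mock client. The sketch below is independent of sanic-ms: user_with_city and its fetch_city parameter are hypothetical and only show how an awaited dependency can be replaced in a test.

```python
import asyncio
import unittest
from unittest import mock


async def user_with_city(user, fetch_city):
    # fetch_city is any awaitable callable that returns the city record.
    city = await fetch_city(user['city_id'])
    return dict(user, city=city)


class UserTest(unittest.TestCase):
    def test_city_is_attached(self):
        # AsyncMock returns an awaitable, so no real service is contacted.
        fetch_city = mock.AsyncMock(
            return_value={'id': 1, 'name': 'shanghai'})
        user = asyncio.run(
            user_with_city({'name': 'test', 'city_id': 1}, fetch_city))
        self.assertEqual(user['city'], {'id': 1, 'name': 'shanghai'})
        fetch_city.assert_awaited_once_with(1)
```

Passing the dependency in as an argument keeps the test free of patching, at the cost of a slightly wider function signature.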
Code coverage
coverage erase
coverage run --source . -m sanic_ms tests
coverage xml -o reports/coverage.xml
coverage2clover -i reports/coverage.xml -o reports/clover.xml
coverage html -d reports
- coverage2clover converts coverage.xml into clover.xml; Bamboo requires the Clover format.
Related links
unittest: https://docs.python.org/3/library/unittest.html
coverage: https://coverage.readthedocs.io/en/coverage-4.4.1/
Exception handling
Thrown exceptions are handled by setting app.error_handler = CustomHandler()
Example:
from sanic_ms.exception import ServerError


@visit_bp.delete('/users/<id:int>')
async def del_user(request, id):
    raise ServerError(error='internal error', code=10500, message="msg")

- code: the error code. The value is 0 if no exception occurs; any other value indicates an exception
- error: a user-defined error message
- status_code: the HTTP status code; standard HTTP status codes are used
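The idea of turning any thrown exception into one response shape can be sketched without Sanic. The ServiceError class, the to_response function, and the default values below are hypothetical and are not the sanic-ms classes; only the field names code, message, error, and status_code follow the text above.

```python
class ServiceError(Exception):
    """Application error carrying the fields of the uniform format."""

    def __init__(self, error, code=10500, message='', status_code=500):
        super().__init__(error)
        self.error = error
        self.code = code
        self.message = message
        self.status_code = status_code


def to_response(exc):
    """Map an exception to (HTTP status, body) in one uniform shape."""
    if isinstance(exc, ServiceError):
        body = {'code': exc.code, 'message': exc.message, 'error': exc.error}
        return exc.status_code, body
    # Anything unexpected is reported as a generic internal error.
    return 500, {'code': 10500, 'message': 'internal error', 'error': str(exc)}
```

Callers can then rely on a non-zero code to detect failure, regardless of which handler raised the exception.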
Author: Song Jiyi, January 29
GitHub ID: songcser