This article was written by 泊浮目 and first published on Jianshu: www.jianshu.com/u/204b8aaab…

Version  Date       Note
1.0      2020.9.13  First published
1.1      2020.11.8  Improved the wording of the first scenario
1.2      2021.1.17  Improved the summary
1.3      2021.2.3   Changed the title from "Write white box tests with Clean Architecture" to "Follow Clean Architecture to write white box tests"
1.4      2021.5.21  Changed the title from "Follow Clean Architecture to write white box tests" to "Tip: Follow Clean Architecture to write white box tests"

Preface

Clean Architecture is an architectural model proposed by Uncle Bob in 2012. It is distilled from a series of architectures over the past few decades:

  • Hexagonal Architecture: First proposed by Alistair Cockburn
  • DCI: First proposed by James Coplien and Trygve Reenskaug
  • BCE: First proposed by Ivar Jacobson in his book Object-Oriented Software Engineering: A Use Case Driven Approach

Systems designed based on these architectures often have the following characteristics:

  • Framework independence: The architecture does not depend on the existence of some feature-rich framework. Frameworks are used as tools; the system does not have to bend to fit them.
  • Testability: The business logic can be tested without the UI, database, web server, or any other external element.
  • UI independence: The UI can be changed easily without modifying the rest of the system. For example, a web interface can be replaced with a command line interface without touching the business logic.
  • Database independence: We can swap Oracle or SQL Server for Mongo, BigTable, CouchDB, or something else, because the business logic is decoupled from the database.
  • Independence from any external agency: The business logic does not need to know anything about the external interfaces around it.
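To make the testability and database-independence points concrete, here is a minimal Java sketch. Everything in it (AccountStore, DepositUseCase, InMemoryAccountStore) is hypothetical and invented for illustration; it is not taken from the project discussed in this article. The business rule depends only on an interface it owns, so a test can plug in an in-memory implementation where production code would plug in a real database adapter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical port owned by the business layer.
interface AccountStore {
    Optional<Long> balanceOf(String accountId);

    void save(String accountId, long balance);
}

// Business rule: knows nothing about SQL, HTTP, or any framework.
final class DepositUseCase {
    private final AccountStore store;

    DepositUseCase(AccountStore store) {
        this.store = store;
    }

    long deposit(String accountId, long amount) {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        // Unknown accounts start from a zero balance.
        long updated = store.balanceOf(accountId).orElse(0L) + amount;
        store.save(accountId, updated);
        return updated;
    }
}

// Adapter used in tests; a production adapter would wrap a real database instead.
final class InMemoryAccountStore implements AccountStore {
    private final Map<String, Long> balances = new HashMap<>();

    @Override
    public Optional<Long> balanceOf(String accountId) {
        return Optional.ofNullable(balances.get(accountId));
    }

    @Override
    public void save(String accountId, long balance) {
        balances.put(accountId, balance);
    }
}

class PortSketch {
    public static void main(String[] args) {
        DepositUseCase useCase = new DepositUseCase(new InMemoryAccountStore());
        useCase.deposit("a-1", 40);
        System.out.println(useCase.deposit("a-1", 2)); // prints 42
    }
}
```

Swapping the storage technology then means writing another AccountStore implementation; DepositUseCase and its tests stay unchanged.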

That is all for the introduction to Clean Architecture. If you are interested, you can find more about it on Google.

Background

I’ve been writing a lot of business code lately, and because every component is distributed, manual testing is painful and time-consuming. So I began looking into automated testing for this code.

Currently, part of the business code uses the Storm framework. Let’s pick a use case that is easy to understand, which involves about three components:

  • ReadSpout: Reads messages from Kafka and the database and emits them
  • DispatcherBolt: Reads messages sent from upstream and dispatches them according to certain rules, such as custom fields
  • KafkaWriteBolt: Reads messages sent from upstream and writes data with the same key to the same Kafka partition

The code for DispatcherBolt looks like this:

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        super.prepare(conf, context, collector);
        try {
            init();
        } catch (Exception e) {
            collector.reportError(e);
            throw new RuntimeException(e);
        }
    }

    @Override
    public void execute(Tuple dataTuple) {
        this.input = dataTuple;
        try {
            Object obj = dataTuple.getValueByField(EmitFields.MESSAGE);
            String key = (String) dataTuple.getValueByField(EmitFields.GROUP_FIELD);
            List<MessageEntry> messageEntries = (List<MessageEntry>) obj;
            emitMessageEntry(key, messageEntries);
            this.collector.ack(dataTuple);
        } catch (Exception e) {
            logger.info("Dispatcher Execute error: ", e);
            this.collector.reportError(e);
            this.collector.fail(dataTuple);
        }
    }

    private void emitMessageEntry(String key, List<MessageEntry> messageEntries) throws Exception {
        long lastPos = 0L, uniquePos = 0L, payloadSize = 0L;
        UmsMessageBuilder builder = null;
        String tableName = messageEntries.get(0).getEntryHeader().getTableName();
        for (MessageEntry msgEntry : messageEntries) {
            EntryHeader header = msgEntry.getEntryHeader();
            header.setLastPosition(lastPos);
            // Start a new builder when the table name is empty or, under schema grouping, when the table changes
            if (StringUtils.isEmpty(tableName)
                    || (getExtractorConfig().getGroupType() == GroupType.SCHEMA
                    && !StringUtils.equalsIgnoreCase(tableName, header.getTableName()))) {
                emitBuilderMessage(builder, key);
                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),
                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));
                payloadSize = 0;
            }
            // DDL handle
            if (msgEntry.isDdl()) {
                emitBuilderMessage(builder, key);
                executeDdlEvent(msgEntry);
                emitDDLMessage(key, msgEntry);
                builder = null;
                continue;
            }

            if (builder != null && msgEntry.getEntryHeader().getHeader().getSourceType().equalsIgnoreCase(MediaType.DataSourceType.ORACLE.getName())) {
                emitBoltMessage(key, builder.getMessage());
                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),
                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));
                payloadSize = 0;
            }
            // DML handle
            if (builder == null) {
                builder = createUmsDataBuilder(msgEntry, destination, msgEntry.getBatchId(),
                        MediaType.DataSourceType.getTypeByName(getExtractorConfig().getNodeType()));
                payloadSize = 0;
            }
            for (CanalEntry.RowData rowData : msgEntry.getRowDataLst()) {
                lastPos = Long.parseLong(header.getPosition()) + (++uniquePos);
                if (header.isUpdate()) {
                    if (getExtractorConfig().getOutputBeforeUpdateFlg()) {
                        payloadSize += appendUpdateBefore2Builder(builder, header, rowData, EventType.BEFORE.getValue().toLowerCase());
                    }
                    if (ExtractorHelper.isPkUpdate(rowData.getAfterColumnsList())) {
                        payloadSize += appendUpdateBefore2Builder(builder, header, rowData, getEventTypeForUMS(CanalEntry.EventType.DELETE));
                    }
                }

                List<Object> payloads = new ArrayList<>();
                payloadSize += appendRowData2Builder(payloads, builder, header, rowData);
                builder.appendPayload(payloads.toArray());
            }
        }
        emitBuilderMessage(builder, key);
    }

Note that prepare and execute are the hooks the framework exposes: prepare hands us the Storm context and collector during initialization, and execute hands us each tuple that Storm delivers. Used carelessly, they couple the business code to the framework.

Option 1: Object Dependency Injection

This was the approach we tried first. Put simply, it extracts the emitMessageEntry-related code into an interface method, moves the current logic into the implementation, and injects that implementation through the Spring IoC container, similar to:

    override fun prepare(topoConf: MutableMap<String, Any>, context: TopologyContext, collector: OutputCollector) {
        super.prepare(topoConf, context, collector)
        try {
            init()
            this.dispatcherService = IOCUtil.getBean(DispatcherServer::class.java).init(collector)
        } catch (e: Exception) {
            collector.reportError(e)
            throw RuntimeException(e)
        }
    }
    override fun execute(dataTuple: Tuple) {
        val obj = dataTuple.getValueByField(EmitFields.MESSAGE)
        val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String
        val messageEntries = obj as List<MessageEntry>
        dispatcherService.dispatcherLogical(messageEntries, key)
    }

This allows us to obtain the dispatcherService directly in our unit tests and supply our own OutputCollector implementation that records what gets dispatched, with Spring configuration used to swap implementation classes flexibly. We then feed in mock input and assert that the result matches our expectations.

But because Storm is distributed and has to deal with matters such as serialization, the business code ends up somewhat contorted:

  1. Declare the dispatcherService member in the Bolt as transient
  2. Initialize the IoC container during the Bolt's initialization
  3. Inject dispatcherService after the IoC container has been initialized

As you can see, we actually had to change the business code in order to test it — adding irrelevant logic, which is obviously not a good solution.
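The serialization constraint behind the steps listed above can be seen in isolation. The following is a minimal Java sketch, not the project's code: SketchBolt and SketchDispatcherService are hypothetical stand-ins, and the round trip through Java serialization only approximates what happens when a bolt instance is shipped to a worker. It shows why a non-serializable collaborator has to be marked transient and rebuilt in a prepare-style hook.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical service that is not Serializable (for example, a container-managed bean).
class SketchDispatcherService {
    String dispatch(String message) {
        return "dispatched:" + message;
    }
}

// Hypothetical stand-in for a bolt: it is serialized, shipped to a worker, then prepared.
class SketchBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after deserialization.
    private transient SketchDispatcherService service;

    // The service has to be rebuilt on the worker; a real bolt would ask the IoC container here.
    void prepare() {
        this.service = new SketchDispatcherService();
    }

    boolean isPrepared() {
        return service != null;
    }

    String execute(String message) {
        return service.dispatch(message);
    }
}

class TransientFieldSketch {
    // Serializes and deserializes the bolt, approximating shipment to a worker.
    static SketchBolt roundTrip(SketchBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBolt) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SketchBolt local = new SketchBolt();
        local.prepare();
        SketchBolt remote = roundTrip(local);
        System.out.println(remote.isPrepared()); // prints false: the transient field was dropped
        remote.prepare();
        System.out.println(remote.execute("m1")); // prints dispatched:m1
    }
}
```

Without the transient modifier, writeObject would fail with a NotSerializableException, which is why this kind of plumbing ends up in the business class.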

Option 2: Mockito

The Mockito solution is non-invasive to the business. You can write the test code directly, which is similar to:

@RunWith(PowerMockRunner::class)
@PowerMockIgnore("javax.management.*")
class DispatcherBoltTest {

    private lateinit var config: AbstractSinkConfig
    private lateinit var outputCollector: OutputCollector
    private lateinit var tuple: Tuple


    @Before
    fun atBefore() {
        config = PowerMockito.mock(AbstractSinkConfig::class.java)
        outputCollector = PowerMockito.mock(OutputCollector::class.java)
        tuple = PowerMockito.mock(Tuple::class.java)
    }

    private fun init(dispatcherBoltImpl: DispatcherBoltImpl) {
        reset(config)
        reset(outputCollector)
        reset(tuple)
        dispatcherBoltImpl.prepare(mutableMapOf(), PowerMockito.mock(TopologyContext::class.java), outputCollector)
    }

    @Test
    fun testSingleUms() {
        // Define some behavior of the mock object
        `when`(config.configProps).thenReturn(Properties())

        // Instantiate the class to be tested
        val dispatcherBoltImpl = DispatcherBoltImpl(config)
        init(dispatcherBoltImpl)

        val umsMap = generateSingleUmsBo()
        val boMap = getBoMap(intArrayOf(1))

        // Define some behavior of the mock object
        `when`(tuple.getValueByField(EmitFields.MESSAGE)).thenReturn(umsMap.messages)
        `when`(tuple.getValueByField(EmitFields.GROUP_FIELD)).thenReturn(umsMap.dispatchKey)
        `when`(tuple.getValueByField(EmitFields.EX_BO)).thenReturn(boMap)

        dispatcherBoltImpl.handleDataTuple(tuple)

        // Result verification
        Mockito.verify(outputCollector, Mockito.times(1))
                .emit(EmitFields.DATA_MSG, tuple, Values(umsMap.dispatchKey, umsMap.messages,
                        boMap,
                        EmitFields.EMIT_TO_BOLT))
    }
}

The logic is straightforward: choose the objects to mock, define their behavior, feed in data, and verify the result. In effect, this tests the behavior of the business code and the framework together.

Taking a step back, though, there are two potential problems to consider:

  1. The business logic of this class is relatively simple right now, so there are few interactions to account for, and that is reflected in how little behavior we define on the mock objects. In other words, the more complex the class, the more mock code we need to write.
  2. Our business code and the framework are currently tightly coupled, so tests have to take the framework's behavior into account. When that behavior changes (for example, after an upgrade), the test cases need extensive changes, and if we switch frameworks they become nearly unusable. This violates a principle of Clean Architecture: the business logic should be independent of the framework, not tightly coupled to it.

Option 3: Clean Architecture

As mentioned earlier, the first thing we need to do is decouple the business from the framework. So how do you peel it off? Let’s get straight to the answer:

/**
 * Decoupled from any stream processing framework; focuses only on dispatching UMS messages.
 */
interface DispatcherServer {

    fun dispatcherMessageEntry(key: String, messageEntries: List<MessageEntry>, destination: String,
                               tableToDispatchColumn: HashMap<String, Set<String>>,
                               dmlMessageConsumer: (group: MutableMap<Int, UmsMessageBuilder>, key: String) -> Unit,
                               executeDdlEventBlock: (messageEntry: MessageEntry) -> Unit,
                               ddlMessageConsumer: (key: String, messageEntry: MessageEntry) -> Unit)
}

We define three function-typed parameters, which lets us cleanly separate the business logic from the framework. The call site looks like this:

    override fun execute(dataTuple: Tuple) {
        input = dataTuple
        try {
            val obj = dataTuple.getValueByField(EmitFields.MESSAGE)
            val key = dataTuple.getValueByField(EmitFields.GROUP_FIELD) as String
            val messageEntries = obj as List<MessageEntry>
            dispatcherServer.dispatcherMessageEntry(key, messageEntries, destination, tableToDispatchColumn,
                    dmlMessageConsumer = { builder, innerKey -> emitBuilderMessage(builder, innerKey) },
                    executeDdlEventBlock = { entry -> executeDdlEvent(entry) },
                    ddlMessageConsumer = { innerKey, msgEntry -> emitDDLMessage(innerKey, msgEntry) }
            )

            collector.ack(dataTuple)
        } catch (e: Exception) {
            logger.info("Dispatcher Execute error: ", e)
            collector.reportError(e)
            collector.fail(dataTuple)
        }
    }

emitBuilderMessage, executeDdlEvent, and emitDDLMessage are private methods of DispatcherBolt that send the incoming data to the collector according to certain rules. This keeps the framework-specific code inside DispatcherBolt.

Framework independent business code can be put into the DispatcherServer implementation.
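For readers who think in Java, the same split can be sketched with the standard functional interfaces. This is a simplified illustration under stated assumptions, not the project's implementation: SketchEntry and SketchDispatcher are hypothetical, and the batching rule (flush pending DML before handling a DDL entry) is only loosely modelled on the emitMessageEntry logic shown earlier. The business class never sees a collector; results leave only through the callbacks passed in.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical, simplified message type; the article's MessageEntry carries far more.
final class SketchEntry {
    final String table;
    final boolean ddl;

    SketchEntry(String table, boolean ddl) {
        this.table = table;
        this.ddl = ddl;
    }
}

// Framework-free business logic: output leaves only through the supplied callbacks.
final class SketchDispatcher {
    void dispatch(String key,
                  List<SketchEntry> entries,
                  BiConsumer<String, List<SketchEntry>> dmlConsumer,
                  Consumer<SketchEntry> ddlHandler) {
        List<SketchEntry> batch = new ArrayList<>();
        for (SketchEntry entry : entries) {
            if (entry.ddl) {
                // Flush pending DML first so ordering around the DDL is preserved.
                flush(key, batch, dmlConsumer);
                batch = new ArrayList<>();
                ddlHandler.accept(entry);
            } else {
                batch.add(entry);
            }
        }
        flush(key, batch, dmlConsumer);
    }

    private void flush(String key, List<SketchEntry> batch,
                       BiConsumer<String, List<SketchEntry>> dmlConsumer) {
        if (!batch.isEmpty()) {
            dmlConsumer.accept(key, batch);
        }
    }
}

class CallbackDispatchSketch {
    // Runs the dispatcher with recording callbacks and returns a trace of what was emitted.
    static List<String> trace(List<SketchEntry> entries) {
        List<String> events = new ArrayList<>();
        new SketchDispatcher().dispatch("k", entries,
                (key, batch) -> events.add("dml:" + key + ":" + batch.size()),
                entry -> events.add("ddl:" + entry.table));
        return events;
    }

    public static void main(String[] args) {
        List<SketchEntry> entries = Arrays.asList(
                new SketchEntry("t", false),
                new SketchEntry("t", false),
                new SketchEntry("t", true),
                new SketchEntry("t", false));
        System.out.println(trace(entries)); // prints [dml:k:2, ddl:t, dml:k:1]
    }
}
```

A bolt would pass callbacks that emit to its collector, while a test passes callbacks that record into a list, as trace does here.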

Testing code can also focus on testing business logic:

    @Test
    fun testUpdateRecords() {
        val originNamespace = "my_schema.my_table"
        val mockData = listOf(getUpdate1Data())
        val config = getMockConfig(extractorConfigJsonFile)
        config.outputBeforeUpdateFlg = false
        config.outputExtraValueFlg = false
        config.payloadType = PayloadType.SIZE
        config.maxPayloadSize = 10240
        val dispatcherServer = DispatcherServerImpl(config)
        val resultMap = mutableMapOf<Int, UmsMessageBuilder>()

        dispatcherServer.dispatcherMessageEntry(originNamespace, mockData, "M26", hashMapOf(),
                dmlMessageConsumer = { builder, innerKey ->
                    resultMap.putAll(builder)
                    Assert.assertEquals(innerKey, originNamespace)
                },
                executeDdlEventBlock = { throw RuntimeException("There should be no DDL events in this data set.") },
                ddlMessageConsumer = { _, _ -> throw RuntimeException("There should be no DDL-related results in this data set.") })

        assertEquals(1, resultMap.keys.toSet().size, "The current data should fall into a single group.")
        assertEquals(1, resultMap.size, "The current data should fall into a single group.")
        val umsList = resultMap.values.map { it.message }
        umsList.forEach {
            Assert.assertEquals("m.M26.my_schema.my_table", it.schema.namespace)
            Assert.assertEquals(1, it.payloads.size)
            assertEquals(9, it.schema.fields.size, "5 extended fields +4 Schema fields should be 9")
            Assert.assertEquals("inc", it.protocol.type)
            Assert.assertEquals("2", it.protocol.version)
            assertEquals(MediaType.DataSourceType.MYSQL, KafkaKeyUtils.getDataSourceType(it))
        }
    }

Now let's talk about the technique used above. It closely resembles the Strategy pattern from object-oriented design: define an algorithm interface and put the logic of each algorithm in an implementation of that interface, so that algorithms of the same kind can be used interchangeably. The advantage is that changes to an algorithm do not affect its callers, and changes to the callers do not affect the algorithm. When functions are first-class citizens, building and swapping strategies becomes simple.

So when is it not simple? In Java before lambdas arrived in Java 8, we would have to define a dedicated interface, declare a method on it, and pass in an anonymous inner class. None of that is really necessary, because all we want to pass is a function, not an object. Typical code can be seen here:

ZStack source analysis of the design patterns appreciation — the strategy pattern: segmentfault.com/a/119000001…
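The difference in ceremony can be shown in a few lines of Java. This is a hedged illustration with hypothetical names (StrategySketch, MergeStrategy); it is not taken from the linked article. The first call passes a strategy as an anonymous inner class, and the other two pass the behavior itself as a lambda or method reference, which Java supports from version 8 onward.

```java
import java.util.function.IntBinaryOperator;

class StrategySketch {
    // A purpose-built strategy interface, as the classic pattern requires.
    interface MergeStrategy {
        int merge(int left, int right);
    }

    // Classic style: the caller hands over an object that wraps a single method.
    static int applyClassic(MergeStrategy strategy, int left, int right) {
        return strategy.merge(left, right);
    }

    // Function style: a standard functional interface replaces the custom one.
    static int applyFunctional(IntBinaryOperator strategy, int left, int right) {
        return strategy.applyAsInt(left, right);
    }

    public static void main(String[] args) {
        // Anonymous inner class: several lines of ceremony to pass one expression.
        int viaObject = applyClassic(new MergeStrategy() {
            @Override
            public int merge(int left, int right) {
                return Math.max(left, right);
            }
        }, 3, 7);

        // Lambda and method reference: the behavior itself is the argument.
        int viaLambda = applyFunctional((l, r) -> l + r, 3, 7);
        int viaMethodRef = applyFunctional(Math::max, 3, 7);

        System.out.println(viaObject + " " + viaLambda + " " + viaMethodRef); // prints 7 10 7
    }
}
```

Even with lambdas, Java still needs a functional interface as the parameter type, whereas Kotlin can declare the function type inline, as the DispatcherServer signature does.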

What design patterns do, at bottom, is reduce code redundancy and increase code reuse. In functional languages, reuse takes the form of passing functions, as first-class language constructs, through parameters; functional programming libraries use this technique constantly. Compared with object-oriented languages, where reuse is organized around types, reuse in functional languages happens at a coarser granularity organized around behavior: the aim is to extract a common mechanism and parameterize how it behaves.

Summary

In this article, I walked through some typical testing approaches, and in the end we used the Strategy pattern to write cleaner test code. The Strategy pattern is itself a form of inversion of control (IoC), which the Hollywood Principle sums up as "don't call us, we'll call you". In the earliest version, our business code reached out and called the framework's methods directly once it had run, which coupled the two. In the final version, the business code exposes a strategy interface so that external logic can be injected flexibly instead of being tightly coupled to it.