This is the 7th day of my participation in the August Text Challenge.More challenges in August

The producer and consumer problem is a classic problem in the threading model: Producers and consumers share the same Buffer (Buffer) in the same period of time. Producers add products to the Buffer, and consumers take products from the Buffer. When the Buffer is empty, consumers block, and when the Buffer is full, producers block.

Kotlin has several ways to implement a multithreaded production/consumption model (mostly for Java as well)

  1. Synchronized
  2. ReentrantLock
  3. BlockingQueue
  4. Semaphore
  5. PipedXXXStream
  6. RxJava
  7. Coroutine
  8. Flow

1. Synchronized

Synchronized is the most basic thread synchronization tool that can be used with Wait/Notify to implement production and consumption problems

val buffer = LinkedList<Data>()
val MAX = 5 / / buffer size

val lock = Object()

fun produce(data: Data) {
    sleep(2000) // mock produce
    synchronized(lock) {
        while (buffer.size >= MAX) {
           // Stop production when the buffer is full
           // Note that the use of while does not use if, because it is possible to be woken up by another producing thread rather than the consuming thread, so check the buffer state again
           // There is no need to worry about this if production consumes two locks
           lock.wait()
        }

		    buffer.push(data)
        The notify method wakes up only one of the threads, depending on the operating system's implementation of multithreaded management.
        // notifyAll wakes up all waiting threads. Which thread will be the first to process depends on the implementation of the operating system, but all have a chance to process.
        // notifyAll must be used because it is possible to wake up another production thread using notify and thus cause a deadlock
        lock.notifyAll()
    }
}

fun consume(a) {
    synchronized(lock) {
        while (buffer.isEmpty())
            lock.wait() // Stop spending
        buffer.removeFirst()
        lock.notifyAll()
    }
    sleep(2000) // mock consume
}



@Test
fun test(a) {
    // Start multiple production and consumption threads simultaneously
    repeat(10) {
        Thread { produce(Data()) }.start()
    }
    repeat(10) {
        Thread { consume() }.start()
    }
}

Copy the code


2. ReentrantLock


The advantage of Lock over Synchronized is that when there are multiple production/consumption threads, we can specify exactly which one to wake up by defining multiple conditions. The following example shows Lock with await/single replacing Synchronized

val buffer = LinkedList<Data>()
val MAX = 5 / / buffer size
             
val lock = ReentrantLock()                     
val condition = lock.newCondition()          
                                               
fun produce(data: Data) {                      
    sleep(2000) // mock produce                
    lock.lock()                                
                                               
    while (buffer.size >= 5)                      
        condition.await()                      
                                               
    buffer.push(data)                          
    condition.signalAll()                      
    lock.unlock()                              
}                                              
                                               
fun consume(a) {                                
    lock.lock()                                
    while (buffer.isEmpty())                      
        condition.await()                      
                                               
    buffer.removeFirst()
    condition.singleAll()                        
    lock.unlock()                              
    sleep(2000) // mock consume                
}                                              
Copy the code


3. BlockingQueue

When a critical condition is reached, the BlockingQueue will automatically block the current thread waiting for the lock to be released, making it a natural fit for this production/consumption scenario

val buffer = LinkedBlockingQueue<Data>(5)               
                                                        
fun produce(data: Data) {                               
    sleep(2000) // mock produce                         
    buffer.put(data) // Block automatically when the buffer is full
}
                                                        
fun consume(a) {                                         
    buffer.take() // Buffer blocks automatically when empty
    sleep(2000) // mock consume                         
}                                                       
                                                                                                             
Copy the code

Note that BlockingQueue has three groups of read/write methods, and only one group blocks. Don’t use it incorrectly

methods instructions
add(o)/remove(o) The add method throws an exception when adding elements that exceed the length of the column
offer(o)/poll(o) Offer returns false if the queue is too full to add elements
put(o)/take(o) When put adds an element to the end of a queue and finds that the queue is full, it blocks waiting for space to add an element


4. Semaphore

Semaphore is a shared locking mechanism provided by JUC for congestion control, which can be used to control the size of buffers.

// canProduce: the number of buffers that can be produced (i.e. the number of buffers available). The producer calls acquire and reduces the number of permit
val canProduce = Semaphore(5)                                                                                           
// canConsumer: the number that can be consumed, the producer calls release and increases the number of permits
val canConsume = Semaphore(5)                                                                                      
// Control buffer access mutually exclusive
val mutex = Semaphore(0)                                       
                                                               
val buffer = LinkedList<Data>()                                
                                                               
fun produce(data: Data) {                                      
    if (canProduce.tryAcquire()) {                             
        sleep(2000) // mock produce                            
                                                               
        mutex.acquire()                                        
        buffer.push(data)                                      
        mutex.release()                                        
                                                               
        canConsume.release() // Notify the consumer of a new product}}fun consume(a) {                                                
    if (canConsume.tryAcquire()) {                             
        sleep(2000) // mock consume                            
                                                               
        mutex.acquire()                                        
        buffer.removeFirst()                                   
        mutex.release()                                        
                                                               
        canProduce.release() // Notify the production end that additional production can be performed}}Copy the code


5. PipedXXXStream

PipedInputStream PipedInputStream PipedOutputStream PipedInputStream PipedInputStream The thread on which the input stream PipedInputStream is located will block

val pis: PipedInputStream = PipedInputStream()
val pos: PipedOutputStream by lazy {
    PipedOutputStream().apply {
        pis.connect(this) // Establish a connection between input and output streams}}fun produce(data: ContactsContract.Data) {
    while (true) {
        sleep(2000)
        pos.use { // Kotlin uses use to conveniently release resources
            it.write(data.getBytes())
            it.flush()
        }
    }
}

fun consume(a) {
    while (true) {
        sleep(2000)
        pis.use {
            val byteArray = ByteArray(1024)
            it.read(byteArray)
        }
    }
}

@Test
fun Test(a) {
    repeat(10) {
        Thread { produce(Data()) }.start()
    }

    repeat(10) {
        Thread { consume() }.start()
    }
}

Copy the code


6. RxJava

Conceptually, RxJava can take Observable/Subject as producer and Subscriber as consumer, but both Subject and Observable lack blocking mechanism when Buffer overflow occurs. It is difficult to implement the producer/consumer model independently.

Flowable’s backpressure mechanism can be used to control the number of buffers and establish communication between upstream and downstream. With Atomic, it can be changed to achieve single-producer/single-consumer scenarios (not suitable for multi-producer/multi-consumer scenarios).

class Producer : Flowable<Data>() {

    override fun subscribeActual(subscriber: org.reactivestreams.Subscriber<in Data>) {
        subscriber.onSubscribe(object : Subscription {
            override fun cancel(a) {
                / /...
            }

            private val outStandingRequests = AtomicLong(0)

            override fun request(n: Long) { // Received notification from downstream, start production
                outStandingRequests.addAndGet(n)

                while (outStandingRequests.get(a) >0) {
                    sleep(2000)
                    subscriber.onNext(Data())
                    outStandingRequests.decrementAndGet()
                }
            }

        })
    }

}



class Consumer : DefaultSubscriber<Data>() {

    override fun onStart(a) {
        request(1)}override fun onNext(i: Data?). {
        sleep(2000) //mock consume
        request(1) // Notify upstream that production can be increased
    }

    override fun onError(throwable: Throwable) {
        / /...
    }

    override fun onComplete(a) {
        / /...}}@Test
fun test_rxjava(a) {
    try {
        val testProducer = Producer)
        val testConsumer = Consumer()

        testProducer
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.single())
            .blockingSubscribe(testConsumer)

    } catch (t: Throwable) {
        t.printStackTrace()
    }

}
Copy the code


7. Coroutine Channel

The Channel in coroutine has congestion control mechanism, which can realize the communication between producer and consumer. A Channel is a blocking queue of a coroutine version. Capacity specifies the queue capacity.


val channel = Channel<Data>(capacity = 5)

suspend fun produce(data: ContactsContract.Contacts.Data) = run {
    delay(2000) //mock produce
    channel.send(data)}suspend fun consume(a) = run {
    delay(2000)//mock consume
    channel.receive()
}

@Test
fun test_channel(a) {
    repeat(10) {
        GlobalScope.launch {
            produce(Data())
        }
    }

    repeat(10) {
        GlobalScope.launch {
           consume()
        }
    }
}

Copy the code

In addition, Coroutine provides the produce method, which produces data while declaring a Channel. It is easier to write and suitable for single-consumer single-producer scenarios:

fun CoroutineScope.produce(a): ReceiveChannel<Data> = produce {
    repeat(10) {
        delay(2000) //mock produce
        send(Data())
    }
}

@Test
fun test_produce(a) {
    GlobalScope.launch {
        produce.consumeEach {
            delay(2000) //mock consume}}}Copy the code

8. Coroutine Flow

Flow, like RxJava, is not suitable for processing production and consumption problems due to its lack of blocking mechanism when Buffer overflow occurs, and its back pressure mechanism is relatively simple, so it cannot receive downstream notifications like RxJava. But Flow later released SharedFlow as a buffered heat Flow that provides a Buffer overflow strategy that can be used as producer/consumer synchronization.

val flow : MutableSharedFlow<Data> = MutableSharedFlow(
    extraBufferCapacity = 5  // Buffer size
    , onBufferOverflow = BufferOverflow.SUSPEND // Buffer overflow strategy: suspend
)

@Test
fun test(a) {

    GlobalScope.launch {
        repeat(10) {
            delay(2000) //mock produce
            sharedFlow.emit(Data())
        }
    }

    GlobalScope.launch {
        sharedFlow.collect {
            delay(2000) //mock consume}}}Copy the code

Note that SharedFlow can only be used in single-producer/single-consumer scenarios


conclusion

The core of producer/consumer problem is the synchronization problem when multithreading reads and writes shared resources (Buffer). Theoretically, as long as multithreading framework has synchronization mechanism, such as thread lock, semaphore, blocking queue, coroutine Channel, etc., production and consumption model can be realized.

In addition, RxJava and Flow are also multi-threaded frameworks, but lack the blocking mechanism in case of Buffer overflow. They are not suitable for production/consumption scenarios and are more suitable for pure reactive scenarios.