Learn how processes synchronize with other processes in Linux.
This is the second article in a series about interprocess communication (IPC) in Linux. The first article focused on IPC through shared storage: shared files and shared memory segments. This article turns to pipes, which are channels that connect processes for communication. A pipe has a write end for writing bytes and a read end for reading those bytes in first-in, first-out (FIFO) order. The bytes themselves might represent anything: numbers, employee records, digital movies, and so on.
Pipes come in two flavors, named and unnamed, and both can be used either interactively from the command line or within programs; examples of each follow. This article also covers message queues, which have fallen out of fashion, but undeservedly so.
The code examples in the first article acknowledged the threat of race conditions (either file-based or memory-based) in IPC that uses shared storage. The question naturally arises about safe concurrency for channel-based IPC, which is covered in this article. The code examples for pipes and message queues use APIs with the POSIX stamp of approval, and a core goal of the POSIX standards is thread safety.
Consider the man pages for the mq_open function, which belongs to the message queue API. These pages include a section on Attributes with this small table:
Interface | Attribute | Value
---|---|---
mq_open() | Thread safety | MT-Safe
The value MT-Safe (with MT for multi-threaded) means that the mq_open function is thread-safe, which in turn implies process-safe: a process executes in precisely the sense that one of its threads executes, and if a race condition cannot arise among threads in the same process, such a condition cannot arise among threads in different processes. The MT-Safe attribute assures that a race condition does not arise in invocations of mq_open. In general, channel-based IPC is concurrent-safe, although a cautionary note is raised in the examples that follow.
Unnamed pipes
Let’s start with a contrived command-line example that shows how unnamed pipes work. On all modern systems, the vertical bar | represents an unnamed pipe at the command line. Assume % is the command-line prompt, and consider this command:
## writer to the left of |, reader to the right
% sleep 5 | echo "Hello, world!"
The sleep and echo utilities execute as separate processes, and the unnamed pipe allows them to communicate. However, the example is contrived in that no communication occurs. The greeting Hello, world! appears on the screen; then, after about five seconds, the command-line prompt returns, indicating that both the sleep and echo processes have exited. What is going on?
In the vertical-bar syntax from the command line, the process to the left (sleep) is the writer, and the process to the right (echo) is the reader. By default, the reader blocks until there are bytes to read from the channel, and the writer, after writing its bytes, finishes up by sending an end-of-stream marker. (Even if the writer terminates prematurely, an end-of-stream marker is sent to the reader.) The unnamed pipe persists until both the writer and the reader terminate.
In the contrived example, the sleep process does not write any bytes to the channel but does terminate after about five seconds, which sends an end-of-stream marker to the channel. In the meantime, the echo process immediately writes the greeting to the standard output (the screen) because this process does not read any bytes from the channel, so it does no waiting. Once the sleep and echo processes terminate, the unnamed pipe, which was never used for communication, goes away and the command-line prompt returns.
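The end-of-stream behavior described above can be observed inside a single process. The following is a minimal sketch rather than part of the original examples, and the function name read_after_close is made up for illustration. It creates a pipe with the pipe system function, closes the write end without writing anything, and then reads from the read end:

```c
#include <unistd.h>   /* pipe, read, close */

/* Hypothetical helper: reports what read() returns on a pipe whose
   write end was closed before any bytes were written. */
long read_after_close(void) {
  int fds[2];                      /* fds[0] = read end, fds[1] = write end */
  if (pipe(fds) < 0) return -1;    /* could not create the pipe */

  close(fds[1]);                   /* no writer is left: end-of-stream */

  char buf;
  long n = (long) read(fds[0], &buf, 1); /* does not block: nothing can arrive */
  close(fds[0]);
  return n;
}
```

With no writer left, the read call should return 0 at once instead of blocking, which is how a reader learns that the stream has ended.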
The more practical example below uses two unnamed pipes. We assume the contents of the test.dat file are as follows:
this
is
the
way
the
world
ends
The following command:
% cat test.dat | sort | uniq
The output from cat (short for concatenate) is piped to sort to produce sorted output, which is then piped to uniq to eliminate duplicate records (in this case, the two occurrences of “the” reduce to one):
ends
is
the
this
way
world
The scene is now set for a program with two processes that communicate through an unnamed pipe.
Example 1. Two processes communicate through an unnamed pipe
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h>   /* exit functions */
#include <unistd.h>   /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}
The pipeUN program above uses the system function fork to create a process. Although the program has but a single source file, multi-processing occurs during (successful) execution.
Here’s a quick review of how the library function fork works:
- The fork function, called in the parent process, returns -1 to the parent in case of failure. In the pipeUN example, the call is:
  pid_t cpid = fork(); /* called in parent */
  The returned value is stored, in this example, in the variable cpid of integer type pid_t. (Every process has its own process ID, a non-negative integer that identifies the process.) Forking a new process could fail for several reasons, including a full process table, a structure that the system maintains to track processes. In particular, zombie processes that are not harvested can cause the process table to fill up.
- If the fork call succeeds, it creates a new child process, returning one value to the parent but a different value to the child. Both the parent and the child process execute the same code that follows the call to fork. In particular, a successful call to fork returns:
  - Zero to the child process
  - The child’s process ID to the parent
- An if/else or equivalent construct typically is used after a successful fork call to separate the code meant for the parent from the code meant for the child. In this example, the construct is:
  if (0 == cpid) { /*** child ***/
    ...
  }
  else { /*** parent ***/
    ...
  }
If forking a child succeeds, the pipeUN program proceeds as follows. There is an integer array:
int pipeFDs[2]; /* two file descriptors */
to hold two file descriptors, one for writing to the pipe and another for reading from the pipe. (The array element pipeFDs[0] is the file descriptor for the read end, and the array element pipeFDs[1] is the file descriptor for the write end.) A successful call to the system pipe function, made immediately before the call to fork, populates the array with the two file descriptors:
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
The parent and the child now have copies of both file descriptors, but the separation of concerns pattern means that each process requires exactly one of the descriptors. In this example, the parent does the writing and the child does the reading, although the roles could be reversed. The first statement in the child’s if-clause code, therefore, closes the pipe’s write end:
close(pipeFDs[WriteEnd]); /* called in child code */
and the first statement in the parent’s else-clause code closes the pipe’s read end:
close(pipeFDs[ReadEnd]); /* called in parent code */
The parent then writes some bytes (ASCII codes) to the unnamed pipe, and the child reads these and echoes them to the standard output.
Another point that needs to be clarified in this program is the wait function in the parent process code. Once created, the child process is largely independent of its parent, as the short pipeUN program shows. Child processes can execute arbitrary code that is completely unrelated to the parent process. However, if the child terminates, the system notifies the parent with a signal.
What if the parent never collects the child’s termination status? In that case, unless precautions are taken, the terminated child becomes and remains a zombie process with an entry in the process table. The precautions are of two broad types. One precaution is to have the parent notify the system that the parent has no interest in the child’s termination:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
A second approach is to have the parent execute a wait on the child’s termination, thereby ensuring that the parent outlives the child. This second approach is used in the pipeUN program, where the parent code has this call:
wait(NULL); /* called in parent */
This call to wait means wait until the termination of any child occurs, and in the pipeUN program, there is only one child process. (The NULL argument could be replaced with the address of an integer variable to hold the child’s exit status.) There is a more flexible waitpid function for fine-grained control, for example, for specifying a particular child process among several.
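The two refinements just mentioned, collecting the exit status and waiting for one specific child, can be sketched together. This is an illustration under stated assumptions rather than part of the pipeUN program: the helper name child_exit_status is hypothetical, and the child does nothing except terminate with the given code.

```c
#include <sys/types.h>
#include <sys/wait.h>  /* waitpid, WIFEXITED, WEXITSTATUS */
#include <unistd.h>    /* fork, _exit */

/* Hypothetical helper: forks a child that exits with `code` (0..255),
   waits for that specific child, and returns its exit status.
   Returns -1 if fork or waitpid fails or the child did not exit normally. */
int child_exit_status(int code) {
  pid_t cpid = fork();
  if (cpid < 0) return -1;             /* fork failed */
  if (0 == cpid) _exit(code);          /* child: terminate at once */

  int status;                          /* filled in by waitpid */
  if (waitpid(cpid, &status, 0) < 0)   /* parent: wait for this child only */
    return -1;
  return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Passing the child’s process ID as the first argument is what singles out one child among several; the status integer must be decoded with the WIFEXITED and WEXITSTATUS macros rather than read directly.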
The pipeUN program takes another precaution. When the parent is done waiting, the parent terminates with a call to the regular exit function. By contrast, the child terminates with a call to the _exit variant, which fast-tracks notification of termination. In effect, the child is telling the system to notify the parent as soon as possible that the child has terminated.
If two processes write to the same unnamed pipe, can the bytes be interleaved? For example, if process P1 writes:
foo bar
to a pipe and process P2 concurrently writes:
baz baz
to the same pipe, it seems that the pipe contents might be something arbitrary, such as:
baz foo baz bar
The POSIX standard ensures that writes are not interleaved so long as no write exceeds PIPE_BUF bytes. On Linux systems, PIPE_BUF is 4,096 bytes in size. My preference with pipes is to have a single writer and a single reader, thereby sidestepping the issue.
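The limit can also be queried at run time instead of being hard-coded. The sketch below is an illustration, not part of the original examples; the helper name pipe_atomic_limit is made up, while fpathconf and the _PC_PIPE_BUF selector are standard POSIX.

```c
#include <unistd.h>  /* pipe, fpathconf, close, _PC_PIPE_BUF */

/* Hypothetical helper: asks the system for the atomic-write limit of a
   freshly created pipe. Returns -1 on failure. */
long pipe_atomic_limit(void) {
  int fds[2];
  if (pipe(fds) < 0) return -1;                 /* could not create the pipe */

  long limit = fpathconf(fds[1], _PC_PIPE_BUF); /* PIPE_BUF for this pipe */

  close(fds[0]);
  close(fds[1]);
  return limit;
}
```

POSIX requires the limit to be at least 512 bytes, so a writer that keeps each write at or below the reported value can rely on its bytes arriving as one uninterrupted block.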
Named pipes
An unnamed pipe has no backing file: the system maintains an in-memory buffer to transfer bytes from the writer to the reader. Once the writer and reader terminate, the buffer is reclaimed, so the unnamed pipe goes away. By contrast, a named pipe has a backing file and a distinct API.
Let’s consider another command-line example to get the gist of named pipes. Here are the steps:
- Open two terminals. The working directory should be the same for both.
- In one of the terminals, enter these two commands (the prompt again is %, and my comments start with ##):
  % mkfifo tester ## creates a backing file named tester
  % cat tester ## type the pipe's contents to stdout
  At the beginning, nothing should appear in the terminal because nothing has been written yet to the named pipe.
- In the second terminal, enter the command:
  % cat > tester ## redirect keyboard input to the pipe
  hello, world! ## then hit Return key
  bye, bye ## ditto
  <Control-C> ## terminate session with a Control-C
  Whatever is typed into this terminal is echoed in the other. Once Ctrl+C is entered, the regular command-line prompt returns in both terminals: the pipe has been closed.
- Clean up by removing the file that implements the named pipe:
  % unlink tester
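The create and clean-up steps above can also be carried out from a C program through the library function mkfifo, which shares its name with the command-line utility. The following is a minimal sketch under stated assumptions: the helper name fifo_round_trip is hypothetical, and the check with stat and S_ISFIFO is added here only to confirm what kind of file was created.

```c
#include <sys/types.h>
#include <sys/stat.h>  /* mkfifo, stat, S_ISFIFO */
#include <unistd.h>    /* unlink */

/* Hypothetical helper: creates a FIFO at `path`, checks that the new
   directory entry really is a FIFO, then removes it again.
   Returns 1 if it was a FIFO, 0 if not, -1 on error. */
int fifo_round_trip(const char* path) {
  if (mkfifo(path, 0666) < 0) return -1;   /* like: % mkfifo tester */

  struct stat info;
  int is_fifo = -1;
  if (stat(path, &info) == 0)              /* inspect the backing file */
    is_fifo = S_ISFIFO(info.st_mode) ? 1 : 0;

  unlink(path);                            /* like: % unlink tester */
  return is_fifo;
}
```

The call fails if a file with the given name already exists, so a caller would pick a path that is not yet in use.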
As the name of the mkfifo utility implies, a named pipe also is called a FIFO because the first byte in is the first byte out, and so on. There is a library function named mkfifo that creates a named pipe in programs and is used in the next example, which consists of two processes: one writes to the named pipe and the other reads from this pipe.
Example 2. The fifoWriter program
```c
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>

#define MaxLoops     12000 /* outer loop */
#define ChunkSize       16 /* how many chunks written at a time */
#define IntsPerChunk     4 /* four 4-byte ints per chunk */
#define MaxZs          250 /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);            /* read/write for user/group/others */
  int fd = open(pipeName, O_WRONLY); /* open as write-only; mkfifo created the file */
  if (fd < 0) return -1;             /* can't go on */

  int i;
  for (i = 0; i < MaxLoops; i++) {          /* write MaxLoops times */
    int j;
    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize chunks */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++)
        chunk[k] = rand();
      write(fd, chunk, sizeof(chunk));      /* one chunk = IntsPerChunk ints */
    }
    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */
  }

  close(fd);        /* close pipe: generates an end-of-stream marker */
  unlink(pipeName); /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}
```
The fifoWriter program above can be summarized as follows:
- The program creates a named pipe for writing:
  mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
  int fd = open(pipeName, O_WRONLY);
  where pipeName is the name of the backing file passed to mkfifo as the first argument. The named pipe then is opened with the by-now familiar call to the open function, which returns a file descriptor.
- For a touch of realism, the fifoWriter does not write all the data at once, but instead writes a chunk, sleeps a random number of microseconds, and so on. In total, 768,000 4-byte integer values are written to the named pipe.
- After closing the named pipe, the fifoWriter also unlinks the file:
  close(fd); /* close pipe: generates end-of-stream marker */
  unlink(pipeName); /* unlink from the implementing file */
  Once the name has been unlinked and every process connected to the pipe has closed it, the system reclaims the backing file. In this example, there are only two such processes: the fifoWriter and the fifoReader, both of which do an unlink operation.
The two programs should be executed in different terminals with the same working directory. However, the fifoWriter should be started before the fifoReader, as the former creates the pipe. The fifoReader then accesses the already created named pipe.
Example 3. The fifoReader program
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

unsigned is_prime(unsigned n) { /* not pretty, but efficient */
  if (n <= 3) return n > 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) <= n; i += 6)
    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;

  return 1; /* found a prime! */
}

int main() {
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY);
  if (fd < 0) return -1; /* no point in continuing */
  unsigned total = 0, primes_count = 0;

  while (1) {
    int next;

    ssize_t count = read(fd, &next, sizeof(int));
    if (count <= 0) break;                        /* end of stream (0) or error (-1) */
    else if (count == (ssize_t) sizeof(int)) {    /* read a 4-byte int value */
      total++;
      if (is_prime(next)) primes_count++;
    }
  }

  close(fd);    /* close pipe from read end */
  unlink(file); /* unlink from the underlying file */
  printf("Received ints: %u, primes: %u\n", total, primes_count);

  return 0;
}
The fifoReader program above can be summarized as follows:
- Because the fifoWriter creates the named pipe, the fifoReader needs only the standard call open to access the pipe through the backing file:
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY);
  The file opens as read-only.
- The program then goes into a potentially infinite loop, trying to read a 4-byte chunk on each iteration. The read call:
  ssize_t count = read(fd, &next, sizeof(int));
  returns 0 to indicate end-of-stream, in which case the fifoReader breaks out of the loop, closes the named pipe, and unlinks the backing file before terminating.
- After reading a 4-byte integer, the fifoReader checks whether the number is a prime. This represents the business logic that a production-grade reader might perform on the received bytes. On a sample run, there were 37,682 primes among the 768,000 integers received.
On repeated sample runs, the fifoReader successfully read all of the bytes that the fifoWriter wrote. This is not surprising. The two processes execute on the same host, taking network issues out of the equation. Named pipes are a highly reliable and efficient IPC mechanism and, therefore, in wide use.
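One caveat is worth a sketch. A read on a pipe is allowed to return fewer bytes than requested, and the fifoReader simply ignores any read that does not deliver a full 4-byte integer. The sample runs evidently did not hit that case, but a reader that wants to be defensive could loop until a whole value has arrived. The helper below is an illustration only; the name read_full is hypothetical and is not part of the fifoReader program.

```c
#include <unistd.h>   /* read */
#include <errno.h>    /* errno, EINTR */

/* Hypothetical helper: keeps calling read() until `len` bytes have arrived,
   end-of-stream is reached, or an error occurs. Returns the number of bytes
   actually stored in `buf`, or -1 on error. */
ssize_t read_full(int fd, void* buf, size_t len) {
  size_t got = 0;
  while (got < len) {
    ssize_t n = read(fd, (char*) buf + got, len - got);
    if (n == 0) break;                 /* end of stream */
    if (n < 0) {
      if (errno == EINTR) continue;    /* interrupted by a signal: try again */
      return -1;                       /* real error */
    }
    got += (size_t) n;                 /* partial read: keep going */
  }
  return (ssize_t) got;
}
```

A reader could call read_full(fd, &next, sizeof(int)) and treat a return value of 0 as end-of-stream, exactly as the fifoReader treats a 0 from read.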
Here is the output of the two programs, which are started on different terminals but in the same working directory:
% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682
Message queues
Pipes have strict FIFO behavior: the first byte written is the first byte read, the second byte written is the second byte read, and so forth. Message queues can behave in the same way but are flexible enough that byte chunks can be retrieved out of FIFO order.
As the name suggests, a message queue is a sequence of messages, each of which has two parts:
- The payload, which is an array of bytes (char in C)
- A type, given as a positive integer value; types categorize messages for flexible retrieval
Consider the following depiction of a message queue, with each message labeled with an integer type:
+-+ +-+ +-+ +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+
Of the four messages shown, the one labeled 1 is at the front, that is, closest to the receiver. Next come two messages with label 2, and finally, a message labeled 3 at the back. If strict FIFO behavior were in play, then the messages would be received in the order 1-2-2-3. However, the message queue allows other retrieval orders. For example, the messages could be retrieved by the receiver in the order 3-2-1-2.
The mqueue example consists of two programs, the sender that writes to the message queue and the receiver that reads from this queue. Both programs include the header file queue.h shown below:
Example 4. The header file queue.h
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6
typedef struct {
  long type;                 /* must be of type long */
  char payload[MsgLen + 1];  /* bytes in the message */
} queuedMessage;
The header file defines a structure type named queuedMessage, with payload (byte array) and type (long integer) fields. This file also defines symbolic constants (the #define statements), the first two of which are used to generate a key that, in turn, is used to get a message queue ID. The ProjectId can be any positive integer value, and the PathName must be an existing, accessible file, in this case, the file queue.h. The setup statements in both the sender and the receiver programs are:
key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
The ID qid is, in effect, the counterpart of a file descriptor for message queues.
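The setup works across two separate programs because ftok is deterministic: the same existing file and the same project ID yield the same key, so the sender and the receiver arrive at the same queue. A minimal sketch of that property follows; the helper name same_key_twice is hypothetical and not part of the mqueue example.

```c
#include <sys/types.h>
#include <sys/ipc.h>   /* ftok, key_t */

/* Hypothetical helper: returns 1 if two separate ftok calls with the same
   path and project id agree on the key, 0 if they differ, -1 if ftok fails
   (for example, because the file does not exist). */
int same_key_twice(const char* path, int project_id) {
  key_t first = ftok(path, project_id);
  key_t second = ftok(path, project_id);
  if (first == (key_t) -1 || second == (key_t) -1) return -1;
  return first == second;
}
```

The failure case is the reason that PathName must name an existing, accessible file: ftok consults the file itself to build the key.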
Example 5. The message sender program
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId);
  if (key < 0) report_and_exit("couldn't get key...");

  int qid = msgget(key, 0666 | IPC_CREAT);
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message: the size argument counts the payload bytes only */
    msgsnd(qid, &msg, sizeof(msg.payload), IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}
The sender program above sends out six messages, two each of a specified type: the first two messages are of type 1, the next two of type 2, and the last two of type 3. The sending statement:
msgsnd(qid, &msg, sizeof(msg.payload), IPC_NOWAIT);
is configured to be non-blocking (the flag IPC_NOWAIT) because the messages are so small. The only danger is that a full queue, unlikely in this example, would result in a sending failure. The receiver program below also receives messages using the IPC_NOWAIT flag.
Example 6. The message receiver program
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");

  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    /* the size argument is the capacity of the payload field */
    if (msgrcv(qid, &msg, sizeof(msg.payload), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
      puts("msgrcv trouble...");
    else
      printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");

  return 0;
}
The receiver program does not create the message queue, although the API suggests as much. In the receiver, the call:
int qid = msgget(key, 0666 | IPC_CREAT);
is misleading because of the IPC_CREAT flag, but this flag really means create if needed, otherwise access. The sender program calls msgsnd to send messages, whereas the receiver calls msgrcv to retrieve them. In this example, the sender sends the messages in the order 1-1-2-2-3-3, but the receiver then retrieves them in the order 3-1-2-1-3-2, showing that message queues are not bound to strict FIFO behavior:
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
The output above shows that the sender and the receiver can be launched from the same terminal. The output also shows that the message queue persists even after the sender process creates the queue, writes to it, and exits. The queue goes away only after the receiver process explicitly removes it with the call to msgctl:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
Conclusion
The pipes and message queue APIs are fundamentally unidirectional: one process writes and another reads. There are implementations of bidirectional named pipes, but my two cents is that this IPC mechanism is at its best when it is simplest. As noted earlier, message queues have fallen in popularity, but without good reason; these queues are yet another tool in the IPC toolbox. Part 3 completes this quick tour of the IPC toolbox with code examples of IPC through sockets and signals.
Via: opensource.com/article/19/…
By Marty Kalin, lujun9972
This article is originally compiled by LCTT and released in Linux China