fstrm 0.4.0
Frame Streams implementation in C
Library overview

Initializing the library

fstrm has no global library state. In most cases, only a single fstrm_iothr library context object will be needed for the entire process, which will implicitly create a background I/O serialization thread. This I/O thread is bound to a particular output writer (for example, an AF_UNIX socket) and is fully buffered – submitted data frames will be accumulated in an output buffer and periodically flushed, minimizing the number of system calls that need to be performed. This frees worker threads from waiting for a write() to complete.

fstrm abstracts the actual I/O operations needed to read or write a byte stream. File and socket I/O implementations are included in the library, but if necessary fstrm can be extended to support new types of byte stream transports. See the fstrm_reader, fstrm_writer, and fstrm_rdwr interfaces for details.

The following code example shows the initialization of an fstrm_iothr library context object connected to an fstrm_file writer.

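const char *file_path = "/tmp/output.fs";
struct fstrm_file_options *fopt;
struct fstrm_iothr *iothr;
struct fstrm_writer *writer;
// Create the file writer options and set the output path.
fopt = fstrm_file_options_init();
fstrm_file_options_set_file_path(fopt, file_path);
writer = fstrm_file_writer_init(fopt, NULL);
if (!writer) {
    fprintf(stderr, "Error: fstrm_file_writer_init() failed.\n");
    exit(EXIT_FAILURE);
}
// Create the I/O thread context, bound to the file writer.
iothr = fstrm_iothr_init(NULL, &writer);
if (!iothr) {
    fprintf(stderr, "Error: fstrm_iothr_init() failed.\n");
    exit(EXIT_FAILURE);
}
// The options object is no longer needed once the writer exists.
fstrm_file_options_destroy(&fopt);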

Since the I/O operations are abstracted through the fstrm_writer interface, the writer variable in the above example could instead have been initialized with a completely different implementation. For example, fstrm_unix_writer objects can be initialized as follows:

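const char *socket_path = "/tmp/output.sock";
struct fstrm_writer *writer;
struct fstrm_unix_writer_options *uwopt;
// Create the AF_UNIX writer options and set the socket path.
uwopt = fstrm_unix_writer_options_init();
fstrm_unix_writer_options_set_socket_path(uwopt, socket_path);
writer = fstrm_unix_writer_init(uwopt, NULL);
if (!writer) {
    fprintf(stderr, "Error: fstrm_unix_writer_init() failed.\n");
    exit(EXIT_FAILURE);
}
// The options object is no longer needed once the writer exists.
fstrm_unix_writer_options_destroy(&uwopt);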

Getting an input queue

After the fstrm_iothr object has been created with fstrm_iothr_init(), an input queue handle can be obtained with the fstrm_iothr_get_input_queue() function, which returns an fstrm_iothr_queue object. This function is thread-safe and returns a unique queue each time it is called, up to the number of queues specified by fstrm_iothr_options_set_num_input_queues(). fstrm_iothr_queue objects belong to their parent fstrm_iothr object and will be destroyed when the parent fstrm_iothr object is destroyed.

The following code example shows a single fstrm_iothr_queue handle being obtained from an already initialized fstrm_iothr library context object.

// 'iothr' is a struct fstrm_iothr *
struct fstrm_iothr_queue *ioq;
ioq = fstrm_iothr_get_input_queue(iothr);
if (!ioq) {
    fprintf(stderr, "Error: fstrm_iothr_get_input_queue() failed.\n");
    exit(EXIT_FAILURE);
}

Submitting data frames

Once the fstrm_iothr object has been created and an fstrm_iothr_queue handle is available, data frames can be submitted for asynchronous writing using the fstrm_iothr_submit() function. A callback is passed to this function which will be invoked to deallocate the data frame once the I/O thread has completed processing it. In the common case where the data frame is dynamically allocated with malloc(), the deallocation callback must call free(). fstrm_free_wrapper() is provided as a convenience function which does this and can be specified as the free_func parameter to fstrm_iothr_submit().
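When frames do not come from malloc(), the deallocation callback can return the buffer to wherever it came from instead of calling free(). The following is a minimal sketch of that idea, not part of fstrm: frame_pool, pool_get() and pool_release() are hypothetical names, and the callback shape void (*)(void *buf, void *free_data) is assumed from the free_func and free_data parameters of fstrm_iothr_submit(). Since the callback presumably runs on the I/O thread while workers allocate, the sketch uses C11 atomics for the slot flags.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define POOL_SLOTS 4
#define SLOT_SIZE  64

/* Hypothetical fixed-size frame pool; not part of fstrm. */
struct frame_pool {
    uint8_t    slots[POOL_SLOTS][SLOT_SIZE];
    atomic_int in_use[POOL_SLOTS];   /* 0 = free, 1 = handed out */
};

/* Claim the first free slot, or return NULL if the pool is exhausted. */
static uint8_t *pool_get(struct frame_pool *pool)
{
    for (size_t i = 0; i < POOL_SLOTS; i++) {
        int expected = 0;
        if (atomic_compare_exchange_strong(&pool->in_use[i], &expected, 1))
            return pool->slots[i];
    }
    return NULL;
}

/* Deallocation callback: 'buf' is the frame, 'free_data' is the pool.
 * Assumed shape: void (*)(void *buf, void *free_data). */
static void pool_release(void *buf, void *free_data)
{
    struct frame_pool *pool = free_data;
    ptrdiff_t off = (uint8_t *)buf - &pool->slots[0][0];

    /* The buffer must be the start of one of this pool's slots. */
    assert(off >= 0 && off < POOL_SLOTS * SLOT_SIZE && off % SLOT_SIZE == 0);
    atomic_store(&pool->in_use[off / SLOT_SIZE], 0);
}
```

Under these assumptions, a frame obtained from pool_get() would be submitted with pool_release as the free_func argument and a pointer to the pool as the free_data argument. If the submission fails, the caller would invoke pool_release() itself.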

If space is available in the queue, fstrm_iothr_submit() will return fstrm_res_success, indicating that ownership of the memory allocation for the data frame has passed from the caller to the library. The caller must not reuse or deallocate the memory for the data frame after a successful call to fstrm_iothr_submit().

Callers must check the return value of fstrm_iothr_submit(). If this function fails, that is, it returns any result code other than fstrm_res_success, the caller must deallocate or otherwise dispose of memory allocated for the data frame, in order to avoid leaking memory. fstrm_iothr_submit() can fail with fstrm_res_again if there is currently no space in the circular queue for an additional frame, in which case a later call to fstrm_iothr_submit() with the same parameters may succeed. However, if fstrm_iothr_submit() fails with fstrm_res_invalid, then there is a problem with the parameters and a later call will not succeed.

The following code example shows data frames containing a short sequence of bytes being created and submitted repeatedly, with appropriate error handling. Note that the data frames in this example intentionally contain embedded unprintable characters, showing that Frame Streams is binary clean. This example follows from the previous examples, where the iothr and ioq variables have already been initialized.

// 'iothr' is a struct fstrm_iothr *
// 'ioq' is a struct fstrm_iothr_queue *
const unsigned num_frames = 100;
const uint8_t frame_template[] = {
    'H', 'e', 'l', 'l', 'o', 0x00, 0x01, 0x02, 0x03,
    'W', 'o', 'r', 'l', 'd', 0x04, 0x05, 0x06, 0x07,
};
for (unsigned i = 0; i < num_frames; i++) {
    // Allocate a new frame from the template.
    uint8_t *frame = malloc(sizeof(frame_template));
    if (!frame)
        break;
    memcpy(frame, frame_template, sizeof(frame_template));
    // Submit the frame for writing.
    for (;;) {
        fstrm_res res;
        res = fstrm_iothr_submit(iothr, ioq, frame,
                                 sizeof(frame_template),
                                 fstrm_free_wrapper, NULL);
        if (res == fstrm_res_success) {
            // Frame successfully queued.
            break;
        } else if (res == fstrm_res_again) {
            // Queue is full. Try again in a busy loop.
            // Alternatively, if loss can be tolerated we
            // could free the frame here and break out of
            // the loop.
            continue;
        } else {
            // Permanent failure.
            free(frame);
            fputs("fstrm_iothr_submit() failed.\n", stderr);
            break;
        }
    }
}
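The busy loop above retries indefinitely while the queue is full. Where some loss can be tolerated, a bounded retry may be preferable. The following sketch illustrates that variant without depending on the fstrm headers: submit_outcome, try_submit_fn, submit_bounded() and the fake_queue test double are all hypothetical names introduced here, and the three outcomes stand in for fstrm_res_success, fstrm_res_again and any other result code.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in outcomes mirroring the three cases described above;
 * real code would use the fstrm_res values from the fstrm headers. */
enum submit_outcome { SUBMIT_OK, SUBMIT_AGAIN, SUBMIT_INVALID };

/* Hypothetical hook that performs exactly one submission attempt. */
typedef enum submit_outcome (*try_submit_fn)(void *ctx);

/* Retry while the queue is full, giving up after max_attempts tries.
 * Returns the last outcome. The caller still owns the frame unless
 * the result is SUBMIT_OK. */
static enum submit_outcome
submit_bounded(try_submit_fn try_submit, void *ctx, unsigned max_attempts)
{
    enum submit_outcome res = SUBMIT_AGAIN;

    assert(try_submit != NULL && max_attempts > 0);
    for (unsigned attempt = 0; attempt < max_attempts; attempt++) {
        res = try_submit(ctx);
        if (res != SUBMIT_AGAIN)
            break;  /* Queued, or failed permanently. */
    }
    return res;
}

/* Test double: reports a full queue 'full_for' times, then succeeds. */
struct fake_queue {
    unsigned full_for;
    unsigned calls;
};

static enum submit_outcome fake_try_submit(void *ctx)
{
    struct fake_queue *q = ctx;

    q->calls++;
    if (q->full_for > 0) {
        q->full_for--;
        return SUBMIT_AGAIN;
    }
    return SUBMIT_OK;
}
```

In real use, the hook would wrap a single call to fstrm_iothr_submit() and map its result code onto the three outcomes. Whenever submit_bounded() returns anything other than SUBMIT_OK, the caller must dispose of the frame, exactly as in the permanent failure branch above.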

Shutting down

Calling fstrm_iothr_destroy() on the fstrm_iothr object will signal the I/O thread to flush any outstanding data frames being written and will deallocate all associated resources. This function is synchronous and does not return until the I/O thread has terminated.

// 'iothr' is a struct fstrm_iothr *
fstrm_iothr_destroy(&iothr);