fstrm  0.4.0
Frame Streams implementation in C
fstrm_iothr

Detailed Description

The fstrm_iothr interface creates a background I/O thread which writes Frame Streams encapsulated data frames into an output stream specified by an fstrm_writer object.

It exposes non-blocking input queues that can be used by worker threads to asynchronously write data frames to the output stream. A deferred deallocation callback is invoked after the I/O thread has disposed of a queued data frame.

In order to create an fstrm_iothr object, the caller must first configure and instantiate an fstrm_writer object and pass this instance to the fstrm_iothr_init() function. The fstrm_iothr object then takes ownership of the fstrm_writer object. It is responsible for serializing writes and will take care of destroying the captive fstrm_writer object at the same time the fstrm_iothr object is destroyed. The caller should not perform any operations on the captive fstrm_writer object after it has been passed to fstrm_iothr_init().

Parameters used to configure the I/O thread are passed through an fstrm_iothr_options object. These options have to be specified in advance and are mostly performance knobs which have reasonable defaults.

Once the fstrm_iothr object has been created, handles to the input queues used to submit data frames can be obtained by calling fstrm_iothr_get_input_queue(). This function can be called up to num_input_queues times, and can be safely called concurrently. For instance, in an application with a fixed number of worker threads, an input queue can be dedicated to each worker thread by setting the num_input_queues option to the number of worker threads, and then calling fstrm_iothr_get_input_queue() from each worker thread's startup function to obtain a per-thread input queue.

Macros

#define FSTRM_IOTHR_BUFFER_HINT_MIN   1024
 Minimum buffer_hint value.

#define FSTRM_IOTHR_BUFFER_HINT_DEFAULT   8192
 Default buffer_hint value.

#define FSTRM_IOTHR_BUFFER_HINT_MAX   65536
 Maximum buffer_hint value.

#define FSTRM_IOTHR_FLUSH_TIMEOUT_MIN   1
 Minimum flush_timeout value.

#define FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT   1
 Default flush_timeout value.

#define FSTRM_IOTHR_FLUSH_TIMEOUT_MAX   600
 Maximum flush_timeout value.

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN   2
 Minimum input_queue_size value.

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT   512
 Default input_queue_size value.

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX   16384
 Maximum input_queue_size value.

#define FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN   1
 Minimum num_input_queues value.

#define FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT   1
 Default num_input_queues value.

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN   2
 Minimum output_queue_size value.

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT   64
 Default output_queue_size value.

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX   IOV_MAX
 Maximum output_queue_size value.

#define FSTRM_IOTHR_QUEUE_MODEL_DEFAULT   FSTRM_IOTHR_QUEUE_MODEL_SPSC
 Default queue_model value.

#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN   1
 Minimum queue_notify_threshold value.

#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT   32
 Default queue_notify_threshold value.

#define FSTRM_IOTHR_REOPEN_INTERVAL_MIN   1
 Minimum reopen_interval value.

#define FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT   5
 Default reopen_interval value.

#define FSTRM_IOTHR_REOPEN_INTERVAL_MAX   600
 Maximum reopen_interval value.

Enumerations

enum fstrm_iothr_queue_model
 Queue models.

Functions

struct fstrm_iothr_options * fstrm_iothr_options_init (void)
 Initialize an fstrm_iothr_options object.

void fstrm_iothr_options_destroy (struct fstrm_iothr_options **opt)
 Destroy an fstrm_iothr_options object.

fstrm_res fstrm_iothr_options_set_buffer_hint (struct fstrm_iothr_options *opt, unsigned buffer_hint)
 Set the buffer_hint parameter.

fstrm_res fstrm_iothr_options_set_flush_timeout (struct fstrm_iothr_options *opt, unsigned flush_timeout)
 Set the flush_timeout parameter.

fstrm_res fstrm_iothr_options_set_input_queue_size (struct fstrm_iothr_options *opt, unsigned input_queue_size)
 Set the input_queue_size parameter.

fstrm_res fstrm_iothr_options_set_num_input_queues (struct fstrm_iothr_options *opt, unsigned num_input_queues)
 Set the num_input_queues parameter.

fstrm_res fstrm_iothr_options_set_output_queue_size (struct fstrm_iothr_options *opt, unsigned output_queue_size)
 Set the output_queue_size parameter.

fstrm_res fstrm_iothr_options_set_queue_model (struct fstrm_iothr_options *opt, fstrm_iothr_queue_model queue_model)
 Set the queue_model parameter.

fstrm_res fstrm_iothr_options_set_queue_notify_threshold (struct fstrm_iothr_options *opt, unsigned queue_notify_threshold)
 Set the queue_notify_threshold parameter.

fstrm_res fstrm_iothr_options_set_reopen_interval (struct fstrm_iothr_options *opt, unsigned reopen_interval)
 Set the reopen_interval parameter.

struct fstrm_iothr * fstrm_iothr_init (const struct fstrm_iothr_options *opt, struct fstrm_writer **writer)
 Initialize an fstrm_iothr object.

void fstrm_iothr_destroy (struct fstrm_iothr **iothr)
 Destroy an fstrm_iothr object.

struct fstrm_iothr_queue * fstrm_iothr_get_input_queue (struct fstrm_iothr *iothr)
 Obtain an fstrm_iothr_queue object for submitting data frames to the fstrm_iothr object.

struct fstrm_iothr_queue * fstrm_iothr_get_input_queue_idx (struct fstrm_iothr *iothr, size_t idx)
 Obtain an fstrm_iothr_queue object for submitting data frames to the fstrm_iothr object.

fstrm_res fstrm_iothr_submit (struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq, void *data, size_t len, void(*free_func)(void *buf, void *free_data), void *free_data)
 Submit a data frame to the background I/O thread.

void fstrm_free_wrapper (void *data, void *free_data)
 Wrapper function for the system's free(), suitable for use as the free_func callback for fstrm_iothr_submit().

Macro Definition Documentation

◆ FSTRM_IOTHR_BUFFER_HINT_MIN

#define FSTRM_IOTHR_BUFFER_HINT_MIN   1024

Minimum buffer_hint value.

◆ FSTRM_IOTHR_BUFFER_HINT_DEFAULT

#define FSTRM_IOTHR_BUFFER_HINT_DEFAULT   8192

Default buffer_hint value.

◆ FSTRM_IOTHR_BUFFER_HINT_MAX

#define FSTRM_IOTHR_BUFFER_HINT_MAX   65536

Maximum buffer_hint value.

◆ FSTRM_IOTHR_FLUSH_TIMEOUT_MIN

#define FSTRM_IOTHR_FLUSH_TIMEOUT_MIN   1

Minimum flush_timeout value.

◆ FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT

#define FSTRM_IOTHR_FLUSH_TIMEOUT_DEFAULT   1

Default flush_timeout value.

◆ FSTRM_IOTHR_FLUSH_TIMEOUT_MAX

#define FSTRM_IOTHR_FLUSH_TIMEOUT_MAX   600

Maximum flush_timeout value.

◆ FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN   2

Minimum input_queue_size value.

◆ FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_DEFAULT   512

Default input_queue_size value.

◆ FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX

#define FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX   16384

Maximum input_queue_size value.

◆ FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN

#define FSTRM_IOTHR_NUM_INPUT_QUEUES_MIN   1

Minimum num_input_queues value.

◆ FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT

#define FSTRM_IOTHR_NUM_INPUT_QUEUES_DEFAULT   1

Default num_input_queues value.

◆ FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MIN   2

Minimum output_queue_size value.

◆ FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_DEFAULT   64

Default output_queue_size value.

◆ FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX

#define FSTRM_IOTHR_OUTPUT_QUEUE_SIZE_MAX   IOV_MAX

Maximum output_queue_size value.

◆ FSTRM_IOTHR_QUEUE_MODEL_DEFAULT

#define FSTRM_IOTHR_QUEUE_MODEL_DEFAULT   FSTRM_IOTHR_QUEUE_MODEL_SPSC

Default queue_model value.

◆ FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN

#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_MIN   1

Minimum queue_notify_threshold value.

◆ FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT

#define FSTRM_IOTHR_QUEUE_NOTIFY_THRESHOLD_DEFAULT   32

Default queue_notify_threshold value.

◆ FSTRM_IOTHR_REOPEN_INTERVAL_MIN

#define FSTRM_IOTHR_REOPEN_INTERVAL_MIN   1

Minimum reopen_interval value.

◆ FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT

#define FSTRM_IOTHR_REOPEN_INTERVAL_DEFAULT   5

Default reopen_interval value.

◆ FSTRM_IOTHR_REOPEN_INTERVAL_MAX

#define FSTRM_IOTHR_REOPEN_INTERVAL_MAX   600

Maximum reopen_interval value.

Enumeration Type Documentation

◆ fstrm_iothr_queue_model

Queue models.

See also
fstrm_iothr_options_set_queue_model()
Enumerator
FSTRM_IOTHR_QUEUE_MODEL_SPSC   Single Producer, Single Consumer.
FSTRM_IOTHR_QUEUE_MODEL_MPSC   Multiple Producer, Single Consumer.

Function Documentation

◆ fstrm_iothr_options_init()

struct fstrm_iothr_options * fstrm_iothr_options_init (void)

Initialize an fstrm_iothr_options object.

This is needed to pass configuration parameters to fstrm_iothr_init().

Returns
fstrm_iothr_options object.

◆ fstrm_iothr_options_destroy()

void fstrm_iothr_options_destroy (struct fstrm_iothr_options **opt)

Destroy an fstrm_iothr_options object.

Parameters
opt   Pointer to fstrm_iothr_options object.

◆ fstrm_iothr_options_set_buffer_hint()

fstrm_res fstrm_iothr_options_set_buffer_hint (struct fstrm_iothr_options *opt, unsigned buffer_hint)

Set the buffer_hint parameter.

This is the threshold number of bytes to accumulate in the output buffer before forcing a buffer flush.

Parameters
opt           fstrm_iothr_options object.
buffer_hint   New buffer_hint value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_flush_timeout()

fstrm_res fstrm_iothr_options_set_flush_timeout (struct fstrm_iothr_options *opt, unsigned flush_timeout)

Set the flush_timeout parameter.

This is the number of seconds to allow unflushed data to remain in the output buffer.

Parameters
opt             fstrm_iothr_options object.
flush_timeout   New flush_timeout value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_input_queue_size()

fstrm_res fstrm_iothr_options_set_input_queue_size (struct fstrm_iothr_options *opt, unsigned input_queue_size)

Set the input_queue_size parameter.

This is the number of queue entries to allocate for each input queue. This option controls how many data frames per input queue can be outstanding for deferred processing by the fstrm_iothr object, and thus affects performance and memory usage.

This parameter must be a power of 2.

Parameters
opt                fstrm_iothr_options object.
input_queue_size   New input_queue_size value.
Return values
fstrm_res_success
fstrm_res_failure
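Because input_queue_size must be a power of 2 within the documented limits, a caller that derives the size from some other quantity may want to normalize it before calling the setter. The helpers below are a minimal sketch of one way to do that. The names is_power_of_two and round_up_queue_size are hypothetical and not part of the library, and the two limits are copied from FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN and FSTRM_IOTHR_INPUT_QUEUE_SIZE_MAX as listed on this page.

```c
#include <stdbool.h>

/* Limits mirrored from FSTRM_IOTHR_INPUT_QUEUE_SIZE_MIN and _MAX above. */
#define QUEUE_SIZE_MIN 2u
#define QUEUE_SIZE_MAX 16384u

/* True if n has exactly one bit set. */
static bool is_power_of_two(unsigned n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

/*
 * Smallest power of 2 that is >= requested, clamped to
 * [QUEUE_SIZE_MIN, QUEUE_SIZE_MAX].
 */
static unsigned round_up_queue_size(unsigned requested)
{
    unsigned size = QUEUE_SIZE_MIN;
    while (size < requested && size < QUEUE_SIZE_MAX)
        size <<= 1;
    return size;
}
```

The result of round_up_queue_size() can then be passed to fstrm_iothr_options_set_input_queue_size(), whose return value should still be checked.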

◆ fstrm_iothr_options_set_num_input_queues()

fstrm_res fstrm_iothr_options_set_num_input_queues (struct fstrm_iothr_options *opt, unsigned num_input_queues)

Set the num_input_queues parameter.

This is the number of input queues to create and must match the number of times that fstrm_iothr_get_input_queue() is called on the corresponding fstrm_iothr object.

Parameters
opt                fstrm_iothr_options object.
num_input_queues   New num_input_queues value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_output_queue_size()

fstrm_res fstrm_iothr_options_set_output_queue_size (struct fstrm_iothr_options *opt, unsigned output_queue_size)

Set the output_queue_size parameter.

This is the number of queue entries to allocate for the output queue. This option controls the maximum number of data frames that can be accumulated in the output queue before a buffer flush must occur and thus affects performance and memory usage.

Parameters
opt                 fstrm_iothr_options object.
output_queue_size   New output_queue_size value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_queue_model()

fstrm_res fstrm_iothr_options_set_queue_model (struct fstrm_iothr_options *opt, fstrm_iothr_queue_model queue_model)

Set the queue_model parameter.

This controls what queueing semantics to use for fstrm_iothr_queue objects. Single Producer queues (FSTRM_IOTHR_QUEUE_MODEL_SPSC) may only have a single thread at a time calling fstrm_iothr_submit() on a given fstrm_iothr_queue object, while Multiple Producer queues (FSTRM_IOTHR_QUEUE_MODEL_MPSC) may have multiple threads concurrently calling fstrm_iothr_submit() on a given fstrm_iothr_queue object.

Parameters
opt           fstrm_iothr_options object.
queue_model   New queue_model value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_queue_notify_threshold()

fstrm_res fstrm_iothr_options_set_queue_notify_threshold (struct fstrm_iothr_options *opt, unsigned queue_notify_threshold)

Set the queue_notify_threshold parameter.

This controls the number of outstanding queue entries to allow on an input queue before waking the I/O thread, which will cause the outstanding queue entries to begin draining.

Parameters
opt                      fstrm_iothr_options object.
queue_notify_threshold   New queue_notify_threshold value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_options_set_reopen_interval()

fstrm_res fstrm_iothr_options_set_reopen_interval (struct fstrm_iothr_options *opt, unsigned reopen_interval)

Set the reopen_interval parameter.

This controls the number of seconds to wait between attempts to reopen a closed fstrm_writer output stream.

Parameters
opt               fstrm_iothr_options object.
reopen_interval   New reopen_interval value.
Return values
fstrm_res_success
fstrm_res_failure

◆ fstrm_iothr_init()

struct fstrm_iothr * fstrm_iothr_init (const struct fstrm_iothr_options *opt, struct fstrm_writer **writer)

Initialize an fstrm_iothr object.

This creates a background I/O thread which asynchronously writes data frames submitted by other threads which call fstrm_iothr_submit().

Parameters
opt      fstrm_iothr_options object. May be NULL, in which case default values will be used.
writer   Pointer to fstrm_writer object. Must be non-NULL.
Returns
fstrm_iothr object.
Return values
NULL   on failure.

◆ fstrm_iothr_destroy()

void fstrm_iothr_destroy (struct fstrm_iothr **iothr)

Destroy an fstrm_iothr object.

This signals the background I/O thread to flush or discard any queued data frames and deallocates any resources used internally. This function blocks until the I/O thread has terminated.

Parameters
iothr   Pointer to fstrm_iothr object.

◆ fstrm_iothr_get_input_queue()

struct fstrm_iothr_queue * fstrm_iothr_get_input_queue (struct fstrm_iothr *iothr)

Obtain an fstrm_iothr_queue object for submitting data frames to the fstrm_iothr object.

fstrm_iothr_queue objects are child objects of their parent fstrm_iothr object and will be destroyed when fstrm_iothr_destroy() is called on the parent fstrm_iothr object.

This function is thread-safe and may be called simultaneously from any thread. For example, in a program which employs a fixed number of worker threads to handle requests, fstrm_iothr_get_input_queue() may be called from a thread startup routine without synchronization.

fstrm_iothr objects allocate a fixed total number of fstrm_iothr_queue objects during the call to fstrm_iothr_init(). To adjust this parameter, use fstrm_iothr_options_set_num_input_queues().

This function will fail if it is called more than num_input_queues times. By default, only one input queue is initialized per fstrm_iothr object.

For optimum performance in a threaded program, each worker thread submitting data frames should have a dedicated fstrm_iothr_queue object. This allows each worker thread to have its own queue which is processed independently by the I/O thread. If the queue model for the fstrm_iothr object is set to FSTRM_IOTHR_QUEUE_MODEL_SPSC, this results in contention-free access to the input queue.

Parameters
iothr   fstrm_iothr object.
Returns
fstrm_iothr_queue object.
Return values
NULL   on failure.

◆ fstrm_iothr_get_input_queue_idx()

struct fstrm_iothr_queue * fstrm_iothr_get_input_queue_idx (struct fstrm_iothr *iothr, size_t idx)

Obtain an fstrm_iothr_queue object for submitting data frames to the fstrm_iothr object.

This function is like fstrm_iothr_get_input_queue() except it indexes into the fstrm_iothr object's array of input queues.

Parameters
iothr   fstrm_iothr object.
idx     Index of the fstrm_iothr_queue object to retrieve. This value is limited by the num_input_queues option.
Returns
fstrm_iothr_queue object.
Return values
NULL   on failure.

◆ fstrm_iothr_submit()

fstrm_res fstrm_iothr_submit (struct fstrm_iothr *iothr, struct fstrm_iothr_queue *ioq, void *data, size_t len, void(*free_func)(void *buf, void *free_data), void *free_data)

Submit a data frame to the background I/O thread.

If successfully queued and the I/O thread has an active output stream opened, the data frame will be asynchronously written to the output stream.

When this function returns fstrm_res_success, responsibility for deallocating the data frame specified by the data parameter passes to the fstrm library. The caller MUST ensure that the data object remains valid after fstrm_iothr_submit() returns, until the library invokes the callback function specified by the free_func parameter. The callback is invoked once the data frame is no longer needed by the fstrm library. For example, if the data frame is dynamically allocated, it may be deallocated in the callback function.

Note that if this function does not return fstrm_res_success (that is, it returns fstrm_res_again or fstrm_res_failure), the data frame was not queued and the responsibility for deallocating it remains with the caller.

As a convenience, if data is allocated with the system's malloc(), fstrm_free_wrapper() may be provided as the free_func parameter with the free_data parameter set to NULL. This will cause the system's free() to be invoked to deallocate data.

free_func may be NULL, in which case no callback function will be invoked to dispose of data. This behavior may be useful if data is a global, statically allocated object.

Parameters
iothr       fstrm_iothr object.
ioq         fstrm_iothr_queue object.
data        Data frame bytes.
len         Number of bytes in data.
free_func   Callback function to deallocate the data frame. The data and free_data parameters passed to this callback will be the same values originally supplied in the call to fstrm_iothr_submit().
free_data   Parameter to pass to free_func.
Return values
fstrm_res_success   The data frame was successfully queued.
fstrm_res_again     The queue is full.
fstrm_res_failure   Permanent failure.

◆ fstrm_free_wrapper()

void fstrm_free_wrapper (void *data, void *free_data)

Wrapper function for the system's free(), suitable for use as the free_func callback for fstrm_iothr_submit().

Parameters
data        Object to call free() on.
free_data   Unused.
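Where fstrm_free_wrapper() ignores free_data, an application-defined callback with the same signature can use it to carry context. The sketch below counts released frames. struct frame_stats and counting_free are hypothetical names, not part of the library, and C11 atomics are used on the assumption that the callback runs on the I/O thread while the counter is read from another thread.

```c
#include <stdatomic.h>
#include <stdlib.h>

/* Hypothetical application statistics, passed to submit as free_data. */
struct frame_stats {
    atomic_ulong frames_released;
};

/* Matches the free_func signature: void (*)(void *buf, void *free_data). */
static void counting_free(void *buf, void *free_data)
{
    struct frame_stats *stats = free_data;

    if (stats != NULL)
        atomic_fetch_add(&stats->frames_released, 1);
    free(buf); /* the frame itself was allocated with malloc() */
}
```

Such a callback would be passed to fstrm_iothr_submit() as free_func, with a pointer to a struct frame_stats that outlives the fstrm_iothr object as free_data.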