nmsg 1.1.2
Multi-threaded nmsg I/O processing.
Data Structures | |
struct | nmsg_io_close_event |
Structure for passing information about a close event between the nmsg_io processing loop and the original caller. | |
Typedefs | |
typedef void(* | nmsg_io_close_fp) (struct nmsg_io_close_event *ce) |
Function for handling close event notifications. | |
typedef void(* | nmsg_io_user_fp) (unsigned threadno, void *user) |
Optional user-specified function to be run at thread start or thread stop. | |
Enumerations | |
enum | nmsg_io_close_type { nmsg_io_close_type_eof, nmsg_io_close_type_count, nmsg_io_close_type_interval } |
Type of a close event notification. | |
enum | nmsg_io_io_type { nmsg_io_io_type_input, nmsg_io_io_type_output } |
Type of the stream associated with a close event. | |
enum | nmsg_io_output_mode { nmsg_io_output_mode_stripe, nmsg_io_output_mode_mirror } |
Output behavior when multiple outputs are present. | |
Functions | |
nmsg_io_t | nmsg_io_init (void) |
Initialize a new nmsg_io_t object. | |
nmsg_res | nmsg_io_add_filter (nmsg_io_t io, nmsg_filter_message_fp fp, void *data) |
Add a user-specified filter function to the filter chain. | |
nmsg_res | nmsg_io_add_filter_module (nmsg_io_t io, const char *name, const void *param, const size_t len_param) |
Add a filter module to the filter chain. | |
nmsg_res | nmsg_io_add_input (nmsg_io_t io, nmsg_input_t input, void *user) |
Add an nmsg input to an nmsg_io_t object. | |
nmsg_res | nmsg_io_add_input_channel (nmsg_io_t io, const char *chan, void *user) |
Add an nmsg input channel to an nmsg_io_t object. | |
nmsg_res | nmsg_io_add_input_zmq_channel (nmsg_io_t io, void *zmq_ctx, const char *chan, void *user) |
Add an nmsg ZMQ input channel to an nmsg_io_t object. | |
nmsg_res | nmsg_io_add_input_sockspec (nmsg_io_t io, const char *sockspec, void *user) |
Add an nmsg input sockspec to an nmsg_io_t object. | |
nmsg_res | nmsg_io_add_input_fname (nmsg_io_t io, const char *fname, void *user) |
Add an NMSG file to an nmsg_io_t object. | |
nmsg_res | nmsg_io_add_output (nmsg_io_t io, nmsg_output_t output, void *user) |
Add an nmsg output to an nmsg_io_t object. | |
nmsg_res | nmsg_io_loop (nmsg_io_t io) |
Begin processing the data specified by the configured inputs and outputs. | |
void | nmsg_io_breakloop (nmsg_io_t io) |
Force a currently executing nmsg_io_loop() to stop looping and return. | |
void | nmsg_io_destroy (nmsg_io_t *io) |
Deallocate the resources associated with an nmsg_io_t object. | |
unsigned | nmsg_io_get_num_inputs (nmsg_io_t io) |
Get the number of inputs bound to the nmsg_io_t object. | |
unsigned | nmsg_io_get_num_outputs (nmsg_io_t io) |
Get the number of outputs bound to the nmsg_io_t object. | |
void | nmsg_io_set_close_fp (nmsg_io_t io, nmsg_io_close_fp close_fp) |
Set the close event notification function associated with an nmsg_io_t object. | |
void | nmsg_io_set_atstart_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) |
Set a user-specified function to be called in each thread after the thread starts. | |
void | nmsg_io_set_atexit_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user) |
Set a user-specified function to be called in each thread before the thread exits. | |
void | nmsg_io_set_count (nmsg_io_t io, unsigned count) |
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads. | |
void | nmsg_io_set_debug (nmsg_io_t io, int debug) |
Set the debug level for an nmsg_io_t object. | |
void | nmsg_io_set_filter_policy (nmsg_io_t io, const nmsg_filter_message_verdict policy) |
Set the filter policy for the nmsg_io_t object's filter chain. | |
void | nmsg_io_set_interval (nmsg_io_t io, unsigned interval) |
Configure the nmsg_io_t object to close inputs periodically, every #interval seconds. | |
void | nmsg_io_set_interval_randomized (nmsg_io_t io, bool randomized) |
Configure the nmsg_io_t object to close inputs at a randomized offset (in seconds) within the interval, rather than on the zeroth second of the interval. | |
void | nmsg_io_set_output_mode (nmsg_io_t io, nmsg_io_output_mode output_mode) |
Set the output mode behavior for an nmsg_io_t object. | |
nmsg_res | nmsg_io_get_stats (nmsg_io_t io, uint64_t *sum_in, uint64_t *sum_out, uint64_t *container_recvs, uint64_t *container_drops) |
Get counters of input and output payloads and containers received and dropped since the start of nmsg_io_loop(). | |
Multi-threaded nmsg I/O processing.
nmsg_io_t objects handle the multiplexing of NMSG data between nmsg_input_t and nmsg_output_t objects. Callers should initialize at least one input and at least one output and add them to an nmsg_io_t object before calling nmsg_io_loop().
Striping and mirroring of input payloads to individual outputs is supported. Striping is the default mode. Mirroring imposes the overhead of a per-output copy for each input payload.
MP:
Definition in file io.h.
typedef void(* nmsg_io_close_fp) (struct nmsg_io_close_event *ce)
typedef void(* nmsg_io_user_fp) (unsigned threadno, void *user)
enum nmsg_io_close_type
enum nmsg_io_io_type
enum nmsg_io_output_mode
nmsg_io_t nmsg_io_init (void)
Initialize a new nmsg_io_t object.
nmsg_res nmsg_io_add_filter (nmsg_io_t io, nmsg_filter_message_fp fp, void *data)
Add a user-specified filter function to the filter chain.
See the documentation for nmsg_filter_message_fp for further details about the semantics of the filter function itself.
This function appends the specified filter to the end of an nmsg_io_t object's list of filters.
When an nmsg_io_t runs its processing loop, each message read from an input stream is sequentially passed to each filter. If a filter returns the verdict nmsg_filter_message_verdict_DROP, the message will be immediately destroyed with no further processing. The verdict nmsg_filter_message_verdict_ACCEPT causes the message to be accepted into the output stream, bypassing any remaining filters in the filter chain, if any. The verdict nmsg_filter_message_verdict_DECLINED causes the message to be passed to the next filter in the filter chain, if any.
Filters in the filter chain are executed in the order that they were added to the nmsg_io_t object with nmsg_io_add_filter() and nmsg_io_add_filter_module().
If the entire filter chain has been executed and all filters have returned the verdict nmsg_filter_message_verdict_DECLINED, the default action to take is determined by the nmsg_io_t's filter policy, which can be set with nmsg_io_set_filter_policy(). The default filter policy is nmsg_filter_message_verdict_ACCEPT.
All filters must be added to the nmsg_io_t object before calling nmsg_io_loop(). It is an unchecked runtime error for a caller to attempt to modify the filter chain on an nmsg_io_t that is currently executing its processing loop.
[in] | io | Valid nmsg_io_t object. |
[in] | fp | User-specified function. |
[in] | data | User pointer to be passed to user function. |
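The verdict rules above can be modeled in a few lines of plain C. This is a minimal sketch, not the library's code: `verdict_t`, `filter_fp`, `struct filter`, `run_chain` and the two example filters are hypothetical stand-ins for illustration, and a string takes the place of a real NMSG message.

```c
#include <stddef.h>
#include <string.h>

/* Stand-ins for the three verdicts described above; the real enumerators
 * are nmsg_filter_message_verdict_DECLINED, _ACCEPT and _DROP. */
typedef enum { VERDICT_DECLINED, VERDICT_ACCEPT, VERDICT_DROP } verdict_t;

/* Hypothetical filter signature: inspect a message, return a verdict. */
typedef verdict_t (*filter_fp)(const char *msg, void *data);

struct filter {
	filter_fp fp;
	void *data;	/* user pointer handed back to the filter */
};

/* Walk the filters in the order they were added. The first ACCEPT or DROP
 * ends the walk; if every filter declines, the policy decides. */
verdict_t
run_chain(const struct filter *chain, size_t n, const char *msg, verdict_t policy)
{
	for (size_t i = 0; i < n; i++) {
		verdict_t v = chain[i].fp(msg, chain[i].data);
		if (v != VERDICT_DECLINED)
			return v;
	}
	return policy;
}

/* Example filter: drop empty messages, decline everything else. */
verdict_t
drop_empty(const char *msg, void *data)
{
	(void)data;
	return (msg[0] == '\0') ? VERDICT_DROP : VERDICT_DECLINED;
}

/* Example filter: accept messages starting with the prefix passed as data. */
verdict_t
accept_prefix(const char *msg, void *data)
{
	const char *prefix = data;
	return (strncmp(msg, prefix, strlen(prefix)) == 0)
	    ? VERDICT_ACCEPT : VERDICT_DECLINED;
}
```

With the chain `{drop_empty, accept_prefix("dns")}`, a message such as "http:get" is declined by both filters, so its fate depends only on the policy argument.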
nmsg_res nmsg_io_add_filter_module (nmsg_io_t io, const char *name, const void *param, const size_t len_param)
Add a filter module to the filter chain.
This function instantiates an nmsg_fltmod_t object with the specified 'name', 'param', and 'len_param' values and appends the filter to the end of an nmsg_io_t object's list of filters.
Filter modules allow filter functions to be wrapped in external shared objects, but they otherwise participate in the filter chain in the same way that a filter function added with nmsg_io_add_filter() does.
[in] | io | Valid nmsg_io_t object. |
[in] | name | Passed to nmsg_fltmod_init(). |
[in] | param | Passed to nmsg_fltmod_init(). |
[in] | len_param | Passed to nmsg_fltmod_init(). |
nmsg_res nmsg_io_add_input (nmsg_io_t io, nmsg_input_t input, void *user)
Add an nmsg input to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input to process input payloads.
[in] | io | Valid nmsg_io_t object. |
[in] | input | Valid nmsg_input_t object. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_channel (nmsg_io_t io, const char *chan, void *user)
Add an nmsg input channel to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input socket constituting the channel to process input payloads.
"Channels" are specified in the channel alias file, which is usually a file named "nmsg.chalias" in the sysconfdir.
[in] | io | Valid nmsg_io_t object. |
[in] | chan | Input channel name. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_zmq_channel (nmsg_io_t io, void *zmq_ctx, const char *chan, void *user)
Add an nmsg ZMQ input channel to an nmsg_io_t object.
[in] | io | Valid nmsg_io_t object. |
[in] | zmq_ctx | ZMQ context object. |
[in] | chan | Input channel name. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_input_sockspec (nmsg_io_t io, const char *sockspec, void *user)
Add an nmsg input sockspec to an nmsg_io_t object.
When nmsg_io_loop() is called, one thread will be created for each input socket constituting the sockspec to process input payloads.
Sockspecs are strings in the form "<ADDRESS>/<PORTRANGE>" where <ADDRESS> is an IPv4 or IPv6 address, and <PORTRANGE> is either a single port or a contiguous, inclusive range of ports of the form "<PORT_START>..<PORT_END>".
[in] | io | Valid nmsg_io_t object. |
[in] | sockspec | Input sockspec string. |
[in] | user | NULL or an input-specific user pointer. |
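To make the sockspec syntax concrete, here is a rough sketch of a parser for the "<ADDRESS>/<PORTRANGE>" form described above. It is not the parser nmsg uses: `struct sockspec` and `parse_sockspec` are hypothetical names, and the choices to split on the last '/' and to reject ports above 65535 are assumptions of this example.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical parsed form of a sockspec. */
struct sockspec {
	char addr[INET6_ADDRSTRLEN];
	unsigned port_start;
	unsigned port_end;	/* equals port_start for a single port */
};

/* Parse "<ADDRESS>/<PORT>" or "<ADDRESS>/<PORT_START>..<PORT_END>".
 * Returns 0 on success, -1 if the string is malformed. */
int
parse_sockspec(const char *spec, struct sockspec *out)
{
	unsigned char buf[sizeof(struct in6_addr)];
	unsigned start, end;
	char extra;

	/* Neither IPv4 nor IPv6 literals contain '/', so split on the last one. */
	const char *slash = strrchr(spec, '/');
	if (slash == NULL)
		return -1;

	size_t addr_len = (size_t)(slash - spec);
	if (addr_len == 0 || addr_len >= sizeof(out->addr))
		return -1;
	memcpy(out->addr, spec, addr_len);
	out->addr[addr_len] = '\0';

	/* The address must be a valid IPv4 or IPv6 literal. */
	if (inet_pton(AF_INET, out->addr, buf) != 1 &&
	    inet_pton(AF_INET6, out->addr, buf) != 1)
		return -1;

	/* The trailing %c detects junk after the port or range. */
	if (sscanf(slash + 1, "%u..%u%c", &start, &end, &extra) == 2) {
		/* inclusive range, both ends already stored */
	} else if (sscanf(slash + 1, "%u%c", &start, &extra) == 1) {
		end = start;	/* single port */
	} else {
		return -1;
	}

	if (start > end || end > 65535)
		return -1;

	out->port_start = start;
	out->port_end = end;
	return 0;
}
```

Under this reading, "::1/9000..9003" names four ports (the range is inclusive), which by the description above would mean four input sockets and therefore four processing threads.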
nmsg_res nmsg_io_add_input_fname (nmsg_io_t io, const char *fname, void *user)
Add an NMSG file to an nmsg_io_t object.
[in] | io | Valid nmsg_io_t object. |
[in] | fname | Name of NMSG file. |
[in] | user | NULL or an input-specific user pointer. |
nmsg_res nmsg_io_add_output (nmsg_io_t io, nmsg_output_t output, void *user)
Add an nmsg output to an nmsg_io_t object.
When nmsg_io_loop() is called, the input threads will cycle over and write payloads to the available outputs.
[in] | io | Valid nmsg_io_t object. |
[in] | output | Valid nmsg_output_t object. |
[in] | user | NULL or an output-specific user pointer. |
nmsg_res nmsg_io_loop (nmsg_io_t io)
Begin processing the data specified by the configured inputs and outputs.
One processing thread is created for each input. nmsg_io_loop() does not return until these threads finish and are destroyed.
Only nmsg_io_breakloop() may be called asynchronously while nmsg_io_loop() is executing.
nmsg_io_loop() invalidates an nmsg_io_t object. nmsg_io_destroy() should then be called.
[in] | io | Valid nmsg_io_t object. |
void nmsg_io_breakloop (nmsg_io_t io)
Force a currently executing nmsg_io_loop() to stop looping and return.
Since nmsg_io_loop() is a blocking call, nmsg_io_breakloop() must be called asynchronously.
This function is safe to call inside a signal handler.
[in] | io | Valid and currently processing nmsg_io_t object. |
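The calling pattern this implies (a blocking loop in the main flow, a stop request issued from a signal handler) can be sketched without the library. Everything below is a stand-in: `stop_requested`, `on_signal`, `install_stop_handler` and `run_loop` are hypothetical names, the flag models whatever state nmsg_io_breakloop() changes internally, and `run_loop` models the blocking nmsg_io_loop() call. In a real program the handler body would call nmsg_io_breakloop() on a globally reachable nmsg_io_t instead.

```c
#include <signal.h>

/* Stand-in for the internal state that a stop request flips.
 * volatile sig_atomic_t is the type ISO C permits a handler to write. */
volatile sig_atomic_t stop_requested = 0;

/* Signal handler: only records the request and does no other work. */
void
on_signal(int signo)
{
	(void)signo;
	stop_requested = 1;
}

/* Install the handler for SIGINT. Returns 0 on success, -1 on failure. */
int
install_stop_handler(void)
{
	return (signal(SIGINT, on_signal) == SIG_ERR) ? -1 : 0;
}

/* Stand-in for the blocking processing loop: handle up to max_items
 * work items, but return early once a stop has been requested.
 * Returns the number of items handled. */
unsigned
run_loop(unsigned max_items)
{
	unsigned done = 0;

	while (!stop_requested && done < max_items)
		done++;
	return done;
}
```

The point of the pattern is that the handler stays trivial; any cleanup, such as destroying the nmsg_io_t object, happens in normal context after the loop has returned.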
void nmsg_io_destroy (nmsg_io_t *io)
Deallocate the resources associated with an nmsg_io_t object.
[in] | io | Pointer to an nmsg_io_t object. |
unsigned nmsg_io_get_num_inputs (nmsg_io_t io)
Get the number of inputs bound to the nmsg_io_t object.
unsigned nmsg_io_get_num_outputs (nmsg_io_t io)
Get the number of outputs bound to the nmsg_io_t object.
void nmsg_io_set_close_fp (nmsg_io_t io, nmsg_io_close_fp close_fp)
Set the close event notification function associated with an nmsg_io_t object.
The provided function will only be called at EOF on an input or output stream, unless nmsg_io_set_count() or nmsg_io_set_interval() is used to specify conditions under which an input stream should be closed.
[in] | io | Valid nmsg_io_t object. |
[in] | close_fp | Close event notification function. It must be reentrant. |
void nmsg_io_set_atstart_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread after the thread starts.
[in] | io | Valid nmsg_io_t object. |
[in] | user_fp | User-specified function. |
[in] | user | User pointer to be passed to user function. |
void nmsg_io_set_atexit_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
Set a user-specified function to be called in each thread before the thread exits.
[in] | io | Valid nmsg_io_t object. |
[in] | user_fp | User-specified function. |
[in] | user | User pointer to be passed to user function. |
void nmsg_io_set_count (nmsg_io_t io, unsigned count)
Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.
Note that setting a count only guarantees that processing will terminate soon after at least #count payloads have been received. Slightly more than #count payloads may be processed before the nmsg_io_t instance stops.
If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.
[in] | io | Valid nmsg_io_t object. |
[in] | count | Integer > 0. |
void nmsg_io_set_debug (nmsg_io_t io, int debug)
Set the debug level for an nmsg_io_t object.
Debug levels >= 0 will cause debugging information to be logged to stderr.
[in] | io | Valid nmsg_io_t object. |
[in] | debug | Debug level. |
void nmsg_io_set_filter_policy (nmsg_io_t io, const nmsg_filter_message_verdict policy)
Set the filter policy for the nmsg_io_t object's filter chain.
If all filters in the filter chain return nmsg_filter_message_verdict_DECLINED for a particular message, the filter policy determines the default policy action to be applied to the message.
If not explicitly set, the default is nmsg_filter_message_verdict_ACCEPT.
[in] | io | Valid nmsg_io_t object. |
[in] | policy | The filter policy to apply by default. Must be either nmsg_filter_message_verdict_ACCEPT or nmsg_filter_message_verdict_DROP. |
void nmsg_io_set_interval (nmsg_io_t io, unsigned interval)
Configure the nmsg_io_t object to close inputs periodically, every #interval seconds.
The periodic closure is relative to the UNIX epoch, not the start of nmsg_io_loop(). The actual closing may be delayed up to 0.5s [NMSG_RBUF_TIMEOUT] after the interval's end.
If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.
[in] | io | Valid nmsg_io_t object. |
[in] | interval | Positive number of seconds. |
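"Relative to the UNIX epoch" means the closure times fall on whole multiples of the interval, regardless of when processing began. The arithmetic can be sketched as follows; `next_interval_close` is a hypothetical helper written for this illustration, not a library function, and it ignores the up-to-0.5 s delay mentioned above.

```c
#include <time.h>

/* Return the first time strictly after `now` that is a whole multiple of
 * `interval` seconds since the UNIX epoch. `interval` must be > 0. */
time_t
next_interval_close(time_t now, unsigned interval)
{
	time_t step = (time_t)interval;

	return now - (now % step) + step;
}
```

For instance, with a 3600-second interval, a loop started 3725 seconds after the epoch (01:02:05) would hit its first boundary at 7200 (02:00:00), not at 7325.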
void nmsg_io_set_interval_randomized (nmsg_io_t io, bool randomized)
Configure the nmsg_io_t object to close inputs at a randomized offset (in seconds) within the interval, rather than on the zeroth second of the interval.
[in] | io | Valid nmsg_io_t object. |
[in] | randomized | Boolean flag. |
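One way to picture the randomized variant is as a fixed offset, chosen once, that shifts every epoch-aligned boundary by the same number of seconds. The sketch below assumes exactly that; how nmsg actually selects and applies the offset is not stated here, and `pick_offset` and `next_close_with_offset` are hypothetical names.

```c
#include <stdlib.h>
#include <time.h>

/* Pick an offset in [0, interval). `interval` must be > 0.
 * Uses rand() purely for illustration. */
unsigned
pick_offset(unsigned interval)
{
	return (unsigned)rand() % interval;
}

/* Return the first time strictly after `now` of the form
 * k * interval + offset, where offset < interval. */
time_t
next_close_with_offset(time_t now, unsigned interval, unsigned offset)
{
	time_t step = (time_t)interval;
	time_t t = now - (now % step) + (time_t)offset;

	if (t <= now)
		t += step;
	return t;
}
```

An offset of 0 reproduces the non-randomized behavior. A non-zero offset keeps the closure period identical but moves it away from the zeroth second, which can help when many processes share the same interval and would otherwise all rotate at once.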
void nmsg_io_set_output_mode (nmsg_io_t io, nmsg_io_output_mode output_mode)
Set the output mode behavior for an nmsg_io_t object.
Nmsg payloads received from inputs may be striped across available outputs (the default), or mirrored across all available outputs.
Since nmsg_io must synchronize access to individual outputs, the mirrored output mode will limit the amount of parallelism that can be achieved.
[in] | io | Valid nmsg_io_t object. |
[in] | output_mode | nmsg_io_output_mode_stripe or nmsg_io_output_mode_mirror. |
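The difference between the two modes can be modeled with per-output counters. This is an illustrative model only: `out_mode`, `struct dispatcher` and `dispatch` are hypothetical, a single round-robin cursor is assumed for striping, and the real implementation (with one thread per input and synchronized outputs) may distribute payloads differently.

```c
#include <stddef.h>

/* Stand-ins for nmsg_io_output_mode_stripe and nmsg_io_output_mode_mirror. */
typedef enum { MODE_STRIPE, MODE_MIRROR } out_mode;

struct dispatcher {
	out_mode mode;
	size_t n_outputs;
	size_t next;		/* round-robin cursor, used when striping */
	unsigned long *written;	/* per-output payload counters */
};

/* Deliver one input payload. Returns how many output writes it caused:
 * 1 when striping, n_outputs when mirroring, 0 with no outputs. */
size_t
dispatch(struct dispatcher *d)
{
	if (d->n_outputs == 0)
		return 0;

	if (d->mode == MODE_MIRROR) {
		/* Every output receives its own copy of the payload. */
		for (size_t i = 0; i < d->n_outputs; i++)
			d->written[i]++;
		return d->n_outputs;
	}

	/* Striping: exactly one output receives the payload. */
	d->written[d->next]++;
	d->next = (d->next + 1) % d->n_outputs;
	return 1;
}
```

With three outputs and six input payloads, striping leaves two payloads on each output, while mirroring leaves six on each. The extra writes in the mirrored case correspond to the per-output copy overhead.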
nmsg_res nmsg_io_get_stats (nmsg_io_t io, uint64_t *sum_in, uint64_t *sum_out, uint64_t *container_recvs, uint64_t *container_drops)
Get counters of input and output payloads and containers received and dropped since the start of nmsg_io_loop().
[in] | io | Valid nmsg_io_t object. |
[out] | sum_in | Number of input payloads. |
[out] | sum_out | Number of output payloads. |
[out] | container_recvs | Number of containers received. |
[out] | container_drops | Number of container drops detected. |
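Once the four counters have been read, a caller might reduce them to a loss figure. The helper below is a hedged sketch of one such derivation: `struct io_stats` and `container_loss_ratio` are hypothetical, and treating received plus dropped containers as the total sent is an assumption of this example rather than something the counters are documented to guarantee.

```c
#include <stdint.h>

/* Hypothetical holder for the four values reported by nmsg_io_get_stats(). */
struct io_stats {
	uint64_t sum_in;		/* input payloads */
	uint64_t sum_out;		/* output payloads */
	uint64_t container_recvs;	/* containers received */
	uint64_t container_drops;	/* container drops detected */
};

/* Fraction of containers lost, assuming total = received + dropped.
 * Returns 0.0 when no containers have been seen at all. */
double
container_loss_ratio(const struct io_stats *s)
{
	uint64_t total = s->container_recvs + s->container_drops;

	if (total == 0)
		return 0.0;
	return (double)s->container_drops / (double)total;
}
```

Comparing sum_in with sum_out directly is less informative, since filters can reduce the output count and mirroring can raise it above the input count.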