nmsg 1.1.2
io.h File Reference

Multi-threaded nmsg I/O processing.

Data Structures

struct  nmsg_io_close_event
 Structure for passing information about a close event between the nmsg_io processing loop and the original caller.
 

Typedefs

typedef void(* nmsg_io_close_fp) (struct nmsg_io_close_event *ce)
 Function for handling close event notifications.
 
typedef void(* nmsg_io_user_fp) (unsigned threadno, void *user)
 Optional user-specified function to be run at thread start or thread stop.
 

Enumerations

enum  nmsg_io_close_type {
  nmsg_io_close_type_eof,
  nmsg_io_close_type_count,
  nmsg_io_close_type_interval
}
 Type of a close event notification.
 
enum  nmsg_io_io_type {
  nmsg_io_io_type_input,
  nmsg_io_io_type_output
}
 Type of the stream associated with a close event.
 
enum  nmsg_io_output_mode {
  nmsg_io_output_mode_stripe,
  nmsg_io_output_mode_mirror
}
 Output behavior when multiple outputs are present.
 

Functions

nmsg_io_t nmsg_io_init (void)
 Initialize a new nmsg_io_t object.
 
nmsg_res nmsg_io_add_filter (nmsg_io_t io, nmsg_filter_message_fp fp, void *data)
 Add a user-specified filter function to the filter chain.
 
nmsg_res nmsg_io_add_filter_module (nmsg_io_t io, const char *name, const void *param, const size_t len_param)
 Add a filter module to the filter chain.
 
nmsg_res nmsg_io_add_input (nmsg_io_t io, nmsg_input_t input, void *user)
 Add an nmsg input to an nmsg_io_t object.
 
nmsg_res nmsg_io_add_input_channel (nmsg_io_t io, const char *chan, void *user)
 Add an nmsg input channel to an nmsg_io_t object.
 
nmsg_res nmsg_io_add_input_zmq_channel (nmsg_io_t io, void *zmq_ctx, const char *chan, void *user)
 Add an nmsg ZMQ input channel to an nmsg_io_t object.
 
nmsg_res nmsg_io_add_input_sockspec (nmsg_io_t io, const char *sockspec, void *user)
 Add an nmsg input sockspec to an nmsg_io_t object.
 
nmsg_res nmsg_io_add_input_fname (nmsg_io_t io, const char *fname, void *user)
 Add an NMSG file to an nmsg_io_t object.
 
nmsg_res nmsg_io_add_output (nmsg_io_t io, nmsg_output_t output, void *user)
 Add an nmsg output to an nmsg_io_t object.
 
nmsg_res nmsg_io_loop (nmsg_io_t io)
 Begin processing the data specified by the configured inputs and outputs.
 
void nmsg_io_breakloop (nmsg_io_t io)
 Force a currently executing nmsg_io_loop() to stop looping and return.
 
void nmsg_io_destroy (nmsg_io_t *io)
 Deallocate the resources associated with an nmsg_io_t object.
 
unsigned nmsg_io_get_num_inputs (nmsg_io_t io)
 Get the number of inputs bound to the nmsg_io_t object.
 
unsigned nmsg_io_get_num_outputs (nmsg_io_t io)
 Get the number of outputs bound to the nmsg_io_t object.
 
void nmsg_io_set_close_fp (nmsg_io_t io, nmsg_io_close_fp close_fp)
 Set the close event notification function associated with an nmsg_io_t object.
 
void nmsg_io_set_atstart_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
 Set a user-specified function to be called in each thread after the thread starts.
 
void nmsg_io_set_atexit_fp (nmsg_io_t io, nmsg_io_user_fp user_fp, void *user)
 Set a user-specified function to be called in each thread before the thread exits.
 
void nmsg_io_set_count (nmsg_io_t io, unsigned count)
 Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.
 
void nmsg_io_set_debug (nmsg_io_t io, int debug)
 Set the debug level for an nmsg_io_t object.
 
void nmsg_io_set_filter_policy (nmsg_io_t io, const nmsg_filter_message_verdict policy)
 Set the filter policy for the nmsg_io_t object's filter chain.
 
void nmsg_io_set_interval (nmsg_io_t io, unsigned interval)
 Configure the nmsg_io_t object to close inputs periodically, every 'interval' seconds.
 
void nmsg_io_set_interval_randomized (nmsg_io_t io, bool randomized)
 Configure the nmsg_io_t object to randomize the initial second within the interval at which it closes inputs, rather than closing them on the zeroth second of the interval.
 
void nmsg_io_set_output_mode (nmsg_io_t io, nmsg_io_output_mode output_mode)
 Set the output mode behavior for an nmsg_io_t object.
 
nmsg_res nmsg_io_get_stats (nmsg_io_t io, uint64_t *sum_in, uint64_t *sum_out, uint64_t *container_recvs, uint64_t *container_drops)
 Get counters of input and output payloads and containers received and dropped since the start of nmsg_io_loop().
 

Detailed Description

Multi-threaded nmsg I/O processing.

nmsg_io_t objects handle the multiplexing of NMSG data between nmsg_input_t and nmsg_output_t objects. Callers should initialize at least one input and at least one output and add them to an nmsg_io_t object before calling nmsg_io_loop().

Striping and mirroring of input payloads to individual outputs is supported. Striping is the default mode. Mirroring imposes the overhead of a per-output copy for each input payload.

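MP: One processing thread is created for each input. While nmsg_io_loop() is executing, only nmsg_io_breakloop() may be called asynchronously.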

Definition in file io.h.

Typedef Documentation

◆ nmsg_io_close_fp

typedef void(* nmsg_io_close_fp) (struct nmsg_io_close_event *ce)

Function for handling close event notifications.

Parameters
[in,out] ce: Close event.

Definition at line 124 of file io.h.

◆ nmsg_io_user_fp

typedef void(* nmsg_io_user_fp) (unsigned threadno, void *user)

Optional user-specified function to be run at thread start or thread stop.

Definition at line 129 of file io.h.

Enumeration Type Documentation

◆ nmsg_io_close_type

Type of a close event notification.

Enumerator
nmsg_io_close_type_eof 

end of file

nmsg_io_close_type_count 

payload count reached

nmsg_io_close_type_interval 

interval elapsed

Definition at line 47 of file io.h.

◆ nmsg_io_io_type

Type of the stream associated with a close event.

Enumerator
nmsg_io_io_type_input 

close event input

nmsg_io_io_type_output 

close event output

Definition at line 56 of file io.h.

◆ nmsg_io_output_mode

Output behavior when multiple outputs are present.

Enumerator
nmsg_io_output_mode_stripe 

stripe payloads across outputs

nmsg_io_output_mode_mirror 

mirror payloads across outputs

Definition at line 64 of file io.h.

Function Documentation

◆ nmsg_io_init()

nmsg_io_t nmsg_io_init ( void  )

Initialize a new nmsg_io_t object.

Returns
Opaque pointer that is NULL on failure or non-NULL on success.

◆ nmsg_io_add_filter()

nmsg_res nmsg_io_add_filter ( nmsg_io_t  io,
nmsg_filter_message_fp  fp,
void *  data 
)

Add a user-specified filter function to the filter chain.

See the documentation for nmsg_filter_message_fp for further details about the semantics of the filter function itself.

This function appends the specified filter to the end of an nmsg_io_t object's list of filters.

When an nmsg_io_t runs its processing loop, each message read from an input stream is sequentially passed to each filter. If a filter returns the verdict nmsg_filter_message_verdict_DROP, the message will be immediately destroyed with no further processing. The verdict nmsg_filter_message_verdict_ACCEPT causes the message to be accepted into the output stream, bypassing any remaining filters in the filter chain, if any. The verdict nmsg_filter_message_verdict_DECLINED causes the message to be passed to the next filter in the filter chain, if any.

Filters in the filter chain are executed in the order that they were added to the nmsg_io_t object with nmsg_io_add_filter() and nmsg_io_add_filter_module().

If the entire filter chain has been executed and all filters have returned the verdict nmsg_filter_message_verdict_DECLINED, the default action to take is determined by the nmsg_io_t's filter policy, which can be set with nmsg_io_set_filter_policy(). The default filter policy is nmsg_filter_message_verdict_ACCEPT.

All filters must be added to the nmsg_io_t object before calling nmsg_io_loop(). It is an unchecked runtime error for a caller to attempt to modify the filter chain on an nmsg_io_t that is currently executing its processing loop.

Parameters
[in] io: Valid nmsg_io_t object.
[in] fp: User-specified function.
[in] data: User pointer to be passed to user function.
Returns
nmsg_res_success

◆ nmsg_io_add_filter_module()

nmsg_res nmsg_io_add_filter_module ( nmsg_io_t  io,
const char *  name,
const void *  param,
const size_t  len_param 
)

Add a filter module to the filter chain.

This function instantiates an nmsg_fltmod_t object with the specified 'name', 'param', and 'len_param' values and appends the filter to the end of an nmsg_io_t object's list of filters.

Filter modules allow filter functions to be wrapped in external shared objects, but they otherwise participate in the filter chain in the same way that a filter function added with nmsg_io_add_filter() does.

See also
nmsg_io_add_filter()
nmsg_fltmod_init()
Parameters
[in] io: Valid nmsg_io_t object.
[in] name: Passed to nmsg_fltmod_init().
[in] param: Passed to nmsg_fltmod_init().
[in] len_param: Passed to nmsg_fltmod_init().
Returns
nmsg_res_success
nmsg_res_failure if creating the nmsg_fltmod_t object failed.

◆ nmsg_io_add_input()

nmsg_res nmsg_io_add_input ( nmsg_io_t  io,
nmsg_input_t  input,
void *  user 
)

Add an nmsg input to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input to process input payloads.

Parameters
[in] io: Valid nmsg_io_t object.
[in] input: Valid nmsg_input_t object.
[in] user: NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_memfail

◆ nmsg_io_add_input_channel()

nmsg_res nmsg_io_add_input_channel ( nmsg_io_t  io,
const char *  chan,
void *  user 
)

Add an nmsg input channel to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input socket constituting the channel to process input payloads.

"Channels" are specified in the channel alias file, which is usually a file named "nmsg.chalias" in the sysconfdir.

Parameters
[in] io: Valid nmsg_io_t object.
[in] chan: Input channel name.
[in] user: NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail

◆ nmsg_io_add_input_zmq_channel()

nmsg_res nmsg_io_add_input_zmq_channel ( nmsg_io_t  io,
void *  zmq_ctx,
const char *  chan,
void *  user 
)

Add an nmsg ZMQ input channel to an nmsg_io_t object.

Parameters
[in] io: Valid nmsg_io_t object.
[in] zmq_ctx: ZMQ context object.
[in] chan: Input channel name.
[in] user: NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail

◆ nmsg_io_add_input_sockspec()

nmsg_res nmsg_io_add_input_sockspec ( nmsg_io_t  io,
const char *  sockspec,
void *  user 
)

Add an nmsg input sockspec to an nmsg_io_t object.

When nmsg_io_loop() is called, one thread will be created for each input socket constituting the sockspec to process input payloads.

Sockspecs are strings in the form "<ADDRESS>/<PORTRANGE>" where <ADDRESS> is an IPv4 or IPv6 address, and <PORTRANGE> is either a single port or a contiguous, inclusive range of ports of the form "<PORT_START>..<PORT_END>".

Parameters
[in] io: Valid nmsg_io_t object.
[in] sockspec: Input sockspec string.
[in] user: NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_parse_error
nmsg_res_memfail
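
To make the sockspec format concrete, here is a minimal stand-alone parser sketch. It is not nmsg's parser: struct sockspec, parse_port and parse_sockspec are hypothetical names, the address is copied without validation, and the accepted port range of 1 to 65535 is an assumption. The string is split at the last '/' so that IPv6 addresses, which contain ':' but no '/', pass through intact.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical result type for this sketch; not part of nmsg. */
struct sockspec {
	char addr[64];
	unsigned port_start;
	unsigned port_end;
};

/* Parse a decimal port in [1, 65535] from the first len bytes of s. */
static bool
parse_port(const char *s, size_t len, unsigned *port)
{
	unsigned long v = 0;

	if (len == 0 || len > 5)
		return false;
	for (size_t i = 0; i < len; i++) {
		if (s[i] < '0' || s[i] > '9')
			return false;
		v = v * 10 + (unsigned long)(s[i] - '0');
	}
	if (v == 0 || v > 65535)
		return false;
	*port = (unsigned)v;
	return true;
}

/* Split "<ADDRESS>/<PORTRANGE>" into an address and an inclusive port range. */
static bool
parse_sockspec(const char *spec, struct sockspec *out)
{
	const char *slash = strrchr(spec, '/');
	const char *ports, *dots;
	size_t addr_len;

	if (slash == NULL)
		return false;
	addr_len = (size_t)(slash - spec);
	if (addr_len == 0 || addr_len >= sizeof(out->addr))
		return false;
	memcpy(out->addr, spec, addr_len);
	out->addr[addr_len] = '\0';	/* address is not validated here */

	ports = slash + 1;
	dots = strstr(ports, "..");
	if (dots == NULL) {
		/* Single port: the range collapses to one value. */
		if (!parse_port(ports, strlen(ports), &out->port_start))
			return false;
		out->port_end = out->port_start;
		return true;
	}
	if (!parse_port(ports, (size_t)(dots - ports), &out->port_start))
		return false;
	if (!parse_port(dots + 2, strlen(dots + 2), &out->port_end))
		return false;
	return out->port_start <= out->port_end;
}
```

Under these assumptions, "127.0.0.1/8430" yields a single port, while "::1/9000..9003" yields the four ports 9000 through 9003, which per the description above would correspond to one input thread per socket.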

◆ nmsg_io_add_input_fname()

nmsg_res nmsg_io_add_input_fname ( nmsg_io_t  io,
const char *  fname,
void *  user 
)

Add an NMSG file to an nmsg_io_t object.

Parameters
[in] io: Valid nmsg_io_t object.
[in] fname: Name of NMSG file.
[in] user: NULL or an input-specific user pointer.
Returns
nmsg_res_success
nmsg_res_failure
nmsg_res_memfail

◆ nmsg_io_add_output()

nmsg_res nmsg_io_add_output ( nmsg_io_t  io,
nmsg_output_t  output,
void *  user 
)

Add an nmsg output to an nmsg_io_t object.

When nmsg_io_loop() is called, the input threads will cycle over and write payloads to the available outputs.

Parameters
[in] io: Valid nmsg_io_t object.
[in] output: Valid nmsg_output_t object.
[in] user: NULL or an output-specific user pointer.
Returns
nmsg_res_success
nmsg_res_memfail

◆ nmsg_io_loop()

nmsg_res nmsg_io_loop ( nmsg_io_t  io)

Begin processing the data specified by the configured inputs and outputs.

One processing thread is created for each input. nmsg_io_loop() does not return until these threads finish and are destroyed.

Only nmsg_io_breakloop() may be called asynchronously while nmsg_io_loop() is executing.

nmsg_io_loop() invalidates an nmsg_io_t object. nmsg_io_destroy() should then be called.

Parameters
[in] io: Valid nmsg_io_t object.
Returns
nmsg_res_success
nmsg_res_failure

◆ nmsg_io_breakloop()

void nmsg_io_breakloop ( nmsg_io_t  io)

Force a currently executing nmsg_io_loop() to stop looping and return.

Since nmsg_io_loop() is a blocking call, nmsg_io_breakloop() must be called asynchronously.

This function is safe to call inside a signal handler.

Parameters
[in] io: Valid and currently processing nmsg_io_t object.
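
The usual way to stop a blocking loop from a signal handler is to have the handler do nothing but set a flag that the loop polls. The sketch below models that pattern in isolation and makes no claim about how nmsg_io_breakloop() is implemented; stop_requested, on_signal, install_handler and blocking_loop are hypothetical names. In a real program the handler would call nmsg_io_breakloop() on the nmsg_io_t object instead of setting a local flag.

```c
#include <signal.h>

/* Flag written by the handler; sig_atomic_t is safe to write from one. */
static volatile sig_atomic_t stop_requested = 0;

/* Handler body kept minimal: it only records that a stop was requested. */
static void
on_signal(int signo)
{
	(void)signo;
	stop_requested = 1;
}

/* Install the handler for SIGINT; returns 0 on success, -1 on failure. */
static int
install_handler(void)
{
	return signal(SIGINT, on_signal) == SIG_ERR ? -1 : 0;
}

/*
 * Stand-in for a blocking processing loop: it runs until a stop has been
 * requested or max_iter iterations have passed, and returns the number of
 * iterations it completed.
 */
static unsigned
blocking_loop(unsigned max_iter)
{
	unsigned n = 0;

	while (!stop_requested && n < max_iter)
		n++;
	return n;
}
```

Keeping the handler this small is the design point: it avoids calling anything that is not async-signal-safe and leaves all cleanup to the code that runs after the loop returns.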

◆ nmsg_io_destroy()

void nmsg_io_destroy ( nmsg_io_t *  io)

Deallocate the resources associated with an nmsg_io_t object.

Parameters
[in] io: Pointer to an nmsg_io_t object.

◆ nmsg_io_get_num_inputs()

unsigned nmsg_io_get_num_inputs ( nmsg_io_t  io)

Get the number of inputs bound to the nmsg_io_t object.

Returns
Number of inputs.

◆ nmsg_io_get_num_outputs()

unsigned nmsg_io_get_num_outputs ( nmsg_io_t  io)

Get the number of outputs bound to the nmsg_io_t object.

Returns
Number of outputs.

◆ nmsg_io_set_close_fp()

void nmsg_io_set_close_fp ( nmsg_io_t  io,
nmsg_io_close_fp  close_fp 
)

Set the close event notification function associated with an nmsg_io_t object.

The provided function will only be called at EOF on an input or output stream, unless nmsg_io_set_count() or nmsg_io_set_interval() is used to specify conditions under which an input stream should be closed.

Parameters
[in] io: Valid nmsg_io_t object.
[in] close_fp: Close event notification function. It must be reentrant.

◆ nmsg_io_set_atstart_fp()

void nmsg_io_set_atstart_fp ( nmsg_io_t  io,
nmsg_io_user_fp  user_fp,
void *  user 
)

Set a user-specified function to be called in each thread after the thread starts.

Parameters
[in] io: Valid nmsg_io_t object.
[in] user_fp: User-specified function.
[in] user: User pointer to be passed to user function.

◆ nmsg_io_set_atexit_fp()

void nmsg_io_set_atexit_fp ( nmsg_io_t  io,
nmsg_io_user_fp  user_fp,
void *  user 
)

Set a user-specified function to be called in each thread before the thread exits.

Parameters
[in] io: Valid nmsg_io_t object.
[in] user_fp: User-specified function.
[in] user: User pointer to be passed to user function.

◆ nmsg_io_set_count()

void nmsg_io_set_count ( nmsg_io_t  io,
unsigned  count 
)

Configure the nmsg_io_t object to close inputs after processing a certain non-zero number of payloads.

Note that setting a count only guarantees that processing will terminate soon after at least 'count' payloads have been received. A number of payloads slightly greater than 'count' may be processed before the nmsg_io_t instance stops.

If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.

Parameters
[in] io: Valid nmsg_io_t object.
[in] count: Integer > 0.
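
The reopen-or-shut-down rule for the 'user' pointer can be pictured with a small stand-alone model. This is only a sketch under simplifying assumptions, not nmsg's logic: struct stream, close_cb, reopen_stream and feed are hypothetical names, the model is single-threaded, and the count is checked per stream after every payload. A non-NULL 'user' pointer stands for "the caller's close callback will reopen this stream".

```c
#include <stddef.h>

/* Hypothetical stream record for this sketch. */
struct stream {
	void *user;		/* non-NULL: a callback is expected to reopen */
	unsigned written;	/* payloads since the last (re)open */
	unsigned reopens;	/* number of times the callback reopened it */
};

/* Simplified close callback; the real nmsg_io_close_fp takes a close event. */
typedef void (*close_cb)(struct stream *s);

/* Example callback: "reopen" the stream by resetting its payload counter. */
static void
reopen_stream(struct stream *s)
{
	s->written = 0;
	s->reopens++;
}

/*
 * Feed up to `total` payloads, closing the stream every `count` payloads
 * (count must be > 0). Returns the number of payloads processed before
 * shutdown, or `total` if the stream was reopened every time.
 */
static unsigned
feed(struct stream *s, unsigned total, unsigned count, close_cb cb)
{
	unsigned done = 0;

	while (done < total) {
		s->written++;
		done++;
		if (s->written >= count) {
			if (s->user == NULL || cb == NULL)
				return done;	/* nothing can reopen: shut down */
			cb(s);
		}
	}
	return done;
}
```

In this model, a stream with a non-NULL 'user' pointer keeps rotating every 'count' payloads, while a stream with a NULL 'user' pointer ends processing at the first close.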

◆ nmsg_io_set_debug()

void nmsg_io_set_debug ( nmsg_io_t  io,
int  debug 
)

Set the debug level for an nmsg_io_t object.

Debug levels >= 0 will cause debugging information to be logged to stderr.

Parameters
[in] io: Valid nmsg_io_t object.
[in] debug: Debug level.

◆ nmsg_io_set_filter_policy()

void nmsg_io_set_filter_policy ( nmsg_io_t  io,
const nmsg_filter_message_verdict  policy 
)

Set the filter policy for the nmsg_io_t object's filter chain.

If all filters in the filter chain return nmsg_filter_message_verdict_DECLINED for a particular message, the filter policy determines the default policy action to be applied to the message.

If not explicitly set, the default is nmsg_filter_message_verdict_ACCEPT.

Parameters
[in] io: Valid nmsg_io_t object.
[in] policy: The filter policy to apply by default. Must be either nmsg_filter_message_verdict_ACCEPT or nmsg_filter_message_verdict_DROP.

◆ nmsg_io_set_interval()

void nmsg_io_set_interval ( nmsg_io_t  io,
unsigned  interval 
)

Configure the nmsg_io_t object to close inputs periodically, every 'interval' seconds.

The periodic closure is relative to the UNIX epoch, not the start of nmsg_io_loop(). The actual closing may be delayed by up to 0.5 s (NMSG_RBUF_TIMEOUT) after the interval's end.

If the 'user' pointer associated with an output stream is non-NULL, the close event notification function must be set, and this function must reopen the stream. If the 'user' pointer is NULL, nmsg_io processing will be shut down.

Parameters
[in] io: Valid nmsg_io_t object.
[in] interval: Positive number of seconds.
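
Because closures are aligned to the UNIX epoch, the first interval after startup is usually shorter than 'interval' seconds. The arithmetic can be sketched as follows. The function name next_close is hypothetical, and treating a timestamp that falls exactly on a boundary as belonging to the next interval is an assumption of this sketch, not something the documentation states.

```c
#include <stdint.h>

/*
 * Return the first epoch-aligned boundary strictly after `now`.
 * Boundaries are the multiples of `interval` seconds since the UNIX epoch;
 * `interval` must be > 0.
 */
static uint64_t
next_close(uint64_t now, unsigned interval)
{
	return now - (now % interval) + interval;
}
```

For example, with an interval of 60 and a start time of 1000 seconds since the epoch, the boundary computed here is 1020, so the first closure would come about 20 seconds after startup rather than 60.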

◆ nmsg_io_set_interval_randomized()

void nmsg_io_set_interval_randomized ( nmsg_io_t  io,
bool  randomized 
)

Configure the nmsg_io_t object to randomize the initial second within the interval at which it closes inputs, rather than closing them on the zeroth second of the interval.

Parameters
[in] io: Valid nmsg_io_t object.
[in] randomized: Boolean flag.

◆ nmsg_io_set_output_mode()

void nmsg_io_set_output_mode ( nmsg_io_t  io,
nmsg_io_output_mode  output_mode 
)

Set the output mode behavior for an nmsg_io_t object.

Nmsg payloads received from inputs may be striped across available outputs (the default), or mirrored across all available outputs.

Since nmsg_io must synchronize access to individual outputs, the mirrored output mode will limit the amount of parallelism that can be achieved.

Parameters
[in] io: Valid nmsg_io_t object.
[in] output_mode: nmsg_io_output_mode_stripe or nmsg_io_output_mode_mirror.
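
The difference between the two modes can be illustrated with a small stand-alone model. It is a sketch only: enum out_mode, struct dispatcher and dispatch are hypothetical names, outputs are represented by per-output write counters, and round-robin selection is assumed for striping. It shows that striping writes each payload once, while mirroring writes it once per output.

```c
#include <stddef.h>

/* Hypothetical local names; they mirror the two documented output modes. */
enum out_mode { MODE_STRIPE, MODE_MIRROR };

struct dispatcher {
	enum out_mode mode;
	size_t n_outputs;
	size_t next;	/* round-robin cursor used when striping */
};

/*
 * Deliver one payload. counts[i] is incremented for every output that
 * receives it. Returns the number of writes performed for this payload.
 */
static size_t
dispatch(struct dispatcher *d, unsigned *counts)
{
	if (d->n_outputs == 0)
		return 0;
	if (d->mode == MODE_MIRROR) {
		/* Every output gets its own copy of the payload. */
		for (size_t i = 0; i < d->n_outputs; i++)
			counts[i]++;
		return d->n_outputs;
	}
	/* Striping: exactly one output gets the payload. */
	counts[d->next]++;
	d->next = (d->next + 1) % d->n_outputs;
	return 1;
}
```

With three outputs and six payloads, this model performs 6 writes in stripe mode (2 per output) and 18 in mirror mode (6 per output), which is the per-output overhead the description refers to.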

◆ nmsg_io_get_stats()

nmsg_res nmsg_io_get_stats ( nmsg_io_t  io,
uint64_t *  sum_in,
uint64_t *  sum_out,
uint64_t *  container_recvs,
uint64_t *  container_drops 
)

Get counters of input and output payloads and containers received and dropped since the start of nmsg_io_loop().

Parameters
[in] io: Valid nmsg_io_t object.
[out] sum_in: Number of input payloads.
[out] sum_out: Number of output payloads.
[out] container_recvs: Number of containers received.
[out] container_drops: Number of container drops detected.
Returns
nmsg_res_success
nmsg_res_failure