21#include "nmsg_port_net.h" 
   26# ifdef HAVE_SYS_ENDIAN_H 
   27#  include <sys/endian.h> 
   32#include <sys/socket.h> 
   58#include <protobuf-c/protobuf-c.h> 
   72#include "msgmod_plugin.h" 
   76#include "libmy/crc32c.h" 
   77#include "libmy/list.h" 
   78#include "libmy/tree.h" 
   79#include "libmy/ubuf.h" 
   80#include "libmy/b64_decode.h" 
   81#include "libmy/b64_encode.h" 
   82#include "libmy/vector.h" 
   83#include "libmy/fast_inet_ntop.h" 
   90#define NMSG_SEQSRC_GC_INTERVAL 120 
   91#define NMSG_FRAG_GC_INTERVAL   30 
   92#define NMSG_NSEC_PER_SEC       1000000000 
   94#define DEFAULT_STRBUF_ALLOC_SZ         16384 
   96#define NMSG_FLT_MODULE_PREFIX  "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION) 
   97#define NMSG_MSG_MODULE_PREFIX  "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION) 
   99#define NMSG_MODULE_SUFFIX      ".so" 
  101#define _nmsg_dprintf(level, format, ...) \ 
  103        if (_nmsg_global_debug >= (level)) \ 
  104                fprintf(stderr, format, ##__VA_ARGS__); \ 
  107#define _nmsg_dprintfv(var, level, format, ...) \ 
  109        if ((var) >= (level)) \ 
  110                fprintf(stderr, format, ##__VA_ARGS__); \ 
  116        nmsg_stream_type_file,
 
  117        nmsg_stream_type_sock,
 
  118        nmsg_stream_type_zmq,
 
  119        nmsg_stream_type_null,
 
  126struct nmsg_container;
 
  146extern bool                     _nmsg_global_autoclose;
 
  147extern int                      _nmsg_global_debug;
 
  163        uint64_t                        sequence_id;
 
 
  176        uint64_t                        sequence_id;
 
  178        uint64_t                        count_dropped;
 
  181        char                            addr_str[INET6_ADDRSTRLEN];
 
 
  188        struct sockaddr_storage addr_ss;
 
 
  197        ProtobufCBinaryData     *frags;
 
 
  218        struct _nmsg_ipreasm    *reasm;
 
  223        struct bpf_program      userbpf;
 
 
  231        pthread_mutex_t         lock;
 
 
  244        pthread_mutex_t         lock;
 
 
  255        nmsg_stream_type        type;
 
  266        struct timespec         lastgc;
 
  276        struct nmsg_brate       *brate;
 
  278        struct sockaddr_storage addr_ss;
 
  282        nmsg_input_stream_read_fp  stream_read_fp;
 
 
  287        pthread_mutex_t         c_lock;                 
 
  288        pthread_mutex_t         w_lock;                 
 
  289        nmsg_stream_type        type;
 
  296        nmsg_random_t           random;
 
  305        uint64_t                sequence_id;
 
 
  323        nmsg_msgmod_t           msgmod;
 
  332        nmsg_input_read_fp      read_fp;
 
  333        nmsg_input_read_loop_fp read_loop_fp;
 
  337        unsigned                filter_msgtype;
 
 
  350        nmsg_output_write_fp    write_fp;
 
  351        nmsg_output_flush_fp    flush_fp;
 
  355        unsigned                filter_msgtype;
 
 
  362        ProtobufCMessage        *message;
 
  363        Nmsg__NmsgPayload       *np;
 
 
  405typedef enum nmsg_msgmod_clos_mode {
 
  406        nmsg_msgmod_clos_m_keyval,
 
  407        nmsg_msgmod_clos_m_multiline
 
  408} nmsg_msgmod_clos_mode;
 
  413        nmsg_msgmod_clos_mode   mode;
 
 
  441        char                            fixed[DEFAULT_STRBUF_ALLOC_SZ];
 
 
  449void                    _nmsg_alias_fini(
void);
 
  453ssize_t                 _nmsg_buf_avail(
struct nmsg_buf *buf);
 
  454ssize_t                 _nmsg_buf_used(
struct nmsg_buf *buf);
 
  455struct nmsg_buf *       _nmsg_buf_new(
size_t sz);
 
  456void                    _nmsg_buf_destroy(
struct nmsg_buf **buf);
 
  457void                    _nmsg_buf_reset(
struct nmsg_buf *buf);
 
  461struct nmsg_dlmod *     _nmsg_dlmod_init(
const char *path);
 
  462void                    _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
 
  475nmsg_message_t          _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
 
  476nmsg_message_t          _nmsg_message_dup(
struct nmsg_message *msg);
 
  478nmsg_res                _nmsg_message_to_json(nmsg_output_t output, nmsg_message_t msg, 
struct nmsg_strbuf *sb);
 
  489char *                  _nmsg_strbuf_detach(
struct nmsg_strbuf *size);
 
  492void                    _nmsg_payload_free_all(Nmsg__Nmsg *nc);
 
  493void                    _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
 
  494void                    _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
 
  495void                    _nmsg_payload_free(Nmsg__NmsgPayload **np);
 
  496size_t                  _nmsg_payload_size(
const Nmsg__NmsgPayload *np);
 
  499nmsg_res                _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, 
size_t buf_len);
 
  504bool                    _input_nmsg_filter(nmsg_input_t, 
unsigned, Nmsg__NmsgPayload *);
 
  505nmsg_res                _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
 
  507nmsg_res                _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, 
size_t);
 
  508nmsg_res                _input_nmsg_unpack_container2(
const uint8_t *, 
size_t, 
unsigned, Nmsg__Nmsg **);
 
  509nmsg_res                _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
 
  510nmsg_res                _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
 
  512nmsg_res                _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
 
  514nmsg_res                _input_nmsg_deserialize_header(
const uint8_t *, 
size_t, ssize_t *, 
unsigned *);
 
  517nmsg_res                _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
 
  520nmsg_res                _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
 
  524nmsg_res                _input_pcap_read(nmsg_input_t, nmsg_message_t *);
 
  525nmsg_res                _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
 
  528nmsg_res                _input_pres_read(nmsg_input_t, nmsg_message_t *);
 
  531nmsg_res                _input_json_read(nmsg_input_t, nmsg_message_t *);
 
  534struct nmsg_seqsrc *    _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
 
  535void                    _input_seqsrc_destroy(nmsg_input_t);
 
  536size_t                  _input_seqsrc_update(nmsg_input_t, 
struct nmsg_seqsrc *, Nmsg__Nmsg *);
 
  539void                    _output_stop(nmsg_output_t);
 
  542nmsg_res                _output_nmsg_flush(nmsg_output_t);
 
  543nmsg_res                _output_nmsg_write(nmsg_output_t, nmsg_message_t);
 
  546nmsg_res                _output_pres_write(nmsg_output_t, nmsg_message_t);
 
  549nmsg_res                _output_json_write(nmsg_output_t, nmsg_message_t);
 
  552struct nmsg_brate *     _nmsg_brate_init(
size_t target_byte_rate);
 
  553void                    _nmsg_brate_destroy(
struct nmsg_brate **);
 
  554void                    _nmsg_brate_sleep(
struct nmsg_brate *, 
size_t container_sz, 
size_t n_payloads, 
size_t n);
 
  603_nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg, 
unsigned etype, 
size_t len,
 
  604                       const u_char *pkt, 
struct _nmsg_ipreasm *reasm,
 
  605                       unsigned *new_len, u_char *new_pkt, 
int *defrag,
 
Implementing message filter modules.
 
Base nmsg support header.
 
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
 
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
 
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
 
an nmsg_message MUST always have a non-NULL ->np member.
 
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
 
Structure exported by message modules to implement a new message type.