21#include "nmsg_port_net.h"
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
32#include <sys/socket.h>
58#include <protobuf-c/protobuf-c.h>
72#include "msgmod_plugin.h"
76#include "libmy/crc32c.h"
77#include "libmy/list.h"
78#include "libmy/tree.h"
79#include "libmy/ubuf.h"
80#include "libmy/b64_decode.h"
81#include "libmy/b64_encode.h"
82#include "libmy/vector.h"
83#include "libmy/fast_inet_ntop.h"
90#define NMSG_SEQSRC_GC_INTERVAL 120
91#define NMSG_FRAG_GC_INTERVAL 30
92#define NMSG_NSEC_PER_SEC 1000000000
94#define DEFAULT_STRBUF_ALLOC_SZ 16384
96#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
97#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
99#define NMSG_MODULE_SUFFIX ".so"
101#define _nmsg_dprintf(level, format, ...) \
103 if (_nmsg_global_debug >= (level)) \
104 fprintf(stderr, format, ##__VA_ARGS__); \
107#define _nmsg_dprintfv(var, level, format, ...) \
109 if ((var) >= (level)) \
110 fprintf(stderr, format, ##__VA_ARGS__); \
116 nmsg_stream_type_file,
117 nmsg_stream_type_sock,
118 nmsg_stream_type_zmq,
119 nmsg_stream_type_null,
126struct nmsg_container;
146extern bool _nmsg_global_autoclose;
147extern int _nmsg_global_debug;
163 uint64_t sequence_id;
176 uint64_t sequence_id;
178 uint64_t count_dropped;
181 char addr_str[INET6_ADDRSTRLEN];
188 struct sockaddr_storage addr_ss;
197 ProtobufCBinaryData *frags;
218 struct _nmsg_ipreasm *reasm;
223 struct bpf_program userbpf;
231 pthread_mutex_t lock;
244 pthread_mutex_t lock;
255 nmsg_stream_type type;
266 struct timespec lastgc;
276 struct nmsg_brate *brate;
278 struct sockaddr_storage addr_ss;
282 nmsg_input_stream_read_fp stream_read_fp;
287 pthread_mutex_t c_lock;
288 pthread_mutex_t w_lock;
289 nmsg_stream_type type;
296 nmsg_random_t random;
305 uint64_t sequence_id;
323 nmsg_msgmod_t msgmod;
332 nmsg_input_read_fp read_fp;
333 nmsg_input_read_loop_fp read_loop_fp;
337 unsigned filter_msgtype;
350 nmsg_output_write_fp write_fp;
351 nmsg_output_flush_fp flush_fp;
355 unsigned filter_msgtype;
362 ProtobufCMessage *message;
363 Nmsg__NmsgPayload *np;
405typedef enum nmsg_msgmod_clos_mode {
406 nmsg_msgmod_clos_m_keyval,
407 nmsg_msgmod_clos_m_multiline
408} nmsg_msgmod_clos_mode;
413 nmsg_msgmod_clos_mode mode;
441 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
449void _nmsg_alias_fini(
void);
453ssize_t _nmsg_buf_avail(
struct nmsg_buf *buf);
454ssize_t _nmsg_buf_used(
struct nmsg_buf *buf);
455struct nmsg_buf * _nmsg_buf_new(
size_t sz);
456void _nmsg_buf_destroy(
struct nmsg_buf **buf);
457void _nmsg_buf_reset(
struct nmsg_buf *buf);
461struct nmsg_dlmod * _nmsg_dlmod_init(
const char *path);
462void _nmsg_dlmod_destroy(
struct nmsg_dlmod **dlmod);
475nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
476nmsg_message_t _nmsg_message_dup(
struct nmsg_message *msg);
478nmsg_res _nmsg_message_to_json(nmsg_output_t output, nmsg_message_t msg,
struct nmsg_strbuf *sb);
489char * _nmsg_strbuf_detach(
struct nmsg_strbuf *size);
492void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
493void _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
494void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
495void _nmsg_payload_free(Nmsg__NmsgPayload **np);
496size_t _nmsg_payload_size(
const Nmsg__NmsgPayload *np);
499nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf,
size_t buf_len);
504bool _input_nmsg_filter(nmsg_input_t,
unsigned, Nmsg__NmsgPayload *);
505nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
507nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *,
size_t);
508nmsg_res _input_nmsg_unpack_container2(
const uint8_t *,
size_t,
unsigned, Nmsg__Nmsg **);
509nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
510nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
512nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
514nmsg_res _input_nmsg_deserialize_header(
const uint8_t *,
size_t, ssize_t *,
unsigned *);
517nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
520nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
524nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
525nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
528nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
531nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
534struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
535void _input_seqsrc_destroy(nmsg_input_t);
536size_t _input_seqsrc_update(nmsg_input_t,
struct nmsg_seqsrc *, Nmsg__Nmsg *);
539void _output_stop(nmsg_output_t);
542nmsg_res _output_nmsg_flush(nmsg_output_t);
543nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
546nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
549nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
552struct nmsg_brate * _nmsg_brate_init(
size_t target_byte_rate);
553void _nmsg_brate_destroy(
struct nmsg_brate **);
554void _nmsg_brate_sleep(
struct nmsg_brate *,
size_t container_sz,
size_t n_payloads,
size_t n);
603_nmsg_ipdg_parse_reasm(
struct nmsg_ipdg *dg,
unsigned etype,
size_t len,
604 const u_char *pkt,
struct _nmsg_ipreasm *reasm,
605 unsigned *new_len, u_char *new_pkt,
int *defrag,
Implementing message filter modules.
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
an nmsg_message MUST always have a non-NULL ->np member.
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
Structure exported by message modules to implement a new message type.