nmsg 1.1.2
private.h
1/*
2 * Copyright (c) 2023 DomainTools LLC
3 * Copyright (c) 2008-2015, 2019, 2021 by Farsight Security, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18#ifndef NMSG_PRIVATE_H
19#define NMSG_PRIVATE_H
20
21#include "nmsg_port_net.h"
22
23#ifdef HAVE_ENDIAN_H
24# include <endian.h>
25#else
26# ifdef HAVE_SYS_ENDIAN_H
27# include <sys/endian.h>
28# endif
29#endif
30
31#include <sys/types.h>
32#include <sys/socket.h>
33#include <sys/time.h>
34#include <sys/stat.h>
35#include <assert.h>
36#include <ctype.h>
37#include <errno.h>
38#include <fcntl.h>
39#include <inttypes.h>
40#include <limits.h>
41#include <pthread.h>
42#include <poll.h>
43#include <signal.h>
44#include <stdarg.h>
45#include <stdbool.h>
46#include <stddef.h>
47#include <stdio.h>
48#include <stdint.h>
49#include <stdlib.h>
50#include <string.h>
51#include <strings.h>
52#include <time.h>
53#include <unistd.h>
54#include <arpa/inet.h>
55
56#include <zlib.h>
57
58#include <protobuf-c/protobuf-c.h>
59
60#ifdef HAVE_LIBZMQ
61# include <zmq.h>
62#endif /* HAVE_LIBZMQ */
63
64#ifdef HAVE_JSON_C
65#include <json.h>
66#endif /* HAVE_JSON_C */
67
68#include "nmsg.h"
69#include "nmsg.pb-c.h"
70
71#include "fltmod_plugin.h"
72#include "msgmod_plugin.h"
73#include "ipreasm.h"
74#include "nmsg_json.h"
75
76#include "libmy/crc32c.h"
77#include "libmy/list.h"
78#include "libmy/tree.h"
79#include "libmy/ubuf.h"
80#include "libmy/b64_decode.h"
81#include "libmy/b64_encode.h"
82#include "libmy/vector.h"
83#include "libmy/fast_inet_ntop.h"
84
85/* Macros. */
86
87#define STR(x) #x
88#define XSTR(x) STR(x)
89
90#define NMSG_SEQSRC_GC_INTERVAL 120
91#define NMSG_FRAG_GC_INTERVAL 30
92#define NMSG_NSEC_PER_SEC 1000000000
93
94#define DEFAULT_STRBUF_ALLOC_SZ 16384
95
96#define NMSG_FLT_MODULE_PREFIX "nmsg_flt" XSTR(NMSG_FLTMOD_VERSION)
97#define NMSG_MSG_MODULE_PREFIX "nmsg_msg" XSTR(NMSG_MSGMOD_VERSION)
98
99#define NMSG_MODULE_SUFFIX ".so"
100
101#define _nmsg_dprintf(level, format, ...) \
102do { \
103 if (_nmsg_global_debug >= (level)) \
104 fprintf(stderr, format, ##__VA_ARGS__); \
105} while (0)
106
107#define _nmsg_dprintfv(var, level, format, ...) \
108do { \
109 if ((var) >= (level)) \
110 fprintf(stderr, format, ##__VA_ARGS__); \
111} while (0)
112
113/* Enums. */
114
115typedef enum {
116 nmsg_stream_type_file,
117 nmsg_stream_type_sock,
118 nmsg_stream_type_zmq,
119 nmsg_stream_type_null,
120} nmsg_stream_type;
121
122/* Forward. */
123
124struct nmsg_brate;
125struct nmsg_buf;
126struct nmsg_container;
127struct nmsg_dlmod;
128struct nmsg_frag;
129struct nmsg_frag_key;
130struct nmsg_frag_tree;
131struct nmsg_input;
132struct nmsg_json;
133struct nmsg_output;
134struct nmsg_msgmod;
135struct nmsg_msgmod_field;
136struct nmsg_msgmod_clos;
137struct nmsg_pcap;
138struct nmsg_pres;
139struct nmsg_stream_input;
140struct nmsg_stream_output;
141struct nmsg_seqsrc;
142struct nmsg_seqsrc_key;
143
144/* Globals. */
145
146extern bool _nmsg_global_autoclose;
147extern int _nmsg_global_debug;
148extern struct nmsg_msgmodset * _nmsg_global_msgmodset;
149
150/* Function types. */
151
152typedef nmsg_res (*nmsg_input_read_fp)(struct nmsg_input *, nmsg_message_t *);
153typedef nmsg_res (*nmsg_input_read_loop_fp)(struct nmsg_input *, int,
154 nmsg_cb_message, void *);
155typedef nmsg_res (*nmsg_input_stream_read_fp)(struct nmsg_input *, Nmsg__Nmsg **);
156typedef nmsg_res (*nmsg_output_write_fp)(struct nmsg_output *, nmsg_message_t);
157typedef nmsg_res (*nmsg_output_flush_fp)(struct nmsg_output *);
158
159/* Data types. */
160
161/* nmsg_seqsrc */
163 uint64_t sequence_id;
164 sa_family_t af;
165 uint16_t port;
166 union {
167 uint8_t ip4[4];
168 uint8_t ip6[16];
169 };
170};
171
173 ISC_LINK(struct nmsg_seqsrc) link;
174 struct nmsg_seqsrc_key key;
175 uint32_t sequence;
176 uint64_t sequence_id;
177 uint64_t count;
178 uint64_t count_dropped;
179 time_t last;
180 bool init;
181 char addr_str[INET6_ADDRSTRLEN];
182};
183
184/* nmsg_frag: used by nmsg_stream_input */
186 uint32_t id;
187 uint32_t crc;
188 struct sockaddr_storage addr_ss;
189};
190
191struct nmsg_frag {
192 RB_ENTRY(nmsg_frag) link;
193 struct nmsg_frag_key key;
194 unsigned last;
195 unsigned rem;
196 struct timespec ts;
197 ProtobufCBinaryData *frags;
198};
199
200/* nmsg_frag_tree: used by nmsg_stream_input */
202 RB_HEAD(frag_ent, nmsg_frag) head;
203};
204
205/* nmsg_buf: used by nmsg_stream_input, nmsg_stream_output */
206struct nmsg_buf {
207 int fd;
208 size_t bufsz;
209 u_char *data; /* allocated data starts here */
210 u_char *pos; /* position of next buffer read */
211 u_char *end; /* one byte beyond valid data */
212};
213
214/* nmsg_pcap: used by nmsg_input */
215struct nmsg_pcap {
216 int datalink;
217 pcap_t *handle;
218 struct _nmsg_ipreasm *reasm;
219 u_char *new_pkt;
220
221 pcap_t *user;
222 char *userbpft;
223 struct bpf_program userbpf;
224
225 nmsg_pcap_type type;
226 bool raw;
227};
228
229/* nmsg_pres: used by nmsg_input and nmsg_output */
230struct nmsg_pres {
231 pthread_mutex_t lock;
232 FILE *fp;
233 bool flush;
234 char *endline;
235 unsigned source;
236 unsigned operator;
237 unsigned group;
238};
239
240/* nmsg_json: used by nmsg_input and nmsg_output */
241struct nmsg_json {
242#ifdef HAVE_JSON_C
243#endif /* HAVE_JSON_C */
244 pthread_mutex_t lock;
245 FILE *fp;
246 int orig_fd;
247 bool flush;
248 unsigned source;
249 unsigned operator;
250 unsigned group;
251};
252
253/* nmsg_stream_input: used by nmsg_input */
255 nmsg_stream_type type;
256 struct nmsg_buf *buf;
257#ifdef HAVE_LIBZMQ
258 void *zmq;
259#endif /* HAVE_LIBZMQ */
260 Nmsg__Nmsg *nmsg;
261 unsigned np_index;
262 size_t nc_size;
263 struct nmsg_frag_tree nft;
264 struct pollfd pfd;
265 struct timespec now;
266 struct timespec lastgc;
267 unsigned nfrags;
268 unsigned flags;
269 nmsg_zbuf_t zb;
270 u_char *zb_tmp;
271 unsigned source;
272 unsigned operator;
273 unsigned group;
274 bool blocking_io;
275 bool verify_seqsrc;
276 struct nmsg_brate *brate;
277 ISC_LIST(struct nmsg_seqsrc) seqsrcs;
278 struct sockaddr_storage addr_ss;
279 uint64_t count_recv;
280 uint64_t count_drop;
281
282 nmsg_input_stream_read_fp stream_read_fp;
283};
284
285/* nmsg_stream_output: used by nmsg_output */
287 pthread_mutex_t c_lock; /* Container lock. */
288 pthread_mutex_t w_lock; /* Write/Send lock. */
289 nmsg_stream_type type;
290 int fd;
291#ifdef HAVE_LIBZMQ
292 void *zmq;
293#endif /* HAVE_LIBZMQ */
294 nmsg_container_t c;
295 size_t bufsz;
296 nmsg_random_t random;
297 nmsg_rate_t rate;
298 bool buffered;
299 unsigned source;
300 unsigned operator;
301 unsigned group;
302 bool do_zlib;
303 bool do_sequence;
304 uint32_t sequence;
305 uint64_t sequence_id;
306};
307
308/* nmsg_callback_output: used by nmsg_output */
311 void *user;
312};
313
314/* nmsg_callback_input: used by nmsg_input */
317 void *user;
318};
319
320/* nmsg_input */
322 nmsg_input_type type;
323 nmsg_msgmod_t msgmod;
324 void *clos;
325 union {
326 struct nmsg_stream_input *stream;
327 struct nmsg_pcap *pcap;
328 struct nmsg_pres *pres;
329 struct nmsg_json *json;
330 struct nmsg_callback_input *callback;
331 };
332 nmsg_input_read_fp read_fp;
333 nmsg_input_read_loop_fp read_loop_fp;
334
335 bool do_filter;
336 unsigned filter_vid;
337 unsigned filter_msgtype;
338 volatile bool stop;
339};
340
341/* nmsg_output */
343 nmsg_output_type type;
344 union {
345 struct nmsg_stream_output *stream;
346 struct nmsg_pres *pres;
347 struct nmsg_json *json;
348 struct nmsg_callback_output *callback;
349 };
350 nmsg_output_write_fp write_fp;
351 nmsg_output_flush_fp flush_fp;
352
353 bool do_filter;
354 unsigned filter_vid;
355 unsigned filter_msgtype;
356 volatile bool stop;
357};
358
359/* nmsg_message */
361 nmsg_msgmod_t mod;
362 ProtobufCMessage *message;
363 Nmsg__NmsgPayload *np;
364 void *msg_clos;
365 size_t n_allocs;
366 void **allocs;
367 bool updated;
368};
369
397/* dlmod / msgmod / msgmodset */
398
400 ISC_LINK(struct nmsg_dlmod) link;
401 char *path;
402 void *handle;
403};
404
405typedef enum nmsg_msgmod_clos_mode {
406 nmsg_msgmod_clos_m_keyval,
407 nmsg_msgmod_clos_m_multiline
408} nmsg_msgmod_clos_mode;
409
411 char *nmsg_pbuf;
412 size_t estsz;
413 nmsg_msgmod_clos_mode mode;
414 struct nmsg_msgmod_field *field;
415 struct nmsg_strbuf *strbufs;
416 void *mod_clos;
417};
418
420 struct nmsg_msgmod **msgtypes;
421 char *vname;
422 size_t nm;
423};
424
426 struct nmsg_msgmod_plugin *plugin;
427 struct nmsg_msgmod_field *fields;
428 struct nmsg_msgmod_field **fields_idx;
429 size_t n_fields;
430};
431
433 ISC_LIST(struct nmsg_dlmod) dlmods;
434 struct nmsg_msgvendor **vendors;
435 size_t nv;
436};
437
438/* internal nmsg_strbuf wrapper to use expensive stack allocation by default */
440 struct nmsg_strbuf sb;
441 char fixed[DEFAULT_STRBUF_ALLOC_SZ];
442};
443
444/* Prototypes. */
445
446/* from alias.c */
447
448nmsg_res _nmsg_alias_init(void);
449void _nmsg_alias_fini(void);
450
451/* from buf.c */
452
453ssize_t _nmsg_buf_avail(struct nmsg_buf *buf);
454ssize_t _nmsg_buf_used(struct nmsg_buf *buf);
455struct nmsg_buf * _nmsg_buf_new(size_t sz);
456void _nmsg_buf_destroy(struct nmsg_buf **buf);
457void _nmsg_buf_reset(struct nmsg_buf *buf);
458
459/* from dlmod.c */
460
461struct nmsg_dlmod * _nmsg_dlmod_init(const char *path);
462void _nmsg_dlmod_destroy(struct nmsg_dlmod **dlmod);
463
464/* from msgmod.c */
465
466struct nmsg_msgmod * _nmsg_msgmod_start(struct nmsg_msgmod_plugin *plugin);
467void _nmsg_msgmod_stop(struct nmsg_msgmod **mod);
468
469/* from message.c */
470
471nmsg_res _nmsg_message_init_message(struct nmsg_message *msg);
472nmsg_res _nmsg_message_init_payload(struct nmsg_message *msg);
473nmsg_res _nmsg_message_deserialize(struct nmsg_message *msg);
474nmsg_res _nmsg_message_serialize(struct nmsg_message *msg);
475nmsg_message_t _nmsg_message_from_payload(Nmsg__NmsgPayload *np);
476nmsg_message_t _nmsg_message_dup(struct nmsg_message *msg);
477nmsg_res _nmsg_message_dup_protobuf(const struct nmsg_message *msg, ProtobufCMessage **dst);
478nmsg_res _nmsg_message_to_json(nmsg_output_t output, nmsg_message_t msg, struct nmsg_strbuf *sb);
479
480/* from msgmodset.c */
481
482struct nmsg_msgmodset * _nmsg_msgmodset_init(const char *path);
483void _nmsg_msgmodset_destroy(struct nmsg_msgmodset **);
484
485/* from strbuf.c */
486struct nmsg_strbuf * _nmsg_strbuf_init(struct nmsg_strbuf_storage *sbs);
487void _nmsg_strbuf_destroy(struct nmsg_strbuf_storage *sbs);
488nmsg_res _nmsg_strbuf_expand(struct nmsg_strbuf *sb, size_t size);
489char * _nmsg_strbuf_detach(struct nmsg_strbuf *size);
490
491/* from payload.c */
492void _nmsg_payload_free_all(Nmsg__Nmsg *nc);
493void _nmsg_payload_free_crcs(Nmsg__Nmsg *nc);
494void _nmsg_payload_calc_crcs(Nmsg__Nmsg *nc);
495void _nmsg_payload_free(Nmsg__NmsgPayload **np);
496size_t _nmsg_payload_size(const Nmsg__NmsgPayload *np);
497
498/* from input_frag.c */
499nmsg_res _input_frag_read(nmsg_input_t, Nmsg__Nmsg **, uint8_t *buf, size_t buf_len);
500void _input_frag_destroy(struct nmsg_stream_input *);
501void _input_frag_gc(struct nmsg_stream_input *);
502
503/* from input_nmsg.c */
504bool _input_nmsg_filter(nmsg_input_t, unsigned, Nmsg__NmsgPayload *);
505nmsg_res _input_nmsg_read(nmsg_input_t, nmsg_message_t *);
506nmsg_res _input_nmsg_loop(nmsg_input_t, int, nmsg_cb_message, void *);
507nmsg_res _input_nmsg_unpack_container(nmsg_input_t, Nmsg__Nmsg **, uint8_t *, size_t);
508nmsg_res _input_nmsg_unpack_container2(const uint8_t *, size_t, unsigned, Nmsg__Nmsg **);
509nmsg_res _input_nmsg_read_container_file(nmsg_input_t, Nmsg__Nmsg **);
510nmsg_res _input_nmsg_read_container_sock(nmsg_input_t, Nmsg__Nmsg **);
511#ifdef HAVE_LIBZMQ
512nmsg_res _input_nmsg_read_container_zmq(nmsg_input_t, Nmsg__Nmsg **);
513#endif /* HAVE_LIBZMQ */
514nmsg_res _input_nmsg_deserialize_header(const uint8_t *, size_t, ssize_t *, unsigned *);
515
516/* from input_callback.c */
517nmsg_res _input_nmsg_read_callback(nmsg_input_t, nmsg_message_t *);
518
519/* from input_nullnmsg.c */
520nmsg_res _input_nmsg_read_null(nmsg_input_t, nmsg_message_t *);
521nmsg_res _input_nmsg_loop_null(nmsg_input_t, int, nmsg_cb_message, void *);
522
523/* from input_pcap.c */
524nmsg_res _input_pcap_read(nmsg_input_t, nmsg_message_t *);
525nmsg_res _input_pcap_read_raw(nmsg_input_t, nmsg_message_t *);
526
527/* from input_pres.c */
528nmsg_res _input_pres_read(nmsg_input_t, nmsg_message_t *);
529
530/* from input_json.c */
531nmsg_res _input_json_read(nmsg_input_t, nmsg_message_t *);
532
533/* from input_seqsrc.c */
534struct nmsg_seqsrc * _input_seqsrc_get(nmsg_input_t, Nmsg__Nmsg *);
535void _input_seqsrc_destroy(nmsg_input_t);
536size_t _input_seqsrc_update(nmsg_input_t, struct nmsg_seqsrc *, Nmsg__Nmsg *);
537
538/* from output.c */
539void _output_stop(nmsg_output_t);
540
541/* from output_nmsg.c */
542nmsg_res _output_nmsg_flush(nmsg_output_t);
543nmsg_res _output_nmsg_write(nmsg_output_t, nmsg_message_t);
544
545/* from output_pres.c */
546nmsg_res _output_pres_write(nmsg_output_t, nmsg_message_t);
547
548/* from output_json.c */
549nmsg_res _output_json_write(nmsg_output_t, nmsg_message_t);
550
551/* from brate.c */
552struct nmsg_brate * _nmsg_brate_init(size_t target_byte_rate);
553void _nmsg_brate_destroy(struct nmsg_brate **);
554void _nmsg_brate_sleep(struct nmsg_brate *, size_t container_sz, size_t n_payloads, size_t n);
555
556/* from ipdg.c */
557
603_nmsg_ipdg_parse_reasm(struct nmsg_ipdg *dg, unsigned etype, size_t len,
604 const u_char *pkt, struct _nmsg_ipreasm *reasm,
605 unsigned *new_len, u_char *new_pkt, int *defrag,
606 uint64_t timestamp);
607
608#endif /* NMSG_PRIVATE_H */
Implementing message filter modules.
nmsg_input_type
An enum identifying the underlying implementation of an nmsg_input_t object.
Definition input.h:54
Base nmsg support header.
nmsg_res(* nmsg_cb_message_read)(nmsg_message_t *msg, void *user)
Callback function for generating nmsg messages.
Definition nmsg.h:94
void(* nmsg_cb_message)(nmsg_message_t msg, void *user)
Callback function for processing nmsg messages.
Definition nmsg.h:78
nmsg_output_type
An enum identifying the underlying implementation of an nmsg_output_t object.
Definition output.h:38
nmsg_res
nmsg result code
Definition res.h:25
an nmsg_message MUST always have a non-NULL ->np member.
Definition private.h:399
Parsed IP datagram.
Definition ipdg.h:31
Structure mapping protocol buffer schema fields to nmsg_msgmod_field_type values for "transparent" mo...
Structure exported by message modules to implement a new message type.
String buffer.
Definition strbuf.h:28