diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e736a018..45afe4d8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -21,7 +21,7 @@ jobs: mkdir build && cd build cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On .. make package - ctest -V + ctest -VV --stop-on-failure cmake -DBUILD_OOT_TEST=On .. sudo make install - name: Build and test FS @@ -29,7 +29,7 @@ jobs: mkdir build_fs && cd build_fs cmake -DBUILD_TESTS=On -DBUILD_SAMPLES=On -DWITH_FS=On .. make package - ctest -V + ctest -VV --stop-on-failure cmake -DBUILD_OOT_TEST=On .. sudo make install - name: Build and test libkqueue @@ -37,7 +37,7 @@ jobs: mkdir build_kqueue && cd build_kqueue cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DWITH_LIBKQUEUE=true -DWITH_VALGRIND=false .. make package - ctest -V + ctest -VV --stop-on-failure cmake -DBUILD_OOT_TEST=On .. sudo make install - name: Build and test liburing @@ -45,7 +45,7 @@ jobs: mkdir build_uring && cd build_uring cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DWITH_LIBURING=true -DWITH_VALGRIND=false .. make package - ctest -V + ctest -VV --stop-on-failure cmake -DBUILD_OOT_TEST=On .. sudo make install @@ -70,14 +70,14 @@ jobs: mkdir build && cd build cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DBUILD_OOT_TEST=true .. make - ctest -V + ctest -VV --stop-on-failure make install cd .. mkdir build_fs && cd build_fs cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On -DBUILD_OOT_TEST=true -DWITH_FS=true .. make kldload fusefs - ctest -V + ctest -VV --stop-on-failure make install build-osx-amd64: @@ -94,4 +94,4 @@ jobs: mkdir -p build cd build && cmake -DBUILD_SAMPLES=On -DBUILD_TESTS=On .. cmake --build . - ctest -V + ctest -VV --stop-on-failure diff --git a/Lib/core/evts.c b/Lib/core/evts.c index 9722c66f..e741b422 100644 --- a/Lib/core/evts.c +++ b/Lib/core/evts.c @@ -89,19 +89,12 @@ _public_ ssize_t m_mod_unstash(m_mod_t *mod, size_t len) { return processed; } -_public_ int m_mod_set_batch_size(m_mod_t *mod, size_t len) { +_public_ int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns) { M_MOD_ASSERT(mod); M_MOD_CONSUME_TOKEN(mod); mod->batch.len = len; - return 0; -} - -_public_ int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns) { - M_MOD_ASSERT(mod); - - // src_deregister and src_register already consume a token - + /* If it was already set, remove old timer */ if (mod->batch.timer.ns != 0) { m_mod_src_deregister_tmr(mod, &mod->batch.timer); diff --git a/Lib/core/mod.c b/Lib/core/mod.c index b3297401..d029999b 100644 --- a/Lib/core/mod.c +++ b/Lib/core/mod.c @@ -547,7 +547,7 @@ _public_ int m_mod_deregister(m_mod_t **mod) { return mod_deregister(mod, true); } -_public_ int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst) { +_public_ int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst) { M_MOD_ASSERT(mod); M_PARAM_ASSERT(rate <= BILLION); diff --git a/Lib/core/poll/kqueue.c b/Lib/core/poll/kqueue.c index 6c8812a8..35a890c9 100644 --- a/Lib/core/poll/kqueue.c +++ b/Lib/core/poll/kqueue.c @@ -21,7 +21,6 @@ int poll_create(poll_priv_t *priv) { } int poll_set_new_evt(poll_priv_t *priv, ev_src_t *tmp, const enum op_type flag) { - static int timer_ids = 1; GET_PRIV_DATA(); /* Eventually alloc kqueue data if needed */ @@ -53,7 +52,7 @@ int poll_set_new_evt(poll_priv_t *priv, ev_src_t *tmp, const enum op_type flag) #else const int flags = 0; // unsupported... #endif - EV_SET(_ev, timer_ids++, EVFILT_TIMER, f, flags | NOTE_NSECONDS, tmp->tmr_src.its.ns, tmp); + EV_SET(_ev, tmp->tmr_src.its.ns, EVFILT_TIMER, f, flags | NOTE_NSECONDS, tmp->tmr_src.its.ns, tmp); break; } case M_SRC_TYPE_SGN: diff --git a/Lib/core/ps.c b/Lib/core/ps.c index 38347264..cf2f58a2 100644 --- a/Lib/core/ps.c +++ b/Lib/core/ps.c @@ -135,7 +135,7 @@ static int send_msg(m_mod_t *mod, const m_mod_t *recipient, const char *topic, M_PARAM_ASSERT(message); mod->stats.sent_msgs++; - ps_priv_t m = { { false, mod, topic, message }, flags, NULL }; + ps_priv_t m = { { mod, topic, message }, flags, NULL }; return tell_pubsub_msg(&m, recipient, mod->ctx); } @@ -146,7 +146,7 @@ int tell_system_pubsub_msg(const m_mod_t *recipient, m_ctx_t *c, m_mod_t *sender // A module sent a M_PS_MOD_POISONPILL message to another, or it was stopped sender->stats.sent_msgs++; } - ps_priv_t m = { { true, sender, topic, NULL }, 0, NULL }; + ps_priv_t m = { { sender, topic, NULL }, 0, NULL }; return tell_pubsub_msg(&m, recipient, c); } diff --git a/Lib/core/public/module/mod.h b/Lib/core/public/module/mod.h index 45ebcd69..e0ec948e 100644 --- a/Lib/core/public/module/mod.h +++ b/Lib/core/public/module/mod.h @@ -28,9 +28,9 @@ typedef enum { */ #define M_SRC_SHIFT(type, val) val << (8 * (type + 1)) typedef enum { - M_SRC_PRIO_LOW = 1 << 0, // PubSub subscription low priority - M_SRC_PRIO_NORM = 1 << 1, // PubSub subscription mid priority (default) - M_SRC_PRIO_HIGH = 1 << 2, // PubSub subscription high priority + M_SRC_PRIO_LOW = 1 << 0, // Src low priority + M_SRC_PRIO_NORM = 1 << 1, // Src mid priority (default) + M_SRC_PRIO_HIGH = 1 << 2, // Src high priority M_SRC_AUTOFREE = 1 << 3, // Automatically free userdata upon source deregistation. M_SRC_ONESHOT = 1 << 4, // Run just once then automatically deregister source. M_SRC_DUP = 1 << 5, // Duplicate PubSub topic, source fd or source path. @@ -46,7 +46,7 @@ typedef enum { #define M_PS_MOD_STOPPED "LIBMODULE_MOD_STOPPED" /* - * Module's pubsub API flags (m_mod_tell(), m_mod_publish(), m_mod_broadcast()) + * Module's pubsub API flags (m_mod_ps_tell(), m_mod_ps_publish(), m_mod_ps_broadcast()) */ typedef enum { M_PS_AUTOFREE = 1 << 0, // Autofree PubSub data after every recipient receives message (ie: when ps_evt ref counter goes to 0) @@ -87,7 +87,6 @@ typedef struct { /* PubSub messages */ typedef struct { - bool system; // Is this a system message? const m_mod_t *sender; const char *topic; const void *data; // NULL for system messages @@ -232,12 +231,10 @@ m_mod_t *m_mod_lookup(const m_mod_t *mod, const char *name); int m_mod_become(m_mod_t *mod, m_evt_cb new_on_evt); int m_mod_unbecome(m_mod_t *mod); -/* Module PubSub interface */ +/* Module PubSub interface (Subscribe/unsubscribe API is below under the event sources management) */ int m_mod_ps_tell(m_mod_t *mod, const m_mod_t *recipient, const void *message, m_ps_flags flags); int m_mod_ps_publish(m_mod_t *mod, const char *topic, const void *message, m_ps_flags flags); int m_mod_ps_poisonpill(m_mod_t *mod, const m_mod_t *recipient); -int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags flags, const void *userptr); -int m_mod_ps_unsubscribe(m_mod_t *mod, const char *topic); /* Events' stashing API */ int m_mod_stash(m_mod_t *mod, const m_evt_t *evt); @@ -246,6 +243,9 @@ ssize_t m_mod_unstash(m_mod_t *mod, size_t len); /* Event Sources management */ ssize_t m_mod_src_len(const m_mod_t *mod, m_src_types type); +int m_mod_ps_subscribe(m_mod_t *mod, const char *topic, m_src_flags flags, const void *userptr); +int m_mod_ps_unsubscribe(m_mod_t *mod, const char *topic); + int m_mod_src_register_fd(m_mod_t *mod, int fd, m_src_flags flags, const void *userptr); int m_mod_src_deregister_fd(m_mod_t *mod, int fd); @@ -268,11 +268,10 @@ int m_mod_src_register_thresh(m_mod_t *mod, const m_src_thresh_t *thr, m_src_fla int m_mod_src_deregister_thresh(m_mod_t *mod, const m_src_thresh_t *thr); /* Event batch management */ -int m_mod_set_batch_size(m_mod_t *mod, size_t len); -int m_mod_set_batch_timeout(m_mod_t *mod, uint64_t timeout_ns); +int m_mod_batch_set(m_mod_t *mod, size_t len, uint64_t timeout_ns); /* Mod tokenbucket */ -int m_mod_set_tokenbucket(m_mod_t *mod, uint32_t rate, uint64_t burst); +int m_mod_tb_set(m_mod_t *mod, uint32_t rate, uint64_t burst); /* Generic event source registering functions */ #define m_mod_src_register(mod, X, flags, userptr) _Generic((X) + 0, \ diff --git a/Lib/core/src.c b/Lib/core/src.c index d140cb3a..f2544faa 100644 --- a/Lib/core/src.c +++ b/Lib/core/src.c @@ -200,56 +200,56 @@ static ev_src_t *create_src(m_mod_t *mod, m_src_types type, process_cb proc, } static int fdcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - int fd = *((int *)my_data); + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return fd - src->fd_src.fd; + return other->fd_src.fd - node->fd_src.fd; } static int tmrcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_tmr_t *its = (const m_src_tmr_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return its->ns - src->tmr_src.its.ns; + return other->tmr_src.its.ns - node->tmr_src.its.ns; } static int sgncmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_sgn_t *sgs = (const m_src_sgn_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return sgs->signo - src->sgn_src.sgs.signo; + return other->sgn_src.sgs.signo - node->sgn_src.sgs.signo; } static int pathcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_path_t *pt = (const m_src_path_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return strcmp(pt->path, src->path_src.pt.path); + return strcmp(other->path_src.pt.path, node->path_src.pt.path); } static int pidcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_pid_t *pid = (const m_src_pid_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return pid->pid - src->pid_src.pid.pid; + return other->pid_src.pid.pid - node->pid_src.pid.pid; } static int taskcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_task_t *tid = (const m_src_task_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - return tid->tid - src->task_src.tid.tid; + return other->task_src.tid.tid - node->task_src.tid.tid; } static int threshcmp(void *my_data, void *node_data) { - ev_src_t *src = (ev_src_t *)node_data; - const m_src_thresh_t *thr = (const m_src_thresh_t *)my_data; + const ev_src_t *node = (const ev_src_t *)node_data; + const ev_src_t *other = (const ev_src_t *)my_data; - long double my_val = (long double)thr->activity_freq - + (long double)thr->inactive_ms; - long double their_val = (long double)src->thresh_src.thr.activity_freq - + (long double)src->thresh_src.thr.inactive_ms; - return my_val - their_val; + long double other_val = (long double)other->thresh_src.thr.activity_freq + + (long double)other->thresh_src.thr.inactive_ms; + long double node_val = (long double)node->thresh_src.thr.activity_freq + + (long double)node->thresh_src.thr.inactive_ms; + return other_val - node_val; } static ev_src_t *process_ps(ev_src_t *this, m_ctx_t *c, int idx, evt_priv_t *evt) { @@ -394,7 +394,14 @@ int deregister_mod_src(m_mod_t *mod, m_src_types type, void *src_data) { M_MOD_ASSERT(mod); M_MOD_CONSUME_TOKEN(mod); - return m_bst_remove(mod->srcs[type], src_data); + // Create onetime src to check the bst + ev_src_t *src = create_src(mod, type, src_procs_map[type], src_data, 0, NULL); + if (!src) { + return -EINVAL; + } + int ret = m_bst_remove(mod->srcs[type], src); + m_mem_unref(src); + return ret; } int start_task(m_ctx_t *c, ev_src_t *src) { diff --git a/Lib/utils/log.c b/Lib/utils/log.c index de33d5de..fe22a272 100644 --- a/Lib/utils/log.c +++ b/Lib/utils/log.c @@ -27,6 +27,9 @@ X_LOG_LEVELS /** **/ static inline m_logger_level find_level(const char *level_str) { + if (!level_str) { + return -1; + } static const char *lvl_names[] = { #define X_LOG_LEVEL(name) #name, X_LOG_LEVELS @@ -47,27 +50,27 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) { X_LOG_CTXS #undef X_LOG_CTX }; - char *log_env; + + // Load fallback global level + int global_level = find_level(getenv("LIBMODULE_LOG")); + if (global_level == -1) { + // Default value + global_level = ERR; + } + char env_name[64]; - bool log_set[X_LOG_CTX_MAX] = {0}; + // Now load log levels for each context for (int i = 0; i < X_LOG_CTX_MAX; i++) { - /* Default values */ + // Default noop logger libmodule_logger.DEBUG[i] = libmodule_log_noop; libmodule_logger.INFO[i] = libmodule_log_noop; libmodule_logger.WARN[i] = libmodule_log_noop; libmodule_logger.ERR[i] = libmodule_log_noop; - - int log_level = ERR; + snprintf(env_name, sizeof(env_name), "LIBMODULE_LOG_%s", ctx_names[i]); - log_env = getenv(env_name); - if (log_env) { - log_level = find_level(log_env); - if (log_level != -1) { - log_set[i] = true; - } else { - // Default value - log_level = ERR; - } + int log_level = find_level(getenv(env_name)); + if (log_level == -1) { + log_level = global_level; } switch (log_level) { case DEBUG: @@ -81,32 +84,7 @@ static __attribute__((constructor (111))) void libmodule_log_init(void) { break; } } - - int log_level = ERR; - log_env = getenv("LIBMODULE_LOG"); - if (log_env) { - log_level = find_level(log_env); - if (log_level == -1) { - // Default value - log_level = ERR; - } - } - for (int i = 0; i < X_LOG_CTX_MAX; i++) { - if (!log_set[i]) { - switch (log_level) { - case DEBUG: - libmodule_logger.DEBUG[i] = libmodule_log_DEBUG; - case INFO: - libmodule_logger.INFO[i] = libmodule_log_INFO; - case WARN: - libmodule_logger.WARN[i] = libmodule_log_WARN; - default: - libmodule_logger.ERR[i] = libmodule_log_ERR; - break; - } - } - } - + const char *log_file_path = getenv("LIBMODULE_LOG_OUTPUT"); if (log_file_path) { libmodule_logger.log_file = fopen(log_file_path, "w"); diff --git a/Samples/Task/pippo.c b/Samples/Task/pippo.c index 45695429..635c95b2 100644 --- a/Samples/Task/pippo.c +++ b/Samples/Task/pippo.c @@ -30,7 +30,7 @@ static bool m_mod_on_start(m_mod_t *mod) { m_mod_src_register(mod, &((m_src_tmr_t) { CLOCK_MONOTONIC, (uint64_t)1 * 1000 * 1000 * 1000 }), 0, &tmrData); // 1s m_mod_src_register(mod, &((m_src_task_t) { 8, inc }), 0, &thData); - m_mod_set_batch_timeout(mod, 1500); // 1500ms! + m_mod_batch_set(mod, 0, 1500); // 1500ms! return true; } diff --git a/TODO.md b/TODO.md index 88f0be87..5e20ebbf 100644 --- a/TODO.md +++ b/TODO.md @@ -2,10 +2,25 @@ ### TODO +#### Mod + +- [x] Drop `system` field from m_evt_ps_t? +- [x] Unify `m_mod_set_batch_size` and `m_mod_set_batch_timeout` under `m_mod_batch_set(size_t, uint64_t)` +- [x] Rename `m_mod_set_tokenbucket` to `m_mod_tb_set`? + +### Src +- [x] double check m_bst_insert/remove usage in src API + add unit tests! + +#### Ctx +- [ ] use pthread_setname_np() to store each context thread name (max 16chars len; drop ctx->name field) ? + +#### Generic + +- [ ] expose a `libmodule_set_loglevel` API? +- [ ] allow users to override default logging env variables prefix (ie: "LIBMODULE_")? + #### DOC -- [x] Fully rewrite documentation per-namespace -- [x] Document that m_ctx_deregister() cannot be called on a looping context (`M_PARAM_ASSERT(c && *c && (*c)->state == M_CTX_IDLE);`) - [ ] document m_evt_t memref'd behaviour!!! - [ ] Document stats and thresh activity_freq (num_action_per_ms) diff --git a/docs/concepts/ctx.md b/docs/concepts/ctx.md index 60a5c8bb..45513d04 100644 --- a/docs/concepts/ctx.md +++ b/docs/concepts/ctx.md @@ -6,7 +6,7 @@ This can be particularly useful when dealing with 2+ threads; each thread has it Modules can only see and reach (through PubSub messaging) other modules from same context. A context is given a name at registration time. This is only useful for logging purposes. -> NOTE: having multiple contexts with same name is allowed; given that each context is thread-specific, there will be no clash. +> **NOTE:** having multiple contexts with same name is allowed; given that each context is thread-specific, there will be no clash. > Of course, it's better to set different names. ## Loop diff --git a/docs/concepts/mod.md b/docs/concepts/mod.md index ffc159e5..35e0a565 100644 --- a/docs/concepts/mod.md +++ b/docs/concepts/mod.md @@ -31,41 +31,53 @@ First of all, module lifecycle is automatically managed by libmodule; moreover, module registration/deregistration is completely automated and transparent to developer. This means that when using it, you will only have to declare a source file as a module and define needed functions. -First function to be called is `m_on_boot()`; it is called right after libmodule gets linked. -This function is useful to set a custom memhook for libmodule, through `m_set_memhook()` API. - > **_EASY API_**: each module's `m_mod_on_boot()` function is called. At this stage, no module is registered yet. -Finally, libmodule will register every module. +libmodule users will need to register a context on the currently running thread (`m_ctx_register` API). +Then, every module needs to be registered within the context (`m_mod_register` API). Once a module is registered, it will be initially set to `M_MOD_IDLE` state. Idle means that the module is not started yet, thus it cannot receive any event. +> **_EASY API_**: context registration is implicit; every `M_MOD()` is then automatically registered. + As soon as its context starts looping, `m_mod_on_eval()` function will be called on every idle module, trying to start it right away. That function will be called at each state machine change, for each idle module, until it returns true. +> **NOTE:** passing a NULL `m_mod_on_eval` callback is allowed, and just means to start the module at first evaluation. + As soon as `m_mod_on_eval()` returns true, a module is started. It means its `m_mod_on_start()` function is finally called and its state is set to M_MOD_RUNNING. When a module reaches RUNNING state, its context loop will finally receive events for its registered sources. +> **NOTE:** passing a NULL `m_mod_on_start` callback is allowed, and just means to skip the on start callback. + Whenever an event triggers a module's wakeup, its `m_mod_on_evt()` callback can be called (depending on the event source priority) with a `m_queue_t` argument. Finally, when stopping a module, its `m_mod_on_stop()` function is called. Note that a module is automatically stopped during the process of module's deregistration. -Thus, for Easy API, you should implement `m_mod_on_eval()` to return true when the module is ready to be started, -then eventually register event sources in `m_mod_on_start()`, and manage events in `m_mod_on_evt()`. -If you need to cleanup any data, manage it in `m_mod_on_stop()`. +> **NOTE:** passing a NULL `m_mod_on_stop` callback is allowed, and just means to skip the on stop callback. + +> **_EASY API_**: you should implement `m_mod_on_eval()` to return true when the module is ready to be started, +> then eventually register event sources in `m_mod_on_start()`, and manage events in `m_mod_on_evt()`. +> If you need to cleanup any data, manage it in `m_mod_on_stop()`. ### States As previously mentioned, a registered module, before being started, is in` M_MOD_IDLE` state. It means that it has never been started before; it won't receive any event thus its event callback will never be called. When module is started, thus reaching `M_MOD_RUNNING` state, all its registered sources will finally start to receive events. Sources registered while in `M_MOD_RUNNING` state are automatically polled. -If a module is paused, reaching `M_MOD_PAUSED` state, it will stop polling on its event sources, but event sources will still be kept alive. Thus, as soon as module is resumed, all events received during paused state will trigger m_evt_cb. +If a module is paused, reaching `M_MOD_PAUSED` state, it will stop polling on its event sources, but event sources will still be kept alive. Thus, as soon as module is resumed, all events received during paused state will trigger its event callback. If a module gets stopped, reaching `M_MOD_STOPPED` state, it will destroy any registered source and it will be resetted to its initial state. If you instead wish to stop a module letting it manage any already-enqueued event, you need to send a _POISONPILL_ message to it, through `m_mod_poisonpill()` API. The difference between `M_MOD_IDLE` and `M_MOD_STOPPED` states is that idle modules will be evaluated to be started during context loop, while stopped modules won't. When a module is deregistered, it will reach a final `M_MOD_ZOMBIE` state. It means that the module is no more reachable neither usable, but it can still be referenced by any previously sent message (or any `m_mod_ref()`). -After all module's ref count drops to 0 (ie: all sent messages are received by respective recipients and there are no pending `m_mod_unref()`) module will finally get destroyed and its memory freed. -You can call only `m_mod_is()`, `m_mod_name()` and `m_mod_ctx()` on a zombie module. +In zombie state, most of the module API won't be available aside from some minor read-only functions: + +* `m_mod_name` +* `m_mod_userdata` +* `m_mod_state` +* `m_mod_is` + +After all module's ref count drops to 0 (ie: all sent messages are received by respective recipients and there are no pending `m_mod_unref()`), a module will finally get destroyed and its memory freed. To summarize: diff --git a/docs/core/core.md b/docs/core/core.md index efed74c7..ab54d701 100644 --- a/docs/core/core.md +++ b/docs/core/core.md @@ -12,13 +12,14 @@ It is made of multiple headers: For the sake of readiness, function params where an output value will be stored, are marked with `OUT` (empty) macro. -> **All the libmodule core API returns an errno-style negative error code, where left unspecified.** +> **NOTE:** all the libmodule core API returns an errno-style negative error code, where left unspecified.** ## Memory ### Ref counted Ideally, all of the exposed pointers have their lifetime reference based. -This means that you can call `m_mem_ref()` API to manage eg: `m_mod_t`, `m_ctx_t`, `m_evt_t` pointers, and so on. +This means that you can call `m_mem_ref()` API to manage eg: `m_mod_t`, `m_evt_t` pointers, and so on. Normally, there is no such need because the library manages everything. -But if you called `m_mod_ref()`, then you own a reference on that module and it's up to you to free the reference by calling `m_mem_unref()` on it. +But if you called `m_mem_ref()` on an object, then you own a reference on it and it's up to you to free the reference by calling `m_mem_unref()` on it, +otherwise you would be causing leaks. diff --git a/docs/core/ctx.md b/docs/core/ctx.md index 284fbd15..b572dcfc 100644 --- a/docs/core/ctx.md +++ b/docs/core/ctx.md @@ -3,8 +3,8 @@ Ctx API denotes libmodule interface functions to manage contexts. It can be found under ``. -> NOTE: there is no context handler visible to user, because the handler is basically the thread itself. -> Trying to use the context API on a thread that has no context associated, will promptly return -EPIPE errno code. +> **NOTE:** there is no context handler visible to user, because the handler is basically the thread itself. +> Trying to use the context API on a thread that has no context associated, will promptly return -EPIPE errno code. ## Types @@ -52,11 +52,11 @@ int m_ctx_register(const char *ctx_name, m_ctx_flags flags, const void *userdata int m_ctx_deregister(); ``` > Deregister the ctx associated with the current thread. -> NOTE: this API cannot be called if the ctx is still looping. +> **NOTE:** this API cannot be called if the ctx is still looping. > Make sure to `m_ctx_quit` the loop before deregistering a context. -> NOTE: unless a ctx is registered with `M_CTX_PERSIST` flag, it will get +> **NOTE:** unless a ctx is registered with `M_CTX_PERSIST` flag, it will get > automatically destroyed when there are no more modules registered in it. -> NOTE: all of the modules in the context will be deregistered +> **NOTE:** all of the modules in the context will be deregistered > when their context gets deregistered. ```C @@ -72,7 +72,7 @@ int m_ctx_set_logger(m_log_cb logger); int m_ctx_loop(void); ``` > Loop a ctx in a blocking manner, until `m_ctx_quit` is called by any module. -> NOTE: stopping a ctx is a blocking action: +> **NOTE:** stopping a ctx is a blocking action: > all present events will be flushed to their modules, > and, in case any `M_SRC_TYPE_TASK` src is enabled, > its thread will be joined for a clean exit. @@ -104,7 +104,7 @@ int m_ctx_dispatch(void); > then, consecutive calls will dispatch ctx events. > Finally, after `m_ctx_quit` has been called, > it will notify the ctx to stop. -> NOTE: stopping a ctx is a blocking action: +> **NOTE:** stopping a ctx is a blocking action: > all present events will be flushed to their modules, > and, in case any `M_SRC_TYPE_TASK` src is enabled, > it will join its thread. @@ -167,7 +167,7 @@ int m_ctx_fs_set_root(const char *path); int m_ctx_fs_set_ops(const struct fuse_operations *ops); ``` > Set specified FUSE operations to context. Must be set before the ctx loop is started. -> NOTE: module files will always be created readonly. +> **NOTE:** module files will always be created readonly. **Params:** diff --git a/docs/core/mod.md b/docs/core/mod.md index 3c4f7776..7e1110f2 100644 --- a/docs/core/mod.md +++ b/docs/core/mod.md @@ -3,5 +3,380 @@ Mod API denotes libmodule interface functions to manage modules. It can be found in `` header. -> **All the mod API expects a non-NULL mod handler**, except for ... functions. +> **NOTE:** all the mod API expects a non-NULL mod handler, except for `m_mod_register` function. + +## Types + +```C +typedef enum { + M_SRC_TYPE_PS, // PubSub Source + M_SRC_TYPE_FD, // FD Source -> M_SRC_PRIO_HIGH flag is implicit + M_SRC_TYPE_TMR, // Timer Source + M_SRC_TYPE_SGN, // Signal Source + M_SRC_TYPE_PATH, // Path Source + M_SRC_TYPE_PID, // PID Source + M_SRC_TYPE_TASK, // Task source -> M_SRC_ONESHOT flag is implicit + M_SRC_TYPE_THRESH,// Threshold source -> M_SRC_ONESHOT flag is implicit + M_SRC_TYPE_END // End of supported sources +} m_src_types; +``` +> List of available event source types + +```C +typedef enum { + M_SRC_PRIO_LOW = 1 << 0, // Src low priority + M_SRC_PRIO_NORM = 1 << 1, // Src mid priority (default) + M_SRC_PRIO_HIGH = 1 << 2, // Src high priority + M_SRC_AUTOFREE = 1 << 3, // Automatically free userdata upon source deregistation. + M_SRC_ONESHOT = 1 << 4, // Run just once then automatically deregister source. + M_SRC_DUP = 1 << 5, // Duplicate PubSub topic, source fd or source path. + M_SRC_FD_AUTOCLOSE = 1 << 16, // Automatically close fd upon deregistation. + M_SRC_TMR_ABSOLUTE = 1 << 24, // Absolute timer +} m_src_flags; +``` +> Bitmask of flags related to event source registration + +```C +typedef enum { + M_PS_AUTOFREE = 1 << 0, // Autofree PubSub data after every recipient receives message (ie: when ps_evt ref counter goes to 0) +} m_ps_flags; +``` +> Flags available when sending PubSub messages through m_mod_ps API + +```C +typedef struct { + clockid_t clock_id; // Clock_id (eg: CLOCK_MONOTONIC). Unsupported on libkqueue/kqueue + uint64_t ns; // Timer in ns +} m_src_tmr_t; + +typedef struct { + unsigned int signo; // Requested signal number source, as defined in signal.h +} m_src_sgn_t; + +typedef struct { + const char *path; // Path of file/folder to which subscribe for events + unsigned int events; // Desired events +} m_src_path_t; + +typedef struct { + pid_t pid; // Process id to be watched + unsigned int events; // Desired events. Unused on linux: only process exit is supported +} m_src_pid_t; + +typedef struct { + int tid; // Unique task id; it allows to run multiple times the same fn + int (*fn)(void *); // Function to be run on thread +} m_src_task_t; + +typedef struct { + uint64_t inactive_ms; // if != 0 -> if module is inactive for longer than this, an alarm will be received + double activity_freq; // if != 0 -> if module's activity is higher than this, an alarm will be received +} m_src_thresh_t; +``` +> Libmodule input src types for m_mod_src_register() API + +```C +typedef struct { + m_src_types type; // Event type + union { // Event data + m_evt_fd_t *fd_evt; + m_evt_ps_t *ps_evt; + m_evt_tmr_t *tmr_evt; + m_evt_sgn_t *sgn_evt; + m_evt_path_t *path_evt; + m_evt_pid_t *pid_evt; + m_evt_task_t *task_evt; + m_evt_thresh_t *thresh_evt; + }; + const void *userdata; // Event userdata, passed through m_mod_src_register() + uint64_t ts; // Event timestamp +} m_evt_t; +``` +> Event type received inside `m_mod_on_evt()` callback queue + +```C +typedef struct { + const m_mod_t *sender; + const char *topic; + const void *data; // NULL for system messages +} m_evt_ps_t; + +typedef struct { + int fd; +} m_evt_fd_t; + +typedef struct { + uint64_t ns; +} m_evt_tmr_t; + +typedef struct { + unsigned int signo; +} m_evt_sgn_t; + +typedef struct { + const char *path; + unsigned int events; +} m_evt_path_t; + +typedef struct { + pid_t pid; + unsigned int events; +} m_evt_pid_t; + +typedef struct { + unsigned int tid; + int retval; +} m_evt_task_t; + +typedef struct { + unsigned int id; + uint64_t inactive_ms; + double activity_freq; +} m_evt_thresh_t; +``` +> Event types stored inside m_evt_t + +```C +typedef enum { + M_MOD_IDLE = 1 << 0, + M_MOD_RUNNING = 1 << 1, + M_MOD_PAUSED = 1 << 2, + M_MOD_STOPPED = 1 << 3, + M_MOD_ZOMBIE = 1 << 4 +} m_mod_states; +``` +> Bitmask of possible module states + +```C +typedef enum { + M_MOD_NAME_DUP = 1 << 0, // Should module's name be strdupped? (force M_MOD_NAME_AUTOFREE flag) + M_MOD_NAME_AUTOFREE = 1 << 1, // Should module's name be autofreed? + M_MOD_ALLOW_REPLACE = 1 << 8, // Can module be replaced by another module with same name? + M_MOD_PERSIST = 1 << 9, // Module cannot be deregistered by direct call to m_mod_deregister (or by FS delete) while its context is looping + M_MOD_USERDATA_AUTOFREE = 1 << 10, // Automatically free module userdata upon deregister + M_MOD_DENY_CTX = 1 << 16, // Deny access to module's ctx through m_mod_ctx() (it means the module won't be able to call ctx API) + M_MOD_DENY_PUB = 1 << 17, // Deny access to module's publishing functions: m_mod_ps_{tell,publish,broadcast,poisonpill} + M_MOD_DENY_SUB = 1 << 18, // Deny access to m_mod_ps_(un)subscribe() +} m_mod_flags; +``` +> Bitmaks of module flags, passed at registration time + +```C +typedef bool (*m_start_cb)(m_mod_t *self); +typedef bool (*m_eval_cb)(m_mod_t *self); +typedef void (*m_evt_cb)(m_mod_t *self, const m_queue_t *const evts); +typedef void (*m_stop_cb)(m_mod_t *self); +``` +> Module callbacks types + +```C +typedef struct { + m_start_cb on_start; + m_eval_cb on_eval; + m_evt_cb on_evt; + m_stop_cb on_stop; +} m_mod_hook_t; +``` +> Userhook to hold callbacks from user + +```C +typedef struct { + uint64_t inactive_ms; + double activity_freq; + uint64_t sent_msgs; + uint64_t recv_msgs; +} m_mod_stats_t; +``` +> Hold stats about a module + +```C +#define M_PS_CTX_STARTED "LIBMODULE_CTX_STARTED" +#define M_PS_CTX_STOPPED "LIBMODULE_CTX_STOPPED" +#define M_PS_CTX_TICK "LIBMODULE_CTX_TICK" +#define M_PS_MOD_STARTED "LIBMODULE_MOD_STARTED" +#define M_PS_MOD_STOPPED "LIBMODULE_MOD_STOPPED" +``` +> Available system topics. Subscribe to any of them to receive messages produced by the internal system. + +## Functions + +```C +int m_mod_register(const char *name, OUT m_mod_t **mod_ref, const m_mod_hook_t *hook, + m_mod_flags flags, const void *userdata); +``` +> Register a new module in current thread context. + +**Params:** + +* `name`: name of the new module +* `mod_ref`: when not NULL, store a ref to the newly creted module. The ref must be then `m_mem_unref` to free up the module +* `hook`: module userhook; when NULL, treat `name` as a path to `dlopen` a runtime-loaded module +* `flags`: flags of the newly created module +* `userdata`: user pointer stored inside the mod + +```C +int m_mod_deregister(OUT m_mod_t **mod); +``` +> Deregister a module from current thread context. +> **NOTE:** deregistering a ctx will internally deregister any source, +> so that you don't need to care. +> **NOTE:** a module registered with the M_MOD_PERSIST flag cannot be deregistered +> while its context is still looping. + +**Params:** + +* `mod`: pointer to module's handler. Set to NULL after return. + +```C +const char *m_mod_name(const m_mod_t *mod); +``` +> Retrieve a module's name + +**Params:** + +* `mod`: module's handler + +```C +bool m_mod_is(const m_mod_t *mod, m_mod_states st); +``` +> Check if module is in a given state + +**Params:** + +* `mod`: module's handler +* `st`: bitmask of module states + +**Returns:** true if module is in one of the requested state. + +```C +m_mod_states m_mod_state(const m_mod_t *mod); +``` +> Retrieve module state + +**Params:** + +* `mod`: module's handler. + +**Returns:** module state + +```C +int m_mod_start(m_mod_t *mod); +``` +> Start a module + +**Params:** + +* `mod`: module's handler. + +```C +int m_mod_stop(m_mod_t *mod); +``` +> Stop a module + +**Params:** + +* `mod`: module's handler. + +```C +int m_mod_resume(m_mod_t *mod); +``` +> Resume a module, re-attaching all its sources + +**Params:** + +* `mod`: module's handler. + +```C +int m_mod_pause(m_mod_t *mod); +``` +> Pause a module, detaching all its sources + +**Params:** + +* `mod`: module's handler. + +```C +int m_mod_bind(m_mod_t *mod, m_mod_t *ref); +``` +> Bind a module'state to another one's +> **NOTE:** this means that `mod` will follow `ref` state, +> ie: it will be started when `ref` is started, +> stopped when it is stopped, and so on. + +**Params:** + +* `mod`: module's handler. +* `ref`: other module's reference + +```C +int m_mod_log(const m_mod_t *mod, const char *fmt, ...); +``` +> Log a formatted string from a module, using context logger + +**Params:** + +* `mod`: module's handler. +* `fmt`: string printf-like formatted +* `...`: variadic argument + +```C +int m_mod_dump(const m_mod_t *mod); +``` +> Dump a json of current module internal state + +**Params:** + +* `mod`: module's handler. + +```C +int m_mod_stats(const m_mod_t *mod, OUT m_mod_stats_t *stats); +``` +> Retrieve module stats + +**Params:** + +* `mod`: module's handler. +* `stats`: storage for module stats + +```C +const void *m_mod_userdata(const m_mod_t *mod); +``` +> Retrieve module userdata + +**Params:** + +* `mod`: module's handler. + +```C +m_mod_t *m_mod_lookup(const m_mod_t *mod, const char *name); +``` +> Find a module named `name` +> **NOTE:** the API does not take any reference on the other module. +> If you wish to store other module for future usage, +> make sure to `m_mem_ref` it; and them `m_mem_unref` it +> when it is not needed anymore. + +**Params:** + +* `mod`: module's handler. +* `name`: name of the module to be found + +```C +int m_mod_become(m_mod_t *mod, m_evt_cb new_on_evt); +``` +> Push a new on_evt callback onto the callbacks stack + +**Params:** + +* `mod`: module's handler. +* `new_on_evt`: new on_evt callback + +```C +int m_mod_unbecome(m_mod_t *mod); +``` +> Pop an on_evt callback from the callbacks stack + +**Params:** + +* `mod`: module's handler. diff --git a/tests/main.c b/tests/main.c index b8638bc4..c8fb430a 100644 --- a/tests/main.c +++ b/tests/main.c @@ -35,12 +35,19 @@ int main(void) { */ cmocka_unit_test(test_mod_register_same_name), - /* Test modules_ API */ + /* Test ctx API */ cmocka_unit_test(test_ctx_set_logger_NULL_logger), cmocka_unit_test(test_ctx_set_logger), cmocka_unit_test(test_ctx_quit_no_loop), cmocka_unit_test(test_ctx_dump), + /* Test module src api */ + cmocka_unit_test(test_mod_src_tmr), + cmocka_unit_test(test_mod_src_sgn), + cmocka_unit_test(test_mod_src_path), + cmocka_unit_test(test_mod_src_pid), + cmocka_unit_test(test_mod_src_thresh), + /* Test module state setters */ cmocka_unit_test(test_mod_start_NULL_self), cmocka_unit_test(test_mod_start), diff --git a/tests/test_mod.c b/tests/test_mod.c index 38936cbd..91372903 100644 --- a/tests/test_mod.c +++ b/tests/test_mod.c @@ -7,6 +7,12 @@ #include #include #include +#ifdef __linux__ +#include +#else +#include +#endif +#include static bool init_false(m_mod_t *mod); static void mod_recv(m_mod_t *mod, const m_queue_t *const evts); @@ -261,6 +267,120 @@ void test_mod_add_fd(void **state) { assert_true(ret == -EEXIST); } +void test_mod_src_tmr(void **state) { + (void) state; /* unused */ + + const m_src_tmr_t my_tmr = {.ns = 5000 }; + + int ret = m_mod_src_register_tmr(test_mod, &my_tmr, 0, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_tmr(test_mod, &my_tmr, 0, NULL); + assert_true(ret == -EEXIST); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR); + assert_true(len == 1); + + ret = m_mod_src_deregister_tmr(test_mod, &my_tmr); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_TMR); + assert_true(len == 0); +} + +void test_mod_src_sgn(void **state) { + (void) state; /* unused */ + + const m_src_sgn_t my_sgn = {.signo = 10 }; + + int ret = m_mod_src_register_sgn(test_mod, &my_sgn, 0, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_sgn(test_mod, &my_sgn, 0, NULL); + assert_true(ret == -EEXIST); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_SGN); + assert_true(len == 1); + + ret = m_mod_src_deregister_sgn(test_mod, &my_sgn); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_SGN); + assert_true(len == 0); +} + +void test_mod_src_path(void **state) { + (void) state; /* unused */ + + #ifdef __linux__ + const m_src_path_t my_path = {.path = "/tmp", .events = IN_CREATE }; + #else + const m_src_path_t my_path = {.path = "/tmp", .events = NOTE_WRITE }; + #endif + + int ret = m_mod_src_register_path(test_mod, &my_path, 0, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_path(test_mod, &my_path, 0, NULL); + assert_true(ret == -EEXIST); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_PATH); + assert_true(len == 1); + + ret = m_mod_src_deregister_path(test_mod, &my_path); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_PATH); + assert_true(len == 0); +} + +void test_mod_src_pid(void **state) { + (void) state; /* unused */ + + const m_src_pid_t my_pid = {.pid = getpid(), .events = 0 }; + + int ret = m_mod_src_register_pid(test_mod, &my_pid, 0, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_pid(test_mod, &my_pid, 0, NULL); + assert_true(ret == -EEXIST); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_PID); + assert_true(len == 1); + + ret = m_mod_src_deregister_pid(test_mod, &my_pid); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_PID); + assert_true(len == 0); +} + +void test_mod_src_thresh(void **state) { + (void) state; /* unused */ + + const m_src_thresh_t my_thresh = {.activity_freq = 10.0f }; + + int ret = m_mod_src_register_thresh(test_mod, &my_thresh, 0, NULL); + assert_true(ret == 0); + + /* Try to register again, expect -EEXIST error */ + ret = m_mod_src_register_thresh(test_mod, &my_thresh, 0, NULL); + assert_true(ret == -EEXIST); + + size_t len = m_mod_src_len(test_mod, M_SRC_TYPE_THRESH); + assert_true(len == 1); + + ret = m_mod_src_deregister_thresh(test_mod, &my_thresh); + assert_true(ret == 0); + + len = m_mod_src_len(test_mod, M_SRC_TYPE_THRESH); + assert_true(len == 0); +} + void test_mod_rm_wrong_fd(void **state) { (void) state; /* unused */ diff --git a/tests/test_mod.h b/tests/test_mod.h index ea6539e3..12b8450b 100644 --- a/tests/test_mod.h +++ b/tests/test_mod.h @@ -27,6 +27,11 @@ void test_mod_unbecome(void **state); void test_mod_add_wrong_fd(void **state); void test_mod_add_fd_NULL_self(void **state); void test_mod_add_fd(void **state); +void test_mod_src_tmr(void **state); +void test_mod_src_sgn(void **state); +void test_mod_src_path(void **state); +void test_mod_src_pid(void **state); +void test_mod_src_thresh(void **state); void test_mod_rm_wrong_fd(void **state); void test_mod_rm_wrong_fd_2(void **state); void test_mod_rm_fd_NULL_self(void **state);