nagios4/lib/wproc.c

257 lines
5.9 KiB
C

/*
* Simple test-program to try multiplexing running other programs
* through the worker process layer.
*/
#include <time.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/wait.h>
#include <fcntl.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include "worker.h"
typedef struct simple_worker {
int pid, sd;
unsigned int job_index;
iocache *ioc;
} simple_worker;
/* we can't handle packets larger than 64MiB */
#define MAX_IOCACHE_SIZE (64 * 1024 * 1024)
static int sigreceived;
static iobroker_set *iobs;
static simple_worker *spawn_worker(void (*init_func)(void *), void *init_arg)
{
int sv[2];
int pid;
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
return NULL;
pid = fork();
if (pid < 0) {
close(sv[0]);
close(sv[1]);
return NULL;
}
/* parent leaves the child */
if (pid) {
simple_worker *worker = calloc(1, sizeof(simple_worker));
close(sv[1]);
if (!worker) {
kill(pid, SIGKILL);
close(sv[0]);
return NULL;
}
worker->sd = sv[0];
worker->pid = pid;
worker->ioc = iocache_create(1 * 1024 * 1024);
return worker;
}
/* child closes parent's end of socket and gets busy */
close(sv[0]);
if (init_func) {
init_func(init_arg);
}
enter_worker(sv[1], start_cmd);
/* not reached, ever */
exit(EXIT_FAILURE);
}
static void die(const char *msg)
{
perror(msg);
exit(EXIT_FAILURE);
}
static void sighandler(int sig)
{
sigreceived = sig;
printf("%d: caught sig %d\n", getpid(), sig);
}
static void child_exited(int sig)
{
int status;
pid_t result;
result = waitpid(-1, &status, 0);
printf("waitpid() status: %d; result %d: %s\n",
status, (int) result, strerror(errno));
if (WIFEXITED(status)) {
printf("Child with pid %d exited normally\n", (int) result);
}
if (WIFSIGNALED(status)) {
printf("Child caught signal %d\n", WTERMSIG(status));
printf("Child did%s produce a core dump\n", WCOREDUMP(status) ? "" : " not");
}
exit(1);
}
static int print_input(int sd, int events, void *wp_)
{
int ret, pkt = 0;
simple_worker *wp = (simple_worker *)wp_;
struct kvvec kvv = KVVEC_INITIALIZER;
char *buf;
unsigned long tot_bytes = 0, size;
/*
* if some command filled the buffer, we grow it and read some
* more until we hit the limit
* @todo Define a limit :p
*/
size = iocache_size(wp->ioc);
if (!iocache_capacity(wp->ioc)) {
if (iocache_size(wp->ioc) < MAX_IOCACHE_SIZE) {
/* double the size */
iocache_grow(wp->ioc, iocache_size(wp->ioc));
printf("Growing iocache for worker %d. sizes old/new %lu/%lu\n",
wp->pid, size, iocache_size(wp->ioc));
} else {
printf("iocache_size() for worker %d is already at max\n", wp->pid);
}
}
ret = iocache_read(wp->ioc, sd);
if (!ret) {
printf("Worker with pid %d seems to have crashed. Exiting\n", wp->pid);
exit(1);
}
if (ret < 0) {
printf("iocache_read() from worker %d returned %d: %m\n", wp->pid, ret);
return 0;
}
printf("read %d bytes from worker with pid %d::\n", ret, wp->pid);
while ((buf = worker_ioc2msg(wp->ioc, &size, 0))) {
int i, ret;
tot_bytes += size;
ret = worker_buf2kvvec_prealloc(&kvv, buf, (unsigned int)size, KVVEC_ASSIGN);
if (!ret < 0) {
printf("main: Failed to parse buffer of size %lu to key/value vector\n", size);
continue;
}
for (i = 0; i < kvv.kv_pairs; i++) {
struct key_value *kv = &kvv.kv[i];
if (!i && memcmp(kv->key, buf, kv->key_len)) {
printf("### kv[0]->key doesn't match buf. error in kvvec?\n");
}
printf("main: %2d.%02d: %s=%s\n", pkt, i, kv->key, kv->value);
}
pkt++;
}
printf("iocache: available: %lu; size: %lu; capacity: %lu\n",
iocache_available(wp->ioc), iocache_size(wp->ioc), iocache_capacity(wp->ioc));
printf("Got %d packets in %ld bytes (ret: %d)\n", pkt, tot_bytes, ret);
return 0;
}
#define NWPS 3
static simple_worker *wps[NWPS];
static int wp_index;
static int send_command(int sd, int events, void *discard)
{
char buf[8192];
int ret;
simple_worker *wp;
struct kvvec *kvv;
ret = read(sd, buf, sizeof(buf));
if (ret == 0) {
iobroker_close(iobs, sd);
return 0;
}
if (ret < 0) {
printf("main: Failed to read() from fd %d: %s",
sd, strerror(errno));
}
/* this happens when we're reading from stdin */
buf[--ret] = 0;
kvv = kvvec_create(5);
wp = wps[wp_index++ % NWPS];
kvvec_addkv(kvv, "job_id", (char *)mkstr("%d", wp->job_index++));
kvvec_addkv_wlen(kvv, "command", sizeof("command") - 1, buf, ret);
kvvec_addkv(kvv, "timeout", (char *)mkstr("%d", 10));
printf("Sending kvvec with %d pairs to worker %d\n", kvv->kv_pairs, wp->pid);
worker_send_kvvec(wp->sd, kvv);
kvvec_destroy(kvv, 0);
return 0;
}
void print_some_crap(void *arg)
{
char *str = (char *)arg;
printf("%d: Argument passed: %s\n", getpid(), str);
}
int main(int argc, char **argv)
{
simple_worker *wp;
int i;
#ifdef HAVE_SIGACTION
struct sigaction sig_action;
sig_action.sa_sigaction = NULL;
sigfillset(&sig_action.sa_mask);
sig_action.sa_flags=SA_NOCLDSTOP;
sig_action.sa_handler = child_exited;
sigaction(SIGCHLD, &sig_action, NULL);
sig_action.sa_flags = SA_NODEFER|SA_RESTART;
sig_action.sa_handler = sighandler;
sigfillset(&sig_action.sa_mask);
sigaction(SIGINT, &sig_action, NULL);
sigaction(SIGPIPE, &sig_action, NULL);
#else /* HAVE_SIGACTION */
signal(SIGINT, sighandler);
signal(SIGPIPE, sighandler);
signal(SIGCHLD, child_exited);
#endif /* HAVE_SIGACTION */
iobs = iobroker_create();
if (!iobs)
die("Failed to create io broker set");
for (i = 0; i < NWPS; i++) {
wp = spawn_worker(print_some_crap, "lalala");
if (!wp) {
die("Failed to spawn worker(s)\n");
}
wps[i] = wp;
printf("Registering worker sd %d with io broker\n", wp->sd);
iobroker_register(iobs, wp->sd, wp, print_input);
}
iobroker_register(iobs, fileno(stdin), NULL, send_command);
/* get to work */
while (!sigreceived && iobroker_get_num_fds(iobs)) {
iobroker_poll(iobs, -1);
}
for (i = 0; i < NWPS; i++) {
kill(wps[i]->pid, SIGKILL);
}
return 0;
}