Example: complex event loops

This Unix program by Dennis Jenkins illustrates how your program's event loop can wait on a mix of notifications from postgres (through libpqxx), signals, timeouts, and data from other file descriptors. (It has been modified here and there since it was originally written, so don't blame Dennis if anything goes wrong!)

You'll need to edit the program to customize the connection string. Then, to compile it with gcc:

g++ --std=c++0x -Wall -fPIC -pipe -o pqxx-test-notifier pqxx-test-notifier.cxx -lpqxx -lpq

Run the program without arguments. Or, to get a closer look at what's going on under the hood, run it through strace:

strace ./pqxx-test-notifier

While the program is running, send notifications via postgres. You can do this from pgAdmin, or from the command line with, for example:

psql -Upostgres -dpostgres -c"NOTIFY job_queue, 'hello world';"
/* This program will connect to a postgres database, register for notifications
 * on a given channel, and then sleep.  When it gets a notification the process
 * will "fork."  The parent will resume sleeping.  The child process will do
 * whatever (nothing in this demo) and then exit.
 * The parent process will also wake up when a child dies; when input is
 * available on STDIN; or when a timer expires.
 * Written 2011-09-03, by Dennis Jenkins.
 * Dennis can be reached as dennis [dot] jenkins [dot] 75 at gmail dot com.
 * Developed and tested on 64-bit Gentoo linux, postgresql 9.0.4, gcc 4.4.5, libpqxx svn ver 1745)
 * Example usage of libpqxx notifiers for Postgresql 9.0 at:
 *   svn://

#include <fcntl.h>
#include <sys/wait.h>
#include <unistd.h>

#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <string>

#include <pqxx/pqxx>

// Change these to suit your needs.
static const char conn_str[] =
  "user='postgres' password='' host='' port='5432' dbname='postgres'";
static const char channel_name[] = "job_queue";
static const char app_name[] = "\"pqxx-listener-test\"";

// Error from C standard library.  Checks errno for details.
class CStandardError: public std::runtime_error
  explicit CStandardError(const char where[]) :
    std::runtime_error(std::string(where) + ": " + std::strerror(errno))

// Close stdin, make it read from /dev/null.
void redirect_stdin()
  const int fd_null = open("/dev/null", O_RDONLY);
  if (fd_null == -1) throw CStandardError("Opening /dev/null");
  if (dup2(fd_null, 0) == -1) throw CStandardError("Redirecting stdin");

/* Sample child process.
void execute_child_process(int backend_pid, const std::string &payload)
  // Child can do whatever it wants to with the payload.
  // Maybe exec() a process or just run a simple function here.
  std::cout << "CHILD[" << backend_pid << "]: " << payload << std::endl;

  // Simulate a short-lived child process.

class MyNoticer: public pqxx::notification_receiver
  // Used by forked child to close libpq/pqxx connection to postgres backend.
  int m_nSocket;

  MyNoticer(pqxx::connection_base &c, const std::string &channel) :
    pqxx::notification_receiver(c, channel),

  virtual void operator()(const std::string &payload, int backend_pid)
    throw ()
    std::cout << "NOTIFY[" << backend_pid << "]: " << payload << std::endl;

    // Launch child process:
    pid_t pid = fork();

    switch (pid)
    case -1:
      // Failed.
      throw std::runtime_error("fork() failed.");

    case 0:
      // In child process.
      // Close postgresql socket.  Only parent process should use it.

      // Do whatever the child process wanted to do upon notification.
      execute_child_process(backend_pid, payload);

      // Finish.

      // In parent process.  "pid" is pid of child.
      std::cout << "Parent launched child " << pid << std::endl;

// Obligatory signal handler used by 'sigaction'.
// Don't reap children inside this handler; it can cause a race condition.
void sigchld_handler(int sig)
  std::cout << "Received SIGCHLD (" << sig << ")" << std::endl;

void mourn_child(pqxx::connection_base &db, pid_t pid, int exit_status)
    << "Child " << pid << " returned exit code " << exit_status << std::endl;

  /* If we wanted to, we could execute SQL using "db".  Maybe update a row in a
   * table, invoke a stored procedure, or send a NOTIFY to a different channel.

// Reap any dead children.
void reap_children(pqxx::connection_base &db)
  pid_t pid = 0;
  int status = 0;

  /* waitpid() will return pid of dead child, 0 when done or -1 when no
   * children are dead.
  while (0 < (pid = waitpid(-1, &status, WNOHANG)))
    mourn_child(db, pid, status);

// Read all available data from stdin and write it to stdout.
void handle_stdin_data()
  char buffer[4096];
  int n;

  std::cout << "stdin activity detected:" << std::endl;

  while (0 < (n = read(STDIN_FILENO, buffer, sizeof(buffer))))
    std::cout.write(buffer, n);

void main_loop()
  /* Step #1.  Mask SIGCHLD, but save the original process mask so that we can
   * pass it to 'pselect.'
  sigset_t mask;
  sigset_t origmask;
  struct sigaction sa;

  sigaddset(&mask, SIGCHLD);
  if (sigprocmask(SIG_BLOCK, &mask, &origmask) == -1)
    throw CStandardError("sigprocmask");

  sa.sa_flags = 0;
  sa.sa_handler = sigchld_handler;
  if (sigaction(SIGCHLD, &sa, NULL) == -1)
    throw CStandardError("sigaction");

  /* Step #2.  Connect to database.
  pqxx::connection db(conn_str);

  // Only useful on postgresql 9.0 or greater.
  db.set_variable("application_name", app_name);

  /* Step #3.  Use libpqxx to create object that handles 'NOTIFY' events.
   * It runs 'LISTEN' internally; don't exec 'LISTEN' yourself.
  MyNoticer receiver(db, channel_name);

  while (true)
    /* Step #4.  Create set of file descriptors to pass to 'pselect()'.
     * You can put anything in here that you want (files, sockets, serial ports,
     * descriptors from other sources).
     * IMPORTANT: If you want to catch 'NOTIFY' events, you MUST add the
     * 'db.sock()' descriptor!  This is the socket for the database connection,
     * where notifications from the database backend come in.
    fd_set read_fds;
    FD_SET(STDIN_FILENO, &read_fds);  // optional (just part of the demo)
    FD_SET(db.sock(), &read_fds);    // postgres connection (mandatory)
    const int max_fd = db.sock() + 1;

    /* Optional timeout (in case your process wanted to take action if no events
     * occur for a period of time).
    struct timespec ts;
    ts.tv_sec = 5;
    ts.tv_nsec = 0;

    /* Step #5.  Magic happens here!  Put process to sleep until something
     * "interesting" happens.
     * Unix was built to do "synchronous IO multiplexing".
     * pselect() can return the following information values:
     * -1 = pselect() failed or was interrupted (errno == EINTR).
     *  0 = timeout expired (timeout is optional).
     * >1 = count of file descriptors in any of the sets that have activity.
     * If the return value is positive, pselect() will have modified the file
     * descriptor sets: descriptors with activity will remain set, but all
     * others will be cleared.
     * pselect() can return "failure" (-1) for only one acceptable reason: it
     * was interrupted by a signal.  For this demo, we assume that that signal
     * is SIGCHLD.  Any other errno value is beyond the scope of the demo.
    switch (pselect(max_fd, &read_fds, NULL, NULL, &ts, &origmask))
    case -1:
      if (errno != EINTR) throw CStandardError("pselect");

      /* Interrupted by signal, which we assume is SIGCHLD.
       * We reap children here, but without updating read_fds.  So don't
       * re-use it when we're done reaping.

    case 0:
      // No file descriptors ready; we got here through timeout.
      std::cout << "pselect() timeout" << std::endl;

      // Check for, and handle, incoming data on stdin.
      if (FD_ISSET(STDIN_FILENO, &read_fds))

      // Check for, and handle, notifications.  Calls MyNoticer::operator().
      if (FD_ISSET(db.sock(), &read_fds))


int main()
    std::cout << "Running (our pid = " << getpid() << ")." << std::endl;
    std::cout << "Exiting." << std::endl;
  catch (const pqxx::pqxx_exception &e)
      << "*** Caught pqxx_exception:" << std::endl
      << e.base().what() << std::endl;

    const pqxx::sql_error *s = dynamic_cast<const pqxx::sql_error*>(&e.base());
    if (s) std::cerr << "Query was: " << s->query() << std::endl;

    return EXIT_FAILURE;

  return EXIT_SUCCESS;
Last modified 3 years ago Last modified on Jan 16, 2015, 5:21:47 PM