PipelineDB: Working with Data Streams

We’ve already touched on the issue of processing events in real time. Today, we’re going to return to this topic and talk about a fairly new and interesting tool: the streaming DBMS PipelineDB.

PipelineDB is built on and fully compatible with the PostgreSQL 9.4 codebase. It was first released in June 2015 and the enterprise version came out in January 2016.

Below, we’ll compare PipelineDB with similar solutions, give brief installation and initial setup instructions, and also look at a use case.

Processing Data in Real Time: A Trip through Time

We can summarize the principles of PipelineDB as constant requests and short-term data. In similar DBMS, we find the reverse: short-term requests and constant data. In PipelineDB, data is not saved, but enters a stream and is processed on the fly.

The first attempts to create an instrument for processing data on the fly were made at the end of 1980s, when active databases first appeared. They were extensions to existing DBMS for processing events using triggers and rules. We can look at HiPAC, Starburst, and ODE as examples. These, however, were not widely received: their area of application were fairly narrow, and the rule syntax was too complex and confusing.

Various systems for managing data streams appeared (Data Stream Management Systems) between the 1990s and early 2000s: TelegraphCQ (a PostgreSQL fork), StreamBase, and StreamSQL. These tools implemented a principle of using window operators and converting streams to tables, which SQL queries could then be applied to.

The appearance of these solutions was certainly a step forward, but they couldn’t provide high speeds or performance when working with large streams of data.

Tools for processing data without storage have become more and more widely used over the past 5-6 years. The most well-known examples of these are Storm and Heron. One of the more recent tools is Apache Calcite. All of these solutions have a complicated installation and setup, as well as a very difficult learning curve.

PipelineDB has several obvious advantages over these other tools:

  • Easy setup: just download and install the necessary packages
  • Easy-to-use (as a result of PostgreSQL compatibility)

Let’s look at how PipelineDB handles data streams. We’ll start by analyzing two key concepts: continuous views and streams.

Streams and Continuous Views

Streams and continuous views are the key abstractions of PipelineDB.
A stream is a sequence of events. Event records are written to streams the same way they written to tables in relational databases (for more information, see here). When an event enters a stream, a timestamp is added to it.

Streams in PipelineDB serve a support function and supply continuous views with data. Unlike tables, streams do not require a schematic. As long as the stream is interactive with at least one continuous view, data can be written to it.

A continuous view is a set of streams and tables that gets updated whenever new data is added. Filtered events are added to continuous views.

To better understand how PipelineDB works, we’ll give a few continuous view examples.

We can create a continuous view for calculating the number of daily unique visitors coming from external links:

SELECT date_trunc('day', arrival_timestamp) AS day,
  referrer::text, COUNT(DISTINCT user_id::integer)
FROM users_stream GROUP BY day, referrer;

We can also calculate the number of ad impressions on a site for the past 5 minutes:

SELECT COUNT(*) FROM imps_stream
WHERE (arrival_timestamp > clock_timestamp() - interval '5 minutes');

As we see, continuous views follow this simple format:


When creating a continuous view relative to streams, a SELECT operation is run, which filters data according to the given parameters.

We’ve stated the basic theoretical information needed for understanding the principles behind PipelineDB. Let’s now look at the practical side of things. We’ll first describe the installation and initial setup of PipelineDB, and then look at some practical examples.

Installation and Initial Setup

We’ll describe the Pipeline DB installation for Ubuntu 14.04. If you use another Linux distribution, please check the official documentation.

To install PipelineDB, we just have to run two commands:

$ wget https://www.pipelinedb.com/download/0.9.3/ubuntu14
$ sudo dpkg -i ubuntu14

Afterwards, we initialize the PipelineDB server:

$ pipeline-init -D [directory name]

With the -D option, we can enter a new directory that will be automatically created. This directory will contain the following:

base      	pg_hba.conf	pg_replslot   pg_subtrans  pipelinedb.auto.conf
global    	pg_ident.conf  pg_serial 	pg_tblspc	pipelinedb.conf
pg_clog   	pg_logical 	pg_snapshots  pg_twophase  postmaster.opts
pg_commit_ts  pg_multixact   pg_stat   	PG_VERSION   postmaster.pid
pg_dynshmem   pg_notify  	pg_stat_tmp   pg_xlog

PipelineDB’s main configuration is saved to the file pipelinedb.conf. The configuration is nearly identical to the corresponding PostgreSQL configuration.

By default, PipelineDB cannot accept connections from remote hosts. To change this, open the file pipelinedb.conf, find the section Connections and Authentication, uncomment the first line and make the following change:

listen_addresses = '*'

Afterwards, we enter specific hosts in the file pg_hba.conf:

host	all         	all         	/        	md5

If we need to accept connections from all possible hosts, we should change this line to look like the following:

host	all         	all        	md5

And that’s it. PipelineDB is ready to go.
To launch it in background mode, run the following command:

$ pipeline-ctl -D [directory name] -l pipelinedb.log start

Use Case: Analyzing Wikipedia Statistics

We looked at the theory behind PipelineDB and its installation and initial setup. Now let’s put PipelineDB to work.

Let’s look at an interesting example given in the PipelineDB official documentation: analyzing page visit statistics for Wikipedia and related projects (Wiktionary, Wikisources, Wikibooks, etc.) for an hour. These stats are publicly available. Information about each visit is given as a record containing the following fields:

Time of visit | project | visits per * | total bytes processed

We’re interested in the maximum, minimum, and average number of page visits for one hour, as well as the 99th percentile of visits.
We’ll execute a continuous view of requests:

$ psql -h localhost -p 5432 -d pipeline -c "ACTIVATE"

Afterwards, we create a continuous view:

$ psql -h localhost -p 5432 -d pipeline -c "CREATE CONTINUOUS VIEW wiki_stats AS
SELECT hour::timestamp, project::text,
           count(*) AS total_pages,
           sum(view_count::bigint) AS total_views,
           min(view_count) AS min_views,
           max(view_count) AS max_views,
           avg(view_count) AS avg_views,
           percentile_cont(0.99) WITHIN GROUP (ORDER BY view_count) AS p99_views,
           sum(size::bigint) AS total_bytes_served
    FROM wiki_stream
    GROUP BY hour, project;"

In this command we show that we’ll receive data for the continuous view of the wiki_stream stream. To create the stream, we need to download data from the site, unpack it, write it to a standard output, and then transfer it to PipelineDB with the COPY command:

$ curl -sL http://pipelinedb.com/data/wiki-pagecounts | gunzip | \
        psql -h localhost -p 5432 -d pipeline -c "
        COPY wiki_stream (hour, project, title, view_count, size) FROM STDIN"

We’d like to point out that this is an enormous amount of data (it’s saved in 80-90MB archives), and it may take some time to download it. Downloads can be stopped at any time with the standard Ctrl+C key combination.

When the download is complete, we run the command:

$ psql -h localhost -p 5432 -d pipeline -c "
SELECT * FROM wiki_stats ORDER BY total_views DESC";

The results will be displayed in a table (below is a small fragment):

     hour         |     project     | total_pages | total_views | min_views | max_views |       avg_views        |    p99_views     | total_bytes_served
 2015-06-01 01:00:00 | en              |     2590639 |     7640042 |         1 |    368264 |     2.9490955706294856 | 28.8354562848382 |    247434016274
 2015-06-01 00:00:00 | en              |     2556148 |     7554038 |         1 |    406121 |     2.9552428106666750 | 29.2785253533473 |    243707663997
 2015-06-01 01:00:00 | en.mw           |           1 |     5560465 |   5560465 |   5560465 |   5560465.000000000000 |          5560465 |    143619712266
 2015-06-01 00:00:00 | en.mw           |           1 |     5398933 |   5398933 |   5398933 |   5398933.000000000000 |          5398933 |    137434148764
 2015-06-01 02:00:00 | en              |      810196 |     2411275 |         1 |     18485 |     2.9761625581957946 | 28.6634029920602 |    75540476402
 2015-06-01 00:00:00 | es              |      395449 |     1705754 |         1 |     19397 |     4.3134614071599625 | 53.0791353769321 |    37829076747
 2015-06-01 01:00:00 | es              |      407703 |     1696681 |         1 |     19306 |     4.1615612345261134 | 50.7215532793044 |    37355945443
 2015-06-01 01:00:00 | es.mw           |           1 |     1205844 |   1205844 |   1205844 |   1205844.000000000000 |          1205844 |    20934926923
 2015-06-01 00:00:00 | es.mw           |           1 |     1192257 |   1192257 |   1192257 |   1192257.000000000000 |          1192257 |    20642415036
 2015-06-01 01:00:00 | ja              |      386039 |     1079228 |         1 |     27601 |     2.7956449995984862 | 23.9804451096668 |    38244974161


PipelineDB is an interesting and prospective tools. We hope it sees further development and refinement.
If you’ve ever used PipelineDB, we’d be happy to hear your experience in the comments below.

For anyone interested in reading more, please visit the following links: