From Apache Flume to Apache Impala using HDFS

March 31st, 2016 | cloudera, flume, hadoop, hbase, hdfs, impala |

Let’s use HDFS as a database!

So, we have data coming from one of our services, and that service is the source of a Flume agent. Now, we want to be able to query that data in a scalable fashion without using HBase or any other database, to stay lean.

One way is to use HDFS as a database (Flume has an HDFS sink that handles partitioning), create a Hive table on top to query its content, and, because we want something performant and fast, actually query the data with Impala using the Apache Parquet format.

Here’s a little diagram of the stack:

[Diagram: flume to impala]

  • Apache Oozie is used to regularly export the HDFS content to Parquet.
  • We store our data in HDFS in Apache Avro format (Snappy-compressed) because of all its advantages and because we already use it everywhere.

Let’s review the stack one piece at a time, starting with the Flume configuration.

Flume

Let’s say we have an Avro source where our custom service is sending events:

agent1.sources.events_source.type = avro
agent1.sources.events_source.bind = 0.0.0.0
agent1.sources.events_source.port = 9876

Let’s first configure the Flume HDFS sink where we are going to export our events. The configuration is pretty long, but every piece has its importance:

agent1.sinks = ... s

agent1.sinks.s.type = hdfs
agent1.sinks.s.hdfs.path = /user/flume/events/ymd=%Y-%m-%d/h=%H
agent1.sinks.s.hdfs.inUsePrefix = .
agent1.sinks.s.hdfs.fileType = DataStream
agent1.sinks.s.hdfs.filePrefix = events
agent1.sinks.s.hdfs.fileSuffix = .avro
agent1.sinks.s.hdfs.rollInterval = 300
agent1.sinks.s.hdfs.rollSize = 0
agent1.sinks.s.hdfs.rollCount = 0
agent1.sinks.s.serializer = com.company.CustomAvroSerializer$Builder
agent1.sinks.s.channel = events_channel

Let’s review this config.

Partition

Because of our volume of data, we want to partition it by year-month-day, then by hour. The “ymd=” and “h=” in the path are important: they are the “column” names of the time dimensions that will be queryable later.

Note that your Flume events must have a “timestamp” header for Flume to know which time dimension to use. If you don’t have this info, you can simply set hdfs.useLocalTimeStamp = true to use the ingestion time, but it’s discouraged: it means you don’t have any timestamp column in your data, and you’re going to get stuck later when doing the Impala partitioning.
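
If your service doesn’t set that header itself, Flume’s built-in timestamp interceptor can add it at the source. A minimal sketch, reusing the events_source from above (the interceptor name “ts” is arbitrary):

agent1.sources.events_source.interceptors = ts
agent1.sources.events_source.interceptors.ts.type = timestamp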

Roll interval

We decide to roll a new file every 5 minutes, and not based on size or count (those have to be explicitly set to 0 because they have non-zero default values).

By default, Flume buffers into a .tmp file we can’t rely on, and because we want to access fresh data quickly, 5 minutes is a good start. This is going to generate a bunch of files (288 per day), but we don’t care: we are going to export them later into an hourly Parquet format and clean up the old HDFS content.

Moreover, if Flume suddenly dies, you’re going to lose at most 5 minutes of data instead of the whole buffer. Fortunately, stopping Flume properly flushes the buffer.

File name

Setting inUsePrefix to “.” hides the in-progress files from Hive during a query (files starting with a dot are treated as hidden and ignored). If you don’t, some MapReduce jobs can fail: at first, Hive sees a file Flume is buffering into (a .tmp), then by the time the MR runs, it’s not there anymore (it got rolled), and kaboom, the MR fails:

Caused by: java.io.FileNotFoundException: File does not exist: hdfs://hadoop01:8020/user/flume/events/ymd=2016-03-17/h=15/events.1458223202146.avro.tmp

File type

By default, the file type is SequenceFile. We don’t want that: it makes Flume wrap the output stream in a SequenceFile that Hive won’t be able to read, because the Avro schema won’t be inside. Setting it to DataStream lets the data through unaltered.

FYI, a typical SequenceFile body:

SEQ♠!org.apache.hadoop.io.LongWritable”org.apache.hadoop.io.BytesWritable ▒c(g▒s▒▒►▒▒▒|TF..

A Snappy-compressed Avro file body:

Obj♦avro.schema▒8{"type":"record","name":"Order",...}avro.codecsnappy ▒▒T▒▒▒▒♣

Avro serializer

Our custom serializer does some conversion of the original event and simply emits Avro using a DataFileWriter with the Snappy codec:

// Emit Avro with the schema embedded in the file, Snappy-compressed
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)
                    .setCodec(CodecFactory.snappyCodec())
                    .create(schema, out);
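
To give an idea of where that snippet lives, here is a rough, simplified skeleton of a Flume EventSerializer (the conversion is reduced to the single-field Order schema shown later in this post; our real CustomAvroSerializer does more than this, so treat it as a sketch):

package com.company;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.serialization.EventSerializer;

public class CustomAvroSerializer implements EventSerializer {

  // Simplified schema; the real one has more fields (see the Hive section)
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.company\","
    + "\"fields\":[{\"name\":\"type\",\"type\":\"string\"}]}");

  private final OutputStream out;
  private DataFileWriter<GenericRecord> dataFileWriter;

  private CustomAvroSerializer(Context context, OutputStream out) {
    this.out = out;
  }

  @Override
  public void afterCreate() throws IOException {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(SCHEMA);
    dataFileWriter = new DataFileWriter<>(writer)
        .setCodec(CodecFactory.snappyCodec())
        .create(SCHEMA, out);
  }

  @Override
  public void write(Event event) throws IOException {
    // Placeholder conversion: the real serializer maps the incoming event to an Order record
    GenericRecord record = new GenericData.Record(SCHEMA);
    record.put("type", new String(event.getBody(), StandardCharsets.UTF_8));
    dataFileWriter.append(record);
  }

  @Override
  public void flush() throws IOException {
    dataFileWriter.flush();
  }

  @Override
  public void beforeClose() throws IOException {
    // nothing to do: the HDFS sink closes the underlying stream itself
  }

  @Override
  public boolean supportsReopen() {
    return false;
  }

  @Override
  public void afterReopen() throws IOException {
    // reopen not supported (supportsReopen() returns false)
  }

  public static class Builder implements EventSerializer.Builder {
    @Override
    public EventSerializer build(Context context, OutputStream out) {
      return new CustomAvroSerializer(context, out);
    }
  }
}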

Multiple Flume agents?

Be careful if multiple Flume agents are writing to the same location: the buffer is not shareable and you could get name conflicts. By default, Flume names the files with a timestamp (in milliseconds). That’s fine most of the time, but you never know if they’re going to collide one day. Consider having two different configurations with a different prefix or suffix for the filename, as in the sketch below.
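
For instance (a minimal sketch; the host suffixes are arbitrary):

# agent running on host A
agent1.sinks.s.hdfs.filePrefix = events-hostA

# agent running on host B
agent1.sinks.s.hdfs.filePrefix = events-hostB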

Performance considerations

Don’t forget to monitor your Flume agent when adding an HDFS sink. The overhead is noticeable: there are a lot more I/O threads (the pool defaults to 10, but I noticed way more threads with VisualVM), and CPU usage slightly increases. The relevant knob is shown below.
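
The I/O thread pool is configurable on the sink if you need to tune it (shown here with its default value of 10):

agent1.sinks.s.hdfs.threadsPoolSize = 10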

Our Flume agent is now properly sinking to HDFS; let’s check it out.

HDFS

The first thing to do is verify the data is correct and readable.

We check the partitioning is present:

$ hdfs dfs -ls /user/flume/events/ymd=2016-01-29
/user/flume/events/ymd=2016-01-29/h=00
/user/flume/events/ymd=2016-01-29/h=01
...

Then we check a file to see if it’s parsable:

$ hdfs dfs -cat /user/flume/events/ymd=2016-03-14/h=15/events.1458147890641.avro
Objavro.schema�8{"type":"record","name":"OrderRecord","namespace":"com.company.avro","fields":[...

Our schema is there, and it’s not a SequenceFile, good! Let’s use avro-tools to deserialize the content. It’s simply a .jar with some useful commands (getschema, tojson), downloadable from any Apache mirror:

$ curl -sLO http://apache.crihan.fr/dist/avro/avro-1.7.7/java/avro-tools-1.7.7.jar
$ java -jar avro-tools-1.7.7.jar getschema events.1458147890641.avro
{
  "type" : "record",
  "name" : "Order",
  "namespace" : "com.company",
  "fields" : [ {
    "name" : "type",
    "type" : "string"
  }, {
...
$ java -jar avro-tools-1.7.7.jar tojson events.1458147890641.avro
{"type":"AD12","customer_id":2458189, ...
{"type":"AD12","customer_id":9515711, ...

Our HDFS is in place and gets new data streamed in. Let’s now configure Hive to create a table on top of the files.

Hive

Fortunately, Avro is a standard in the Hadoop world, and Hive has everything needed to create a table from Avro files.

AvroSerDe

The magic happens when using the AvroSerDe (Avro Serializer/Deserializer). It is used to read Avro files to create a table and, vice versa, to create Avro files from a table (with an INSERT). It also detects and decompresses the files when they are Snappy-compressed.

Under the hood, it simply uses the DataFile[Writer|Reader] API to read and write the Avro content.

https://cwiki.apache.org/confluence/display/Hive/AvroSerDe
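
To make that concrete, here’s a minimal sketch of reading one of our files back with that same API (the file name is one of the examples above):

// Read an Avro container file: the schema travels with the data
import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

DatumReader<GenericRecord> datumReader = new GenericDatumReader<>();
try (DataFileReader<GenericRecord> fileReader =
         new DataFileReader<>(new File("events.1458147890641.avro"), datumReader)) {
    System.out.println(fileReader.getSchema());   // roughly what avro-tools getschema shows
    for (GenericRecord record : fileReader) {
        System.out.println(record);               // roughly what avro-tools tojson shows
    }
}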

Create the table

We create a Hive external table mapped onto our .avro files, using the AvroSerDe and specifying an Avro schema to read the data (it will probably be the same as the schema used to write the data, BUT it could be different, such as only a subset of the fields):

CREATE EXTERNAL TABLE avro_events
PARTITIONED BY (ymd STRING, h INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/flume/events/'
TBLPROPERTIES ('avro.schema.literal' = '
{
 "type" : "record",
 "name" : "Order",
 "namespace" : "com.company",
 "fields" : [ {
   "name" : "type",
   "type" : "string"
 } ]
}');

The PARTITIONED BY matches our structure and names:

$ hdfs dfs -ls /user/flume/events/ymd=2016-03-17/h=12
/user/flume/events/ymd=2016-03-17/h=12/events.1458212422229.avro
/user/flume/events/ymd=2016-03-17/h=12/events.1458212456756.avro
...

It’s also possible to externalize the schema into its own file on HDFS and use it:

TBLPROPERTIES ('avro.schema.url' = '/user/flume/Order.avsc');

You can generate the schema if you don’t have it (if it’s generated on the fly by the code with some Scala macros for instance):

$ java -jar avro-tools-1.7.7.jar getschema events.avro > Order.avsc
$ hdfs dfs -put Order.avsc /user/flume/

ERR: Long schemas

If you have a looong schema, and therefore a long query, you could end up with this error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:javax.jdo.JDODataStoreException: Put request failed : INSERT INTO "TABLE_PARAMS" ("PARAM_VALUE","TBL_ID","PARAM_KEY") VALUES (?,?,?)
org.datanucleus.store.rdbms.exceptions.MappedDatastoreException: INSERT INTO "TABLE_PARAMS" ("PARAM_VALUE","TBL_ID","PARAM_KEY") VALUES (?,?,?)
Caused by: org.postgresql.util.PSQLException: ERROR: value too long for type character varying(4000)

In that case, the schema literal is too long for the metastore column (a VARCHAR(4000) here); you must use an external schema file instead.

If you’re using a UI like Hue to run the query and notice something weird, use the hive shell: the error will be properly displayed there, whereas Hue won’t show it.

ERR: Partition and field name conflicts

If a column in your data has the same name as one of the partition dimensions (ymd and h in our case), you are going to get this error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hive.ql.metadata.HiveException: Partition column name hour conflicts with table columns.

You need to change the partition column name you are using.

Notify Hive of the new partitions

Our data is in HDFS and our Hive table is mapped onto it, good. But Hive still needs to discover the new data: it’s not automatic, because we are short-circuiting it (we are inserting data without any Hive INSERT).

msck repair table to the rescue. It simply scans the table folder to discover new directories and adds them to the metastore.

hive> msck repair table avro_events;
OK
Partitions not in metastore: avro_events:ymd=2016-03-17/h=12
Repair: Added partition to metastore avro_events:ymd=2016-03-17/h=12
Time taken: 2.339 seconds, Fetched: 1 row(s)

Fortunately, we are not going to do that manually each time we need it: we are going to use Oozie later in the process, and go through Impala directly.

Drop partitions

If you’re playing around a bit and see some “Partitions missing from filesystem” messages when doing a msck repair, you can remove those partitions (the metadata) manually:

hive> alter table avro_events DROP IF EXISTS PARTITION (ymd='2016-03-21') PURGE;
Dropped the partition ymd=2016-03-21/h=13
Dropped the partition ymd=2016-03-21/h=14
Dropped the partition ymd=2016-03-21/h=15
Dropped the partition ymd=2016-03-21/h=16
Dropped the partition ymd=2016-03-21/h=17
Dropped the partition ymd=2016-03-21/h=18
Dropped the partition ymd=2016-03-21/h=19
Dropped the partition ymd=2016-03-21/h=20
Dropped the partition ymd=2016-03-21/h=21
Dropped the partition ymd=2016-03-21/h=22
Dropped the partition ymd=2016-03-21/h=23

Querying

You can already query the Hive table to get proper records.

For instance, if you have a “timestamp” column, it’s nice to check the time of the latest ingested data:

select max(`timestamp`) from avro_events;

Hive has some magic columns we can use to get more insights about the table content. To know how many records each file contains, we can use the virtual column INPUT__FILE__NAME:

hive> select INPUT__FILE__NAME, COUNT(*)
FROM avro_events
GROUP BY INPUT__FILE__NAME
ORDER BY INPUT__FILE__NAME;
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      910
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      1572
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      4884

That can come in handy if you just want to look at what’s inside a specific file:

hive> SELECT COUNT(*) FROM avro_events
      WHERE INPUT__FILE__NAME = '...';

Because we are not crazy enough to query through Hive, let’s focus on querying the data through Impala to get blazing fast responses.

Impala

We can use Impala to query the Avro table directly, but for performance reasons, we are going to export it into a Parquet table afterwards. This step is mostly here to show that it’s possible.

Query the Hive avro table

If right now, in Impala, you do:

> show tables;

You won’t see the Hive table yet. You need to make Impala aware it’s there:

> INVALIDATE METADATA avro_events;

When it’s done, you’ll be able to query it, but the data will still be frozen at that point in time. It’s the same problem as with Hive.

For Impala to see the latest data of the existing partitions, we can use REFRESH:

> REFRESH avro_events;

But that still won’t discover new partitions (i.e. if Flume just created a new hour partition).

We have to use the equivalent of Hive’s msck repair table, but for Impala:

> ALTER TABLE avro_events RECOVER PARTITIONS;

This RECOVER PARTITIONS does what REFRESH does (see the latest data of the existing partitions), but discovers the new partitions too. I don’t know the impact and processing time on big tables with tons of partitions.
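
To double-check what Impala now knows about the table, you can list its partitions:

> SHOW PARTITIONS avro_events;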

Query a Parquet table

Because we want to be taken seriously, we want to store our data in Parquet files to get fast queries. Parquet stores the data in columns rather than in rows, enabling über-fast filtering because a query doesn’t need to read every row, only the columns it touches.

First, we need to create a partitioned Impala table stored in Parquet:

CREATE TABLE avro_events_as_parquet (type STRING, ...)
PARTITIONED BY(ymd STRING, h INT)
STORED AS PARQUETFILE;

It doesn’t have to follow the same partitioning as the Hive table, but for the sake of clarity, it does.

It’s empty; we are going to fill it from the Avro table. To do that, we are going to base our partition logic on a “timestamp” column you should have in your data. We can’t retrieve the partition values from the Avro folder names of the Hive table, since those aren’t queryable.

-- We ensure we're viewing the latest partitions of the Hive table
-- where Flume is sinking its content

ALTER TABLE avro_events RECOVER PARTITIONS;
REFRESH avro_events;

-- We insert the data overwriting the partitions we are going to
-- write into, to be sure we don't append stuff to existing (that
-- would do duplicates and wrong stats!)

INSERT OVERWRITE avro_events_as_parquet
PARTITION(ymd, h)
  SELECT type, ..., 
    FROM_UNIXTIME(FLOOR(`timestamp`/1000), 'yyyy-MM-dd') AS ymd,
    CAST(FROM_UNIXTIME(FLOOR(`timestamp`/1000), 'HH') AS INT) AS h
  FROM avro_events
  [ WHERE `timestamp` >= $min AND `timestamp` < $max ];

-- We compute the stats of the new partitions
COMPUTE INCREMENTAL STATS avro_events_as_parquet;

We specify the partition values by doing some transformations on our “timestamp” column (note: FROM_UNIXTIME depends on the server’s time zone configuration, so double-check it matches your expectations).

The WHERE is not necessary the first time: you just want to load everything. Later, when you have a job scheduled every hour for instance, you’ll want to filter which partition you write, hence the WHERE.
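
For example, assuming the “timestamp” column is in epoch milliseconds (as the FLOOR(`timestamp`/1000) above suggests), writing only the ymd=2016-03-31/h=11 partition would use bounds like:

-- 1459422000000 = 2016-03-31T11:00Z, 1459425600000 = 2016-03-31T12:00Z
WHERE `timestamp` >= 1459422000000 AND `timestamp` < 1459425600000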

The end result is something like:

$ hdfs dfs -ls .../avro_events_as_parquet/ymd=2016-03-23/h=13
78.5 M avro_events_as_parquet/ymd=2016-03-23/h=13/644a2fbc8ad76fb2-2bd5c46c0a9a2cba_497985827_data.0.parq

A nice .parq file combining all the 5-minute Avro files Flume created for that hour.
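
A quick sanity check on the freshly written partition doesn’t hurt:

> SELECT COUNT(*) FROM avro_events_as_parquet WHERE ymd = '2016-03-23' AND h = 13;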

Now, because Flume keeps streaming, and because you want to query Impala without doing all these manual updates yourself, you plan a recurring Oozie job to take care of them.

Oozie

It’s quite straightforward: it’s just an encapsulation of the statements we just used.

First, we define a coordinator running every hour that will write the previous hour’s partition. (The coordinator could trigger the workflow every 5 minutes to have less lag in Impala if necessary; the same Parquet partition would just be overwritten 12 times per hour, with more and more data each time.)

We take care of adding a small lag after the exact hour: instead of running the workflow at 12:00, we run it at 12:05, to be sure Flume has flushed its data (the 5 minutes is not random, it’s the same value as the Flume rollInterval).

That means we can define a property in the coordinator.xml like this:

<property>
    <name>partitionDate</name>
    <value>${coord:dateOffset(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), -5, 'MINUTE')}</value>
</property>

If the workflow runs at 12:05, then partitionDate=2016-03-31T11:00Z. The partition ymd=2016-03-31/h=11 will contain all the data in [11:00, 12:00).
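
For reference, the surrounding coordinator definition could look like this (a sketch: the app name, dates, workflow path variable and XSD version are illustrative; what matters is the hourly frequency and the start time at :05 past the hour):

<coordinator-app name="events-to-parquet" frequency="${coord:hours(1)}"
                 start="2016-03-31T00:05Z" end="2026-01-01T00:00Z"
                 timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
    <action>
        <workflow>
            <app-path>${workflowPath}</app-path>
            <configuration>
                <property>
                    <name>partitionDate</name>
                    <value>${coord:dateOffset(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), -5, 'MINUTE')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>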

Then, in the workflow, we create a shell action where we pass this value:

<shell xmlns="uri:oozie:shell-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>

    <exec>${insertOverwriteScript}</exec>
    <argument>${partitionDate}</argument>
    ...
</shell>

And the script uses it to build the WHERE condition we talked about in the Impala part, with some shell transformations (with what we did, we only get a plain UTC date, but we need at least two timestamps (min and max), or any other time values we could use from our data if we have them, such as “day” or “hour”).
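
Here’s a minimal sketch of what such a script could look like (the script name, the GNU date usage, the staging .sql file and the impala-shell host are assumptions, not our exact production script):

#!/usr/bin/env bash
# insert-overwrite.sh (hypothetical): derives the [min, max) epoch-millis bounds
# of the hour from the Oozie partitionDate, then runs the Impala statements.
set -euo pipefail

partition_date="$1"    # e.g. 2016-03-31T11:00Z, passed by the Oozie action

# Assumes GNU date, which parses this ISO 8601 form
min_ms=$(( $(date -u -d "${partition_date}" +%s) * 1000 ))
max_ms=$(( min_ms + 3600 * 1000 ))

cat > /tmp/insert_overwrite.sql <<SQL
ALTER TABLE avro_events RECOVER PARTITIONS;
REFRESH avro_events;
INSERT OVERWRITE avro_events_as_parquet
PARTITION(ymd, h)
  SELECT type, ...,  -- other columns elided, as above
    FROM_UNIXTIME(FLOOR(\`timestamp\`/1000), 'yyyy-MM-dd') AS ymd,
    CAST(FROM_UNIXTIME(FLOOR(\`timestamp\`/1000), 'HH') AS INT) AS h
  FROM avro_events
  WHERE \`timestamp\` >= ${min_ms} AND \`timestamp\` < ${max_ms};
COMPUTE INCREMENTAL STATS avro_events_as_parquet;
SQL

impala-shell -i your-impalad-host -f /tmp/insert_overwrite.sql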

I assume you know Oozie and what you’re doing, so I’m not providing the full scripts here.

Improvements

I think the stack could be simplified by using a serializer that writes Parquet directly to HDFS, but that would still create a bunch of tiny Parquet files (because we want to flush often), so we would still need to merge them automatically at the end. What do you think?

Conclusion

I hope I made myself clear and that the process makes sense.

Don’t forget we did all that just because we didn’t want to use any database, only HDFS to store our data, for reasons of our own. We wanted to be able to query it quickly through Impala to have “almost-realtime” data (with a coordinator every 5 minutes, for instance, that would do it).

Another solution would be to sink Flume into HBase and query over that, or create a Hive/Impala table on top of it.

Oh, this post is missing some gifs.

If you’re not me, come say hi on Twitter!