From Apache Flume to Apache Impala using HDFS

March 31st, 2016 | cloudera, flume, hadoop, hbase, hdfs, impala |

Let’s use HDFS as a database !

So, we have data coming from one of our services, which feeds the source of a Flume agent. Now, we want to be able to query that data in a scalable fashion without using HBase or any other database, to stay lean.

One way is to use HDFS as a database (Flume has an HDFS sink that handles partitioning), create a Hive table on top to query its content, and, because we want something performant and fast, actually query the data through Impala using the Apache Parquet format.

Here’s a little diagram of the stack :

[diagram: Flume to Impala]

  • We are storing our data in HDFS in Apache Avro format (Snappy compressed) because of all its advantages and because we are already using it everywhere.
  • Apache Oozie is used to regularly export the HDFS content to Parquet.

Let’s review the stack one piece at a time, starting with the Flume configuration.

Flume

Let’s say we have an Avro source where our custom service is sending events:

agent1.sources.events_source.type = avro
agent1.sources.events_source.bind = 0.0.0.0
agent1.sources.events_source.port = 9876

Let’s first configure the Flume HDFS sink where we are going to export our events. The configuration is pretty long, but every piece has its importance:

agent1.sinks = ... s

agent1.sinks.s.type = hdfs
agent1.sinks.s.hdfs.path = /user/flume/events/ymd=%Y-%m-%d/h=%H
agent1.sinks.s.hdfs.inUsePrefix = .
agent1.sinks.s.hdfs.fileType = DataStream
agent1.sinks.s.hdfs.filePrefix = events
agent1.sinks.s.hdfs.fileSuffix = .avro
agent1.sinks.s.hdfs.rollInterval = 300
agent1.sinks.s.hdfs.rollSize = 0
agent1.sinks.s.hdfs.rollCount = 0
agent1.sinks.s.serializer = com.company.CustomAvroSerializer$Builder
agent1.sinks.s.channel = events_channel

Let’s review this config.

Partition

Because of our volume of data, we want to partition it by year-month-day, then by hour. The “ymd=” and “h=” in the path are important: they represent the “column” names of the time dimensions that will be queryable later.

Note that your Flume events must have a “timestamp” header for Flume to know the time dimension. If you don’t have this info, you can simply add hdfs.useLocalTimeStamp = true to use the ingestion time, but it’s discouraged: it means you don’t have any timestamp column in your data, and you’re going to get stuck later when doing the Impala partitioning.
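If your events don’t carry a timestamp header, a minimal sketch using Flume’s built-in timestamp interceptor (note: this also stamps events at interception time, so it’s still ingestion time, not event time):

agent1.sources.events_source.interceptors = ts
agent1.sources.events_source.interceptors.ts.type = timestamp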

Roll interval

We decided to roll a new file every 5 minutes, and not based on size or count (those have to be explicitly set to 0 because they have non-zero default values).

By default, Flume buffers into a .tmp file we can’t rely on, and because we want to access fresh data quickly, 5 minutes is a good start. This is going to generate a bunch of files (288 per day), but we don’t care because we are going to export them later into an hourly Parquet format and clean up the old HDFS content.

Moreover, if Flume suddenly dies, you lose at most 5 minutes of data instead of the whole buffer. Fortunately, stopping Flume properly flushes the buffer.

File name

Setting inUsePrefix to “.” hides the working files from Hive during a query (it ignores the dot-prefixed, aka hidden, files). If you don’t, some MapReduce jobs can fail: at first, Hive sees a file Flume is buffering into (a .tmp), then by the time the MR executes, the file is not there anymore (because of a flush/rename), and kaboom, the MR fails:

Caused by: java.io.FileNotFoundException: File does not exist: hdfs://hadoop01:8020/user/flume/events/ymd=2016-03-17/h=15/events.1458223202146.avro.tmp

File type

By default, the file type is SequenceFile. We don’t want that, because it makes Flume convert the output stream to a SequenceFile that Hive will not be able to read, since the Avro schema won’t be inside. Setting it to DataStream lets the data through unaltered.

FYI, a typical SequenceFile body :

SEQ♠!org.apache.hadoop.io.LongWritable”org.apache.hadoop.io.BytesWritable ▒c(g▒s▒▒►▒▒▒|TF..

A snappy compressed avro file body :

Obj♦avro.schema▒8{"type":"record","name":"Order",...}avro.codecsnappy ▒▒T▒▒▒▒♣

Avro serializer

Our custom serializer does some conversion of the original event and simply emits Avro using a DataFileWriter with the Snappy codec:

DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
// DataFileWriter embeds the schema in the file header, so Hive and avro-tools can read it back
dataFileWriter = new DataFileWriter<>(writer)
                    .setCodec(CodecFactory.snappyCodec())
                    .create(schema, out);
              

Multiple Flume agents?

Be careful if multiple Flume agents are writing to the same location: the buffer is not shareable and you could have name conflicts. By default, Flume names files with a timestamp (in milliseconds). That’s fine most of the time, but you never know when they are going to collide. Consider having two different configurations with a different prefix or suffix for the filename.
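For instance, a sketch giving each agent its own prefix (the host-based names are just hypothetical):

# agent running on host A
agent1.sinks.s.hdfs.filePrefix = events-hostA
# agent running on host B
agent1.sinks.s.hdfs.filePrefix = events-hostB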

Performance consideration

Don’t forget to monitor your Flume agents when adding an HDFS sink. The overhead is noticeable: there are a lot more I/O threads (the pool defaults to 10, but I noticed way more threads with VisualVM), and CPU usage slightly increases.
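If you need to keep that in check, the HDFS sink exposes its thread pools; a sketch showing the default values (tune to taste):

# number of threads doing HDFS I/O operations (open, write, flush, close)
agent1.sinks.s.hdfs.threadsPoolSize = 10
# number of threads scheduling timed file rolling
agent1.sinks.s.hdfs.rollTimerPoolSize = 1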

Our Flume is now properly sinking to HDFS, let’s check it out.

HDFS

The first thing is to verify that the data is correct and readable.

We check that the partitioning is present:

$ hdfs dfs -ls /user/flume/events/ymd=2016-01-29
/user/flume/events/ymd=2016-01-29/h=00
/user/flume/events/ymd=2016-01-29/h=01
...

Then we check a file to see if it’s parsable :

$ hdfs dfs -cat /user/flume/events/ymd=2016-03-14/h=15/events.1458147890641.avro
Objavro.schema�8{"type":"record","name":"OrderRecord","namespace":"com.company.avro","fields":[...

Our schema is there, and it’s not a SequenceFile, good! Let’s use avro-tools to deserialize the content. It’s simply a .jar with some useful commands (getschema, tojson), downloadable from the Apache mirrors:

$ curl -sLO http://apache.crihan.fr/dist/avro/avro-1.7.7/java/avro-tools-1.7.7.jar
$ java -jar avro-tools-1.7.7.jar getschema events.1458147890641.avro
{
  "type" : "record",
  "name" : "Order",
  "namespace" : "com.company",
  "fields" : [ {
    "name" : "type",
    "type" : "string"
  }, {
...
$ java -jar avro-tools-1.7.7.jar tojson events.1458147890641.avro
{"type":"AD12","customer_id":2458189, ...
{"type":"AD12","customer_id":9515711, ...

Our HDFS layout is in place and new data is being streamed in. Let’s now configure Hive to create a table on top of the files.

Hive

Avro is, fortunately, a standard in Hadoop, and Hive has everything needed to read Avro files and create a table from them.

AvroSerDe

The magic happens when using the AvroSerDe (Avro Serializer/Deserializer). It is used to read Avro files backing a table and, vice versa, to create Avro files from a table (with an INSERT). It also detects and decompresses files compressed with Snappy.

Under the hood, it’s simply using DataFile[Writer|Reader] code to read/write avro content.

https://cwiki.apache.org/confluence/display/Hive/AvroSerDe
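To make that concrete, here is a minimal sketch (not the SerDe code itself) reading one of our .avro files with the same DataFileReader machinery, assuming a local copy of the file:

import java.io.File;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ReadAvroFile {
    public static void main(String[] args) throws Exception {
        // open the avro container file; the schema is read from its header
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
                new File("events.1458147890641.avro"), new GenericDatumReader<>())) {
            System.out.println(reader.getSchema()); // the embedded schema
            for (GenericRecord record : reader) {   // records, Snappy-decompressed transparently
                System.out.println(record);
            }
        }
    }
}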

Create the table

We create a Hive external table mapped onto our .avro files using the AvroSerDe, specifying an Avro schema to read the data (it will probably be the same as the schema used to write the data, BUT it could be different, such as only a slice of the fields):

CREATE EXTERNAL TABLE avro_events
PARTITIONED BY (ymd STRING, h INT)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
LOCATION '/user/flume/events/'
TBLPROPERTIES ('avro.schema.literal' = '
{
 "type" : "record",
 "name" : "Order",
 "namespace" : "com.company",
 "fields" : [ {
   "name" : "type",
   "type" : "string"
 } ]
}');

The PARTITIONED BY matches our structure and names :

$ hdfs dfs -ls /user/flume/events/ymd=2016-03-17/h=12
/user/flume/events/ymd=2016-03-17/h=12/events.1458212422229.avro
/user/flume/events/ymd=2016-03-17/h=12/events.1458212456756.avro
...

It’s possible to externalize the schema into its own file on HDFS and use it:

TBLPROPERTIES ('avro.schema.url' = '/user/flume/Order.avsc');

You can generate the schema if you don’t have it (if it’s generated on the fly by the code with some Scala macros for instance):

$ java -jar avro-tools-1.7.7.jar getschema events.avro > Order.avsc
$ hdfs dfs -put Order.avsc /user/flume/

ERR: Long schemas

If you have a looong schema, therefore a long query, you could end up with this error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:javax.jdo.JDODataStoreException: Put request failed : INSERT INTO "TABLE_PARAMS" ("PARAM_VALUE","TBL_ID","PARAM_KEY") VALUES (?,?,?)
org.datanucleus.store.rdbms.exceptions.MappedDatastoreException: INSERT INTO "TABLE_PARAMS" ("PARAM_VALUE","TBL_ID","PARAM_KEY") VALUES (?,?,?)
Caused by: org.postgresql.util.PSQLException: ERROR: value too long for type character varying(4000)

In this case, your query exceeds the 4000-character limit of the metastore column, so you must use an external schema (avro.schema.url).

If you’re using a UI like Hue to run the query and notice weird behavior, use the hive shell: the error will be properly displayed there, whereas Hue won’t show it.

ERR: Partition and field names conflicts

If one of your data columns has the same name as a partition dimension (ymd or h in our case), you are going to get this error:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.hive.ql.metadata.HiveException: Partition column name hour conflicts with table columns.

You need to change the partition column name you are using.
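For instance, a hypothetical fix is to rename the partition dimensions on both sides, in the Flume hdfs.path (e.g. .../dt=%Y-%m-%d/hr=%H) and in the DDL:

CREATE EXTERNAL TABLE avro_events
PARTITIONED BY (dt STRING, hr INT)
...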

Notify Hive of the new partitions

Our data is in HDFS and our Hive table is mapped onto it, good. But Hive needs to discover the partitions now; it’s not automatic because we are short-circuiting it (we are inserting data without any Hive INSERT).

msck repair table to the rescue. It simply looks into the folder to discover new directories and adds them to the metastore.

hive> msck repair table avro_events;
OK
Partitions not in metastore: avro_events:ymd=2016-03-17/h=12
Repair: Added partition to metastore avro_events:ymd=2016-03-17/h=12
Time taken: 2.339 seconds, Fetched: 1 row(s)

Fortunately, we are not going to do that manually each time we need it: later in the process we’ll do it with Oozie, and directly via Impala.

Drop partitions

If you’re playing around a bit and see some “Partitions missing from filesystem” messages when doing an msck repair, you can remove the partition metadata manually:

hive> alter table avro_events DROP IF EXISTS PARTITION (ymd='2016-03-21') PURGE;
Dropped the partition ymd=2016-03-21/h=13
Dropped the partition ymd=2016-03-21/h=14
Dropped the partition ymd=2016-03-21/h=15
Dropped the partition ymd=2016-03-21/h=16
Dropped the partition ymd=2016-03-21/h=17
Dropped the partition ymd=2016-03-21/h=18
Dropped the partition ymd=2016-03-21/h=19
Dropped the partition ymd=2016-03-21/h=20
Dropped the partition ymd=2016-03-21/h=21
Dropped the partition ymd=2016-03-21/h=22
Dropped the partition ymd=2016-03-21/h=23

Querying

You can already query the Hive table to get proper records.

For instance, if you have a column “timestamp”, it’s nice to check the time of the latest ingested data:

select max(`timestamp`) from avro_events;
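As a hedged variant, assuming the column holds milliseconds since the epoch, the same check in a human-readable form:

select from_unixtime(cast(max(`timestamp`)/1000 as bigint)) from avro_events;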

Hive has some virtual columns we can use to get more insight into the table content. To know how many records each file contains, we can use the virtual column INPUT__FILE__NAME:

hive> select INPUT__FILE__NAME, COUNT(*)
FROM events
GROUP BY INPUT__FILE__NAME
ORDER BY INPUT__FILE__NAME;
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      910
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      1572
hdfs://hadoop:8020/events/ymd=2016-03-22/h=00/...avro      4884

That can come in handy if you just want to look at what’s inside a specific file:

hive> SELECT COUNT(*) FROM avro_events
      WHERE INPUT__FILE__NAME = '...';

Because we are not crazy enough to query through Hive, let’s focus on querying the data through Impala to get blazing fast responses.

Impala

We can use Impala to query the Avro table, but for performance reasons, we are going to export it into a Parquet table afterwards. This step is mostly here to show that it’s possible.

Query the Hive avro table

If right now, in Impala, you do:

> show tables;

You won’t see the Hive table yet. You need to make Impala aware it’s there:

> INVALIDATE METADATA avro_events;

When it’s done, you’ll be able to query it, but the data will still be frozen at that point. It’s the same problem as with Hive.

For Impala to see the latest data of the existing partitions, we can use REFRESH :

> REFRESH avro_events;

But that still won’t discover the new partitions (i.e. if Flume just created a new hourly partition).

We have to use the equivalent of Hive’s msck repair table, but for Impala:

> ALTER TABLE avro_events RECOVER PARTITIONS;

This RECOVER PARTITIONS will do what REFRESH does (see the latest data of the existing partitions), but will discover the new ones too. I don’t know the impact and process time on big tables with tons of partitions.

Query a Parquet table

Because we want to be taken seriously, we want to store our data in Parquet files to get fast queries. Parquet stores the data in columns rather than rows, supporting über-fast filtering because it doesn’t need to parse every row.

First, we need to create a partitioned Impala table stored in Parquet :

CREATE TABLE avro_events_as_parquet (type STRING, ...)
PARTITIONED BY(ymd STRING, h INT)
STORED AS PARQUETFILE;

It doesn’t have to follow the same partitioning as the Hive table, but for the sake of clarity, it does.

It’s empty; we are going to fill it from the Avro table. To do that, we base our partition logic on a “timestamp” column you should have in your data. We can’t retrieve the partition values from the Hive table’s Avro folder names, because they are not queryable.

-- We ensure we're viewing the latest partitions of the Hive table
-- where Flume is sinking its content

ALTER TABLE avro_events RECOVER PARTITIONS;
REFRESH avro_events;

-- We insert the data overwriting the partitions we are going to
-- write into, to be sure we don't append stuff to existing (that
-- would do duplicates and wrong stats!)

INSERT OVERWRITE avro_events_as_parquet
PARTITION(ymd, h)
  SELECT type, ..., 
    FROM_UNIXTIME(FLOOR(`timestamp`/1000), 'yyyy-MM-dd') AS ymd,
    CAST(FROM_UNIXTIME(FLOOR(`timestamp`/1000), 'HH') AS INT) AS h
  FROM avro_events
  [ WHERE `timestamp` >= $min AND `timestamp` < $max ];

-- We compute the stats of the new partitions
COMPUTE INCREMENTAL STATS avro_events_as_parquet;

We specify the partition values by doing some transformation on our “timestamp” column (note: FROM_UNIXTIME depends on the server’s time zone settings).

The WHERE is not necessary the first time, when you just want to load everything. Later, when you have a job scheduled every hour for instance, you want to filter which partition you are writing, hence the WHERE.
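For example, here is a sketch of the bounds for a single hourly partition, assuming `timestamp` holds milliseconds since the epoch:

-- hour starting at 2016-03-31 11:00
WHERE `timestamp` >= unix_timestamp('2016-03-31 11:00:00') * 1000
  AND `timestamp` <  unix_timestamp('2016-03-31 12:00:00') * 1000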

The end result is something like :

$ hdfs dfs -ls .../avro_events_as_parquet/ymd=2016-03-23/h=13
78.5 M avro_events_as_parquet/ymd=2016-03-23/h=13/644a2fbc8ad76fb2-2bd5c46c0a9a2cba_497985827_data.0.parq

A nice .parq file combining all the 5min-range avro files Flume created.

Now, because Flume is streaming, and because you want to query Impala without doing all the manual updates yourself, you’re planning a recurring Oozie job to take care of them.

Oozie

It’s quite straightforward: it’s just an encapsulation of the scripts we’ve just used.

First, we define a coordinator running every hour that will write the previous hour’s partition. (The coordinator could trigger the workflow every 5 minutes to have less lag in Impala if necessary; the same Parquet partition would just be overwritten 12 times per hour with more and more data each time.)

We take care of adding a small lag after the exact new hour: instead of running at 12:00, we run the workflow at 12:05, to be sure Flume has flushed its data (the 5 minutes is not random, it’s the same value as Flume’s rollInterval).

That means we can define a property in the coordinator.xml like that :


<property>
    <name>partitionDate</name>
    <value>${coord:dateOffset(coord:dateOffset(coord:nominalTime(), -1, 'HOUR'), -5, 'MINUTE')}</value>
</property>

If the workflow runs at 12:05, then partitionDate=2016-03-31T11:00Z. The partition ymd=2016-03-31/h=11 will contain all the data in [11:00, 12:00).
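For context, a minimal coordinator-app skeleton this property could live in (the app name and the ${startTime}, ${endTime}, ${workflowPath} properties are hypothetical):

<coordinator-app name="events-to-parquet" frequency="${coord:hours(1)}"
                 start="${startTime}" end="${endTime}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.4">
  <action>
    <workflow>
      <app-path>${workflowPath}</app-path>
      <configuration>
        <!-- the partitionDate property defined above goes here -->
      </configuration>
    </workflow>
  </action>
</coordinator-app>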

Then, in the workflow, we create an action to which we pass this value:


<shell xmlns="uri:oozie:shell-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <exec>${insertOverwriteScript}</exec>
    <argument>${partitionDate}</argument>
    ...
</shell>

And the script uses it to build the WHERE condition we just talked about in the Impala part, with some shell transformations (with what we did, we only have a plain UTC date, but we need at least two timestamps (min and max), or any other time value we could use from our data if we have it, such as “day” or “hour”).
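To make that concrete, here is a hedged sketch of what such a script could look like (GNU date assumed; the RECOVER PARTITIONS, REFRESH and COMPUTE INCREMENTAL STATS statements from earlier are omitted for brevity):

#!/bin/bash
# derive the [min, max) millisecond bounds from partitionDate (e.g. 2016-03-31T11:00Z)
PARTITION_DATE="$1"
MIN_S=$(date -u -d "${PARTITION_DATE}" +%s)   # start of the hour, in seconds since the epoch
MAX_S=$((MIN_S + 3600))                       # one hour later

impala-shell -q "
  INSERT OVERWRITE avro_events_as_parquet PARTITION(ymd, h)
  SELECT type, ...,
    FROM_UNIXTIME(FLOOR(\`timestamp\`/1000), 'yyyy-MM-dd') AS ymd,
    CAST(FROM_UNIXTIME(FLOOR(\`timestamp\`/1000), 'HH') AS INT) AS h
  FROM avro_events
  WHERE \`timestamp\` >= ${MIN_S}000 AND \`timestamp\` < ${MAX_S}000"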

I assume you know Oozie and what you’re doing, which is why I’m not providing the full scripts here.

Improvements

I think the stack could be simplified by using a serializer that writes Parquet directly to HDFS, but that would still create a bunch of tiny Parquet files (because we want to flush often), so we would still need to merge them automatically at the end. What do you think?

Conclusion

I hope I made myself clear and that the process makes sense.

Don’t forget we did all that only because we didn’t want to use any database, but only HDFS to store our data, for our own reasons. We wanted to be able to query it quickly through Impala to have “almost-realtime” data (a coordinator running every 5 minutes, for instance, would do it).

Another solution would be to sink Flume into HBase then query over it, or create a Hive/Impala table on top.

Oh, this post is missing some gifs.

If you’re not me, come say hi on Twitter.

HBase merge and split impact in HDFS

December 24th, 2015 | cloudera, hadoop, hbase, hdfs |

Why merge ?

I had a table with a lot of regions (almost a thousand), more than I wanted, and more than I should, according to the HBase book.
The max size of the HFile hbase.hregion.max.filesize was 10G.
I raised it to 20G and recompacted the whole table, thinking I was done. But nothing happened.
Why? Because HBase does not merge regions automatically.

Compaction is used to merge StoreFiles within the same HStore (one HStore per column family, per region).
A region that exists is going to exist forever, unless we delete it or merge it manually.

I then decided to merge some regions, first to give it a try, and second to see the impact in HDFS, because I’m curious.
If you are wondering too, you’re in the right place, keep reading. It’s not complex.

I’ll first do a merge without raising hbase.hregion.max.filesize, to see what happens.
Then I’ll raise the max, do another merge, and check the differences.

HBase version : 1.0.0-cdh5.4.7
Hadoop Version : 2.6.0-cdh5.4.7

Merge still at 10G max

First, you need to find 2 consecutive regions to merge together.
The adjacency is important: you can merge regions that are not consecutive, but it’s not recommended (it creates overlapping key ranges).
E.g.: if you have 2 regions whose start/end keys are 0-9 and 9-A, you want to create a region whose start/end keys are 0-A.

In HDFS, there is no order, it’s all GUIDs. To know what they correspond to, one way is to go to the HBase web UI and select the table.
That will display each of its regions’ name, UUID, and start/end keys.

http://hadoopmaster:60010/table.jsp?name=my_table
# or :16010 if recent

A region name is something like :

my_table,0115d0b6f99f58a34...2a9e72781c7,1440840915183.fbb00c100422d1cc0f9b7e39d6c6bd91.
# meaning:
[table],[start key],[timestamp].[encoded ID]

The encoded ID is what we are interested in. It is the folder in HDFS (/hbase/data/default/my_table/fbb00c100422d1cc0f9b7e39d6c6bd91) where the data of this region lives.

Let’s merge it with the following one.

hbase> merge_region 'fbb00c100422d1cc0f9b7e39d6c6bd91', 'a12acd303c0b7e512c8926666c5f02eb'

That creates a new region 65bd... containing an HFile whose size is slowly growing, as we can see in HDFS
(here is a diff from before and after the merge_region):

>          0 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a
>        226 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/.regioninfo
>          0 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/.tmp
> 2684354560 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/.tmp/752530e58ae8478d812696b066edcc9f
>          0 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/recovered.edits
>          0 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/recovered.edits/2206186528.seqid
>          0 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/t
>        109 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/t/ccd883e710664f1fbf605590deaf2868.a12acd303c0b7e512c8926666c5f02eb
>        109 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/t/e17b4ea9b9fa47c1839999426ef9ffe7.fbb00c100422d1cc0f9b7e39d6c6bd91

<          0 2015-12-23 12:13 .../my_table/a12acd303c0b7e512c8926666c5f02eb/recovered.edits
<          0 2015-12-23 12:13 .../my_table/a12acd303c0b7e512c8926666c5f02eb/recovered.edits/2198106631.seqid
---
>          0 2015-12-23 12:24 .../my_table/a12acd303c0b7e512c8926666c5f02eb/recovered.edits
>          0 2015-12-23 12:24 .../my_table/a12acd303c0b7e512c8926666c5f02eb/recovered.edits/2198106637.seqid

<          0 2015-12-23 11:45 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91
---
>          0 2015-12-23 12:24 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91
>          0 2015-12-23 12:24 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91/.merges

<          0 2015-12-23 12:13 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91/recovered.edits
<          0 2015-12-23 12:13 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91/recovered.edits/2206186546.seqid
---
>          0 2015-12-23 12:24 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91/recovered.edits
>          0 2015-12-23 12:24 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91/recovered.edits/2206186549.seqid

What we can see:
– a new region in the folder 65b... with an HFile in .tmp (2.7GB, growing) and a .regioninfo (very important, that’s the metadata identifying this region)
– a new empty folder .merges in one of the regions we are merging
– recovered.edits folders. Don’t mind them, I won’t display them anymore. For more info, check this nice Cloudera blog post about them.

After a few minutes it was done; the HFile had grown to 17GB, which was over the 10GB limit.
HBase then started the reverse process: it split the big region I had just made! :-(

>           0 2015-12-23 13:05 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40
>         226 2015-12-23 13:05 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/.regioninfo
>           0 2015-12-23 13:05 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/t
>         109 2015-12-23 13:05 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/t/752530e58ae8478d812696b066edcc9f.65bd82b5477fcc2090804c351d89700a

>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a
>         226 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/.regioninfo
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/.splits
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/.tmp
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/t
> 17860937303 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/t/752530e58ae8478d812696b066edcc9f

>           0 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8
>         226 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/.regioninfo
>           0 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/.tmp
>   134217728 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/.tmp/e377603958894f8ca1ec598112b95bf4
>           0 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/t
>         109 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/t/752530e58ae8478d812696b066edcc9f.65bd82b5477fcc2090804c351d89700a

<           0 2015-12-23 11:45 .../my_table/a12acd303c0b7e512c8926666c5f02eb

<           0 2015-12-23 11:45 .../my_table/fbb00c100422d1cc0f9b7e39d6c6bd91

– the two old regions are removed (a12... and fbb...)
– the split region has a .splits folder
– 2 new regions appeared: 2c1... and 743...
– only one of these 2 regions has an HFile that is slowly growing (meaning: the process is sequential)

Meanwhile, in the logs…

// 2 new regions from a SPLIT
2015-12-23 13:05:32,817 INFO org.apache.hadoop.hbase.master.RegionStates: Transition null to {2c142664dc0929d7c6cc5fa6fe3b4e40 state=SPLITTING_NEW, ts=1450872332817, server=hadoopslave04,60020,1450869198826}
2015-12-23 13:05:32,817 INFO org.apache.hadoop.hbase.master.RegionStates: Transition null to {743bfa035be56bf412d00803abe433b8 state=SPLITTING_NEW, ts=1450872332817, server=hadoopslave04,60020,1450869198826}

// the region we are splitting was OPEN
// it goes to SPLITTING then SPLIT, and is set offline for the time being
2015-12-23 13:05:32,817 INFO org.apache.hadoop.hbase.master.RegionStates: Transition {65bd82b5477fcc2090804c351d89700a state=OPEN, ts=1450869854560, server=hadoopslave04,60020,1450869198826} to {65bd82b5477fcc2090804c351d89700a state=SPLITTING, ts=1450872332817, server=hadoopslave04,60020,1450869198826}
2015-12-23 13:05:34,767 INFO org.apache.hadoop.hbase.master.RegionStates: Transition {65bd82b5477fcc2090804c351d89700a state=SPLITTING, ts=1450872334767, server=hadoopslave04,60020,1450869198826} to {65bd82b5477fcc2090804c351d89700a state=SPLIT, ts=1450872334767, server=hadoopslave04,60020,1450869198826}
2015-12-23 13:05:34,767 INFO org.apache.hadoop.hbase.master.RegionStates: Offlined 65bd82b5477fcc2090804c351d89700a from hadoopslave04,60020,1450869198826

// both 2 new regions switch from SPLITTING_NEW to OPEN
2015-12-23 13:05:34,767 INFO org.apache.hadoop.hbase.master.RegionStates: Transition {2c142664dc0929d7c6cc5fa6fe3b4e40 state=SPLITTING_NEW, ts=1450872334767, server=hadoopslave04,60020,1450869198826} to {2c142664dc0929d7c6cc5fa6fe3b4e40 state=OPEN, ts=1450872334767, server=hadoopslave04,60020,1450869198826}
2015-12-23 13:05:34,767 INFO org.apache.hadoop.hbase.master.RegionStates: Transition {743bfa035be56bf412d00803abe433b8 state=SPLITTING_NEW, ts=1450872334767, server=hadoopslave04,60020,1450869198826} to {743bfa035be56bf412d00803abe433b8 state=OPEN, ts=1450872334767, server=hadoopslave04,60020,1450869198826}

// daughter a and b = new regions with start keys; the parent being the split region
2015-12-23 13:05:34,873 INFO org.apache.hadoop.hbase.master.AssignmentManager: Handled SPLIT event; parent=my_table,fe7f...,1450869853820.65bd82b5477fcc2090804c351d89700a., daughter a=my_table,fe7f...,1450872332556.2c142664dc0929d7c6cc5fa6fe3b4e40., daughter b=my_table,feff7...,1450872332556.743bfa035be56bf412d00803abe433b8., on hadoopslave04,60020,1450869198826

// later, the merge references (mergeA and mergeB qualifiers) are removed from the region's row in the metadata (hbase:meta)
2015-12-23 13:08:28,965 INFO org.apache.hadoop.hbase.MetaTableAccessor: Deleted references in merged region my_table,fe7f...,1450869853820.65bd82b5477fcc2090804c351d89700a., qualifier=mergeA and qualifier=mergeB

Back to HDFS

After a while, HBase is done with the daughter b region 743b..., and starts creating the daughter a region 2c14....

>           0 2015-12-23 13:25 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40
>         226 2015-12-23 13:05 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/.regioninfo
>           0 2015-12-23 13:41 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/.tmp
>           0 2015-12-23 13:41 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/t
>  8732040437 2015-12-23 13:41 .../my_table/2c142664dc0929d7c6cc5fa6fe3b4e40/t/2388513b0d55429888478924914af494

>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a
>         226 2015-12-23 12:24 .../my_table/65bd82b5477fcc2090804c351d89700a/.regioninfo
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/.splits
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/.tmp
>           0 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/t
> 17860937303 2015-12-23 13:05 .../my_table/65bd82b5477fcc2090804c351d89700a/t/752530e58ae8478d812696b066edcc9f

>           0 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8
>         226 2015-12-23 13:05 .../my_table/743bfa035be56bf412d00803abe433b8/.regioninfo
>           0 2015-12-23 13:25 .../my_table/743bfa035be56bf412d00803abe433b8/.tmp
>           0 2015-12-23 13:25 .../my_table/743bfa035be56bf412d00803abe433b8/t
>  8733203481 2015-12-23 13:25 .../my_table/743bfa035be56bf412d00803abe433b8/t/e377603958894f8ca1ec598112b95bf4

It’s done. The region has been successfully split.
After a few minutes, the big region 65bd... will be removed automatically.

2015-12-23 13:43:28,908 INFO org.apache.hadoop.hbase.MetaTableAccessor: Deleted my_table,fe7f...,1450869853820.65bd82b5477fcc2090804c351d89700a.
2015-12-23 13:43:28,908 INFO org.apache.hadoop.hbase.master.CatalogJanitor: Scanned 722 catalog row(s), gc'd 0 unreferenced merged region(s) and 1 unreferenced parent region(s)

Note: if we compare the sum of the daughter region sizes to the big region, we get a delta of about +395MB (the single big HFile is larger than the two daughters combined).

We’ve successfully merged 2 regions into one, which was then automatically split back into two. Hurray!

Raising hbase.hregion.max.filesize to avoid splitting

Now, let’s change hbase.hregion.max.filesize to 20G in Cloudera Manager and merge again to get a big region, without a split this time.
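Outside of Cloudera Manager, that corresponds to this hbase-site.xml property (value in bytes; the region servers need to pick up the change):

<property>
  <name>hbase.hregion.max.filesize</name>
  <value>21474836480</value> <!-- 20 GB -->
</property>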

We apply the same process as before and manually merge the 2 regions we got previously, 2c14... and 743b....
That creates a new region 1e64... whose size is, surprisingly, slightly smaller than the sum of the two inputs this time (a delta of only 212KB), and which is not going to be split.

$ hdfs dfs -ls -R /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7
-rw-r--r--   3 hbase hbase         226 2015-12-23 13:45 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/.regioninfo
drwxr-xr-x   - hbase hbase           0 2015-12-23 14:12 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/.tmp
drwxr-xr-x   - hbase hbase           0 2015-12-23 13:48 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/recovered.edits
-rw-r--r--   3 hbase hbase           0 2015-12-23 13:48 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/recovered.edits/2206186536.seqid
drwxr-xr-x   - hbase hbase           0 2015-12-23 14:12 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/t
-rw-r--r--   3 hbase hbase 17465031518 2015-12-23 14:12 /hbase/data/default/my_table/1e64aa6f3f5cf067f6d5339230ef6db7/t/d1109c52de404b0c9d07e2e9c7fdeb5e

So that worked, I have one less region! Let’s carry on with the many more I still have.

Why should I know this?

Knowing what’s going on in HDFS with HBase is important when you are facing issues and errors in the HBase table structure.
To know if you have that kind of issue, just give it a try with:

$ hbase hbck my_table

If you see some ERRORS such as :
– No HDFS region dir found
– Region not deployed on any region server.
– Region found in META, but not in HDFS or deployed on any region server.
– First region should start with an empty key. You need to create a new region and regioninfo in HDFS to plug the hole.
– You need to create a new .regioninfo and region dir in hdfs to plug the hole.
– ERROR: Found lingering reference file hdfs://…

That’s when this knowledge is going to be useful (and maybe complicated to apply, depending on the problem!).

Unfortunately, that can happen quite easily (I managed to run into those issues with just a merge_region, not sure why exactly).
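Depending on the error, hbck also has repair options (-fixAssignments, -fixMeta, -fixHdfsHoles, ...); a hedged example, to run with care and never blindly on a production table:

$ hbase hbck -fixAssignments -fixMeta my_table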