Ingesting files with Apache Flume

Apache Flume is a scalable, high-throughput data ingestion system for loading streaming data into HDFS. Typical use cases for Flume include landing application logs and other machine-generated data in HDFS for further analysis.

Flume agent component diagram

There are three major Flume components that interact with each other in order to move data from the client (Data Source) to the final destination (e.g. HDFS):

  • The source is the entry point that receives all data-related events (e.g. log entries). There are different types of sources, such as the JMS Source, the Avro Source, and the Spooling Directory Source.
  • When a source receives an event it stores it in one or more channels. A channel is a temporary store that keeps the event until it is consumed by a sink. There are different types of channels – a memory channel simply keeps events in an in-memory queue, while durable channels are backed by the local file system and support recovery of events after a failure.
  • Sinks remove events from a channel and write them to an external store. One such sink is the HDFS sink, which writes files to HDFS.

Sources, sinks, and channels reside inside a Flume agent – a JVM instance running Flume that gets its settings from a configuration file.
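
To make the wiring concrete, here is the general shape of such a configuration file – a minimal sketch with placeholder component names (the full, working configuration used in this tutorial follows below):

# An agent named "agent" with one source, one channel, and one sink
agent.sources = src1
agent.channels = chan1
agent.sinks = sink1

# Wire the source and the sink to the channel
agent.sources.src1.channels = chan1
agent.sinks.sink1.channel = chan1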

I’ve been asked recently whether it is possible to use Flume for file ingestion. The key requirement was to preserve the original file names so the files can be easily identified in HDFS (by default Flume replaces the file names with a combination of a UUID and a timestamp to prevent name clashes).

In this tutorial I will quickly demonstrate how to configure a simple Flume agent that picks up files from a specific directory and writes them to HDFS. I am using BigInsights v4.1, but you shouldn’t have problems following the steps with any other Hadoop distribution.

Setting up a Flume agent using Ambari

If you are using an ODP-compliant distribution like BigInsights you can set everything up using Ambari. If you prefer to configure the agent manually, jump to the next section (Setting up a Flume agent manually).

Log in to Ambari and make sure your services are up and running. Click on the "Flume" service and then select the "Configure Agents" option.

Ambari Dashboard - Flume service

Expand the "flume.conf" section and copy and paste the following configuration after the "# Flume agent config" line:

# Define a source, a channel, and a sink
agent.sources = src1
agent.channels = chan1
agent.sinks = sink1

# Set the source type to Spooling Directory and set the directory
# location to /home/flume/ingestion/

agent.sources.src1.type = spooldir
agent.sources.src1.spoolDir = /home/flume/ingestion/
agent.sources.src1.basenameHeader = true

# Configure the channel as simple in-memory queue
agent.channels.chan1.type = memory
agent.channels.chan1.capacity = 1000

# Define the HDFS sink and set its path to your target HDFS directory
agent.sinks.sink1.type = hdfs
agent.sinks.sink1.hdfs.path = hdfs://big.example.com:8020/user/flume/stage
agent.sinks.sink1.hdfs.fileType = DataStream

# Disable the rollover functionality as we want to keep the original files
agent.sinks.sink1.hdfs.rollCount = 0
agent.sinks.sink1.hdfs.rollInterval = 0
agent.sinks.sink1.hdfs.rollSize = 0
agent.sinks.sink1.hdfs.idleTimeout = 0

# Name the HDFS files after the original file (taken from the basename header)
agent.sinks.sink1.hdfs.filePrefix = %{basename}

# Connect source and sink
agent.sources.src1.channels = chan1
agent.sinks.sink1.channel = chan1

If you inspect the configuration above you will notice that it defines a single Spooling Directory source, connects it to an in-memory channel, and links the channel to an HDFS sink. The configuration is pretty basic and the only options worth mentioning are basenameHeader (which makes the source add a header containing the original file name to every event) and hdfs.filePrefix (which uses that header value as the prefix of the file written to HDFS). Note that the HDFS sink still appends a numeric suffix to each file name to avoid clashes, as you will see in the listing later on.
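
If you also want the files in HDFS to keep a recognisable extension, or you prefer a different header name, the properties below may help – this is just a sketch of the relevant options (assuming standard Flume 1.x property names), not something the configuration above requires:

# Append a fixed suffix to the HDFS file names
agent.sinks.sink1.hdfs.fileSuffix = .txt

# Store the original file name under a custom header key and reference it in the sink
agent.sources.src1.basenameHeaderKey = origname
agent.sinks.sink1.hdfs.filePrefix = %{origname}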

After you have the agent configuration in place click the Save button.

flume.conf contents

You will now have to restart Flume by selecting "Restart" -> "Restart Affected".

Flume restart

Go back to the Summary tab and you should be able to see your newly defined agent up and running.

Summary - Flume agents

Setting up a Flume agent manually

If you have already set up the agent in Ambari, skip this section and jump straight to Testing the agent.

Log in via SSH to the host that will run the agent and create a flume-env.sh file in your flume-server/conf directory. You can do this by copying the existing .template file. In BigInsights v4 the conf directory is located under /usr/iop/current/flume-server.

[root@big ~]# cp /usr/iop/current/flume-server/conf/flume-env.sh.template /usr/iop/current/flume-server/conf/flume-env.sh
[root@big ~]#

Open the flume-env.sh file and set the JAVA_HOME variable to the location of your JDK. You should also uncomment the JAVA_OPTS setting.

JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-1.b16.el6_7.x86_64
JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

Create a new file named flume.conf in flume-server/conf and paste the agent configuration inside.

# Define a source, a channel, and a sink
agent.sources = src1
agent.channels = chan1
agent.sinks = sink1

# Set the source type to Spooling Directory and set the directory
# location to /home/flume/ingestion/

agent.sources.src1.type = spooldir
agent.sources.src1.spoolDir = /home/flume/ingestion/
agent.sources.src1.basenameHeader = true

# Configure the channel as simple in-memory queue
agent.channels.chan1.type = memory
agent.channels.chan1.capacity = 1000

# Define the HDFS sink and set its path to your target HDFS directory
agent.sinks.sink1.type = hdfs
agent.sinks.sink1.hdfs.path = hdfs://big.example.com:8020/user/flume/stage
agent.sinks.sink1.hdfs.fileType = DataStream

# Disable the rollover functionality as we want to keep the original files
agent.sinks.sink1.hdfs.rollCount = 0
agent.sinks.sink1.hdfs.rollInterval = 0
agent.sinks.sink1.hdfs.rollSize = 0
agent.sinks.sink1.hdfs.idleTimeout = 0

# Name the HDFS files after the original file (taken from the basename header)
agent.sinks.sink1.hdfs.filePrefix = %{basename}

# Connect source and sink
agent.sources.src1.channels = chan1
agent.sinks.sink1.channel = chan1

Switch to the flume user and start the agent:

[root@big ~]# su - flume
[flume@big ~]$
[flume@big ~]$ flume-ng agent -c /usr/iop/current/flume-server/conf -f /usr/iop/current/flume-server/conf/flume.conf -n agent

If you encounter any problems with starting the agent you can inspect the /var/log/flume/flume.log file for additional details.
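
If you want the agent to keep running after you close the SSH session, a minimal approach (a sketch – adjust the output file location to your needs) is to start it with nohup and follow the log in a second terminal:

# Start the agent in the background and capture its console output
nohup flume-ng agent -c /usr/iop/current/flume-server/conf \
    -f /usr/iop/current/flume-server/conf/flume.conf -n agent \
    > /home/flume/flume-agent.out 2>&1 &

# Follow the agent log for errors
tail -f /var/log/flume/flume.log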

Testing the agent

Open another SSH session to the Flume host. Using the hdfs user, create the target HDFS directory. I am using /user/flume/stage, as specified in the agent configuration.

[root@big ~]# su - hdfs
[hdfs@big ~]$ hadoop fs -mkdir /user/flume
[hdfs@big ~]$ hadoop fs -mkdir /user/flume/stage
[hdfs@big ~]$ hadoop fs -chown -R flume:hadoop /user/flume
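
If your Hadoop version supports the -p flag (Hadoop 2.x does), you can create the nested directories in a single step instead:

[hdfs@big ~]$ hadoop fs -mkdir -p /user/flume/stage
[hdfs@big ~]$ hadoop fs -chown -R flume:hadoop /user/flume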

Now switch back to the flume user and create the source directory.

[root@big ~]# su - flume
[flume@big ~]$ mkdir /home/flume/ingestion
[flume@big ~]$ cd /home/flume/ingestion

Generate a bunch of random text files inside the ingestion directory.

[flume@big ingestion]$ for i in {1..50}; do base64 /dev/urandom | head -c 1000 > file$i.txt; done
[flume@big ingestion]$

Flume will immediately start picking them up and you’ll see the files piling up in the target HDFS directory. Note that files with a .tmp extension are currently being processed.

[flume@big ~]$ hadoop fs -ls /user/flume/stage
Found 38 items
-rw-r--r--   1 flume hadoop        770 2015-09-30 02:51 /user/flume/stage/file1.txt.1443577914316
-rw-r--r--   1 flume hadoop        231 2015-09-30 02:51 /user/flume/stage/file1.txt.1443577914317.tmp
-rw-r--r--   1 flume hadoop        770 2015-09-30 02:51 /user/flume/stage/file10.txt.1443577914736
-rw-r--r--   1 flume hadoop        231 2015-09-30 02:51 /user/flume/stage/file10.txt.1443577914737.tmp
…
-rw-r--r--   1 flume hadoop        770 2015-09-30 02:51 /user/flume/stage/file25.txt.1443577918203
-rw-r--r--   1 flume hadoop        231 2015-09-30 02:51 /user/flume/stage/file25.txt.1443577918204.tmp
-rw-r--r--   1 flume hadoop        770 2015-09-30 02:51 /user/flume/stage/file26.txt.1443577918478
-rw-r--r--   1 flume hadoop        231 2015-09-30 02:51 /user/flume/stage/file26.txt.1443577918479.tmp
[flume@big ~]$

If you look back in the source directory you’ll notice that the files that have already been processed now have the .COMPLETED extension.

[flume@big ingestion]$ ls
file10.txt.COMPLETED  file16.txt.COMPLETED  file21.txt.COMPLETED  file27.txt           file32.txt  file38.txt  file43.txt  file49.txt  file8.txt
file11.txt.COMPLETED  file17.txt.COMPLETED  file22.txt.COMPLETED  file28.txt           file33.txt  file39.txt  file44.txt  file4.txt   file9.txt
file12.txt.COMPLETED  file18.txt.COMPLETED  file23.txt.COMPLETED  file29.txt           file34.txt  file3.txt   file45.txt  file50.txt
file13.txt.COMPLETED  file19.txt.COMPLETED  file24.txt.COMPLETED  file2.txt.COMPLETED  file35.txt  file40.txt  file46.txt  file5.txt
file14.txt.COMPLETED  file1.txt.COMPLETED   file25.txt.COMPLETED  file30.txt           file36.txt  file41.txt  file47.txt  file6.txt
file15.txt.COMPLETED  file20.txt.COMPLETED  file26.txt            file31.txt           file37.txt  file42.txt  file48.txt  file7.txt
[flume@big ingestion]$
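
Flume leaves the processed files in the spool directory with the .COMPLETED suffix, so they will accumulate over time. There are two ways to deal with this: set the source's deletePolicy to immediate so Flume removes files as soon as they are ingested, or run a periodic cleanup job. The sketch below assumes a one-day retention period and should be adapted to your needs:

# Option 1: let Flume delete files right after ingestion (in flume.conf)
# agent.sources.src1.deletePolicy = immediate

# Option 2: periodically remove files processed more than a day ago
find /home/flume/ingestion -name '*.COMPLETED' -mtime +1 -delete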

Back in Ambari you can monitor various metrics for your channel.

Ambari - Channel metrics

This concludes our brief tutorial on ingesting files via Flume.