Saturday, June 6, 2009

HDFS Scribe Integration

It is finally here: you can configure the open source log-aggregator, Scribe, to log data directly into the Hadoop Distributed File System (HDFS).

Many Web 2.0 companies have to deploy a bank of costly filers to capture the weblogs generated by their applications. Until now there was no option other than a costly filer, because the write rate of this stream is huge. The Hadoop-Scribe integration allows this write load to be distributed across a set of commodity machines, thus reducing the total cost of the infrastructure.

The challenge was to make HDFS real-timeish in behaviour. Scribe uses libhdfs, the C interface to the HDFS client. There were various bugs in libhdfs that needed to be fixed first. Then came issues with the FileSystem API. One of the major problems was that the FileSystem API caches FileSystem handles and always returns the same handle when called from multiple threads, with no reference counting of the handle. This caused problems for Scribe, because Scribe is highly multi-threaded. A new API, FileSystem.newInstance(), was introduced to support Scribe.
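
To illustrate the difference, here is a rough Java sketch (not the actual scribe-hdfs code; the class and variable names are made up) of a writer thread that asks for its own private FileSystem handle via FileSystem.newInstance():

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PerThreadHdfsWriter implements Runnable {
      private final URI hdfsUri;   // e.g. hdfs://namenode1:9000/
      private final Path logFile;  // the file this thread writes

      public PerThreadHdfsWriter(URI hdfsUri, Path logFile) {
        this.hdfsUri = hdfsUri;
        this.logFile = logFile;
      }

      public void run() {
        try {
          Configuration conf = new Configuration();
          // FileSystem.get() would hand every thread the same cached instance;
          // newInstance() returns a private handle that this thread can close
          // without affecting the handles used by other threads.
          FileSystem fs = FileSystem.newInstance(hdfsUri, conf);
          try {
            FSDataOutputStream out = fs.create(logFile);
            out.writeBytes("one log line\n");
            out.close();
          } finally {
            fs.close();  // closes only this thread's instance
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }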

Making the HDFS write code path more real-time was painful. There are various timeouts/settings in HDFS that were hardcoded and needed to be changed to allow the application to fail fast. At the bottom of this blog post, I am attaching the settings that we currently use to make HDFS writes very real-timeish. The last of the JIRAs, HADOOP-2757, is in the pipeline to be committed to Hadoop trunk very soon.

What about the Namenode being a single point of failure? This is acceptable in a warehouse type of application but cannot be tolerated by a realtime application. Scribe typically aggregates click logs from a bunch of webservers, and losing *all* click-log data of a website for 10 minutes or so (the minimum time for a namenode restart) cannot be tolerated. The solution is to configure two overlapping clusters on the same hardware: run two separate namenodes, N1 and N2, on two different machines; run one set of datanode processes on all slave machines that report to N1, and another set of datanode processes on the same slave machines that report to N2. The two datanode instances on a single slave machine share the same data directories. This configuration allows HDFS to be highly available for writes!
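
For illustration only, here is a rough sketch of a hadoop-site.xml for the second datanode instance on a slave machine (hostnames, ports and paths are placeholders, and the property names should be checked against your Hadoop version):

    <configuration>
      <!-- this instance reports to the second namenode, N2 -->
      <property>
        <name>fs.default.name</name>
        <value>hdfs://namenode2.example.com:9000</value>
      </property>
      <!-- same data directories as the datanode instance that reports to N1 -->
      <property>
        <name>dfs.data.dir</name>
        <value>/mnt/d0/dfs/data,/mnt/d1/dfs/data</value>
      </property>
      <!-- move the ports off the defaults so the two datanode processes can coexist -->
      <property>
        <name>dfs.datanode.address</name>
        <value>0.0.0.0:50110</value>
      </property>
      <property>
        <name>dfs.datanode.http.address</name>
        <value>0.0.0.0:50175</value>
      </property>
      <property>
        <name>dfs.datanode.ipc.address</name>
        <value>0.0.0.0:50120</value>
      </property>
    </configuration>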

The highly-available-for-writes HDFS configuration is also useful for software upgrades on the cluster. We can shut down one of the overlapping HDFS clusters, upgrade it to new Hadoop software, and then put it back online before starting the same process for the second HDFS cluster.

What were the main changes needed in Scribe? Scribe already had the ability to buffer data when it is unable to write to the configured storage, and its default behaviour is to replay this buffer back to the storage when the storage comes back online. Scribe can now be configured not to replay the buffer once the primary storage is back online. Scribe-hdfs is configured to write data to cluster N1, and if N1 fails it writes data to cluster N2; Scribe treats N1 and N2 as two equivalent primary stores. The scribe configuration should have fs_type=hdfs. To compile scribe with HDFS support, use ./configure --enable-hdfs LDFLAGS="-ljvm -lhdfs". A good example of a scribe-hdfs configuration is the file hdfs_example2.conf in the scribe code base; a trimmed-down sketch is shown below.
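
This sketch is illustrative only (paths and sizes are placeholders, and the exact store layout used in production may differ); it shows a buffer store whose primary and secondary file stores both point at HDFS clusters. Verify the option names against hdfs_example2.conf:

    <store>
    category=default
    type=buffer
    # do not replay buffered data once the primary store comes back
    replay_buffer=no

    <primary>
    type=file
    fs_type=hdfs
    file_path=hdfs://namenode1:9000/scribedata
    base_filename=clicklog
    max_size=1000000000
    add_newlines=1
    </primary>

    <secondary>
    type=file
    fs_type=hdfs
    file_path=hdfs://namenode2:9000/scribedata
    base_filename=clicklog
    max_size=1000000000
    add_newlines=1
    </secondary>
    </store>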

Here are the settings for the Hadoop 0.17 configuration that are needed by an application doing writes in realtime:

ipc.client.idlethreshold = 10000
    The threshold number of connections after which connections will be inspected for idleness.

ipc.client.connection.maxidletime = 10000
    The maximum time in msec after which a client will bring down the connection to the server.

ipc.client.connect.max.retries = 2
    The number of retries a client will make to establish a server connection.

ipc.server.listen.queue.size = 128
    The length of the listen queue for servers accepting client connections.

ipc.server.tcpnodelay = true
    Turn on/off Nagle's algorithm for the TCP socket connection on the server. Setting it to true disables the algorithm and may decrease latency at the cost of more/smaller packets.

ipc.client.tcpnodelay = true
    Turn on/off Nagle's algorithm for the TCP socket connection on the client. Setting it to true disables the algorithm and may decrease latency at the cost of more/smaller packets.

ipc.ping.interval = 5000
    The client sends a ping message to the server at this interval. This helps detect socket connections that were idle and have been terminated by a failed server.

ipc.client.connect.maxwaittime = 5000
    The client waits this long for a socket connect call to the server to be established.

dfs.datanode.socket.write.timeout = 20000
    The DFS client waits this long for a socket write call to the datanode.

ipc.client.ping = false
    See HADOOP-2757.
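
For reference, these knobs go into hadoop-site.xml (or the corresponding *-site.xml files in later Hadoop versions). A fragment showing the first two entries; the remaining properties follow the same pattern:

    <configuration>
      <property>
        <name>ipc.client.idlethreshold</name>
        <value>10000</value>
      </property>
      <property>
        <name>ipc.client.connection.maxidletime</name>
        <value>10000</value>
      </property>
      <!-- ...and so on for the rest of the ipc.* and dfs.* properties listed above -->
    </configuration>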



29 comments:

  1. Very neat stuff, Dhruba. The overlapping clusters idea sounds very interesting. Is it going to be a supported way to have high-availability HDFS in the future? It seems simple enough, but I can imagine that changes in HDFS could break it. Also, how is metadata synced between the clusters?

  2. Hi Dhruba, this post is great. There's only one thing I'm wondering about: agiardullo wrote on his page that Hadoop version 0.21 is required, but you mention version 0.17 of Hadoop in this post.
    I tried to compile the latest version of Scribe against both version 0.21 and version 0.18 (the stable one) of Hadoop, but both failed.
    And the Scribe code itself still has some major bugs.

  3. @Matei: The overlapping HDFS cluster idea works only when one needs HA for new data writes. In our case, it is ok for scribe data that is already present in an HDFS cluster to be unavailable for a while, but it is imperative that new scribe data not be lost!

  4. @ledzepvn: The Hadoop-Scribe integration will work only for Hadoop trunk (i.e. 0.21 and above). Please let me know what kind of failure you got while trying to compile scribe with hadoop 0.21.

  5. @Dhruba: I saw your comment on my blog. You're right that the original feature of Scribe is more suitable if you need the data immediately.
    Currently I'm still worried about the stability of the hdfs append function.

  6. @ledzepvn: Thanks for your comments. I agree with you that hdfs append is not yet stable in hadoop trunk. We are using hadoop 0.19 + a set of patches (HADOOP-2757, HADOOP-6099, HDFS-200, among others).

  7. Hi Dhruba

    Would you be able to double check if any other patches besides (HADOOP-2757, HADOOP-6099, HDFS-200) would be required to get Scribe-HDFS working?

  8. Hi Chris, which version of hadoop are you using?

  9. Hadoop 0.20. I'd prefer not to use 0.21 (trunk) yet. I am working on backporting those patches.

  10. Hi Chris, I am using hadoop 0.19 and the following patch file

    http://www.borthakur.com/ftp/scribehdfs.txt

    All these patches have made it to hadoop trunk although I am not sure which ones are in 0.20 and which ones are not. Hope this helps.

  11. Dhruba

    Thank you very much. I am going to go through this.

  12. Dhruba:

    Very close! I managed to take your patch file and get it all applied to 0.20. I also applied HADOOP-2757, HADOOP-6099, HDFS-200. One thing I noticed, though: when Scribe is writing in append mode and I try to hadoop fs -cat the file, I do not see the new data. If I close Scribe, the reader sees it right away. I think this is related to HDFS-200. My question to you is, can you share what you patched for HDFS-200? Or tell me which files you used from the ticket. I just applied fsyncConcurrentReaders12_20.txt. I feel there might be more work needed for that.

  13. I make scribe roll logs very frequently. At our site, each scribe-hdfs server is configured to roll after 1 GB, and this 1 GB is filled up in around 2 minutes or so. So not being able to read data from a scribe file while it is being appended to has not been a problem for us.

    In your case, you can make scribe roll a file frequently or you can make the block size of the file smaller. Let me know if that works.

  14. Then everything is working :)

    Thanks again for your work! If anyone comes across this blog, here are the patch sets I put together to make it all work for 0.20:

    http://github.com/lenn0x/Hadoop-Append/tree/master

  15. Thanks a lot Chris for testing it out.

  16. Hi Dhruba,
    You mentioned that you are running 2 overlapping Hadoop clusters. I can understand that if C1 is dead then you can still write to C2 because of NN2, but:
    - If C1 is up, then DataNode1 will issue a write to NN1, but then DataNode2 will find a block with no information about it on NN2. Is this assumption correct?
    --> If yes, how do you solve this?
    --> How do you recover?
    --> Is this a standard Hadoop HA configuration for the write-HA pattern?

    Regarding Scribe,
    - Are you using any structured format from your application so you can directly load your data into a Hive table using a column separator?
    - Is your log fully structured?
    --> If yes, do you have more information on this?
    --> If not, how do you move from unstructured to structured records? Only using Hive UDFs?
    - Are you keeping the raw log information around?

    And finally, is your "hadoop archive node" available from the Hadoop SVN?

    Sorry for this long post but it's nice to have THE right person to ask ;-)
    thanks in advance,
    Louisia

  17. @Louisia: thanks for your questions.

    Scribe uses HDFS files to store data. Scribe continues to write to the primary HDFS cluster C1. If the primary cluster fails (i.e. N1 is inaccessible) then scribe starts writing data to C2. That means that scribe data is stored across two clusters. The two-overlapping-clusters approach is good for software upgrades too: if you want to upgrade the hadoop version of one cluster, you bring that cluster down (scribe automatically starts writing to the other cluster), do the software upgrade, and bring it back online.

    The HA solution actually has two parts. The scribe part: this is handled by configuring the two HDFS file paths as the primary location and the alternate location for writing data. If the primary location is unavailable, scribe automatically starts writing data to the alternate location.

    The second part of the HA solution is loading data into Hive from HDFS. It is ok for this part to not be very realtime, especially if one of C1 or C2 is inaccessible. A distcp-like process copies data from C1 and C2 into the production Hive warehouse (once every 5 minutes).

    Then we run another process called the "loader" that massages the raw data (if needed) and inserts the appropriate metadata into the Hive metastore. The raw logs are stored for 7 days and then deleted.

    The Hadoop archive node is actively being used in-house. It will be exposed as the HDFSWatcher via http://issues.apache.org/jira/browse/HDFS-220

  18. Me again, with 2 more questions:

    What's the best way to integrate scribe and Hive?
    - What's the best file storage for Hive?
    - Which store should I use for scribe?
    - How do I load/query scribe data from Hive?
    - Does your loader transform scribe files into standard sequence files? If yes, how?

  19. Scribe is used to log messages that are application specific; their contents are transparent to the scribe framework. You can store scribe data in HDFS storage if you so desire. You can then use a piece of software called the Loader (which invokes the Hive "load table" command) to make a table point to the newly created HDFS file. The Loader typically does some minimal transformation. The Loader might be part of the Hive distribution in the future.
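
    For illustration only (the table, partition and path names here are made up), the load step boils down to a Hive statement along these lines:

        LOAD DATA INPATH '/scribedata/click_log/2009-06-06'
        INTO TABLE click_log PARTITION (ds='2009-06-06');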

  20. I am new to HDFS. How do I set up HDFS across different machines?
    Or do we need only a master machine that has HDFS, and the others do not? Can anybody clarify this?

    Thanks

  21. Hi Piyush, This is the best place to start: http://wiki.apache.org/hadoop/GettingStartedWithHadoop

  22. With scribe, can I invoke a process/program which "modifies or formats" my data before I store it in the Hadoop DFS?

  23. scribe (by itself) has no such ability to invoke a process/program before storing the data in hdfs.

  24. Thanks Dhruba!

    I have another question: I have a bunch of tiny pieces of data coming in from our front end and I want to store them in hdfs via scribe. Does that mean each tiny piece of data will be a separate file in hdfs, or is there a smarter way to aggregate or append the data together in scribe so I can store it as one file in hdfs?

  25. Can I use the buffer store to buffer all the tiny pieces of data and then flush them to hdfs as one file? Is this possible? I'm wondering whether I could use the buffer store as a way to aggregate my data.

    Thanks again!

  26. Hey,

    I read the ticket HDFS-220, but it seems the code for DFSCron never made it into a releasable form?

    I guess you need to run a cronjob to move all the data from C2's namenode metadata into C1's namenode metadata? Otherwise C1's namenode would never know about the files in C2, AFAICT.

    Is that correct?

    Thanks!

  27. @lazyboy: you would probably have to change some scribe code to integrate HDFS directly into the scribe BufferStore.

    @Anonymous: HDFS-220 has not (yet :-)) been committed into any released version of Apache Hadoop. Yes, you can run a cron job to move the data from one cluster to another.

  28. @Dhruba,

    Thanks, you have put some very relevant information here. We run scribe in PROD, and of late we are seeing an issue with higher timeouts, which have a ripple effect: a big timeout of > 1 sec results in the buffer getting full, which in turn sends TRY_LATER to upstream agents.

    To counter this we did some code reading, and after figuring out that a new FileSystem object is created for each new file [thereby avoiding excluded nodes, added due to aggressive timeouts, being retried again], we brought down
    dfs.datanode.socket.write.timeout = 5 sec
    dfs.socket.timeout = 5 sec

    Since the timeout equations in the code are
    read timeout = dfs.socket.timeout + 3 * (number of nodes in the pipeline)
    write timeout = dfs.datanode.socket.write.timeout + 5 * (number of nodes in the pipeline)

    the write timeouts now seem to have gone away, however read timeouts at the DFSClient have become a little aggressive.
    On second thought I am wondering if the read timeout should always be greater than the write timeout. You haven't mentioned anything related to this in the above configuration. Any thoughts/pointers on this would be very useful.



    - Inder

  29. Hi Everybody,

    Could you please help me:

    I want to move data from a local directory (or any external source) to hdfs every hour.

    So far I've tried using an Oozie shell action but I am still getting some errors; can anyone show me an easy way to do that?
