本文共 1571 字,大约阅读时间需要 5 分钟。
首先,因为 Hive 的存储是基于 HDFS 的,所以目标等同于:用 Flume 监控文件并将新增内容上传到 HDFS 上
Hive建表
-- Target table for the Flume-ingested log lines.
-- Column order must match the comma-separated records appended to
-- hive.log (e.g. "lisi,nan" -> name,gender).
-- IF NOT EXISTS makes the statement safe to re-run.
CREATE TABLE IF NOT EXISTS test (
    name   STRING,
    gender STRING
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
Flume配置文件如下
监控文件 /usr/local/hive.log
上传路径 hdfs://hadoop:9000/user/hive/warehouse/driver.db/test
# Flume agent "a3": tail -F /usr/local/hive.log and deliver each line as an
# event into the HDFS directory backing the Hive table driver.db/test.
# (Original post lost all line breaks; config restored to valid properties
# format — one key per line.)

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = exec
a3.sources.r3.command = tail -F /usr/local/hive.log

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop:9000/user/hive/warehouse/driver.db/test
# Prefix for files created on HDFS
a3.sinks.k3.hdfs.filePrefix = upload-
# Roll directories based on time
# NOTE(review): round/roundValue/roundUnit only take effect when hdfs.path
# contains time escape sequences (e.g. %Y%m%d); with a fixed path like the
# one above they are inert — confirm intended.
a3.sinks.k3.hdfs.round = true
# Number of time units per new directory
a3.sinks.k3.hdfs.roundValue = 1
# Time unit used for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Use the local time instead of a timestamp header on the event
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# Number of events buffered before one flush to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# DataStream writes plain text (other fileTypes support compression)
a3.sinks.k3.hdfs.fileType = DataStream
# Roll to a new file every 30 seconds
a3.sinks.k3.hdfs.rollInterval = 30
# Roll when the file reaches ~128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# 0 = never roll based on event count
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
启动Flume 监控
# Launch agent a3 with console logging for easy debugging.
# $FLUME_HOME expansions are quoted so paths containing spaces still work.
flume-ng agent \
    --name a3 \
    --conf "$FLUME_HOME/conf" \
    --conf-file "$FLUME_HOME/conf/flume-dir-hdfs.conf" \
    -Dflume.root.logger=INFO,console
向文件中写入数据
[root@hadoop local]# echo lisi,nan >> hive.log
[root@hadoop local]# echo wangwu,nan >> hive.log
[root@hadoop local]# echo xiaoming,nan >> hive.log
[root@hadoop local]# echo xiaohong,nv >> hive.log
转载地址:http://kaazi.baihongyu.com/