Data Collection
Data collection pipeline: log files => Flume => HDFS => ODS
Flume 1.8+ is chosen as the tool for collecting log data:
- It provides the very convenient Taildir Source
- With this source, multiple directories can be monitored, and data newly written to them is collected in real time
Characteristics of the Taildir Source:
- Matches the file names in a directory using regular expressions
- As soon as data is written to a monitored file, Flume forwards it to the configured sink
- Highly reliable; no data is lost (see the position-file sample after this list)
- Leaves the tracked files untouched: it neither renames nor deletes them
- Not supported on Windows and cannot read binary files; it reads text files line by line
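The reliability comes from a position file: the Taildir Source periodically records how far it has read in every tracked file and resumes from there after a restart. The file is plain JSON; its content looks roughly like the sketch below (the inode/position values and file names are illustrative):

```json
[
  {"inode": 2496989, "pos": 12398, "file": "/data/lagoudw/logs/middle/start/start0721.log"},
  {"inode": 2496990, "pos": 6531, "file": "/data/lagoudw/logs/middle/event/event0721.log"}
]
```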
Collecting startup logs and event logs
This system collects two kinds of logs, startup logs and event logs, which are placed in different directories.
Picking up all of the logs at once therefore requires monitoring multiple directories.
Overall approach
1. Use the Taildir Source to monitor multiple directories
2. Extend the custom interceptor so that data from different sources carries a different tag
3. Have the HDFS sink write files according to that tag
Agent configuration: /data/lagoudw/conf/flume-log2hdfs3.conf
```properties
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# taildir source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /data/lagoudw/conf/startlog_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/middle/start/.*log
a1.sources.r1.headers.f1.logtype = start
a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/middle/event/.*log
a1.sources.r1.headers.f2.logtype = event

# custom interceptor: adds a logtime header derived from the log's timestamp
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder

# memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 2000

# hdfs sink; %{xxx} reads the value of the corresponding header field
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}
a1.sinks.k1.hdfs.filePrefix = %{logtype}log
# DataStream will not compress the output file; do not set codeC
a1.sinks.k1.hdfs.fileType = DataStream

# bind the source and the sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```
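With this configuration, the logtype and logtime headers attached by the interceptor determine where each event lands. For instance, a start-log record stamped 2020-07-21 (like the sample used in the interceptor tests below) should end up under a path of the following form; the file-name suffix is generated by the HDFS sink:

```
/user/data/logs/start/dt=2020-07-21/startlog.<generated suffix>
/user/data/logs/event/dt=2020-07-21/eventlog.<generated suffix>
```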
Custom interceptor
- Dependencies
```xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.1.23</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
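Note that junit and flume-ng-core are marked provided because they are only needed at build time; the Flume runtime already ships flume-ng-core. Only fastjson therefore gets bundled into the jar-with-dependencies artifact.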
- Interceptor code
```java
package com.lagou.dw.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Charsets;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.interceptor.Interceptor;
import org.junit.Test;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogTypeInterceptor implements Interceptor {
    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the event headers
        Map<String, String> headers = event.getHeaders();
        // 2. Get the event body
        String eventBody = new String(event.getBody(), Charsets.UTF_8);
        // 3. Parse the body to extract the JSON string
        // (split on whitespace, regardless of how many spaces there are)
        String[] bodyArr = eventBody.split("\\s+");
        try {
            String jsonStr = bodyArr[6];
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            // 4. Extract the timestamp from the JSON
            String timeStampStr = "";
            if (headers.getOrDefault("logtype", "start").equalsIgnoreCase("start")) {
                timeStampStr = jsonObject.getJSONObject("app_active").getString("time");
            } else {
                JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");
                JSONObject parseObject = JSON.parseObject(jsonArray.get(0).toString());
                timeStampStr = parseObject.getString("time");
            }
            // 5. Convert the timestamp to a "yyyy-MM-dd" string
            long timeStamp = Long.parseLong(timeStampStr);
            DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
            Instant instant = Instant.ofEpochMilli(timeStamp);
            LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
            String date = dateTimeFormatter.format(localDateTime);
            // 6. Put the formatted date into the headers
            headers.put("logtime", date);
            event.setHeaders(headers);
        } catch (Throwable e) {
            // e.printStackTrace();
            headers.put("logtime", "Unknown");
            event.setHeaders(headers);
        }
        // 7. Return the event
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> eventList = new ArrayList<>();
        events.forEach(event -> {
            Event intercepted = intercept(event);
            if (intercepted != null) {
                eventList.add(intercepted);
            }
        });
        return eventList;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new LogTypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }

    @Test
    public void testStartJunit() {
        // test how it works
        String str = "2020-08-20 11:56:08.703 [main] INFO com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"2\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1595329188129},\"attr\":{\"area\":\"泉州\",\"uid\":\"2F10092A9990\",\"app_v\":\"1.1.17\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1009990\",\"os_type\":\"0.18\",\"channel\":\"VS\",\"language\":\"chinese\",\"brand\":\"xiaomi-5\"}}";
        // new event
        Map<String, String> map = new HashMap<>();
        map.put("logtype", "start");
        SimpleEvent simpleEvent = new SimpleEvent();
        simpleEvent.setHeaders(map);
        simpleEvent.setBody(str.getBytes(Charsets.UTF_8));
        // interceptor
        LogTypeInterceptor customInterceptor = new LogTypeInterceptor();
        Event intercepted = customInterceptor.intercept(simpleEvent);
        // result
        System.out.println(intercepted.getHeaders());
    }

    @Test
    public void testEventJunit() {
        // test how it works
        String str = "2020-08-20 12:00:58.786 [main] INFO com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"71\",\"action\":\"4\",\"staytime\":\"45\",\"showtype\":\"5\"},\"time\":1595313626535}],\"attr\":{\"area\":\"盘锦\",\"uid\":\"2F10092A2996\",\"app_v\":\"1.1.18\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002996\",\"os_type\":\"8.6\",\"channel\":\"IP\",\"language\":\"chinese\",\"brand\":\"xiaomi-8\"}}\n";
        // new event
        Map<String, String> map = new HashMap<>();
        map.put("logtype", "event");
        SimpleEvent simpleEvent = new SimpleEvent();
        simpleEvent.setHeaders(map);
        simpleEvent.setBody(str.getBytes(Charsets.UTF_8));
        // interceptor
        LogTypeInterceptor customInterceptor = new LogTypeInterceptor();
        Event intercepted = customInterceptor.intercept(simpleEvent);
        // result
        System.out.println(intercepted.getHeaders());
    }
}
```
Preparing the jar: package the project with Maven and upload the jar whose name ends in jar-with-dependencies to $FLUME_HOME/lib.
A symlink works as well:

```shell
ln -s /data/lagoudw/jars/FlumeTimestampInterceptor.jar /opt/lagou/servers/flume-1.9.0/lib/FlumeTimestampInterceptor.jar
```
Run Flume to collect data in real time
```shell
flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf \
  -name a1 -Dflume.root.logger=INFO,console
```
Started this way, without Flume's configuration directory, the agent may throw GC exceptions, because the JVM settings in flume-env.sh are never loaded. Adjust that file:
```shell
vim $FLUME_HOME/conf/flume-env.sh

# Set both the minimum and maximum heap to 1000M to reduce the performance
# impact of memory churn
export JAVA_OPTS="-Xms1000m -Xmx1000m -Dcom.sun.management.jmxremote"
```
Then start the agent with the --conf option pointing at that configuration directory:
```shell
flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ \
  --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 \
  -Dflume.root.logger=INFO,console
```
In production, use the following command instead:
```shell
nohup flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ \
  --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf -name a1 \
  -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &

# /dev/null is Linux's null device file: everything written to it is
# discarded, hence its nickname "the black hole"
# 2>&1 sends stderr to the same place as stdout
# > /dev/null 2>&1 therefore prints nothing to the console and writes
# nothing to any file
```
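To confirm that events are landing in HDFS, list the sink path; the dt= value depends on the timestamps in your logs (2020-07-21 matches the sample records used in the interceptor tests above):

```shell
hdfs dfs -ls /user/data/logs/start/dt=2020-07-21
hdfs dfs -ls /user/data/logs/event/dt=2020-07-21
```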
Processing JSON data
Each line of the data file must be one complete JSON string; a JSON string must not span multiple lines. Broadly speaking, Hive offers three ways to process JSON data:
- the built-in functions get_json_object and json_tuple
- a custom UDF
- a third-party SerDe
Processing with built-in functions
- get_json_object: get_json_object(string json_string, string path)
  Return type: string.
  Parses the JSON string json_string and returns the content addressed by path. If the input JSON string is invalid, NULL is returned. The function returns only one data item per call (illustrated after the example code below).
- json_tuple: json_tuple(jsonStr, k1, k2, ...)
  All input and output parameters are strings.
  Takes a JSON string plus a set of keys k1, k2, ... and returns a tuple of the corresponding values. It is more efficient than get_json_object because multiple keys can be fetched in a single call.
- Example code
```sql
-- Data format
-- user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number": 5}

-- Processing JSON data with built-in functions
CREATE TABLE IF NOT EXISTS jsont1(
  username string,
  age int,
  sex string,
  json string
)
row format delimited fields terminated by ';';

load data local inpath '/data/lagoudw/data/weibo.json' overwrite into table jsont1;

select * from jsont1;

-- get single
select username, age, sex,
       GET_JSON_OBJECT(json, "$.id") id,
       GET_JSON_OBJECT(json, "$.ids") ids,
       GET_JSON_OBJECT(json, "$.total_number") total_number
  from jsont1;

-- get array element
select username, age, sex,
       GET_JSON_OBJECT(json, "$.id") id,
       GET_JSON_OBJECT(json, "$.ids[0]") id1,
       GET_JSON_OBJECT(json, "$.ids[1]") id2,
       GET_JSON_OBJECT(json, "$.ids[2]") id3,
       GET_JSON_OBJECT(json, "$.ids[3]") id4,
       GET_JSON_OBJECT(json, "$.total_number") total_number
  from jsont1;

-- get with json_tuple
select json_tuple(json, 'id', 'ids', 'total_number') from jsont1;

-- UDTFs are not supported outside the SELECT clause, nor nested in expressions
select username, age, sex, json_tuple(json, 'id', 'ids', 'total_number') from jsont1;

-- lateral view is needed instead
select REGEXP_REPLACE("[101,102,103]", "\\[|\\]", "");
select split(REGEXP_REPLACE("[101,102,103]", "\\[|\\]", ""), ",");
select explode(split(REGEXP_REPLACE("[101,102,103]", "\\[|\\]", ""), ","));

select username, age, sex, id, ids, num
  from jsont1
 lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num;

with tmp as(
  select username, age, sex, id, ids, num
    from jsont1
   lateral view json_tuple(json, 'id', 'ids', 'total_number') t1 as id, ids, num
)
select username, age, sex, id, ids1, num
  from tmp
 lateral view explode(split(REGEXP_REPLACE(ids, "\\[|\\]", ""), ",")) t1 as ids1;

-- json_tuple's advantage is that it parses multiple JSON fields in one call;
-- parsing nested results with it, however, is cumbersome
```
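One detail the walkthrough above does not demonstrate: get_json_object returns NULL when the input JSON string is invalid, which makes it safe to run over dirty rows. A quick check with literal strings:

```sql
select get_json_object('{"id": 1, "ids": [101,102]}', '$.id');     -- 1
select get_json_object('this is not json', '$.id');                -- NULL
```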
Processing with a UDF
A custom UDF handles the array inside the JSON string. The UDF takes as input the JSON string and the array's key,
and outputs an array of strings.
- pom dependencies
```xml
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.1.23</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.7</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
- Java code
```java
package com.lagou.dw.hive.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.parquet.Strings;
import org.junit.Test;

import java.util.ArrayList;

public class ParseJsonArray extends UDF {
    // Hive resolves this method by reflection: JSON string + array key in,
    // list of the array's elements (as strings) out
    public ArrayList<String> evaluate(String jsonStr, String arrKey) {
        if (Strings.isNullOrEmpty(jsonStr)) {
            return null;
        }
        try {
            JSONObject jsonObject = JSON.parseObject(jsonStr);
            JSONArray jsonArray = jsonObject.getJSONArray(arrKey);
            ArrayList<String> results = new ArrayList<>();
            for (Object o : jsonArray) {
                results.add(o.toString());
            }
            return results;
        } catch (Throwable t) {
            // Malformed JSON or a missing key returns NULL instead of failing the query
            return null;
        }
    }

    @Test
    public void testParse() {
        String jsonStr = "{\"id\": 2,\"ids\": [201,202,203,204],\"total_number\": 4}";
        String key = "ids";
        ArrayList<String> arrayList = evaluate(jsonStr, key);
        System.out.println(JSON.toJSONString(arrayList));
    }
}
```
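Run locally, testParse should print ["201","202","203","204"]: the ids array of the sample JSON, rendered back as a JSON string.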
- Using the custom UDF
```sql
-- Use the custom UDF
-- Add the jar built above
add jar /data/lagoudw/jars/HiveUDF.jar;

-- Register a temporary function
create temporary function lagou_json_array as "com.lagou.dw.hive.udf.ParseJsonArray";

-- Run a query
select username, age, sex, lagou_json_array(json, 'ids') ids
  from jsont1;

-- Parse the array in the JSON string and explode it
select username, age, sex, ids1
  from jsont1
 lateral view explode(lagou_json_array(json, 'ids')) t1 as ids1;

-- Parse id and num from the JSON string
select username, age, sex, id, num
  from jsont1
 lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;

-- Parse the array and explode it alongside id and num
-- (each lateral view gets its own alias)
select username, age, sex, id, num, ids1
  from jsont1
 lateral view json_tuple(json, 'id', 'total_number') t1 as id, num
 lateral view explode(lagou_json_array(json, 'ids')) t2 as ids1;
```
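For the sample row shown earlier (user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number": 5}), the last query should expand the array into one row per element, roughly:

```
user4  17  male  4  5  401
user4  17  male  4  5  402
user4  17  male  4  5  403
user4  17  male  4  5  304
```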
Processing with a SerDe
SerDe is short for Serializer and Deserializer. Hive uses a SerDe to serialize and deserialize row objects, which is ultimately how file contents are mapped onto the field data types of a Hive table. A SerDe covers two functions, Serialize and Deserialize:
- Serialize converts the Java objects Hive works with into a byte sequence that can be written to HDFS, or into a stream file other systems can recognize
- Deserialize converts a string or binary stream into a Java object Hive can work with
- Read: HDFS files => InputFileFormat => <key, value> => Deserializer => Row object
- Write: Row object => Serializer => <key, value> => OutputFileFormat => HDFS files
https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe
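To make those two chains concrete, here is a minimal, hypothetical SerDe sketch (the class name, package, and semicolon delimiter are illustrative, not part of this project): deserialize turns one line of text into a row object, a list of strings, and serialize turns a row object back into delimited text. It assumes each line has exactly the declared number of fields.

```java
// Hypothetical example for illustration only
package com.lagou.dw.hive.serde;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class SemicolonSerDe extends AbstractSerDe {
    private StructObjectInspector inspector;

    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        // Column names come from the table DDL via the table properties
        List<String> columnNames = Arrays.asList(
                tbl.getProperty(serdeConstants.LIST_COLUMNS).split(","));
        // Treat every column as a string for simplicity
        List<ObjectInspector> ois = new ArrayList<>();
        for (int i = 0; i < columnNames.size(); i++) {
            ois.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, ois);
    }

    // Read path: HDFS file -> <key, value> -> deserialize -> row object
    @Override
    public Object deserialize(Writable blob) {
        return Arrays.asList(blob.toString().split(";", -1));
    }

    // Write path: row object -> serialize -> <key, value> -> HDFS file
    @Override
    public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException {
        StructObjectInspector soi = (StructObjectInspector) oi;
        List<Object> fields = soi.getStructFieldsDataAsList(obj);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(';');
            }
            sb.append(fields.get(i) == null ? "" : fields.get(i).toString());
        }
        return new Text(sb.toString());
    }

    @Override
    public ObjectInspector getObjectInspector() {
        return inspector;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public SerDeStats getSerDeStats() {
        return null;
    }
}
```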
JsonSerDe: the data file must contain one JSON object per line:
{"id": 1,"ids": [101,102,103],"total_number": 3} {"id": 2,"ids": [201,202,203,204],"total_number": 4} {"id": 3,"ids": [301,302,303,304,305],"total_number": 5} {"id": 4,"ids": [401,402,403,304],"total_number": 5} {"id": 5,"ids": [501,502,503],"total_number": 3}
Specify the serialization class when creating the table:
```sql
create table jsont2(
  id int,
  ids array<string>,
  total_number int
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';

load data local inpath '/data/lagoudw/data/json2.dat' into table jsont2;
```
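One practical note: org.apache.hive.hcatalog.data.JsonSerDe ships in the hive-hcatalog-core jar, which is not on Hive's classpath in every installation. If the DDL above fails with a ClassNotFoundException, add the jar first; the path below assumes the Apache Hive 2.3.7 layout and should be adjusted to your install. Then verify the mapping:

```sql
-- Only needed if the SerDe class cannot be found
add jar /opt/lagou/servers/hive-2.3.7/hcatalog/share/hcatalog/hive-hcatalog-core-2.3.7.jar;

-- Check that the JSON fields were mapped onto the table columns
select * from jsont2;
select id, ids[0], total_number from jsont2;
```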