Data Collection and Cleansing

Data Collection

Data collection: log files => Flume => HDFS => ODS

Flume 1.8+ is chosen as the tool for collecting log data:

  • It provides the very handy Taildir Source
  • With this source you can monitor multiple directories and collect newly written data from them in real time

Characteristics of the Taildir Source:

  • Uses regular expressions to match file names in a directory
  • As soon as data is written to a monitored file, Flume delivers it to the configured sink
  • Highly reliable: no data is lost
  • Leaves the tracked files alone; it neither renames nor deletes them
  • Does not support Windows and cannot read binary files; it reads text files line by line
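
The reliability comes from a position file: the Taildir source periodically records, for every tracked file, its inode and the offset read so far, and resumes from that offset after a restart. The file configured below as startlog_position.json is a small JSON document roughly of this shape (a sketch; the inode, offset, and file-name values are illustrative):

    [{"inode": 2496989, "pos": 1024, "file": "/data/lagoudw/logs/middle/start/start.log"},
     {"inode": 2496990, "pos": 2048, "file": "/data/lagoudw/logs/middle/event/event.log"}]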

Collecting Startup Logs and Event Logs

This system collects two kinds of logs, startup logs and event logs, which live in different directories.
To pick up all of the logs at once, multiple directories have to be monitored.

Overall approach:
1. Taildir monitors multiple directories
2. Adapt the custom interceptor so that data from different sources carries a different marker
3. The HDFS sink writes files according to that marker

  • Agent configuration: /data/lagoudw/conf/flume-log2hdfs3.conf

    a1.sources = r1  
    a1.sinks = k1  
    a1.channels = c1  
        
    # taildir source  
    a1.sources.r1.type = TAILDIR  
    a1.sources.r1.positionFile =/data/lagoudw/conf/startlog_position.json  
    a1.sources.r1.filegroups = f1 f2  
    a1.sources.r1.filegroups.f1 = /data/lagoudw/logs/middle/start/.*log  
    a1.sources.r1.headers.f1.logtype = start  
    a1.sources.r1.filegroups.f2 = /data/lagoudw/logs/middle/event/.*log  
    a1.sources.r1.headers.f2.logtype = event  
    a1.sources.r1.interceptors = i1  
    a1.sources.r1.interceptors.i1.type = com.lagou.dw.flume.interceptor.LogTypeInterceptor$Builder  
        
    # memorychannel  
    a1.channels.c1.type = memory  
    a1.channels.c1.capacity = 100000  
    a1.channels.c1.transactionCapacity = 2000  
        
    # hdfs sink  
    a1.sinks.k1.type = hdfs  
    # %{xxx} takes the value of the corresponding header field
    a1.sinks.k1.hdfs.path = /user/data/logs/%{logtype}/dt=%{logtime}  
    a1.sinks.k1.hdfs.filePrefix = %{logtype}log  
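    # Optional: control when the sink rolls to a new file (illustrative values; the
    # defaults of 30 s / 1 KB / 10 events tend to produce many small files on HDFS)
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0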
    # DataStream will not compress output file and please don’t set codeC  
    a1.sinks.k1.hdfs.fileType = DataStream

    # bind the source and the sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
  • Custom interceptor

    • Dependencies
      <dependencies>  
          <dependency>  
              <groupId>junit</groupId>  
              <artifactId>junit</artifactId>  
              <version>4.12</version>  
              <scope>provided</scope>  
          </dependency>  
          <dependency>
              <groupId>org.apache.flume</groupId>
              <artifactId>flume-ng-core</artifactId>
              <version>1.9.0</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>1.1.23</version>
          </dependency>
      </dependencies>  
      	    
      	    
      <build>  
          <plugins>  
              <plugin>  
                  <artifactId>maven-compiler-plugin</artifactId>  
                  <version>2.3.2</version>  
                  <configuration>  
                      <source>1.8</source>  
                      <target>1.8</target>  
                  </configuration>  
              </plugin>  
              <plugin>  
                  <artifactId>maven-assembly-plugin</artifactId>  
                  <configuration>  
                      <descriptorRefs>  
                          <descriptorRef>jar-with-dependencies</descriptorRef>  
                      </descriptorRefs>  
                  </configuration>  
                  <executions>  
                      <execution>  
                          <id>make-assembly</id>  
                          <phase>package</phase>  
                          <goals>  
                              <goal>single</goal>  
                          </goals>  
                      </execution>  
                  </executions>  
              </plugin>  
          </plugins>  
      </build>  
      
    • Interceptor code
      package com.lagou.dw.flume.interceptor;

      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONArray;  
      import com.alibaba.fastjson.JSONObject;  
      import com.google.common.base.Charsets;  
      import org.apache.flume.Context;  
      import org.apache.flume.Event;  
      import org.apache.flume.event.SimpleEvent;  
      import org.apache.flume.interceptor.Interceptor;  
      import org.junit.Test;  
      	    
      import java.time.Instant;  
      import java.time.LocalDateTime;  
      import java.time.ZoneId;  
      import java.time.format.DateTimeFormatter;  
      import java.util.ArrayList;  
      import java.util.HashMap;  
      import java.util.List;  
      import java.util.Map;  
      	    
      public class LogTypeInterceptor implements Interceptor {  
          @Override  
          public void initialize() {  
      	    
          }  
      	    
          @Override  
          public Event intercept(Event event) {  
      //        1. Get the event headers
              Map<String, String> headers = event.getHeaders();  
      //        2. Get the event body
              String eventBody = new String(event.getBody(), Charsets.UTF_8);  
      //        3. Parse the body to extract the JSON string
              // Split on whitespace regardless of how many spaces there are;
              // in a line like "2020-08-20 11:56:08.703 [main] INFO  com.lagou.ecommerce.AppStart - {...}"
              // the JSON payload is the 7th token (index 6)
              String[] bodyArr = eventBody.split("\\s+");
              try {  
                  String jsonStr = bodyArr[6];  
                  JSONObject jsonObject = JSON.parseObject(jsonStr);  
      	    
                //        4. Parse the JSON string to get the timestamp
                  String timeStampStr = "";  
                  if (headers.getOrDefault("logtype", "start").equalsIgnoreCase("start")) {  
                      timeStampStr = jsonObject.getJSONObject("app_active").getString("time");  
                  } else {  
                      JSONArray jsonArray = jsonObject.getJSONArray("lagou_event");  
                      JSONObject parseObject = JSON.parseObject(jsonArray.get(0).toString());  
                      timeStampStr = parseObject.getString("time");  
                  }  
      	    
                  //        5. Convert the timestamp to a "yyyy-MM-dd" string
                  long timeStamp = Long.parseLong(timeStampStr);  
                  DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");  
                  Instant instant = Instant.ofEpochMilli(timeStamp);  
                  LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());  
                  String date = dateTimeFormatter.format(localDateTime);  
      	    
                  //        6. Put the converted string into the headers
                  headers.put("logtime", date);  
                  event.setHeaders(headers);  
      	    
              } catch (Throwable e) {  
      //            e.printStackTrace();  
                  headers.put("logtime", "Unknown");  
                  event.setHeaders(headers);  
              }  
      	    
              //        7. Return the event
              return event;  
          }  
      	    
          @Override  
          public List<Event> intercept(List<Event> events) {  
              List<Event> eventList = new ArrayList<>();  
      	    
              events.forEach(event -> {  
                  Event intercepted = intercept(event);  
                  if (intercepted != null) {  
                      eventList.add(intercepted);  
                  }  
              });  
      	    
              return eventList;  
          }  
      	    
          @Override  
          public void close() {  
      	    
          }  
      	    
          public static class Builder implements Interceptor.Builder {  
      	    
              @Override  
              public Interceptor build() {  
      	    
                  return new LogTypeInterceptor();  
              }  
      	    
              @Override  
              public void configure(Context context) {  
      	    
              }  
          }  
      	    
          @Test  
          public void testStartJunit() {  
              // test how it works
              String str = "2020-08-20 11:56:08.703 [main] INFO  com.lagou.ecommerce.AppStart - {\"app_active\":{\"name\":\"app_active\",\"json\":{\"entry\":\"2\",\"action\":\"0\",\"error_code\":\"0\"},\"time\":1595329188129},\"attr\":{\"area\":\"泉州\",\"uid\":\"2F10092A9990\",\"app_v\":\"1.1.17\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1009990\",\"os_type\":\"0.18\",\"channel\":\"VS\",\"language\":\"chinese\",\"brand\":\"xiaomi-5\"}}";  
              // new event  
              Map<String, String> map = new HashMap<>();  
              map.put("logtype", "start");  
              SimpleEvent simpleEvent = new SimpleEvent();  
              simpleEvent.setHeaders(map);  
              simpleEvent.setBody(str.getBytes(Charsets.UTF_8));  
      	    
              // interceptor  
              LogTypeInterceptor customInterceptor = new LogTypeInterceptor();  
              Event intercepted = customInterceptor.intercept(simpleEvent);  
              // result  
              System.out.println(intercepted.getHeaders());  
          }  
      	    
          @Test  
          public void testEventJunit() {  
              // test how it works
              String str = "2020-08-20 12:00:58.786 [main] INFO  com.lagou.ecommerce.AppEvent - {\"lagou_event\":[{\"name\":\"goods_detail_loading\",\"json\":{\"entry\":\"1\",\"goodsid\":\"0\",\"loading_time\":\"71\",\"action\":\"4\",\"staytime\":\"45\",\"showtype\":\"5\"},\"time\":1595313626535}],\"attr\":{\"area\":\"盘锦\",\"uid\":\"2F10092A2996\",\"app_v\":\"1.1.18\",\"event_type\":\"common\",\"device_id\":\"1FB872-9A1002996\",\"os_type\":\"8.6\",\"channel\":\"IP\",\"language\":\"chinese\",\"brand\":\"xiaomi-8\"}}\n";  
              // new event  
              Map<String, String> map = new HashMap<>();  
              map.put("logtype", "event");  
              SimpleEvent simpleEvent = new SimpleEvent();  
              simpleEvent.setHeaders(map);  
              simpleEvent.setBody(str.getBytes(Charsets.UTF_8));  
      	    
              // interceptor  
              LogTypeInterceptor customInterceptor = new LogTypeInterceptor();  
              Event intercepted = customInterceptor.intercept(simpleEvent);  
              // result  
              System.out.println(intercepted.getHeaders());  
          }  
      }  
      	    
      
    • Preparing the jar: build with Maven's package phase and upload the jar whose name ends in jar-with-dependencies to $FLUME_HOME/lib

      A symbolic link also works:

      ln -s /data/lagoudw/jars/FlumeTimestampInterceptor.jar /opt/lagou/servers/flume-1.9.0/lib/FlumeTimestampInterceptor.jar  
      
  • Run Flume to collect data in real time

    flume-ng agent --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf \
    --name a1 -Dflume.root.logger=INFO,console
    

    If the command does not include Flume's own configuration, the agent may throw GC-related exceptions, so Flume's configuration file needs to be modified:

    vim $FLUME_HOME/conf/flume-env.sh   
        
    # Set both the minimum and maximum heap to 1000 MB to reduce the performance impact of memory churn
    export JAVA_OPTS="-Xms1000m -Xmx1000m -Dcom.sun.management.jmxremote"  
    

    Start the agent with the --conf option so Flume picks up its configuration directory:

    flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ \  
    --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf --name a1 \
    -Dflume.root.logger=INFO,console  
    

    In production, start the agent with the following command:

    nohup flume-ng agent --conf /opt/lagou/servers/flume-1.9.0/conf/ \  
    --conf-file /data/lagoudw/conf/flume-log2hdfs3.conf --name a1 \
    -Dflume.root.logger=INFO,LOGFILE > /dev/null 2>&1 &  
        
    # /dev/null is the Linux null device; anything written to it is discarded, commonly called a "black hole"
    # 2>&1 sends stderr to the same place as stdout
    # > /dev/null 2>&1 means nothing is printed to the console and nothing is written to a file
    

Processing JSON Data

Every line of the data file must be one complete JSON string; a JSON string cannot span multiple lines. Broadly, Hive offers three ways to process JSON data:

  • The built-in functions get_json_object and json_tuple
  • A custom UDF
  • A third-party SerDe

Processing with Built-in Functions

  • get_json_object(string json_string, string path)
    Return type: String
    Description: parses the JSON string json_string and returns the content referenced by path; if the input JSON string is invalid, it returns NULL; the function returns only one item per call.

  • json_tuple(jsonStr, k1, k2, ...)
    Return type: all input and output parameters are Strings.
    Description: takes a JSON string plus a set of keys k1, k2, ... and returns a tuple of the corresponding values. It is more efficient than get_json_object because multiple keys can be fetched in a single call.

  • Example

    -- Data format
    -- user4;17;male;{"id": 4,"ids": [401,402,403,304],"total_number": 5}  
        
    -- Process JSON data with built-in functions
    CREATE TABLE IF NOT EXISTS jsont1(  
    username string,  
    age int,  
    sex string,  
    json string  
    )  
    row format delimited fields terminated by ';';  
        
    load data local inpath '/data/lagoudw/data/weibo.json' overwrite into table jsont1;  
        
    select * from jsont1;  
        
    -- get single  
    select username, age, sex, GET_JSON_OBJECT(json, "$.id") id,  
    GET_JSON_OBJECT(json, "$.ids") ids,  
    GET_JSON_OBJECT(json, "$.total_number") total_number  
    from jsont1;  
        
    -- get array element  
    select username, age, sex, GET_JSON_OBJECT(json, "$.id") id,  
    GET_JSON_OBJECT(json, "$.ids[0]") id1,  
    GET_JSON_OBJECT(json, "$.ids[1]") id2,  
    GET_JSON_OBJECT(json, "$.ids[2]") id3,  
    GET_JSON_OBJECT(json, "$.ids[3]") id4,  
    GET_JSON_OBJECT(json, "$.total_number") total_number  
    from jsont1;  
        
    -- get with json_tuple  
    select json_tuple(json, 'id', 'ids', 'total_number') from jsont1;  
        
    --  UDTF's are not supported outside the SELECT clause, nor nested in expressions  
    select username, age, sex, json_tuple(json, 'id', 'ids', 'total_number') from jsont1;  
        
    -- A lateral view is needed to expand the array
    select REGEXP_REPLACE("[101,102,103]", "\\[|\\]","");  
        
    select split(REGEXP_REPLACE("[101,102,103]", "\\[|\\]",""), ",");  
    select explode(split(REGEXP_REPLACE("[101,102,103]", "\\[|\\]",""), ","));  
        
    select username, age, sex, id, ids, num  
    from jsont1  
    lateral view json_tuple(json, 'id', 'ids', 'total_number') t1   
    as id, ids, num;  
        
    with tmp as(  
    select username, age, sex, id, ids, num  
    from jsont1  
    lateral view json_tuple(json, 'id', 'ids', 'total_number') t1   
    as id, ids, num  
    )  
    select username, age, sex, id, ids1, num  
    from tmp   
    lateral view explode(split(REGEXP_REPLACE(ids, "\\[|\\]",""), ",")) t1 as ids1;  
        
    -- json_tuple's advantage is that it parses multiple JSON fields in one call; handling nested results, however, is cumbersome;
    

Processing with a Custom UDF

Use a custom UDF to handle the array inside the JSON string. The UDF takes two inputs, the JSON string and the array's key,
and outputs an array of strings.

  • POM dependencies
    <dependencies>  
        <dependency>  
            <groupId>junit</groupId>  
            <artifactId>junit</artifactId>  
            <version>4.12</version>  
            <scope>provided</scope>  
        </dependency>  
        <dependency>  
            <groupId>com.alibaba</groupId>  
            <artifactId>fastjson</artifactId>  
            <version>1.1.23</version>  
        </dependency>  
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.7</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>  
        
        
    <build>  
        <plugins>  
            <plugin>  
                <artifactId>maven-compiler-plugin</artifactId>  
                <version>2.3.2</version>  
                <configuration>  
                    <source>1.8</source>  
                    <target>1.8</target>  
                </configuration>  
            </plugin>  
            <plugin>  
                <artifactId>maven-assembly-plugin</artifactId>  
                <configuration>  
                    <descriptorRefs>  
                        <descriptorRef>jar-with-dependencies</descriptorRef>  
                    </descriptorRefs>  
                </configuration>  
                <executions>  
                    <execution>  
                        <id>make-assembly</id>  
                        <phase>package</phase>  
                        <goals>  
                            <goal>single</goal>  
                        </goals>  
                    </execution>  
                </executions>  
            </plugin>  
        </plugins>  
    </build>  
    
  • Java code
    package com.lagou.dw.hive.udf;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;  
    import com.alibaba.fastjson.JSONObject;  
    import org.apache.hadoop.hive.ql.exec.UDF;  
    import org.apache.parquet.Strings;  
    import org.junit.Test;  
        
    import java.util.ArrayList;  
        
    public class ParseJsonArray extends UDF {  
        public ArrayList<String> evaluate(String jsonStr, String  
                arrKey){  
            if (Strings.isNullOrEmpty(jsonStr)) {  
                return null;  
            }  
        
            try {  
                JSONObject jsonObject = JSON.parseObject(jsonStr);  
                JSONArray jsonArray = jsonObject.getJSONArray(arrKey);  
                ArrayList<String> results = new ArrayList<>();  
                for (Object o : jsonArray) {  
                    results.add(o.toString());  
                }  
                return results;  
            } catch (Throwable t) {  
                return null;  
            }  
        }  
        
        @Test  
        public void testParse() {  
            String jsonStr = "{\"id\": 2,\"ids\": [201,202,203,204],\"total_number\": 4}";  
            String key = "ids";  
        
            ArrayList<String> arrayList = evaluate(jsonStr, key);  
            System.out.println(JSON.toJSONString(arrayList));  
        }  
    }  
    
  • Using the custom UDF
    -- Use the custom UDF
    -- Add the jar we built
    add jar /data/lagoudw/jars/HiveUDF.jar;  
    -- Register a temporary function
    create temporary function lagou_json_array as "com.lagou.dw.hive.udf.ParseJsonArray";  
        
    -- Run the query
    select username, age, sex, lagou_json_array(json, 'ids') ids from jsont1;  
        
    -- Parse the array in the JSON string and explode it
    select username, age, sex, ids1  
    from jsont1  
    lateral view explode(lagou_json_array(json, 'ids')) t1 as ids1;  
        
    -- Parse id and num from the JSON string
    select username, age, sex, id, num  
    from jsont1  
    lateral view json_tuple(json, 'id', 'total_number') t1 as id, num;  
        
    -- Parse the array and explode it, together with id and num
    select username, age, sex, id, num, ids1  
    from jsont1  
    lateral view json_tuple(json, 'id', 'total_number') t1 as id, num  
    lateral view explode(lagou_json_array(json, 'ids')) t2 as ids1;
    

Processing with a SerDe

SerDe is short for Serializer and Deserializer. Hive uses a SerDe to serialize and deserialize row objects, which is ultimately how file contents are mapped onto the column data types of a Hive table. A SerDe covers two functions, Serialize and Deserialize:

  • Serialize converts the Java objects Hive works with into a byte sequence that can be written to HDFS, or into a stream file that other systems can read
  • Deserialize converts a string or binary stream into Java objects that Hive can work with

  • Read : HDFS files => InputFileFormat => <key, value> => Deserializer => Row object
  • Write : Row object => Serializer => <key, value> => OutputFileFormat => HDFS files

https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide#DeveloperGuide-HiveSerDe
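
To make the Read / Write chains above concrete, here is what a hand-written SerDe roughly looks like. This is only a minimal sketch against the Hive 2.x serde2 API (the class RawLineSerDe and its single string column raw are made up for illustration); it maps every input line to one string column and is not the JsonSerDe used below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.AbstractSerDe;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Properties;

    // Minimal SerDe sketch: exposes each input line as a single string column named "raw"
    public class RawLineSerDe extends AbstractSerDe {
        private ObjectInspector inspector;

        @Override
        public void initialize(Configuration conf, Properties tbl) throws SerDeException {
            // Tell Hive what a row looks like: a struct with one string field
            inspector = ObjectInspectorFactory.getStandardStructObjectInspector(
                    Collections.singletonList("raw"),
                    Collections.<ObjectInspector>singletonList(
                            PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        }

        @Override
        public ObjectInspector getObjectInspector() {
            return inspector;
        }

        // Read path: HDFS files => InputFileFormat => <key, value> => deserialize => Row object
        @Override
        public Object deserialize(Writable blob) throws SerDeException {
            return Arrays.asList(blob.toString());
        }

        // Write path: Row object => serialize => <key, value> => OutputFileFormat => HDFS files
        @Override
        public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
            return new Text(String.valueOf(obj));
        }

        @Override
        public Class<? extends Writable> getSerializedClass() {
            return Text.class;
        }

        @Override
        public SerDeStats getSerDeStats() {
            return null;
        }
    }

In practice you rarely write this by hand; the example below simply reuses the ready-made JsonSerDe.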

  • JsonSerDe: the data file must contain exactly one JSON object per line

    {"id": 1,"ids": [101,102,103],"total_number": 3}  
    {"id": 2,"ids": [201,202,203,204],"total_number": 4}  
    {"id": 3,"ids": [301,302,303,304,305],"total_number": 5}  
    {"id": 4,"ids": [401,402,403,304],"total_number": 5}  
    {"id": 5,"ids": [501,502,503],"total_number": 3}  
    

    Specify the SerDe class when creating the table (org.apache.hive.hcatalog.data.JsonSerDe ships in hive-hcatalog-core, which may need to be added to Hive's classpath first):

    create table jsont2(  
      id int,  
      ids array<string>,
      total_number int)  
    ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';  
    load data local inpath '/data/lagoudw/data/json2.dat' into table jsont2;