2016年10月17日月曜日

Logstash+Elasticsearch+Kibinaで自宅への不正アクセス状況を可視化してみる

Elasticが流行り始めて久しいですが、自分も使ってみたい。
と言っても、よくあるようなsyslogの監視とかTwitterの解析とかつまらないので
あまり他にやっていないものを読み込ませようと思いました。

と言うことで、勉強にもなるかと思い、サイバー攻撃関連のデータを可視化してみようと思います。
自宅にFWとかProxyとかあれば、それぞれのログを突っ込んでみたいけど、そんな大層なことはやっていないので、外と繋がっている普通の無線LANルータにあるアクセスログを取り込もうと思います。
ちなみに、無線LANルータのログの取り出し方は別にまとめます。一気に書くと内容が多くなりすぎちゃうので。。


事前準備

Elasticで公開されている手順どおりにインストールを進めていく。

まずは必須なパッケージである、JavaとApacheをインストール

# yum install java

# yum install httpd

ちなみに、Web画面でApacheを使うので、 Firewallのポート空けるのを忘れずに。

Logstash,Elasticsearch,Kibanaのインストール

yumでインストールするため、リポジトリを追加する。
以下2ファイルを作成。

# cat /etc/yum.repos.d/Elastic.repo 
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=https://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

# cat /etc/yum.repos.d/Logstash.repo 
[logstash-2.3]
name=Logstash repository for 2.3.x packages
baseurl=https://packages.elastic.co/logstash/2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

まずは、ElasticsearchとLogstashパッケージをインストールする。
# yum install elasticsearch
# yum install logstash


Kibanaはインストール不要のため、yumは使わず、Webページから圧縮ファイルをダウンロードしてくる。
# curl -O https://download.elastic.co/kibana/kibana/kibana-4.5.4-linux-x64.tar.gz
% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
Dload  Upload   Total   Spent    Left  Speed
100 31.7M  100 31.7M    0     0  1013k      0  0:00:32  0:00:32 --:--:-- 2611k

解凍し、公開Webのフォルダに格納する。
# tar -zvxf kibana-4.5.4-linux-x64.tar.gz
# mv kibana-4.5.4-linux-x64 /var/www/html/
# mv /var/www/html/kibana-4.5.4-linux-x64 /var/www/html/kibana

セットアップ

まずは、Elasticsearchの設定をする。設定ファイルは以下。
# vi /etc/elasticsearch/elasticsearch.yml 

以下の2箇所を修正、追記。
普通は0.0.0.0で公開されるので設定不要なのだが、VMなど接続するIPアドレスが変わる場合は、きちんと接続可能なIPアドレスを明記する必要あり。
# network.host: 192.168.0.1network.host: ***.***.***.***

# enable cross-origin resource sharing
http.cors.enabled: true

次にKibanaの設定。設定ファイルは以下。
# vi /var/www/html/kibana/config/kibana.yml 

こちらもIPアドレスを記載。
server.host: "192.168.56.101"
elasticsearch.url: "http://192.168.56.101:9200"

最後にLogstashの設定。
ElasticsearchとKibanaはただ連携する設定しかしていないけど、
Logstashは対象のログファイルの読み込み設定とフォーマット、
Elasticsearchと連携する設定を行う。

以下ファイルを作成。
# vi /etc/logstash/conf.d/Security.conf

中身はこんな感じです。
input {
  file{
    path="/var/log/PR500MI/rSecurity.log"
    start_position = "beginning"
  }
}

filter {
  grok{
    match={
      "message" = "%{DATA:logdate} SRC=%{IP:srcip}/%{NUMBER:srcport} DST=%{IP:dstip}/%{NUMBER:dstport} %{WORD:protocol} %{DATA:action}\[%{DATA:actiondetail}\]"
    }
  }
  date {
    match = ["logdate", "YYYY/MM/dd HH:mm:ss"]
    locale = "en"
  }
  geoip {
    source = ["srcip"]
  }
}

output { elasticsearch {
    hosts = [  "192.168.56.101:9200" ]
  }
} 

解説を入れておくと、
input{ file{ pathで指定するのが読み込む対象のファイル(今回は別で取り込んでいる無線LANルータのアクセスログ)
start_position〜の記載は、ファイルの読み込みを先頭から行うように指定。
(デフォルトだと末尾から読み込む?らしいので)

また、
filter{ grok{ matchは、読み込むログのフォーマットを指定する。
フォーマットは以下にまとまっている。
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns

また、フォーマットをチェックするために、以下のサイトを活用した。
http://grokdebug.herokuapp.com/

ここまででとりあえずはセットアップ完了。

起動

実際に起動してみる。
順番は、Logstash -> Elasticsearch -> Kibanaの順(なはず)

Logstashの起動
# service logstash start

設定ファイルの誤りでログの読み込みに失敗している、などの情報はログに残る。
 /var/log/logstash配下に出てるので確認する。

次にElasticsearchの起動
# service elasticsearch start
こちらも同様にログをチェックして、エラーが出ていないか確認する。
こちらは/var/log/elasticsearch配下です。

最後のKibanaの起動
# /var/www/html/kibana/bin/kibana

Kibanaでログデータを確認

長かった準備はここまででKibanaでログデータが参照できるか確認してみる。
KibanaのURL(http://***.***.***.***:5601/)へアクセスする。
こんな画面が出てくるはず。


書いてある通り、ここではIndexを指定する。
Kibanaでは分析するログデータのIndexとなる列を指定することで、柔軟な分析が可能となる。

本来、ここのページではログデータ内のカラムを参照し、Indexとなる列を選択させる画面である。
こんな風に選択する画面が出てくるのが正しいはず。

じゃあ、今回は何故違ったかと言うと、ログデータが正しく読み込めていないためだった。
色々ログファイルを見てみると、Elasticsearchのエラーログに以下のエラーが出ていた。(一部いじっています)
[2016-09-23 23:40:03,346][DEBUG][action.bulk              ] [Cobalt Man] [logstash-2016.09.23][4] failed to execute bulk item (index) index {[logstash-2016.09.23][logs][AVdXfSraUEBq1amZMjWs], source[{"message":"/09/23 13:56:48 SRC=119.93.79.167/35125 DST=***.***.***.***/23 TCP 廃棄[パケットフィルタ]","@version":"1","@timestamp":"2016-09-23T14:40:02.635Z","path":"/var/log/PR500MI/Security.log","host":"centos.localdomain","logdate":"/09/23 13:56:48","srcip":"119.93.79.167","srcport":"35125","dstip":"***.***.***.***","dstport":"23","protocol":"TCP","action":"廃棄","actiondetail":"パケットフィルタ","tags":["_dateparsefailure"],"geoip":{"ip":"119.93.79.167","country_code2":"PH","country_code3":"PHL","country_name":"Philippines","continent_code":"AS","region_name":"53","city_name":"Muntinlupa","latitude":14.390299999999996,"longitude":121.04750000000001,"timezone":"Asia/Manila","real_region_name":"Rizal","location":[121.04750000000001,14.390299999999996]}}]}
MapperParsingException[failed to parse [logdate]]; nested: IllegalArgumentException[Invalid format: "/09/23 13:56:48"];
 at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:329)
 at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:309)
 at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:436)
 at org.elasticsearch.index.mapper.DocumentParser.parseObject(DocumentParser.java:262)
 at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:122)
 at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
 at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
 at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
 at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:215)
 at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:224)
 at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
 at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
 at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
 at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:639)
 at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
 at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:279)
 at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:271)
 at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
 at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
 at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid format: "/09/23 13:56:48"
 at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
 at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
 at org.elasticsearch.index.mapper.core.DateFieldMapper$DateFieldType.parseStringValue(DateFieldMapper.java:362)
 at org.elasticsearch.index.mapper.core.DateFieldMapper.innerParseCreateField(DateFieldMapper.java:528)
 at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseCreateField(NumberFieldMapper.java:241)
 at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:321)
 ... 22 more

"[Invalid format: "/09/23 13:56:48"]"なんてのを見ると分かるけど、無線LANルータのログファイルのデータが正しく読み込めていないことが原因で、パースエラーが起こっている様子だった。
ログファイルには、ちゃんと"2016/09/23 13:56:48 ..."と記載されていたが、なぜかElasticsearchで読み込まれるときには"/09/23 13:56:48"となってしまっていました。。

まとめ

実は、この原因が解決せず、Elasticは諦めました。誰か原因わかれば教えてください。
次回、同じことをSplunkで行った結果をまとめます。

0 件のコメント:

コメントを投稿