と言っても、よくあるようなsyslogの監視とかTwitterの解析とかつまらないので
あまり他にやっていないものを読み込ませようと思いました。
と言うことで、勉強にもなるかと思い、サイバー攻撃関連のデータを可視化してみようと思います。
自宅にFWとかProxyとかあれば、それぞれのログを突っ込んでみたいけど、そんな大層なことはやっていないので、外と繋がっている普通の無線LANルータにあるアクセスログを取り込もうと思います。
ちなみに、無線LANルータのログの取り出し方は別にまとめます。一気に書くと内容が多くなりすぎちゃうので。。
事前準備
Elasticで公開されている手順どおりにインストールを進めていく。まずは必須なパッケージである、JavaとApacheをインストール
# yum install java
# yum install httpd
ちなみに、Web画面でApacheを使うので、 Firewallのポート空けるのを忘れずに。
Logstash,Elasticsearch,Kibanaのインストール
yumでインストールするため、リポジトリを追加する。以下2ファイルを作成。
# cat /etc/yum.repos.d/Elastic.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=https://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
# cat /etc/yum.repos.d/Logstash.repo
[logstash-2.3]
name=Logstash repository for 2.3.x packages
baseurl=https://packages.elastic.co/logstash/2.3/centos
gpgcheck=1
gpgkey=https://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
まずは、ElasticsearchとLogstashパッケージをインストールする。
# yum install elasticsearch
# yum install logstash
Kibanaはインストール不要のため、yumは使わず、Webページから圧縮ファイルをダウンロードしてくる。
# curl -O https://download.elastic.co/kibana/kibana/kibana-4.5.4-linux-x64.tar.gz
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 31.7M 100 31.7M 0 0 1013k 0 0:00:32 0:00:32 --:--:-- 2611k
解凍し、公開Webのフォルダに格納する。
# tar -zvxf kibana-4.5.4-linux-x64.tar.gz
# mv kibana-4.5.4-linux-x64 /var/www/html/
# mv /var/www/html/kibana-4.5.4-linux-x64 /var/www/html/kibana
セットアップ
まずは、Elasticsearchの設定をする。設定ファイルは以下。# vi /etc/elasticsearch/elasticsearch.yml
以下の2箇所を修正、追記。
普通は0.0.0.0で公開されるので設定不要なのだが、VMなど接続するIPアドレスが変わる場合は、きちんと接続可能なIPアドレスを明記する必要あり。
# network.host: 192.168.0.1network.host: ***.***.***.***
# enable cross-origin resource sharing
http.cors.enabled: true
次にKibanaの設定。設定ファイルは以下。
# vi /var/www/html/kibana/config/kibana.yml
こちらもIPアドレスを記載。
server.host: "192.168.56.101"
elasticsearch.url: "http://192.168.56.101:9200"
最後にLogstashの設定。
ElasticsearchとKibanaはただ連携する設定しかしていないけど、
Logstashは対象のログファイルの読み込み設定とフォーマット、
Elasticsearchと連携する設定を行う。
以下ファイルを作成。
# vi /etc/logstash/conf.d/Security.conf
中身はこんな感じです。
input {
file{
path="/var/log/PR500MI/rSecurity.log"
start_position = "beginning"
}
}
filter {
grok{
match={
"message" = "%{DATA:logdate} SRC=%{IP:srcip}/%{NUMBER:srcport} DST=%{IP:dstip}/%{NUMBER:dstport} %{WORD:protocol} %{DATA:action}\[%{DATA:actiondetail}\]"
}
}
date {
match = ["logdate", "YYYY/MM/dd HH:mm:ss"]
locale = "en"
}
geoip {
source = ["srcip"]
}
}
output { elasticsearch {
hosts = [ "192.168.56.101:9200" ]
}
}
解説を入れておくと、
input{ file{ pathで指定するのが読み込む対象のファイル(今回は別で取り込んでいる無線LANルータのアクセスログ)
start_position〜の記載は、ファイルの読み込みを先頭から行うように指定。
(デフォルトだと末尾から読み込む?らしいので)
また、
filter{ grok{ matchは、読み込むログのフォーマットを指定する。
フォーマットは以下にまとまっている。
https://github.com/elastic/logstash/blob/v1.4.2/patterns/grok-patterns
また、フォーマットをチェックするために、以下のサイトを活用した。
http://grokdebug.herokuapp.com/
ここまででとりあえずはセットアップ完了。
起動
実際に起動してみる。順番は、Logstash -> Elasticsearch -> Kibanaの順(なはず)
Logstashの起動
# service logstash start
設定ファイルの誤りでログの読み込みに失敗している、などの情報はログに残る。
/var/log/logstash配下に出てるので確認する。
次にElasticsearchの起動
# service elasticsearch start
こちらも同様にログをチェックして、エラーが出ていないか確認する。こちらは/var/log/elasticsearch配下です。
最後のKibanaの起動
# /var/www/html/kibana/bin/kibana
Kibanaでログデータを確認
長かった準備はここまででKibanaでログデータが参照できるか確認してみる。KibanaのURL(http://***.***.***.***:5601/)へアクセスする。
こんな画面が出てくるはず。
書いてある通り、ここではIndexを指定する。
Kibanaでは分析するログデータのIndexとなる列を指定することで、柔軟な分析が可能となる。
本来、ここのページではログデータ内のカラムを参照し、Indexとなる列を選択させる画面である。
こんな風に選択する画面が出てくるのが正しいはず。
じゃあ、今回は何故違ったかと言うと、ログデータが正しく読み込めていないためだった。
色々ログファイルを見てみると、Elasticsearchのエラーログに以下のエラーが出ていた。(一部いじっています)
[2016-09-23 23:40:03,346][DEBUG][action.bulk ] [Cobalt Man] [logstash-2016.09.23][4] failed to execute bulk item (index) index {[logstash-2016.09.23][logs][AVdXfSraUEBq1amZMjWs], source[{"message":"/09/23 13:56:48 SRC=119.93.79.167/35125 DST=***.***.***.***/23 TCP 廃棄[パケットフィルタ]","@version":"1","@timestamp":"2016-09-23T14:40:02.635Z","path":"/var/log/PR500MI/Security.log","host":"centos.localdomain","logdate":"/09/23 13:56:48","srcip":"119.93.79.167","srcport":"35125","dstip":"***.***.***.***","dstport":"23","protocol":"TCP","action":"廃棄","actiondetail":"パケットフィルタ","tags":["_dateparsefailure"],"geoip":{"ip":"119.93.79.167","country_code2":"PH","country_code3":"PHL","country_name":"Philippines","continent_code":"AS","region_name":"53","city_name":"Muntinlupa","latitude":14.390299999999996,"longitude":121.04750000000001,"timezone":"Asia/Manila","real_region_name":"Rizal","location":[121.04750000000001,14.390299999999996]}}]}
MapperParsingException[failed to parse [logdate]]; nested: IllegalArgumentException[Invalid format: "/09/23 13:56:48"];
at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:329)
at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:309)
at org.elasticsearch.index.mapper.DocumentParser.parseValue(DocumentParser.java:436)
at org.elasticsearch.index.mapper.DocumentParser.parseObject(DocumentParser.java:262)
at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:122)
at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:309)
at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:529)
at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:506)
at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:215)
at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:224)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:327)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:120)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:68)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:639)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:279)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:271)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:75)
at org.elasticsearch.transport.TransportService$4.doRun(TransportService.java:376)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid format: "/09/23 13:56:48"
at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
at org.elasticsearch.index.mapper.core.DateFieldMapper$DateFieldType.parseStringValue(DateFieldMapper.java:362)
at org.elasticsearch.index.mapper.core.DateFieldMapper.innerParseCreateField(DateFieldMapper.java:528)
at org.elasticsearch.index.mapper.core.NumberFieldMapper.parseCreateField(NumberFieldMapper.java:241)
at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:321)
... 22 more
"[Invalid format: "/09/23 13:56:48"]"なんてのを見ると分かるけど、無線LANルータのログファイルのデータが正しく読み込めていないことが原因で、パースエラーが起こっている様子だった。
ログファイルには、ちゃんと"2016/09/23 13:56:48 ..."と記載されていたが、なぜかElasticsearchで読み込まれるときには"/09/23 13:56:48"となってしまっていました。。
まとめ
実は、この原因が解決せず、Elasticは諦めました。誰か原因わかれば教えてください。次回、同じことをSplunkで行った結果をまとめます。
0 件のコメント:
コメントを投稿