При анализе событий, генерируемых ИТ-системами, мы часто сталкиваемся с необходимостью выполнять их статистический анализ или анализировать последовательность событий для выявления аномалий.
Это требует от нас группировки, т. е. мы идентифицируем наборы связанных событий на основе набора определенных критериев. Чаще всего мы группируем события, которые имеют одинаковые значения атрибутов и/или произошли в определенное время.
Все инструменты анализа данных позволяют проводить статистический анализ. В Splunk мы можем выполнять такие вычисления с помощью команд «stats», «streamstats» и «eventstats».
Иная ситуация со вторым типом группировки, то есть анализом последовательности связанных событий. Дело простое, когда данные для анализа находятся в одном файле, а события объединены с одним атрибутом (например, идентификатором сеанса).
Уровень сложности значительно увеличивается в следующих ситуациях:
- события, которые должны быть сгруппированы, находятся в разных файлах;
- объединение событий должно выполняться с использованием нескольких атрибутов;
- последовательность событий должна начинаться и заканчиваться определенными событиями;
- события должны группироваться только тогда, когда между их появлением прошло определенное время.
Представьте себе гипотетическую систему, поддерживающую банковские переводы. В системе есть два компонента, которые ведут журнал в отдельные файлы. Компонент A отвечает за контроль доступа к приложению. Записывает в файл «component_a.log» информацию, кому, когда и с какого IP был предоставлен доступ, когда произошел выход из системы, а также информацию об ошибках. Компонент B выполняет передачи, упорядоченные зарегистрированными пользователями. Записывает информацию о выполненных операциях в файл «component_b.log».
Наша цель — отслеживать транзакции пользователей и обнаруживать всевозможные аномалии. Следовательно, нам необходимо сгенерировать отчет, содержащий следующую информацию:
- время определения перевода; имя пользователя, который определил перевод;
- IP-адрес, используемый пользователем, определяющим перевод; параметры перевода (название перевода, сумма, счет номер отправителя и получателя);
- время подтверждения перевода;
- имя пользователя, утвердившего перевод;
- IP-адрес пользователя, утвердившего перевод.
Информацию из пунктов 2, 3, 6, 7 можно найти в файле «component_a.log» остается в файле «component_b.log». Для получения указанного отчета необходимо создать транзакцию, которая свяжет все необходимые события.
Примеры журналов сохранения, подтверждения и отправки передачи могут выглядеть следующим образом:
Образцы журналов операции сохранения, утверждения и отправки перевода
Пользователь operator1 входит в систему в 12:41:30 (msg = logged_in) и сохраняет перевод в 12:42:01. (сообщение = передача_сохранено). События записывались двумя разными компонентами системы в два отдельных файла. Подключаем их с помощью поля session_id.
Перевод был подписан пользователем operator2 в 12:45:35. В этом случае поле session_id имеет другое значение. Поэтому мы не можем использовать его для привязки к предыдущему событию.
Вместо этого мы используем поле transfer_id. Перевод был отправлен компонентом B в 12:58:02. Мы связываем это событие с предыдущим с помощью поля transfer_id. В отчет мы должны включить информацию (имя, srcip) о пользователе, одобряющем перевод. Поэтому мы связываем событие утверждения передачи (msg = transfer_aproved) из файла component_b.log с событием входа в систему (msg = logged_in).
Запрос SPL, который выполняет описанную выше группировку и генерирует отчет в Splunek выглядит следующим образом:
index = demo (source = component_a.log OR source = component_b.log) | eval session_id_approval = if (msg = «transfer_approved», session_id, null), session_id = if (msg = «transfer_approved», null, session_id), time_transfer_saved = if (msg = «transfer_saved», _time, null) | транзакция session_id, session_id_approval, transfer_id | присоединиться к session_id_approval [поисковый индекс = demo source = component_a.log msg = logged_in | переименовать _time как time_approval, user как user_approval, srcip как srcip_approval, session_id как session_id_approval] | таблица time_transfer_saved, user, srcip, transfer_id, * _acc amount, currency, time_approval, user_approval, srcip_approval | eval time_transfer_saved = strftime (time_transfer_saved, «% F% T»), time_approval = strftime (time_approval, «% F% T»)
Первая строка запроса определяет базовый набор событий, над которыми мы будем работать. Затем, используя команду eval, мы подготавливаем наши события для обработки командой транзакции.
Мы записываем значение поля session_id в session_id_approval поле. На более позднем этапе мы будем использовать их для подключения к событию входа пользователя operator2.
Удалите поле session_id из события подтверждения передачи (msg = transfer_approved), чтобы не мешать процессу подключения событий с использованием поля session_id.
Если этот шаг опущен, функция группировки транзакций запутается, имея дело с двумя разными значениями поля session_id.
Мы группируем события после полей session_id, session_id_approval и transfer_id.
Команда транзакции автоматически обнаруживает группы связанных событий на основе указанных нами критериев и возвращает группы, найденные как новые события. Используя команду join, мы связываем событие подтверждения передачи с событием входа в систему пользователя. подтверждение перевода (оператор2).
Благодаря этому мы получаем его имя и ip-адрес, с которого он был подключен. Для этой цели мы используем поляsession_id_approval из события утверждения передачи и session_id из события входа пользователя operator2. Наконец, мы форматируем и генерируем отчет с помощью table command.
Результат запроса будет отображаться в виде таблицы:
В этом примере показано, как использовать Splunk за несколько простых шагов для обнаружения групп, связанных друг с другом событиями, в ситуации, когда определение отношения между ними требует использования нескольких атрибутов.
Главный «герой» описываемого решения — команда транзакции.
Возможности ее применения очень широки и выходят за рамки представленной темы.