Главная
Блог разработчиков phpBB
 
+ 17 предустановленных модов
+ SEO-оптимизация форума
+ авторизация через соц. сети
+ защита от спама

Загружаем в Oracle данные SNMP-мониторинга

Anna | 4.06.2014 | нет комментариев
Некоторое время назад, я написал статью посвященную вопросам оптимизации загрузки данных в БД Oracle. Судя по обилию последовавших комментариев, статья вызвала живой интерес, но, судя по тем-же комментариям (а также последовавшей статье о загрузке данных в PostgreSQL) многими она была осознана не так, как я на то рассчитывал. По большей части, повинен в этом я сам, от того что, в процессе облегчения изложения материала, оторвался от жизни настоль, что задача перестала быть внятна окружающим (это, в свою очередь, отрицательно сказалось на понимании причин выбора способов, использованных для ее решения).

Сегодня, я хочу поправить допущенные ошибки. Я расскажу о реальной задаче обработки данных SNMP-мониторинга, уделяя наивысшее внимание техническим подробностям. Я постараюсь обосновать выбор подходов для ее решения и сравнить их продуктивность. Также, я уделю внимание тем техническим моментам, которые могут вызвать трудности у новичков. Раньше чем двигаться дальше, я хочу выразить свою признательность DenKrepxlix123zhekappp и каждому прочим товарищам, давшим немыслимое число пригодных советов, в процессе обсуждения предыдущей статьи.

Для чего все это?

Хочу сразу сказать, что меня абсолютно не волнуют такие вопросы как:

  1. С какой максимальной скоростью дозволено загрузить данные в Oracle?
  2. Что стремительней Oracle либо PostgreSQL?
  3. Насколько стремительно дозволено осуществлять вставку в таблицу БД?

По большей части, эти вопросы, и сходственные им, не имеют смысла (во каждом случае в отрыве от подробностей аппаратной конфигурации). Я абсолютно осмысленно не говорю ни слова о том, на каком «железе» работает мой сервер Oracle. На мой взор, это не значимо. Но что же тогда значимо?

Значимо то, что есть настоящая задача, скажем сбора данных SNMP-мониторинга, в процессе выполнения которой, непрерывно генерируется огромный объем данных, которые необходимо обработать. При этом, существенны следующие моменты:

  1. Неудовлетворительно легко вставить данные в таблицу (как именно обязаны быть обработаны данные и отчего, я расскажу в дальнейшем разделе)
  2. Данные генерируются не на сервере БД (скорее каждого, будет несколько серверов сбора данных, передающих данные в цельную БД)
  3. Данные поступают непрерывно и также непрерывно обязаны обрабатываться, желанно минимизировать время обработки данных (Дабы обеспечить минимальное время реакции на появление какой либо аварийной обстановки)
  4. Допускается потеря части данных (если случилась авария, мы найдем это на дальнейшем цикле опроса, даже при потере части данных нынешнего)
  5. История метаморфозы основных параметров должна сохраняться долговременно

Я рассматриваю разные варианты решения этой задачи и сопоставляю их продуктивность. Разумеется, целью является поиск особенно продуктивного решения.

Постановка задачи

Для начала, припомним, какие именно данные мы получаем, применяя SNMP? Реально, мы можем получить значения некоторых предопределенных переменных, запросив волнующий нас OID, применяя GET-запрос. Скажем, запросив OID = 1.3.6.1.2.1.1.3.0, мы можем получить значение такой главной для мониторинга величины как sysUpTime. Значения переменных, доступ к которым предоставляется по SNMP, не непременно числовые. Это могут быть и строки.

Но SNMP не ограничивается доступом к комплекту скалярных переменных. Значения переменных могут быть сгруппированных в таблицы. В всякой строке таблицы группируются значения переменных, связанных с каким-то отдельным источником. Для доступа к всякому значению, нужно дополнить OID присвоенный столбцу таблицы некоторым идентификатором, определяющим строку изложения источника. Данный идентификатор строки мы будем называть индексом источника.

В случае опроса списка интерфейсов (1.3.6.1.2.1.2), в роли идентификатора источника выступает целое число, но, для других таблиц, это может быть IP-адреc либо что-то другое, определяемое спецификацией. Трудность заключается в том, что нам не знаменит индекс опрашиваемого источника предварительно и мы не можем получить значение волнующей нас переменной применяя GET-запрос.

Для чтения значений в таблицах, нужно применять GETNEXT-запросы, возвращающие OID и значение переменной, дальнейшей в лексикографическом порядке за OID-ом, указанном в запросе. Так передавая OID-столбца, представляющий собой префикс OID-а волнующей нас переменной, мы получаем соответствующее значение из первой строки таблицы. Дабы получить значение дальнейшей строки, мы передаем OID, полученный в рамках результата на 1-й запрос и так дальше, пока таблица не будет просмотрена всецело.

В целях оптимизации продуктивности (путем уменьшения числа отсылаемых запросов), мы можем передавать OID-ы нескольких столбцов одним запросом. Помимо того, во 2 версии SNMP была добавлена вероятность образования BULK-запросов. Один BULK-запрос заменяет несколько исполненных друг за ином GETNEXT-запросов, что разрешает прочитать всю таблицу целиком за один запрос (при довольной величине BULK). Обо каждому этом я теснее рассказывал ранее.

Повторяю я все это для того, Дабы было ясно — таблицы не непрерывны! Изложение (скажем где-то в БД) таблицы интерфейсов вручную, с назначением им OID-ов абсолютно безрезультатно. Строки таблиц могут добавляться и удаляться при переконфигурации оборудования. Больше того, индекс какого-то присутствующего интерфейса может измениться! Реально, одной из задач системы SNMP-мониторинга является автоматизация отслеживания всех изменений просматриваемых таблиц.

Как это будет выглядеть в БД? Достаточно легко:

image

Данные, полученные в процессе мониторинга, мы будем привязывать к источникам (ae_resource). Источники, в свою очередь, будут связаны в двухуровневую иерархию. На верхнем ярусе будет представлен источник устройства. С ним, по owner_id, будут связаны дочерние источники, скажем интерфейсы (тот факт, что это именно интерфейсы, а не что-то другое, будет определяться значением type_id из справочника ae_resource_type). Значения device_id, у родительского и всех дочерних источников, будут совпадать и указывать на изложение оборудования.

Дозволено подметить, что у таблицы ae_resource имеются поля start_date и end_date. Поддерживать их в актуальном состоянии — наша задача. Мы обязаны создавать новые источники, по мере необходимости, проставляя им дату начала действия в start_date и завершать действие устаревших источников, устанавливая end_date. Для идентификации источников, будет применяться поле name (в случае интерфейсов, это значение признака ifDescr — 1.3.6.1.2.1.2.2.1.2). Индекс источника будем сберегать в поле res_num (в случае его метаморфозы, источник со ветхим значением индекса должен быть закрыт, позже чего, должен быть сделан новейший источник).

Надобность поддержки списка интерфейсов в актуальном состоянии — основная повод, по которой данные придется обрабатывать (правда обыкновенная вставка полученных данных в таблицу заняла-бы значительно поменьше времени). Но уж если мы все равно обрабатываем данные, отчего-бы не получить от этого максимальную пользу? В процессе мониторинга, мы получаем дюже много данных, часть из которых не изменяется либо изменяется незначительно. Мы можем уменьшить объем данных, сберегаемых в БД (что целебно отразится как на ее объеме, так и на продуктивности), если будет сберегать только важнейшие метаморфозы. Но как определить, какие метаморфозы являются важными? В этом нам помогут политики:

image

Значение всякого полученного нами параметра будет связано с некоторым доменом (ae_domain). Регулярное выражение (regexp) поможет валидировать корректность значения. Перед сохранением в БД, значение может быть преобразовано к какому-то иному домену (скажем строки мы получаем в шестнадцатеричном представлении, которое было бы недурно преобразовывать в больше привычную форму). Правила реформирования будут определяться таблицей ae_domain_convert.

Какие метаморфозы будут считаться важными? Это зависит от домена. По умолчанию, важным будет считаться всякое метаморфоза значения (то есть, если значение не изменилось, запись в БД выполняться не будет). Для некоторых параметров имеет толк задать специальные правила. Скажем sysUpTime (позже соответствующего реформирования) — однообразно вырастающая числовая величина. Уменьшение этого значения обозначает, что хост перезагрузился. Задание специальной политики для этого домена дозволит нам записывать в БД только события уменьшения значения (обозначающее перезагрузки), при этом, в БД будет записываться не полученное, а предыдущее значение (то есть наивысший достигнутый uptime).

В ae_threshold будем задавать пороги, пересечение которых (в заданном направлении) будет рассматриваться как важное метаморфоза. Добавочно введем специальный тип порога (delta), определяющий безусловное значение разности между предыдущим и полученным значением. Задание такого порога может быть комфортно, скажем, для счетчиков трафика, таких как ifInOctets (1.3.6.1.2.1.2.2.1.10).

Целиком, схема данных будет выглядеть дальнейшим образом:

Схема данных

image
Скрипт

create sequence ae_platform_model_seq start with 100;

create table ae_platform_model (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_platform_model is 'Модель оборудования';

create unique index ae_platform_model_pk on ae_platform_model(id);

alter table ae_platform_model add
  constraint pk_ae_platform_model primary key(id);

create sequence ae_device_seq cache 100;

create table ae_device (
  id             number                              not null,
  model_id       number                              not null,
  start_date     date        default sysdate         not null,
  end_date       date        default null  
);

comment on table ae_device is 'Оборудование';

create unique index ae_device_pk on ae_device(id);

create index ae_device_fk on ae_device(device_id);

create index ae_device_model_fk on ae_device(model_id);

create index ae_device_zone_fk on ae_device(zone_id);

alter table ae_device add
  constraint pk_ae_device primary key(id);

alter table ae_device add
  constraint fk_ae_device_model foreign key (model_id) 
    references ae_platform_model(id);

create sequence ae_resource_class_seq start with 100;

create table ae_resource_class (
  id             number                              not null,
  owner_id       number,
  is_logical     number(1)                           not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_resource_class is 'Класс источника';

comment on column ae_resource_class.is_logical is 'Знак логичного источника';

create unique index ae_resource_class_pk on ae_resource_class(id);

create index ae_resource_class_fk on ae_resource_class(owner_id);

alter table ae_resource_class
  add constraint ae_resource_class_ck check (is_logical in (0, 1));

alter table ae_resource_class add
  constraint pk_ae_resource_class primary key(id);

create sequence ae_resource_type_seq start with 100;

create table ae_resource_type (
  id             number                              not null,
  owner_id       number,
  parent_id      number,
  class_id       number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

comment on table ae_resource_type is 'Тип источника';

create unique index ae_resource_type_pk on ae_resource_type(id);

create index ae_resource_type_owner_fk on ae_resource_type(owner_id);

create index ae_resource_type_parent_fk on ae_resource_type(parent_id);

alter table ae_resource_type add
  constraint pk_ae_resource_type primary key(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type foreign key (class_id) 
    references ae_resource_class(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type_owner foreign key (owner_id) 
    references ae_resource_type(id);

alter table ae_resource_type add
  constraint fk_ae_resource_type_parent foreign key (parent_id) 
    references ae_resource_type(id);

create sequence ae_resource_seq cache 100;

create table ae_resource (
  id             number                              not null,
  device_id      number                              not null,
  owner_id       number           default null,            
  type_id        number                              not null,
  name           varchar2(1000)                      not null,
  res_num        varchar2(300)                       not null,
  res_id         number,
  tmp_id         number,
  start_date     date        default sysdate         not null,
  end_date       date        default null  
);

create unique index ae_resource_pk on ae_resource(id);

create index ae_res_dev_fk on ae_resource(device_id);

create index ae_res_dev_type_fk on ae_resource(type_id);

create index ae_res_dev_res_fk on ae_resource(res_id);

create index ae_res_dev_res_tmp_fk on ae_resource(tmp_id);

alter table ae_resource add
  constraint pk_ae_resource primary key(id);

alter table ae_resource add
  constraint fk_ae_res_device foreign key (device_id) 
    references ae_device(id);

alter table ae_resource add
  constraint fk_ae_res_dev_parent foreign key (owner_id) 
    references ae_resource(id);

alter table ae_resource add
  constraint fk_ae_res_dev_type foreign key (type_id) 
    references ae_resource_type(id);

create table ae_policy_type (
  id                 number                           not null,
  name               varchar2(30)                     not null,
  description        varchar2(100)
);

comment on table ae_policy_type is 'Список поддерживаемых платформ';

create unique index ae_policy_type_pk on ae_policy_type(id);

create unique index ae_policy_type_uk on ae_policy_type(name);

alter table ae_policy_type add
  constraint pk_ae_policy_type primary key(id);

create table ae_state_policy (
  id                 number                           not null,
  type_id            number                           not null,
  name               varchar2(30)                     not null,
  description        varchar2(100)
);

comment on table ae_state_policy is 'Список поддерживаемых платформ';

create unique index ae_state_policy_pk on ae_state_policy(id);

create index ae_state_policy_fk on ae_state_policy(type_id);

alter table ae_state_policy add
  constraint pk_ae_state_policy primary key(id);

alter table ae_state_policy add
  constraint fk_ae_state_policy foreign key (type_id) 
    references ae_policy_type(id);

create table ae_threshold_type (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(300)
);

create unique index ae_threshold_type_pk on ae_threshold_type(id);

alter table ae_threshold_type add
  constraint pk_ae_threshold_type primary key(id);

create sequence ae_threshold_seq start with 100;

create table ae_threshold (
  id             number                              not null,
  type_id        number                              not null,
  policy_id      number                              not null,
  value          varchar2(100)                       not null
);

create unique index ae_threshold_pk on ae_threshold(id);

create index ae_threshold_direction_fk on ae_threshold(type_id);

create index ae_threshold_profile_fk on ae_threshold(policy_id);

alter table ae_threshold add
  constraint pk_ae_threshold primary key(id);

alter table ae_threshold add
  constraint fk_ae_threshold_type foreign key (type_id) 
    references ae_threshold_type(id);

alter table ae_threshold add
  constraint fk_ae_threshold_policy foreign key (policy_id) 
    references ae_state_policy(id);

create sequence ae_domain_convert_seq start with 100;

create table ae_domain (
  id             number                              not null,
  policy_id      number            default null,
  regexp         varchar2(100),
  is_case_sens   number(1)         default 0         not null,
  description    varchar2(100)
);

create unique index ae_domain_pk on ae_domain(id);

create index ae_domain_fk on ae_domain(policy_id);

alter table ae_domain
  add constraint ae_domain_ck check (is_case_sens in (0, 1));

alter table ae_domain add
  constraint pk_ae_domain primary key(id);

alter table ae_domain add
  constraint fk_ae_domain foreign key (policy_id) 
    references ae_state_policy(id);

create sequence ae_parameter_seq start with 1000;

create table ae_parameter (
  id             number                              not null,
  domain_id      number                              not null,
  parent_id      number,
  name           varchar2(30)                        not null,  
  description    varchar2(100)
);

create unique index ae_parameter_pk on ae_parameter(id);

create unique index ae_parameter_uk on ae_parameter(name);

create index ae_parameter_domain_fk on ae_parameter(domain_id);

create index ae_parameter_parent_fk on ae_parameter(parent_id);

alter table ae_parameter add
  constraint pk_ae_parameter primary key(id);

alter table ae_parameter add
  constraint fk_ae_parameter_domain foreign key (domain_id) 
    references ae_domain(id);

alter table ae_parameter add
  constraint fk_ae_parameter foreign key (parent_id) 
    references ae_parameter(id);

create sequence ae_state_seq cache 100;

create table ae_state (
  id             number                              not null,
  res_id         number                              not null,
  param_id       number                              not null,
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
);

comment on table ae_state is 'Состояние параметра';

comment on column ae_state.datetime is 'Дата и время последнего метаморфозы';

create unique index ae_state_pk on ae_state(id);

create index ae_state_res_fk on ae_state(res_id);

create index ae_state_param_fk on ae_state(param_id);

alter table ae_state add
  constraint pk_ae_state primary key(id);

alter table ae_state add
  constraint fk_ae_state_res foreign key (res_id) 
    references ae_resource(id);

alter table ae_state add
  constraint fk_ae_state_param foreign key (param_id) 
    references ae_parameter(id);

create sequence ae_state_log_seq cache 100;

create table ae_state_log (
  id             number                              not null,
  res_id         number                              not null,
  param_id       number                              not null,
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
) pctfree 0
  partition by range (datetime)
( partition ae_state_log_p1 values less than (maxvalue)
);

comment on table ae_state_log is 'Хронология метаморфозы состояния параметра';

create unique index ae_state_log_pk on ae_state_log(datetime, id) local;

alter table ae_state_log add
  constraint pk_ae_state_log primary key(datetime, id);

create sequence ae_profile_type_seq;

create table ae_profile_type (
  id             number                              not null,
  name           varchar2(30)                        not null,
  description    varchar2(100)
);

create unique index ae_profile_type_pk on ae_profile_type(id);

create unique index ae_profile_type_uk on ae_profile_type(name);

alter table ae_profile_type add
  constraint pk_ae_profile_type primary key(id);

create sequence ae_profile_seq;

create table ae_profile (
  id             number                              not null,
  type_id        number                              not null,
  is_default     number(1)         default 0         not null,
  model_id       number                              not null,
  script_id      number            default null,
  name           varchar2(30)                        not null,
  description    varchar2(100)
);

create unique index ae_profile_pk on ae_profile(id);

create index ae_profile_type_fk on ae_profile(type_id);

create index ae_profile_model_fk on ae_profile(model_id);

create index ae_profile_script_fk on ae_profile(script_id);

alter table ae_profile
  add constraint ae_profile_ck check (is_default in (0, 1));

alter table ae_profile add
  constraint pk_ae_profile primary key(id);

alter table ae_profile add
  constraint fk_ae_profile_type foreign key (type_id) 
    references ae_profile_type(id);

create sequence ae_profile_detail_seq;

create table ae_profile_detail (
  id             number                              not null,
  type_id        number                              not null,
  profile_id     number                              not null,
  model_id       number                              not null,
  param_id       number                              not null
);

create unique index ae_profile_detail_pk on ae_profile_detail(id);

create index ae_profile_detail_fk on ae_profile_detail(profile_id);

create index ae_profile_detail_type_fk on ae_profile_detail(type_id);

create index ae_profile_detail_model_fk on ae_profile_detail(model_id);

create index ae_profile_detail_param_fk on ae_profile_detail(param_id);

alter table ae_profile_detail add
  constraint pk_ae_profile_detail primary key(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail foreign key (profile_id) 
    references ae_profile(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail_type foreign key (type_id) 
    references ae_resource_type(id);

alter table ae_profile_detail add
  constraint fk_ae_profile_detail_model foreign key (model_id) 
    references ae_platform_model(id);

create global temporary table ae_state_tmp (
  id             number                              not null,
  device_id      number                              not null,
  profile_id     number                              not null,
  param_id       number                              not null,
  num            varchar2(300),
  value          varchar2(300),
  datetime       timestamp default current_timestamp not null
) on commit delete rows;

create index ae_state_tmp_ix on ae_state_tmp(device_id, profile_id, param_id, num);

Сейчас осталось заполнить справочники данными:

Тестовые данные

Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'default', NULL);
Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (2, 'uptime', NULL);
Insert into AE_POLICY_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (3, 'threshold', NULL);
COMMIT;

Insert into AE_STATE_POLICY
   (ID, NAME, DESCRIPTION, TYPE_ID)
 Values
   (1, 'default', NULL, 1);
Insert into AE_STATE_POLICY
   (ID, NAME, DESCRIPTION, TYPE_ID)
 Values
   (2, 'uptime', NULL, 2);
COMMIT;

Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (10, '((d )D*,s*)?(d ):(d ):(d )(.d )?', 0, 'SNMP uptime', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (11, 'd ', 0, 'SNMP число', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (12, '([a-fA-Fd]) ', 0, 'SNMP строка', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (13, '.*', 0, 'SNMP Произвольная строка', 1);
Insert into AE_DOMAIN
   (ID, REGEXP, IS_CASE_SENS, DESCRIPTION, POLICY_ID)
 Values
   (14, 'd ', 0, 'SNMP uptime (числовая форма)', 2);
COMMIT;

Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (101, 14, NULL, 'uptime', 'SNMP Uptime');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (102, 11, NULL, 'ifIndex', 'Индекс интерфейса');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (103, 13, NULL, 'ifName', 'Имя интерфейса');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (104, 11, NULL, 'ifInOctets', 'Входящий трафик');
Insert into AE_PARAMETER
   (ID, DOMAIN_ID, PARENT_ID, NAME, DESCRIPTION)
 Values
   (105, 11, NULL, 'ifOutOctets', 'Исходящий трафик');
COMMIT;

Insert into AE_PLATFORM_MODEL
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'test', NULL);
COMMIT;

Insert into AE_PROFILE_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'mon', 'Мониторинг');
COMMIT;

Insert into AE_PROFILE
   (ID, TYPE_ID, IS_DEFAULT, MODEL_ID, SCRIPT_ID, 
    NAME, DESCRIPTION)
 Values
   (1, 1, 1, 1, NULL, 
    'test', NULL);
COMMIT;

Insert into AE_RESOURCE_CLASS
   (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID)
 Values
   (1, 0, 'Устройство', NULL, NULL);
Insert into AE_RESOURCE_CLASS
   (ID, IS_LOGICAL, NAME, DESCRIPTION, OWNER_ID)
 Values
   (2, 0, 'Интерфейс', NULL, 1);
COMMIT;

Insert into AE_RESOURCE_TYPE
   (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID, 
    PARENT_ID)
 Values
   (1, 1, 'Host', NULL, NULL, 
    NULL);
Insert into AE_RESOURCE_TYPE
   (ID, CLASS_ID, NAME, DESCRIPTION, OWNER_ID, 
    PARENT_ID)
 Values
   (2, 2, 'Interface', NULL, 1, 
    NULL);
COMMIT;

Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (4, 2, 1, 1, 104);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (5, 2, 1, 1, 105);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (6, 1, 1, 1, 1);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (1, 1, 1, 1, 101);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (2, 2, 1, 1, 102);
Insert into AE_PROFILE_DETAIL
   (ID, TYPE_ID, PROFILE_ID, MODEL_ID, PARAM_ID)
 Values
   (3, 2, 1, 1, 103);
COMMIT;

Insert into AE_DEVICE
   (ID, MODEL_ID, START_DATE, END_DATE)
 Values
   (0, 1, TO_DATE('10/30/2013 15:37:16', 'MM/DD/YYYY HH24:MI:SS'), NULL);
COMMIT;

Insert into AE_RESOURCE
   (ID, DEVICE_ID, OWNER_ID, TYPE_ID, NAME, 
    RES_NUM, RES_ID, START_DATE, END_DATE, TMP_ID)
 Values
   (1, 0, NULL, 1, '127.0.0.1', 
    '0', NULL, TO_DATE('10/30/2013 15:24:44', 'MM/DD/YYYY HH24:MI:SS'), NULL, NULL);
COMMIT;

Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (1, 'increase', 'Увеличение');
Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (2, 'decrease', 'Уменьшение');
Insert into AE_THRESHOLD_TYPE
   (ID, NAME, DESCRIPTION)
 Values
   (3, 'delta', 'Приращение');
COMMIT;

Insert into AE_THRESHOLD
   (ID, TYPE_ID, POLICY_ID, VALUE)
 Values
   (1, 3, 1, '100');
COMMIT;

И подготовить заготовку кода тестирования:

Тестовый код

package com.acme.ae.tests.jdbc;

import oracle.jdbc.driver.OracleCallableStatement;
import oracle.sql.*;

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class Test {

	private final static String CLASS_NAME = "oracle.jdbc.driver.OracleDriver";
	private final static String USER_CONN  = "jdbc:oracle:thin:@192.168.124.5:1523:new11";
	private final static String USER_NAME  = "ais";
	private final static String USER_PASS  = "ais";

	private final static boolean AUTO_COMMIT_MODE = false;
	private final static int     BULK_SIZE        = 100;
	private final static int     ALL_SIZE         = 1000;

	private final static String  TRACE_ON_SQL     =
			"ALTER SESSION SET EVENTS '10046 trace name context forever, level 12'";

	private final static Long   DEVICE_ID         = 0L;
	private final static Long   PROFILE_ID        = 1L;
	private final static Long   UPTIME_PARAM_ID   = 101L;
	private final static Long   IFNAME_PARAM_ID   = 103L;
	private final static Long   INOCT_PARAM_ID    = 104L;
	private final static String FAKE_NUM_VALUE    = "0";

	private Connection c = null;

	private void start() throws ClassNotFoundException, SQLException {
		Class.forName(CLASS_NAME);
		c = DriverManager.getConnection(USER_CONN, USER_NAME, USER_PASS);
		c.setAutoCommit(AUTO_COMMIT_MODE);
		CallableStatement st = c.prepareCall(TRACE_ON_SQL);
		try {
			st.execute();
		} finally {
			st.close();
		}
	}

	private void stop() throws SQLException  {
		if (c != null) {
			c.close();
		}
	}

	public static void main(String[] args) {
		Test t = new Test();
		try {
			try {
				t.start();
				t.test_plsql();

				// Тут будем вызывать тестовый код

			} finally {
				t.stop();
			}
		} catch (Exception e) {
			System.out.println(e.toString());
		}
	}
}

Для детального обзора продуктивности, будем применять трассировку event 10046 на сервере, с дальнейшей обработкой трейсов утилитой tkprof.

Самый неторопливый метод (plsql)

Начнем тестирование с особенно явственной обработки по одной записи. Помимо собственно оценки продуктивности, написание этого кода поможет нам отменнее разобраться с тем как именно будут обрабатываться данные.

PL/SQL-код

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure    addValue( p_device    in  number
                         , p_profile   in  number
                         , p_param     in  number
                         , p_num       in  varchar2
                         , p_val       in  varchar2 );
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure addValue( p_device    in  number
                      , p_profile   in  number
                      , p_param     in  number
                      , p_num       in  varchar2
                      , p_val       in  varchar2 ) as
    cursor    c_res(p_type number) is
    select    r.id, r.name
    from      ae_resource r
    where     r.device_id = p_device
    and       r.res_num = p_num
    and       r.type_id = p_type
    and       r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1);
    cursor    c_state(p_resid number) is
    select    s.value
    from      ae_state s
    where     s.res_id = p_resid
    and       s.param_id = p_param;
    l_resid   ae_resource.id%type            default null;
    l_resname ae_resource.name%type          default null;
    l_oldval  ae_state.value%type            default null;
    l_restype ae_profile_detail.type_id%type default null;
    l_owntype ae_resource_type.owner_id%type default null;
    l_owner   ae_resource.id%type            default null;
    l_policy  ae_state_policy.type_id%type   default null;
    l_polid   ae_state_policy.id%type        default null;
    l_count   number                         default 0;
    begin

      -- Получить тип источника
      select d.type_id, r.owner_id
      into   l_restype, l_owntype
      from   ae_profile_detail d
      inner  join ae_resource_type r on (r.id = d.type_id)
      where  d.profile_id = p_profile
      and    d.param_id = p_param;

      -- Получить ID обладателя
      if not l_owntype is null then
         select r.id into l_owner
         from   ae_resource r
         where  r.device_id = p_device
         and    r.type_id = l_owntype;
      end if;

      -- Обработать имя интерфейса
      if p_param = g_ifName_parameter then
         open c_res(l_restype);
         fetch c_res into l_resid, l_resname;
         if c_res%notfound or l_resname <> p_val then

            -- Закрыть ветхий источник интерфейса
            update ae_resource set end_date = sysdate
            where id = l_resid;

            -- Сделать новейший источник интерфейса
            insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
            values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val);

         end if;
         close c_res;
         return;
      end if;

      -- Получить ID источника
      open c_res(l_restype);
      fetch c_res into l_resid, l_resname;
      if c_res%notfound then
         -- Если источник не обнаружен, сделать новейший источник интерфейса
         insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
         values (ae_resource_seq.nextval, p_device, l_owner, l_restype, p_num, p_val)
         returning id into l_resid;
      end if;

      -- Получить ветхое значение параметра
      open c_state(l_resid);
      fetch c_state into l_oldval;
      if c_state%notfound then
         l_oldval := null;
      end if;
      close c_state;

      -- Получить политику сохранения значений
      select l.type_id, l.id
      into   l_policy, l_polid
      from   ae_parameter p
      inner  join ae_domain d on (d.id = p.domain_id)
      inner  join ae_state_policy l on (l.id = d.policy_id)
      where  p.id = p_param;

      -- Получить число пересеченных порогов
      select count(*)
      into   l_count
      from   ae_threshold t
      where  t.policy_id = l_polid
      and (( t.type_id = g_increase_type and l_oldval <= t.value and p_val >= t.value ) or
           ( t.type_id = g_decrease_type and l_oldval >= t.value and p_val <= t.value ) or
           ( t.type_id = g_delta_type and abs(p_val - l_oldval) >= t.value ));

      -- Сберечь запись в ae_state_log в соответствии с политикой
      if l_oldval is null or l_count > 0 or
         ( l_policy = g_uptime_policy and p_val < l_oldval) or
         ( l_policy = g_default_policy and p_val <> l_oldval) then
         insert into ae_state_log(id, res_id, param_id, value)
         values (ae_state_log_seq.nextval, l_resid, p_param, decode(l_policy, g_uptime_policy, nvl(l_oldval, p_val), p_val));
      end if;

      -- Обновить ae_state
      update ae_state set value = p_val
                      ,   datetime = current_timestamp
      where  res_id = l_resid and param_id = p_param;
      if sql%rowcount = 0 then
         insert into ae_state(id, param_id, res_id, value)
         values (ae_state_seq.nextval, p_param, l_resid, p_val);
      end if;

      close c_res;
    exception
      when others then
        if c_res%isopen then close c_res; end if;
        if c_state%isopen then close c_state; end if;
        raise;
    end;

end ae_monitoring;
/
Java-код

	private final static String  ADD_VAL_SQL      = 
			"begin ae_monitoring.addValue(?,?,?,?,?); end;";

	private void test_plsql() throws SQLException {
		System.out.println("test_plsql:");
		CallableStatement st = c.prepareCall(ADD_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		try {
			for (int i = 1; i <= ALL_SIZE; i  ) {

				// Передать uptime
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   UPTIME_PARAM_ID);
				st.setString(4, FAKE_NUM_VALUE);
				st.setString(5, uptime.toString());
				st.execute();

				// Передать имя интерфейса
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   IFNAME_PARAM_ID);
				st.setString(4, Integer.toString((i % 100)   1));
				st.setString(5, Integer.toString((i % 100)   1));
				st.execute();

				// Передать счетчик трафика
				st.setLong(1,   DEVICE_ID);
				st.setLong(2,   PROFILE_ID);
				st.setLong(3,   INOCT_PARAM_ID);
				st.setString(4, Integer.toString((i % 100)   1));
				st.setString(5, inoct.toString());
				st.execute();

				// Увеличить счетчики
				uptime  = 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct  = 10L;
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		c.commit();
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Видимо, что для правильной поддержки этим кодом списка интерфейсов, нужно, Дабы имя интерфейса передавалось в БД до остальных признаков источника.

Итоги тестирования абсолютно предсказуемы:

итоги

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 1 0.00 0.00 0 0 0 0
Execute 3000 4.23 4.13 7 102942 6615 3000
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 3001 4.23 4.13 7 102942 6615 3000

Misses in library cache during parse: 1
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client3002 0.00 0.00
  SQL*Net message from client 3002 5.92 7.12
  latch: library cache 4 0.00 0.00
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 69 0.00 0.00 0 0 0 0
Execute 17261 2.42 2.36 7 9042 6615 3160
Fetch 14000 0.38 0.37 0 93900 0 13899
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 31330 2.81 2.74 7 102942 6615 17059

Misses in library cache during parse: 10
Misses in library cache during execute: 10

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 7 0.00 0.00

Мы исполняем дюже огромное число запросов и тратим много времени на сетевое взаимодействие.

Используем массовую обработку (temporary)

Особенно коренной метод борьбы с убыточными затратами — переход к массовой обработке. Мы можем заранее сберечь комплект данных для обработки в какой-то таблице, а после этого обработать данные не по одной записи, а все сразу. Безусловно, для промежуточного хранения данных, дозволено применять обыкновенные таблицы, но применение для этих целей GTT больше выигрышно, в итоге снижения убыточных затрат на журналирование.

Для вставки данных во временную таблицу мы будем применять DML, а не вызовы хранимых процедур, что дозволит нам применять JDBC batch, для снижения убыточных затрат на сетевое взаимодействие.

При применении этого подхода, не требуется, Дабы имя интерфейса обрабатывалось до остальных его параметров. Довольно того, Дабы имена всех обрабатываемых интерфейсов присутствовали в обрабатываемом комплекте данных.

PL/SQL-код

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValues;
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure saveValues as
    begin

      -- Сделать источник, если он отсутствует
      merge into ae_resource d
      using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ae_state_tmp t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate   1) )
      when matched then
        update set d.tmp_id = s.id
        where  d.name <> s.name
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Добавить недостающие ae_resource
      insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
      select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value
      from   ae_state_tmp t
      inner  join ae_resource c on (c.tmp_id = t.id)
      inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
      inner  join ae_resource_type r on (r.id = p.type_id)
      left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id);

      -- Закрыть устаревшие интерфейсы
      update ae_resource set end_date = sysdate
                         ,   tmp_id   = null
      where  tmp_id > 0;

      -- Сберечь записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ae_state_tmp t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
from   ae_state_tmp t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сберечь метаморфозы
      commit write nowait;
    end;

end ae_monitoring;
/
Java-код

	private final static int     BULK_SIZE        = 200;

	private final static String  INS_VAL_SQL      =
			"insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)";

	private final static String  SAVE_VALUES_SQL  = 
			"begin ae_monitoring.saveValues; end;";

	private void test_temporary() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(INS_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		Long ix     = 1L;
		int  bulk   = BULK_SIZE; 
		try {
			for (int i = 1; i <= ALL_SIZE; i  ) {

				// Передать uptime
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   UPTIME_PARAM_ID);
				st.setString(5, FAKE_NUM_VALUE);
				st.setString(6, uptime.toString());
				st.addBatch();

				// Передать имя интерфейса
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   IFNAME_PARAM_ID);
				st.setString(5, Integer.toString((i % 100)   1));
				st.setString(6, Integer.toString((i % 100)   1));
				st.addBatch();

				// Передать счетчик трафика
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   INOCT_PARAM_ID);
				st.setString(5, Integer.toString((i % 100)   1));
				st.setString(6, inoct.toString());
				st.addBatch();

				if (--bulk <= 0) {
					st.executeBatch();
					bulk = BULK_SIZE;
				}

				// Увеличить счетчики
				uptime  = 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct  = 10L;
			}
			if (bulk < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		st = c.prepareCall(SAVE_VALUES_SQL);
		timestamp = System.currentTimeMillis();
		try {
			st.execute();
		} finally {
			st.close();
		}
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Запускаем данный код на выполнение и получаем:

java.sql.SQLException: ORA-30926: немыслимо получить устойчивый комплект строк в начальных таблицах
ORA-06512: на  "AIS.AE_MONITORING", line 205
ORA-06512: на  line 1

Если подумать, повод этой ошибки становится внятна. Если комплект обрабатываемых данных содержит двойственные данные для какой-то переменной (мы поспели прочитать ее несколько раз), появляется задача. При типичном функционировании SNMP-мониторинга, такой обстановки появляться не должно, но мы обязаны предусмотреть что-то, Дабы не допустить падения приложения, если она все-таки возникнет.

Первое, что приходит в голову — хранение агрегированных данных по всякой переменной. Мы будем добавлять новую запись если ее еще нет либо обновлять существующую, записывай в нее новое значение:

Java-код

	private final static int     BULK_SIZE        = 200;

	private final static String  MERGE_VAL_SQL    =
			"merge into ae_state_tmp d "  
			"using ( select ? id,? device_id,? profile_id,? param_id,? num,? value "  
			"        from dual"  
			"      ) s "  
			"on ( d.device_id = s.device_id and d.profile_id = s.profile_id and "  
			"     d.param_id = s.param_id and d.num = s.num ) "  
			"when matched then "  
			"  update set d.value = s.value "  
			"when not matched then "  
			"  insert (id, device_id, profile_id, param_id, num, value) "  
			"  values (s.id, s.device_id, s.profile_id, s.param_id, s.num, s.value)";

	private final static String  SAVE_VALUES_SQL  = 
			"begin ae_monitoring.saveValues; end;";

	private void test_temporary() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(MERGE_VAL_SQL);
		...

Сейчас мы имеем дело с той-же задачей, но на стадии выполнения batch-запроса:

java.sql.BatchUpdateException: ORA-00600: код внутренней ошибки, доводы: [6704], [2], [0], [6301696], [], [], [], [], [], [], [], []

Доводится отказаться от batch:

private final static int     BULK_SIZE        = 1;

Итоги приметно улучшились:

итоги

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 2 0.00 0.00 0 0 0 0
Execute 1001 1.02 1.01 0 9002 3503 3001
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 1003 1.02 1.01 0 9002 3503 3001

Misses in library cache during parse: 1
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 1002 0.00 0.00
  SQL*Net message from client 1002 0.00 0.41
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 23 0.01 0.01 0 1 0 0
Execute 23 0.21 0.21 43 29392 348 111
Fetch 11 0.00 0.00 0 27 0 10
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 57 0.22 0.23 43 29420 348 121

Misses in library cache during parse: 8
Misses in library cache during execute: 6

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 41 0.01 0.01
  db file scattered read 1 0.00 0.00

Альтернативный подход (distinct)

Мы можем упростить себе жизнь на этапе вставки данных (заодно избавившись от вероятности ORA-600), добавив группировку на этапе массовой обработки. В этом случае, мы можем не скромничать с BULK_SIZE, выставив его по максимуму.

PL/SQL-код

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValuesDistinct;
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure saveValuesDistinct as
    begin

      -- Сделать источник, если он отсутствует
      merge into ae_resource d
      using ( select t.id, t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(id) keep (dense_rank last order by datetime) id
                       ,      max(value) keep (dense_rank last order by datetime) value
                       ,      max(datetime) datetime
                       from   ae_state_tmp
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate   1) )
      when matched then
        update set d.tmp_id = s.id
        where  d.name <> s.name
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Добавить недостающие ae_resource
      insert into ae_resource(id, device_id, owner_id, type_id, res_num, name)
      select ae_resource_seq.nextval, t.device_id, o.id, p.type_id, t.num, t.value
      from   ( select device_id, profile_id, param_id, num
               ,      max(id) keep (dense_rank last order by datetime) id
               ,      max(value) keep (dense_rank last order by datetime) value
               ,      max(datetime) datetime
               from   ae_state_tmp
               group  by device_id, profile_id, param_id, num
             ) t
      inner  join ae_resource c on (c.tmp_id = t.id)
      inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
      inner  join ae_resource_type r on (r.id = p.type_id)
      left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id);

      -- Закрыть устаревшие интерфейсы
      update ae_resource set end_date = sysdate
                         ,   tmp_id   = null
      where  tmp_id > 0;

      -- Сберечь записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ( select device_id, profile_id, param_id, num
                      ,      max(id) keep (dense_rank last order by datetime) id
                      ,      max(value) keep (dense_rank last order by datetime) value
                      ,      max(datetime) datetime
                      from   ae_state_tmp
                      group  by device_id, profile_id, param_id, num
                    ) t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
              from   ( select device_id, profile_id, param_id, num
                      ,      max(id) keep (dense_rank last order by datetime) id
                      ,      max(value) keep (dense_rank last order by datetime) value
                      ,      max(datetime) datetime
                      from   ae_state_tmp
                      group  by device_id, profile_id, param_id, num
                    ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сберечь метаморфозы
      commit write nowait;
    end;

end ae_monitoring;
/
Java-код

	private final static int     BULK_SIZE        = 200;

	private final static String  INS_VAL_SQL      =
			"insert into ae_state_tmp(id, device_id, profile_id, param_id, num, value) values (?,?,?,?,?,?)";

	private final static String  SAVE_VALUES_DISTINCT_SQL  = 
			"begin ae_monitoring.saveValuesDistinct; end;";

	private void test_temporary_distinct() throws SQLException {
		System.out.println("test_temporary:");
		CallableStatement st = c.prepareCall(INS_VAL_SQL);
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		Long ix     = 1L;
		int  bulk   = BULK_SIZE; 
		try {
			for (int i = 1; i <= ALL_SIZE; i  ) {

				// Передать uptime
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   UPTIME_PARAM_ID);
				st.setString(5, FAKE_NUM_VALUE);
				st.setString(6, uptime.toString());
				st.addBatch();

				// Передать имя интерфейса
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   IFNAME_PARAM_ID);
				st.setString(5, Integer.toString((i % 100)   1));
				st.setString(6, Integer.toString((i % 100)   1));
				st.addBatch();

				// Передать счетчик трафика
				st.setLong(1,   ix  );
				st.setLong(2,   DEVICE_ID);
				st.setLong(3,   PROFILE_ID);
				st.setLong(4,   INOCT_PARAM_ID);
				st.setString(5, Integer.toString((i % 100)   1));
				st.setString(6, inoct.toString());
				st.addBatch();

				if (--bulk <= 0) {
					st.executeBatch();
					bulk = BULK_SIZE;
				}

				// Увеличить счетчики
				uptime  = 100L;
				if (uptime >= 1000) {
					uptime = 0L;
				}
				inoct  = 10L;
			}
			if (bulk < BULK_SIZE) {
				st.executeBatch();
			}
		} finally {
			st.close();
		}
		Long delta_1 = System.currentTimeMillis() - timestamp;
		System.out.println((ALL_SIZE * 1000L) / delta_1);
		timestamp = System.currentTimeMillis();
		st = c.prepareCall(SAVE_VALUES_DISTINCT_SQL);
		timestamp = System.currentTimeMillis();
		try {
			st.execute();
		} finally {
			st.close();
		}
		Long delta_2 = System.currentTimeMillis() - timestamp;
		System.out.println(delta_2);
		System.out.println((ALL_SIZE * 1000L) / (delta_1 - delta_2));
	}

Запускаем и глядим итоги:

итоги

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 2 0.00 0.00 0 0 0 0
Execute 1001 0.36 0.33 0 96 6616 3001
Fetch 0 0.00 0.00 0 0 0 0
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 1003 0.36 0.33 0 96 6616 3001

Misses in library cache during parse: 2
Misses in library cache during execute: 1

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 1002 0.00 0.00
  SQL*Net message from client 1002 0.00 0.41
  log file sync 1 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 30 0.01 0.01 0 3 0 0
Execute 30 0.41 0.40 3 48932 1104 218
Fetch 8 0.00 0.00 0 176 0 8
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 68 0.44 0.43 3 49111 1104 226

Misses in library cache during parse: 8
Misses in library cache during execute: 7

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 3 0.00 0.00

Как и ожидалось, издержки на вставку данных уменьшились, 0.49 0.49 1 129930 0 13909 ——- —— ——– ———- ———- ———- ———- ———- total 31360 3.48 3.43 6 135976 6610 17069 Misses in library cache during parse: 8 Misses in library cache during execute: 11 Elapsed times include waiting on following events: Event waited on Times Max. Wait Total Waited —————————————- Waited ———- ———— db file sequential read 6 0.00 0.00

Улучшились показатели, связанные с передачей данных с заказчика, но несколько усложнилась обработка данных, по сопоставлению с вариантом plsql.

Используем коллекции по разумному (bulk)

Видимо, что мы используем потенциал массивов, передаваемых с заказчика не в полной мере. Было бы недурно передавать массив непринужденно в SQL-запросы, Дабы задействовать массовую обработку, но как это сделать? Мы не можем применять BULK COLLECT, от того что наши запросы слишком трудные для этого. К счастью, мы можем обернуть коллекцию в TABLE:

PL/SQL-код

CREATE OR REPLACE package AIS.ae_monitoring as
    procedure  saveValues( p_tab       in  ae_state_tab );
end ae_monitoring;
/

CREATE OR REPLACE package body AIS.ae_monitoring as

    g_ifName_parameter    constant number default 103;
    g_default_policy      constant number default 1;
    g_uptime_policy       constant number default 2;
    g_threshold_policy    constant number default 3;
    g_increase_type       constant number default 1;
    g_decrease_type       constant number default 2;
    g_delta_type          constant number default 3;

    procedure  saveValues( p_tab in  ae_state_tab ) as
    begin

      -- Сделать источник, если он отсутствует
      merge into ae_resource d
      using ( select t.device_id, t.num, t.value name, p.type_id, o.id owner_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource_type r on (r.id = p.type_id)
              left   join ae_resource o on (o.device_id = t.device_id and o.type_id = r.owner_id)
              where  t.param_id = g_ifName_parameter
            ) s
      on ( d.device_id = s.device_id and d.res_num = s.num and d.type_id = s.type_id and
           d.start_date <= sysdate and sysdate <= nvl(d.end_date, sysdate   1) )
      when not matched then
        insert (id, device_id, owner_id, type_id, res_num, name)
        values (ae_resource_seq.nextval, s.device_id, s.owner_id, s.type_id, s.num, s.name);

      -- Сберечь записи в ae_state_log
      insert into ae_state_log(id, res_id, param_id, value)
      select ae_state_log_seq.nextval, id, param_id, value
      from ( select distinct r.id, t.param_id,
                    decode(l.type_id, g_uptime_policy, nvl(s.value, t.value), t.value) value
             from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
             inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
             inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                            r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
             left   join ae_state s on (s.res_id = r.id and s.param_id = t.param_id)
             inner  join ae_parameter a on (a.id = p.param_id)
             inner  join ae_domain d on (d.id = a.domain_id)
             inner  join ae_state_policy l on (l.id = d.policy_id)
             left   join ae_threshold h on (
                    h.policy_id = l.id and
                 (( h.type_id = g_increase_type and s.value <= h.value and t.value >= h.value ) or
                  ( h.type_id = g_decrease_type and s.value >= h.value and t.value <= h.value ) or
                  ( h.type_id = g_delta_type and abs(t.value - s.value) >= h.value )))
             where  ( s.id is null or not h.id is null
             or   ( l.type_id = g_uptime_policy and t.value < s.value )
             or   ( l.type_id = g_default_policy and t.value <> s.value ) )
             and    t.param_id <> g_ifName_parameter );

      -- Обновить ae_state
      merge into ae_state d
      using ( select t.param_id, t.value, r.id res_id
              from   ( select device_id, profile_id, param_id, num
                       ,      max(value) value
                       from   table( p_tab )
                       group  by device_id, profile_id, param_id, num
                     ) t
              inner  join ae_profile_detail p on (p.profile_id = t.profile_id and p.param_id = t.param_id)
              inner  join ae_resource r on ( r.device_id = t.device_id and r.res_num = t.num and r.type_id = p.type_id and
                                             r.start_date <= sysdate and sysdate <= nvl(r.end_date, sysdate   1))
              where  t.param_id <> g_ifName_parameter
            ) s
      on (d.res_id = s.res_id and d.param_id = s.param_id)
      when matched then
        update set d.value = s.value
               ,   d.datetime = current_timestamp
      when not matched then
        insert (id, param_id, res_id, value)
        values (ae_state_seq.nextval, s.param_id, s.res_id, s.value);

      -- Сберечь метаморфозы
      commit write nowait;
    end;

end ae_monitoring;
/
Java-код

	private final static String  BULK_VALUES_SQL  = 
			"begin ae_monitoring.saveValues(?); end;";

	private void test_bulk() throws SQLException {
		System.out.println("test_bulk:");
		OracleCallableStatement st = (OracleCallableStatement)c.prepareCall(BULK_VALUES_SQL);
		int oracleId = CharacterSet.CL8MSWIN1251_CHARSET;
		CharacterSet charSet = CharacterSet.make(oracleId);		
		Long timestamp = System.currentTimeMillis();
		Long uptime = 0L;
		Long inoct  = 0L;
		RecType r[] = new RecType[ALL_SIZE * 3]; 
		int ix      = 0;
		for (int i = 1; i <= ALL_SIZE; i  ) {

			// Передать uptime
			r[ix  ] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(UPTIME_PARAM_ID),
					new CHAR(FAKE_NUM_VALUE, charSet),
					new CHAR(uptime.toString(), charSet));

			// Передать имя интерфейса
			r[ix  ] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(IFNAME_PARAM_ID),
					new CHAR(Integer.toString((i % 100)   1), charSet),
					new CHAR(Integer.toString((i % 100)   1), charSet));

			// Передать счетчик трафика
			r[ix  ] = new RecType(
					new NUMBER(DEVICE_ID),
					new NUMBER(PROFILE_ID),
					new NUMBER(INOCT_PARAM_ID),
					new CHAR(Integer.toString((i % 100)   1), charSet),
					new CHAR(inoct.toString(), charSet));

			// Увеличить счетчики
			uptime  = 100L;
			if (uptime >= 1000) {
				uptime = 0L;
			}
			inoct  = 10L;
		}
		RecTab t = new RecTab(r);
		try {
			st.setORAData(1, t);
			st.execute();
		} finally {
			st.close();
		}
		System.out.println((ALL_SIZE * 1000L) / (System.currentTimeMillis() - timestamp));
	}

Итог говорит сам за себя:

итоги

OVERALL TOTALS FOR ALL NON-RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 4 0.00 0.00 0 0 0 0
Execute 4 0.20 0.20 4 696 1095 3
Fetch 1 0.00 0.00 0 9 0 1
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 9 0.20 0.20 4 705 1095 4

Misses in library cache during parse: 0

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  SQL*Net message to client 6 0.00 0.00
  SQL*Net message from client 6 0.10 0.19
  SQL*Net more data from client 41 0.00 0.00

OVERALL TOTALS FOR ALL RECURSIVE STATEMENTS

call count cpu elapsed disk query current rows
------- ------ -------- ---------- ---------- ---------- ---------- ----------
Parse 30 0.00 0.00 0 0 0 0
Execute 38 0.18 0.17 4 591 1095 217
Fetch 46 0.00 0.00 0 96 0 30
------- ------ -------- ---------- ---------- ---------- ---------- ----------
total 114 0.18 0.18 4 687 1095 247

Misses in library cache during parse: 7
Misses in library cache during execute: 7

Elapsed times include waiting on following events:
  Event waited on Times Max. Wait Total Waited
  ---------------------------------------- Waited ---------- ------------
  db file sequential read 4 0.00 0.00

Итоги

Как показывают итоги тестирования, непосредственная передача массивов в Oracle из клиентского кода (bulk) разрешает добиться наилучшей продуктивности. Варианты с применением GTT (temporary, distinct) не крепко ему уступают в плане продуктивности, но гораздо проще, с точки зрения Java-кода. Вариант temporary, помимо того, дает вероятность пронаблюдать ORA-600, при применении batch и неудачном расположении звезд.

Какой именно подход применять для обработки данных — решать вам. Итоги тестирования выложены на GitHub.

 

Источник: programmingmaster.ru

Оставить комментарий
Форум phpBB, русская поддержка форума phpBB
Рейтинг@Mail.ru 2008 - 2017 © BB3x.ru - русская поддержка форума phpBB