Для преобразования данных

Преобразователи — это промежуточные шаги графа. Преобразователи получают данные через подключенные входные порты, обрабатывают их указанным пользователем способом и отправляют через подключенные выходные порты в следующие шаги.

Список шагов для преобразования данных:

Sort - сортирует записи

Filter - фильтрует записи

Gather - собирает записи из разныых потоков в один

Copy - копирует записи

Concat - объединяет записи

Map - пользовательский алгоритм обработки

Dedup - устраняет копии по ключу

Normalizer - преобразует запись с несколькими полями, разбивая её на несколько записей с одним полем (нормальзует запись)

DataIntersection - объединяет отсортированные потоки данных, может применять преобразование

Sort

Sort сортирует полученные записи в соответствии с указанным ключом сортировки и копирует каждую из них на все подключенные выходные порты. Позволяет использовать несколько параллельных потоков для сортировки больших данных.

Порты Sort:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейОдинаковые метаданные на входных и выходных портах
Output0даДля отсортированных записей
1-nнетДля отсортированных записей

Атрибуты Sort:

АтрибутОбязательныйОписаниеВозможные значения
sortKeyдаИмена полей и порядок сортировкиsortKey="x_coord(a); y_coord(d)"
sortInMemoryнетПри sortInMemory="true" выполняется внутренняя сортировка.sortInMemory="false" (по умолчанию)

Пример. Сортировка данных.

Входные записи содержат имена файлов и их размер. Нужно отсортировать файлы по размеру, начиная с самого большого (descending – по убыванию). Метаданные содержат поля «FileName», «FileSize».

Входящие записи:

FileNameFileSize
file.txt2048
file.docx1048576
file.xml65536

Решение:

Ключ сортировки: FileSortKey="FileSize(d)"

Исходящие записи:

FileNameFileSize
file.docx1048576
file.xml65536
file.txt2048

Filter

Filter фильтрует входные данные в соответствии с логическим выражением. Отправляет все записи, соответствующие выражению фильтра, в первый выходной порт и все отклоненные записи во второй выходной порт.

Порты Filter:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейОдинаковые метаданные на входных и выходных портах
Output0даДля отфильтрованных записей
1нетДля отклонённых записей

Атрибуты Filter:

АтрибутОбязательныйОписаниеВозможные значения
filterExpressionдаВыражение, по которому фильтруются записи. Для записи преобразования используется JavaScript. Возвращает логическое значение.
<attr name="filterExpression">
    <![CDATA[ 
        $in[0].count != 177 && $in[0].product == «карандаш» 
    ]]>
</attr>

Пример. Фильтрация данных.

Входные данные содержат данные о продуктах, проданных в прошлом году. Нужно узнать данные только по карандашам. Метаданные содержат поля Product, Count и Location.

Входящие записи:

ProductCountLocation
карандаш1553екатеринбург
бумага6475новгород
ручка598владикавказ
карандаш177омск
карандаш239волгоград
бумага19казань
ластик53ростов

Решение:

Выражение для фильтрации: $in[0].product == «карандаш»

Исходящие записи:

ProductCountLocation
карандаш1553екатеринбург
карандаш177омск
карандаш239волгоград

Gather

Gather собирает записи со всех входящих портов и отправляет в порядке получения на все выходные порты. Порядок получения записей не зависит от порядка входных портов. Этот шаг соблюдает порядок записей в потоках, но не соблюдает порядок потоков. На выходе получается список записей в непредсказуемом порядке. Порядок записей на разных выходах будет одинаков. Шаг не имеет атрибутов.

Порты Gather:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейОдинаковые метаданные на входных и выходных портах
1-nнетДля входящего потока записей
Output0даДля отфильтрованных записей
1-nнетДля отклонённых записей

Пример. Сбор записей с нскольких входных портов.

Нужно собрать записи с нескольких входов. Метаданные содержат одно поле «id».

Входящие записи:

Gather input

Решение:

Описание метаданных:

    <Global>
        <Metadata id="ObjectWithPos">
                <Field name="id" type="int"/>
        </Metadata>
    </Global>

Объявление шага:


<Node id="x" guiX="250" guiY="220" guiName="Gather" type="Gather"/>

Исходящие записи:

Gather output

Copy

Copy получает записи через один входной порт и копирует каждую из них на все подключенные выходные порты. Шаг не имеет атрибутов.

Порты Copy:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
Output0даДля скопированных записейКак у Input 0
1-nнетКак у Output 0

Пример. Копирование данных.

Нужно скопировать записи с метаданными «carID» и «mark» в три потока.

Входящие записи:

порт 0:

carIDmark
145mercedes
856toyota
245chevrolet

Решение:

Для копирования в несколько потоков нужно подключить Copy несколько выходных портов. Записи на всех выходных портах будут идти в одинаковом порядке.

Исходящие записи:

порт 0:

carIDmark
145mercedes
856toyota
245chevrolet

порт 1:

carIDmark
145mercedes
856toyota
245chevrolet

порт 2:

carIDmark
145mercedes
856toyota
245chevrolet

Concat

Concat получает записи, поступившие из первого входного порта, отправляет их на общий выходной порт и добавляет к ним записи, из остальных входных портов. Если шаг имеет более двух входных портов, записи принимаются и отправляются на выход в соответствии с порядком входных портов. Если некоторые входные порты не содержат записей, такие порты пропускается. Шаг не имеет атрибутов.

Порты Concat:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
1-nнетКак у Input 0
Output0даДля объединенных записей

Пример. Объединение записей.

Нужно объединить записи. Поданные на вход метаданные имеют поля «flower», «color».

Входящие записи:

порт 0:

flowercolor
маккрасный
ромашкабелый
василекголубой

порт 1:

flowercolor
розасиреневый
лилиярозовый

порт 2:

flowercolor
подсолнухжелтый
анемонвишневый
гипсофилазеленый

Решение:

После конкатенации будут получены следующие записи.

Исходящие записи:

порт 0:

flowercolor
маккрасный
ромашкабелый
василекголубой
розасиреневый
лилиярозовый
подсолнухжелтый
анемонвишневый
гипсофилазеленый

Map

Map позволяет написать пользовательский алгоритм обработки данных, используя внутренний язык системы. Можно по своему усмотрению трансформировать данные между входным и выходными портами, если предложенных шагов не хватает для выполнения необходимых преобразований данных.

Имеет единственный входной порт и как минимум один выходной. Может отправлять разные записи в разные выходные порты или даже отправлять одну и ту же запись на несколько выходных портов. Работает только с одним элементом, сохраняет порядок записей.

С помощью Map можно:

  • удалить ненужные значения полей
  • проверить записи с помощью функций или регулярных выражений
  • создать новые или изменить существующие поля
  • преобразовать типы данных

Порты Map:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
Output0даДля преобразованных записей
1-nнет

Атрибуты Map:

АтрибутОбязательныйОписаниеВозможные значения
transform

1

Алгоритм преобразования данных
transformURL 1Имя внешнего файла, в котором описано преобразввание
charsetнетКодировка внешнего файла, определяющего преобразование

<attr name="transform">
    <![CDATA[
        function transform() {
            $out[0].person = $in[0].name.toString() + "_" + $in[0].surname.toString();
            $out[1].person = $in[0].name.toString().toUpperCase() + " " + $in[0].surname.toString().toUpperCase();
            return ALL;
        }
    ]]>
</attr>
1

Один из атрибутов должен быть указан.

Пример. Обработка данных с помощью Map.

Нужно получить произведение и сумму полученных на вход данных и отправить результаты на разные выходные порты. Входные метаданные содержат поля a, b. Нужно отправить результат перемножения a*b на первый порт, а результат сложения a+b на второй порт.

Входящие записи:

ab
56
24
12

Решение:

Преобразование:


<Attr name="transform"><![CDATA[
    pub fn transform() -> OutPort {
        let res_mul = $in[0].a * $in[0].b;
        let res_add = $in[0].a + $in[0].b;

        $out[0].res_mul = res_mul;
        $out[1].res_add = res_add;

        return ALL;
    }

    ]]>
</Attr>

Исходящие записи:

порт 0:

multiplied
30
6
2

порт 1:

added
11
5
3

Dedup

Dedup удаляет повторяющиеся записи по ключу.

Порты Dedup:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входных записейЛюбые
Output0даДля дедуплицированных записей.Как у Input 0
Output1нетДля дубликатов записей.Как у Input 0

Атрибуты Dedup:

АтрибутОбязательныйОписаниеВозможные значения
dedupKeyнетКлюч, по которому производится дедупликация (удаление дубликатов) записей. Если ключ не установлен, весь входной поток рассматривается как одна группа и удаляются только полные дубликаты (по всем полям записи).dedupKey="x_coord"
sortedнетПризнак, ортированы ли входные записи.1sorted="true" по умолчанию
dupAmountнетЕсли dedupKey задан, отбирается только указанное в dupAmount количество записей с одинаковыми значениями в полях, указанных в качестве ключа.dupAmount="1" по умолчанию
equalNULLнетПо умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если equalNULL="false", записи считаются разными.equalNULL="true" по умолчанию
1

Dedup может обрабатывать данные в двух режимах: сортированном и несортированном.

Сортированный и несортированный ввод

Если вы хотите обработать огромное количество записей с множеством различных значений ключей, сначала отсортируйте записи, а затем используйте Dedup с отсортированными входными данными.

Если ваши данные содержат несколько разных значений ключей, вы можете использовать несортированный ввод.

Дедуплицирование несортированных входных данных не требует предварительной сортировки, но ограничивается доступной памятью, поскольку записи, которые должны быть отправлены на первый выходной порт, кэшируются. Требования к оперативной памяти в несортированном режиме зависят от значений атрибута dupAmount. Меньшее количество дубликатов означает, что требуется меньше памяти.

Если вы используете несортированные входные записи, порядок выходных записей не обязательно будет таким же, как порядок входных записей.

Пример. Дедупликация сортированных записей.

Записи содержат время входов на некоторый ресурс с различных ip адресов. Записи отсортированы по времени входа. Нужно найти время первого входа для каждого ip адреса. Метаданные содержат поля «ip» и «time».

Входящие записи:

iptime
67.249.105.11811:46:12
208.25.71.8805:14:15
161.100.209.23523:12:32
161.100.209.23523:19:34
67.249.105.11815:34:09
223.78.208.18415:35:43
52.151.181.421:51:17
223.78.208.18415:38:49
161.100.209.23523:28:16

Решение:

Ключ дедупликации: dedupKey = «ip»

Исходящие записи:

iptime
67.249.105.11811:46:12
208.25.71.8805:14:15
161.100.209.23523:12:32
223.78.208.18415:35:43
52.151.181.421:51:17

Normalizer

Normalizer создает одну или несколько выходных записей из каждой отдельной входной записи. Входные записи не обязательно сортировать.

Порты Normalizer:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входных записейЛюбые
Output0даДля нормализованных записейЛюбые

Атрибуты Normalizer:

АтрибутОбязательныйОписаниеВозможные значения
normalizeдаОпределение способа нормализации записей
<attr name="normalize"><![CDATA[
    function count() {
	n++;

	return length($in[0]);
    }
    function transform(integer idx) {
	$out[0].rn = n;
	$out[0].name = getFieldLabel($in[0], idx);
	$out[0].value = getStringValue($in[0], idx);
	
	return ALL;
    }
]]></attr>
charsetнетКодировка файла, в котором описано преобразование normalizecharset="UTF-8"

Для Normalizer нужно определить функцию преобразования. Преобразование должно быть определено в файле задания в артибуте normalize. Эта функция будет вызываться заданное количество раз для каждой записи, поданной на вход этому шагу.

Порядок вызова функций описан ниже.

Алгоритм нормализации записей

Алгоритм нормализации записей

Функции шага Normalizer:

integer count()

ПараметрЗначение
ОбязательныйДа
Входные параметрынет
ВозвращаетЧисло, которое определяет количество вызовов функции Transform() для каждой записи. Если функция count() возвращает 0, то последующий вызов Transform() не производится.
ВызовВызывается по одному разу для каждой входной записи.
ОписаниеОписывает количество повторений вызова функции transform()
Пример
function integer count() {
   customers = split($in.0.customers,";");
   return length(customers);
}

integer transform(integer idx)

ПараметрЗначение
ОбязательныйДа
Входные параметрыinteger idx - целые числа от 0 до count-1 (здесь count — это число, возвращаемое функцией count().
ВозвращаетЦелое число. Число соответствует ![возвращаемому значению преобразования]().
ВызовВызывается один раз для каждой выходной записи. Количество вызовов определяется возвращаемым значением функции count().
ОписаниеСоздает выходные записи.
Пример
function integer transform(integer idx) {
   myString = customers[idx];
   $out.0.OneCustomer = str2integer(myString);
   $out.0.RecordNo = $in.0.recordNo;
   $out.0.OrderWithinRecord = idx;

   return OK;
}

Пример. Преобразование записи с многозначными полями в несколько записей.

Входные записи содержат название должности и список имён сотрудников. Нужно преобразовать записи в кортежи, содержащие название должности и одно имя сотрудника.

менеджер | [Егор, Алина]
разработчик | [Артём, Никита, Данил]

Решение

Определим преобразование, используя атрибуты Normalize:

function integer count() {
    return length($in.0.users);
}

function integer transform(integer idx) {
    $out.0.group = $in.0.group;
    $out.0.user = $in.0.users[idx];
    return OK;
}

Normalizer вернёт следующие записи:

менеджер |Егор
менеджер |Алина
разработчик|Артём
разработчик|Никита
разработчик|Данил

Заметка: Если преобразование указано во внешнем файле, рекомендуется явно указать исходную кодировку в артибуте charset.

DataIntersection

DataIntersection получает отсортированные данные с двух портов, сравнивает значения ключей в обоих и обрабатывает записи следующим образом:

  • Такие входные записи, которые находятся как на входном порте 0, так и на входном порте 1, обрабатываются в соответствии с определяемым пользователем преобразованием, и результат отправляется на выходной порт 1.

  • Такие входные записи, которые находятся только на входном порте 0, отправляются без изменений на выходной порт 0.

  • Такие входные записи, которые есть только на входном порте 1, отправляются без изменений на выходной порт 2.

Записи считаются находящимися на обоих портах, если значения всех полей ключа соединения в них совпадают.

Преобразование должно быть определено, если подключён выходной порт 1.

Порты DataIntersection:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записей (поток А)

Любые2

1даДля входящего потока записей (поток B)

Любые2

Output3

0нетДля неизмененных входных записей (содержащихся только в потоке A).как на Input 0
1нетДля измененных выходных записей (содержащихся в обоих входных потоках)Любые
2нетДля неизмененных выходных записей (содержащихся только в потоке B).как на Input 1
2

Часть полей метаданных должна совпадать с полями ключа соединения.

3

Хотя бы один выходной порт из трех должен быть подключён.

Атрибуты DataIntersection:

АтрибутОбязательныйОписаниеВозможные значения
joinKeyдаКлюч соединенияjoinKey="$client_plan_gid=$client_plan_gid"
transform

4

Функция преобразования данных, определённая в графе
transformURL

4

Функция преобразования данных, определённая во внешнем файле
charsetКодировка внешнего файла, содержащего преобразование.charset="UTF-8" по умолчанию
equalNULLПо умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если установлено значение false, они считаются отличными друг от друга.equalNULL="true" по умолчанию
keyDuplicatesРазрешает дублирование ключа. По умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если установлено значение false, они считаются отличными друг от друга.keyDuplicates="true" по умолчанию
4

Должен быть указан хотя бы один из этих атрибутов.

Ключ соединения

Выражается как последовательность отдельных подвыражений, отделенных друг от друга точкой с запятой. Каждое подвыражение представляет собой присвоение имени поля из первого входного порта (с префиксом в виде знака доллара $) слева и имени поля из второго входного порта (с префиксом $) с правой стороны.

Заметка: Компонент может возвращать количество записей, отличное от исходного количества входных записей.
Если для параметра keyDuplicates установлено значение false, количество выходных записей может быть меньше количества входных записей, поскольку используется только первая из записей с дубликатом ключа.
Если для параметра keyDuplicates установлено значение true, количество выходных записей может быть больше, чем количество входных записей. На выходе создается декартово произведение записей, имеющих одинаковый ключ.